Awesome
level-q
Priority queuing for leveldb/levelup
Installation
This module is installed via npm:
$ npm install level-q
Example Usage
Basic Usage
By default adding items to a queue will be ordered first in, first out.
var queue = require('level-q'),
level = require('level'),
bytewise = require('bytewise');
// instantiate queue (adds a 'queue' property on the db)
var q = queue(level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }));
// data to put in queue
var data = {
id: 1,
name: 'Eugene',
value: 42
};
// add to queue
q.queue.push(data);
// read a single item from the queue, then stop listening
q.queue.read(function (err, value) {
// value should equal data
});
// keep listening to the queue
q.queue.listen(function (err, value, key, next) {
// do something with the value
console.log(value);
// ok, done processing, get the next item from the queue
next();
});
Custom Priority Queue
By providing an order by
function you can return a key that can be used
to sort the queue. This example uses a quality of service (QOS) field
so that the higher the QOS, the earlier it will get serviced.
var queue = require('level-q'),
level = require('level'),
bytewise = require('bytewise');
// order the queue by the QOS field
var db = queue(level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }),
{ order: orderByQos });
function orderByQos(data) {
return data.QOS;
}
// data to put in queue
var data = [
// should be serviced last
{
id: 1,
QOS: 'C',
name: 'Eugene',
value: 42
},
// should be serviced first
{
id: 2,
QOS: 'A',
name: 'Eugene',
value: 42
},
// should be serviced second
{
id: 3,
QOS: 'B',
name: 'Eugene',
value: 42
}
];
// add to queue
var count = data.length;
data.forEach(function (item) {
db.queue.push(item, function (err) {
if (err) throw err;
--count || listen();
});
});
function listen() {
db.queue.listen(function (err, value, key, next) {
console.log(value);
// should print the second item, then the third item, then the first
next();
});
}
Control when an item is able to be released from the queue
You can use the release
option when instantiating your queue to control
when an item is able to be returned to consumers of the queue.
For example, you may wish to create a job queue, and delay jobs until to a scheduled time in the future (eg. like a cron job);
var bytewise = require('bytewise'),
level = require('level')
range = require('range'),
queue = require('level-q');
// create a priority queue based on time to next service
var db = queue(
level('/db/path', { keyEncoding: bytewise, valueEncoding: 'json' }),
{ order: orderByDeadline, release: releaseOnDeadline });
// sort queue by deadline
function orderByDeadline(data) {
return data.deadline;
}
// only allow an item to be read from queue if it's time is up
function releaseOnDeadline(data) {
return Date.now() >= data.deadline;
}
// add some work to the queue that will be scheduled 5 seconds in the future
db.queue.push({
deadline: Date.now() + 5000,
state: 'square'
n: 2,
squareResult: 0,
sinResult: 0
});
// The job will ONLY be available for processing once the 5 seconds has passed
db.queue.listen(function (err, value, key, next) {
if (err) throw err;
switch (value.state) {
case 'square':
// do the work
value.squareResult = value.n * value.n;
// schedule then next part of work 2 seconds in the future
value.state = 'sin';
value.deadline = Date.now() + 2000;
db.queue.push(value, next);
break;
case 'sin':
// do the work
value.sinResult = Math.sin(value.n);
console.log(value);
// Should print out:
// { deadline: 1390487269236,
// state: 'sin',
// n: 2,
// squareResult: 4,
// sinResult: 0.9092974268256817 }
next();
break;
}
});