Better Queue - Powerful flow control

Super simple to use

Better Queue is designed to be simple to set up but still let you do complex things.


Install (via npm)

npm install --save better-queue

Quick Example

var Queue = require('better-queue');

var q = new Queue(function (input, cb) {

  // Some processing here ...
  var result = input; // placeholder: replace with your real result

  cb(null, result);
})

q.push(1)
q.push({ x: 1 })

You can combine any (and all) of the options described below for your queue!

Queuing

It's very easy to push tasks into the queue.

var q = new Queue(fn);
q.push(1);
q.push({ x: 1, y: 2 });
q.push("hello");

You can also pass a callback as the second parameter to the push function; it is called when that task is done. For example:

var q = new Queue(fn);
q.push(1, function (err, result) {
  // Results from the task!
});

You can also listen to events on the value returned by the push call.

var q = new Queue(fn);
q.push(1)
  .on('finish', function (result) {
    // Task succeeded with {result}!
  })
  .on('failed', function (err) {
    // Task failed!
  })

Alternatively, you can subscribe to the queue's events.

var q = new Queue(fn);
q.on('task_finish', function (taskId, result, stats) {
  // taskId = 1, result: 3, stats = { elapsed: <time taken> }
  // taskId = 2, result: 5, stats = { elapsed: <time taken> }
})
q.on('task_failed', function (taskId, err, stats) {
  // Handle error, stats = { elapsed: <time taken> }
})
q.on('empty', function (){})
q.on('drain', function (){})
q.push({ id: 1, a: 1, b: 2 });
q.push({ id: 2, a: 2, b: 3 });

The empty event fires when all of the tasks have been pulled off the queue (there may still be tasks running).

The drain event fires when there are no more tasks on the queue and no more tasks are running.

You can control how many tasks process at the same time.

var q = new Queue(fn, { concurrent: 3 })

Now the queue will run up to 3 tasks at the same time. (By default, tasks are handled one at a time.)

You can also turn the queue into a stack by turning on filo.

var q = new Queue(fn, { filo: true })

Now the items you pushed most recently will be handled first.
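To picture the difference, here is a toy model in plain JavaScript. It does not use Better Queue itself, and it assumes every item is pushed before processing starts. The helper name drainOrder is made up for this illustration.

```javascript
// Toy model only: returns the order in which already-pushed items
// would be handled, with or without filo.
function drainOrder(items, filo) {
  var pending = items.slice(); // copy so the input is not mutated
  var handled = [];
  while (pending.length > 0) {
    // filo takes the most recently pushed item; the default takes the oldest.
    handled.push(filo ? pending.pop() : pending.shift());
  }
  return handled;
}

console.log(drainOrder([1, 2, 3], false)); // [ 1, 2, 3 ]
console.log(drainOrder([1, 2, 3], true));  // [ 3, 2, 1 ]
```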



Task Management

Task ID

Tasks can be given an ID to help identify and track them as they go through the queue.

By default, we use task.id if it is a string; otherwise, we generate a random ID for the task.

You can pass in an id property to options to change this behaviour. Here are some examples of how:

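// Each queue below shows one accepted form of the `id` option.
var q1 = new Queue(fn, { id: 'id' });   // Default: task's `id` property
var q2 = new Queue(fn, { id: 'name' }); // task's `name` property
var q3 = new Queue(fn, {
  id: function (task, cb) {
    // Compute the ID
    cb(null, 'computed_id');
  }
})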
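As a sketch of the function form, the ID could be built from several task fields. The task shape (type and userId) and the name computeTaskId are hypothetical; only the (task, cb) signature comes from the example above.

```javascript
// Hypothetical ID function: builds a string ID such as "email:42"
// from two (assumed) task fields.
function computeTaskId(task, cb) {
  cb(null, String(task.type) + ':' + String(task.userId));
}

// Called directly here to show the result; normally the queue would call it.
computeTaskId({ type: 'email', userId: 42 }, function (err, id) {
  console.log(err, id); // null email:42
});
```

It would then be passed to the queue as the id option, for example new Queue(fn, { id: computeTaskId }).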

One thing you can do with Task ID is merge tasks:

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
}, {
  merge: function (oldTask, newTask, cb) {
    oldTask.count += newTask.count;
    cb(null, oldTask);
  }
})
counter.push({ id: 'apple', count: 2 });
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 1 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.
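A merge function can also be written and checked on its own before it is handed to a queue. The sketch below is a variation on the one above that tolerates a missing count; the name mergeCounts and the decision to return a fresh object are assumptions made for this illustration.

```javascript
// Merge function with the same (oldTask, newTask, cb) signature as above.
// Treats a missing or non-numeric `count` as 0 so the sum never becomes NaN.
function mergeCounts(oldTask, newTask, cb) {
  var oldCount = typeof oldTask.count === 'number' ? oldTask.count : 0;
  var newCount = typeof newTask.count === 'number' ? newTask.count : 0;
  // Build a new object instead of mutating either argument.
  cb(null, { id: oldTask.id, count: oldCount + newCount });
}

mergeCounts({ id: 'apple', count: 2 }, { id: 'apple' }, function (err, merged) {
  console.log(merged); // { id: 'apple', count: 2 }
});
```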

By default, if two tasks have the same ID, the newer task replaces the previous one.

var counter = new Queue(function (task, cb) {
  console.log("I have %d %ss.", task.count, task.id);
  cb();
})
counter.push({ id: 'apple', count: 1 });
counter.push({ id: 'apple', count: 3 });
counter.push({ id: 'orange', count: 1 });
counter.push({ id: 'orange', count: 2 });
// Prints out:
//   I have 3 apples.
//   I have 2 oranges.

You can also use the task ID when subscribing to events from Queue.

var counter = new Queue(fn)
counter.on('task_finish', function (taskId, result) {
  // taskId will be 'jim' or 'bob'
})
counter.push({ id: 'jim', count: 2 });
counter.push({ id: 'bob', count: 1 });

Batch Processing

Your processing function can also be modified to handle multiple tasks at the same time. For example:

var ages = new Queue(function (batch, cb) {
  // Batch 1:
  //   [ { id: 'steve', age: 21 },
  //     { id: 'john', age: 34 },
  //     { id: 'joe', age: 18 } ]
  // Batch 2:
  //   [ { id: 'mary', age: 23 } ]
  cb();
}, { batchSize: 3 })
ages.push({ id: 'steve', age: 21 });
ages.push({ id: 'john', age: 34 });
ages.push({ id: 'joe', age: 18 });
ages.push({ id: 'mary', age: 23 });

Note how the queue will only handle at most 3 items at a time.

Below is another example of a batched call with numbers.

var ages = new Queue(function (batch, cb) {
  // batch = [1,2,3]
  cb();
}, { batchSize: 3 })
ages.push(1);
ages.push(2);
ages.push(3);
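The grouping in the two examples above can be pictured with a small helper. This is a toy model in plain JavaScript, not the library's implementation, and it assumes all tasks are queued before processing starts; the name toBatches is made up.

```javascript
// Toy model: split already-queued tasks into batches of at most `batchSize`.
function toBatches(tasks, batchSize) {
  var batches = [];
  for (var i = 0; i < tasks.length; i += batchSize) {
    batches.push(tasks.slice(i, i + batchSize));
  }
  return batches;
}

console.log(toBatches(['steve', 'john', 'joe', 'mary'], 3));
// [ [ 'steve', 'john', 'joe' ], [ 'mary' ] ]
```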

Filtering, Validation and Priority

You can also format (and filter) the input that arrives from a push before it gets processed by the queue by passing in a filter function.

var greeter = new Queue(function (name, cb) {
  console.log("Hello, %s!", name)
  cb();
}, {
  filter: function (input, cb) {
    if (input === 'Bob') {
      return cb('not_allowed');
    }
    return cb(null, input.toUpperCase())
  }
});
greeter.push('anna'); // Prints 'Hello, ANNA!'

This can be particularly useful if your queue needs to do some pre-processing, input validation, database lookup, etc. before you load it onto the queue.
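For instance, a validation filter might look like the sketch below. The name normalizeName and the error strings are hypothetical; the (input, cb) signature follows the filter example above.

```javascript
// Hypothetical filter: accepts only non-empty strings and normalizes them.
function normalizeName(input, cb) {
  if (typeof input !== 'string') {
    return cb('invalid_type');
  }
  var name = input.trim();
  if (name.length === 0) {
    return cb('empty_name');
  }
  cb(null, name.toUpperCase());
}

// Called directly here to show the results.
normalizeName('  anna ', function (err, name) {
  console.log(err, name); // null ANNA
});
normalizeName(42, function (err) {
  console.log(err); // invalid_type
});
```

It would be passed to the queue as the filter option, in the same way as the inline function above.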

You can also define a priority function to control which tasks get processed first.

var greeter = new Queue(function (name, cb) {
  console.log("Greetings, %s.", name);
  cb();
}, {
  priority: function (name, cb) {
    if (name === "Steve") return cb(null, 10);
    if (name === "Mary") return cb(null, 5);
    if (name === "Joe") return cb(null, 5);
    cb(null, 1);
  }
})
greeter.push("Steve");
greeter.push("John");
greeter.push("Joe");
greeter.push("Mary");

// Prints out:
//   Greetings, Steve.
//   Greetings, Joe.
//   Greetings, Mary.
//   Greetings, John.

If filo is set to true in the example above, then Joe and Mary would swap order.
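The ordering described above can be reproduced with a toy model in plain JavaScript. It is not the library's implementation: it assumes all tasks are pushed before processing starts, that a higher number means higher priority, and that ties fall back to push order (reversed under filo). The names processingOrder and priorityOf are made up.

```javascript
// Toy model: order tasks by priority (highest first); ties keep push order,
// or reverse push order when `filo` is true.
function processingOrder(names, priorityOf, filo) {
  return names
    .map(function (name, index) {
      return { name: name, index: index, priority: priorityOf(name) };
    })
    .sort(function (a, b) {
      if (a.priority !== b.priority) return b.priority - a.priority;
      return filo ? b.index - a.index : a.index - b.index;
    })
    .map(function (entry) { return entry.name; });
}

// Same priorities as the greeter example, without the callback.
function priorityOf(name) {
  if (name === 'Steve') return 10;
  if (name === 'Mary' || name === 'Joe') return 5;
  return 1;
}

var pushed = ['Steve', 'John', 'Joe', 'Mary'];
console.log(processingOrder(pushed, priorityOf, false));
// [ 'Steve', 'Joe', 'Mary', 'John' ]
console.log(processingOrder(pushed, priorityOf, true));
// [ 'Steve', 'Mary', 'Joe', 'John' ]
```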



Queue Management

Retry

You can set tasks to retry up to maxRetries times if they fail. By default, failed tasks are not retried. Optionally, you can set a retryDelay (in milliseconds) to wait before each retry.

var q = new Queue(fn, { maxRetries: 10, retryDelay: 1000 })

Timing

You can configure the queue to have a maxTimeout.

var q = new Queue(function (name, cb) {
  someLongTask(function () {
    cb();
  })
}, { maxTimeout: 2000 })

After 2 seconds, the task fails with an error instead of waiting for the callback to be called.

You can also delay the queue before it starts its processing. This is the behaviour of a timed cargo.

var q = new Queue(function (batch, cb) {
  // Batch [1,2] will process after 2s.
  cb();
}, { batchSize: 5, batchDelay: 2000 })
q.push(1);
setTimeout(function () {
  q.push(2);
}, 1000)

You can also set afterProcessDelay, which will delay processing between tasks.

var q = new Queue(function (task, cb) {
  cb(); // Will wait 1 second before taking the next task
}, { afterProcessDelay: 1000 })
q.push(1);
q.push(2);

In addition to batchDelay, you can set a batchDelayTimeout (in milliseconds), which fires off a batch early if no new tasks have been pushed to the queue within that time.

var q = new Queue(fn, {
  batchSize: 50,
  batchDelay: 5000,
  batchDelayTimeout: 1000
})
q.push(1);
q.push(2);

In the example above, the queue waits up to 5s for 50 items to fill the batch, but processes the batch sooner if no new tasks are added for 1s.

Precondition

You can define a function called precondition that checks that it's ok to process the next batch. If the preconditions fail, it will keep calling this function until it passes again.

var q = new Queue(function (batch, cb) {

  // Do something that requires internet

  cb(); // signal that the batch is done
}, {
  precondition: function (cb) {
    isOnline(function (err, ok) {
      if (ok) {
        cb(null, true);
      } else {
        cb(null, false);
      }
    })
  },
  preconditionRetryTimeout: 10*1000 // If we go offline, retry every 10s
})
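A precondition does not have to be asynchronous. As a minimal sketch, the function below gates processing on a flag that other code can flip; the maintenance object and the name notInMaintenance are hypothetical, and only the (cb) signature and the cb(null, true/false) convention come from the example above.

```javascript
// Hypothetical maintenance switch that other code can flip.
var maintenance = { active: false };

// Precondition with the same (cb) signature as above: reports `true`
// when processing may continue and `false` when it should wait.
function notInMaintenance(cb) {
  cb(null, !maintenance.active);
}

// Called directly here to show the result.
notInMaintenance(function (err, ok) {
  console.log(ok); // true
});
```

It would be passed as the precondition option, alongside a preconditionRetryTimeout that controls how often it is checked again.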

Pause/Resume

There are options to control processes while they are running.

You can return an object from your processing function with the functions cancel, pause and resume. This allows operations to be paused, resumed or cancelled while they are running.

var uploader = new Queue(function (file, cb) {
  
  var worker = someLongProcess(file);

  return {
    cancel: function () {
      // Cancel the file upload
    },
    pause: function () {
      // Pause the file upload
    },
    resume: function () {
      // Resume the file upload
    }
  }
})
uploader.push('/path/to/file.pdf');
uploader.pause();
uploader.resume();

Cancel/Abort

You can also set cancelIfRunning to true. This will cancel a running task if a task with the same ID is pushed onto the queue.

var uploader = new Queue(function (file, cb) {
  var request = someLongProcess(file);
  return {
    cancel: function () {
      request.cancel();
    }
  }
}, {
  id: 'path',
  cancelIfRunning: true
})
uploader.push({ path: '/path/to/file.pdf' });
// ... Some time later
uploader.push({ path: '/path/to/file.pdf' });

In the example above, the first upload process is cancelled and the task is requeued.

You can also call .cancel(taskId) to cancel and unqueue the task.

uploader.cancel('/path/to/file.pdf');

Note that if you enable this option in batch mode, it will cancel the entire batch!



Advanced

Updating Task Status

The process function will be run in a context with progressBatch, finishBatch and failedBatch functions.

The example below illustrates how you can use these:

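var uploader = new Queue(function (file, cb) {
  // Call whichever of these applies while processing the file:
  this.progressBatch(bytesUploaded, totalBytes, "uploading") // report progress
  this.finishBatch(result)                                   // mark as finished
  this.failedBatch('some_error')                             // mark as failed
});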
uploader.on('task_finish', function (taskId, result) {
  // Handle finished result
})
uploader.on('task_failed', function (taskId, errorMessage) {
  // Handle error
})
uploader.on('task_progress', function (taskId, completed, total) {
  // Handle task progress
})

uploader.push('/some/file.jpg')
  .on('finish', function (result) {
    // Handle upload result
  })
  .on('failed', function (err) {
    // Handle error
  })
  .on('progress', function (progress) {
    // progress.eta - human readable string estimating time remaining
    // progress.pct - % complete (out of 100)
    // progress.complete - # completed so far
    // progress.total - # for completion
    // progress.message - status message
  })
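The progress fields listed above could be turned into a status line with a small helper. This is a sketch: the name formatProgress is made up, and the sample eta value is invented, since the exact format of the human-readable string is not specified here.

```javascript
// Formats the progress fields listed above into one status line.
function formatProgress(progress) {
  var line = Math.round(progress.pct) + '% (' +
    progress.complete + '/' + progress.total + ')';
  if (progress.message) line += ' ' + progress.message;
  if (progress.eta) line += ', ETA ' + progress.eta;
  return line;
}

// Sample object with made-up values.
console.log(formatProgress({
  pct: 30, complete: 30, total: 100, message: 'uploading', eta: '2 min'
}));
// 30% (30/100) uploading, ETA 2 min
```

It could be used inside the progress handler, for example .on('progress', function (progress) { console.log(formatProgress(progress)); }).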

Update Status in Batch mode (batchSize > 1)

You can also complete (or report progress on) individual tasks in a batch by using the failedTask, finishTask and progressTask functions. The first argument is the task's index in the batch.

var uploader = new Queue(function (files, cb) {
  this.failedTask(0, 'some_error')         // files[0] has failed with 'some_error'
  this.finishTask(1, result)               // files[1] has finished with {result}
  this.progressTask(2, 30, 100, "copying") // files[2] is 30% done, currently copying
}, { batchSize: 3 });
uploader.push('/some/file1.jpg')
uploader.push('/some/file2.jpg')
uploader.push('/some/file3.jpg')

Note that if you use *-Task and *-Batch functions together, the batch functions will only apply to the tasks that have not yet finished/failed.

Queue Statistics

You can inspect the queue at any given time to see the total number of items processed, the average process time in milliseconds, the success rate and the peak number of queued items.

var q = new Queue(fn);
var stats = q.getStats();

// stats.total = Total tasks processed
// stats.average = Average process time in milliseconds
// stats.successRate = % success (between 0 and 1)
// stats.peak = Most tasks queued at any given point in time
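As a small usage sketch, the fields above could be logged in one line. The helper name summarizeStats and the sample values are made up; the field names are the ones listed above.

```javascript
// Builds a one-line summary from an object shaped like the result of getStats().
function summarizeStats(stats) {
  return 'processed ' + stats.total +
    ', avg ' + Math.round(stats.average) + ' ms' +
    ', success ' + (stats.successRate * 100).toFixed(1) + '%' +
    ', peak queue ' + stats.peak;
}

// Sample object with made-up values.
console.log(summarizeStats({ total: 120, average: 35.4, successRate: 0.975, peak: 12 }));
// processed 120, avg 35 ms, success 97.5%, peak queue 12
```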



Storage

Using a store

For your convenience, we have added compatibility for a few storage options.

By default, we use an in-memory store that doesn't persist. You can switch to one of our other built-in stores by passing in the store option.

Built-in store

Currently, we support the following stores:

SQLite store (npm install sqlite3)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'sqlite',
    path: '/path/to/sqlite/file'
  }
});

Note that this requires better-queue-sql or better-queue-sqlite.

PostgreSQL store (npm install pg)

var q = new Queue(fn, {
  store: {
    type: 'sql',
    dialect: 'postgres',
    host: 'localhost',
    port: 5432,
    username: 'username',
    password: 'password',
    dbname: 'template1',
    tableName: 'tasks'
  }
});

Please help us add support for more stores; contributions are welcome!

Custom Store

Writing your own store is very easy; you just need to implement a few functions, then either pass your store in as the store option or call queue.use(store).

var q = new Queue(fn, { store: myStore });

or

q.use(myStore);

Your store needs the following functions:

q.use({
  connect: function (cb) {
    // Connect to your db
  },
  getRunningTasks: function (cb) {
    // Returns a map of running tasks (lockId => taskIds)
  },
  getTask: function (taskId, cb) {
    // Retrieves a task
  },
  putTask: function (taskId, task, priority, cb) {
    // Save task with given priority
  },
  takeFirstN: function (n, cb) {
    // Removes the first N items (sorted by priority and age)
  },
  takeLastN: function (n, cb) {
    // Removes the last N items (sorted by priority and recency)
  }
})
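To make the shape of a store more concrete, here is a minimal in-memory sketch that implements only the functions listed above. Several details are assumptions rather than documented behaviour: that connect reports the number of queued tasks, that takeFirstN and takeLastN report a lock ID under which the removed tasks are tracked as running, and the helper name createMemoryStore itself.

```javascript
// Minimal in-memory store sketch (assumptions are described in the text above).
function createMemoryStore() {
  var tasks = {};   // taskId => task
  var order = [];   // [{ taskId, priority, seq }]; seq records push order
  var running = {}; // lockId => { taskId => task }
  var seq = 0;
  var nextLock = 0;

  // Moves the given queue entries into `running` under a new lock ID.
  function lock(entries, cb) {
    var lockId = 'lock-' + (nextLock++);
    var taken = {};
    entries.forEach(function (entry) {
      taken[entry.taskId] = tasks[entry.taskId];
      delete tasks[entry.taskId];
    });
    if (entries.length > 0) running[lockId] = taken;
    cb(null, lockId);
  }

  return {
    connect: function (cb) { cb(null, order.length); },
    getRunningTasks: function (cb) { cb(null, running); },
    getTask: function (taskId, cb) { cb(null, tasks[taskId]); },
    putTask: function (taskId, task, priority, cb) {
      // Replace any queued task that has the same ID.
      order = order.filter(function (e) { return e.taskId !== taskId; });
      tasks[taskId] = task;
      order.push({ taskId: taskId, priority: priority || 0, seq: seq++ });
      cb();
    },
    takeFirstN: function (n, cb) {
      // Highest priority first, then oldest first.
      order.sort(function (a, b) { return b.priority - a.priority || a.seq - b.seq; });
      lock(order.splice(0, n), cb);
    },
    takeLastN: function (n, cb) {
      // Highest priority first, then newest first.
      order.sort(function (a, b) { return b.priority - a.priority || b.seq - a.seq; });
      lock(order.splice(0, n), cb);
    }
  };
}

var store = createMemoryStore();
store.putTask('a', { n: 1 }, 1, function () {});
store.putTask('b', { n: 2 }, 5, function () {});
store.takeFirstN(1, function (err, lockId) {
  store.getRunningTasks(function (err2, running) {
    console.log(Object.keys(running[lockId])); // [ 'b' ]
  });
});
```

The library may expect more from a store than the functions listed here (for example, ways to delete tasks or release locks), so treat this as a starting point and check the full store documentation before relying on it.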



Using with Webpack

Better Queue can be used in the browser with the in-memory store. However, you have to create the store yourself and pass it to the constructor.

var Queue = require('better-queue')
var MemoryStore = require('better-queue-memory')

var q = new Queue(function (input, cb) {

  // Some processing here ...
  var result = input; // placeholder: replace with your real result

  cb(null, result);
}, {
  store: new MemoryStore()
})

TypeScript Support

Better Queue can be used in TypeScript projects by installing type definitions from the DefinitelyTyped repository:

npm install --save @types/better-queue

Afterwards, you can simply import the library:

import Queue = require('better-queue')

const q: Queue = new Queue(() => {});



Full Documentation

new Queue(process, options)

The first argument can be either the process function or the options object.

A process function is required; all other options are optional.



Methods on Queue

Events on Queue

Events on Ticket