Home

Awesome

wsq Build Status Coverage Status Package Version License

Websocket task queue

What is it?

An easy to use task queue that handles streaming data, also works in the browser.

Example

Video encoding

server.js (see wsq-server for a standalone server with logging)

var Server = require('wsq/server')
var leveldown = require('leveldown')
var BlobStore = require('fs-blob-store')

new WsqServer({
  socketOptions: {port: 4242},
  dbLocation: '/var/leveldb/wsq',
  dbOptions: {db: leveldown}, // db can be any 'abstract-leveldown' compatible instance
  blobStore: new BlobStore('/var/storage/wsq') // same here any 'abstract-blob-store' will do
})

add.js:

// usage: node add.js <videofile> <ffmpeg arguments>

var Client = require('wsq/client')
var fs = require('fs')

var client = new Client('ws://localhost:4242')
var queue = client.queue('ffmpeg')

var data = {
	video: fs.createReadStream(process.argv[2]),
	args: process.argv.slice(3)
}

var task = queue.add(data, function(error){
	if (error) {
		console.log('Error queueing video: ' + error.message)
		process.exit(1)
	} else {
		console.log('Video queued for processing.')
		process.exit()
	}
})

worker.js:

var Client = require('wsq/client')
var fs = require('fs')
var os = require('os')
var path = require('path')

var client = new Client('ws://localhost:4242')

var videoQueue = client.queue('ffmpeg')
var resultQueue = client.quueu('ffmpeg-results')

videoQueue.process(function(task, callback) {
	var encoder = new VideoEncoder(task.data.args)

	encoder.on('progress', function(percent) {
		// update task progress, this will also reset the task timeout (default 60 seconds)
		// useful for long running tasks like this one
		task.updateProgress(percent)
	})

	// start encoding
	task.data.video.pipe(encoder)

	// start streaming the encoded video to the result queue, if the stream emits an error
	// the result task will not be created and any partial data streamed is discarded
	resultQueue.add({video: encoder}, function(error){
		if (error) {
			console.log('Encoding failed: ' + error.message)
			callback(error) // task is marked as failed, and possibly re-queued based on its options
		} else {
			// all good, ready to accept next task
			callback()
		}
	})
})

Documentation

Class: Client

This class is a wsq client. It is an EventEmitter.

new Client(address, [options])

Construct a new client object.

address

Address to wsq server, e.g. 'ws://localhost:1324'

options.backoff

Function with the signature function(tries){} that should return number of milliseconds to wait until next conneciton attempt.

The default funciton looks like:

function(tries){
	return Math.min(Math.pow(tries * 10, 2), 60 * 1000)
}

client.queue(name)

Return a ClientQueue instance. Will be created if nonexistent.

client.listQueues()

Return an array of active ClientQueue instances.

client.getEventStream()

Return a object stream that writes all the events as they come in from the server.

{
	"event": "<event name>",
	"args": [..]
}

Event: 'error'

function(error){}

Event: 'connect'

function(){}

Connected to server.

Event: 'disconnect'

function(){}

Connection was lost.

Class: ClientQueue

This class is the client's representation of a queue. It is an EventEmitter.

queue.add(data, [options], [callback])

Add a task to the queue. The optional callback is called when the task is successfully added queued or with an Error object on failure.

options.timeout

How long to wait for the task to complete without hearing from the worker in milliseconds. Set to -1 to disable timeout (not recommended, use progress updates for long running tasks instead)

options.retries

How many times the task should be re-queued on failure. A value of zero means no retries before the task have to be re-queued or removed explicitly. Can also be set to -1 to retry forever.

options.autoremove

Wether to remove the task and any associated streams that where buffered on completion. Note that failed tasks will always have to be handled explicitly.

queue.process(workerFn)

Add a worker to the queue. workerFn has the signature function(task, callback){}.

The callback should be called when the worker has completed processing the task, or with an Error object on failure.

queue.all(callback)

Callback with a list of all Task instances in the queue.

queue.waiting(callback)

Callback with a list of all waiting Task instances in the queue.

queue.active(callback)

Callback with a list of all active Task instances in the queue.

queue.completed(callback)

Callback with a list of all completed Task instances in the queue.

queue.failed(callback)

Callback with a list of all failed Task instances in the queue.

Event: 'worker added'

function(worker){}

Worker was added to the queue.

Event: 'worker removed'

function(worker){}

Worker was removed from the queue.

Event: 'worker started'

function(worker, task){}

Worker started processing task.

Event: 'task <task_event>'

See Task events.

Class: Task

This class represents a task. It is an EventEmitter.

task.updateProgress(percentage)

Update the progress of the task. Percentage is a fraction between 0 and 1. Calling this resets the task timeout timer.

task.touch()

Reset the task timeout. Useful if your task process does not have any useful progress information but you still want to keep long living tasks running.

task.remove(callback)

Remove the task from the system. Do not call this from inside a worker.

task.retry(callback)

Reschedule a failed task. Do not call this from inside a worker.

task.getData(callback)

Return task data with streams resolved. Note that task.data will already be resolved for tasks passed to a worker.

Event: 'added'

function(task){}

Added to queue.

Event: 'queued'

function(task){}

Queued for processing.

Event: 'started'

function(task){}

Started processing.

Event: 'progress'

function(task, percentage){}

Progress updated. Percentage is a fraction between 0 and 1.

Event: 'completed'

function(task){}

Successfully processed.

Event: 'failed'

function(task, willRetry){}

Task failed, task.error will contain the failure message. willRetry is true if the task will be retried.

Event: 'deleted'

function(task){}

Task and associated streams was removed from the queue.

License

MIT