Home

Awesome

qlobber-fsq   Build Status Build status Coverage Status NPM version

Shared file system queue for Node.js.

Example:

var QlobberFSQ = require('qlobber-fsq').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.subscribe('foo.*', function (data, info)
{
    console.log(info.topic, data.toString('utf8'));
    var assert = require('assert');
    assert.equal(info.topic, 'foo.bar');
    assert.equal(data, 'hello');
});
fsq.on('start', function ()
{
    this.publish('foo.bar', 'hello');
});

You can publish messages using a separate process if you like:

var QlobberFSQ = require('qlobber-fsq').QlobberFSQ;
var fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
fsq.stop_watching();
fsq.on('stop', function ()
{
    this.publish('foo.bar', 'hello');
});

Or use the streaming interface to read and write messages:

const { QlobberFSQ } = require('qlobber-fsq');
const fsq = new QlobberFSQ({ fsq_dir: '/shared/fsq' });
function handler(stream, info)
{
    const data = [];

    stream.on('readable', function ()
    {
        let chunk;
        while (chunk = this.read())
        {
            data.push(chunk);
        }
    });

    stream.on('end', function ()
    {
        const str = Buffer.concat(data).toString('utf8');
        console.log(info.topic, str);
        const assert = require('assert');
        assert.equal(info.topic, 'foo.bar');
        assert.equal(str, 'hello');
    });
}
handler.accept_stream = true;
fsq.subscribe('foo.*', handler);
fsq.on('start', function ()
{
    fsq.publish('foo.bar').end('hello');
});

The API is described here.

Installation

npm install qlobber-fsq

Limitations

Distributed filesystems

Note: When using a distributed file system with qlobber-fsq, ensure that you synchronize the time and date on all the computers you're using.

FraunhoferFS (BeeGFS)

When using the FraunhoferFS distributed file system, set the following options in fhgfs-client.conf:

tuneFileCacheType             = none
tuneUseGlobalFileLocks        = true

qlobber-fsq has been tested with FraunhoferFS 2014.01 on Ubuntu 14.04 and FraunhoferFS 2012.10 on Ubuntu 13.10.

CephFS

qlobber-fsq has been tested with CephFS 0.80 on Ubuntu 14.04. Note that you'll need to upgrade your kernel to at least 3.14.1 in order to get the fix for a bug in CephFS.

How it works

How it works

Under the directory you specify for fsq_dir, qlobber-fsq creates the following sub-directories:

qlobber-fsq reads UPDATE at regular intervals to determine whether a new message has been written to a bucket. If it has then it processes each filename in the bucket's directory listing.

If the expiry time in the filename has passed then it deletes the message.

If the filename indicates the message can be read by many subscribers:

If the filename indicates the message can be read by only one subscriber (i.e. work queue semantics):

Licence

MIT

Test

To run the default tests:

grunt test [--fsq-dir=<path>] [--getdents_size=<buffer size>] [--disruptor]

If you don't specify --fsq-dir then the default will be used (a directory named fsq in the test directory).

If you specify --getdents_size then use of getdents will be included in the tests.

If you specify --disruptor then use of shared memory LMAX Disruptors will be included in the tests.

To run the stress tests (multiple queues in a single Node process):

grunt test-stress [--fsq-dir=<path>] [--disruptor]

To run the multi-process tests (each process publishing and subscribing to different messages):

grunt test-multi [--fsq-dir=<path>] [--queues=<number of queues>] [--disruptor]

If you omit --queues then one process will be created per core (detected with os.cpus()).

To run the distributed tests (one process per remote host, each one publishing and subscribing to different messages):

grunt test-multi --fsq-dir=<path> --remote=<host1> --remote=<host2>

You can specify as many remote hosts as you like. The test uses cp-remote to run a module on each remote host. Make sure on each host:

Please note the distributed tests don't run on Windows.

Lint

grunt lint

Code Coverage

grunt coverage [--fsq-dir=<path>]

c8 results are available here.

Coveralls page is here.

Benchmarks

To run the benchmark:

grunt bench [--fsq-dir=<path>] \
            --rounds=<number of rounds> \
            --size=<message size> \
            --ttl=<message time-to-live in seconds> \
            [--disruptor] \
            [--num_elements=<number of disruptor elements>] \
            [--element_size=<disruptor element size>] \
            [--bucket_stamp_size=<number of bytes to write to UPDATE file] \
            [--getdents_size=<buffer size>] \
            [--ephemeral] \
            [--refresh_ttl=<period between expiration check in seconds>] \
            (--queues=<number of queues> | \
             --remote=<host1> --remote=<host2> ...)

If you don't specify --fsq-dir then the default will be used (a directory named fsq in the bench directory).

If you provide at least one --remote=<host> argument then the benchmark will be distributed across multiple hosts using cp-remote. Make sure on each host:

API

<a name="tableofcontents"></a>

Constructor

Publish and subscribe

Lifecycle

Events

QlobberFSQ([options])

Creates a new QlobberFSQ object for publishing and subscribing to a file system queue.

Parameters:

<sub>Go: TOC</sub>

<a name="qlobberfsqprototype"></a>

QlobberFSQ.prototype.subscribe(topic, handler, [options], [cb])

Subscribe to messages in the file system queue.

Parameters:

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.prototype.unsubscribe([topic], [handler], [cb])

Unsubscribe from messages in the file system queue.

Parameters:

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.prototype.publish(topic, [payload], [options], [cb])

Publish a message to the file system queue.

Parameters:

Return:

{Stream | undefined} A Writable stream if no payload was passed, otherwise undefined.

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.prototype.stop_watching([cb])

Stop scanning for new messages.

Parameters:

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.prototype.refresh_now()

Check the UPDATE file now rather than waiting for the next periodic check to occur

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.prototype.force_refresh()

Scan for new messages in the messages sub-directory without checking whether the UPDATE file has changed.

<sub>Go: TOC | QlobberFSQ.prototype</sub>

QlobberFSQ.get_num_buckets(bucket_base, bucket_num_chars)

Given a radix to use for characters in bucket names and the number of digits in each name, return the number of buckets that can be represented.

Parameters:

Return:

{Integer} The number of buckets that can be represented.

<sub>Go: TOC | QlobberFSQ</sub>

<a name="qlobberfsqevents"></a>

QlobberFSQ.events.start()

start event

QlobberFSQ objects fire a start event when they're ready to publish messages. Don't call publish until the start event is emitted or the message may be dropped. You can subscribe to messages before start is fired, however.

A start event won't be fired after a stop event.

<sub>Go: TOC | QlobberFSQ.events</sub>

QlobberFSQ.events.stop()

stop event

QlobberFSQ objects fire a stop event after you call stop_watching and they've stopped scanning for new messages. Messages already read may still be being processed, however.

<sub>Go: TOC | QlobberFSQ.events</sub>

QlobberFSQ.events.error(err)

error event

QlobberFSQ objects fire an error event if an error occurs before start is emitted. The QlobberFSQ object is unable to continue at this point and is not scanning for new messages.

Parameters:

<sub>Go: TOC | QlobberFSQ.events</sub>

QlobberFSQ.events.warning(err)

warning event

QlobberFSQ objects fire a warning event if an error occurs after start is emitted. The QlobberFSQ object will still be scanning for new messages after emitting a warning event.

Parameters:

<sub>Go: TOC | QlobberFSQ.events</sub>

QlobberFSQ.events.single_disabled(err)

single_disabled event

QlobberFSQ objects fire a single_disabled event if they can't support work queue semantics.

Parameters:

<sub>Go: TOC | QlobberFSQ.events</sub>

QlobberFSQ.events.getdents_disabled(err)

getdents_disabled event

QlobberFSQ objects fire a getdents_disabled event if they can't support enumerating bucket directories using getdents.

Parameters:

<sub>Go: TOC | QlobberFSQ.events</sub>

—generated by apidox