Home

Awesome

nsqjs

The official NodeJS client for the nsq client protocol. This implementation attempts to be fully compliant and maintain feature parity with the official Go (go-nsq) and Python (pynsq) clients.

Build Status

NPM

Usage

new Reader(topic, channel, options)

The topic and channel arguments are strings and must be specified. The options argument is optional. Below are the parameters that can be specified in the options object.

Reader events are:

Reader.MESSAGE and Reader.DISCARD both produce Message objects. Reader.NSQD_CONNECTED and Reader.NSQD_CLOSED events both provide the host and port of the nsqd to which the event pertains.

These methods are available on a Reader object:

Message

The following properties and methods are available on Message objects produced by a Reader instance.

new Writer(nsqdHost, nsqdPort, options)

Allows messages to be sent to an nsqd.

Available Writer options:

Writer events are:

These methods are available on a Writer object:

Simple example

Start nsqd and nsqdlookupd

# nsqdLookupd Listens on 4161 for HTTP requests and 4160 for TCP requests
$ nsqlookupd &
$ nsqd --lookupd-tcp-address=127.0.0.1:4160 &

JavaScript

var nsq = require('nsqjs');

var reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
});

reader.connect();

reader.on('message', function (msg) {
  console.log('Received message [%s]: %s', msg.id, msg.body.toString());
  msg.finish();
});

CoffeeScript

nsq = require 'nsqjs'

topic = 'sample_topic'
channel = 'test_channel'
options =
  lookupdHTTPAddresses: '127.0.0.1:4161'

reader = new nsq.Reader topic, channel, options
reader.connect()

reader.on nsq.Reader.MESSAGE, (msg) ->
  console.log "Received message [#{msg.id}]: #{msg.body.toString()}"
  msg.finish()

Publish a message to nsqd to be consumed by the sample client:

$ curl -d "it really tied the room together" http://localhost:4151/pub?topic=sample_topic

Example with message timeouts

This script simulates a message that takes a long time to process or at least longer than the default message timeout. To ensure that the message doesn't timeout while being processed, touch events are sent to keep it alive.

JavaScript

var nsq = require('nsqjs');

var reader = new nsq.Reader('sample_topic', 'test_channel', {
  lookupdHTTPAddresses: '127.0.0.1:4161'
});

reader.connect();

reader.on('message', function (msg) {
  console.log('Received message [%s]', msg.id);

  function touch() {
    if (!msg.hasResponded) {
      console.log('Touch [%s]', msg.id);
      msg.touch();
      // Touch the message again a second before the next timeout.
      setTimeout(touch, msg.timeUntilTimeout() - 1000);
    }
  }

  function finish() {
    console.log('Finished message [%s]: %s', msg.id, msg.body.toString());
    msg.finish();
  }

  console.log('Message timeout is %f secs.', msg.timeUntilTimeout() / 1000);
  setTimeout(touch, msg.timeUntilTimeout() - 1000);

  // Finish the message after 2 timeout periods and 1 second.
  setTimeout(finish, msg.timeUntilTimeout() * 2 + 1000);
});

CoffeeScript

{Reader} = require 'nsqjs'

topic = 'sample_topic'
channel = 'test_channel'
options =
  lookupdHTTPAddresses: '127.0.0.1:4161'

reader = new Reader topic, channel, options
reader.connect()

reader.on Reader.MESSAGE, (msg) ->
  console.log "Received message [#{msg.id}]"

  touch = ->
    unless msg.hasResponded
      console.log "Touch [#{msg.id}]"
      msg.touch()
      # Touch the message again a second before the next timeout.
      setTimeout touch, msg.timeUntilTimeout() - 1000

  finish = ->
    console.log "Finished message [#{msg.id}]: #{msg.body.toString()}"
    msg.finish()

  console.log "Message timeout is #{msg.timeUntilTimeout() / 1000} secs."
  setTimeout touch, msg.timeUntilTimeout() - 1000

  # Finish the message after 2 timeout periods and 1 second.
  setTimeout finish, msg.timeUntilTimeout() * 2 + 1000

Enable nsqjs debugging

nsqjs used debug to log debug output.

To see all nsqjs events:

$ DEBUG=nsqjs:* node my_nsqjs_script.js

To see all reader events:

$ DEBUG=nsqjs:reader:* node my_nsqjs_script.js

To see a specific reader's events:

$ DEBUG=nsqjs:reader:<topic>/<channel>:* node my_nsqjs_script.js

Replace <topic> and <channel>

To see all writer events:

$ DEBUG=nsqjs:writer:* node my_nsqjs_script.js

A Writer Example

The writer sends a single message and then a list of messages.

JavaScript

var nsq = require('nsqjs');

var w = new nsq.Writer('127.0.0.1', 4150);

w.connect();

w.on('ready', function () {
  w.publish('sample_topic', 'it really tied the room together');
  w.publish('sample_topic', [
    'Uh, excuse me. Mark it zero. Next frame.', 
    'Smokey, this is not \'Nam. This is bowling. There are rules.'
  ]);
  w.publish('sample_topic', 'Wu?', function (err) {
    if (err) { return console.error(err.message); }
    console.log('Message sent successfully');
    w.close();
  });
});

w.on('closed', function () {
  console.log('Writer closed');
});

CoffeeScript

{Writer} = require 'nsqjs'

w = new Writer '127.0.0.1', 4150
w.connect()

w.on Writer.READY, ->
  w.publish 'sample_topic', 'it really tied the room together'
  w.publish 'sample_topic', ['Uh, excuse me. Mark it zero. Next frame.', 
    'Smokey, this is not \'Nam. This is bowling. There are rules.']
  w.publish 'sample_topic', 'Wu?', (err) ->
    console.log 'Message sent successfully' unless err?
    w.close()

w.on Writer.CLOSED, ->
  console.log 'Writer closed'

Changes