RSMQ-Worker

Join the chat at https://gitter.im/mpneuried/rsmq-worker

Helper to simply implement a worker for RSMQ (Redis Simple Message Queue).

Note: RSMQ uses the Redis EVAL command (Lua scripts), so the minimum Redis version is 2.6.

Install

  npm install rsmq-worker

Initialize

  new RSMQWorker( queuename, options );

Example:

  var RSMQWorker = require( "rsmq-worker" );
  var worker = new RSMQWorker( "myqueue" );

  worker.on( "message", function( msg, next, id ){
  	// process your message
  	console.log("Message id : " + id);
  	console.log(msg);
  	next()
  });

  // optional error listeners
  worker.on('error', function( err, msg ){
      console.log( "ERROR", err, msg.id );
  });
  worker.on('exceeded', function( msg ){
      console.log( "EXCEEDED", msg.id );
  });
  worker.on('timeout', function( msg ){
      console.log( "TIMEOUT", msg.id, msg.rc );
  });

  worker.start();

Config

Raw message format

A message ( e.g. received by the event data or customExceedCheck ) contains the following keys:

Methods

.start()

If you haven't set the config option autostart to true, you have to call the .start() method yourself.

Return

( Self ): The instance itself for chaining.

.stop()

Stops the receive interval only. This will not close the connection to rsmq/redis. If you want your script to end, call .quit().

Return

( Self ): The instance itself for chaining.

.send( msg [, delay ][, cb ] )

Helper function to simply send a message in the configured queue.

Arguments

Return

( Self ): The instance itself for chaining.
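
Because both `delay` and `cb` are optional in `.send( msg [, delay ][, cb ] )`, the second argument can be either a delay or a callback. The sketch below shows one common way such a signature can be dispatched. It is not taken from the rsmq-worker source; `dispatchSendArgs` is a hypothetical helper name used only for illustration.

```javascript
// Illustrative sketch only, not the library's implementation.
// `dispatchSendArgs` is a hypothetical helper name.
function dispatchSendArgs( msg, delay, cb ){
	if( typeof delay === "function" ){
		// called as send( msg, cb ): shift the callback into place
		cb = delay;
		delay = undefined;
	}
	return { msg: msg, delay: delay, cb: cb };
}

var done = function(){};

console.log( dispatchSendArgs( "hello" ).delay );          // undefined
console.log( dispatchSendArgs( "hello", 5 ).delay );       // 5
console.log( dispatchSendArgs( "hello", done ).cb === done );    // true
console.log( dispatchSendArgs( "hello", 5, done ).cb === done ); // true
```

With this pattern all four call shapes, `send( msg )`, `send( msg, delay )`, `send( msg, cb )` and `send( msg, delay, cb )`, end up in the same normalized form.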

.del( id [, cb ] )

Helper function to simply delete a message after it has been processed.

Arguments

Return

( Self ): The instance itself for chaining.

.changeInterval( interval )

Change the polling interval while the worker is running.

Arguments

Return

( Self ): The instance itself for chaining.

.quit()

Stop the worker and close the connection. After this it's no longer possible to reuse the worker-instance. It's just intended to kill all timers and connections so your script will end.

.info( cb )

Get the current queue attributes. This is just a shortcut to the rsmq.getQueueAttributes.

Arguments

.size( [hidden=false], cb )

Get the current queue size.

Arguments

Return

( Self ): The instance itself for chaining.

Events

message

Main event to catch and process a message. If you do not set a handler for this Event nothing will happen.

Example:

  worker.on( "message", function( message, next, msgid ){
  	// process message ...
  	next();
  });
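
Since an error can be passed as the first argument of `next`, a handler that may throw synchronously (for example while parsing JSON) can be wrapped so that the exception is handed to `next` instead of escaping. The following is a minimal sketch under that assumption. `safeHandler` and `fakeNext` are hypothetical names and not part of rsmq-worker; `fakeNext` only stands in for the callback the worker would supply.

```javascript
// Illustrative sketch only. `safeHandler` is a hypothetical helper,
// not part of rsmq-worker.
function safeHandler( fn ){
	return function( message, next, id ){
		try {
			fn( message, next, id );
		} catch( err ){
			// hand a synchronous exception over via `next`
			next( err );
		}
	};
}

// Stand-in for the `next` callback that the worker would pass in
var results = [];
function fakeNext( err ){
	results.push( err ? "error: " + err.message : "ok" );
}

var handler = safeHandler( function( message, next ){
	var data = JSON.parse( message ); // throws on invalid JSON
	if( !data.type ){
		throw new Error( "missing type" );
	}
	next();
});

handler( '{"type":"writefile"}', fakeNext, "id-1" ); // records "ok"
handler( "not json", fakeNext, "id-2" );             // records an error entry
handler( "{}", fakeNext, "id-3" );                   // records "error: missing type"
```

In a real worker the wrapped function would be registered with `worker.on( "message", safeHandler( ... ) )`. Note that the wrapper only catches synchronous exceptions; errors inside asynchronous callbacks still have to be passed to `next` explicitly.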

Arguments

ready

Fired once the worker is connected to rsmq/redis and has been initialized with the given queuename.

data

The raw event when a message has been received.

Arguments

deleted

Fired after a message has been deleted.

Arguments

exceeded

Fired after a message has exceeded the configured maxReceiveCount. The message will be deleted immediately afterwards.
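
The interplay between `maxReceiveCount` and `customExceedCheck` can be sketched as a small decision function. This is an illustration, not the library's code: `isExceeded` is a hypothetical name, and the exact comparison against the receive count `rc` is an assumption that may differ from what rsmq-worker does internally. What is taken from this README is that a `customExceedCheck` returning `true` prevents the message from being treated as exceeded.

```javascript
// Illustrative sketch only. `isExceeded` is a hypothetical helper and the
// `rc > maxReceiveCount` comparison is an assumption about the threshold.
function isExceeded( msg, maxReceiveCount, customExceedCheck ){
	// a custom check that returns true keeps the message alive
	if( typeof customExceedCheck === "function" && customExceedCheck( msg ) === true ){
		return false;
	}
	return msg.rc > maxReceiveCount;
}

var fnCheck = function( msg ){
	return msg.message === "createmessages";
};

console.log( isExceeded( { message: "foo", rc: 1 }, 2, fnCheck ) );            // false
console.log( isExceeded( { message: "foo", rc: 3 }, 2, fnCheck ) );            // true
console.log( isExceeded( { message: "createmessages", rc: 3 }, 2, fnCheck ) ); // false
console.log( isExceeded( { message: "foo", rc: 3 }, 2 ) );                     // true
```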

Arguments

timeout

Fired if a message processing exceeds the configured timeout.

Arguments

error

Fired if processing a message throws an error or an error is passed as the first argument of next.

Arguments

Advanced example

This is an advanced example showing some features in action.

	var fs = require( "fs" );
	var RSMQWorker = require( "rsmq-worker" );

	var fnCheck = function( msg ){
		// check function to not exceed the message if the content is `createmessages`
		if( msg.message === "createmessages" ){
			return true
		}
		return false
	}

	
	var worker = new RSMQWorker( "myqueue", {
		interval: [ .1, 1 ],				// wait 100ms between receives and step up to 1 sec on empty receives
		invisibletime: 2,						// hide received message for 2 sec
		maxReceiveCount: 2,					// only receive a message 2 times until delete
		autostart: true,						// start worker on init
		customExceedCheck: fnCheck	// set the custom exceed check
	});

	// Listen to errors
	worker.on('error', function( err, msg ){
	    console.log( "ERROR", err, msg.id );
	});
	worker.on('timeout', function( msg ){
	    console.log( "TIMEOUT", msg.id, msg.rc );
	});
	
	// handle exceeded messages
	// grab the internal rsmq instance
	var rsmq = worker._getRsmq();
	worker.on('exceeded', function( msg ){
		console.log( "EXCEEDED", msg.id );
		// NOTE: make sure this queue exists
		rsmq.sendMessage( { qname: "YOUR_EXCEEDED_QUEUE", message: msg.message }, function( err, resp ){
			if( err ){
				console.error( "write-to-exceeded-queue", err )
			}
		});
	});

	// listen to messages
	worker.on( "message", function( message, next, id ){
		
		console.log( "message", message );
		
		if( message === "createmessages" ){
			next( false )
			worker.send( JSON.stringify( { type: "writefile", filename: "./test.txt", txt: "Foo Bar" } ) );
			worker.send( JSON.stringify( { type: "deletefile", filename: "./test.txt" } ) );
			return	
		}

		var _data = JSON.parse( message )
		switch( _data.type ){
			case "writefile": 
				fs.writeFile( _data.filename, _data.txt, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
			case "deletefile": 
				fs.unlink( _data.filename, function( err ){
					if( err ){
						next( err );
					}else{
						next()
					}
				});
				break;
		}
		
	});

	worker.send( "createmessages" );

Details

Options interval

The option interval can:

A.) be a Number, so the worker will poll the queue every n seconds (e.g. interval: .5 = twice a second)

B.) be an Array of Numbers. On start, interval[0] is the time to wait between polls. Every time the worker receives an empty response (the queue is empty), the next entry in the array is used as the wait time before the next poll, until the last entry interval[ n ] is reached. On every received message the wait time is reset to interval[0].

E.g: interval: [ .2, 1, 3 ]
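
The step-up behavior described above can be expressed as a small pure function. This is a sketch for illustration, not the library's implementation; `waitTime` and `emptyCount` (the number of consecutive empty receives since the last message) are hypothetical names.

```javascript
// Illustrative sketch only, not the library's implementation.
// interval:   Number or Array of Numbers (seconds)
// emptyCount: consecutive empty receives since the last message
function waitTime( interval, emptyCount ){
	if( !Array.isArray( interval ) ){
		// case A: a fixed poll interval
		return interval;
	}
	// case B: step through the array and stay on the last entry
	var idx = Math.min( emptyCount, interval.length - 1 );
	return interval[ idx ];
}

var interval = [ .2, 1, 3 ];
console.log( waitTime( interval, 0 ) ); // 0.2 (start, or right after a message)
console.log( waitTime( interval, 1 ) ); // 1   (first empty receive)
console.log( waitTime( interval, 2 ) ); // 3   (second empty receive)
console.log( waitTime( interval, 5 ) ); // 3   (stays on the last entry)
console.log( waitTime( .5, 4 ) );       // 0.5 (fixed interval)
```

Resetting `emptyCount` to 0 whenever a message arrives reproduces the reset to interval[0].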

Todos/Ideas

Release History

| Version | Date | Description |
|---|---|---|
| 0.5.2 | 2016-10-24 | Optimized README and updated dependencies |
| 0.5.1 | 2016-08-22 | Fixed reconnect error Issue#20. Thanks to mstduff; updated deps; removed generated code docs from repo |
| 0.5.0 | 2016-07-14 | Added methods .info(cb) (Issue#17) and .size( [hidden,] cb ) |
| 0.4.3 | 2016-06-20 | Optimized event listeners Issue#15. Thanks to Kevin Turner |
| 0.4.2 | 2016-05-06 | Added the .quit() function Issue#11. Thanks to Sam Fung |
| 0.4.1 | 2016-04-05 | Fixed missing isNumber function |
| 0.4.0 | 2016-03-30 | Updated dependencies (especially lodash to 4.x). Fixed a config bug caused by the array merge from extend Issue#7. Thanks to Peter Hanneman |
| 0.3.8 | 2015-11-04 | Fixed stop behavior. Pull#5. Thanks to Exinferis |
| 0.3.7 | 2015-09-02 | Added tests to check the behavior during errors within message processing; Added option alwaysLogErrors to prevent console logs if an error event handler was attached. Issue #3 |
| 0.3.6 | 2015-09-02 | Updated dependencies; optimized readme (thanks to Tobias Lidskog for the pull #4) |
| 0.3.5 | 2015-04-27 | again ... fixed argument dispatch for .send() |
| 0.3.4 | 2015-04-27 | fixed argument dispatch for .send() and added optional cb for .del() |
| 0.3.3 | 2015-03-27 | added changeInterval to modify the interval in operation |
| 0.3.2 | 2015-02-23 | changed default prefix/namespace; |
| 0.3.0 | 2015-02-16 | It's now possible to return an error as first argument of next. This will lead to an error emit + optimized readme |
| 0.2.2 | 2015-01-27 | added option defaultDelay and optimized arguments of the send method; fixed travis.yml |
| 0.2.0 | 2015-01-27 | Added timeout, better error handling and send callback |
| 0.1.2 | 2015-01-20 | Reorganized code, added code docs and optimized readme |
| 0.1.1 | 2015-01-17 | Added test scripts and optimized repository file list |
| 0.1.0 | 2015-01-16 | First working and documented version |
| 0.0.1 | 2015-01-14 | Initial commit |

Initially Generated with generator-mpnodemodule

Other projects

| Name | Description |
|---|---|
| rsmq | A really simple message queue based on Redis |
| rsmq-cli | a terminal client for rsmq |
| rest-rsmq | REST interface for. |
| redis-notifications | A redis based notification engine. It implements the rsmq-worker to safely create notifications and recurring reports. |
| node-cache | Simple and fast NodeJS internal caching. Node internal in memory cache like memcached. |
| redis-sessions | An advanced session store for NodeJS and Redis |
| obj-schema | Simple module to validate an object by a predefined schema |
| connect-redis-sessions | A connect or express middleware to simply use the redis sessions. With redis sessions you can handle multiple sessions per user_id. |
| systemhealth | Node module to run simple custom checks for your machine or its connections. It will use redis-heartbeat to send the current state to redis. |
| task-queue-worker | A powerful tool for background processing of tasks that are run by making standard http requests. |
| soyer | Soyer is a small lib for server-side use of Google Closure Templates with node.js. |
| grunt-soy-compile | Compile Google Closure Templates ( SOY ) templates including the handling of XLIFF language files. |
| backlunr | A solution to bring Backbone Collections together with the browser fulltext search engine Lunr.js |

The MIT License (MIT)

Copyright © 2015 Mathias Peter, http://www.tcs.de

Permission is hereby granted, free of charge, to any person obtaining a copy of this software and associated documentation files (the “Software”), to deal in the Software without restriction, including without limitation the rights to use, copy, modify, merge, publish, distribute, sublicense, and/or sell copies of the Software, and to permit persons to whom the Software is furnished to do so, subject to the following conditions:

The above copyright notice and this permission notice shall be included in all copies or substantial portions of the Software.

THE SOFTWARE IS PROVIDED “AS IS”, WITHOUT WARRANTY OF ANY KIND, EXPRESS OR IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE SOFTWARE.