RSMQ: Redis Simple Message Queue for Node.js

A lightweight message queue for Node.js that requires no dedicated queue server. Just a Redis server.

tl;dr: If you run a Redis server and currently use Amazon SQS or a similar message queue, you might as well use this fast little replacement. Using a shared Redis server, multiple Node.js processes can send and receive messages.

Features

Note: RSMQ uses the Redis EVAL command (Lua scripts), so the minimum Redis version is 2.6.

Usage

Installation

npm install rsmq

Modules for RSMQ

To keep the core of RSMQ small, additional functionality is available as modules:

RSMQ in other languages

The simplicity of RSMQ is useful in other languages. Here is a list of implementations in other languages:

Note: Should you plan to port RSMQ to another language, please make sure to have tests to ensure compatibility with all RSMQ clients. And of course: let me know so I can mention your port here.

Methods

Constructor

Creates a new instance of RSMQ.

Parameters:

Example:

const RedisSMQ = require("rsmq");
const rsmq = new RedisSMQ( {host: "127.0.0.1", port: 6379, ns: "rsmq"} );

Queue

createQueue(options, callback)

Create a new queue.

Parameters:

Returns:

Example:

rsmq.createQueue({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("queue created")
	}
});

listQueues(callback)

Lists all queues.

Returns an array:

Example:

rsmq.listQueues(function (err, queues) {
	if (err) {
		console.error(err)
		return
	}

	console.log("Active queues: " + queues.join( "," ) )
});

deleteQueue(options, callback)

Deletes a queue and all messages.

Parameters:

Returns:

Example:

rsmq.deleteQueue({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("Queue and all messages deleted.")
	} else {
		console.log("Queue not found.")
	}
});

getQueueAttributes(options, callback)

Get queue attributes, counter and stats

Parameters:

Returns an object:

Example:

rsmq.getQueueAttributes({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err);
		return;
	}

	console.log("==============================================");
	console.log("=================Queue Stats==================");
	console.log("==============================================");
	console.log("visibility timeout: ", resp.vt);
	console.log("delay for new messages: ", resp.delay);
	console.log("max size in bytes: ", resp.maxsize);
	console.log("total received messages: ", resp.totalrecv);
	console.log("total sent messages: ", resp.totalsent);
	console.log("created: ", resp.created);
	console.log("last modified: ", resp.modified);
	console.log("current n of messages: ", resp.msgs);
	console.log("hidden messages: ", resp.hiddenmsgs);
});

setQueueAttributes(options, callback)

Sets queue parameters.

Parameters:

Note: At least one attribute (vt, delay, maxsize) must be supplied. Only attributes that are supplied will be modified.

Returns an object:

Example:

rsmq.setQueueAttributes({ qname: "myqueue", vt: "30"}, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	console.log("changed the invisibility time of messages that have been received to 30 seconds");
	console.log(resp);
});

Messages

sendMessage(options, callback)

Sends a new message.

Parameters:

Returns:

Example:

rsmq.sendMessage({ qname: "myqueue", message: "Hello World "}, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	console.log("Message sent. ID:", resp);
});

receiveMessage(options, callback)

Receive the next message from the queue.

Parameters:

Returns an object:

Note: Returns an empty object if no message is available.

Example:

rsmq.receiveMessage({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp.id) {
		console.log("Message received.", resp)
	} else {
		console.log("No messages for me...")
	}
});

deleteMessage(options, callback)

Deletes a message from the queue.

Parameters:

Returns:

Example:

rsmq.deleteMessage({ qname: "myqueue", id: "dhoiwpiirm15ce77305a5c3a3b0f230c6e20f09b55" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("Message deleted.")
	} else {
		console.log("Message not found.")
	}
});

popMessage(options, callback)

Receive the next message from the queue and delete it.

Important: This method deletes the message it receives right away. There is no way to receive the message again if something goes wrong while working on the message.

Parameters:

Returns an object:

Note: Returns an empty object if no message is available.

Example:

rsmq.popMessage({ qname: "myqueue" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp.id) {
		console.log("Message received and deleted from queue", resp)
	} else {
		console.log("No messages for me...")
	}
});
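
When losing a message on failure is not acceptable, the usual alternative to popMessage is to combine receiveMessage and deleteMessage and delete only after the work has succeeded. The following is a minimal sketch of that pattern, not part of RSMQ itself: `processNext` and `handler` are hypothetical names, and `rsmq` is assumed to be any object exposing receiveMessage and deleteMessage as documented above.

```javascript
// Sketch: receive one message, process it, delete it only on success.
// `handler(resp, cb)` is a hypothetical application callback.
function processNext(rsmq, qname, handler, done) {
	rsmq.receiveMessage({ qname: qname }, function (err, resp) {
		if (err) {
			return done(err);
		}

		if (!resp.id) {
			// Empty object: no message available right now.
			return done(null, false);
		}

		handler(resp, function (handlerErr) {
			if (handlerErr) {
				// Do not delete. The message stays in the queue and becomes
				// visible again once its visibility timeout has passed.
				return done(handlerErr);
			}

			rsmq.deleteMessage({ qname: qname, id: resp.id }, function (delErr) {
				if (delErr) {
					return done(delErr);
				}
				done(null, true);
			});
		});
	});
}
```

Here `done(null, true)` signals that a message was processed and deleted, and `done(null, false)` that the queue had nothing to deliver. If the handler fails, the message is left alone and can be received again later.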

changeMessageVisibility(options, callback)

Change the visibility timer of a single message. The time when the message will be visible again is calculated from the current time (now) + vt.

Parameters:

Returns:

Example:

rsmq.changeMessageVisibility({ qname: "myqueue", vt: "60", id: "dhoiwpiirm15ce77305a5c3a3b0f230c6e20f09b55" }, function (err, resp) {
	if (err) {
		console.error(err)
		return
	}

	if (resp === 1) {
		console.log("message hidden for 60 seconds")
	}
});
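
To illustrate the "now + vt" rule described above: the new timeout counts from the moment of the call, not from when the message was first received. The helper below is a hypothetical illustration of the arithmetic only and is not part of RSMQ; it assumes vt is given in seconds, as in the example above.

```javascript
// Sketch of the "current time + vt" calculation.
// `visibleAgainAt` is a hypothetical helper, not an RSMQ method.
function visibleAgainAt(nowMs, vtSeconds) {
	// vt may arrive as a string such as "60", so convert it first.
	return nowMs + Number(vtSeconds) * 1000;
}

const now = Date.now();
console.log("visible again at:", new Date(visibleAgainAt(now, "60")).toISOString());
```

A worker that needs more time can therefore call changeMessageVisibility again before the timeout expires, and the message stays hidden for another vt seconds from that point.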

quit(callback)

Disconnects the Redis client. This is only useful if you are using RSMQ within a script and want Node.js to be able to exit.

Realtime

When initializing RSMQ you can enable the realtime PUBLISH for new messages. On every new message sent to RSMQ via sendMessage, a Redis PUBLISH is issued to {rsmq.ns}:rt:{qname}.

Example for RSMQ with default settings:

How to use the realtime option

Apart from the PUBLISH issued when a new message is sent, RSMQ does nothing else. Your app can use the Redis SUBSCRIBE command to be notified of new messages and then call receiveMessage. However, avoid having multiple workers SUBSCRIBE for new messages, since every notification would then trigger multiple simultaneous receiveMessage calls.
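
A rough sketch of the subscribing side could look like the following. It is an illustration under assumptions, not RSMQ code: `realtimeChannel` and `watchQueue` are hypothetical helpers, `subscriber` is assumed to be a separate Redis connection that behaves like a node-redis v3 client (a `subscribe(channel)` method and a "message" event), and `onNewMessage` is a hypothetical application callback.

```javascript
// Channel name format as described above: {rsmq.ns}:rt:{qname}
function realtimeChannel(ns, qname) {
	return ns + ":rt:" + qname;
}

// Subscribe a single listener to the realtime channel of one queue.
// Assumes `subscriber` emits "message" events with (channel, payload).
function watchQueue(subscriber, ns, qname, onNewMessage) {
	const channel = realtimeChannel(ns, qname);

	subscriber.on("message", function (receivedChannel, payload) {
		// Ignore notifications that belong to other queues.
		if (receivedChannel === channel) {
			onNewMessage(payload);
		}
	});

	subscriber.subscribe(channel);
	return channel;
}
```

With the namespace "rsmq" and the queue "myqueue" used elsewhere in this document, the channel would be rsmq:rt:myqueue. Inside `onNewMessage` the app would typically call rsmq.receiveMessage. In line with the warning above, only one process should run this listener.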

Changes

See the CHANGELOG.

Other projects

Name | Description
--- | ---
node-cache | Simple and fast Node.js internal caching. Node internal in-memory cache like memcached.
redis-tagging | A Node.js helper library to make tagging of items in any legacy database (SQL or NoSQL) easy and fast.
redis-sessions | An advanced session store for Node.js and Redis.
rsmq-worker | Helper to implement a worker based on RSMQ (Redis Simple Message Queue).
connect-redis-sessions | A connect or express middleware to use redis sessions that lets you handle multiple sessions per user_id.

The MIT License

Please see the LICENSE.md file.