Home

Awesome

RocketMQ Client for Node.js

Version Downloads License TravisCI Dependency

This official Node.js client is a lightweight wrapper around rocketmq-client-cpp, a finely tuned CPP client.

Notice 1: This client is still in dev version. Use it cautiously in production.

Notice 2: This SDK is now only support macOS and Ubuntu 14.04. Ubuntu 16+ is not supported and CentOS is not tested yet.

Installation

$ npm install --save apache-rocketmq

Examples

You may view example/producer.js and example/push_consumer.js for quick start.

Usage

Require this package first.

const { Producer, PushConsumer } = require("apache-rocketmq");

Producer

Constructor

new Producer(groupId[, instanceName][, options]);

Producer's constructor receives three parameters:

e.g.

const { Producer } = require("apache-rocketmq");
const producer = new Producer("GROUP_ID", "INSTANCE_NAME", {
    nameServer: "127.0.0.1:9876",
});

start

producer.start([callback]);

.start receives a callback function. If no callback passed, this function will return a Promise object.

e.g.

producer.start(function(err) {
    if(err) {
        //
    }
});

// or

producer.start().then(() => {
    //
}).catch(err => {
    //
});

shutdown

producer.shutdown([callback]);

.shutdown receives a callback function. If no callback passed, this function will return a Promise object.

e.g.

producer.shutdown(function(err) {
    if(err) {
        //
    }
});

// or

producer.shutdown().then(() => {
    //
}).catch(err => {
    //
});

send

producer.send(topic, body[, options][, callback]);

.send receives 4 parameters including a callback. If no callback passed, this function will return a Promise object.

e.g.

producer.send("test", `baz ${i}`, {
    keys: "foo",
    tags: "bar"
}, function(err, result) {
    if(err) {
        // ...    
    } else {
        console.log(result);

        // console example:
        //
        //  { status: 0,
        //    statusStr: 'OK',
        //    msgId: '0101007F0000367E0000309DD68B0700',
        //    offset: 0 }
    }
});
send status and statusStr
statusstatusStr
0OK
1FLUSH_DISK_TIMEOUT
2FLUSH_SLAVE_TIMEOUT
3SLAVE_NOT_AVAILABLE

PushConsumer

Constructor

new PushConsumer(groupId[, instanceName][, options]);

PushConsumer's constructor receives three parameters:

e.g.

const { PushConsumer } = require("apache-rocketmq");
const consumer = new PushConsumer("GROUP_ID", "INSTANCE_NAME", {
    nameServer: "127.0.0.1:9876",
    threadCount: 3
});

start

consumer.start([callback]);

.start receives a callback function. If no callback passed, this function will return a Promise object.

e.g.

consumer.start(function(err) {
    if(err) {
        //
    }
});

// or

consumer.start().then(() => {
    //
}).catch(err => {
    //
});

shutdown

consumer.shutdown([callback]);

.shutdown receives a callback function. If no callback passed, this function will return a Promise object.

e.g.

consumer.shutdown(function(err) {
    if(err) {
        //
    }
});

// or

consumer.shutdown().then(() => {
    //
}).catch(err => {
    //
});

subscribe

Add a subscription relationship to consumer.

consumer.subscribe(topic[, expression]);

.subscribe receives two parameters which the second parameter is optional.

On Message Event

If you want to receive messages from RocketMQ Server, you should add a listener for message event which receives 2 parameters.

function YOUR_LISTENER(msg, ack) {
    //
}

msg object looks like:

{ topic: 'test',
  tags: 'bar',
  keys: 'foo',
  body: 'baz 7',
  msgId: '0101007F0000367E0000339DD68B0800' }

You may call ack.done() to tell RocketMQ that you've finished your message successfully which is same as ack.done(true). And you may call ack.done(false) to tell it that you've failed.

e.g.

consumer.on("message", function(msg, ack) {
    console.log(msg);
    ack.done();
});

Apache RocketMQ Community

Contact Us

How to Contribute

Contributions are warmly welcome! Be it trivial cleanup, major new feature or other suggestion. Read this how to contribute guide for more details.

License

Apache License, Version 2.0 Copyright (C) Apache Software Foundation