Home

Awesome

amqp10

travis npm coverage npm gitter

NOTE: This repository is no longer being maintained. Please use rhea which is a fully compliant amqp 1.0 library that is actively maintained.

amqp10 is a promise-based, AMQP 1.0 compliant node.js client

Contents

Usage

The basic usage is to require the module, new up a client with the appropriate policy for the server you're connecting against, connect, and then send/receive as necessary. So a simple example for a local Apache Qpid server would look like:

var AMQPClient = require('amqp10').Client,
    Promise = require('bluebird');

var client = new AMQPClient(); // Uses PolicyBase default policy
client.connect('amqp://localhost')
  .then(function() {
    return Promise.all([
      client.createReceiver('amq.topic'),
      client.createSender('amq.topic')
    ]);
  })
  .spread(function(receiver, sender) {
    receiver.on('errorReceived', function(err) { /* Check for errors */ });
    receiver.on('message', function(message) {
      console.log('Rx message: ', message.body);
    });

    return sender.send({ key: "Value" });
  })
  .error(function(err) {
    console.log("error: ", err);
  });

By default send promises are resolved when a disposition frame is received from the remote link for the sent message, at this point the message is considered "settled". To tune this behavior, you can tweak the policy you give to AMQPClient on construction. For instance, to force send promises to be resolved immediately on successful sending of the payload, you would build AMQPClient like so:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.merge({
  senderLinkPolicy: {
    callbackPolicy: Policy.Utils.SenderCallbackPolicies.OnSent
  }
}, Policy.DefaultPolicy));

In addition to the above, you can also tune how message link credit is doled out (for throttling), as well as most other AMQP behaviors, all through policy overrides. See DefaultPolicy and the policy utilities for more details on altering various behaviors.

Flow Control and Message Dispositions

Flow control in AMQP occurs at both the Session and Link layers. Using our default policy, we start out with some sensible Session windows and Link credits, and renew those every time they get to the half-way point. In addition, receiver links start in "auto-settle" mode, which means that the sender side can consider the message "settled" as soon as it's sent. However, all of those settings are easily tune-able through Policy overrides (Policy.merge(<overrides>, <base policy>)).

For instance. we've provided a convenience helper for throttling your receiver links to only renew credits on messages they've "settled". To use this with Azure ServiceBus Queues for instance, it would look like:

var AMQPClient = require('amqp10').Client,
    Policy = require('amqp10').Policy;
var client = new AMQPClient(Policy.Utils.RenewOnSettle(1, 1, Policy.ServiceBusQueue));

Where the first number is the initial credit, and the second is the threshold - once remaining credit goes below that, we will give out more credit by the number of messages we've settled. In this case we're setting up the client for one-by-one message processing. Behind the scenes, this does the following:

  1. Sets the Link's creditQuantum to the first number (1), which you can do for yourself via the Policy mix-in { receiverLink: { creditQuantum: 1 } }

  2. Sets the Link to not auto-settle messages at the sender, which you can do for yourself via { receiverLink: { attach: { receiverSettleMode: 1 } } } Where did that magic "1" come from? Well, that's the value from the spec, but you could use the constant we've defined at require('amqp10').Constants.receiverSettleMode.settleOnDisposition

  3. Sets the Link's credit renewal policy to a custom method that renews only when the link credit is below the threshold and we've settled some messages. You can do this yourself by using your own custom method:

{
  receiverLink: {
    credit: function (link, options) {
      // If the receiver link was just connected, set the initial link credit to the quantum. Otherwise, give more credit for every message we've settled.
      var creditQuantum = (!!options && options.initial) ? link.policy.creditQuantum : link.settledMessagesSinceLastCredit;
      if (creditQuantum > 0 && link.linkCredit < threshold) {
        link.addCredits(creditQuantum);
      }
    }
  }
}

Note that once you've set the policy to not auto-settle messages, you'll need to settle them yourself. We've tried to make that easy by providing methods on the receiver link for each of the possible "disposition states" that AMQP allows:

All of these methods accept an array of messages, allowing you to settle many at once.

Plugins

The amqp10 module now supports pluggable Client behaviors with the exported use method. Officially supported plugins include:

Supported Servers

We are currently actively running integration tests against the following servers:

  1. Azure EventHubs
  2. Azure ServiceBus Queues and Topics
  3. Apache Qpid C++ broker (qpidd)

We have been tested against the following servers, but not exhaustively so issues may remain:

  1. ActiveMQ (open issue related to ActiveMQ ignoring the auto-settle setting and disposition frames may cause messages to re-deliver or stop sending after a certain period)
  2. RabbitMQ with the amqp 1.0 experimental extension
  3. Apache Qpid Java broker

If you find any issues, please report them via GitHub.

Todos and Known Issues

  1. Disposition support is incomplete in that we don't send proper "unsettled" information when re-attaching links.
  2. There are some AMQP types we don't process - notably the Decimal23/64/128 types. These are unused by the protocol, and no-one seems to be using them to convey information in messages, so ignoring them is likely safe.

Implementation Notes