Skiff
Abstract Node.js implementation of the Raft Consensus Algorithm.
If you're looking for a directly usable module, take a look at skiff (on top of LevelDB + Msgpack).
Install
$ npm install skiff --save
Require
var Node = require('skiff');
Create a node
var node = Node();
or, with options:
var options = {
// ...
};
var node = Node(options);
Node create options
- id: id of the node. If not defined, it is self-assigned. Accessible on node.id.
- standby: if true, the node starts in the standby state instead of the follower state. In the standby state the node only waits for a leader to send commands. Defaults to false.
- cluster: the id of the cluster this node will be a part of.
- transport: the transport used to communicate with peers. See the transport API.
- persistence: the node persistence layer. See the persistence API.
- uuid: function that generates a UUID. Defaults to using the cuid package.
- heartbeatInterval: the interval between heartbeats sent from the leader. Defaults to 50 ms.
- minElectionTimeout: the minimum election timeout. Defaults to 150 ms.
- maxElectionTimeout: the maximum election timeout. Defaults to 300 ms.
- commandTimeout: the maximum amount of time you're willing to wait for a command to propagate. Defaults to 3 seconds. You can override this in each command call.
- retainedLogEntries: the maximum number of log entries that are committed to the state machine that should remain in memory. Defaults to 50.
- metadata: to be used by plugins if necessary.
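As an illustration, the documented defaults can be written out explicitly in an options object. This is only a sketch that restates the values listed above, assuming all time values are expressed in milliseconds; the transport and persistence entries are left out because they depend on the providers you choose.

```javascript
// Sketch: the documented defaults written out explicitly.
// Assumption: every time value is in milliseconds.
var options = {
  standby: false,           // start as a follower rather than in standby
  heartbeatInterval: 50,    // leader heartbeat period
  minElectionTimeout: 150,  // lower bound of the election timeout
  maxElectionTimeout: 300,  // upper bound of the election timeout
  commandTimeout: 3000,     // 3 seconds, can be overridden per command
  retainedLogEntries: 50    // committed entries kept in memory
};
```

If you change these values, it is probably wise to keep heartbeatInterval well below minElectionTimeout, so that followers hear from the leader before their election timers fire.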
Node API
.listen(options, listener)
Makes the node listen for peer communications. Takes the following arguments:
- options: connection options; these depend on the transport provider being used.
- listener: a function with the signature function (peerId, connection). Its arguments are:
  - peerId: the identification of the peer.
  - connection: a connection with the peer, an object implementing the Connection API (see below).
.join(peer, [peerMetadata], cb)
Joins a peer into the cluster.
node.join(peer, cb);
The peer is a string describing the peer. The description depends on the transport you're using.
.leave(peer, cb)
Removes a peer from the cluster.
node.leave(peer, cb);
The peer is a string describing the peer. The description depends on the transport you're using.
.command(command[, options], callback)
Appends a command to the leader log. If node is not the leader, callback gets invoked with an error. Example:
node.command('some command', function(err) {
  if (err) {
    if (err.code === 'ENOTLEADER') {
      // redirect client to err.leader
    }
  } else {
    console.log('cluster agreed on this command');
  }
});
This command times out once options.commandTimeout has elapsed, but you can override this by passing in some options:
node.command('some command', {timeout: 5000}, function(err) {
  if (err) {
    if (err.code === 'ENOTLEADER') {
      // redirect client to err.leader
    }
  } else {
    console.log('cluster agreed on this command');
  }
});
Command options are:
- timeout: maximum time to wait for replication to a majority. Defaults to the node's options.commandTimeout, which defaults to 3000 (3 seconds).
- waitForNode: node id to wait to commit to. This may be useful to enforce read-your-writes on proxying clients. Defaults to undefined.
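The error handling shown above could be factored into a small helper. The sketch below is not part of Skiff: submitCommand and fakeFollower are hypothetical names, and the stub node only imitates the documented ENOTLEADER behaviour so the example can run on its own.

```javascript
// Hypothetical helper (not part of Skiff): submits a command and turns the
// ENOTLEADER error into a redirect hint instead of a failure.
function submitCommand(node, command, options, done) {
  node.command(command, options, function (err) {
    if (err && err.code === 'ENOTLEADER') {
      // err.leader identifies the node the client should be redirected to
      return done(null, { committed: false, redirectTo: err.leader });
    }
    if (err) {
      return done(err);
    }
    done(null, { committed: true });
  });
}

// Stand-in for a real node that is not the leader, used only for this demo.
var fakeFollower = {
  command: function (command, options, cb) {
    var err = new Error('not the leader');
    err.code = 'ENOTLEADER';
    err.leader = 'node1';
    cb(err);
  }
};

submitCommand(fakeFollower, 'some command', { timeout: 5000 }, function (err, result) {
  // result.redirectTo holds the leader reported by the stub
  console.log(result);
});
```

Any other error, including a timeout, is passed through to the caller unchanged.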
.peerMeta(url)
Returns the peer metadata if the peer is known.
Events
A node emits the following events, which may or may not be interesting to you:
- error(error): when an unexpected error occurs.
- state(stateName): when a new state transition occurs. Possible values for stateName are: idle, follower, candidate, leader.
- loaded(): when a node has loaded configuration from the persistence provider.
- election timeout(): when an election timeout occurs.
- applied log(logIndex): when a node applies a log entry to the state machine.
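A minimal sketch of subscribing to these events follows. It assumes the node exposes the usual EventEmitter on() method; watchNode is a hypothetical helper, and a plain EventEmitter stands in for a real node so the example is self-contained.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: records the events listed above as readable lines.
function watchNode(node, lines) {
  node.on('error', function (err) { lines.push('error: ' + err.message); });
  node.on('state', function (stateName) { lines.push('state: ' + stateName); });
  node.on('loaded', function () { lines.push('loaded'); });
  node.on('election timeout', function () { lines.push('election timeout'); });
  node.on('applied log', function (logIndex) { lines.push('applied log: ' + logIndex); });
  return lines;
}

// A plain EventEmitter stands in for a real node here.
var fakeNode = new EventEmitter();
var lines = watchNode(fakeNode, []);
fakeNode.emit('state', 'candidate');
fakeNode.emit('applied log', 7);
console.log(lines.join('\n'));
```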
Plugins
Skiff is fairly high-level and doesn't implement the network transport or the persistence layers. Instead, you have to provide an implementation for these.
Transport provider API
The node transport option accepts a provider object that implements the following interface:
- connect(localNodeId, options): connects to the peer and returns a connection object. The localNodeId argument contains the local node id.
- listen(localNodeId, options, fn): listens for incoming connection requests. The fn argument is a function with the signature function (peerId, connection) that gets invoked when there is a connection request, passing in a connection object that implements the Connection API (see below). The localNodeId argument contains the local node id.
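To make the shape of a provider concrete, here is a skeleton of an in-memory transport that only works inside a single process. MemoryTransport and the options.address field are assumptions made for this sketch, and the connection objects are bare placeholders; a real provider must return objects that implement the full Connection API.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical in-memory transport provider, for a single process only.
// Assumption: options.address names the endpoint to listen on or connect to.
function MemoryTransport() {
  this.listeners = {}; // address -> fn(peerId, connection)
}

MemoryTransport.prototype.listen = function (localNodeId, options, fn) {
  this.listeners[options.address] = fn;
};

MemoryTransport.prototype.connect = function (localNodeId, options) {
  // Placeholder connection objects: they are not wired to each other and
  // do not implement send, receive or close.
  var local = new EventEmitter();
  var remote = new EventEmitter();
  var fn = this.listeners[options.address];
  if (fn) {
    // Tell the listening side which node is connecting.
    fn(localNodeId, remote);
  }
  return local;
};

var transport = new MemoryTransport();
var seen = [];
transport.listen('node1', { address: 'a' }, function (peerId, connection) {
  seen.push(peerId);
});
var connection = transport.connect('node2', { address: 'a' });
console.log(seen);
```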
Connection API
The connection API implements the following interface:
- send(type, arguments, callback): makes a remote call into the peer. The callback argument is a function with the signature function (err, result).
- receive(fn): listens for messages from the remote peer. The fn argument is a function with the signature function (type, args, cb), where cb is a function that accepts the reply arguments.
- close(callback): closes the connection. The callback argument is a function with the signature function (err).
The connection object is an EventEmitter, emitting the following events:
- close: emitted once the connection closes.
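The following sketch shows one way the Connection API could be satisfied by a pair of linked in-memory objects. createEnd and createConnectionPair are hypothetical names, messages are delivered synchronously (a real transport would be asynchronous), and the reply arguments are assumed to follow the (err, result) shape of the send callback.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: creates one end of an in-memory connection.
function createEnd() {
  var conn = new EventEmitter();
  var handler = null;
  conn.closed = false;
  conn.remote = null;

  conn.send = function (type, args, callback) {
    if (conn.closed || !conn.remote || conn.remote.closed) {
      return callback(new Error('connection closed'));
    }
    // Deliver to the remote handler; its reply goes straight to callback.
    conn.remote._deliver(type, args, callback);
  };

  conn.receive = function (fn) {
    handler = fn;
  };

  conn._deliver = function (type, args, reply) {
    if (!handler) {
      return reply(new Error('no receiver registered'));
    }
    handler(type, args, reply);
  };

  conn.close = function (callback) {
    if (!conn.closed) {
      conn.closed = true;
      conn.emit('close'); // emitted only once
    }
    if (callback) {
      callback();
    }
  };

  return conn;
}

// Hypothetical helper: links two ends so each can call into the other.
function createConnectionPair() {
  var a = createEnd();
  var b = createEnd();
  a.remote = b;
  b.remote = a;
  return [a, b];
}

var pair = createConnectionPair();
pair[1].receive(function (type, args, cb) {
  cb(null, type + ' from ' + args.from);
});
pair[0].send('ping', { from: 'node1' }, function (err, result) {
  console.log(result);
});
```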
Persistence provider API
The node persistence option accepts a provider object that implements the following interface:
- saveMeta(nodeId, state, callback): saves the raft engine metadata. nodeId is a string that represents the current node. state is an arbitrary object (hash map) and callback is a function with the signature function callback(err).
- loadMeta(nodeId, callback): loads the engine metadata state. callback is a function with the signature function callback(err, state).
- applyCommand(nodeId, commitIndex, command, callback): applies a command to the node state machine.
  - The persistence layer should save the commitIndex if it wants to make sure that log entries are not repeated.
  - Saving this should be atomic: the commitIndex update and the log application to the state machine should both succeed or both fail.
  - If the commitIndex has already been applied in the past, just call back with success.
  - callback is a function with the following signature: function callback(err).
- lastAppliedCommitIndex(nodeId, callback): returns the last commitIndex that was successfully applied to the node state machine.
  - Is asynchronous: callback is a function invoked once the result is ready.
  - callback is a function with the following signature: function (err, commitIndex). If the operation resulted in an error, err contains an error object. Otherwise, commitIndex may contain an integer with the index of the latest applied commitIndex, if there was one.
- saveCommitIndex(nodeId, commitIndex, callback): saves only the commit index.
- createReadStream(nodeId): returns a read stream that streams all the state machine data.
- createWriteStream(nodeId): resets the state machine and returns a write stream to overwrite all the state machine data.
- removeAllState(nodeId, callback): removes all state for the given node.
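As a rough illustration of this interface, the sketch below keeps everything in memory, so nothing survives a restart. MemoryPersistence is a hypothetical name, and several details are assumptions: commands are taken to be objects of the form { key, value }, the streams run in object mode and carry such entries, and callbacks are invoked synchronously for brevity.

```javascript
var stream = require('stream');

// Hypothetical in-memory persistence provider (illustration only).
function MemoryPersistence() {
  this.nodes = {}; // nodeId -> { meta, commitIndex, data }
}

MemoryPersistence.prototype._node = function (nodeId) {
  if (!this.nodes[nodeId]) {
    this.nodes[nodeId] = { meta: undefined, commitIndex: undefined, data: {} };
  }
  return this.nodes[nodeId];
};

MemoryPersistence.prototype.saveMeta = function (nodeId, state, callback) {
  this._node(nodeId).meta = state;
  callback();
};

MemoryPersistence.prototype.loadMeta = function (nodeId, callback) {
  callback(null, this._node(nodeId).meta);
};

MemoryPersistence.prototype.applyCommand = function (nodeId, commitIndex, command, callback) {
  var node = this._node(nodeId);
  // Already applied in the past: report success without applying again.
  if (node.commitIndex !== undefined && commitIndex <= node.commitIndex) {
    return callback();
  }
  // Assumed command shape: { key, value }. Both updates happen in the same
  // synchronous step, so they cannot be observed half-done.
  node.data[command.key] = command.value;
  node.commitIndex = commitIndex;
  callback();
};

MemoryPersistence.prototype.lastAppliedCommitIndex = function (nodeId, callback) {
  callback(null, this._node(nodeId).commitIndex);
};

MemoryPersistence.prototype.saveCommitIndex = function (nodeId, commitIndex, callback) {
  this._node(nodeId).commitIndex = commitIndex;
  callback();
};

MemoryPersistence.prototype.createReadStream = function (nodeId) {
  var data = this._node(nodeId).data;
  var readable = new stream.Readable({ objectMode: true, read: function () {} });
  Object.keys(data).forEach(function (key) {
    readable.push({ key: key, value: data[key] });
  });
  readable.push(null); // end of state machine data
  return readable;
};

MemoryPersistence.prototype.createWriteStream = function (nodeId) {
  var node = this._node(nodeId);
  node.data = {}; // reset the state machine before overwriting it
  return new stream.Writable({
    objectMode: true,
    write: function (entry, encoding, done) {
      node.data[entry.key] = entry.value;
      done();
    }
  });
};

MemoryPersistence.prototype.removeAllState = function (nodeId, callback) {
  delete this.nodes[nodeId];
  callback();
};

var persistence = new MemoryPersistence();
persistence.applyCommand('node1', 1, { key: 'a', value: 1 }, function () {});
// Re-applying the same commitIndex is accepted but has no effect.
persistence.applyCommand('node1', 1, { key: 'a', value: 2 }, function () {});
```

A durable provider would need to write the commitIndex and the state machine change in a single atomic operation, which an in-memory object gets for free.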
Cluster Setup
Setting up a Skiff cluster can be tricky. To avoid partitions, start with a single node that will become the leader, then add the followers in standby mode. Note that you can only send join commands to a leader node (to avoid partitions; this is explained in detail in the Raft paper). Once this is done and persisted, you should never need to do it again, since the nodes will know each other and will elect a new leader if the current leader goes down.
So typically the bootstrap code for the leader would be something like:
var Node = require('skiff');
var leader = Node({
  transport: transport,
  persistence: persistence
});
leader.listen(address);
// wait for the node to actually become the leader of its own one-node cluster
leader.once('leader', function() {
  leader.join('node1');
  leader.join('node2');
});
leader.on('joined', function(peer) {
  console.log('leader joined %s', peer.id);
});
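Since .join accepts a callback, the bootstrap step can also wait for each join to complete and surface errors. The sketch below is one possible approach, not the author's method: joinWhenLeader is a hypothetical helper, and an EventEmitter with a stubbed join method stands in for a real node so the example runs on its own.

```javascript
var EventEmitter = require('events').EventEmitter;

// Hypothetical helper: once the node becomes leader, joins the peers one at
// a time and stops at the first error.
function joinWhenLeader(leader, peers, done) {
  leader.once('leader', function () {
    var remaining = peers.slice();
    (function next() {
      if (remaining.length === 0) {
        return done();
      }
      leader.join(remaining.shift(), function (err) {
        if (err) {
          return done(err);
        }
        next();
      });
    })();
  });
}

// Stand-in for a real node, used only for this demo.
var fakeLeader = new EventEmitter();
fakeLeader.joined = [];
fakeLeader.join = function (peer, cb) {
  this.joined.push(peer);
  cb();
};

joinWhenLeader(fakeLeader, ['node1', 'node2'], function (err) {
  console.log(err ? 'join failed' : 'all peers joined');
});
fakeLeader.emit('leader');
```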
The follower bootstrapping code would look something like this:
var Node = require('skiff');
var node = Node({
  transport: transport,
  persistence: persistence,
  standby: true // important
});
node.listen(address);
This makes the follower start in the standby mode.
As mentioned, once the cluster membership has been set up and persisted, you just need to bootstrap all the nodes in the same way:
var Node = require('skiff');
var node = Node({
  transport: transport,
  persistence: persistence
});
node.listen(address);
License
ISC
© Pedro Teixeira