Introduction

NkCLUSTER is a framework for creating clusters of Erlang nodes of any size, and for distributing and managing jobs across them. It uses its own cluster management solution, based on NkDIST, riak_core and a custom distribution protocol. NkCLUSTER is one of the core pieces of the upcoming NetComposer platform, but it can also be used on its own.

Standard Erlang clusters are very convenient and easy to use, but they have some important limitations: the full mesh of connections limits the practical cluster size, all nodes must usually belong to the same LAN, and the distribution protocol is not firewall-friendly.

NkCLUSTER allows the creation of clusters with a very large number of nodes. It uses a hybrid approach: any node in the cluster can have one or both of two roles, the control role (managing the cluster and hosting node proxies) and the worker role (running jobs).

All of the nodes of the cluster run the same Erlang application (nkcluster), but the first 'N' nodes in the cluster are primary nodes (they have both the control and worker roles), while the rest are secondary nodes (they have only the worker role). N must be a power of two, typically 16, 32, 64 or 128. Primary nodes form a riak_core cluster among themselves. The set of all primary nodes is called the primary cluster, a subset of the whole cluster.

Control nodes talk among themselves using the standard Erlang distribution protocol. From the NkCLUSTER point of view, worker nodes talk only with their node proxy, placed at some control node, using TCP, TLS, SCTP, WS or WSS transports (they can of course talk with other worker nodes, or with anything else, using other means). NkCLUSTER is designed to allow worker nodes to be deployed at WAN distances from control nodes (for example, at different cloud providers). They can be very firewall-friendly, for example using the WebSocket transports on ports 80 or 443, and, in some circumstances, without any open incoming port. However, all control nodes should still belong to the same LAN.
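For example, a control node could be told to listen for workers using WSS on port 443, so that even heavily firewalled workers can reach it with a plain outgoing HTTPS-like connection. This is only a sketch: the listen option comes from the Configuration table below, and the exact port is illustrative:

```erlang
%% Application environment fragment (see the Configuration section below).
%% The URL format follows the cluster_addr examples later in this document.
{listen, "nkcluster://all:443;transport=wss"}
```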

NkCLUSTER uses the concepts of requests, tasks, events and job classes. For each worker node, a node proxy process is started at some specific control node, and can be used to send requests and tasks to its managed worker node. Requests are short-lived RPC calls. Tasks are long-lived Erlang processes running at a worker node and managed from a control process at a control node. Events are pieces of information sent from a worker node to its node proxy. All of them must belong to a previously defined job_class.

Any node in the cluster can have a set of labels or metadata associated with it, with a URL-like format. For example, core;group=group1;master, meta2;labelA=1;labelB=2 would add two metadata items, core and meta2, each with some keys (master, labelA...) and, optionally, a value for each key (group1, 2...). This metadata can be used for any purpose, for example to group nodes or to decide where to place a specific task in the cluster.
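For example, metadata can be read and updated at runtime using the built-in get_meta/1 and update_meta/2 requests described later in this document; the exact return values are not shown here because they are assumptions:

```erlang
%% Add or update the 'group' key of the 'core' metadata item at node dev4,
%% then read the node's current metadata back.
> nkcluster:update_meta(<<"dev4">>, "core;group=group2").
> nkcluster:get_meta(<<"dev4">>).
```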

NkCLUSTER includes some out-of-the-box tools like metadata management, hot remote loading (or updating) of Erlang code, launching OS commands, Docker management, etc. NkCLUSTER allows on-the-fly addition and removal of nodes (both control and worker), and it is designed so that jobs can be written in such a way that service is never disrupted.

NkCLUSTER scales seamlessly, from a single machine, to a cluster of 10-20 control+worker nodes, all the way up to a huge cluster where, for example, 50-100 control nodes manage thousands or tens of thousands of worker nodes. In a future version, NkCLUSTER will allow the dynamic creation of nodes in public clouds like Amazon or Azure, or private ones like OpenStack.

NkCLUSTER requires Erlang 17+.

Quick Start

```
git clone https://github.com/Nekso/nkcluster.git
cd nkcluster
make
```

Then you can open five different consoles and start a node in each one: make dev1, make dev2, make dev3, make dev4 and make dev5.

Nodes 1, 2 and 3 are control/worker nodes. Nodes 4 and 5 are worker-only nodes. The cluster should discover everything automatically, but you must deploy the cluster plan. From any of nodes 1, 2 or 3:

```erlang
> nkcluster_nodes:get_nodes().
[<<"dev1">>,<<"dev2">>,<<"dev3">>,<<"dev4">>,<<"dev5">>]

> nkcluster_nodes:get_node_info(<<"dev4">>).
{ok, #{id=><<"dev4">>, ...}}

> nkcluster:call(<<"dev3">>, erlang, node, [], 5000).
{reply, 'dev3@127.0.0.1'}
```

Use cases and deployment scenarios

NkCLUSTER is designed to solve a specific type of distributed work problem. It is probably useful in other scenarios as well, but it is especially well suited for job classes composed of up to three layers: pure Erlang code, OS processes and Docker containers.

NkCLUSTER includes full support for managing OS processes (using NkLIB) and Docker containers (using NkDOCKER). The recommended approach is to have, at the worker nodes, a local Erlang application monitoring the processes and containers, and sending high-level, aggregated information to the control cluster, instead of sending raw stdout or similar low-level information. This way, even if the connection is temporarily lost, the worker node can continue working to some extent while the connection to the control nodes is re-established.
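A minimal sketch of this pattern is shown below. check_containers/0 is a hypothetical local function, the event payload is illustrative, the argument order of nkcluster_jobs:send_event/2 (described later in this document) is an assumption, and in a real deployment the loop would run inside a task started through a job class:

```erlang
%% Worker-side monitoring loop: poll local containers, aggregate the
%% results, and report a high-level summary upwards instead of raw output.
monitor_loop() ->
    Summary = check_containers(),
    nkcluster_jobs:send_event(my_job_class, {containers, Summary}),
    timer:sleep(5000),
    monitor_loop().

%% Hypothetical local check; in reality this would query NkDOCKER/NkLIB.
check_containers() ->
    #{running => 12, failed => 0}.
```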

This architecture is well suited for many real-world scenarios and, depending on the configuration, several deployment scenarios are supported.

Startup and Discovery

The same Erlang application (nkcluster) is used at control nodes (which are really control+worker nodes) and at worker-only nodes. When configuring a new node, you must supply at least a cluster name, a shared password, the addresses to listen on and, optionally, a set of cluster discovery addresses (see the Configuration section below).

If a set of cluster discovery addresses is configured, the node will extract from them all transports, IP addresses and ports; it will then randomize the list and try to connect to each entry until one accepts the connection. You should include all planned control nodes, DNS names resolving to several addresses, etc. For example:

{cluster_addr, "nkcluster://my_domain, nkcluster://10.0.0.1:1234;transport=wss"}

In this example, the node will extract all IP addresses and ports from my_domain (using DNS), and will add 10.0.0.1:1234 with the wss transport to the list. It will then randomize the list and try to connect to each entry in turn.

The password for this specific node and the TLS options can be included in the URL:

{cluster_addr, "nkcluster://10.0.0.1:1234;transport=wss;password=pass1;tls_depth=2"}

NkPACKET supports the following DNS records for discovery:

```
example.com NAPTR 10 100 "S" "NKS+D2W" "" _nks._ws.example.com.
example.com NAPTR 10 200 "S" "NKS+D2T" "" _nks._tcp.example.com.
example.com NAPTR 10 300 "S" "NK+D2W" "" _nk._ws.example.com.
example.com NAPTR 10 400 "S" "NK+D2T" "" _nk._tcp.example.com.
example.com NAPTR 10 500 "S" "NK+D2S" "" _nk._sctp.example.com.
```

NkCLUSTER will first try the WSS transport (resolving _nks._ws.example.com as an SRV domain to find IPs and ports), then TLS, then WS, then TCP and finally SCTP as a last resort.

Each NAPTR target must resolve through an SRV record, for example:

```
_nk._tcp.example.com. 86400 IN SRV 0 5 1972 cluster.example.com.
```

NkCLUSTER will then try to resolve cluster.example.com, taking all of the returned IPs and using port 1972.

The receiving control node will accept the connection, and both parties will authenticate each other, sending a challenge (the node UUID, a random string auto-generated at boot) that must be signed using the local password with the PBKDF2 algorithm. If everything is ok, the control node will select the right node to host the node proxy process and will start it there. If the selected node is different from the one that received the connection, the node proxy will try to start a direct connection to the worker, if possible.
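Conceptually, each side computes and compares something like the sketch below. The hash function, iteration count and key length are illustrative assumptions (NkCLUSTER's actual parameters are not documented here), and crypto:pbkdf2_hmac/5 requires a modern OTP (24+); it is shown only to make the idea concrete:

```erlang
%% Sketch: sign the peer's challenge (its boot-time UUID) with the shared
%% local password using PBKDF2. All parameters below are illustrative.
sign_challenge(Password, ChallengeUUID) ->
    crypto:pbkdf2_hmac(sha256, Password, ChallengeUUID, 10000, 32).
```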

Using this node proxy process, you can send requests and start tasks at the worker node. Worker nodes also send periodic information about their state (status, CPU, memory, etc.). If the connection fails, the control process will try to set it up again. Worker nodes will also try to connect again by themselves if no control node contacts them for a while. In some cases (like a network split) a single worker node could be connected to several proxy processes at different nodes, but this should be a temporary situation, resolved once the cluster converges again.

Operation

Based on riak_core, NkCLUSTER allows the addition and removal of nodes at any moment.

When a new control node is added, it automatically discovers and joins the riak_core cluster. While the cluster reorganizes itself, node proxy processes are relocated so that they are once again evenly distributed across the cluster. When a node is removed, the opposite happens, also automatically.

Worker nodes can also be added and removed at any moment, and the changes are recognized automatically by the control nodes. Worker nodes can be in one of several states (see nkcluster:node_status()), including ready, standby, stopping, stopped and not_connected.

Requests are allowed in ready, standby, stopping and stopped states.

Requests, Tasks and Job Classes

Before sending any request or task to a worker node, you must define a job class module implementing the nkcluster_job_class behaviour. This callback module must be available at both sides (control and worker). You must implement the following callbacks:

| Name | Side | Description |
|------|------|-------------|
| `request/2` | Worker | Called at the worker node when a new request must be processed. Must send an immediate reply. |
| `task/2` | Worker | Called at the worker node when a new task is asked to start. Must start a new process and return its pid() and, optionally, an immediate response. The task can send events at any moment. |
| `command/3` | Worker | Called at the worker when a new command is sent to an existing task. Must return an immediate reply. |
| `status/2` | Worker | Called at the worker when the node status changes. If the status changes to stopping, the task should stop as soon as possible. |
| `event/2` | Control | Called at the control node when a task at the worker side sends an event back. |

You can send your own Erlang module (or modules) to the remote side, over the wire, using nkcluster:load_module/3 or nkcluster:load_modules/3.

Once you have defined your job class module, you can send requests, start tasks and send commands to tasks. Tasks can send events back to the job class.
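For illustration only, a job class could look like the following sketch. The callback names and arities come from the table above, but the argument and return conventions (the {reply, _} and {ok, Pid, _} tuples, and the exact callback arguments) are assumptions; check the nkcluster_job_class behaviour spec before relying on them:

```erlang
-module(my_job_class).
-behaviour(nkcluster_job_class).

-export([request/2, task/2, command/3, status/2, event/2]).

%% Worker side: short-lived request; must reply immediately.
request(get_time, _From) ->
    {reply, erlang:localtime()};
request(_Other, _From) ->
    {reply, unknown_request}.

%% Worker side: start a long-running task process and return its pid().
task(_TaskId, start_counter) ->
    Pid = spawn_link(fun() -> counter_loop(0) end),
    {ok, Pid, started}.

%% Worker side: a command sent to an already running task.
command(Pid, increment, _TaskId) ->
    Pid ! increment,
    {reply, ok}.

%% Worker side: the node status changed; stop cleanly on 'stopping'.
status(Pid, stopping) ->
    Pid ! stop,
    ok;
status(_Pid, _Status) ->
    ok.

%% Control side: receive events sent back from the worker.
event(_NodeId, Event) ->
    io:format("got event: ~p~n", [Event]).

%% Internal: the task process, sending an event on every update.
counter_loop(N) ->
    receive
        increment ->
            nkcluster_jobs:send_event(my_job_class, {counter, N + 1}),
            counter_loop(N + 1);
        stop ->
            ok
    end.
```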

You can send requests calling nkcluster:request/3,4. The callback request/2 will be called at the remote (worker) side, and your call will block until a response is sent back. You can define a timeout, and also ask NkCLUSTER to start a new, exclusive connection to the worker if possible. If the connection or the node fails, an error will be returned. If you are not asking for a new, exclusive connection, your request processing time must be very short (< 100 ms); otherwise, the periodic ping will be delayed and the connection may be dropped.

For long-running jobs, you must start a new task, calling nkcluster:task/3,4. The callback task/2 will be called at the remote side, and it must start a new Erlang process and return its pid() and, optionally, a reply. A task identifier is returned to the caller, along with the reply if one was sent. You can send commands to any started task calling nkcluster_jobs:command/4,5, as in the sketch below.
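Putting it together with the hypothetical my_job_class sketched above (the return tuples and the argument order of nkcluster_jobs:command/4 are assumptions based on the description in this section):

```erlang
%% Blocking request; request/2 runs at the worker and replies immediately.
> {reply, Time} = nkcluster:request(<<"dev4">>, my_job_class, get_time).

%% Start a long-running task; a task id comes back, plus the optional reply.
> {ok, TaskId, started} = nkcluster:task(<<"dev4">>, my_job_class, start_counter).

%% Send a command to the running task.
> {reply, ok} = nkcluster_jobs:command(<<"dev4">>, my_job_class, TaskId, increment).
```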

The started task can send events back to the control node at any moment, calling nkcluster_jobs:send_event/2. The event will arrive at the control node, and the callback event/2 will be called for the corresponding job class. NkCLUSTER will also send some automatic events:

| Event | Description |
|-------|-------------|
| `{nkcluster, {task_started, TaskId}}` | The task has successfully started. |
| `{nkcluster, {task_stopped, TaskId, Reason}}` | The task has stopped (meaning the Erlang process has stopped). |
| `{nkcluster, {node_status, nkcluster:node_status()}}` | The node status has changed. If it changes to not_connected, some events may have been lost. |

Built-in requests

NkCLUSTER offers some standard utility requests out of the box, available in the nkcluster module:

| Name | Description |
|------|-------------|
| `get_info/0` | Gets current information about the node, including its status |
| `set_status/1` | Changes the status of the node |
| `get_tasks/1` | Gets all tasks running at a worker node |
| `get_tasks/2` | Gets all tasks running at a worker node and belonging to a job class |
| `get_meta/1` | Gets the current metadata of a remote node |
| `update_meta/2` | Updates the metadata at a remote node |
| `get_data/2` | Gets a piece of data from the remote registry |
| `put_data/3` | Stores a piece of data at the remote registry |
| `call/5` | Calls an Erlang function at a remote node |
| `spawn_call/5` | Calls an Erlang function at a remote node, without blocking the channel |
| `send_file/3` | Sends a file to a remote node. For files > 4 KB, it switches to send_bigfile/3 |
| `send_bigfile/3` | Sends a file to a remote node, starting a new connection (if possible), in 1 MB chunks |
| `load_module/2` | Sends a currently loaded Erlang module and loads it at the remote node |
| `load_modules/2` | Sends a set of currently loaded Erlang modules and loads them at the remote node |
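For example, pushing the job class sketched earlier to a worker node might look like this (the arity comes from the table above; the argument order and return value are assumptions):

```erlang
%% Send the locally loaded my_job_class module to dev4 and load it there.
> nkcluster:load_module(<<"dev4">>, my_job_class).
```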

Failure scenarios

Connection failure

When the main connection of a node proxy to its managed worker node fails, the node proxy will try to set it up again. In the meantime, pending requests fail with an error, the node status changes to not_connected, and all detected job classes are notified through the automatic events described above.

Requests can start a secondary, exclusive connection. In this case, the failure of the main connection does not affect them (but it does affect their events). If this exclusive connection is the one that fails, the request will fail in the same manner described above.

Node failure

If the node proxy process fails (because of a failure of the whole control node or a bug), all pending requests will also fail (since they use gen_server:call/3 under the hood). The worker node will detect this and try to announce itself again. The control cluster will then start a new node proxy process at the same node (if it is alive again) or at another one.

If the worker node process fails (because of a failure of the whole worker node or a bug), the control process will enter the not_connected state, notifying all detected job classes as described before. If the worker node no longer exists, you must call nkcluster_nodes:stop(NodeId) to remove it.

Configuration

NkCLUSTER uses standard Erlang application environment variables. The same Erlang application is used for worker and control nodes.

| Option | Type | Default | Desc |
|--------|------|---------|------|
| cluster_name | `term()` | "nkcluster" | Nodes will only connect to other nodes in the same cluster |
| cluster_addr | `nklib:user_uri()` | "" | List of control nodes to connect to (see above) |
| password | `string()\|binary()` | "nkcluster" | Shared password used to authenticate nodes (see above) |
| meta | `string()\|binary()` | "" | Initial metadata for this node (see above) |
| is_control | `boolean()` | true | If this node has the control role |
| listen | `nklib:user_uri()` | "nkcluster://all;transport=tls" | List of addresses, ports and transports to listen on (see above) |
| tls_certfile | `string()` | - | Custom certificate file |
| tls_keyfile | `string()` | - | Custom key file |
| tls_cacertfile | `string()` | - | Custom CA certificate file |
| tls_password | `string()` | - | Password for the certificate |
| tls_verify | `boolean()` | false | If the certificate must be checked |
| tls_depth | `integer()` | 0 | TLS check depth |
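As a reference, a worker-only node could be configured with a sys.config sketch like the one below. All values are illustrative; the option names and defaults come from the table above:

```erlang
[
 {nkcluster, [
    {cluster_name, "nkcluster"},
    {cluster_addr, "nkcluster://10.0.0.1:1234;transport=wss"},
    {password, "nkcluster"},
    {meta, "core;group=group1"},
    {is_control, false},                          % worker-only node
    {listen, "nkcluster://all;transport=tls"}
 ]}
].
```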