enm is an Erlang port driver that wraps the nanomsg C library, allowing Erlang systems to communicate with other nanomsg endpoints. enm supports idioms and approaches common to standard Erlang networking facilities such as gen_tcp and gen_udp.

enm is currently based on version 1.0.0 of nanomsg, and enm itself is new, so its features are experimental and subject to change.

Starting and Stopping

You can start enm as a normal application, using application:start(enm) and application:stop(enm). You can also call enm:start_link/0 or enm:start/0, and call enm:stop/0 to stop it.
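As a minimal sketch of the application-style lifecycle described above, the module below starts enm, leaves a placeholder for socket work, and stops it again. The module name enm_lifecycle is hypothetical, and the sketch assumes the enm application and its beam files are on the code path and that enm is not already running.

```erlang
-module(enm_lifecycle).
-export([run/0]).

%% Start enm as an OTP application, do some work, then stop it.
%% Assumption: enm is on the code path and not already started;
%% otherwise application:start/1 returns an error tuple and the
%% match below fails.
run() ->
    ok = application:start(enm),
    %% ... open, use, and close enm sockets here ...
    ok = application:stop(enm).
```

In a real system you would more likely list enm among your application's dependencies and let OTP start it for you.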

Just Open a Socket

enm supports all nanomsg scalability protocols and transports. You can open a socket providing a particular scalability protocol using functions named for each protocol. For example, the enm:pair/0 function opens a pair-type socket for one-to-one communication, and the enm:req/0 and enm:rep/0 functions open the request and reply ends, respectively, of the reqrep scalability protocol. The scalability protocol functions used in this document are enm:pair, enm:req, enm:rep, enm:pub, enm:sub, enm:push, enm:pull, enm:surveyor, enm:respondent and enm:bus. Their arity 0 versions use default settings for the open sockets, while their arity 1 versions take a list of socket options that control socket settings.

If successful, these functions — both their arity 0 and arity 1 versions — all return {ok,Socket}.

Once opened, sockets can be bound or connected using the enm:bind/2 or enm:connect/2 functions respectively. Bind and connect information can alternatively be provided via socket options when sockets are first opened via the functions listed above.
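The two-step approach, open first and then bind or connect, might look like the sketch below. The module name and URL are hypothetical. The message tag is matched with a wildcard because this section does not name it, and the sketch assumes that sockets opened with default settings deliver messages to the opening process in binary mode, as the request/reply example later in this document suggests.

```erlang
-module(open_then_bind).
-export([run/0]).

%% Open two pair sockets with default settings, then bind one and
%% connect the other as separate steps.
run() ->
    enm:start_link(),
    Url = "inproc://open_then_bind",
    {ok,A} = enm:pair(),
    {ok,_BindId} = enm:bind(A, Url),
    {ok,B} = enm:pair(),
    {ok,_ConnId} = enm:connect(B, Url),
    ok = enm:send(B, <<"ping">>),
    %% Wait briefly for the message on A rather than blocking forever.
    Result = receive
                 {_Tag,A,Data} -> {ok,Data}
             after 1000 -> timeout
             end,
    enm:close(B),
    enm:close(A),
    enm:stop(),
    Result.
```

The same setup could be written in one step each with enm:pair([{bind,Url}]) and enm:pair([{connect,Url}]).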

Functions

In addition to the scalability protocol functions, enm supports the following functions:

If you're already familiar with standard Erlang networking capabilities, you'll find these functions similar to functions supplied by standard modules such as gen_tcp, gen_udp and inet.

Address Record Types

To help avoid errors with mistyped string and binary address URLs, enm provides three record types you can use for addresses instead:

Using these types, which is completely optional, requires including the enm.hrl file.

Socket Options

enm supports several socket options that can either be set when the socket is opened or modified later during operation. Most socket options can also be read from enm sockets. enm supports the following options:

Currently, most but not all nanomsg socket options are implemented. Please file an issue or submit a pull request if an option you need is missing.

Examples

The following examples are based on Tim Dysinger's C examples, but they produce somewhat different output. They all run with inproc addresses, taking advantage of Erlang's lightweight processes rather than using separate OS processes as Tim's examples do (though we could easily do that with Erlang too).

Note also that each example explicitly starts and stops enm — this is for exposition only, and is not something you'd do explicitly in an actual Erlang application. The output shown comes from an interactive Erlang shell, and it assumes enm beam files are on the shell's load path.

You can find the code for these examples in the repository examples directory.

Pipeline

-module(pipeline).
-export([start/0]).

start() ->
    enm:start_link(),
    Url = "inproc://pipeline",
    {ok,Pull} = enm:pull([{bind,Url},list]),
    {ok,Push} = enm:push([{connect,Url},list]),
    Send1 = "Hello, World!",
    io:format("pushing message \"~s\"~n", [Send1]),
    ok = enm:send(Push, Send1),
    receive
        {nnpull,Pull,Send1} ->
            io:format("pulling message \"~s\"~n", [Send1])
    end,
    Send2 = "Goodbye.",
    io:format("pushing message \"~s\"~n", [Send2]),
    ok = enm:send(Push, Send2),
    receive
        {nnpull,Pull,Send2} ->
            io:format("pulling message \"~s\"~n", [Send2])
    end,
    enm:close(Push),
    enm:close(Pull),
    enm:stop().

Note the pattern matching in the receive statements, where the variables bound to the sent messages also serve as the data we expect to receive. We put each socket into list mode to ensure these pattern matches succeed, given that Send1 and Send2 are strings. Note also that both the type of the socket and the socket itself are part of the received messages, so matching easily distinguishes what each socket is receiving. If the expected patterns did not match what was sent, the receive statements would wait forever.
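One way to avoid waiting forever on a pattern that never matches is to add an after clause to the receive. The sketch below is a hypothetical variant of the pipeline example (the module name, URL, and 1000 ms timeout are assumptions) that reports a mismatch or a timeout instead of blocking.

```erlang
-module(pipeline_timeout).
-export([start/0]).

%% Push one message and wait at most one second for it on the pull side.
start() ->
    enm:start_link(),
    Url = "inproc://pipeline_timeout",
    {ok,Pull} = enm:pull([{bind,Url},list]),
    {ok,Push} = enm:push([{connect,Url},list]),
    Msg = "Hello, World!",
    ok = enm:send(Push, Msg),
    Result = receive
                 %% Msg is already bound, so this matches only the sent data.
                 {nnpull,Pull,Msg} -> matched;
                 %% Anything else arriving on Pull is reported, not ignored.
                 {nnpull,Pull,Other} -> {unexpected,Other}
             after 1000 -> timeout
             end,
    enm:close(Push),
    enm:close(Pull),
    enm:stop(),
    Result.
```

If the sockets were left in their default binary mode, the first clause would not match the string Msg and the second clause would catch the binary instead.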

Pipeline Results

1> c("examples/pipeline.erl", [{o,"examples"}]).
{ok,pipeline}
2> pipeline:start().
pushing message "Hello, World!"
pulling message "Hello, World!"
pushing message "Goodbye."
pulling message "Goodbye."
ok

Request/Reply

-module(request_reply).
-export([start/0]).

start() ->
    enm:start_link(),
    Url = "inproc://request_reply",
    {ok,Rep} = enm:rep([{bind,Url}]),
    {ok,Req} = enm:req([{connect,Url}]),
    DateReq = <<"DATE">>,
    io:format("sending date request~n"),
    ok = enm:send(Req, DateReq),
    receive
        {nnrep,Rep,DateReq} ->
            io:format("received date request~n"),
            Now = httpd_util:rfc1123_date(),
            io:format("sending date ~s~n", [Now]),
            ok = enm:send(Rep, Now)
    end,
    receive
        {nnreq,Req,Date} ->
            io:format("received date ~s~n", [Date])
    end,
    enm:close(Req),
    enm:close(Rep),
    enm:stop().

This is similar to the pipeline example except that data flows in both directions, and both sockets default to binary mode.

Request/Reply Results

1> c("examples/request_reply.erl", [{o,"examples"}]).
{ok,request_reply}
2> request_reply:start().
sending date request
received date request
sending date Tue, 09 Sep 2014 23:05:26 GMT
received date Tue, 09 Sep 2014 23:05:26 GMT
ok

Pair

-module(pair).
-export([start/0, node/4]).

start() ->
    enm:start_link(),
    Self = self(),
    Url = "inproc://pair",
    spawn(?MODULE, node, [Self, Url, bind, "Node0"]),
    spawn(?MODULE, node, [Self, Url, connect, "Node1"]),
    collect(["Node0","Node1"]).

node(Parent, Url, F, Name) ->
    {ok,P} = enm:pair([{active,3}]),
    {ok,Id} = enm:F(P,Url),
    send_recv(P, Name),
    enm:shutdown(P, Id),
    Parent ! {done,Name}.

send_recv(Sock, Name) ->
    receive
        {_,Sock,Buf} ->
            io:format("~s received \"~s\"~n", [Name, Buf])
    after
        100 ->
            ok
    end,
    case enm:getopts(Sock, [active]) of
        {ok, [{active,false}]} ->
            ok;
        {error, Error} ->
            error(Error);
        _ ->
            timer:sleep(1000),
            io:format("~s sending \"~s\"~n", [Name, Name]),
            ok = enm:send(Sock, Name),
            send_recv(Sock, Name)
    end.

collect([]) ->
    ok;
collect([Name|Names]) ->
    receive
        {done,Name} ->
            collect(Names)
    end.

This code is a little more involved than previous examples because we spawn two child processes that receive and send messages. Note how we use the {active,N} socket mode for each end of the pair to eventually break out of the recursive send_recv/2 function, by using enm:getopts/2 to check for when each socket flips into {active,false} mode.
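To isolate the {active,N} behaviour described above, the following sketch opens one pair socket with {active,1}, delivers a single message to it, and then reads the active option back with enm:getopts/2. The module name and URL are hypothetical, and the message tag is matched with a wildcard as in the pair example. Going by the description above, the option should read {active,false} once the single allowed message has been delivered, but treat that as an expectation to verify rather than a guarantee.

```erlang
-module(active_count_check).
-export([run/0]).

%% Receive one message on an {active,1} socket, then inspect its mode.
run() ->
    enm:start_link(),
    Url = "inproc://active_count_check",
    {ok,A} = enm:pair([{bind,Url},{active,1}]),
    {ok,B} = enm:pair([{connect,Url}]),
    ok = enm:send(B, <<"one">>),
    Received = receive
                   {_Tag,A,Data} -> {ok,Data}
               after 1000 -> timeout
               end,
    %% Read the current active setting of A after the delivery.
    Mode = enm:getopts(A, [active]),
    enm:close(B),
    enm:close(A),
    enm:stop(),
    {Received, Mode}.
```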

Pair Results

1> c("examples/pair.erl",[{o,"examples"}]).
{ok,pair}
2> pair:start().
Node0 sending "Node0"
Node1 sending "Node1"
Node0 received "Node1"
Node1 received "Node0"
Node1 sending "Node1"
Node0 sending "Node0"
Node0 received "Node1"
Node1 received "Node0"
Node1 sending "Node1"
Node0 sending "Node0"
Node0 received "Node1"
Node1 received "Node0"
ok

Pub/Sub

-module(pubsub).
-export([start/0]).

-define(COUNT, 3).

start() ->
    enm:start_link(),
    Url = "inproc://pubsub",
    Pub = pub(Url),
    collect(subs(Url, self())),
    enm:close(Pub),
    enm:stop().

pub(Url) ->
    {ok,Pub} = enm:pub([{bind,Url}]),
    spawn_link(fun() -> pub(Pub, ?COUNT) end),
    Pub.
pub(_, 0) ->
    ok;
pub(Pub, Count) ->
    Now = httpd_util:rfc1123_date(),
    io:format("publishing date \"~s\"~n", [Now]),
    ok = enm:send(Pub, ["DATE: ", Now]),
    timer:sleep(1000),
    pub(Pub, Count-1).

subs(Url, Parent) ->
    subs(Url, Parent, ?COUNT, []).
subs(_, _, 0, Acc) ->
    Acc;
subs(Url, Parent, Count, Acc) ->
    {ok, Sub} = enm:sub([{connect,Url},{subscribe,"DATE:"},{active,false}]),
    Name = "Subscriber" ++ integer_to_list(Count),
    spawn_link(fun() -> sub(Sub, Parent, Name) end),
    subs(Url, Parent, Count-1, [Name|Acc]).
sub(Sub, Parent, Name) ->
    case enm:recv(Sub, 2000) of
        {ok,Data} ->
            io:format("~s received \"~s\"~n", [Name, Data]),
            sub(Sub, Parent, Name);
        {error,etimedout} ->
            enm:close(Sub),
            Parent ! {done, Name},
            ok
    end.

collect([Sub|Subs]) ->
    receive
        {done,Sub} ->
            collect(Subs)
    end;
collect([]) ->
    ok.

This code sets up a publisher and 3 subscribers, and the publisher publishes dates to the subscribers. The publisher starts each message with the text "DATE:", and each subscriber subscribes to messages that begin with that text. Note the use of {active,false} mode for the subscriber sockets: the Erlang process that creates a socket, known as the controlling process for the socket, is not the same process that receives the messages here, and only the controlling process can receive messages from a socket in an active mode.

Pub/Sub Results

1> c("examples/pubsub.erl", [{o,"examples"}]).
{ok,pubsub}
2> pubsub:start().
publishing date "Tue, 09 Sep 2014 23:08:10 GMT"
Subscriber3 received "DATE: Tue, 09 Sep 2014 23:08:10 GMT"
Subscriber2 received "DATE: Tue, 09 Sep 2014 23:08:10 GMT"
Subscriber1 received "DATE: Tue, 09 Sep 2014 23:08:10 GMT"
publishing date "Tue, 09 Sep 2014 23:08:11 GMT"
Subscriber3 received "DATE: Tue, 09 Sep 2014 23:08:11 GMT"
Subscriber2 received "DATE: Tue, 09 Sep 2014 23:08:11 GMT"
Subscriber1 received "DATE: Tue, 09 Sep 2014 23:08:11 GMT"
publishing date "Tue, 09 Sep 2014 23:08:12 GMT"
Subscriber3 received "DATE: Tue, 09 Sep 2014 23:08:12 GMT"
Subscriber2 received "DATE: Tue, 09 Sep 2014 23:08:12 GMT"
Subscriber1 received "DATE: Tue, 09 Sep 2014 23:08:12 GMT"
ok

Survey

-module(survey).
-export([start/0]).

-define(COUNT, 3).

start() ->
    enm:start_link(),
    Url = "inproc://survey",
    Self = self(),
    {ok,Survey} = enm:surveyor([{bind,Url},{deadline,3000}]),
    Clients = clients(Url, Self),
    ok = enm:send(Survey, httpd_util:rfc1123_date()),
    get_responses(Survey),
    wait_for_clients(Clients),
    enm:close(Survey),
    enm:stop().

clients(Url, Parent) ->
    clients(Url, Parent, ?COUNT, []).
clients(_, _, 0, Acc) ->
    Acc;
clients(Url, Parent, Count, Acc) ->
    {ok, Respondent} = enm:respondent([{connect,Url},{active,false},list]),
    Name = "Respondent" ++ integer_to_list(Count),
    Pid = spawn_link(fun() -> client(Respondent, Name, Parent) end),
    clients(Url, Parent, Count-1, [Pid|Acc]).

client(Respondent, Name, Parent) ->
    {ok,Msg} = enm:recv(Respondent, 5000),
    Date = httpd_util:convert_request_date(Msg),
    ok = enm:send(Respondent, term_to_binary(Date)),
    io:format("~s got \"~s\"~n", [Name, Msg]),
    Parent ! {done, self(), Respondent}.

get_responses(Survey) ->
    get_responses(Survey, ?COUNT+1).
get_responses(_, 0) ->
    ok;
get_responses(Survey, Count) ->
    receive
        {nnsurveyor,Survey,BinResp} ->
            Response = binary_to_term(BinResp),
            io:format("received survey response ~p~n", [Response]);
        {nnsurveyor_deadline,Survey} ->
            io:format("survey has expired~n")
    end,
    get_responses(Survey, Count-1).

wait_for_clients([Client|Clients]) ->
    receive
        {done,Client,Respondent} ->
            enm:close(Respondent),
            wait_for_clients(Clients)
    end;
wait_for_clients([]) ->
    ok.

This example creates a surveyor, and several respondents connect to it. The {deadline,3000} option used when creating the surveyor socket means respondents have a maximum of 3 seconds to respond to any survey. The surveyor sends out the survey, and then collects responses from each of the respondents. When we hit the survey deadline, the controlling process for the surveyor socket gets a {nnsurveyor_deadline,Socket} message.

Survey Results

1> c("examples/survey.erl", [{o,"examples"}]).
{ok,survey}
2> survey:start().
Respondent3 got "Tue, 09 Sep 2014 23:09:34 GMT"
Respondent2 got "Tue, 09 Sep 2014 23:09:34 GMT"
Respondent1 got "Tue, 09 Sep 2014 23:09:34 GMT"
received survey response {{2014,9,9},{23,9,34}}
received survey response {{2014,9,9},{23,9,34}}
received survey response {{2014,9,9},{23,9,34}}
survey has expired
ok

Bus

-module(bus).
-export([start/0]).

-define(COUNT, 4).

start() ->
    enm:start_link(),
    UrlBase = "inproc://bus",
    Buses = connect_buses(UrlBase),
    Pids = send_and_receive(Buses, self()),
    wait_for_pids(Pids),
    enm:stop().

connect_buses(UrlBase) ->
    connect_buses(UrlBase, lists:seq(1,?COUNT), []).
connect_buses(UrlBase, [1=Node|Nodes], Buses) ->
    Url = make_url(UrlBase, Node),
    {ok,Bus} = enm:bus([{bind,Url},{active,false}]),
    {ok,_} = enm:connect(Bus, make_url(UrlBase, 2)),
    {ok,_} = enm:connect(Bus, make_url(UrlBase, 3)),
    connect_buses(UrlBase, Nodes, [{Bus,Node}|Buses]);
connect_buses(UrlBase, [?COUNT=Node|Nodes], Buses) ->
    Url = make_url(UrlBase, Node),
    {ok,Bus} = enm:bus([{bind,Url},{active,false}]),
    {ok,_} = enm:connect(Bus, make_url(UrlBase, 1)),
    connect_buses(UrlBase, Nodes, [{Bus,Node}|Buses]);
connect_buses(UrlBase, [Node|Nodes], Buses) ->
    Url = make_url(UrlBase, Node),
    {ok,Bus} = enm:bus([{bind,Url},{active,false}]),
    Urls = [make_url(UrlBase,N) || N <- lists:seq(Node+1,?COUNT)],
    [{ok,_} = enm:connect(Bus,U) || U <- Urls],
    connect_buses(UrlBase, Nodes, [{Bus,Node}|Buses]);
connect_buses(_, [], Buses) ->
    Buses.

send_and_receive(Buses, Parent) ->
    send_and_receive(Buses, Parent, []).
send_and_receive([{Bus,Id}|Buses], Parent, Acc) ->
    Pid = spawn_link(fun() -> bus(Bus, Id, Parent) end),
    send_and_receive(Buses, Parent, [Pid|Acc]);
send_and_receive([], _, Acc) ->
    Acc.

bus(Bus, Id, Parent) ->
    Name = "node"++integer_to_list(Id),
    io:format("node ~w sending \"~s\"~n", [Id, Name]),
    ok = enm:send(Bus, Name),
    collect(Bus, Id, Parent).

collect(Bus, Id, Parent) ->
    case enm:recv(Bus, 1000) of
        {ok,Data} ->
            io:format("node ~w received \"~s\"~n", [Id, Data]),
            collect(Bus, Id, Parent);
        {error,etimedout} ->
            Parent ! {done, self(), Bus}
    end.

wait_for_pids([Pid|Pids]) ->
    receive
        {done,Pid,Bus} ->
            enm:close(Bus),
            wait_for_pids(Pids)
    end;
wait_for_pids([]) ->
    ok.

make_url(Base,N) ->
    Base++integer_to_list(N).

In this four-node example, the nodes are connected so that each one receives one message from each of the others. Each node binds to one bus address and connects to one or more of the other bus addresses: node 1 connects to nodes 2 and 3, for example, and node 4 connects only to node 1. This example uses {active,false} mode since the Erlang processes calling enm:recv/2 are not the controlling processes for the receiving sockets.
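The connection pattern in connect_buses/3 can be checked without opening any sockets. The sketch below is plain Erlang with no enm calls. The module and function names are hypothetical. It mirrors the three clauses of connect_buses/3 for four nodes and confirms that every pair of nodes ends up linked exactly once.

```erlang
-module(bus_topology).
-export([connections/0, links/0, fully_meshed/0]).

-define(COUNT, 4).

%% For each node, the nodes it connects to, mirroring connect_buses/3.
connections() ->
    [{N, targets(N)} || N <- lists:seq(1, ?COUNT)].

targets(1) -> [2,3];                     % first node connects to 2 and 3
targets(?COUNT) -> [1];                  % last node connects back to node 1
targets(N) -> lists:seq(N+1, ?COUNT).    % others connect to higher nodes

%% Undirected links, normalised as {Low,High} and sorted.
links() ->
    lists:sort([{min(A,B), max(A,B)} || {A,Ts} <- connections(), B <- Ts]).

%% True when every pair of distinct nodes appears exactly once.
fully_meshed() ->
    Expected = [{A,B} || A <- lists:seq(1, ?COUNT),
                         B <- lists:seq(A+1, ?COUNT)],
    links() =:= Expected.
```

Because targets(1) is hard-coded to nodes 2 and 3, as it is in the example, the check is only meaningful for a COUNT of 4.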

Bus Results

1> c("examples/bus", [{o,"examples"}]).
{ok,bus}
2> bus:start().
node 4 sending "node4"
node 3 sending "node3"
node 2 sending "node2"
node 1 sending "node1"
node 3 received "node4"
node 2 received "node4"
node 1 received "node4"
node 4 received "node3"
node 3 received "node2"
node 2 received "node3"
node 1 received "node3"
node 4 received "node2"
node 3 received "node1"
node 2 received "node1"
node 1 received "node2"
node 4 received "node1"
ok