PO Box

High throughput Erlang applications often get bitten by the fact that Erlang mailboxes are unbounded and will keep accepting messages until the node runs out of memory.

In most cases, this problem can be solved by imposing a rate limit on the producers, and it is recommended to explore this idea before looking at this library.

When it is impossible to rate-limit the messages coming to a process and your optimization efforts remain fruitless, you need to start shedding load by dropping messages.

PO Box can help by shedding the load for you, and making sure you won't run out of memory.

The Principles

PO Box is a library that implements a buffer process. Normally, Erlang processes receive their messages locally (at home), and may become overloaded because they have to deal with both their mailbox and their day-to-day tasks:

         messages
            |
            V
+-----[Pid or Name]-----+
|      |         |      |
|      | mailbox |      |
|      +---------+      |
|       |               |
|    receive            |
+-----------------------+

A PO Box process is where you ask for your messages to be sent instead. The PO Box process implements a buffer (see Types of buffer for details) that does nothing but churn through messages and drop them for you when the buffer is full.

Depending on how you use the API, the PO Box can tell you it received new data, so you can then ask for the data, or you can tell it to send the data to you directly, without notification:

                                                  messages
                                                     |
                                                     V
+---------[Pid]---------+                +--------[POBox]--------+
|                       |<-- got mail ---|      |         |      |
|                       |                |      | mailbox |      |
|   <important stuff>   |--- send it! -->|      +---------+      |
|                       |                |       |               |
|                       |<---<messages>--|<---buffer             |
+-----------------------+                +-----------------------+

To be more detailed, a PO Box is a state machine with an owner process (which it receives messages for), and it has 3 states:

The passive state basically does nothing but accumulate messages in the buffer and drop them when necessary.

The notify state is enabled by the user by calling the PO Box. Its sole task is to verify if there is any message in the buffer. If there is, it will respond to the PO Box's owner with a {mail, BoxPid, new_data} message sent directly to the pid. If there is no message in the buffer, the process will wait in the notify state until it gets one. As soon as the notification is sent, it reverts back to the passive state.

The active state is the only one that can send actual messages to the owner process. The user can call the PO Box to set it active, and if there are any messages in the buffer, all the messages it contains get sent as a list to the owner. If there are no messages, the PO Box waits until there is one to send it. After forwarding the messages, the PO Box reverts to the passive state.

The FSM can be illustrated in crappy ASCII as:

         ,---->[passive]------(user makes active)----->[active]
         |         | ^                                  |  ^  |
         |         | '---(sends message to user)--<-----'  |  |
         |  (user makes notify)                            |  |
         |         |                                       |  |
(user is notified) |                                       |  |
         |         V                                       |  |
         '-----[notify]---------(user makes active)--------'  |
                     ^----------(user makes notify)<----------'
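The transitions in the diagram can also be written down as a small pure function. This is only a reading aid: the module name and the event atoms (make_active, make_notify, notified, sent) are hypothetical labels for the arrows above, not part of the pobox API.

```erlang
-module(fsm_sketch).
-export([next/2]).

%% next(CurrentState, Event) -> NextState
%% Event atoms are made-up names for the arrows in the diagram.
next(passive, make_active) -> active;   % user makes active
next(passive, make_notify) -> notify;   % user makes notify
next(notify,  make_active) -> active;   % user makes active
next(notify,  notified)    -> passive;  % user is notified
next(active,  make_notify) -> notify;   % user makes notify
next(active,  sent)        -> passive;  % sends message to user
next(State,   _Event)      -> State.    % anything else: stay put
```

For example, fsm_sketch:next(fsm_sketch:next(passive, make_active), sent) walks passive -> active -> passive, which is the round trip a single pobox:active/3 call goes through.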

Types of buffer

Currently, there are three types of built-in buffers supported: queues, stacks, and keep_old queues. You can also provide your own buffer implementation using the pobox_buf behaviour. See samples/pobox_queue_buf.erl for an example implementation.

Queues will keep messages in order, and drop oldest messages to make place for new ones. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [c,d,e].

keep_old queues will keep messages in order, but block newer messages from entering, favoring keeping old messages instead. If you have a buffer of size 3 and receive messages a, b, c, d, e in that order, the buffer will contain messages [a,b,c].

Stacks will not guarantee any message ordering, and will drop the top of the stack to make place for new messages first. For the same messages, the stack buffer should contain the messages [e,b,a].
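The three drop policies can be reproduced with a few lines of plain Erlang. The sketch below is not how pobox stores its buffers (it uses lists for readability, and the module and function names are made up); it only mimics the behaviour described above and assumes a maximum size of at least 1.

```erlang
-module(buf_sketch).
-export([fill/3]).

%% Feed Msgs one by one into an empty buffer of the given type
%% (queue | keep_old | stack) and return the final contents.
%% Assumes Max >= 1.
fill(Type, Max, Msgs) ->
    lists:foldl(fun(Msg, Buf) -> push(Type, Max, Msg, Buf) end, [], Msgs).

%% Room left: stacks put new messages on top, queues at the end.
push(stack, Max, Msg, Buf) when length(Buf) < Max -> [Msg | Buf];
push(_Type, Max, Msg, Buf) when length(Buf) < Max -> Buf ++ [Msg];
%% Buffer full: each type sheds load differently.
push(queue, _Max, Msg, [_Oldest | Rest]) -> Rest ++ [Msg];  % drop oldest
push(keep_old, _Max, _Msg, Buf)          -> Buf;            % drop newcomer
push(stack, _Max, Msg, [_Top | Rest])    -> [Msg | Rest].   % drop top
```

Calling buf_sketch:fill(Type, 3, [a,b,c,d,e]) gives [c,d,e] for queue, [a,b,c] for keep_old and [e,b,a] for stack, matching the examples above.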

To choose between a queue and a stack buffer, you should consider the following criteria:

More buffer types could be supported in the future, if people require them.

How to build it

./rebar compile

How to run tests

./rebar compile ct

How to use it

Start a buffer with any of the following:

start_link(OwnerPid, MaxSize, BufferType)
start_link(OwnerPid, MaxSize, BufferType, InitialState)
start_link(Name, OwnerPid, MaxSize, BufferType)
start_link(Name, OwnerPid, MaxSize, BufferType, InitialState)
start_link(#{
    name => Name,
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => HeirPid,
    heir_data => HeirData
})
start_link(Name, #{
    owner => OwnerPid,
    max => MaxSize, %% mandatory
    type => BufferType,
    initial_state => InitialState,
    heir => Heir,
    heir_data => HeirData
})

Where:

The buffer can be made active by calling:

pobox:active(BoxPid, FilterFun, FilterState)

The FilterFun is a function that will take messages one by one along with custom state and can return:

A function that would blindly forward all messages could be written as:

fun(Msg, _) -> {{ok,Msg},nostate} end

A function that would limit binary messages by size could be written as:

fun(Msg, Allowed) ->
    case Allowed - byte_size(Msg) of
        N when N < 0 -> skip;
        N -> {{ok, Msg}, N}
    end
end

Or you could drop messages that are empty binaries by doing:

fun(<<>>, State) -> {drop, State};
   (Msg, State) -> {{ok,Msg}, State}
end.

The resulting message sent will be:

{mail, BoxPid, Messages, MessageCount, MessageDropCount}
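To get a feel for how a filter function behaves before plugging it into a PO Box, it can be run over a plain list. The helper below is a stand-alone sketch, not the library's code: the module name is made up, and it assumes that skip stops the filtering and leaves the current message and everything after it in the buffer. Its counters only cover the filtering step, so they say nothing about messages the buffer itself dropped earlier.

```erlang
-module(filter_sketch).
-export([run/3]).

%% Apply Filter to each buffered message in order.
%% Returns {Sent, SentCount, DropCount, Remaining}.
run(Filter, State, Msgs) ->
    run(Filter, State, Msgs, [], 0, 0).

run(_Filter, _State, [], Acc, Count, Drop) ->
    {lists:reverse(Acc), Count, Drop, []};
run(Filter, State, [Msg | Rest] = Msgs, Acc, Count, Drop) ->
    case Filter(Msg, State) of
        {{ok, Out}, NewState} ->
            run(Filter, NewState, Rest, [Out | Acc], Count + 1, Drop);
        {drop, NewState} ->
            run(Filter, NewState, Rest, Acc, Count, Drop + 1);
        skip ->
            %% Assumption: stop here, keep Msg and the rest buffered.
            {lists:reverse(Acc), Count, Drop, Msgs}
    end.
```

With the size-limiting function shown earlier and an allowance of 5 bytes, filter_sketch:run(Limit, 5, [<<"ab">>, <<"cd">>, <<"ef">>]) returns {[<<"ab">>, <<"cd">>], 2, 0, [<<"ef">>]}: the third binary would exceed the allowance, so filtering stops there.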

Finally, the PO Box can be forced to notify by calling:

pobox:notify(BoxPid)

Which is objectively much simpler.

Messages can be sent to a PO Box by calling pobox:post(BoxPid, Msg) or sending a message directly to the process as BoxPid ! {post, Msg}.
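Putting notify and active together, an owner process might fetch a batch roughly as follows. This is a sketch rather than a recommended pattern: the module and function names are made up, it needs pobox on the code path, it is meant to be called from the owner process, and it assumes pobox:notify/1 and pobox:active/3 return ok as they do in the shell session in this document.

```erlang
-module(owner_sketch).
-export([fetch/2, await_notice/2, await_mail/2]).

%% Ask to be notified, wait for the "got mail" notice, then switch the
%% box to active and collect the batch. Call from the owner process.
fetch(Box, Timeout) ->
    ok = pobox:notify(Box),
    case await_notice(Box, Timeout) of
        ok ->
            %% Forward every message unchanged; no filter state needed.
            Forward = fun(Msg, St) -> {{ok, Msg}, St} end,
            ok = pobox:active(Box, Forward, nostate),
            await_mail(Box, Timeout);
        timeout ->
            timeout
    end.

%% Wait for the notification sent in the notify state.
await_notice(Box, Timeout) ->
    receive
        {mail, Box, new_data} -> ok
    after Timeout -> timeout
    end.

%% Wait for the batch sent in the active state.
await_mail(Box, Timeout) ->
    receive
        {mail, Box, Msgs, Count, Dropped} -> {ok, Msgs, Count, Dropped}
    after Timeout -> timeout
    end.
```

The timeouts keep the owner from blocking forever on an idle box; whether to retry or carry on with other work after a timeout is left to the caller.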

The ownership of the PO Box can be transferred to another process by calling:

pobox:give_away(BoxPid, DestPid, DestData, Timeout)

or

pobox:give_away(BoxPid, DestPid, Timeout)

which is equivalent to:

pobox:give_away(BoxPid, DestPid, undefined, Timeout)

The call should return true on success and false on failure. Note that you can only call this from within the owner process; otherwise, the call always fails. If DestData is not provided, it will be sent as undefined in the pobox_transfer message.

The destination process should receive a message of the following form:

{pobox_transfer, BoxPid, PreviousOwnerPid, DestData | undefined, give_away}
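On the receiving side, the new owner only has to match that message. A minimal sketch, with made-up module and function names and requiring pobox on the code path for the hand_over/3 part:

```erlang
-module(transfer_sketch).
-export([hand_over/3, await_transfer/1]).

%% Must be called from the current owner; returns true or false.
hand_over(Box, NewOwner, Timeout) ->
    pobox:give_away(Box, NewOwner, Timeout).

%% Run in the destination process to learn about the new PO Box.
%% Data is undefined when no DestData was passed to give_away.
await_transfer(Timeout) ->
    receive
        {pobox_transfer, Box, PrevOwner, Data, give_away} ->
            {ok, Box, PrevOwner, Data}
    after Timeout -> timeout
    end.
```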

Example Session

First start a PO Box for the current process:

1> {ok, Box} = pobox:start_link(self(), 10, queue).
{ok,<0.39.0>}

We'll also define a spammer function that will just keep mailing a bunch of messages:

2> Spam = fun(F,N) -> pobox:post(Box,N), F(F,N+1) end.
#Fun<erl_eval.12.17052888>

Because we're in the shell, the function takes itself as an argument so it can both remain anonymous and loop. Each message is an increasing integer.

We can start the process and wait for a while:

3> Spammer = spawn(fun() -> Spam(Spam,0) end).
<0.42.0>

Let's see if we have anything in our PO box:

4> flush().
Shell got {mail, <0.39.0>, new_data}
ok

Yes! Let's get that content:

5> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
6> flush().
Shell got {mail,<0.39.0>,
                [778918,778919,778920,778921,778922,778923,778924,778925,
                 778926,778927],
                10,778918}
ok

So we have 10 messages with sequential IDs (we used a queue buffer), and the process kindly dropped over 700,000 messages for us, keeping our node's memory safe.

The spammer is still going and our PO Box is in passive mode. Let's cut to the chase and go directly to the active state:

7> pobox:active(Box, fun(X,ok) -> {{ok,X},ok} end, ok).
ok
8> flush().
Shell got {mail,<0.39.0>,
                [1026883,1026884,1026885,1026886,1026887,1026888,1026889,
                 1026890,1026891,1026892],
                10,247955}
ok

Nice. We can go back to notification mode too:

9> pobox:notify(Box).
ok
10> flush().
Shell got {mail, <0.39.0>, new_data}
ok

And keep going on and on and on.

Notes

Contributing

Accepted contributions need to be non-aesthetic, and provide some new functionality, fix abstractions, improve performance or semantics, and so on.

All changes received must be tested and not break existing tests.

Changes to currently untested functionality should ideally first provide a separate commit that shows the current behaviour working with the new tests (or some of the new tests, if you expand on the functionality), and then your own feature (and additional tests if required) in its own commit so we can verify nothing breaks in unpredictable ways.

Tests are written using Common Test. PropEr tests will be accepted, because they objectively rule. Ideally, you will wrap your PropEr tests in a Common Test suite so we can run everything with one command.

If you need help, feel free to ask for it in issues or pull requests. These rules are strict, but we're nice people!

Roadmap

This is more a wishlist than a roadmap, in no particular order:

Changelog

Authors / Thanks