NOTICE

This library is a fork of the upstream repository published by EMQ Technologies Co., Ltd. (GitHub/emqx).

Wolff

Kafka's publisher: the library is named after Kurt Wolff, who published Franz Kafka's work.

How is it different from brod

More resilient to network and Kafka disturbances

With the replayq_dir producer config set to a directory, wolff queues pending messages on disk, so they are not lost when the application, the network, or Kafka is disrupted.
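A minimal sketch of a producer config that enables the on-disk queue and also bounds its size. replayq_dir comes from this README; replayq_max_total_bytes is an assumption here (a cap on the total queue size), so verify the key name and default against the Producer Config documentation of your wolff version.

```erlang
%% Producer config with a disk-backed replay queue.
%% replayq_dir: directory where pending messages are persisted.
%% replayq_max_total_bytes: ASSUMED key capping the queue size (here 1 GiB).
ProducerCfg = #{replayq_dir => "/tmp/wolff-replayq",
                replayq_max_total_bytes => 1024 * 1024 * 1024}.
```

Use a separate directory per producer set, as the examples in this README do, so that different producers do not replay each other's messages.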

When a producer restarts, messages queued on disk are replayed to Kafka. However, async callback functions are not evaluated when Kafka acknowledges the replayed messages.

More flexible connection management

wolff provides two connection management strategies: per_partition and per_broker. With the per_partition strategy, wolff establishes one TCP connection per partition leader. brod, however, only establishes connections per broker; that is, if two partition leaders happen to reside on the same broker, they have to share the same TCP connection.

However, there are no benchmarks yet that quantify how much per_partition actually improves performance.
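A sketch of selecting the strategy when starting a client, assuming it is controlled by a client config key named connection_strategy that accepts the atoms per_partition and per_broker. The key name is an assumption; check the Client Config documentation for your version.

```erlang
%% ASSUMED client config key: connection_strategy (per_partition | per_broker).
%% per_broker shares one TCP connection among all leaders on the same broker.
ClientCfg = #{connection_strategy => per_broker}.
{ok, Client} = wolff:ensure_supervised_client(<<"client-3">>, [{"localhost", 9092}], ClientCfg).
```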

Auto partition count refresh

wolff periodically refreshes topic metadata to discover when the partition count increases, and automatically rebalances the partitioner across the new partitions.
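A sketch of tuning how often this refresh happens, assuming a producer config key named partition_count_refresh_interval_seconds. Both the key name and its units are assumptions here; verify them against the Producer Config documentation before relying on them.

```erlang
%% ASSUMED producer config key: partition_count_refresh_interval_seconds.
%% Here the topic metadata would be re-checked for new partitions every 60 seconds.
ProducerCfg = #{replayq_dir => "/tmp/wolff-replayq-1",
                partition_count_refresh_interval_seconds => 60}.
```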

Example Code

Sync Produce

application:ensure_all_started(wolff).
ClientCfg = #{}.
{ok, Client} = wolff:ensure_supervised_client(<<"client-1">>, [{"localhost", 9092}], ClientCfg).
ProducerCfg = #{replayq_dir => "/tmp/wolff-replayq-1"}.
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg).
Msg = #{key => <<"key">>, value => <<"value">>}.
{Partition, BaseOffset} = wolff:send_sync(Producers, [Msg], 3000).
io:format(user, "\nmessage produced to partition ~p at offset ~p\n",
          [Partition, BaseOffset]).
ok = wolff:stop_producers(Producers).
ok = wolff:stop_client(Client).

If you want to use more than one set of producers for the same topic, be sure to give each one a unique alias to avoid clashes.

Topic = <<"test-topic">>.
{ok, Producers1} = wolff:start_producers(Client, Topic, ProducerCfg#{alias => <<"a1">>}).
{ok, Producers2} = wolff:start_producers(Client, Topic, ProducerCfg#{alias => <<"a2">>}).

Async Produce with Callback

application:ensure_all_started(wolff).
ClientCfg = #{}.
{ok, Client} = wolff:ensure_supervised_client(<<"client-2">>, [{"localhost", 9092}], ClientCfg).
ProducerCfg = #{replayq_dir => "/tmp/wolff-replayq-2"}.
{ok, Producers} = wolff:start_producers(Client, <<"test-topic">>, ProducerCfg).
Msg = #{headers => [{<<"foo">>, <<"bar">>}], key => <<"key">>, value => <<"value">>}.
Self = self().
AckFun = fun(Partition, BaseOffset) ->
            io:format(user, "\nmessage produced to partition ~p at offset ~p\n",
                      [Partition, BaseOffset]),
            ok
         end.
wolff:send(Producers, [Msg], AckFun).

For upgrade safety, avoid using anonymous functions as ack callbacks. In production code, the caller should provide {fun module:handle_ack/3, [ExtraArg]} as AckFun. The handle_ack function receives Partition and BaseOffset as its first two arguments, and ExtraArg, as supplied by the caller, as its third. For example:

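-export([send/2, handle_ack/3]).

%% Send Messages asynchronously; handle_ack/3 is invoked on each acknowledgement,
%% with self() (the calling process) passed through as the third argument.
send(Producers, Messages) ->
  wolff:send(Producers, Messages, {fun ?MODULE:handle_ack/3, [self()]}).

handle_ack(Partition, Offset, Caller) ->
  Caller ! {kafka_acked, Partition, Offset},
  ok. % must return ok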
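Because the ack callback only sends a message to the caller, the caller still has to receive it. The sketch below pairs a callback in the {fun Module:Function/3, [ExtraArg]} form with a hypothetical helper wait_for_ack/1 that blocks until one ack arrives or a timeout expires. The module name wolff_ack_example and the helper are illustrative, not part of wolff.

```erlang
-module(wolff_ack_example).
-export([handle_ack/3, wait_for_ack/1]).

%% Ack callback: Partition and Offset come from wolff,
%% Caller is the ExtraArg given in {fun ?MODULE:handle_ack/3, [Caller]}.
handle_ack(Partition, Offset, Caller) ->
    Caller ! {kafka_acked, Partition, Offset},
    ok.

%% Hypothetical helper: wait for one ack message, or give up after TimeoutMs.
wait_for_ack(TimeoutMs) ->
    receive
        {kafka_acked, Partition, Offset} -> {ok, Partition, Offset}
    after TimeoutMs ->
        {error, timeout}
    end.
```

In use, the caller would run wolff:send(Producers, [Msg], {fun wolff_ack_example:handle_ack/3, [self()]}) and then wolff_ack_example:wait_for_ack(5000). Keep in mind that acks for messages replayed from disk after a restart do not trigger the callback, so a waiting process should always use a timeout.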

Supervised Producers

application:ensure_all_started(wolff).
Client = <<"client-1">>.
ClientCfg = #{}.
{ok, _ClientPid} = wolff:ensure_supervised_client(Client, [{"localhost", 9092}], ClientCfg).
ProducerCfg = #{replayq_dir => "/tmp/wolff-replayq-3"}.
{ok, Producers} = wolff:ensure_supervised_producers(Client, <<"test-topic">>, ProducerCfg).
Msg = #{headers => [{<<"foo">>, <<"bar">>}], key => <<"key">>, value => <<"value">>}.
Self = self().
AckFun = fun(Partition, BaseOffset) ->
            io:format(user, "\nmessage produced to partition ~p at offset ~p\n",
                      [Partition, BaseOffset]),
            ok
         end.
wolff:send(Producers, [Msg], AckFun).

Client Config

Producer Config

Beam Telemetry Hooks

Beam Telemetry is a library for defining telemetry events, and wolff emits such events. Users of wolff can attach functions to these events, for example to record when a message has been successfully sent to Kafka. Wolff's telemetry events are described in the wolff_metrics module; Beam Telemetry's documentation explains how to attach handlers to events. The third argument passed to a Beam Telemetry handler function is a metadata map. To send a custom metadata map for each Kafka producer instance, set the producer configuration parameter telemetry_meta_data to that map.
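A minimal handler sketch using telemetry:attach/4 with an exported function, which Beam Telemetry recommends over anonymous functions. The event name [wolff, success] is an assumption based on the "successfully sent" example above; check wolff_metrics for the exact event names and measurement keys your version emits. The module name and handler id are hypothetical.

```erlang
-module(wolff_telemetry_example).
-export([attach/0, handle_event/4]).

%% Hypothetical handler id; any unique term works.
-define(HANDLER_ID, <<"wolff-success-logger">>).

%% ASSUMED event name [wolff, success]; verify it in wolff_metrics.
attach() ->
    telemetry:attach(?HANDLER_ID, [wolff, success],
                     fun ?MODULE:handle_event/4, #{}).

%% Metadata is expected to carry the map set via telemetry_meta_data
%% in the producer config.
handle_event(EventName, Measurements, Metadata, _Config) ->
    io:format("~p ~p meta=~p~n", [EventName, Measurements, Metadata]),
    ok.
```

To tag events per producer, start the producers with something like ProducerCfg#{telemetry_meta_data => #{bridge => <<"b1">>}}, where the bridge key is only an illustration.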

How to Test

Start Kafka in Docker containers using the docker-compose.yml in this repo:

docker-compose up -d

License

Apache License Version 2.0