kafboy
A low-latency HTTP server for writing to Kafka. Optimized for heavy loads and hundreds of partition workers, with support for batching and more. Written in Erlang. Powered by ekaf and Cowboy.
See https://github.com/helpshift/ekaf for more information.
Architecture
With 0.8, Kafka clients take greater responsibility for deciding which broker and partition to publish to for a given topic.
kafboy is an HTTP wrapper over the ekaf client that takes care of routing HTTP requests to the right Kafka broker socket. kafboy is cluster-aware: a request arriving on any node is routed to the right process in the cluster.
Simply send a POST with the desired JSON to one of the following paths:
Fire and forget
% fire and forget asynchronous call. the event is immediately sent to kafka asynchronously
POST /async/topic
Synchronous calls
% synchronous call that returns with the response after sending to kafka
% `NOTE: a reply is not sent until after kafka responds, so this is not recommended for low-latency needs`
POST /sync/topic
Batching
% the event is added to a queue and sent to the broker in a batch.
% batch size and flush timeout are configurable
POST /batch/async/topic
The payload is expected to be JSON, but kafboy can be configured to send the data as is. The server does very little else when dealing with Kafka: it simply calls ekaf's produce function.
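As a rough illustration of the three routes above, here is a minimal Erlang client sketch. It assumes kafboy is listening on localhost:9903 (the port used in the quick start), that the default route prefixes are in effect, and that sending the body with a JSON content type is acceptable. The module name kafboy_post_example and its functions are hypothetical and not part of kafboy; the HTTP call uses httpc from OTP's inets application.

```erlang
-module(kafboy_post_example).
-export([url/3, post_event/2]).

%% Build the URL for one of the three documented route prefixes.
%% BaseUrl and Topic are plain strings, e.g. "http://localhost:9903" and "ekaf".
url(BaseUrl, Mode, Topic) ->
    Prefix = case Mode of
                 async -> "/async/";
                 sync  -> "/sync/";
                 batch -> "/batch/async/"
             end,
    BaseUrl ++ Prefix ++ Topic.

%% POST a JSON binary to the given URL and return the status code and body.
%% The 5 second timeout is an arbitrary choice for this sketch.
post_event(Url, JsonBinary) ->
    {ok, _Started} = application:ensure_all_started(inets),
    Request = {Url, [], "application/json", JsonBinary},
    case httpc:request(post, Request, [{timeout, 5000}], [{body_format, binary}]) of
        {ok, {{_HttpVersion, Status, _Reason}, _Headers, Body}} ->
            {ok, Status, Body};
        {error, Reason} ->
            {error, Reason}
    end.
```

With a kafboy node running, a call might look like kafboy_post_example:post_event(kafboy_post_example:url("http://localhost:9903", batch, "ekaf"), <<"{\"hello\":\"a\"}">>).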
Configuring kafboy
{kafboy,[
% optional. you get to edit the json before it goes to kafka over here
{kafboy_callback_edit_json, {my_module, massage_json}},
% M:F({post, Topic, Req, Json, Callback}) will be called. return what you want to send to kafka
% if an error occurs, M:F({error, StatusCode, Message}) will be called
% optional.
{kafboy_load_balancer, "http://localhost:8080/disco"},
% should return plaintext of a node name with the right cookie eg: `node2@some-host`
% can be used to distribute work to other nodes if ekaf thinks this one is too busy
% optional, see more in kafboy_app.erl
{kafboy_routes_async_batch,["/1/foo/:topic"]},
{kafboy_routes_async,[]},
{kafboy_routes_sync,[]}
]}
In this example, you have to implement my_module:massage_json/1, along the lines of:
massage_json({post, Topic, _Req, Body, Callback})->
    Callback ! { edit_json_callback, Topic, Body }.
Here is a more elaborate example:
%% Let's check the contents of Body
%% and if it's valid, add an extra field
%% and then submit to kafka
massage_json({post, Topic, _Req, Body, CallbackPid})->
    case Body of
        [{<<"hello">>, Foo}] ->
            % either reply like this
            CallbackPid ! { edit_json_callback, Topic, Foo };
        [] ->
            CallbackPid ! { edit_json_callback, {error, <<Topic/binary,".insufficient">>}};
        _ ->
            %% I want to first reply
            CallbackPid ! { edit_json_callback, {200, <<"{\"ok\":\"fast reply\"}">>}},
            %% then directly call ekaf, adding this msg to a batch
            Final = jsx:encode([{<<"extra">>,<<"true">>}| Body]),
            ekaf:produce_async_batched(Topic, Final)
    end;
massage_json({error, Status, Message}) ->
    io:format("~n some ~p error: ~p",[Status, Message]),
    ok.
kafboy handles sending batched requests (with a configurable batch size), broker disconnections, and max retries.
For the ekaf API, see http://github.com/helpshift/ekaf
Quick start
On terminal 1
git clone https://github.com/helpshift/kafboy
cd kafboy
rebar get-deps compile
erl -pa deps/*/ebin -pa ebin -s kafboy_demo
On terminal 2
curl localhost:9903/batch/async/ekaf -XPOST -d 'test=a'
{"ok":"fast reply"}
curl localhost:9903/batch/async/ekaf -XPOST -d 'hello=a'
{"ok":1}
curl localhost:9903/batch/async/ekaf -XPOST
{"error":"ekaf.insufficient"}
Configuring ekaf
An example ekaf config
{ekaf,[
% required.
{ekaf_bootstrap_broker, {"localhost", 9091} },
% pass the {BrokerHost,Port} of at least one permanent broker. Ideally this should be
% the IP of a load balancer so that any broker can be contacted
% optional
{ekaf_per_partition_workers,100},
% how big the connection pool is per partition
% e.g. if the topic has 3 partitions, this setting starts 300 workers
% optional
{ekaf_max_buffer_size, [{<<"topic">>,10000}, % for specific topic
{ekaf_max_buffer_size,100}]}, % for other topics
% how many events should the worker wait for before flushing to kafka as a batch
% optional
{ekaf_partition_strategy, random}
% if you are not bothered about the order, use random for speed
% else the default is ordered_round_robin
]},
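The worker arithmetic mentioned in the config comments above can be sketched as a tiny helper. The module ekaf_worker_math and the function total_workers/2 are hypothetical names used only for illustration; they are not part of ekaf or kafboy.

```erlang
-module(ekaf_worker_math).
-export([total_workers/2]).

%% Total workers started for one topic: one pool of PerPartitionWorkers
%% for each of the topic's partitions.
total_workers(Partitions, PerPartitionWorkers)
  when is_integer(Partitions), Partitions >= 0,
       is_integer(PerPartitionWorkers), PerPartitionWorkers >= 0 ->
    Partitions * PerPartitionWorkers.
```

For the example in the comments, ekaf_worker_math:total_workers(3, 100) evaluates to 300.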
To see how to configure the number of workers per topic+partition, the buffer batch size, the buffer flush ttl, and more, see the extensive README for ekaf:
https://github.com/helpshift/ekaf
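The kafboy and ekaf fragments above look like application environment entries, so one plausible way to load them together is a standard OTP config file. The sketch below assumes that layout and assumes both applications read their settings from the application environment; the file name sys.config is a convention, and the values are copied from the examples in this README.

```erlang
%% sys.config (assumed file name): one list with one entry per application
[
 {kafboy, [
   %% optional custom route for batched async writes
   {kafboy_routes_async_batch, ["/1/foo/:topic"]}
 ]},
 {ekaf, [
   %% required: at least one permanent broker
   {ekaf_bootstrap_broker, {"localhost", 9091}},
   %% optional: connection pool size per partition
   {ekaf_per_partition_workers, 100},
   %% optional: trade ordering for speed
   {ekaf_partition_strategy, random}
 ]}
].
```

Such a file is normally passed to the VM with Erlang's -config flag, for example erl -config sys -pa deps/*/ebin -pa ebin.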
License
Copyright 2014, Helpshift, Inc.
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
Add a feature request at https://github.com/helpshift/ekaf or check the ekaf web server at https://github.com/helpshift/kafboy