kafka_protocol

Kafka protocol library for Erlang/Elixir

This library provides the Kafka protocol schema, wire-format encode/decode, and basic connection management APIs.

See brod for a complete Kafka client implementation.

Compression Support

Since 4.0, this library no longer includes snappyer and lz4b as rebar dependencies. However, kafka_protocol still defaults to snappyer and lz4b_frame for compression and decompression.

Provide compression module overrides

You may override the default compression libraries with modules that implement the callbacks below:

-callback compress(iodata()) -> iodata().
-callback decompress(binary()) -> iodata().
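
For illustration, a wrapper module satisfying these callbacks might look like the sketch below. The module name my_snappy_module is hypothetical, and the sketch assumes snappyer:compress/1 and snappyer:decompress/1 return {ok, Binary}:

-module(my_snappy_module).
-export([compress/1, decompress/1]).

%% Delegate to snappyer (assumed API), unwrapping the {ok, Binary}
%% results so the return types match the callback specs above.
compress(IoData) ->
    {ok, Compressed} = snappyer:compress(IoData),
    Compressed.

decompress(Bin) ->
    {ok, Decompressed} = snappyer:decompress(Bin),
    Decompressed.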

There are two approaches to inject these dynamic dependencies into kafka_protocol:

Set application environment

e.g. set {provide_compression, [{snappy, my_snappy_module}, {lz4, my_lz4_module}]} in the kafka_protocol application environment (or provide it from sys.config), as in the sketch below.
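
A minimal sys.config sketch, using the hypothetical module names from above:

[{kafka_protocol,
  [{provide_compression,
    [{snappy, my_snappy_module},
     {lz4, my_lz4_module}]}]}].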

Call kpro:provide_compression

e.g. kpro:provide_compression([{snappy, my_snappy_module}, {lz4, my_lz4_module}]).

Test (make eunit)

To set up a test environment locally (requires Docker), run make test-env. To test against a specific Kafka version (e.g. 0.9), set the KAFKA_VERSION environment variable, e.g. export KAFKA_VERSION=0.9.
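
Putting the commands together, a local run against Kafka 0.9 might look like:

export KAFKA_VERSION=0.9
make test-env
make eunit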

To test with an existing Kafka cluster, set the environment variables below:

Connecting to kafka 0.9

The api_versions API was introduced in Kafka 0.10; it can be used to query the version ranges of all APIs. When connecting to Kafka, kpro_connection immediately performs an RPC of this API and caches the version ranges from the reply in its looping state. When connecting to Kafka 0.9, the query_api_versions config entry should be set to false, otherwise the socket will be closed by Kafka.
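
For example, the override could be passed in the connection config. This is only a sketch: kpro:connect_any/2 and the endpoint format are assumptions to verify against the kpro module, while query_api_versions is the config entry described above.

%% Disable the api_versions query when talking to Kafka 0.9
%% (connect function and endpoint are illustrative assumptions).
{ok, Conn} = kpro:connect_any([{"localhost", 9092}],
                              #{query_api_versions => false}).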

Schema Explained

The schemas of all API requests and responses can be found in src/kpro_schema.erl, which is generated from priv/kafka.bnf.

The root-level schema is always a struct. A struct consists of fields, each having a lower-level (possibly nested) schema.

Struct fields are documented in priv/kafka.bnf as comments, but the comments are not carried over as Erlang comments in kpro_schema.erl.

Take the produce API for example:

req(produce, V) when V >= 0, V =< 2 ->
  [{acks,int16},
   {timeout,int32},
   {topic_data,{array,[{topic,string},
                       {data,{array,[{partition,int32},
                                     {record_set,records}]}}]}}];

It is generated from the BNF block below:

ProduceRequestV0 => acks timeout [topic_data]
  acks => INT16
  timeout => INT32
  topic_data => topic [data]
    topic => STRING
    data => partition record_set
      partition => INT32
      record_set => RECORDS
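
The same schema can be looked up at runtime; for example, from an Erlang shell (assuming req/2 is exported from kpro_schema, as the generated module suggests):

1> kpro_schema:req(produce, 0).
[{acks,int16},
 {timeout,int32},
 {topic_data,{array,[{topic,string},
                     {data,{array,[{partition,int32},
                                   {record_set,records}]}}]}}]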

Code Generation

Schema code is generated from the Java class org.apache.kafka.common.protocol.Protocol. The generated code is committed to the git repo; there is usually no need to re-generate it unless the code-generation scripts change or a new Kafka version needs to be supported.

How to Generate priv/kafka.bnf

Ensure you have a JDK (1.7+) and Gradle (2.0+) installed. Change the Kafka version in priv/kafka_protocol_bnf/build.gradle if needed.

make kafka-bnf

How to Generate src/kpro_schema.erl

make gen-code