Haskakafka

Kafka bindings for Haskell backed by the librdkafka C library. Haskakafka has been tested with, and fully supports, Kafka 0.9.0.1 using librdkafka 0.9.0.99 and higher on Linux and OS X. It supports both producers and consumers, with optional batch operations.

Hackage: http://hackage.haskell.org/package/haskakafka

Usage

A quick walkthrough of producers and consumers:

import Haskakafka

import qualified Data.ByteString.Char8 as C8

example :: IO ()
example = do
  let
      -- Optionally, we can configure certain parameters for Kafka
      kafkaConfig = [("socket.timeout.ms", "50000")]
      topicConfig = [("request.timeout.ms", "50000")]

      -- Payloads are just ByteStrings
      samplePayload = C8.pack "Hello world"


  -- withKafkaProducer opens a producer connection and passes two handles
  -- to the callback: the Kafka connection and the topic.
  withKafkaProducer kafkaConfig topicConfig
                    "localhost:9092" "test_topic"
                    $ \kafka topic -> do

    -- Produce a single unkeyed message to partition 0
    let message = KafkaProduceMessage samplePayload
    _ <- produceMessage topic (KafkaSpecifiedPartition 0) message

    -- Produce a single keyed message
    let keyMessage = KafkaProduceKeyedMessage (C8.pack "Key") samplePayload
    _ <- produceKeyedMessage topic keyMessage

    -- We can also use the batch API for better performance
    _ <- produceMessageBatch topic KafkaUnassignedPartition [message, keyMessage]

    putStrLn "Done producing messages, here was our config: "
    dumpConfFromKafka kafka >>= \d -> putStrLn $ "Kafka config: " ++ (show d)
    dumpConfFromKafkaTopic topic >>= \d -> putStrLn $ "Topic config: " ++ (show d)


  -- withKafkaConsumer opens a consumer connection and starts consuming
  let partition = 0
  withKafkaConsumer kafkaConfig topicConfig
                    "localhost:9092" "test_topic"
                    partition -- locked to a specific partition for each consumer
                    KafkaOffsetBeginning -- start reading from beginning (alternatively, use
                                         -- KafkaOffsetEnd, KafkaOffset or KafkaOffsetStored)
                    $ \kafka topic -> do
    -- Consume a single message at a time
    let timeoutMs = 1000
    me <- consumeMessage topic partition timeoutMs
    case me of
      (Left err) -> putStrLn $ "Uh oh, an error! " ++ (show err)
      (Right m) -> putStrLn $ "Woo, payload was " ++ (C8.unpack $ messagePayload m)

    -- For better performance, consume in batches
    let maxMessages = 10
    mes <- consumeMessageBatch topic partition timeoutMs maxMessages
    case mes of
      (Left err) -> putStrLn $ "Something went wrong in batch consume! " ++ (show err)
      (Right ms) -> putStrLn $ "Woohoo, we got " ++ (show $ length ms) ++ " messages"


    -- Be a little less noisy
    setLogLevel kafka KafkaLogCrit

  -- we can also fetch metadata about our Kafka infrastructure
  let timeoutMs = 1000
  emd <- fetchBrokerMetadata [] "localhost:9092" timeoutMs
  case emd of
    (Left err) -> putStrLn $ "Uh oh, error time: " ++ (show err)
    (Right md) -> putStrLn $ "Kafka metadata: " ++ (show md)
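
The walkthrough builds its payload with C8.pack, which truncates every character to 8 bits, so it is only safe for ASCII or Latin-1 text. Since payloads are just ByteStrings, one option for arbitrary text is to encode it as UTF-8 before producing and decode it after consuming. The sketch below assumes the text package is available; encodePayload and decodePayload are hypothetical helper names, not part of Haskakafka.

```haskell
import qualified Data.ByteString as BS
import qualified Data.Text as T
import Data.Text.Encoding (decodeUtf8', encodeUtf8)

-- Hypothetical helper: turn text into a UTF-8 encoded payload.
encodePayload :: T.Text -> BS.ByteString
encodePayload = encodeUtf8

-- Hypothetical helper: decode a consumed payload, reporting invalid
-- UTF-8 as a Left instead of throwing an exception.
decodePayload :: BS.ByteString -> Either String T.Text
decodePayload bytes = case decodeUtf8' bytes of
  Left err   -> Left (show err)
  Right text -> Right text

main :: IO ()
main = do
  let original = T.pack "héllo wörld"
      bytes    = encodePayload original
  -- The byte length can exceed the character count for non-ASCII text.
  print (BS.length bytes)
  -- Round-tripping through the payload encoding preserves the text.
  print (decodePayload bytes == Right original)
```

With a helper like this, the result of encodePayload could be passed wherever the walkthrough uses samplePayload, and decodePayload could be applied to messagePayload m on the consumer side.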

Configuration Options

Configuration options are passed to withKafkaConsumer and withKafkaProducer as lists of (name, value) string pairs. For the full list of supported options, see librdkafka's configuration reference (CONFIGURATION.md in the librdkafka repository).
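
Because the configuration arguments are plain lists of string pairs, as kafkaConfig and topicConfig are in the walkthrough above, they can be assembled with ordinary list functions. The following is a minimal sketch of layering overrides on top of defaults. The Config alias, mergeConfig and defaultKafkaConfig are hypothetical names used for illustration only; they are not part of Haskakafka.

```haskell
-- Hypothetical alias: a list of librdkafka property name/value pairs,
-- the same shape as kafkaConfig in the walkthrough.
type Config = [(String, String)]

-- Hypothetical helper: overrides come first, followed by every default
-- whose key was not overridden.
mergeConfig :: Config -> Config -> Config
mergeConfig defaults overrides =
  overrides ++ [kv | kv@(k, _) <- defaults, k `notElem` map fst overrides]

-- Example defaults; the values are illustrative, not recommendations.
defaultKafkaConfig :: Config
defaultKafkaConfig =
  [ ("socket.timeout.ms", "50000")
  , ("client.id", "haskakafka-example")
  ]

main :: IO ()
main =
  mapM_ print (mergeConfig defaultKafkaConfig [("socket.timeout.ms", "10000")])
```

The merged list could then be passed as the first argument to withKafkaProducer or withKafkaConsumer.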

High Level Consumers

High-level consumers are supported by librdkafka starting from version 0.9. They can handle more than one partition and even more than one topic. librdkafka takes care of scalability and rebalancing: when a new consumer starts in the same consumer group, a rebalance happens and all consumers in the group share the load.

This version of Haskakafka adds (experimental) support for high-level consumers. Here is how such a consumer can be used in code:

import           Haskakafka
import           Haskakafka.Consumer

runConsumerExample :: IO ()
runConsumerExample = do
    res <- runConsumer
              (ConsumerGroupId "test_group")    -- group id is required
              []                                -- extra kafka conf properties
              (BrokersString "localhost:9092")  -- kafka brokers to connect to
              [TopicName "^hl-test*"]           -- list of topics to consume, supporting regex
              processMessages                   -- handler to consume messages
    print res

-- processMessages is called by runConsumer and is responsible
-- for polling and handling messages.
-- This example polls 10 times and then returns success.
processMessages :: Kafka -> IO (Either KafkaError ())
processMessages kafka = do
    mapM_ (\_ -> do
                   msg1 <- pollMessage kafka 1000
                   print msg1) [1..10]
    return $ Right ()
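
The handler above prints each poll result and discards it. If the results are needed afterwards, one possible approach is to collect them and separate errors from messages. The sketch below is written against a generic polling action so that it runs without a broker; pollTimes and fakePoll are hypothetical names. With Haskakafka, the action would presumably be pollMessage kafka 1000 as used above, assuming it returns an Either of error and message.

```haskell
import Control.Monad (replicateM)
import Data.Either (partitionEithers)
import Data.IORef (atomicModifyIORef', newIORef)

-- Hypothetical helper: run a polling action n times and split the
-- outcomes into errors (Left) and messages (Right), preserving order.
pollTimes :: Monad m => Int -> m (Either e a) -> m ([e], [a])
pollTimes n poll = partitionEithers <$> replicateM n poll

main :: IO ()
main = do
  counter <- newIORef (0 :: Int)
  -- Stand-in for a real poll: every third call reports an error.
  let fakePoll :: IO (Either String String)
      fakePoll = do
        i <- atomicModifyIORef' counter (\c -> (c + 1, c + 1))
        return $ if i `mod` 3 == 0
          then Left ("timeout on poll " ++ show i)
          else Right ("message " ++ show i)
  (errs, msgs) <- pollTimes 10 fakePoll
  putStrLn (show (length msgs) ++ " messages, " ++ show (length errs) ++ " errors")
```

Collecting every result in memory is fine for a bounded number of polls like this; a long-running consumer would more likely handle each message as it arrives.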

Installation

Installing librdkafka

Although librdkafka is available on many platforms, most of the distribution packages are too old to support haskakafka. As such, we suggest you build and install it from source:

git clone https://github.com/edenhill/librdkafka
cd librdkafka
./configure
make && sudo make install

If the C++ bindings fail to build, install only the C library:

cd librdkafka/src
make && sudo make install

On Debian and OS X, this will install the shared and static libraries to /usr/local/lib.

Installing Kafka

The full Kafka guide is at http://kafka.apache.org/documentation.html#quickstart

Installing Haskakafka

Haskakafka uses c2hs to generate its C bindings, so if you build with cabal you may need to install c2hs explicitly somewhere on your PATH (that is, outside of a sandbox). To do so, run:

cabal install c2hs

After that, install Haskakafka:

cabal install haskakafka

This installs the latest version of Haskakafka from Hackage.

Testing

Haskakafka ships with a suite of integration tests that verify the library against a live Kafka instance. To set these up, you must have a broker running on localhost:9092 (or set the HASKAKAFKA_TEST_BROKER environment variable to override it) with a haskakafka_tests topic created (or set the HASKAKAFKA_TEST_TOPIC environment variable to override it).
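
To illustrate how such environment overrides resolve, here is a minimal sketch that reads both variables and falls back to the defaults named above. It is not the test suite's actual code, and envOr is a hypothetical helper name.

```haskell
import Data.Maybe (fromMaybe)
import System.Environment (lookupEnv)

-- Hypothetical helper: return the variable's value, or the fallback
-- when the variable is not set.
envOr :: String -> String -> IO String
envOr name fallback = fromMaybe fallback <$> lookupEnv name

main :: IO ()
main = do
  broker <- envOr "HASKAKAFKA_TEST_BROKER" "localhost:9092"
  topic  <- envOr "HASKAKAFKA_TEST_TOPIC" "haskakafka_tests"
  putStrLn ("broker: " ++ broker ++ ", topic: " ++ topic)
```

Running it with HASKAKAFKA_TEST_BROKER set in the shell shows the override taking effect; with neither variable set, it prints the defaults.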

To get a broker running, download a Kafka distribution and untar it into a directory. From there, run zookeeper using

bin/zookeeper-server-start.sh config/zookeeper.properties

and run Kafka in a separate window using

bin/kafka-server-start.sh config/server.properties

With both Kafka and Zookeeper running, you can run tests through stack:

stack test

You can also run tests through cabal:

cabal install --only-dependencies --enable-tests
cabal test --log=/dev/stdout

Running Examples

Build the project, then run the basic example with --help to see its options:

stack build
stack exec -- basic --help
basic example [OPTIONS]
  Fetch metadata, produce, and consume a message

Common flags:
  -b       --brokers=<brokers>  Comma separated list in format
                                <hostname>:<port>,<hostname>:<port>
  -t       --topic=<topic>      Topic to fetch / produce
  -C       --consumer           Consumer mode
  -P       --producer           Producer mode
  -L       --list               Metadata list mode
  -A       --all                Run producer, consumer, and metadata list
  -p=<num>                      Partition (-1 for random partitioner when
                                using producer)
           --pretty             Pretty print output
  -?       --help               Display help message
  -V       --version            Print version information

The following will produce 11 messages on partition 5 for topic test_topic:

stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -t test_topic -p 5 -P

The following will consume 11 messages on partition 5 for topic test_topic:

stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -t test_topic -p 5 -C

The following will pretty print a list of all brokers and topics:

stack exec -- basic -b "broker1.example.com:9092,broker2.example.com:9092,broker3.example.com:9092" -L --pretty
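
The -b flag takes a comma separated list in the format <hostname>:<port>,<hostname>:<port>. As a rough illustration of that format, the sketch below validates such a string before it is handed to the library. All three functions are hypothetical helpers, not part of Haskakafka or of the basic example, and the parser makes simplifying assumptions (for instance, it does not handle IPv6 addresses).

```haskell
import Data.Char (isDigit)

-- Split a string on a delimiter character.
splitOn :: Char -> String -> [String]
splitOn d s = case break (== d) s of
  (chunk, [])       -> [chunk]
  (chunk, _ : rest) -> chunk : splitOn d rest

-- Parse one "<hostname>:<port>" entry; both parts must be non-empty
-- and the port must consist of digits only.
parseBroker :: String -> Either String (String, Int)
parseBroker entry = case break (== ':') entry of
  (host, ':' : port)
    | not (null host) && not (null port) && all isDigit port ->
        Right (host, read port)
  _ -> Left ("malformed broker entry: " ++ show entry)

-- Parse a whole comma separated broker list, failing on the first bad entry.
parseBrokers :: String -> Either String [(String, Int)]
parseBrokers = mapM parseBroker . splitOn ','

main :: IO ()
main = do
  print (parseBrokers "broker1.example.com:9092,broker2.example.com:9092")
  print (parseBrokers "localhost")
```

The first call yields a Right with both host and port pairs; the second yields a Left because the entry has no port.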