Home

Awesome

dvlopt.kafka

Clojars
Project

This Apache Kafka client library is a Clojure wrapper for the official Java libraries. It strives for a balance between being idiomatic but not too clever. Users accustomed to the java libraries will be right at home, although it is not a prerequisite.

It provides namespaces for handling consumers, producers, and doing some administration. Also, we have the pleasure to announce that Kafka Streams is fully supported. The user can create a mock Kafka Streams application which do not need a running Kafka Cluster. This is perfect for learning the library as well as building applications at the REPL and seeing how they behave right away.

Ready for Kafka 2.1.0. Previously known as "Milena", the API of this iteration is considered stable unless something significantly changes in the Java libraries. In general, only the namespace relative to administration is at risk.

We try to provide good documention because many important concepts are poorly explained or confusing in the Java libraries. Feel free to provide feedback, contribute, and let us know if something is not clear.

Usage

First, read the fairly detailed API. Specially if you are not used to the java libraries and their various concepts.

Alternatively, documentation can be generated by running :

$ lein codox
$ cd doc/auto

Then, have a look at the following examples. Just so we are prepared, let us require all namespaces involved.

You can clone this repo and start a REPL, everything needed is imported in the dev namespace.

;; For Kafka :

(require '[dvlopt.kafka       :as K]
         '[dvlopt.kafka.admin :as K.admin]
         '[dvlopt.kafka.in    :as K.in]
         '[dvlopt.kafka.out   :as K.out])


;; For Kafka Streams :

(require '[dvlopt.kstreams          :as KS]
         '[dvlopt.kstreams.mock     :as KS.mock]
         '[dvlopt.kstreams.topology :as KS.topology]
         '[dvlopt.kstreams.ctx      :as KS.ctx]
         '[dvlopt.kstreams.store    :as KS.store]
         '[dvlopt.kstreams.builder  :as KS.builder]
         '[dvlopt.kstreams.stream   :as KS.stream]
         '[dvlopt.kstreams.table    :as KS.table])

Administration

Creating topic "my-topic" using the dvlopt.kafka.admin namespace.

(with-open [admin (K.admin/admin)]
  (K.admin/create-topics admin
                         {"my-topic" {::K.admin/number-of-partitions 4
                                      ::K.admin/replication-factor   3
                                      ::K.admin/configuration        {"cleanup.policy" "compact"}}})
  (println "Existing topics : " (keys @(K.admin/topics admin
                                                       {::K/internal? false}))))

Producing records

Sending 25 records to "my-topic" using the dvlopt.kafka.out namespace.

(with-open [producer (K.out/producer {::K/nodes             [["localhost" 9092]]
                                      ::K/serializer.key    (K/serializers :long)
                                      ::K/serializer.value  :long
                                      ::K.out/configuration {"client.id" "my-producer"}})]
  (doseq [i (range 25)]
    (K.out/send producer
                {::K/topic "my-topic"
                 ::K/key   i
                 ::K/value (* 100 i)}
                (fn callback [exception metadata]
                  (println (format "Record %d : %s"
                                   i
                                   (if exception
                                     "FAILURE"
                                     "SUCCESS")))))))

Consuming records

Reading a batch of records from "my-topic" and manually commit the offset of where we are using the dvlopt.kafka.in namespace.

(with-open [consumer (K.in/consumer {::K/nodes              [["localhost" 9092]]
                                     ::K/deserializer.key   :long
                                     ::K/deserializer.value :long
                                     ::K.in/configuration   {"auto.offset.reset" "earliest"
                                                             "enable.auto.commit" false
                                                             "group.id"           "my-group"}})]
  (K.in/register-for consumer
                     ["my-topic"])
  (doseq [record (K.in/poll consumer
                            {::K/timeout [5 :seconds]})]
    (println (format "Record %d @%d - Key = %d, Value = %d"
                     (::K/offset record)
                     (::K/timestamp record)
                     (::K/key record)
                     (::K/value record))))
  (K.in/commit-offsets consumer))

Kafka Streams low-level API

Useless but simple example of grouping records in two categories based on their key, "odd" and "even", and continuously summing values in each category.

First, we create a topology. We then add a source node fetching records from "my-input-topic". Those records are processed by "my-processor" which needs "my-store" in order to persist the current sum for each category. Finally, a sink node receives processed records and sends them to "my-output-topic".

For testing the topology, we create a mock Kafka Streams application which emulate a Kafka cluster. This is perfect for learning, testing, and fiddling at the REPL. We pipe a few records to see how it is behaving. Of course, you could test it with a running cluster.

(def topology
     (-> (KS.topology/topology)
         (KS.topology/add-source "my-source"
                                 ["my-input-topic"]
                                 {::K/deserializer.key   :long
                                  ::K/deserializer.value :long
                                  ::KS/offset-reset      :earliest})
         (KS.topology/add-processor "my-processor"
                                    ["my-source"]
                                    {::KS/processor.init      (fn [ctx]
                                                                (KS.ctx/kv-store ctx
                                                                                 "my-store"))
                                     ::KS/processor.on-record (fn [ctx my-store record]
                                                                (println "Processing record : " record)
                                                                (let [key' (if (odd? (::K/key record))
                                                                             "odd"
                                                                             "even")
                                                                      sum  (+ (or (KS.store/kv-get my-store
                                                                                                   key')
                                                                                  0)
                                                                              (::K/value record))]
                                                                  (KS.store/kv-put my-store
                                                                                   key'
                                                                                   sum)
                                                                  (KS.ctx/forward ctx
                                                                                  {::K/key   key'
                                                                                   ::K/value sum})))})
         (KS.topology/add-store ["my-processor"]
                                {::K/deserializer.key   :string
                                 ::K/deserializer.value :long
                                 ::K/serializer.key     :string
                                 ::K/serializer.value   :long
                                 ::KS.store/name        "my-store"
                                 ::KS.store/type        :kv.in-memory
                                 ::KS.store/cache?      false})
         (KS.topology/add-sink "my-sink"
                               ["my-processor"]
                               "my-output-topic"
                               {::K/serializer.key   :string
                                ::K/serializer.value :long})))


;; This will run without a Kafka Cluster

(def mock-app
     (KS.mock/mock-app "KS-low-level-test"
                       topology))


;; We pipe a few records into our fake runtime

(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; Run this a few times to se the result

(KS.mock/read-record mock-app
       	             "my-output-topic"
                     {::K/deserializer.key   :string
                      ::K/deserializer.value :long})


;; If you wish to use a real Kafka cluster instead

(comment
    (def app
         (KS/app "my-app-1"
                 topology
                 {::K/nodes         [["localhost" 9092]]
                  ::KS/on-exception (fn [exception _thread]
                                      (println "Exception : " exception))}))


    (KS/start app))

Kafka Streams high-level API

Same example as previously but in a more functional style. In addition, values are aggregated in 2 seconds windows (it is best to run the producer example a few times first).

First, we need a builder. Then, we add a stream fetching records from "my-input-topic". Records are then grouped into our categories and then each category is windowed in 2 seconds windows. Each window is then reduced for computing a sum. We are now ready and we can build a topology out of our builder. It is always a good idea to have a look at the description of the built topology to have a better idea of what is created by the high-level API.

A window store is then retrieved and each window for each category is printed.

(def topology
     (let [builder (KS.builder/builder)]
       (-> builder
           (KS.builder/stream ["my-input-topic"]
                              {::K/deserializer.key   :long
                               ::K/deserializer.value :long
                               ::KS/offset-reset      :earliest})
           (KS.stream/group-by (fn [k v]
                                 (println (format "Grouping [%d %d]"
                                                  k
                                                  v))
                                 (if (odd? k)
                                   "odd"
                                   "even"))
                               {::K/deserializer.key   :string
                                ::K/deserializer.value :long
                                ::K/serializer.key     :string
                                ::K/serializer.value   :long})
           (KS.stream/window [2 :seconds])
           (KS.stream/reduce-windows (fn reduce-window [sum k v]
                                       (println (format "Adding value %d to sum %s for key '%s'"
                                                        v
                                                        sum
                                                        k))
                                       (+ sum
                                          v))
                                     (fn seed []
                                       0)
                                     {::K/deserializer.key   :string
                                      ::K/deserializer.value :long
                                      ::K/serializer.key     :string
                                      ::K/serializer.value   :long
                                      ::KS.store/name        "my-store"
                                      ::KS.store/type        :kv.in-memory
                                      ::KS.store/cache?      false}))
       (KS.topology/topology builder)))


;; Always interesting to see what is the actual topology.

(KS.topology/describe topology)


;; Just like in the previous example, we create a fake runtime and pipe a few records.

(def mock-app
     (KS.mock/mock-app "KS-high-level-test"
                       topology))


(dotimes [i 25]
  (KS.mock/pipe-record mock-app
                       {::K/topic "my-input-topic"
                        ::K/partition 0
                        ::K/offset    i
                        ::K/key       ((K/serializers :long) i)
                        ::K/value     ((K/serializers :long) i)}))


;; And here is how we can read from a store.

(def my-store
     (KS.mock/window-store mock-app
             	           "my-store"))


(with-open [cursor (KS.store/ws-multi-range my-store)]
  (doseq [db-record (iterator-seq cursor)]
    (println (format "Aggregated key = '%s', time windows = [%d;%d), value = %d"
                     (::K/key db-record)
                     (::K/timestamp.from db-record)
                     (::K/timestamp.to db-record)
                     (::K/value db-record)))))

Testing

It is possible to create mock consumers (dvlopt.kafka.in.mock namespace) and producers (dvlopt.kafka.out.mock) for testing purposes. They can use the normal API without needing to contact a Kafka cluster but not everything behave strictly the same (cf. documentation).

Kafka Streams applications can get fairly complex. Providing a fake runtime, as in the examples, is a great solution for unit tests. While building a new application at the REPL, you can gradually inspect if everything seem to work as expected. Hence, this is a valuable tool.

License

Copyright © 2017 Adam Helinski

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.