

kafka-streams-scala

This is a thin Scala wrapper for the Kafka Streams API. It does not aim to provide a Scala-idiomatic API; rather, it aims to make the original API simpler to use from Scala. In particular, it provides the following adjustments:

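This API also contains a few Serdes (Serializers/Deserializers):

- IntSerde, LongSerde, DoubleSerde, IntAsStringSerde, LongAsStringSerde and DoubleAsStringSerde, for Scala Int, Long and Double values
- JsonSerde, for case classes serialized as JSON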

Finally, the API provides the following extensions:

- KStreamS.split(), which splits a stream in two based on a predicate

Usage of the Kafka Streams API in Scala

The main objects are:

Using the builder

With the original Java API, you would create an instance of KStreamBuilder, then use it to create streams or tables. Here, KStreamBuilderS is an object that can be used directly:

val stream: KStreamS[String, String] = KStreamBuilderS.stream[String, String]("my-stream")

val table: KTableS[String, String] = KStreamBuilderS.table[String, String]("my-table")

When starting the application, unwrap the underlying KStreamBuilder with KStreamBuilderS.inner and pass it to KafkaStreams:

val streams = new KafkaStreams(KStreamBuilderS.inner, config)
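To make the wrapper pattern concrete, here is a dependency-free model of it. All names (JavaStyleBuilder, JavaStyleStream, StreamS, BuilderS) are made up for illustration and are not the library's classes; the sketch only assumes, as the text above suggests, that the Scala object holds a single Java-style builder, delegates calls to it, wraps the results, and exposes the builder through `inner`.

```scala
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for a Java builder: it records every source topic
// registered on it, the way a real builder accumulates a topology.
final class JavaStyleBuilder {
  val sourceTopics: ListBuffer[String] = ListBuffer.empty
  def stream(topic: String): JavaStyleStream = {
    sourceTopics += topic
    new JavaStyleStream(topic)
  }
}
final class JavaStyleStream(val topic: String)

// Hypothetical Scala-side wrapper around the Java-style stream.
final case class StreamS(inner: JavaStyleStream)

// Singleton wrapper: one shared builder, reachable through `inner`.
object BuilderS {
  val inner: JavaStyleBuilder = new JavaStyleBuilder
  def stream(topic: String): StreamS = StreamS(inner.stream(topic))
}
```

If the real object is built this way, every stream or table created through it ends up in the same topology, which is exactly what gets handed to KafkaStreams via `inner`.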

Serdes (declare them as implicit)

A common mistake with the Java API is to forget to specify Serdes, which then results in ClassCastExceptions at runtime when objects are serialized or deserialized.

To avoid this issue, this API makes Serdes mandatory. Most of the time, it is enough to declare your Serdes as implicit values; they are then picked up automatically:

import org.apache.kafka.common.serialization.{Serde, Serdes}

implicit val stringSerde: Serde[String] = Serdes.String()
implicit val userSerde: Serde[User] = new MyUserSerde

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")

Implicits are resolved by type, so make sure a Serde of the appropriate type is in scope for both the key and the value. Otherwise, compilation fails with an error such as:

Error:(87, 80) could not find implicit value for parameter keySerde: org.apache.kafka.common.serialization.Serde[String]

If, on the other hand, you have multiple Serdes for the same type, you might see the following error:

Error:(88, 80) ambiguous implicit values:
 both value stringSerde2 of type org.apache.kafka.common.serialization.Serde[String]
 and value stringSerde1 of type org.apache.kafka.common.serialization.Serde[String]
 match expected type org.apache.kafka.common.serialization.Serde[String]

In this case, pass the Serdes explicitly:

val usersByIdStream = KStreamBuilderS.stream[String, User]("users-by-id")(stringSerde, userSerde)
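The mechanism behind both error messages is ordinary Scala implicit-parameter resolution. The sketch below models it without Kafka: `MiniSerde` is a hypothetical stand-in for Kafka's `Serde`, and `stream` only mimics the shape of `KStreamBuilderS.stream` (the serdes in a second parameter list, as the explicit call above shows). The parameter name `keySerde` comes from the error message; `valueSerde` is a guess.

```scala
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical, dependency-free stand-in for Kafka's Serde[T].
trait MiniSerde[T] {
  def serialize(value: T): Array[Byte]
  def deserialize(bytes: Array[Byte]): T
}

object ImplicitDemo {
  // The only implicit MiniSerde[String] in scope.
  implicit val stringSerde: MiniSerde[String] = new MiniSerde[String] {
    def serialize(value: String): Array[Byte] = value.getBytes(UTF_8)
    def deserialize(bytes: Array[Byte]): String = new String(bytes, UTF_8)
  }

  // Mimics the shape of KStreamBuilderS.stream[K, V](topic): the serdes sit in a
  // second, implicit parameter list. This model just returns the serdes it received.
  def stream[K, V](topic: String)(implicit keySerde: MiniSerde[K], valueSerde: MiniSerde[V]): (MiniSerde[K], MiniSerde[V]) =
    (keySerde, valueSerde)

  // Both parameters are filled in by the compiler, looked up by type.
  val resolved: (MiniSerde[String], MiniSerde[String]) = stream[String, String]("my-stream")

  // Both parameters are passed by hand, as you would do to resolve an ambiguity.
  val explicitlyPassed: (MiniSerde[String], MiniSerde[String]) =
    stream[String, String]("my-stream")(stringSerde, stringSerde)

  // stream[String, Int]("counts") would not compile: no implicit MiniSerde[Int] is in scope.
}
```

Declaring a second `implicit val` of type `MiniSerde[String]` in the same object would make the call that relies on resolution fail with an "ambiguous implicit values" error, while the explicit call would still compile.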

Usage of the Serdes in Scala

To convert Scala Int/Long/Double to/from their binary representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntSerde
implicit val longSerde = LongSerde
implicit val doubleSerde = DoubleSerde

To convert Scala Int/Long/Double to/from their string representation:

import com.github.aseigneurin.kafka.serialization.scala._

implicit val intSerde = IntAsStringSerde
implicit val longSerde = LongAsStringSerde
implicit val doubleSerde = DoubleAsStringSerde
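The difference between the two families of numeric Serdes is easiest to see on the bytes themselves. This sketch does not call the library: it assumes the binary Serdes use a fixed-width big-endian layout (the layout Kafka's own LongSerializer uses) and that the "AsString" Serdes write the decimal digits as UTF-8 text. `LongEncodings` and its methods are hypothetical names.

```scala
import java.nio.ByteBuffer
import java.nio.charset.StandardCharsets.UTF_8

// Hypothetical helpers contrasting the two encodings of a Long.
object LongEncodings {
  // Binary form: always 8 bytes, big-endian (ByteBuffer's default byte order).
  def toBinary(value: Long): Array[Byte] = ByteBuffer.allocate(8).putLong(value).array()
  def fromBinary(bytes: Array[Byte]): Long = ByteBuffer.wrap(bytes).getLong()

  // String form: the decimal digits of the number, encoded as UTF-8.
  def toText(value: Long): Array[Byte] = value.toString.getBytes(UTF_8)
  def fromText(bytes: Array[Byte]): Long = new String(bytes, UTF_8).toLong
}
```

Under these assumptions, 42L takes 8 bytes in binary form but only 2 bytes as text, while Long.MaxValue takes 19 bytes as text. The text form has the advantage of being readable by any consumer that simply decodes values as strings.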

To convert case classes to/from JSON, use JsonSerde. Example:

import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes

case class User(name: String)

implicit val stringSerde = Serdes.String()
implicit val userSerde = new JsonSerde[User]

// read JSON -> case class
KStreamBuilderS.stream[String, User]("users")
  .mapValues { user => user.name }
  .to("names")

// write case class -> JSON
KStreamBuilderS.stream[String, String]("names")
  .mapValues { name => User(name) }
  .to("users")

Example

This repository contains a Scala version of the Java Word Count Demo.

Here is the code to implement a word count:

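import java.util.{Locale, Properties}

import com.github.aseigneurin.kafka.serialization.scala._
import org.apache.kafka.common.serialization.Serdes
import org.apache.kafka.streams.KafkaStreams

val props = new Properties()
// ...

implicit val stringSerde = Serdes.String()
implicit val longSerde = LongAsStringSerde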

val source = KStreamBuilderS.stream[String, String]("streams-file-input")

val counts: KTableS[String, Long] = source
  .flatMapValues { value => value.toLowerCase(Locale.getDefault).split(" ") }
  .map { (_, value) => (value, value) }
  .groupByKey
  .count("Counts")

counts.to("streams-wordcount-output")

val streams = new KafkaStreams(KStreamBuilderS.inner, props)
streams.start()
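To see what this topology computes, here is an in-memory, batch model of the same steps on plain Scala collections. It is only an analogy: `WordCountModel` is a made-up name, and a real KTable is updated incrementally as records arrive rather than computed once over a finite list.

```scala
import java.util.Locale

object WordCountModel {
  // Each record is a (key, value) pair, like a Kafka message.
  def count(records: Seq[(String, String)]): Map[String, Long] =
    records
      // flatMapValues: split each line into lowercase words, keeping the original key
      .flatMap { case (key, line) =>
        line.toLowerCase(Locale.getDefault).split(" ").toList.map(word => (key, word))
      }
      // map: re-key each record by the word itself
      .map { case (_, word) => (word, word) }
      // groupByKey + count: number of records per key
      .groupBy { case (word, _) => word }
      .map { case (word, group) => (word, group.size.toLong) }
}
```

As in the Kafka version, the input keys are discarded by the `map` step, and splitting on a single space means that consecutive spaces produce empty-string "words" that get counted too.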

Extensions

KStreamS.split()

This method applies a predicate and returns two KStreamS instances: one with the messages that match the predicate, and one with the messages that don't.

The two streams are returned as a tuple, which can easily be destructured:

def isValidMessage(v: ...): Boolean = ???

val (goodMessages, badMessages) = deserializedMessages.split((k, v) => isValidMessage(v))
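The semantics of split() can be modeled on an ordinary collection: every record lands in exactly one of the two outputs, in its original relative order. `SplitModel` is a hypothetical name, and this sketch makes no claim about how KStreamS.split() is implemented; with the plain Java API, the same result could be obtained with `filter` and `filterNot` using one predicate, or with `branch`.

```scala
object SplitModel {
  // Returns (matching, nonMatching); each record goes to exactly one side.
  def split[K, V](records: Seq[(K, V)])(predicate: (K, V) => Boolean): (Seq[(K, V)], Seq[(K, V)]) =
    records.partition { case (k, v) => predicate(k, v) }
}
```

For example, `SplitModel.split(Seq(("a", 1), ("b", -2), ("c", 3)))((_, v) => v > 0)` yields `(Seq(("a", 1), ("c", 3)), Seq(("b", -2)))`.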