Home

Awesome

Build Status

Vert.x Kafka

The Vert.x kafka library allows asynchronous publishing and receiving of messages on Kafka topic through the vert.x event bus.

####To use this library you must have kafka and zookeeper up and running. Follow instructions at Kafka quick start guide

This is a multi-threaded worker library that consumes kafka messages and then re-broadcast them on an address on the vert.x event bus.

Getting Started

Add a dependency to vertx-kafka:

<dependency>
    <groupId>com.cyngn.vertx</groupId>
    <artifactId>vertx-kafka</artifactId>
    <version>3.3.0-SNAPSHOT</version>
</dependency>
vertx-kafkavert.xkafka
3.3.0-SNAPSHOT3.3.0-SNAPSHOT0.9.0
0.4.13.1.00.9.0

Consumer

Listening for messages coming from a kafka broker.

Configuration

{
    "zookeeper.connect" : "<host1:2181,host:2181...>",
    "group.id" : "<kafkaConsumerGroupId>",
    "bootstrap.servers" "<host1:9092,host2:9092...>",
    "backoff.increment.ms" : "<backTimeInMilli>",
    "autooffset.reset" : "<kafkaAutoOffset>",
    "topics" : ["<topic1>", "<topic2>"],
    "eventbus.address" : "<default kafka.message.consumer>",
    "consumer.poll.interval.ms" : <default 100 ms>
}

For example:

{
    "zookeeper.host" : "localhost:2181",
    "group.id" : "testGroup",
    "bootstrap.servers" "localhost:9092",
    "backoff.increment.ms" : "100",
    "autooffset.reset" : "smallest",
    "topics" : ["testTopic"],
    "eventbus.address" : "kafka.to.vertx.bridge",
    "consumer.poll.interval.ms" : 1000
}

Field breakdown:

For a deeper look at kafka configuration parameters check this page out.

Usage

You should only need one consumer per application.

Deploy the verticle in your server


vertx = Vertx.vertx();

// sample config
JsonObject consumerConfig = new JsonObject();
consumerConfig.put(ConfigConstants.GROUP_ID, "testGroup");
List<String> topics = new ArrayList<>();
topics.add("testTopic");
consumerConfig.put("topics", new JsonArray(topics));

deployKafka(config);

public void deployKafka(JsonObject config) {
   // use your vert.x reference to deploy the consumer verticle
	 vertx.deployVerticle(MessageConsumer.class.getName(),
      new DeploymentOptions().setConfig(config),
			deploy -> {
   		     if(deploy.failed()) {
        	     System.err.println(String.format("Failed to start kafka consumer verticle, ex: %s", deploy.cause()));
               vertx.close()
               return;
            }
            System.out.println("kafka consumer verticle started");
      }
	);
}

Listen for messages


vertx.eventBus().consumer(MessageConsumer.EVENTBUS_DEFAULT_ADDRESS,
	message -> {
		   System.out.println(String.format("got message: %s", message.body()))
		   // message handling code
		   KafkaEvent event = new KafkaEvent(message.body());
	 });

Consumer Errors

You can listen on the address kafka.producer.error for errors from the kafka producer.

Producer

Send a message to a kafka cluster on a predefined topic.

Configuration

{
    "serializer.class":"<the default encoder>",
    "key.serializer":"<the key encoder>",
    "value.serializer":"<the value encoder>",
    "bootstrap.servers":"<host1:9092,host2:9092>,"
    "default_topic":"<default kafka topic to send to>,"
    "eventbus.address":"<the event bus topic where you send messages to send to kafka>"
    "max.block.ms" : <defaults to 60000>
}

For example:

{
    "serializer.class":"org.apache.kafka.common.serialization.StringSerializer",
    "bootstrap.servers":"localhost:9092",
    "default_topic":"testTopic"
}

For a deeper look at kafka configuration parameters check this page out.

Usage

You should only need one producer per application.

Deploy the verticle in your server


vertx = Vertx.vertx();

// sample config
JsonObject producerConfig = new JsonObject();
producerConfig.put("bootstrap.servers", "localhost:9092");
producerConfig.put("serializer.class", "org.apache.kafka.common.serialization.StringSerializer");
producerConfig.put("default_topic", "testTopic");

deployKafka(producerConfig);

public void deployKafka(JsonObject config) {
   // use your vert.x reference to deploy the consumer verticle
	 vertx.deployVerticle(MessageProducer.class.getName(),
     new DeploymentOptions().setConfig(config),
		 deploy -> {
   	   if(deploy.failed()) {
         System.err.println(String.format("Failed to start kafka producer verticle, ex: %s", deploy.cause()));
          vertx.close()
          return;
       }
       System.out.println("kafka producer verticle started");
   });
}

Send message to kafka topic


KafkaPublisher publisher = new KafkaPublisher(vertx.eventBus());

// send to the default topic
publisher.send("a test message on a default topic");
// send to a specific topic
publisher.send("SomeSpecialTopic", "a test message on a default topic");
// send to a specific topic with custom key
publisher.send("SomeSpecialTopic", "aUserId", "a test message on a default topic");
// send to a specific topic and partition
publisher.send("SomeSpecialTopic", "", 5, "a test message on a default topic");

Producer Errors

You can listen on the address kafka.producer.error for errors from the kafka producer.

Test setup