Home

Awesome

MirrorTool for Kafka Connect (Deprecated)

Note: Since version 2.4 Kafka ships with a Kafka Source Connector that implements this functionality. As such, this repository is no longer actively maintained.

A Kafka Source Connector for Kafka Connect. Mirror topics from a source Kafka cluster into your destination Kafka cluster.

Downloads

You can grab a pre-built jar on the Releases page.

Why

The aim of this connector is to enable MirrorMaker-like functionality within the Kafka Connect framework.

At Comcast, we have seen two key benefits to using MirrorTool over MirrorMaker:

High level diagram of MirrorTool. Source Kafka cluster on the left of the image, with arrow indicating data flow from the source Kafka cluster to the MirrorTool connector/Kafka Connect  in the center of image. Dashed line from MirrorTool to source Kafka cluster indicating monitoring of the source Kafka cluster for topics and partitions to mirror. Arrow indicating data flow from Kafka Connect to the destination Kafka cluster on the right of the image.

How it works

Build

Build a plugin jar to build/libs/MirrorTool-[version].jar by running:

./gradlew build

Copy the resulting jar file into your Kafka Connect plugin directory.

Configuration Options

Note that these options are for this MirrorTool connector, and assumes some familiarity with the base Kafka Connect configuration options. (You will need to configure the parameters for your destination Kafka Cluster in the base Kafka Connect configuration)

Common Options

These are the most common options that are required when configuring this connector:

Configuration ParameterExampleDescription
source.bootstrap.serverssource.broker1:9092,source.broker2:9092Mandatory. Comma separated list of boostrap servers for the source Kafka cluster
source.topic.whitelisttopic, topic-prefix*Java regular expression to match topics to mirror. For convenience, comma (',') is interpreted as the regex-choice symbol ('|').
source.auto.offset.resetearliestIf there is no stored offset* for a partition, indicates where to start consuming from. Options are "earliest", "latest", or "none". Default: earliest
source.group.idkafka-connectGroup ID used when writing offsets back to source cluster (for offset lag tracking)

Advanced Options

Some use cases may require modifying the following default connector options. Use with care.

Configuration ParameterDefaultDescription
include.message.headerstrueIndicates whether message headers from source records should be included when delivered to the destination cluster.
topic.list.timeout.ms60000Amount of time (in milliseconds) the partition monitor thread should wait for the source kafka cluster to return topic information before logging a timeout error.
topic.list.poll.interval.ms300000Amount of time (in milliseconds) the partition monitor will wait before re-querying the source cluster for a change in the topic partitions to be consumed
reconfigure.tasks.on.leader.changefalseIndicates whether the partition monitor should request a task reconfiguration when partition leaders have changed. In some cases this may be a minor optimization as when generating task configurations, the connector will try to group partitions to be consumed by each task by the leader node. The downside to this is that it may result in excessive rebalances.
poll.loop.timeout.ms1000Maximum amount of time (in milliseconds) the connector will wait in each poll loop without data before returning control to the kafka connect task thread.
max.shutdown.wait.ms2000Maximum amount of time (in milliseconds) to wait for the connector to gracefully shut down before forcing the consumer and admin clients to close. Note that any values greater than the kafka connect parameter task.shutdown.graceful.timeout.ms will not have any effect.
source.max.poll.records500Maximum number of records to return from each poll of the internal KafkaConsumer. When dealing with topics with very large messages, the connector may sometimes spend too long processing each batch of records, causing lag in offset commits, or in serious cases, unnecessary consumer rebalances. Reducing this value can help in these scenarios. Conversely, when processing very small messages, increasing this value may improve overall throughput.
source.key.deserializerorg.apache.kafka.common.serialization.ByteArrayDeserializerKey deserializer to use for the kafka consumers connecting to the source cluster.
source.value.deserializerorg.apache.kafka.common.serialization.ByteArrayDeserializerValue deserializer to use for the kafka consumers connecting to the source cluster.
source.enable.auto.committrueIf true the consumer's offset will be periodically committed to the source cluster in the background.
Note that these offsets are not used to resume the connector (They are stored in the Kafka Connect offset store), but may be useful in monitoring the current offset lag of this connector on the source cluster

Overriding the internal KafkaConsumer and AdminClient Configuration

Note that standard Kafka parameters can be passed to the internal KafkaConsumer and AdminClient by prefixing the standard configuration parameters with "source.". For cases where the configuration for the KafkaConsumer and AdminClient diverges, you can use the more explicit "connector.consumer." and "connector.admin." configuration parameter prefixes to fine tune the settings used for each.

Example Configuration

{
  "name": "kafka-connect-kafka-source-example", // Name of the kafka connect task
    "config": {
    "tasks.max": "2", // Maximum number of parallel tasks to run
    "key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", // Set the format of data that is written to kafka. As the connector parses source cluster messages as binary arrays, this will be the usual required value here
    "value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter", // Set the format of data that is written to kafka. As the connector parses source cluster messages as binary arrays, this will be the usual required value here
    "connector.class": "com.comcast.kafka.connect.kafka.KafkaSourceConnector", // The full class name of this connector
    "source.bootstrap.servers": "kafka.bootstrap.server1:9092,kafka.bootstrap.server2:9093", // Kafka boostrap servers for source cluster
    "source.topic.whitelist": "test.topic.*", // Mirror topics matching this regex
    "source.auto.offset.reset": "earliest", // For partitions without existing offsets stored, start at the head of the partition
    "source.group.id": "kafka-connect-testing", // Use this group ID when commiting offsets to the source cluster
    "connector.consumer.reconnect.backoff.max.ms": "10000" // Override the default consumer setting "reconnect.backoff.max.ms"
  }
}