
Mirus

A tool for distributed, high-volume replication between Apache Kafka clusters based on Kafka Connect. Designed for easy operation in a high-throughput, multi-cluster environment.

Features

Overview

Mirus is built around Apache Kafka Connect, providing SourceConnector and SourceTask implementations optimized for reading data from Kafka source clusters. The MirusSourceConnector runs a KafkaMonitor thread, which monitors the source and destination Apache Kafka cluster partition allocations, looking for changes and applying a configurable topic whitelist. Each task is responsible for a subset of the matching partitions, and runs an independent KafkaConsumer and KafkaProducer client pair to do the work of replicating those partitions.

Tasks can be restarted independently without otherwise affecting a running cluster, are monitored continuously for failure, and are optionally automatically restarted.

To understand how Mirus distributes work across a cluster of machines, please read the Kafka Connect documentation.

Installation

To build a package containing the Mirus jar file and all dependencies, run:

> mvn package -P all

This package can be unzipped for use (see Quick Start).

Maven also builds the following artifacts when you run mvn package. These are useful if you need customized packaging for your own environment:

Usage

These instructions assume you have expanded the mirus-${project.version}-all.zip package.

Mirus Worker Instance

A single Mirus Worker can be started using this helper script.

> bin/mirus-worker-start.sh [worker-properties-file]

worker-properties-file: Path to the Mirus worker properties file, which configures the Kafka Connect framework. See quickstart-worker.properties for an example.

Options:

--override property=value: optional command-line override for any item in the Mirus worker properties file. Multiple override options are supported (similar to the equivalent flag in Kafka).
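As a sketch, an override can be supplied alongside the properties file. The property name here (rest.port, a standard Kafka Connect worker setting) is an illustrative choice:

```shell
# Start a worker, overriding the Kafka Connect REST port on the command line
bin/mirus-worker-start.sh config/quickstart-worker.properties \
    --override rest.port=8084
```

Multiple --override flags can be chained to override several properties at once.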

Mirus Offset Tool

Mirus includes a simple tool for reading and writing offsets. This can be useful for migration from other replication tools, for debugging, and for offset monitoring in production. The tool supports CSV and JSON input and output.

For detailed usage:

> bin/mirus-offset-tool.sh --help

Quick Start

To run the Quick Start example you will need a running Kafka cluster and ZooKeeper instance to work with. We will assume you have a standard Apache Kafka Quickstart test cluster running on localhost. Follow the Kafka Quick Start instructions.

For this tutorial we will set up a Mirus worker instance to mirror the topic test in loop-back mode to another topic in the same cluster. To avoid a conflict the destination topic name will be set to test.mirror using the destination.topic.name.suffix configuration option.

  1. Build the full Mirus project using Maven

    > mvn package -P all
    
  2. Unpack the Mirus "all" package

    > mkdir quickstart; cd quickstart; unzip ../target/mirus-*-all.zip
    
  3. Start the quickstart worker using the sample worker properties file

    > bin/mirus-worker-start.sh config/quickstart-worker.properties
    
    
  4. In another terminal, confirm the Mirus Kafka Connect REST API is running

    > curl localhost:8083
    
    {"version":"1.1.0","commit":"fdcf75ea326b8e07","kafka_cluster_id":"xdxNfx84TU-ennOs7EznZQ"}
    
  5. Submit a new MirusSourceConnector configuration to the REST API with the name mirus-quickstart-source

    > curl localhost:8083/connectors/mirus-quickstart-source/config \
          -X PUT \
          -H 'Content-Type: application/json' \
          -d '{
               "name": "mirus-quickstart-source",
               "connector.class": "com.salesforce.mirus.MirusSourceConnector",
               "tasks.max": "5",
               "topics.whitelist": "test",
               "destination.topic.name.suffix": ".mirror",
               "destination.consumer.bootstrap.servers": "localhost:9092",
               "consumer.bootstrap.servers": "localhost:9092",
               "consumer.client.id": "mirus-quickstart",
               "consumer.key.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
               "consumer.value.deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer"
           }'
    
  6. Confirm the new connector is running

    > curl localhost:8083/connectors
    
    ["mirus-quickstart-source"]
    
    > curl localhost:8083/connectors/mirus-quickstart-source/status
    
    {"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"1.2.3.4:8083"},"tasks":[],"type":"source"}
    
  7. Create source and destination topics test and test.mirror in your Kafka cluster

    > cd ${KAFKA_HOME}
    
    > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 'test' --partitions 1 --replication-factor 1
    Created topic "test".
    
    > bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic 'test.mirror' --partitions 1 --replication-factor 1
    Created topic "test.mirror".
    
  8. Mirus should detect that the new source and destination topics are available and create a new Mirus Source Task:

    > curl localhost:8083/connectors/mirus-quickstart-source/status
    
    {"name":"mirus-quickstart-source","connector":{"state":"RUNNING","worker_id":"10.126.22.44:8083"},"tasks":[{"state":"RUNNING","id":0,"worker_id":"10.126.22.44:8083"}],"type":"source"}
    

Any message you write to the topic test will now be mirrored to test.mirror.
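You can verify this end-to-end with the standard Kafka console clients, run from ${KAFKA_HOME} (flags shown match older Kafka releases, consistent with the --zookeeper usage above):

```shell
# Write a message to the source topic
echo "hello mirus" | bin/kafka-console-producer.sh \
    --broker-list localhost:9092 --topic test

# Read it back from the mirrored topic
bin/kafka-console-consumer.sh \
    --bootstrap-server localhost:9092 --topic test.mirror \
    --from-beginning --max-messages 1
```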

REST API

See the documentation for Kafka Connect REST API.

Configuration

Kafka Connect Configuration

Mirus shares most Worker and Source configuration with the Kafka Connect framework. For general information on configuring the framework see:

Mirus Specific Configuration

Mirus-specific configuration properties are documented in these files:

Destination Topic Checking

By default, Mirus checks that the destination topic exists in the destination Kafka cluster before starting to replicate data to it. This feature can be disabled by setting the enable.destination.topic.checking config option to false.

As of version 0.2.0, destination topic checking can also support topic re-routing performed by the RegexRouter Single-Message Transformation. No other Router Transformations are supported, so destination topic checking must be disabled in order to use them.
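As a sketch, a RegexRouter transformation could replace the destination.topic.name.suffix option used in the Quick Start. This is a hypothetical configuration fragment; the transform alias "route" is an arbitrary name:

```shell
# Hypothetical: re-route topics with the RegexRouter SMT instead of
# destination.topic.name.suffix. "route" is an arbitrary transform alias.
curl localhost:8083/connectors/mirus-quickstart-source/config \
    -X PUT \
    -H 'Content-Type: application/json' \
    -d '{
         "connector.class": "com.salesforce.mirus.MirusSourceConnector",
         "topics.whitelist": "test",
         "consumer.bootstrap.servers": "localhost:9092",
         "destination.consumer.bootstrap.servers": "localhost:9092",
         "transforms": "route",
         "transforms.route.type": "org.apache.kafka.connect.transforms.RegexRouter",
         "transforms.route.regex": "(.*)",
         "transforms.route.replacement": "$1.mirror"
     }'
```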

Metrics

Mirus produces some custom metrics in addition to the standard Kafka Connect metrics.

The JMX queries are as follows:

Latency (MirrorJmxReporter)

objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-max"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-min"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-avg"
objectName="mirus:type=MirusSourceConnector,topic=*" attribute="replication-latency-ms-count"

Destination Information (MissingPartitionsJmxReporter)

objectName="mirus:type=mirus" attribute="missing-dest-partitions-count"

Connector Metrics (ConnectorJmxReporter)

objectName="mirus:type=connector-metrics,connector=*" attribute="task-failed-restart-attempts-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="connector-failed-restart-attempts-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="failed-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="paused-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="destroyed-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="running-task-count"
objectName="mirus:type=connector-metrics,connector=*" attribute="unassigned-task-count"

Task Metrics (TaskJmxReporter)

objectName="mirus:type=connector-task-metrics,connector=*" attribute="task-failed-restart-attempts-count"
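As a sketch, any of the metrics above can be read interactively with a generic JMX client such as jmxterm. This assumes the worker JVM was started with remote JMX enabled on port 9999, which is not configured by default:

```shell
# Hypothetical: query average replication latency for topic "test" via jmxterm.
# The JMX port (9999) and jar version are assumptions for illustration.
java -jar jmxterm-1.0.2-uber.jar -l localhost:9999 <<EOF
get -b mirus:type=MirusSourceConnector,topic=test replication-latency-ms-avg
EOF
```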

Developer Info

To perform a release:

> mvn release:prepare release:perform

GPG keys may need to be passed to Maven with -Darguments='-Dgpg.passphrase= -Dgpg.keyname=55Z32RD1'

Discussion

Questions or comments can be posted on the Mirus GitHub issues page.

Maintainers

Paul Davidson and contributors.

Code Style

This project uses the Google Java Format.