Apache Kafka + Kafka Connect + MQTT Connector + Sensor Data

This demo shows an Internet of Things (IoT) integration example using Apache Kafka + Kafka Connect + MQTT Connector + Sensor Data.

This project does not include any source code, as Kafka Connect allows integration with data sources and sinks through configuration only.

An example configuration and a step-by-step guide can be found below. If you want more details about Kafka + MQTT integration, take a look at my slides from Kafka Summit 2018 in San Francisco: IoT Integration with MQTT and Apache Kafka. The video recording is available for free on the Kafka Summit website: Kafka MQTT Integration - Video Recording.

Architecture: Sensor Data via MQTT Broker and Kafka Connect MQTT Connector to Kafka Cluster

This project focuses on the integration of MQTT sensor data into Kafka via MQTT Broker and Kafka Connect for further processing:

As an alternative to using Kafka Connect, you can also leverage Confluent MQTT Proxy to integrate IoT data from IoT devices directly, without the need for an MQTT broker. See Deep Learning UDF for KSQL for Streaming Anomaly Detection of MQTT IoT Sensor Data for an example and source code.

If you want to see the other part (integration with sink applications like Elasticsearch / Grafana), please take a look at the project "KSQL for streaming IoT data", which shows how to realize the integration with Elasticsearch via Kafka Connect.

Live Demo Video - MQTT with Kafka Connect and MQTT Proxy

If you want to see Apache Kafka / MQTT integration in a video, please check out the following 15min recording showing a demo of my two GitHub examples:

Apache Kafka + MQTT Integration

Kafka Connect Configuration (No Source Code Needed!)

Here is the full configuration for the MQTT Connector in Kafka Connect's standalone mode, which we use with the Confluent CLI for a local setup:

            name=MqttSourceConnector1
            connector.class=io.confluent.connect.mqtt.MqttSourceConnector
            tasks.max=1
            mqtt.server.uri=< Required Configuration >
            mqtt.topics=< Required Configuration >
            kafka.topics=< Required Configuration >
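
If you are not using the Confluent CLI, a minimal sketch for loading the same properties file in standalone mode is Kafka's connect-standalone script (the file names below are placeholders, not part of this project):

            # worker.properties: a Connect standalone worker configuration
            # mqtt-source.properties: the connector configuration shown above
            connect-standalone worker.properties mqtt-source.properties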

For distributed mode, you can use the same configuration via the REST API:

            curl -s -X POST -H 'Content-Type: application/json' http://localhost:8083/connectors -d '{
                "name" : "< Required Configuration >",
                "config" : {
                    "connector.class" : "io.confluent.connect.mqtt.MqttSourceConnector",
                    "tasks.max" : "1",
                    "mqtt.server.uri" : "< Required Configuration >",
                    "mqtt.topics" : "< Required Configuration >",
                    "kafka.topics" : "< Required Configuration >"
                }
            }'
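
Once the connector is submitted, you can verify that it is running via the Connect REST API (the connector name below is just an example, use the name you configured):

            # list all deployed connectors
            curl -s http://localhost:8083/connectors

            # check the status of the MQTT source connector and its tasks
            curl -s http://localhost:8083/connectors/MqttSourceConnector1/status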

The documentation explains the differences between standalone and distributed Kafka Connect mode. In short: standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks. Distributed mode is used in most production scenarios and provides scalability and automatic fault tolerance for Kafka Connect. You can also use distributed mode for local development to leverage its advantages, such as the REST API, even if you run just a single Connect instance (and you can scale out later without complex changes). Therefore, we use distributed mode in this project.
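
For reference, a minimal distributed worker configuration could look like the following sketch. Broker address, group id, converters and internal topic names are assumptions for a local single-broker setup, not values prescribed by this project:

            bootstrap.servers=localhost:9092
            group.id=connect-cluster
            key.converter=org.apache.kafka.connect.json.JsonConverter
            value.converter=org.apache.kafka.connect.json.JsonConverter
            # internal topics for connector configs, offsets and status
            # (replication factor 1 is only suitable for a local setup)
            config.storage.topic=connect-configs
            offset.storage.topic=connect-offsets
            status.storage.topic=connect-status
            config.storage.replication.factor=1
            offset.storage.replication.factor=1
            status.storage.replication.factor=1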

The Confluent documentation contains more details about installing and using Confluent's MQTT Connector.

How to run it?

Requirements

The demo is developed and tested on Mac and Linux operating systems. As Kafka does not run well on Windows, this setup is not tested there at all.

Step-by-step demo

Follow these steps to configure the MQTT Connector, start all components, generate MQTT sensor data, and consume it with a Kafka consumer.
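
As a rough sketch of the data flow once everything is running (the MQTT topic, Kafka topic and ports below are assumptions, adapt them to your configuration):

            # publish a test sensor value to the MQTT broker (default port 1883)
            mosquitto_pub -h localhost -p 1883 -t temperature -m "21.5"

            # consume the records the connector writes into Kafka
            # (the topic name depends on your kafka.topics configuration)
            kafka-console-consumer --bootstrap-server localhost:9092 --topic temperature --from-beginning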