Perfect-Kafka

<p align="center"> <a href="http://perfect.org/get-involved.html" target="_blank"> <img src="http://perfect.org/assets/github/perfect_github_2_0_0.jpg" alt="Get Involved with Perfect!" width="854" /> </a> </p> <p align="center"> <a href="https://github.com/PerfectlySoft/Perfect" target="_blank"> <img src="http://www.perfect.org/github/Perfect_GH_button_1_Star.jpg" alt="Star Perfect On Github" /> </a> <a href="http://stackoverflow.com/questions/tagged/perfect" target="_blank"> <img src="http://www.perfect.org/github/perfect_gh_button_2_SO.jpg" alt="Stack Overflow" /> </a> <a href="https://twitter.com/perfectlysoft" target="_blank"> <img src="http://www.perfect.org/github/Perfect_GH_button_3_twit.jpg" alt="Follow Perfect on Twitter" /> </a> <a href="http://perfect.ly" target="_blank"> <img src="http://www.perfect.org/github/Perfect_GH_button_4_slack.jpg" alt="Join the Perfect Slack" /> </a> </p> <p align="center"> <a href="https://developer.apple.com/swift/" target="_blank"> <img src="https://img.shields.io/badge/Swift-3.0-orange.svg?style=flat" alt="Swift 3.0"> </a> <a href="https://developer.apple.com/swift/" target="_blank"> <img src="https://img.shields.io/badge/Platforms-OS%20X%20%7C%20Linux%20-lightgray.svg?style=flat" alt="Platforms OS X | Linux"> </a> <a href="http://perfect.org/licensing.html" target="_blank"> <img src="https://img.shields.io/badge/License-Apache-lightgrey.svg?style=flat" alt="License Apache"> </a> <a href="http://twitter.com/PerfectlySoft" target="_blank"> <img src="https://img.shields.io/badge/Twitter-@PerfectlySoft-blue.svg?style=flat" alt="PerfectlySoft Twitter"> </a> <a href="http://perfect.ly" target="_blank"> <img src="http://perfect.ly/badge.svg" alt="Slack Status"> </a> </p>

This project provides a Swift wrapper for librdkafka.

This package builds with Swift Package Manager and is part of the Perfect project but can also be used as an independent module.

Release Notes for MacOS X

Before importing this library, please install librdkafka first:

$ brew install librdkafka

Please also note that a proper pkg-config path setting is required:

$ export PKG_CONFIG_PATH="/usr/local/lib/pkgconfig"

Release Notes for Linux

Before importing this library, please install librdkafka-dev first:

$ sudo apt-get install librdkafka-dev

Quick Start

Kafka Client Configurations

Before starting any stream operations, it is necessary to apply settings to clients, i.e., producers or consumers.

Perfect Kafka provides two categories of configuration: Kafka.Config() for global configurations and Kafka.TopicConfig() for topic configurations.

Initialization of Global Configurations

To create a configuration set with default values, simply call:

let conf = try Kafka.Config()

Alternatively, a new configuration can be duplicated from an existing one:

let conf = try Kafka.Config()
// this will keep the original settings and duplicate a new one
let conf2 = try Kafka.Config(conf)

Initialization of Topic Configurations

Topic configurations are initialized in the same way as global configurations.

To create a topic configuration with default settings, call:

let conf = try Kafka.TopicConfig()

Alternatively, a new topic configuration can be duplicated from an existing one:

let conf = try Kafka.TopicConfig()
// this will keep the original settings and duplicate a new one
let conf2 = try Kafka.TopicConfig(conf)

Access Settings of Configuration

Both Kafka.Config and Kafka.TopicConfig share the same API for accessing settings.

List All Variables with Value

Kafka.Config.properties and Kafka.TopicConfig.properties provide the settings as a dictionary:

// this will print out all variables in a configuration
print(conf.properties)
// for example, it will print out something like:
// ["topic.metadata.refresh.fast.interval.ms": "250",
// "receive.message.max.bytes": "100000000", ...]

Get a Variable Value

Call get() to retrieve the value of a specific variable:

let maxBytes = try conf.get("receive.message.max.bytes")
// maxBytes would be "100000000" by default

Set a Variable with New Value

Call set() to save settings for a specific variable:

// this will restrict message receiving buffer to 1MB
try conf.set("receive.message.max.bytes", "1048576")
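Because every value in properties is a string, numeric settings have to be parsed before they can be compared or used in arithmetic. The sketch below is plain Swift and does not call the library: the dictionary literal stands in for conf.properties, and intSetting is a hypothetical helper name, not part of Perfect-Kafka.

```swift
// Stand-in for `conf.properties`; keys and values are copied from the
// sample output shown earlier in this section.
let properties: [String: String] = [
    "topic.metadata.refresh.fast.interval.ms": "250",
    "receive.message.max.bytes": "100000000"
]

/// Hypothetical helper: returns the setting as an Int, or nil if the key
/// is missing or the value is not a valid integer.
func intSetting(_ properties: [String: String], _ key: String) -> Int? {
    guard let raw = properties[key] else { return nil }
    return Int(raw)
}

let maxBytes = intSetting(properties, "receive.message.max.bytes")

// Values passed to set() are strings as well, so a computed size is
// converted back to text first.
let oneMegabyte = String(1024 * 1024)
```

Computing the size and converting it with String() avoids hand-typing a value such as "1048576".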

Producer

Perfect-Kafka provides a Producer class to send data (messages) to Kafka hosts. A producer can send messages one at a time or several in a batch. Messages can be either text strings or binary bytes.

let producer = try Producer("VideoTest")
let brokers = producer.connect(brokers: "host:9092")
if brokers > 0 {
  let _ = try producer.send(message: "hello, world!")
}

Before sending any actual messages, a few steps are required to set up the connection to the Kafka hosts.

Producer Instance with a Topic

To initialize a Producer instance, a topic name is required, whether or not the topic already exists on the Kafka hosts.

If the topic does not exist when the producer connects to the Kafka hosts (brokers), Producer() tries to create it; otherwise it uses the existing topic for further operations.

For example, the demo below shows how to start a producer with a topic named "VideoTest":

let producer = try Producer("VideoTest")

Connect to Brokers

Use the connect() method to connect to one or more message brokers, i.e., Kafka hosts (host and port):

let brokers = producer.connect(brokers: "host1:9092,host2:9092,host3:9092")

On success, it returns the number of hosts connected.

Alternatively, the brokers can be passed in other forms. For example, the hosts can be an array of strings:

let brokers = producer.connect(brokers: ["host1:9092", "host2:9092", "host3:9092"])

or dictionary:

let brokers = producer.connect(brokers: ["host1": 9092, "host2": 9092, "host3": 9092])
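The three call forms above appear to describe the same broker list in different shapes. As a rough illustration of that equivalence, the sketch below normalizes the array and dictionary forms into the comma-separated string form. Both brokerList functions are hypothetical helpers written for this example; they are not part of Perfect-Kafka and do not show how the library handles its arguments internally.

```swift
/// Hypothetical helper: joins "host:port" entries into the
/// comma-separated form used by the string variant of connect().
func brokerList(_ hosts: [String]) -> String {
    return hosts.joined(separator: ",")
}

/// Same idea for a host-to-port dictionary. Dictionaries are unordered,
/// so entries are sorted by host name to make the result deterministic.
func brokerList(_ hosts: [String: Int]) -> String {
    return hosts
        .sorted { $0.key < $1.key }
        .map { "\($0.key):\($0.value)" }
        .joined(separator: ",")
}

let asArray: [String] = ["host1:9092", "host2:9092", "host3:9092"]
let asDictionary: [String: Int] = ["host1": 9092, "host2": 9092, "host3": 9092]

let fromArray = brokerList(asArray)
let fromDictionary = brokerList(asDictionary)
```

Note that the dictionary form can carry only one port per host name, so two brokers on the same host need the string or array form.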

Send Messages

Perfect Kafka can send either text or binary messages to brokers, one at a time or in a batch.

| Method | Description | Returns |
| --- | --- | --- |
| send(message: String, key: String? = nil) | a text message with an optional key to send | an Int64 message id |
| send(message: [Int8], key: [Int8] = []) | a binary message with an optional key to send | an Int64 message id |
| send(messages: [(String, String?)]) | text messages with optional keys in an array | [Int64] message IDs for each message |
| send(messages: [([Int8], [Int8])]) | binary messages with optional keys in an array | [Int64] message IDs for each message |
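The binary variants take [Int8] rather than the more common [UInt8], so raw bytes usually need a bit-pattern conversion first. The following sketch prepares payloads in the shapes listed in the table above. It is plain Swift with no library calls; bytes(of:) is a hypothetical helper, and the message and key strings are made-up sample data.

```swift
/// Hypothetical helper: the UTF-8 bytes of a string as [Int8], the
/// element type used by the binary send() variants.
func bytes(of text: String) -> [Int8] {
    // Int8(bitPattern:) reinterprets each byte, so values above 127
    // become negative instead of trapping.
    return text.utf8.map { Int8(bitPattern: $0) }
}

// A single binary payload.
let payload = bytes(of: "hello, world!")

// A batch in the shape [([Int8], [Int8])]: message body first, key second.
let batch: [([Int8], [Int8])] = [
    (bytes(of: "frame-1"), bytes(of: "camera-a")),
    (bytes(of: "frame-2"), []) // empty key, like the default `key: [Int8] = []`
]
```

With a connected producer, payload and batch would presumably be passed to send(message:) and send(messages:) respectively.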

Sent or Not

Perfect Kafka's send() is an asynchronous function, so the library provides a few extra methods to determine the sending status of each message.

Consumer

Before actually receiving messages from Kafka with a specific topic, a few procedures are required to initialize a Consumer instance:

let consumer = try Consumer("VideoTest")
let brokers = consumer.connect(brokers: ["host1": 9092, "host2": 9092, "host3": 9092])
guard brokers > 0 else {
  // connection failed; a guard body must leave the current scope
  fatalError("unable to connect to any broker")
}

Partitions

Once connected, it is a good idea to query the brokers to see whether there are sufficient resources, i.e., partitions, for further operations:

let info = try consumer.brokerInfo()
print(info)

The variable info above is a MetaData structure with the following members:

| Member | Type | Description |
| --- | --- | --- |
| brokers | [Broker] | An array of Broker structure |
| topics | [Topic] | An array of Topic structure |

The Broker structure stores the information about a broker:

| Member | Type | Description |
| --- | --- | --- |
| id | Int | Broker Id |
| host | String | Host name of the broker |
| port | Int | Host port that listens |

The Topic structure mainly records which partitions a topic uses:

| Member | Type | Description |
| --- | --- | --- |
| name | String | Topic name |
| err | Exception | Topic error reported by broker |
| partitions | [Partition] | Partitions of this topic |

The Partition structure is essential because it provides the partition id used for messaging:

| Member | Type | Description |
| --- | --- | --- |
| id | Int | Partition Id - use this to start / stop messaging |
| err | Exception | Partition error reported by broker |
| leader | Int | Leader broker |
| replicas | [Int] | Replica brokers |
| isrs | [Int] | In-Sync-Replica brokers |

In practice, partition info can be acquired as follows:

let consumer = try Consumer("VideoTest")
let brokers = consumer.connect(brokers: ["host1": 9092, "host2": 9092, "host3": 9092])
guard brokers > 0 else {
  // connection failed; a guard body must leave the current scope
  fatalError("unable to connect to any broker")
}
consumer.OnArrival = { m in print("message : #\(m.offset) \(m.text)")}
let info = try consumer.brokerInfo()
guard info.topics.count > 0 else {
  // no topic found
  fatalError("no topic found")
}
guard info.topics[0].name == "VideoTest" else {
  // it is not the topic we want
  fatalError("unexpected topic")
}
let partitions = info.topics[0].partitions
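Checking only topics[0] relies on the wanted topic being listed first. A slightly more defensive variation is to look the topic up by name and then filter its partitions. The sketch below shows the idea in plain Swift: the Partition and Topic structs are minimal stand-ins that mirror a subset of the members in the tables above (the real types come from the library), and fullyReplicatedPartitionIds is a hypothetical helper with made-up sample data.

```swift
// Minimal stand-ins for illustration only; they copy a subset of the
// members listed in the Topic and Partition tables.
struct Partition {
    let id: Int
    let leader: Int
    let replicas: [Int]
    let isrs: [Int]
}

struct Topic {
    let name: String
    let partitions: [Partition]
}

/// Hypothetical helper: ids of the partitions of the named topic whose
/// replicas are all in sync, or an empty array if the topic is absent.
func fullyReplicatedPartitionIds(of topicName: String, in topics: [Topic]) -> [Int] {
    guard let topic = topics.first(where: { $0.name == topicName }) else {
        return []
    }
    return topic.partitions
        .filter { Set($0.replicas) == Set($0.isrs) }
        .map { $0.id }
}

// Made-up metadata: partition 1 has a replica (broker 3) that is not in sync.
let topics = [
    Topic(name: "VideoTest", partitions: [
        Partition(id: 0, leader: 1, replicas: [1, 2], isrs: [1, 2]),
        Partition(id: 1, leader: 2, replicas: [2, 3], isrs: [2])
    ])
]
let healthyIds = fullyReplicatedPartitionIds(of: "VideoTest", in: topics)
```

Whether an out-of-sync replica should disqualify a partition is a policy choice; the filter is only an example of using the replicas and isrs members together.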

Download Messages From A Partition

The code below shows how to download messages from a partition. This demo assumes let partId = partitions[0].id:

// called once for every message that arrives
consumer.OnArrival = { m in
  print("message #\(m.offset) : \(m.text)")
}

// start downloading
try consumer.start(partition: partId)
// run until end of program; notEndOfProgram is a Bool maintained elsewhere
while notEndOfProgram {
  let total = try consumer.poll(partition: partId)
  print("\(total) messages arrived in this moment")
}
consumer.stop(partId)
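The loop above runs until the program decides to stop. When the goal is instead to read whatever is currently available and then finish, one option is to stop after several consecutive empty polls. The sketch below shows that pattern in plain Swift. It assumes, based on the demo above, that a poll reports the number of messages that arrived as an Int; drain is a hypothetical helper, and the closure replays simulated results in place of a real consumer.

```swift
/// Hypothetical helper: calls `poll` until it has returned zero
/// `maxIdlePolls` times in a row, then returns the total count reported.
func drain(maxIdlePolls: Int, poll: () throws -> Int) rethrows -> Int {
    var total = 0
    var idlePolls = 0
    while idlePolls < maxIdlePolls {
        let arrived = try poll()
        if arrived > 0 {
            total += arrived
            idlePolls = 0      // activity resets the idle counter
        } else {
            idlePolls += 1
        }
    }
    return total
}

// Simulated poll results. With the library, the closure would instead
// wrap `try consumer.poll(partition: partId)`.
var simulated = [2, 0, 3, 0, 0, 7]
let received = drain(maxIdlePolls: 2) {
    simulated.isEmpty ? 0 : simulated.removeFirst()
}
```

In this run the helper stops after the two consecutive zeros, so the trailing 7 is never polled; a larger maxIdlePolls trades latency for a lower chance of stopping early.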

Here is a walkthrough:

First, the OnArrival event is a callback that receives a Message data structure:

| Member | Type | Description |
| --- | --- | --- |
| err | Exception | Error: if the message is good or not |
| topic | String | topic name of the message |
| partition | Int | partition of the message |
| isText | Bool | if the message is a valid UTF-8 text or not |
| data | [Int8] | the original binary data of message body |
| text | String | decoded message body in a UTF-8 string, if isText |
| keyIsText | Bool | if the key is a valid UTF-8 text |
| keybuf | [Int8] | the original binary data of optional key |
| key | String | decoded key in a UTF-8 string, if keyIsText |
| offset | Int64 | offset inside the topic |
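The isText and text members imply that a message body is only usable as a string when its bytes are valid UTF-8. The sketch below shows one way to perform the same kind of check on raw [Int8] data using Foundation. It is not the library's implementation; decodeText is a hypothetical helper and the byte arrays are made-up samples.

```swift
import Foundation

/// Hypothetical helper: decodes a message body as UTF-8, returning nil
/// when the bytes are not valid UTF-8.
func decodeText(_ data: [Int8]) -> String? {
    // Reinterpret the signed bytes as unsigned before decoding.
    let raw = data.map { UInt8(bitPattern: $0) }
    return String(bytes: raw, encoding: .utf8)
}

let textBody: [Int8] = [104, 105]   // the ASCII bytes of "hi"
let binaryBody: [Int8] = [-1, -2]   // 0xFF 0xFE, bytes that never occur in UTF-8

let decoded = decodeText(textBody)
let rejected = decodeText(binaryBody)
```

A nil result corresponds to the case where only the binary data member should be used.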

Second, call start() to start downloading messages: func start(_ from: Position = .BEGIN, partition: Int32 = RD_KAFKA_PARTITION_UA). The from parameter is the position to start reading from (.BEGIN by default), and partition is the id of the partition to read (RD_KAFKA_PARTITION_UA by default).

Then, Perfect Kafka provides the poll() function to wait a short while and listen for activity on a specific partition: func poll(_ timeout: UInt = 10, partition: Int32 = RD_KAFKA_PARTITION_UA). The timeout is the number of milliseconds to wait while polling.

Finally, call stop() to end the messaging.

Further Information

For more information on the Perfect project, please visit perfect.org.