Google Cloud Pub/Sub Group Kafka Connector

The Google Cloud Pub/Sub Group Kafka Connector library provides Google Cloud Platform (GCP) first-party connectors for Pub/Sub products with Kafka Connect. You can use the library to transmit data from Apache Kafka to Cloud Pub/Sub or Pub/Sub Lite and vice versa.

Prerequisites

You must have a GCP project in order to use Cloud Pub/Sub or Pub/Sub Lite.

Follow these setup steps for Pub/Sub before doing the quickstart.

Follow these setup steps for Pub/Sub Lite before doing the quickstart.

For general information on how to authenticate with GCP when using the Google Cloud Pub/Sub Group Kafka Connector library, please visit Provide credentials for Application Default Credentials.

Quickstart

In this quickstart, you will learn how to send data from a Kafka topic to a Pub/Sub or Pub/Sub Lite topic and vice versa, using Kafka Connect running locally in standalone mode (single process).

  1. Follow the Kafka quickstart to download Kafka, start the Kafka environment, and create a Kafka topic.

    Note: Please use the same Kafka API major version as that used by the connector. Otherwise, the connector may not work properly. Check the Kafka version used by the connector in pom.xml.

  2. Acquire the connector jar.

  3. Update your Kafka Connect configurations.

    Open /config/connect-standalone.properties in the Kafka download folder. Add the filepath of the downloaded connector jar to plugin.path and uncomment the line if needed. In addition, because the connector is using Kafka Connect in standalone mode, include offset.storage.file.filename with a valid filename to store offset data in.

  4. Create a Pub/Sub or Pub/Sub Lite topic and a subscription to it.

    • CloudPubSubSinkConnector and CloudPubSubSourceConnector
    • PubSubLiteSinkConnector and PubSubLiteSourceConnector
  5. Update the connector configurations.

    Open the connector configuration files at /config. Update variables labeled TODO (developer) with appropriate input.

  6. Run the following command to start the appropriate sink or source connector. You can run multiple connector tasks at the same time.

    > bin/connect-standalone.sh \
      config/connect-standalone.properties \
      path/to/pubsub/sink/connector.properties [source.connector.properties ...]
    
  7. Test the connector.

    • CloudPubSubSinkConnector

      1. Follow the instructions in the Kafka quickstart to publish a message to the Kafka topic.
      2. Pull the message from your Pub/Sub subscription.
    • CloudPubSubSourceConnector

      1. Publish a message to your Pub/Sub topic.
      2. Follow the instructions in the Kafka quickstart to read the message from your Kafka topic.
    • PubSubLiteSinkConnector

      1. Follow the instructions in the Kafka quickstart to publish a message to the Kafka topic.
      2. Pull the message from your Pub/Sub Lite subscription.
    • PubSubLiteSourceConnector

      1. Publish a message to your Pub/Sub Lite topic.
      2. Follow the instructions in the Kafka quickstart to read the message from your Kafka topic.

Acquire the connector

The connector is available from the Maven Central repository. Select the latest version of the connector, then download "jar" from the Downloads dropdown menu.

You can also build the connector from head.

Run the connector

Please refer to the Kafka User Guide for general information on running connectors using Kafka Connect.

To run this connector using Kafka Connect in standalone mode, follow these steps:

  1. Copy the connector jar to the machine where you will run Kafka Connect.

  2. Create a configuration file for your Kafka Connect instance. Make sure to include the filepath to the connector jar in plugin.path.

  3. Make a copy of the connector configuration files at /config and update the configuration options accordingly.

  4. Start Kafka Connect with your connector with the following command:

    > bin/connect-standalone.sh config/connect-standalone.properties connector1.properties [connector2.properties ...]
    
  5. If you are running Kafka Connect behind a proxy, export the KAFKA_OPTS variable with the options for connecting through the proxy.

    > export KAFKA_OPTS="-Dhttp.proxyHost=<host> -Dhttp.proxyPort=<port> -Dhttps.proxyHost=<host> -Dhttps.proxyPort=<port>"
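
As a rough illustration of step 2, the sketch below writes a minimal worker configuration for standalone mode. The file name, the connector directory, and the offset file path are placeholders, and the converter choices are an assumption rather than a requirement of the connector.

```shell
# Hypothetical directory holding the downloaded connector jar; adjust to your setup.
CONNECTOR_DIR="/opt/kafka/connectors"

# Write a minimal worker config for Kafka Connect in standalone mode.
cat > my-connect-standalone.properties <<EOF
bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.kafka.connect.converters.ByteArrayConverter
# Standalone mode stores source offsets in a local file.
offset.storage.file.filename=/tmp/connect.offsets
# Location that Kafka Connect scans for connector plugins.
plugin.path=${CONNECTOR_DIR}
EOF

cat my-connect-standalone.properties
```

The resulting file can be passed as the first argument to bin/connect-standalone.sh in place of config/connect-standalone.properties.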
    

When running the connector on a Kafka cluster in distributed mode, "the connector configurations are not passed on the command line. Instead, use the REST API described below to create, modify, and destroy connectors" (Kafka User Guide).
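
A minimal sketch of creating a connector through the Kafka Connect REST API follows. The worker address (port 8083 is the Kafka Connect default), the connector name, and the project and topic names are placeholders. The connector class name is an assumption that should be checked against the configuration files at /config.

```shell
# Hypothetical worker address; replace with the address of your Connect worker.
CONNECT_URL="http://localhost:8083"

# Connector definition in the JSON shape expected by POST /connectors.
cat > cps-sink-connector.json <<'EOF'
{
  "name": "cps-sink-connector",
  "config": {
    "connector.class": "com.google.pubsub.kafka.sink.CloudPubSubSinkConnector",
    "tasks.max": "1",
    "topics": "my-kafka-topic",
    "cps.project": "my-project",
    "cps.topic": "my-pubsub-topic"
  }
}
EOF

# Submit the definition only if a Connect worker is actually reachable.
if command -v curl >/dev/null 2>&1 && \
   curl -s -o /dev/null --max-time 2 "$CONNECT_URL/connectors"; then
  curl -s -X POST -H "Content-Type: application/json" \
    --data @cps-sink-connector.json "$CONNECT_URL/connectors"
else
  echo "No Kafka Connect worker reachable at $CONNECT_URL; wrote cps-sink-connector.json only."
fi
```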

Pub/Sub connector configs

In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub connector supports the following configurations:

Source Connector

| Config | Value Range | Default | Description |
|---|---|---|---|
| cps.subscription | String | REQUIRED (No default) | The Pub/Sub subscription ID, e.g. "baz" for subscription "/projects/bar/subscriptions/baz". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub subscription, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
| kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from the Pub/Sub subscription. |
| cps.maxBatchSize | Integer | 100 | The maximum number of messages per batch in a pull request to Pub/Sub. |
| cps.makeOrderingKeyAttribute | Boolean | false | When true, copy the ordering key to the set of attributes set in the Kafka message. |
| kafka.key.attribute | String | null | The Pub/Sub message attribute to use as a key for messages published to Kafka. If set to "orderingKey", use the message's ordering key. |
| kafka.partition.count | Integer | 1 | The number of partitions in the Kafka topic to which messages will be published. NOTE: this parameter is ignored if the partition scheme is "kafka_partitioner". |
| kafka.partition.scheme | round_robin, hash_key, hash_value, kafka_partitioner, ordering_key | round_robin | The scheme for assigning a message to a partition in Kafka. The scheme "round_robin" assigns partitions in a round-robin fashion, while the schemes "hash_key" and "hash_value" find the partition by hashing the message key and message value respectively. The "kafka_partitioner" scheme delegates partitioning logic to the Kafka producer, which by default detects the number of partitions automatically and performs either murmur-hash-based partition mapping or round robin, depending on whether a message key is provided. "ordering_key" uses the hash code of a message's ordering key. If no ordering key is present, it falls back to "round_robin". |
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| kafka.record.headers | Boolean | false | Use Kafka record headers to store Pub/Sub message attributes. |
| cps.streamingPull.enabled | Boolean | false | Whether to use streaming pull for the connector to connect to Pub/Sub. If enabled, cps.maxBatchSize is ignored. |
| cps.streamingPull.flowControlMessages | Long | 1,000 | The maximum number of outstanding messages per task when using streaming pull. |
| cps.streamingPull.flowControlBytes | Long | 100L * 1024 * 1024 (100 MiB) | The maximum number of outstanding message bytes per task when using streaming pull. |
| cps.streamingPull.parallelStreams | Integer | 1 | The number of streams to open per task when using streaming pull. |
| cps.streamingPull.maxAckExtensionMs | Long | 0 | The maximum time, in milliseconds, to which the subscribe deadline will be extended when using streaming pull. A value of 0 implies the java-pubsub library default value. |
| cps.streamingPull.maxMsPerAckExtension | Long | 0 | The maximum time, in milliseconds, by which the subscribe deadline is extended at a time when using streaming pull. A value of 0 implies the java-pubsub library default value. |
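
To make the source connector options concrete, here is a sketch of a connector properties file that combines the required settings with a few optional ones. All project, subscription, and topic names are hypothetical, and the connector class name is an assumption to verify against the configuration files at /config.

```shell
# Write a sample Pub/Sub source connector config (placeholder values throughout).
cat > cps-source-connector.properties <<'EOF'
name=cps-source-connector
connector.class=com.google.pubsub.kafka.source.CloudPubSubSourceConnector
tasks.max=1
# Required settings.
cps.project=my-project
cps.subscription=my-subscription
kafka.topic=my-kafka-topic
# Optional: use the Pub/Sub ordering key as the Kafka record key and hash on it.
kafka.key.attribute=orderingKey
kafka.partition.scheme=hash_key
# Should match the partition count of the Kafka topic above.
kafka.partition.count=3
# Optional: use streaming pull and cap outstanding messages per task.
cps.streamingPull.enabled=true
cps.streamingPull.flowControlMessages=500
EOF

cat cps-source-connector.properties
```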

Sink Connector

| Config | Value Range | Default | Description |
|---|---|---|---|
| cps.topic | String | REQUIRED (No default) | The Pub/Sub topic ID, e.g. "foo" for topic "/projects/bar/topics/foo". |
| cps.project | String | REQUIRED (No default) | The project containing the Pub/Sub topic, e.g. "bar" from above. |
| cps.endpoint | String | "pubsub.googleapis.com:443" | The Pub/Sub endpoint to use. |
| maxBufferSize | Integer | 100 | The maximum number of messages that can be buffered for a topic partition before they are published to Pub/Sub. |
| maxBufferBytes | Long | 10,000,000 | The maximum number of bytes that can be buffered for the messages on a topic partition before they are published to Pub/Sub. |
| maxOutstandingRequestBytes | Long | Long.MAX_VALUE | The maximum number of total bytes that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxOutstandingMessages | Long | Long.MAX_VALUE | The maximum number of messages that can be outstanding (including incomplete and pending batches) before the publisher will block further publishing. |
| maxDelayThresholdMs | Integer | 100 | The maximum amount of time to wait to reach maxBufferSize or maxBufferBytes before publishing outstanding messages to Pub/Sub. |
| maxRequestTimeoutMs | Integer | 10,000 | The timeout for individual publish requests to Pub/Sub. |
| maxTotalTimeoutMs | Integer | 60,000 | The total timeout for a call to publish (including retries) to Pub/Sub. |
| maxShutdownTimeoutMs | Integer | 60,000 | The maximum amount of time to wait for a publisher to shut down when stopping a task in Kafka Connect. |
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, the GOOGLE_APPLICATION_CREDENTIALS environment variable is used. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| metadata.publish | Boolean | false | When true, include the Kafka topic, partition, offset, and timestamp as message attributes when a message is published to Pub/Sub. |
| headers.publish | Boolean | false | When true, include any headers as attributes when a message is published to Pub/Sub. |
| orderingKeySource | String (none, key, partition) | none | When set to "none", do not set the ordering key. When set to "key", use a message's key as the ordering key. When set to "partition", convert the partition number to a String and use that as the ordering key. Note that "partition" should only be used for low-throughput topics or topics with thousands of partitions. |
| messageBodyName | String | "cps_message_body" | When using a struct or map value schema, this field or key name indicates that the corresponding value will go into the Pub/Sub message body. |
| enableCompression | Boolean | false | When true, enable publish-side compression in order to save on networking costs between Kafka Connect and Cloud Pub/Sub. |
| compressionBytesThreshold | Long | 240 | When enableCompression is true, the minimum size of publish request (in bytes) to compress. |
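
The following sketch shows how a sink connector properties file might combine the required settings with batching and attribute options. The names are placeholders, the tuning values are arbitrary examples rather than recommendations, and the connector class name is an assumption to verify against the configuration files at /config.

```shell
# Write a sample Pub/Sub sink connector config (placeholder values throughout).
cat > cps-sink-connector.properties <<'EOF'
name=cps-sink-connector
connector.class=com.google.pubsub.kafka.sink.CloudPubSubSinkConnector
tasks.max=1
# Kafka topic(s) to read from, and the Pub/Sub topic to publish to.
topics=my-kafka-topic
cps.project=my-project
cps.topic=my-pubsub-topic
# Publish a batch after 50 buffered messages or 200 ms, whichever comes first.
maxBufferSize=50
maxDelayThresholdMs=200
# Carry Kafka metadata and record headers over as Pub/Sub attributes.
metadata.publish=true
headers.publish=true
# Use the Kafka record key as the Pub/Sub ordering key.
orderingKeySource=key
EOF

cat cps-sink-connector.properties
```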

Pub/Sub Lite connector configs

In addition to the Kafka Connect configurations supplied by the Kafka Connect API, the Pub/Sub Lite connector supports the following configurations:

Source Connector

| Config | Value Range | Default | Description |
|---|---|---|---|
| pubsublite.subscription | String | REQUIRED (No default) | The Pub/Sub Lite subscription ID, e.g. "baz" for the subscription "/projects/bar/locations/europe-south7-q/subscriptions/baz". |
| pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite subscription, e.g. "bar" from above. |
| pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite subscription, e.g. "europe-south7-q" from above. |
| kafka.topic | String | REQUIRED (No default) | The Kafka topic which will receive messages from Pub/Sub Lite. |
| pubsublite.partition_flow_control.messages | Long | Long.MAX_VALUE | The maximum number of outstanding messages per Pub/Sub Lite partition. |
| pubsublite.partition_flow_control.bytes | Long | 20,000,000 | The maximum number of outstanding bytes per Pub/Sub Lite partition. |
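
A Pub/Sub Lite source connector configuration might look like the sketch below. The project, location, subscription, and topic values are hypothetical, the flow control numbers are arbitrary, and the connector class name is an assumption to verify against the configuration files at /config.

```shell
# Write a sample Pub/Sub Lite source connector config (placeholder values throughout).
cat > pubsub-lite-source-connector.properties <<'EOF'
name=pubsub-lite-source-connector
connector.class=com.google.pubsublite.kafka.source.PubSubLiteSourceConnector
tasks.max=1
pubsublite.project=my-project
pubsublite.location=us-central1-a
pubsublite.subscription=my-lite-subscription
kafka.topic=my-kafka-topic
# Optional per-partition flow control limits.
pubsublite.partition_flow_control.messages=1000
pubsublite.partition_flow_control.bytes=10000000
EOF

cat pubsub-lite-source-connector.properties
```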

Sink Connector

| Config | Value Range | Default | Description |
|---|---|---|---|
| pubsublite.topic | String | REQUIRED (No default) | The Pub/Sub Lite topic ID, e.g. "foo" for topic "/projects/bar/locations/europe-south7-q/topics/foo". |
| pubsublite.project | String | REQUIRED (No default) | The project containing the Pub/Sub Lite topic, e.g. "bar" from above. |
| pubsublite.location | String | REQUIRED (No default) | The location of the Pub/Sub Lite topic, e.g. "europe-south7-q" from above. |
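
For the Pub/Sub Lite sink connector, only the three settings above are needed besides the standard Kafka Connect ones. A sketch with hypothetical names follows. The connector class name is an assumption to verify against the configuration files at /config.

```shell
# Write a sample Pub/Sub Lite sink connector config (placeholder values throughout).
cat > pubsub-lite-sink-connector.properties <<'EOF'
name=pubsub-lite-sink-connector
connector.class=com.google.pubsublite.kafka.sink.PubSubLiteSinkConnector
tasks.max=1
# Standard Kafka Connect sink setting: the Kafka topic(s) to read from.
topics=my-kafka-topic
pubsublite.project=my-project
pubsublite.location=us-central1-a
pubsublite.topic=my-lite-topic
EOF

cat pubsub-lite-sink-connector.properties
```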

Shared miscellaneous configs

The following configurations are shared by the Pub/Sub and Pub/Sub Lite connectors.

| Config | Value Range | Default | Description |
|---|---|---|---|
| gcp.credentials.file.path | String | Optional | The filepath, which stores GCP credentials. If not defined, the environment variable GOOGLE_APPLICATION_CREDENTIALS is used. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
| gcp.credentials.json | String | Optional | GCP credentials JSON blob. If specified, use the explicitly handed credentials. Consider using the externalized secrets feature in Kafka Connect for passing the value. |
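
The externalized secrets feature mentioned above can be sketched with Kafka's built-in FileConfigProvider. The file names and the placeholder credential value are hypothetical. The sketch only writes the three configuration fragments and does not start Kafka Connect.

```shell
# Secrets file kept outside the connector config (hypothetical name and placeholder value).
SECRETS_FILE="$PWD/gcp-secrets.properties"
cat > "$SECRETS_FILE" <<'EOF'
gcp.credentials.json=REPLACE_WITH_SERVICE_ACCOUNT_JSON
EOF
chmod 600 "$SECRETS_FILE"

# Worker config additions that enable the file-based config provider.
cat > worker-secrets.properties <<'EOF'
config.providers=file
config.providers.file.class=org.apache.kafka.common.config.provider.FileConfigProvider
EOF

# Connector config line that references the secret instead of inlining it.
# The backslash keeps the shell from expanding the ${file:...} placeholder.
cat > connector-secrets.properties <<EOF
gcp.credentials.json=\${file:${SECRETS_FILE}:gcp.credentials.json}
EOF

cat connector-secrets.properties
```

With this layout the credential value stays in a file that only the Connect worker needs to read, and the connector configuration carries just the reference.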

Schema support and data model

Pub/Sub Connector

The message data field of PubSubMessage is a ByteString object that translates well to and from the byte[] bodies of Kafka messages. We recommend using a converter that produces primitive data types (i.e. integer, float, string, or bytes types) where possible to avoid deserializing and re-serializing the same message body.

Additionally, a Pub/Sub message cannot exceed 10 MB. We recommend checking your message.max.bytes configuration to prevent possible errors.
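
One way to perform that check is sketched below. It reads message.max.bytes from a sample broker properties file and compares it with the Pub/Sub limit, taken here as 10,000,000 bytes (an assumption about how "10 MB" is counted). The sample file and its value are for illustration only, so point the script at your real broker or topic configuration.

```shell
# Pub/Sub per-message limit, assumed here to be 10,000,000 bytes.
PUBSUB_MAX_BYTES=10000000

# Hypothetical broker config used only for this illustration.
cat > sample-server.properties <<'EOF'
message.max.bytes=1048588
EOF

# Read the configured value; the last occurrence wins, as in a properties file.
configured=$(grep -E '^message\.max\.bytes=' sample-server.properties | tail -n 1 | cut -d= -f2)

# Flag configurations that allow Kafka messages larger than Pub/Sub accepts.
if [ "$configured" -gt "$PUBSUB_MAX_BYTES" ]; then
  result="too-large"
else
  result="ok"
fi
echo "message.max.bytes=$configured -> $result"
```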

The sink connector handles message conversion in the following way:

Note: Pub/Sub message attributes have the following limitations:

The connector will transform Kafka record-level message headers that meet these limitations and ignore those that don't.

The source connector handles the conversion from a Pub/Sub message into a Kafka SourceRecord in a similar way:

Pub/Sub Lite Connector

Pub/Sub Lite messages have the following structure:

class Message {

  ByteString key;
  ByteString data;
  ListMultimap<String, ByteString> attributes;
  Optional<Timestamp> eventTime;
}

This table shows how each field in a Kafka SinkRecord maps to a Pub/Sub Lite message by the sink connector:

| SinkRecord | Message |
|---|---|
| key{Schema} | key |
| value{Schema} | data |
| headers | attributes |
| topic | attributes["x-goog-pubsublite-source-kafka-topic"] |
| kafkaPartition | attributes["x-goog-pubsublite-source-kafka-partition"] |
| kafkaOffset | attributes["x-goog-pubsublite-source-kafka-offset"] |
| timestamp | eventTime |
| timestampType | attributes["x-goog-pubsublite-source-kafka-event-time-type"] |

When a key, value or header value with a schema is encoded as a ByteString, the following logic will be used:

The source connector performs a one-to-one mapping from SequencedMessage fields to their Kafka SourceRecord counterparts.

Pub/Sub Lite messages with an empty message.key field have the key converted to null and are assigned to Kafka partitions using the round-robin scheme. Messages with identical, non-empty keys are routed to the same Kafka partition.

| SequencedMessage | SourceRecord field | SourceRecord schema |
|---|---|---|
| message.key | key | BYTES |
| message.data | value | BYTES |
| message.attributes | headers | BYTES |
| <source topic> | sourcePartition["topic"] | String field in map |
| <source partition> | sourcePartition["partition"] | Integer field in map |
| cursor.offset | sourceOffset["offset"] | Long field in map |
| message.event_time | timestamp | long milliseconds since unix epoch if present |
| publish_time | timestamp | long milliseconds since unix epoch if no event_time exists |

Build the connector

These instructions assume you are using Maven.

  1. Clone this repository:

    > git clone https://github.com/googleapis/java-pubsub-group-kafka-connector
    
  2. Package the connector jar:

    > mvn clean package -DskipTests=True
    

    You should see the resulting jar at target/pubsub-group-kafka-connector-${VERSION}-SNAPSHOT.jar on success.

Versioning

This library follows Semantic Versioning.

Contributing

Contributions to this library are always welcome and highly encouraged.

See CONTRIBUTING for more information on how to get started.

Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.

License

Apache 2.0 - See LICENSE for more information.