Google Pub/Sub Lite Spark Connector Client for Java
Java idiomatic client for Pub/Sub Lite Spark Connector.
Quickstart
If you are using Maven, add this to your pom.xml file:
<dependency>
  <groupId>com.google.cloud</groupId>
  <artifactId>pubsublite-spark-sql-streaming</artifactId>
  <version>1.0.0</version>
</dependency>
If you are using Gradle without BOM, add this to your dependencies:
implementation 'com.google.cloud:pubsublite-spark-sql-streaming:1.0.0'
If you are using SBT, add this to your dependencies:
libraryDependencies += "com.google.cloud" % "pubsublite-spark-sql-streaming" % "1.0.0"
Authentication
See the Authentication section in the base directory's README.
Authorization
The client application making API calls must be granted the authorization scopes required for the desired Pub/Sub Lite Spark Connector APIs, and the authenticated principal must have the IAM role(s) required to access GCP resources through Pub/Sub Lite Spark Connector API calls.
Getting Started
Prerequisites
You will need a Google Cloud Platform Console project with the Pub/Sub Lite Spark Connector API enabled.
You will need to enable billing to use Google Pub/Sub Lite Spark Connector.
Follow these instructions to get your project set up. You will also need to set up your local development environment by installing the Google Cloud SDK and running the following commands on the command line:

gcloud auth login
gcloud config set project [YOUR PROJECT ID]
Installation and setup
You'll need to obtain the pubsublite-spark-sql-streaming library. See the Quickstart section to add pubsublite-spark-sql-streaming as a dependency in your code.
About Pub/Sub Lite Spark Connector
Google Cloud Pub/Sub Lite is a zonal, real-time messaging service that lets you send and receive messages between independent applications. You can manually configure the throughput and storage capacity for Pub/Sub Lite systems.
The Pub/Sub Lite Spark connector supports Pub/Sub Lite as an input source to Apache Spark Structured Streaming in both the default micro-batch processing mode and the experimental continuous processing mode. The connector works in all Apache Spark distributions, including Google Cloud Dataproc and manual Spark installations.
Requirements
Creating a new subscription or using an existing subscription
Follow the instructions to create a new subscription or use an existing subscription. If using an existing subscription, the connector will read from the oldest unacknowledged message in the subscription.
Creating a Google Cloud Dataproc cluster (Optional)
If you do not have an Apache Spark environment, you can create a Cloud Dataproc cluster with pre-configured auth. The following examples assume you are using Cloud Dataproc, but you can use spark-submit on any cluster.
MY_CLUSTER=...
gcloud dataproc clusters create "$MY_CLUSTER"
Downloading and Using the Connector
The latest version of the connector is publicly available from the Maven Central repository. You can download it and pass it to the --jars option when using the spark-submit command.
Compatibility
Connector version | Spark version |
---|---|
≤0.3.4 | 2.4.X |
Current | 3.X.X |
Usage
Samples
There are three Java samples (word count, simple write, simple read) under samples/ that show how to use the connector inside Dataproc.
Reading data from Pub/Sub Lite
Here is an example in Python:
df = spark.readStream \
.format("pubsublite") \
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID") \
.load()
Here is an example in Java:
Dataset<Row> df = spark
.readStream()
.format("pubsublite")
.option("pubsublite.subscription", "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
.load();
Note that the connector supports both MicroBatch Processing and Continuous Processing.
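A continuous-processing query differs only in the trigger. Below is a minimal sketch, not an official sample: it reuses the placeholder subscription path from the examples above, assumes spark is an existing SparkSession, and writes to the console sink for illustration.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.streaming.Trigger;

// Sketch only: read the subscription and echo it to the console using
// the experimental continuous processing mode instead of micro-batches.
Dataset<Row> df = spark
    .readStream()
    .format("pubsublite")
    .option("pubsublite.subscription",
        "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
    .load();

df.writeStream()
    .format("console")
    .option("checkpointLocation", "path/to/HDFS/dir")
    .trigger(Trigger.Continuous("1 second")) // continuous mode; omit for the default micro-batch mode
    .start();
```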
Writing data to Pub/Sub Lite
Here is an example in Python:
df.writeStream \
.format("pubsublite") \
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID") \
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode("complete") \
.trigger(processingTime="2 seconds") \
.start()
Here is an example in Java:
df.writeStream()
.format("pubsublite")
.option("pubsublite.topic", "projects/$PROJECT_NUMBER/locations/$LOCATION/topics/$TOPIC_ID")
.option("checkpointLocation", "path/to/HDFS/dir")
.outputMode(OutputMode.Complete())
.trigger(Trigger.ProcessingTime(2, TimeUnit.SECONDS))
.start();
Properties
When reading from Pub/Sub Lite, the connector supports a number of configuration options:
Option | Type | Required | Default Value | Meaning |
---|---|---|---|---|
pubsublite.subscription | String | Y | | Full subscription path that the connector will read from. |
pubsublite.flowcontrol.byteoutstandingperpartition | Long | N | 50_000_000 | Max number of bytes per partition that will be cached in workers before Spark processes the messages. |
pubsublite.flowcontrol.messageoutstandingperpartition | Long | N | Long.MAX | Max number of messages per partition that will be cached in workers before Spark processes the messages. |
pubsublite.flowcontrol.maxmessagesperbatch | Long | N | Long.MAX | Max number of messages in a micro batch. |
gcp.credentials.key | String | N | Application Default Credentials | Service account JSON in base64. |
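As an illustration of the reading options above, the flow-control settings are passed the same way as the subscription path. This is a sketch only; the values are arbitrary examples, not recommendations, and spark is assumed to be an existing SparkSession.

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: tune flow control when reading from Pub/Sub Lite.
// The numeric values are illustrative, not tuning advice.
Dataset<Row> df = spark
    .readStream()
    .format("pubsublite")
    .option("pubsublite.subscription",
        "projects/$PROJECT_NUMBER/locations/$LOCATION/subscriptions/$SUBSCRIPTION_ID")
    .option("pubsublite.flowcontrol.byteoutstandingperpartition", 10_000_000L)
    .option("pubsublite.flowcontrol.messageoutstandingperpartition", 100_000L)
    .option("pubsublite.flowcontrol.maxmessagesperbatch", 1_000_000L)
    .load();
```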
When writing to Pub/Sub Lite, the connector supports a number of configuration options:
Option | Type | Required | Default Value | Meaning |
---|---|---|---|---|
pubsublite.topic | String | Y | | Full topic path that the connector will write to. |
gcp.credentials.key | String | N | Application Default Credentials | Service account JSON in base64. |
Data Schema
When reading from Pub/Sub Lite, the connector has a fixed data schema as follows:
Data Field | Spark Data Type | Notes |
---|---|---|
subscription | StringType | Full subscription path |
partition | LongType | |
offset | LongType | |
key | BinaryType | |
data | BinaryType | |
attributes | MapType[StringType, ArrayType[BinaryType]] | |
publish_timestamp | TimestampType | |
event_timestamp | TimestampType | Nullable |
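Because key and data arrive as BinaryType, a common first step is to cast them before displaying or parsing them. A minimal sketch, assuming the message payloads are UTF-8 text and df is the DataFrame returned by readStream above:

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: cast the binary key/data fields to strings for display,
// assuming the payloads are UTF-8 text.
Dataset<Row> readable = df
    .withColumn("key", col("key").cast("string"))
    .withColumn("data", col("data").cast("string"));
```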
When writing to Pub/Sub Lite, the connector expects the following data fields and data types:
Data Field | Spark Data Type | Required |
---|---|---|
key | BinaryType | N |
data | BinaryType | N |
attributes | MapType[StringType, ArrayType[BinaryType]] | N |
event_timestamp | TimestampType | N |
Note that if a data field is present in the table but its data type does not match, the connector will throw an IllegalArgumentException that terminates the query.
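As a rough sketch of shaping a DataFrame to match this write schema before calling writeStream: the source DataFrame counts and its columns word and count below are hypothetical; only the target field names and the BinaryType requirement come from the table above.

```java
import static org.apache.spark.sql.functions.col;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// Sketch only: project hypothetical upstream columns onto the write schema.
// "word" (string) becomes the binary key, "count" (numeric) becomes the binary data.
Dataset<Row> toWrite = counts
    .select(
        col("word").cast("binary").alias("key"),
        col("count").cast("string").cast("binary").alias("data"));
```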
Building the Connector
The connector is built using Maven. The following command creates a JAR file with shaded dependencies:
mvn package
FAQ
What is the cost of Pub/Sub Lite?
See the Pub/Sub Lite pricing documentation.
Can I configure the number of Spark partitions?
No, the number of Spark partitions is set to be the number of Pub/Sub Lite partitions of the topic that the subscription is attached to.
How do I authenticate outside Cloud Compute Engine / Cloud Dataproc?
Use a service account JSON key and GOOGLE_APPLICATION_CREDENTIALS as described here.
Credentials can also be provided through the gcp.credentials.key option; the value must be passed in as a base64-encoded string.
Example:
spark.readStream.format("pubsublite").option("gcp.credentials.key", "<SERVICE_ACCOUNT_JSON_IN_BASE64>")
Samples
Samples are in the samples/ directory.
- Admin Utils
- Common Utils
- Publish Words
- Read Results
- Simple Read
- Simple Write
- Word Count
Troubleshooting
To get help, follow the instructions in the shared Troubleshooting document.
Transport
Pub/Sub Lite Spark Connector uses gRPC for the transport layer.
Supported Java Versions
Java 8 or above is required for using this client.
Google's Java client libraries, Google Cloud Client Libraries and Google Cloud API Libraries, follow the Oracle Java SE support roadmap (see the Oracle Java SE Product Releases section).
For new development
In general, new feature development occurs with support for the lowest Java LTS version covered by Oracle's Premier Support (which typically lasts 5 years from initial General Availability). If the minimum required JVM for a given library is changed, it is accompanied by a semver major release.
Java 11 and (in September 2021) Java 17 are the best choices for new development.
Keeping production systems current
Google tests its client libraries with all current LTS versions covered by Oracle's Extended Support (which typically lasts 8 years from initial General Availability).
Legacy support
Google's client libraries support legacy versions of Java runtimes on a best-efforts basis, with long-term stable libraries that don't receive feature updates, as it may not be possible to backport all patches.
Google provides updates on a best-efforts basis to apps that continue to use Java 7, though apps might need to upgrade to current versions of the library that support their JVM.
Where to find specific information
The latest versions and the supported Java versions are identified on the individual GitHub repository github.com/GoogleAPIs/java-SERVICENAME and on google-cloud-java.
Versioning
This library follows Semantic Versioning.
Contributing
Contributions to this library are always welcome and highly encouraged.
See CONTRIBUTING for more information on how to get started.
Please note that this project is released with a Contributor Code of Conduct. By participating in this project you agree to abide by its terms. See Code of Conduct for more information.
License
Apache 2.0 - See LICENSE for more information.
CI Status
Builds are run against Java 8 (including OSX and Windows) and Java 11.
Java is a registered trademark of Oracle and/or its affiliates.