AMQP connector for Spark Streaming

This project provides an AMQP (Advanced Message Queuing Protocol) connector for Apache Spark Streaming, so that data can be ingested as a stream from any AMQP-based source, such as:

The implementation offers the following receivers:

The stream does not expose the received AMQP messages directly as elements of the RDD micro-batches. Instead, the driver passes a converter function that converts each message into the desired format, which becomes the element type of the RDD micro-batches. Two built-in message converter functions are provided as samples:

Project References

Using Maven

<dependency>
    <groupId>io.radanalytics</groupId>
    <artifactId>spark-streaming-amqp_2.11</artifactId>
    <version>0.3.0</version>
</dependency>

Using SBT

libraryDependencies += "io.radanalytics" %% "spark-streaming-amqp" % "0.3.0"

The library can be added to a Spark job launched through spark-shell or spark-submit using the --packages or --jars command line options. In order to use the --packages option, the library needs to be installed into the local repository.

bin/spark-shell --packages io.radanalytics:spark-streaming-amqp_2.11:0.3.0

To install the package into the local repository, use the mvn clean install command (for Maven) or sbt publish (for SBT).

Receivers

The AMQP receiver is started using the AMQPUtils.createStream method, which returns an InputDStream and takes the following parameters:

With the default Spark configuration, a non-reliable receiver is started. To use the reliable version, the WAL (Write-Ahead Log) and checkpointing must be enabled in the driver application. The WAL is enabled by setting the following configuration parameter to true:

spark.streaming.receiver.writeAheadLog.enable
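
As a rough sketch of what enabling the reliable receiver could look like from a Python driver, the snippet below sets the WAL property and a checkpoint directory using the standard PySpark API. The application name, batch interval, and checkpoint path are placeholder assumptions, not values taken from this project.

```python
# Property named above; it must be "true" for the reliable receiver.
WAL_ENABLE_KEY = "spark.streaming.receiver.writeAheadLog.enable"


def reliable_settings():
    """Spark properties that enable the write-ahead log."""
    return {WAL_ENABLE_KEY: "true"}


def create_context(checkpoint_dir, batch_seconds=1):
    """Build a StreamingContext with WAL and checkpointing enabled.

    Requires PySpark; it is imported here so the settings helper above
    can be used on a machine without Spark installed.
    """
    from pyspark import SparkConf, SparkContext
    from pyspark.streaming import StreamingContext

    conf = SparkConf().setAppName("amqp-reliable-receiver")  # placeholder name
    for key, value in reliable_settings().items():
        conf.set(key, value)

    ssc = StreamingContext(SparkContext(conf=conf), batch_seconds)
    ssc.checkpoint(checkpoint_dir)  # placeholder path, e.g. a directory on HDFS
    return ssc
```

A driver would typically create the stream inside such a setup function and call `StreamingContext.getOrCreate(checkpoint_dir, lambda: create_context(checkpoint_dir))`, so the context can be rebuilt from the checkpoint after a restart.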

Scala

val converter = new AMQPBodyFunction[String]

val receiveStream = AMQPUtils.createStream(ssc,
                host, port, address,
                converter, StorageLevel.MEMORY_ONLY)

Java

Function converter = new JavaAMQPBodyFunction<String>();

String sendMessage = "Spark Streaming & AMQP";
JavaReceiverInputDStream<String>  receiveStream =
        AMQPUtils.createStream(this.jssc,
                this.host,
                this.port,
                this.username,
                this.password,
                this.address, converter, StorageLevel.MEMORY_ONLY());

Python

The Python API relies on the JSON converter, so the RDD micro-batches always contain a string with the JSON representation of the received AMQP message.

receiveStream = AMQPUtils.createStream(ssc, host, port, address)
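
Since each element is a JSON string, a typical first step is to parse it. The helper below is a plain-Python sketch. The `body` and `section` field names in the sample are hypothetical, as the exact JSON layout is not described here, so inspect a real record before relying on any path.

```python
import json


def extract_field(record, path, default=None):
    """Parse one JSON string and walk `path` (a sequence of keys).

    Returns `default` if the record is not valid JSON or a key is missing.
    """
    try:
        node = json.loads(record)
    except (TypeError, ValueError):
        return default
    for key in path:
        if not isinstance(node, dict) or key not in node:
            return default
        node = node[key]
    return node


# Hypothetical record layout, for illustration only.
sample = '{"body": {"section": "21"}}'
temperature = extract_field(sample, ["body", "section"])
```

On the stream, this could be applied as `receiveStream.map(lambda r: extract_field(r, ["body", "section"]))`, with the path adjusted to the actual layout.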

Example

The Scala example provided with this project covers a simple IoT scenario in which the AMQP receiver gets temperature values from a temperature address. This address could be the name of a queue on a broker, or a direct address inside a router network to which a device is sending data.

The following message converter function is used to extract the temperature value as an Int from the AMQP message body.

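import org.apache.qpid.proton.amqp.messaging.{AmqpValue, Data}
import org.apache.qpid.proton.message.Message

def messageConverter(message: Message): Option[Int] = {
  message.getBody match {
    // binary body: decode the bytes as a string, then parse the integer
    case body: Data =>
      Some(new String(body.getValue.getArray).toInt)
    // AMQP value body: the value is expected to be a String
    case body: AmqpValue =>
      Some(body.getValue.asInstanceOf[String].toInt)
    // any other body type is skipped
    case _ => None
  }
}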

The input stream returned by the AMQP receiver is processed with the reduceByWindow method in order to get the maximum temperature value in a sliding window (5 seconds on top of a batch interval of 1 second).

val receiveStream = AMQPUtils.createStream(ssc,
                host, port, username, password, address,
                messageConverter _, StorageLevel.MEMORY_ONLY)

// get maximum temperature in a window
val max = receiveStream.reduceByWindow(
                (a, b) => if (a > b) a else b,
                Seconds(5), Seconds(5))

max.print()
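
To make the windowing concrete, here is a plain-Python simulation (not Spark code) of what the call above computes: with a 1-second batch interval, a 5-second window, and a 5-second slide, each result covers five consecutive batches. The sample readings are invented.

```python
def window_max(batches, window, slide):
    """Maximum per window over a list of batches (one batch per interval).

    `window` and `slide` are counted in batches. A result is emitted every
    `slide` batches and covers the last `window` batches; windows with no
    values are skipped.
    """
    results = []
    for end in range(slide, len(batches) + 1, slide):
        start = max(0, end - window)
        values = [v for batch in batches[start:end] for v in batch]
        if values:
            results.append(max(values))
    return results


# Ten 1-second batches of invented temperature readings.
batches = [[20], [22, 21], [], [25], [23], [19], [18, 24], [30], [], [26]]
maxima = window_max(batches, window=5, slide=5)
```

Here `maxima` is `[25, 30]`. Because the slide equals the window length, the windows do not overlap; a shorter slide, such as `slide=1`, would give overlapping windows.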

The full source code is available in the examples folder, together with an equivalent Python version.

Releasing

For details about releasing a new version, please consult RELEASING.md.