Flambo

Flambo is a Clojure DSL for Apache Spark

Contents

- [Overview](#overview)
- [Supported Spark Versions](#versions)
- [Installation](#installation)
  - [AOT](#aot)
- [Usage](#usage)
  - [Initializing flambo](#initializing-flambo)
  - [Resilient Distributed Datasets (RDDs)](#rdds)
  - [RDD Operations](#rdd-operations)
    - [Basics](#basics)
    - [Passing Functions to flambo](#flambo-functions)
    - [Working with Key-Value Pairs](#key-value-pairs)
    - [RDD Transformations](#rdd-transformations)
    - [RDD Actions](#rdd-actions)
  - [Tuple functions](#tuple-functions)
  - [RDD Persistence](#rdd-persistence)
- [Standalone Applications](#running-flambo)
- [Kryo](#kryo)
- [Acknowledgements](#acknowledgements)
- [Support](#support)
- [License](#license)

<a name="overview">

Overview

Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala and Python, and an optimized engine that supports general execution graphs.

Flambo is a Clojure DSL for Spark. It allows you to create and manipulate Spark data structures using idiomatic Clojure.

"So that's where I came from." --Flambo

<a name="versions">

Supported Spark Versions

- flambo 0.8.2 targets Spark 2.x
- flambo 0.7.2 targets Spark 1.x

<a name="installation">

Installation

Flambo is available from Clojars. Depending on the version of Spark you're using, add one of the following to the :dependencies vector in your project.clj file:

With Leiningen

- [yieldbot/flambo "0.8.2"] for Spark 2.x
- [yieldbot/flambo "0.7.2"] for Spark 1.x

Don't forget to add Spark (and possibly your Hadoop distribution's hadoop-client library) to the :provided profile in your project.clj file:

{:profiles {:provided
             {:dependencies
              [[org.apache.spark/spark-core_2.11 "2.2.0"]]}}}
<a name="aot">

AOT

It is necessary to AOT compile any namespace that requires flambo.api. You can AOT compile your application uberjar before running it on your Spark cluster. This is easily accomplished by adding an :uberjar profile with {:aot :all} in it, as sketched below.
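A minimal sketch, assuming a standard Leiningen project:

;; compile all namespaces when building the uberjar
{:profiles {:uberjar {:aot :all}}}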

When working locally in a REPL, you'll want to AOT compile those namespaces as well. An easy way to do that is to add an :aot key to the :dev profile in your Leiningen project.clj:

:profiles {:dev
    {:aot [my.namespace my.other.namespace]}}
<a name="usage">

Usage

Flambo makes developing Spark applications quick and painless by utilizing the powerful abstractions available in Clojure. For instance, you can use the Clojure threading macro -> to chain sequences of operations and transformations.
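For example, transformations chain top to bottom into a final action (a sketch, assuming the flambo.api alias f and the SparkContext sc defined in the next section):

(-> (f/parallelize sc [1 2 3])        ;; create an RDD from a vector
    (f/map (f/fn [x] (inc x)))        ;; transformation: increment each element
    f/collect)                        ;; action: return the results to the driver
;; => [2 3 4]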

<a name="initializing-flambo">

Initializing flambo

The first step is to create a Spark configuration object, SparkConf, which contains information about your application. This is used to construct a SparkContext object which tells Spark how to access a cluster.

Here we create a SparkConf object with the string local to run in local mode:

(ns com.fire.kingdom.flambit
  (:require [flambo.conf :as conf])
  (:require [flambo.api :as f]))

(def c (-> (conf/spark-conf)
           (conf/master "local")
           (conf/app-name "flame_princess")))

(def sc (f/spark-context c))

The master URL string parameter can be one of the following formats:

| Master URL | Meaning |
| ---------- | ------- |
| spark://HOST:PORT | Connect to a standalone Spark cluster master. |
| mesos://HOST:PORT | Connect to a Mesos cluster. |
| local | Use one worker thread to run Spark locally (no parallelism). |
| local[N] | Use N worker threads to run Spark locally. |
| local[*] | Use the same number of threads as cores to run Spark locally. Only available for Spark 1.0.0+. |

For running on YARN, see Spark's running on YARN documentation for details.
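For instance, to run locally using as many worker threads as your machine has cores (a sketch, assuming Spark 1.0.0+):

(def c (-> (conf/spark-conf)
           (conf/master "local[*]")
           (conf/app-name "flame_princess")))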

Hard-coding the value of master and other configuration parameters can be avoided by passing the values to Spark when running spark-submit (Spark 1.0.0+), or by allowing spark-submit to read these properties from a configuration file. See Standalone Applications for information on running flambo applications, and see Spark's documentation for more details about configuring Spark properties.

<a name="rdds">

Resilient Distributed Datasets (RDDs)

The main abstraction Spark provides is a resilient distributed dataset, RDD, which is a fault-tolerant collection of elements partitioned across the nodes of the cluster that can be operated on in parallel. There are two ways to create RDDs: parallelizing an existing collection in your driver program, or referencing a dataset in an external storage system, such as a shared filesystem, HDFS, HBase, or any data source offering a Hadoop InputFormat.

Parallelized Collections

Parallelized collections (RDDs) in flambo are created by calling the parallelize function on your Clojure data structure:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(def data (f/parallelize sc [["a" 1] ["b" 2] ["c" 3] ["d" 4] ["e" 5]]))

Once initialized, the distributed dataset or RDD can be operated on in parallel.

An important parameter for parallel collections is the number of slices to cut the dataset into. Spark runs one task for each slice of the dataset. Normally, Spark tries to set the number of slices automatically based on your cluster. However, you can also set it manually in flambo by passing it as a third parameter to parallelize:

(def data (f/parallelize sc [1 2 3 4 5] 4))

External Datasets

Spark can create RDDs from any storage source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, Amazon S3, etc. Spark supports text files, SequenceFiles, and any other Hadoop InputFormat.

Text file RDDs can be created in flambo using the text-file function in the flambo.api namespace. This function takes a URI for the file (either a local path on the machine, or an hdfs://..., s3n://..., etc. URI) and reads it as a collection of lines. Note that text-file supports S3 and HDFS globs.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(def data (f/text-file sc "hdfs://hostname:<port>/home/user/data_archive/2013/12/23/*/*.bz2"))
<a name="rdd-operations">

RDD Operations

RDDs support two types of operations: transformations, which create a new dataset from an existing one, and actions, which return a value to the driver program after running a computation on the dataset.

<a name="basics">

Basics

To illustrate RDD basics in flambo, consider the following simple application using the sample data.txt file located at the root of the flambo repo.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

;; NOTE: we are using the flambo.api/fn not clojure.core/fn
(-> (f/text-file sc "data.txt")   ;; returns an unrealized lazy dataset
    (f/map (f/fn [s] (count s)))  ;; returns RDD array of length of lines
    (f/reduce (f/fn [x y] (+ x y)))) ;; returns a value, should be 1406

The first line defines a base RDD from an external file. The dataset is not loaded into memory; it is merely a pointer to the file. The second line defines an RDD of the lengths of the lines as a result of the map transformation. Note, the lengths are not immediately computed due to laziness. Finally, we run reduce on the transformed RDD, which is an action, returning only a value to the driver program.

If we also wanted to reuse the resulting RDD of line lengths in later steps, we could insert:

(f/cache)

before the reduce action, which would cause the line-lengths RDD to be saved to memory after the first time it is realized. See RDD Persistence for more on persisting and caching RDDs in flambo.
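Spelled out, the cached pipeline would look like this (the same pattern appears under RDD Persistence below):

(-> (f/text-file sc "data.txt")
    (f/map (f/fn [s] (count s)))
    f/cache                          ;; persist the line-lengths RDD in memory
    (f/reduce (f/fn [x y] (+ x y))))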

<a name="flambo-functions">

Passing Functions to flambo

Spark’s API relies heavily on passing functions in the driver program to run on the cluster. Flambo makes it easy and natural to define serializable Spark functions/operations and provides two ways to do this.

The first is to define a named function with f/defsparkfn:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(f/defsparkfn square [x] (* x x))

The second is to define an inline anonymous function with f/fn:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map (f/fn [x] (* x x))))

When we evaluate this map transformation on the initial RDD, the result is another RDD. The result of this transformation can be seen using the f/collect action to return all of the elements of the RDD.

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map (f/fn [x] (* x x)))
    f/collect)
;; => [1 4 9 16 25]

We can also use f/first or f/take to return just a subset of the data.

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map square)
    (f/take 2))
;; => [1 4]
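Similarly, f/first returns only the first element of the RDD:

(-> (f/parallelize sc [1 2 3 4 5])
    (f/map square)
    f/first)
;; => 1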
<a name="key-value-pairs">

Working with Key-Value Pairs

While most Spark operations work on RDDs containing any type of objects, a few special operations are only available on RDDs of key-value pairs. The most common ones are distributed "shuffle" operations, such as grouping or aggregating the elements by a key.

In flambo, these operations are available on RDDs of (key, value) tuples. Flambo handles all of the transformations/serializations to/from Tuple, Tuple2, JavaRDD, JavaPairRDD, etc., so you only need to define the sequence of operations you'd like to perform on your data.

The following code generates pairs of word and count using ft/tuple. We can then use the reduce-by-key operation on the pairs to count how many times each word occurs in a file:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]
            [flambo.tuple :as ft]
            [clojure.string :as s]))

(-> (f/text-file sc "data.txt")
    (f/flat-map (f/iterator-fn [l] (s/split l #" ")))
    (f/map-to-pair (f/fn [w] (ft/tuple w 1)))
    (f/reduce-by-key (f/fn [x y] (+ x y))))

After the reduce-by-key operation, we can sort the pairs alphabetically using f/sort-by-key. To collect the word counts as an array of objects in the REPL, or to write them to a filesystem, we can use the f/collect action:

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]
            [flambo.tuple :as ft]
            [clojure.string :as s]))

(-> (f/text-file sc "data.txt")
    (f/flat-map (f/iterator-fn [l] (s/split l #" ")))
    (f/map-to-pair (f/fn [w] (ft/tuple w 1)))
    (f/reduce-by-key (f/fn [x y] (+ x y)))
    f/sort-by-key
    f/collect
    clojure.pprint/pprint)
<a name="rdd-transformations">

RDD Transformations

Flambo supports the standard Spark RDD transformations, including map, filter, flat-map, group-by-key, reduce-by-key, sort-by-key, and join; see the flambo.api namespace for the complete list.

<a name="rdd-actions">

RDD Actions

Flambo supports the standard Spark RDD actions, including reduce, collect, count, first, take, and save-as-text-file; see the flambo.api namespace for the complete list.

<a name="tuple-functions">

Tuple functions

Flambo provides helpers for working with Spark tuples in the flambo.tuple namespace, including ft/tuple for constructing pairs and ft/key-val-fn for destructuring them.

To see an example of these functions in use, check out the tf-idf example.
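A brief sketch of ft/key-val-fn, assuming a pair RDD like the word-count pairs above (pairs here is a hypothetical name):

(-> pairs
    ;; key-val-fn unpacks each Tuple2 into its key and value
    (f/map (ft/key-val-fn (f/fn [word n] (str word ": " n))))
    f/collect)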

<a name="rdd-persistence">

RDD Persistence

Spark provides the ability to persist (or cache) a dataset in memory across operations. Spark’s cache is fault-tolerant: if any partition of an RDD is lost, it will automatically be recomputed using the transformations that originally created it. Caching is a key tool for iterative algorithms and fast interactive use.

Like Spark, flambo provides the functions f/persist and f/cache to persist RDDs. f/persist sets the storage level of an RDD to persist its values across operations after the first time it is computed; the available storage levels are in the flambo.api/STORAGE-LEVELS map. It can only be used to assign a new storage level if the RDD does not have a storage level set already. f/cache is a convenience function for the default storage level, MEMORY_ONLY.

(ns com.fire.kingdom.flambit
  (:require [flambo.api :as f]))

(let [line-lengths (-> (f/text-file sc "data.txt")
                       (f/map (f/fn [s] (count s)))
                       f/cache)]
  (-> line-lengths
      (f/reduce (f/fn [x y] (+ x y)))))
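To use a different storage level, pass one of the values from flambo.api/STORAGE-LEVELS to f/persist (a sketch; the :memory-and-disk key is an assumption based on Spark's storage level names):

(-> (f/text-file sc "data.txt")
    (f/map (f/fn [s] (count s)))
    ;; :memory-and-disk is assumed to mirror Spark's MEMORY_AND_DISK level
    (f/persist (:memory-and-disk f/STORAGE-LEVELS)))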
<a name="running-flambo">

Standalone Applications

To run your flambo application as a standalone application using the Spark API, you'll first need to package it as an uberjar with lein:

$ lein uberjar
...

Then you can execute it with spark-class:

$ SPARK_CLASSPATH=uberjar.jar spark-class com.some.class.with.main --flag1 arg1 --flag2 arg2
...
<output>

or, for Spark 1.0.0+, with spark-submit:

$ spark-submit --class com.some.class.with.main uberjar.jar --flag1 arg1 --flag2 arg2
...
<output>
<a name="kryo">

Kryo

Flambo requires that Spark is configured to use Kryo for serialization. Flambo configures this by default using system properties.
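The equivalent explicit configuration on a SparkConf would look like this (a sketch; flambo normally sets this for you):

(ns com.fire.kingdom.flambit
  (:require [flambo.conf :as conf]))

(def c (-> (conf/spark-conf)
           ;; tell Spark to serialize with Kryo
           (conf/set "spark.serializer" "org.apache.spark.serializer.KryoSerializer")))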

If you need to register custom serializers, extend flambo.kryo.BaseFlamboRegistrator and override its register method. Finally, configure your SparkContext to use your custom registrator by setting spark.kryo.registrator to your custom class.

There is a convenience macro for creating registrators, flambo.kryo.defregistrator. The namespace where a registrator is defined should be AOT compiled.

Here is an example (note that this won't work in your REPL, since the registrator's namespace must be AOT compiled):

(ns com.fire.kingdom.flambit
  (:require [flambo.conf :as conf]
            [flambo.kryo :as kryo])
  (:import [flameprincess FlamePrincessHeat FlamePrincessHeatSerializer]))

(kryo/defregistrator flameprincess [this kryo]
  (.register kryo FlamePrincessHeat (FlamePrincessHeatSerializer.)))

(def c (-> (conf/spark-conf)
           (conf/set "spark.kryo.registrator" flameprincess)))
<a name="acknowledgements">

Acknowledgements

Thanks to The Climate Corporation and their open source project clj-spark which served as the starting point for this project.

Thanks to Ben Black for doing the work on the streaming api.

<a name="support">

Support

There is a #flambo channel for support on the Clojurians Slack, as well as a flambo-users Google group.

YourKit

<img src="http://www.yourkit.com/images/yklogo.png" alt="YourKit logo" />

YourKit has generously supplied an open source license for their profiler to improve the performance of Flambo.

YourKit supports open source projects with its full-featured Java Profiler. YourKit, LLC is the creator of <a href="http://www.yourkit.com/java/profiler/index.jsp">YourKit Java Profiler</a> and <a href="http://www.yourkit.com/.net/profiler/index.jsp">YourKit .NET Profiler</a>, innovative and intelligent tools for profiling Java and .NET applications.

<a name="license">

License

Copyright © 2014,2015 Yieldbot, Inc.

Distributed under the Eclipse Public License either version 1.0 or (at your option) any later version.