clj-spark
A Clojure API for the Spark project. It is usable but not complete; I provide it as a starting point. It should be simple enough to add additional wrappers as you need them. This is the result of about three weeks of work.
It handles many of the initial problems like serializing anonymous functions, converting back and forth between Scala Tuples and Clojure seqs, and converting RDDs to PairRDDs.
What is Spark?
From http://spark-project.org/docs/latest/index.html
Spark is a MapReduce-like cluster computing framework designed for low-latency iterative jobs and interactive use from an interpreter. It provides clean, language-integrated APIs in Scala and Java, with a rich array of parallel operators. Spark can run on top of the Apache Mesos cluster manager, Hadoop YARN, Amazon EC2, or without an independent resource manager (“standalone mode”).
Example usage
There is a complete sample program in src/clj_spark/examples/query.clj. To run it, clone this repo and cd into it. You will need Leiningen 2 installed (assuming this is available on your PATH as lein2):
$ git clone https://github.com/TheClimateCorporation/clj-spark.git
$ cd clj-spark
$ lein2 deps
$ lein2 compile
$ lein2 run
Compiling clj-spark.api
2013-01-02 13:18:41.477 java[65466:1903] Unable to load realm mapping info from SCDynamicStore
==============
Premium Per State
NY 600.0
==============
TOP100
#{1 2}
==============
CTE Per State
NY 70.0
==============
TOP100PERSTATE
{NY #{1 2}}
==============
Standalone CTE Per State
NY 70.0
==============
The following excerpts are copied from query.clj.
Here is a sample of creating an RDD:
(-> (.textFile sc testfile)
    (k/map k/csv-split)
    ; _,policy-id,field-id,_,_,element-id,element-value
    (k/map (k/feach identity as-integer as-long identity identity as-integer as-double))
    (k/map (juxt (k/fchoose 1 2) (k/fchoose 5 6))) ; [ [policy-id field-id] [element-id element-value] ]
    k/cache)
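The helpers k/feach and k/fchoose are not described in this README. Judging only from the inline comments, feach appears to apply one function per column and fchoose appears to pick elements by index. The plain-Clojure sketch below illustrates that assumed behaviour without Spark. The functions `feach` and `fchoose` here are hypothetical stand-ins, not the library's code, and the input row is made up.

```clojure
(require '[clojure.string :as str])

;; Hypothetical stand-in for k/feach: apply the i-th function to the i-th element.
(defn feach [& fs]
  (fn [row] (mapv (fn [f v] (f v)) fs row)))

;; Hypothetical stand-in for k/fchoose: pick the elements at the given indices.
(defn fchoose [& idxs]
  (fn [row] (mapv #(nth row %) idxs)))

;; Made-up row with columns: _,policy-id,field-id,_,_,element-id,element-value
(def line "x,1,7,y,z,3,2.5")

;; Parse each column with its own conversion function.
(def parsed
  ((feach identity #(Integer/parseInt %) #(Long/parseLong %)
          identity identity #(Integer/parseInt %) #(Double/parseDouble %))
   (str/split line #",")))

;; Shape the row as [[policy-id field-id] [element-id element-value]].
(def keyed ((juxt (fchoose 1 2) (fchoose 5 6)) parsed))
```

With this row, `keyed` pairs the policy and field ids with the element id and value, which is the record shape the query in the next excerpt consumes.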
And a sample query on that data:
(-> input-rdd
    (k/map second) ; [element-id element-value]
    (k/reduce-by-key +) ; [element-id total]
    (k/map (k/fchoose 1 0)) ; [total element-id]
    (k/sort-by-key false) ; desc
    (k/map second) ; element-id
    (k/take 2) ; TODO n=100
    set)
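To see what each step of that query computes, the same pipeline can be mimicked on an in-memory collection with plain Clojure. This is only a local sketch: the records are made up, and `reduce-by-key` is a hypothetical helper written for this illustration, not the clj-spark function.

```clojure
;; Made-up records shaped like input-rdd:
;; [[policy-id field-id] [element-id element-value]]
(def input
  [[[1 7] [1 10.0]]
   [[1 8] [2 30.0]]
   [[2 7] [1 5.0]]
   [[2 9] [3 2.0]]])

;; Hypothetical local equivalent of reduce-by-key:
;; combine the values of equal keys with f.
(defn reduce-by-key [f pairs]
  (reduce (fn [acc [k v]]
            (if (contains? acc k)
              (assoc acc k (f (get acc k) v))
              (assoc acc k v)))
          {}
          pairs))

(def top-2
  (->> input
       (map second)                        ; [element-id element-value]
       (reduce-by-key +)                   ; {element-id total}
       (map (fn [[id total]] [total id]))  ; [total element-id]
       (sort-by first >)                   ; descending by total
       (map second)                        ; element-id
       (take 2)
       set))
```

Here element 2 totals 30.0, element 1 totals 15.0, and element 3 totals 2.0, so `top-2` holds the ids of elements 2 and 1.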
Running queries from the REPL
You can also start a REPL and play around:
# assuming you already did deps and compile above...
$ lein2 repl
; deleted results to be more concise
user=> (use 'serializable.fn 'clj-spark.util)
user=> (require '[clj-spark.api :as k])
user=> (def sc (k/spark-context :master "local" :job-name "Simple Job"))
user=> (def r1 (k/parallelize sc [10 20 25 30 35]))
user=> (def r2 (k/text-file sc "test/resources/input.csv"))
user=> (k/count r2)
5
user=> (def result (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +)))
#'clj-spark.examples.query/result
user=> (k/collect result)
#<ArrayList [[false 63], [true 62]]>
; or, all in one step:
user=> (-> r1 (k/map inc) (k/map (fn [t] [(even? t) t])) (k/reduce-by-key +) k/collect)
#<ArrayList [[false 63], [true 62]]>
Running on a cluster
Start a cluster first
You can use the Spark EC2 scripts to do this. Full documentation is at http://spark-project.org/docs/latest/ec2-scripts.html, but the short version is:
# Set your AWS credentials
$ export AWS_ACCESS_KEY_ID="..."
$ export AWS_SECRET_ACCESS_KEY="..."
$ cd $SPARK_HOME
# This is your keypair used by EC2...
$ KEY_PAIR_NAME=your-keypair
$ KEY_PAIR_FILE=~/.ssh/your-keypair-file
# Launch
$ ec2/spark-ec2 -k $KEY_PAIR_NAME -i $KEY_PAIR_FILE -s 1 launch marcs-test
# This starts a cluster with 2 m1.large nodes (the master plus the one slave requested with -s 1)
# Wait for the script to finish, find the hostname of the master, and set it:
$ SPARK_MASTER=ec2-184-73-18-84.compute-1.amazonaws.com
You can open http://$SPARK_MASTER:8080 in your browser to see the Spark UI and track progress.
Now upload the code and test input file.
$ lein2 uberjar
$ scp -i $KEY_PAIR_FILE target/clj-spark-0.1.0-SNAPSHOT-standalone.jar root@$SPARK_MASTER:~/
$ scp -r -i $KEY_PAIR_FILE test/resources/input.csv root@$SPARK_MASTER:~/
Then log in to the cluster:
$ ec2/spark-ec2 -k $KEY_PAIR_NAME -i $KEY_PAIR_FILE login marcs-test
Now that you are on the master, execute these commands there:
# Extract the clj source files, so we can execute with clojure.main path-to-query.clj
$ cd $HOME
$ jar xf clj-spark-0.1.0-SNAPSHOT-standalone.jar clj_spark/examples/query.clj
# Config
$ export SPARK_CLASSPATH=clj-spark-0.1.0-SNAPSHOT-standalone.jar
$ export SPARK_JAVA_OPTS="-Dlog4j.configuration=file:///root/spark/conf/log4j.properties.template"
# Put the test data into HDFS
$ ephemeral-hdfs/bin/hadoop fs -put input.csv /root/test/resources/input.csv
# Run it
$ spark/run clojure.main clj_spark/examples/query.clj --master "mesos://localhost:5050" \
--jars clj-spark-0.1.0-SNAPSHOT-standalone.jar \
--input hdfs://`hostname`:9000/root/test/resources/input.csv
Note: the log output is very noisy at the INFO level, but if you don't have this logging configured you won't see any errors when things go wrong. Surprisingly, exceptions are logged at the INFO level, not the WARN level.
At this point, if you refresh the Spark web UI mentioned above, you should see a new entry under Framework History.
When you are done, remember to terminate the cluster:
$ ec2/spark-ec2 destroy marcs-test
You can find copies of the log files for tasks by ssh'ing to one of the nodes and looking in /mnt/mesos-work/slaves/.
Other Clojure APIs
After working on this, I found another Clojure API for the Spark project: https://github.com/markhamstra/spark/tree/master/cljspark
It is a bit more complete but has no examples. You might find good ideas in both projects.
Known Issues
Function serialization
You must create your anonymous functions using serializable.fn/fn, as in:
(ns ...
  (:require [serializable.fn :as sfn]))
(sfn/fn my-inc [x] (+ x 1))
Do not use clojure.core/fn or #(). This is necessary because the anonymous function must be serialized so it can be passed around to distributed tasks.
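A compiled function object cannot in general be shipped to another JVM as is. One way around this is to keep the function's source form alongside it and rebuild the function on the receiving side. The sketch below illustrates only that idea in plain Clojure. The macro `source-fn` is hypothetical and is not how serializable.fn is actually implemented, and it ignores closed-over locals, which a real solution has to capture as well.

```clojure
;; Hypothetical macro: builds a fn and keeps its source form as metadata.
(defmacro source-fn
  [& body]
  `(with-meta (fn ~@body) {:source '(fn ~@body)}))

(def my-inc (source-fn [x] (+ x 1)))

;; "Serialize": print the stored source form to a string.
(def wire (pr-str (:source (meta my-inc))))

;; "Deserialize" on the receiving side: read the form back and evaluate it.
(def my-inc-2 (eval (read-string wire)))

(my-inc-2 41)
```

The rebuilt `my-inc-2` behaves like `my-inc` because it is evaluated from the same source form. A function created with plain clojure.core/fn or #() carries no such form to send.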
AOT compilation
Generally speaking, any functions used in the Spark calls need to be part of AOT-compiled namespaces; that is, they must be compiled or the distributed Spark tasks will not be able to find them. In some cases, compiling on the fly might also work:
(compile 'your-namespace)
But you need to do this somewhere where it will be executed for each task.
NOTE: This should be avoidable using serializable.fn as above, but I did not get that to work in my initial attempts.
None of the Double* methods are implemented
The Spark Java API provides versions of some methods that accept or return Doubles. E.g. (copied from Spark docs, using Scala syntax):
def map[R](f: DoubleFunction[T]): JavaDoubleRDD
So class DoubleFunction<T> has a function type of T => Double
Compare this to the standard:
def map[R](f: Function[T, R]): JavaRDD[R]
Where Function<T, R> has type T => R
I didn't wrap any of these. To be honest, I don't see why they are needed. Maybe I'm missing something, or maybe it just doesn't matter when called from a dynamically typed language like Clojure. Instead of DoubleFunction<T>, just use Function<T, Double>, which has type T => Double. I don't see why this wouldn't work, but I would be interested to know if there is a case where this fails or is sub-optimal.