Samza-Luwak Proof of Concept

This project is an experimental setup for using Luwak within Samza.

Use case: imagine you want to implement something like Google Alerts. Every time a new document is published, you want to run it through a list of search queries, and notify someone if the document matches one of the queries. (There's actually a whole industry that specialises in doing this.) A similar use case is Twitter search, where you want to see a stream of tweets matching a search query.

We're exploring ways of making such streaming search queries scalable (scaling both to a large throughput of documents and to a large number of queries), using the following tools:

  * Luwak, a stored-query engine built on top of Lucene, which matches documents against registered queries
  * Apache Samza, a distributed stream processing framework, which runs the matching jobs
  * Apache Kafka, a message broker, which carries the streams of documents, queries and matches

What we're doing is a bit similar to ElasticSearch's percolator, but we think it has the potential to perform better and be more scalable.

High-level architecture

There are two input streams and one output stream, which are implemented as Kafka topics:

  * documents (input): the documents to be searched, e.g. newly published articles or tweets
  * queries (input): registrations of the search queries to run against the documents
  * combined-matches (output): for each document, the IDs of all the queries it matched

Note that there is currently no persistent index of documents. This is pure stream search: search results include only documents published after the query is registered, not historical documents.

Partitioning

How do we make this fast and scalable? Evaluating a single search query is fast, but if you're matching each document against hundreds of thousands of queries (some of which can be very complex), processing every document can take a significant amount of time. We are investigating several approaches.

At the moment, we are exploring partitioning of the query set. This requires a multi-stage pipeline:

  1. Documents need to be published to all partitions of the documents stream. Queries need to be partitioned by query ID (and only published to one partition).
  2. The matching job matches each document against the queries within its partition, and emits a message to the "matches" stream, indicating which query IDs matched that document ID (see the first sketch after this list).
  3. A second job consumes the "matches" stream, and waits for the matches from all the partitions to arrive. When all partitions have processed a particular document, this job emits a message to the "combined-matches" stream, which now includes the matching query IDs from all partitions (see the second sketch below).
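
To make this concrete, here is a minimal sketch of the matching job (step 2) as a Samza StreamTask that feeds a Luwak Monitor. It assumes JSON-style messages decoded to maps, with hypothetical "id", "query" and "text" fields; the Luwak class names follow the 1.1 API as we remember it, so check the Luwak source if anything has moved:

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskCoordinator;

import uk.co.flax.luwak.InputDocument;
import uk.co.flax.luwak.Matches;
import uk.co.flax.luwak.Monitor;
import uk.co.flax.luwak.MonitorQuery;
import uk.co.flax.luwak.QueryMatch;
import uk.co.flax.luwak.matchers.SimpleMatcher;
import uk.co.flax.luwak.presearcher.TermFilteredPresearcher;
import uk.co.flax.luwak.queryparsers.LuceneQueryParser;

public class LuwakMatcherTask implements StreamTask {

    // Intermediate stream; one message per (document, partition) pair.
    private static final SystemStream MATCHES = new SystemStream("kafka", "matches1");

    // Holds this partition's share of the query set, indexed for fast matching.
    private final Monitor monitor;

    public LuwakMatcherTask() throws IOException {
        monitor = new Monitor(new LuceneQueryParser("text"), new TermFilteredPresearcher());
    }

    @Override
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) throws Exception {
        @SuppressWarnings("unchecked")
        Map<String, String> message = (Map<String, String>) envelope.getMessage();

        if ("queries".equals(envelope.getSystemStreamPartition().getStream())) {
            // Register (or replace) a stored query in this partition's monitor.
            monitor.update(new MonitorQuery(message.get("id"), message.get("query")));
            return;
        }

        // Otherwise it's a document: match it against every query in this partition.
        InputDocument doc = InputDocument.builder(message.get("id"))
                .addField("text", message.get("text"), new StandardAnalyzer())
                .build();
        Matches<QueryMatch> matches = monitor.match(doc, SimpleMatcher.FACTORY);

        List<String> queryIds = new ArrayList<>();
        for (QueryMatch match : matches) {
            queryIds.add(match.getQueryId());
        }

        Map<String, Object> out = new HashMap<>();
        out.put("doc", message.get("id"));
        out.put("queries", queryIds);
        // Key by document ID, so that all partial results for a document
        // land in the same partition of the combiner job.
        collector.send(new OutgoingMessageEnvelope(MATCHES, message.get("id"), out));
    }
}

The combiner job (step 3) can be sketched as a stateful Samza task. Partial results live in a key-value store (backed by the matches-combiner-changelog topic created below, assuming the store is named matches-combiner in the job config), and a combined message is emitted once every matcher partition has reported for a document. The number of matcher partitions is assumed to be passed in via a hypothetical config key:

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.samza.config.Config;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.IncomingMessageEnvelope;
import org.apache.samza.system.OutgoingMessageEnvelope;
import org.apache.samza.system.SystemStream;
import org.apache.samza.task.InitableTask;
import org.apache.samza.task.MessageCollector;
import org.apache.samza.task.StreamTask;
import org.apache.samza.task.TaskContext;
import org.apache.samza.task.TaskCoordinator;

public class MatchCombinerTask implements StreamTask, InitableTask {

    private static final SystemStream COMBINED = new SystemStream("kafka", "combinedmatches");

    private KeyValueStore<String, Map<String, Object>> pending;
    private int matcherPartitions;

    @Override
    @SuppressWarnings("unchecked")
    public void init(Config config, TaskContext context) {
        // Durable state; restored from the changelog topic after a restart.
        pending = (KeyValueStore<String, Map<String, Object>>) context.getStore("matches-combiner");
        // Hypothetical config key: how many matcher partitions to wait for.
        matcherPartitions = config.getInt("luwak.matcher.partitions");
    }

    @Override
    @SuppressWarnings("unchecked")
    public void process(IncomingMessageEnvelope envelope, MessageCollector collector,
                        TaskCoordinator coordinator) {
        Map<String, Object> message = (Map<String, Object>) envelope.getMessage();
        String docId = (String) message.get("doc");

        Map<String, Object> state = pending.get(docId);
        if (state == null) {
            state = new HashMap<>();
            state.put("doc", docId);
            state.put("queries", new ArrayList<String>());
            state.put("partitionsSeen", 0);
        }

        ((List<String>) state.get("queries")).addAll((List<String>) message.get("queries"));
        int seen = ((Integer) state.get("partitionsSeen")) + 1;
        state.put("partitionsSeen", seen);

        if (seen < matcherPartitions) {
            // Still waiting for other partitions to report for this document.
            pending.put(docId, state);
        } else {
            // Every partition has reported: emit the combined result and clean up.
            state.remove("partitionsSeen");
            collector.send(new OutgoingMessageEnvelope(COMBINED, docId, state));
            pending.delete(docId);
        }
    }
}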

Building

This project is very hacky and experimental, and may not work at all. It's also a bit convoluted to build right now, because it depends on various unreleased components.

Check out Samza (master branch), build it and install it to your local Maven repository:

git clone https://github.com/apache/incubator-samza.git samza
cd samza
./gradlew -PscalaVersion=2.10 clean publishToMavenLocal

Check out hello-samza (latest branch), use it to launch ZooKeeper, Kafka and YARN locally, and create the Kafka topics that the jobs will use:

git clone https://github.com/apache/incubator-samza-hello-samza.git hello-samza
cd hello-samza
git checkout latest
bin/grid bootstrap
bin/grid start all
for topic in documents queries matches1 combinedmatches matches-combiner-changelog; do
  deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic $topic --partitions 2 --replication-factor 1
done
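
If you want to check that the topics were created with the expected partition counts (which matters here, since every matcher partition must report for every document), kafka-topics.sh can describe them:

deploy/kafka/bin/kafka-topics.sh --zookeeper localhost:2181 --describe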

Check out Flax's fork of Lucene (positions-5x branch), which has a new feature that Luwak needs but that is not yet upstream (it should eventually be merged there):

git clone https://github.com/flaxsearch/lucene-solr-intervals.git
cd lucene-solr-intervals
git checkout positions-5x
mvn -DskipTests install

Check out Luwak (1.1.x branch), build it and install it to your local Maven repository (note that this currently doesn't work with JDK 8):

git clone https://github.com/flaxsearch/luwak.git
cd luwak
git checkout 1.1.x
mvn install

Build and run this project:

git clone https://github.com/romseygeek/samza-luwak.git
cd samza-luwak
mvn clean package
mkdir deploy
tar -xzvf target/samza-luwak-1.0-SNAPSHOT-dist.tar.gz -C deploy/
deploy/bin/run-job.sh --config-path=file://$PWD/deploy/config/luwak.properties
deploy/bin/run-job.sh --config-path=file://$PWD/deploy/config/combiner.properties

Now you can try adding some test documents and queries to the system, and observe the output:

hello-samza/deploy/kafka/bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic combinedmatches --from-beginning &
java -jar samza-luwak/target/samza-luwak-1.0-SNAPSHOT.jar
q query1 foo AND bar
q query2 bar AND baz
d doc1 this document contains the words foo and bar only
d doc2 this document, on the other hand, mentions bar and baz.
d doc3 this one goes nuts and mentions foo, bar and baz -- all three!
d doc4 finally, this one mentions none of those words.
quit

In that command-line tool, queries are defined with "q", followed by the query ID, followed by a Lucene query string. Documents are defined with "d", followed by the document ID, followed by the text of the document.
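
What you see on the combinedmatches topic depends on the serde the jobs are configured with; purely as an illustration, assuming a JSON serde and the hypothetical field names used in the sketches above, the combined match for doc3 (which contains foo, bar and baz, and so matches both queries) might look something like:

{"doc": "doc3", "queries": ["query1", "query2"]}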