Home

Awesome

Timely Dataflow

Timely dataflow is a low-latency cyclic dataflow computational model, introduced in the paper Naiad: a timely dataflow system. This project is an extended and more modular implementation of timely dataflow in Rust.

This project is something akin to a distributed data-parallel compute engine, which scales the same program up from a single thread on your laptop to distributed execution across a cluster of computers. The main goals are expressive power and high performance. It is probably strictly more expressive and faster than whatever you are currently using, assuming you aren't yet using timely dataflow.

Be sure to read the documentation for timely dataflow. It is a work in progress, but mostly improving. There is more long-form text in mdbook format with examples tested against the current builds. There is also a series of blog posts (part 1, part 2, part 3) introducing timely dataflow in a different way, though be warned that the examples there may need tweaks to build against the current code.

An example

To use timely dataflow, add the following to the dependencies section of your project's Cargo.toml file:

[dependencies]
timely="*"

This will bring in the timely crate from crates.io, which should allow you to start writing timely dataflow programs like this one (also available in timely/examples/simple.rs):

use timely::dataflow::operators::*;

fn main() {
    timely::example(|scope| {
        (0..10).to_stream(scope)
               .inspect(|x| println!("seen: {:?}", x));
    });
}

You can run this example from the root directory of the timely-dataflow repository by typing

% cargo run --example simple
Running `target/debug/examples/simple`
seen: 0
seen: 1
seen: 2
seen: 3
seen: 4
seen: 5
seen: 6
seen: 7
seen: 8
seen: 9

This is a very simple example (it's in the name), which only just suggests at how you might write dataflow programs.

Doing more things

For a more involved example, consider the very similar (but more explicit) examples/hello.rs, which creates and drives the dataflow separately:

use timely::dataflow::{InputHandle, ProbeHandle};
use timely::dataflow::operators::{Input, Exchange, Inspect, Probe};

fn main() {
    // initializes and runs a timely dataflow.
    timely::execute_from_args(std::env::args(), |worker| {

        let index = worker.index();
        let mut input = InputHandle::new();
        let mut probe = ProbeHandle::new();

        // create a new input, exchange data, and inspect its output
        worker.dataflow(|scope| {
            scope.input_from(&mut input)
                 .exchange(|x| *x)
                 .inspect(move |x| println!("worker {}:\thello {}", index, x))
                 .probe_with(&mut probe);
        });

        // introduce data and watch!
        for round in 0..10 {
            if index == 0 {
                input.send(round);
            }
            input.advance_to(round + 1);
            while probe.less_than(input.time()) {
                worker.step();
            }
        }
    }).unwrap();
}

This example does a fair bit more, to show off more of what timely can do for you.

We first build a dataflow graph creating an input stream (with input_from), whose output we exchange to drive records between workers (using the data itself to indicate which worker to route to). We inspect the data and print the worker index to indicate which worker received which data, and then probe the result so that each worker can see when all of a given round of data has been processed.

We then drive the computation by repeatedly introducing rounds of data, where the round itself is used as the data. In each round, each worker introduces the same data, and then repeatedly takes dataflow steps until the probe reveals that all workers have processed all work for that epoch, at which point the computation proceeds.

With two workers, the output looks like

% cargo run --example hello -- -w2
Running `target/debug/examples/hello -w2`
worker 0:   hello 0
worker 1:   hello 1
worker 0:   hello 2
worker 1:   hello 3
worker 0:   hello 4
worker 1:   hello 5
worker 0:   hello 6
worker 1:   hello 7
worker 0:   hello 8
worker 1:   hello 9

Note that despite worker zero introducing the data (0..10), each element is routed to a specific worker, as we intended.

Execution

The hello.rs program above will by default use a single worker thread. To use multiple threads in a process, use the -w or --workers options followed by the number of threads you would like to use. (note: the simple.rs program always uses one worker thread; it uses timely::example which ignores user-supplied input).

To use multiple processes, you will need to use the -h or --hostfile option to specify a text file whose lines are hostname:port entries corresponding to the locations you plan on spawning the processes. You will need to use the -n or --processes argument to indicate how many processes you will spawn (a prefix of the host file), and each process must use the -p or --process argument to indicate their index out of this number.

Said differently, you want a hostfile that looks like so,

% cat hostfile.txt
host0:port
host1:port
host2:port
host3:port
...

and then to launch the processes like so:

host0% cargo run -- -w 2 -h hostfile.txt -n 4 -p 0
host1% cargo run -- -w 2 -h hostfile.txt -n 4 -p 1
host2% cargo run -- -w 2 -h hostfile.txt -n 4 -p 2
host3% cargo run -- -w 2 -h hostfile.txt -n 4 -p 3

The number of workers should be the same for each process.

The ecosystem

Timely dataflow is intended to support multiple levels of abstraction, from the lowest level manual dataflow assembly, to higher level "declarative" abstractions.

There are currently a few options for writing timely dataflow programs. Ideally this set will expand with time, as interested people write their own layers (or build on those of others).

There are also a few applications built on timely dataflow, including a streaming worst-case optimal join implementation and a PageRank implementation, both of which should provide helpful examples of writing timely dataflow programs.

Contributing

If you are interested in working with or helping out with timely dataflow, great!

There are a few classes of work that are helpful for us, and may be interesting for you. There are a few broad categories, and then an ever-shifting pile of issues of various complexity.

There are also some larger themes of work, whose solutions are not immediately obvious and each with the potential to sort out various performance issues:

Rate-controlling output

At the moment, the implementations of unary and binary operators allow their closures to send un-bounded amounts of output. This can cause unwelcome resource exhaustion, and poor performance generally if the runtime needs to allocate lots of new memory to buffer data sent in bulk without being given a chance to digest it. It is commonly the case that when large amounts of data are produced, they are eventually reduced given the opportunity.

With the current interfaces there is not much to be done. One possible change would be to have the input and notificator objects ask for a closure from an input message or timestamp, respectively, to an output iterator. This gives the system the chance to play the iterator at the speed they feel is appropriate. As many operators produce data-parallel output (based on independent keys), it may not be that much of a burden to construct such iterators.

Buffer management

The timely communication layer currently discards most buffers it moves through exchange channels, because it doesn't have a sane way of rate controlling the output, nor a sane way to determine how many buffers should be cached. If either of these problems were fixed, it would make sense to recycle the buffers to avoid random allocations, especially for small batches. These changes have something like a 10%-20% performance impact in the dataflow-join triangle computation workload.

Support for non-serializable types

The communication layer is based on a type Content<T> which can be backed by typed or binary data. Consequently, it requires that the type it supports be serializable, because it needs to have logic for the case that the data is binary, even if this case is not used. It seems like the Stream type should be extendable to be parametric in the type of storage used for the data, so that we can express the fact that some types are not serializable and that this is ok.

NOTE: Differential dataflow demonstrates how to do this at the user level in its operators/arrange.rs, if somewhat sketchily (with a wrapper that lies about the properties of the type it transports).

This would allow us to safely pass Rc<T> types around, as long as we use the Pipeline parallelization contract.

Coarse- vs fine-grained timestamps

The progress tracking machinery involves some non-trivial overhead per timestamp. This means that using very fine-grained timestamps, for example the nanosecond at which a record is processed, can swamp the progress tracking logic. By contrast, the logging infrastructure demotes nanoseconds to data, part of the logged payload, and approximates batches of events with the smallest timestamp in the batch. This is less accurate from a progress tracking point of view, but more performant. It may be possible to generalize this so that users can write programs without thinking about granularity of timestamp, and the system automatically coarsens when possible (essentially boxcar-ing times).

NOTE: Differential dataflow demonstrates how to do this at the user level in its collection.rs. The lack of system support means that the user ends up indicating the granularity, which isn't horrible but could plausibly be improved. It may also be that leaving the user with control of the granularity leaves them with more control over the latency/throughput trade-off, which could be a good thing for the system to do.