Home

Awesome

Twisted CSP

Communicating sequential processes for Twisted. Channels like Go, or Clojurescript's core.async.

WARNING: This is currently alpha software.

This is a very close port of Clojurescript's core.async. The significant difference is that light-weight processes are implemented using generators (yield) instead of macros.

def slow_pipe(input, output):
    while True:
        value = yield take(input)
        yield sleep(0.5)
        if value is None: # input closed
            close(output)
            yield stop()
        else:
            yield put(output, value)

go(slow_pipe, chan1, chan2))

Examples

Function returning channel (http://talks.golang.org/2012/concurrency.slide#25).

def boring(message):
    c = Channel()
    def counter():
        i = 0
        while True:
            yield put(c, "%s %d" % (message, i))
            yield sleep(random.random())
            i += 1
    go(counter)
    return c


def main():
    b = boring("boring!")
    for i in range(5):
        print "You say: \"%s\"" % (yield take(b))
    print "You are boring; I'm leaving."

Pingpong (http://talks.golang.org/2013/advconc.slide#6).

class Ball:
    hits = 0


@process
def player(name, table):
    while True:
        ball = yield take(table)
        ball.hits += 1
        print name, ball.hits
        yield sleep(0.1)
        yield put(table, ball)


def main():
    table = Channel()

    player("ping", table)
    player("pong", table)

    yield put(table, Ball())
    yield sleep(1)

Timeout using alts (select in Go) (http://talks.golang.org/2012/concurrency.slide#35).

c = boring("Joe")
while True:
    value, chan = yield alts([c, timeout(0.8)])
    if chan is c:
        print value
    else:
        print "You're too slow."
        yield stop()

Running the examples

Use the run script, like

./run example.go.timeout_for_whole_conversation_using_select

Examples under example/go are ports of Go examples from:

Playing around in a REPL

Python 2.7.5+ (default, Sep 19 2013, 13:48:49)
[GCC 4.8.1] on linux2
Type "help", "copyright", "credits" or "license" for more information.
>>> import thread
>>> from csp import *
>>> from twisted.internet import reactor
>>> thread.start_new_thread(reactor.run, (), {"installSignalHandlers": False})
140038185355008
>>> class Ball:
...     hits = 0
...
>>> def player(name, table):
...     while True:
...         ball = yield take(table)
...         if ball is None: # channel's closed
...             print name, "Ball's gone"
...             break
...         ball.hits += 1
...         print name, ball.hits
...         yield sleep(0.1)
...         yield put(table, ball)
...
>>> def main():
...     table = Channel()
...     go(player, "ping", table)
...     go(player, "pong", table)
...     yield put(table, Ball())
...     yield sleep(1)
...     close(table)
...
>>> reactor.callFromThread(lambda: go(main))
>>> ping 1
pong 2
ping 3
pong 4
ping 5
pong 6
ping 7
pong 8
ping 9
pong 10
ping Ball's gone
pong Ball's gone

>>>

Limitations

TODO

Inspiration

Other Python CSP libraries:

These libraries use threads/processes (except for pycsp, which has support for greenlets (which is not portable)). This makes implementation simpler (in principle), sacrificing efficiency (but managing threads/processes can be a chore). On the other hand they are much more generic, and support networked channels (that is not necessarily a good thing though). TODO: More elaborate comparison.