Python SDK to communicate with Centrifugo v5 HTTP API. Python >= 3.9 supported.

To install, run:

pip install cent

Centrifugo compatibility

Cent v5 is designed to work with the Centrifugo v5 server API. If you are upgrading from an older release of Cent, see the migration notes at the end of this page.

Usage

First of all, see the description of the Centrifugo server API in the documentation. This library also supports the API extensions provided by Centrifugo PRO. In general, refer to the api.proto Protobuf schema file as the source of truth about all available Centrifugo server APIs. Keep in mind that Centrifugo supports both HTTP and GRPC APIs – so you can switch to GRPC by using the api.proto file to generate stubs for communication.
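
For instance, here is a minimal sketch of generating Python gRPC stubs from api.proto with grpcio-tools. This step is independent of the cent library; the grpcio-tools package and a local copy of api.proto are assumptions of this sketch:

# A sketch, not part of cent: generate gRPC stubs from Centrifugo's api.proto.
# Assumes grpcio-tools is installed (pip install grpcio-tools) and api.proto
# has been downloaded into the current directory.
from grpc_tools import protoc

protoc.main([
    "grpc_tools.protoc",
    "-I.",
    "--python_out=.",
    "--grpc_python_out=.",
    "api.proto",
])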

This library contains Client and AsyncClient to work with Centrifugo HTTP server API. Both clients have the same methods to work with Centrifugo API and raise the same top-level exceptions.

Sync HTTP client

from cent import Client

Required init arguments:

api_url (str) – Centrifugo HTTP API endpoint address, for example http://localhost:8000/api
api_key (str) – Centrifugo HTTP API key used for authentication

Optional arguments:

Example:

from cent import Client, PublishRequest

api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"

client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.publish(request)
print(result)
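
The returned PublishResult carries the stream position of the publication – the same offset and epoch fields you can see in the broadcast output below. They are only meaningful when history is enabled for the channel in the Centrifugo configuration:

# Stream position of this publication (meaningful when channel history is enabled).
print(result.offset, result.epoch)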

Async HTTP client

from cent import AsyncClient

Required init arguments:

api_url (str) – Centrifugo HTTP API endpoint address, for example http://localhost:8000/api
api_key (str) – Centrifugo HTTP API key used for authentication

Optional arguments:

Example:

import asyncio
from cent import AsyncClient, PublishRequest

api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"

async def main():
    client = AsyncClient(api_url, api_key)
    request = PublishRequest(channel="channel", data={"input": "Hello world!"})
    result = await client.publish(request)
    print(result)

if __name__ == "__main__":
    asyncio.run(main())
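
Since AsyncClient methods are coroutines, several calls can also be issued concurrently with standard asyncio tools. A minimal sketch (the channel names here are just placeholders):

import asyncio
from cent import AsyncClient, PublishRequest

async def main():
    client = AsyncClient("http://localhost:8000/api", "<CENTRIFUGO_API_KEY>")
    requests = [
        PublishRequest(channel=f"test_{i}", data={"input": "Hello world!"})
        for i in range(3)
    ]
    # Await several publish calls concurrently instead of one by one.
    results = await asyncio.gather(*(client.publish(r) for r in requests))
    print(results)

if __name__ == "__main__":
    asyncio.run(main())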

Handling errors

This library raises exceptions if something goes wrong. All exceptions are subclasses of cent.CentError.
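
For example, a minimal sketch of catching the documented exceptions around a publish call – CentApiResponseError covers errors returned in the Centrifugo API response, while cent.CentError works as a catch-all:

from cent import Client, PublishRequest, CentError
from cent.exceptions import CentApiResponseError

client = Client("http://localhost:8000/api", "<CENTRIFUGO_API_KEY>")
request = PublishRequest(channel="channel", data={"input": "Hello world!"})

try:
    result = client.publish(request)
except CentApiResponseError as exc:
    # Centrifugo returned an error in the API response (see the example below).
    print("API response error:", exc)
except CentError as exc:
    # Any other error raised by the cent library.
    print("request failed:", exc)
else:
    print(result)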

Note that BroadcastRequest and BatchRequest are quite special – since they contain multiple commands in one request, handling CentApiResponseError is still required, but not enough: you also need to manually iterate over the results to check for individual errors. One publish command can fail while another one succeeds. For example:

from cent import BroadcastRequest, Client

c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
#   responses=[
#       Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
#       Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
#   ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
#   responses=[
#       Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
#       Response[PublishResult](error=None, result=PublishResult(offset=8, epoch='nUrf'))
#   ]
# )

In other words, the cent library does not raise exceptions for individual errors inside broadcast or batch results, only for a top-level response error – for example, when sending an empty list of channels in broadcast:

req = BroadcastRequest(channels=[], data={})
c.broadcast(req)
Traceback (most recent call last):
    ...
    raise CentApiResponseError(
cent.exceptions.CentApiResponseError: Server API response error #107: bad request

So this all adds some complexity, but that's the trade-off for the performance and efficiency of these two methods. You can always write convenient wrappers around the cent library to handle errors in a way that suits your application.
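
For example, a minimal sketch of such a wrapper – the helper name and the decision to raise on the first individual error are just illustrative choices:

from cent import BroadcastRequest, Client

def broadcast_strict(client: Client, request: BroadcastRequest):
    # Illustrative helper: run a broadcast and fail loudly if any individual
    # publish inside it returned an error.
    result = client.broadcast(request)
    for channel, response in zip(request.channels, result.responses):
        if response.error is not None:
            raise RuntimeError(
                f"broadcast to {channel!r} failed: "
                f"#{response.error.code} {response.error.message}"
            )
    return result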

Using for async consumers

You can use this library to construct events for Centrifugo async consumers. For example, to get the proper method and payload for an async publish:

from cent import PublishRequest

request = PublishRequest(channel="channel", data={"input": "Hello world!"})
method = request.api_method
payload = request.api_payload
# use method and payload to construct async consumer event.
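
For instance, a minimal sketch of packing this into a JSON event body – it assumes api_payload is JSON-serializable and that your consumer setup expects method and payload fields; the actual queue or topic wiring is up to your application:

import json
from cent import PublishRequest

request = PublishRequest(channel="channel", data={"input": "Hello world!"})

# Sketch: serialize the command so it can be pushed to whatever queue/table/topic
# your Centrifugo async consumer reads from.
event = json.dumps({
    "method": request.api_method,
    "payload": request.api_payload,
})
print(event)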

Using Broadcast and Batch

To demonstrate the benefits of BroadcastRequest and BatchRequest, let's compare approaches. Say that at some point your app needs to publish the same message into 10k different channels. We can compare sequential publish, batch publish, and broadcast publish. Here is the code for the comparison:

from cent import BatchRequest, BroadcastRequest, Client, PublishRequest
from time import time


def main():
    publish_requests = []
    channels = []
    for i in range(10000):
        channel = f"test_{i}"
        publish_requests.append(PublishRequest(channel=channel, data={"msg": "hello"}))
        channels.append(channel)
    batch_request = BatchRequest(requests=publish_requests)
    broadcast_request = BroadcastRequest(channels=channels, data={"msg": "hello"})

    client = Client("http://localhost:8000/api", "api_key")

    start = time()
    for request in publish_requests:
        client.publish(request)
    print("sequential", time() - start)

    start = time()
    client.batch(batch_request)
    print("batch", time() - start)

    start = time()
    client.broadcast(broadcast_request)
    print("broadcast", time() - start)


if __name__ == "__main__":
    main()

On a local machine, the output may look like this:

sequential 5.731332778930664
batch 0.12313580513000488
broadcast 0.06050515174865723

So BatchRequest is much faster than sequential requests in this case, and BroadcastRequest is the fastest – publishing to 10k Centrifugo channels took only 60ms, because all the work is done in one network round trip. In reality the difference will be even more significant because of network latency.

For contributors

Tests and benchmarks

Prerequisites – start Centrifugo server locally:

CENTRIFUGO_API_KEY=api_key CENTRIFUGO_HISTORY_TTL=300s CENTRIFUGO_HISTORY_SIZE=100 \
CENTRIFUGO_PRESENCE=true CENTRIFUGO_GRPC_API=true ./centrifugo

And install dependencies:

make dev

Then run the tests:

make test

To run benchmarks, run:

make bench

Migrate to Cent v5

Cent v5 contains the following notable changes compared to Cent v4: