Awesome
Python SDK to communicate with Centrifugo v5 HTTP API. Python >= 3.9 supported.
To install run:
pip install cent
Centrifugo compatibility
- Cent v5 and higher works only with Centrifugo v5.
- If you need to work with Centrifugo v3, v4 => use Cent v4
- If you need to work with Centrifugo v2 => use Cent v3
Usage
First of all, see the description of Centrifugo server API in the documentation. This library also supports API extensions provided by Centrifugo PRO. In general, refer to api.proto Protobuf schema file as a source of truth about all available Centrifugo server APIs. Don't forget that Centrifugo supports both HTTP and GRPC API – so you can switch to GRPC by using api.proto
file to generate stubs for communication.
This library contains Client
and AsyncClient
to work with Centrifugo HTTP server API. Both clients have the same methods to work with Centrifugo API and raise the same top-level exceptions.
Sync HTTP client
from cent import Client
Required init arguments:
api_url
(str
) - Centrifugo HTTP API URL address, for example,http://localhost:8000/api
api_key
(str
) - Centrifugo HTTP API key for auth
Optional arguments:
timeout
(float
) - base timeout for all requests in seconds, default is 10 seconds.session
(requests.Session
) - customrequests
session to use.
Example:
from cent import Client, PublishRequest
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
client = Client(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = client.publish(request)
print(result)
Async HTTP client
from cent import AsyncClient
Required init arguments:
api_url
(str
) - Centrifugo HTTP API URL address, for example,http://localhost:8000
api_key
(str
) - Centrifugo HTTP API key for auth
Optional arguments:
timeout
(float
) - base timeout for all requests in seconds, default is 10 seconds.session
(aiohttp.ClientSession
) - customaiohttp
session to use.
Example:
import asyncio
from cent import AsyncClient, PublishRequest
api_url = "http://localhost:8000/api"
api_key = "<CENTRIFUGO_API_KEY>"
async def main():
client = AsyncClient(api_url, api_key)
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
result = await client.publish(request)
print(result)
if __name__ == "__main__":
asyncio.run(main())
Handling errors
This library raises exceptions if sth goes wrong. All exceptions are subclasses of cent.CentError
.
CentError
- base class for all exceptionsCentNetworkError
- raised in case of network related errors (connection refused)CentTransportError
- raised in case of transport related errors (HTTP status code is not 2xx)CentTimeoutError
- raised in case of timeoutCentUnauthorizedError
- raised in case of unauthorized access (signal of invalid API key)CentDecodeError
- raised in case of server response decoding errorCentApiResponseError
- raised in case of API response error (i.e. error returned by Centrifugo itself, you can inspect code and message returned by Centrifugo in this case)
Note, that BroadcastRequest
and BatchRequest
are quite special – since they contain multiple commands in one request, handling CentApiResponseError
is still required, but not enough – you also need to manually iterate over the results to check for individual errors. For example, one publish command can fail while another one can succeed. For example:
from cent import *
c = Client("http://localhost:8000/api", "api_key")
req = BroadcastRequest(channels=["1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='rqKx')),
# Response[PublishResult](error=None, result=PublishResult(offset=7, epoch='nUrf'))
# ]
# )
req = BroadcastRequest(channels=["invalid:1", "2"], data={})
c.broadcast(req)
# BroadcastResult(
# responses=[
# Response[PublishResult](error=Error(code=102, message='unknown channel'), result=None),
# Response[PublishResult](error=None, result=PublishResult(offset=8, epoch='nUrf'))
# ]
# )
I.e. cent
library does not raise exceptions for individual errors in BroadcastRequest
or BatchRequest
, only for top-level response error, for example, sending empty list of channels in broadcast:
req = BroadcastRequest(channels=[], data={})
c.broadcast(req)
Traceback (most recent call last):
...
raise CentApiResponseError(
cent.exceptions.CentApiResponseError: Server API response error #107: bad request
So this all adds some complexity, but that's the trade-off for the performance and efficiency of these two methods. You can always write some convenient wrappers around cent
library to handle errors in a way that suits your application.
Using for async consumers
You can use this library to constructs events for Centrifugo async consumers. For example, to get proper method and payload for async publish:
from cent import PublishRequest
request = PublishRequest(channel="channel", data={"input": "Hello world!"})
method = request.api_method
payload = request.api_payload
# use method and payload to construct async consumer event.
Using Broadcast and Batch
To demonstrate the benefits of using BroadcastRequest
and BatchRequest
let's compare approaches. Let's say at some point in your app you need to publish the same message into 10k different channels. Let's compare sequential publish, batch publish and broadcast publish. Here is the code to do the comparison:
from cent import *
from time import time
def main():
publish_requests = []
channels = []
for i in range(10000):
channel = f"test_{i}"
publish_requests.append(PublishRequest(channel=channel, data={"msg": "hello"}))
channels.append(channel)
batch_request = BatchRequest(requests=publish_requests)
broadcast_request = BroadcastRequest(channels=channels, data={"msg": "hello"})
client = Client("http://localhost:8000/api", "api_key")
start = time()
for request in publish_requests:
client.publish(request)
print("sequential", time() - start)
start = time()
client.batch(batch_request)
print("batch", time() - start)
start = time()
client.broadcast(broadcast_request)
print("broadcast", time() - start)
if __name__ == "__main__":
main()
On local machine, the output may look like this:
sequential 5.731332778930664
batch 0.12313580513000488
broadcast 0.06050515174865723
So BatchRequest
is much faster than sequential requests in this case, and BroadcastRequest
is the fastest - publication to 10k Centrifugo channels took only 60ms. Because all the work is done in one network round-trip. In reality the difference will be even more significant because of network latency.
For contributors
Tests and benchmarks
Prerequisites – start Centrifugo server locally:
CENTRIFUGO_API_KEY=api_key CENTRIFUGO_HISTORY_TTL=300s CENTRIFUGO_HISTORY_SIZE=100 \
CENTRIFUGO_PRESENCE=true CENTRIFUGO_GRPC_API=true ./centrifugo
And install dependencies:
make dev
Then to run tests, run:
make test
To run benchmarks, run:
make bench
Migrate to Cent v5
Cent v5 contains the following notable changes compared to Cent v4:
- Client constructor slightly changed, refer to the examples above.
- To call desired API import and construct a request object (inherited from Pydantic
BaseModel
) and then call corresponding method of client. This should feel very similar to how GRPC is usually structured. - Base exception class is now
CentError
instead ofCentException
, exceptions SDK raises were refactored. - To send multiple commands in one HTTP request SDK provides
batch
method.