FastAPI + Celery for Async ML Inference
This repo is a Proof of Concept (PoC) to build a machine learning inference system using Python and the FastAPI and Celery frameworks.
The idea is to have a client, which can be a frontend or backend app, making requests to an API that publishes tasks to a Celery infrastructure. The processing is asynchronous: tasks go onto a Celery task queue and are picked up by workers.
The diagram below illustrates the idea deployed on Kubernetes pods, using RabbitMQ as the broker and Redis as the database.
To generate the diagram above, run pipenv run diagram (you will need Graphviz installed).
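A minimal sketch of this contract between the API and a worker, with illustrative names (the module and task names below are not necessarily the ones used in this repo): the worker registers a task under an agreed name, and the API publishes to it by name without importing the worker's code.

```python
from celery import Celery

# Worker side: register the task under an agreed name.
worker_app = Celery("worker", broker="amqp://guest:guest@localhost:5672//")

@worker_app.task(name="tasks.audio_length")
def audio_length(url: str) -> float:
    """Download the audio at `url` and return its length in seconds (body omitted)."""
    ...

# API side: publish by task name only; no worker code is imported.
producer = Celery("api", broker="amqp://guest:guest@localhost:5672//")

if __name__ == "__main__":
    async_result = producer.send_task("tasks.audio_length", args=["http://example.com/audio.wav"])
    print(async_result.id)  # the task id that would be handed back to the client
```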
Workers for Machine Learning Inference
The possibilities with Celery workers as a machine learning inference system are promising. Workers can be hosted on any k8s pod and take advantage of autoscaling approaches, or on cloud instances (like AWS EC2) with hardware customized to the model's needs (deep learning models could benefit from GPUs).
With a more complex broker, like RabbitMQ, workers can subscribe to specific queues, which makes it possible to deal with several different problems using almost the same infrastructure. The API component can serve as a gateway to all models, or multiple APIs could publish tasks into the broker, where each worker group (workers subscribed to the same queue) shares the load.
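As a sketch of that setup (queue and task names here are assumptions for illustration), task routes can pin each task name to its own queue so that worker groups scale independently:

```python
from celery import Celery

celery_app = Celery("worker", broker="amqp://guest:guest@localhost:5672//")

# Each problem gets its own queue; workers subscribe only to the queue they serve.
celery_app.conf.task_routes = {
    "tasks.audio_length": {"queue": "audio"},
    "tasks.euro_results": {"queue": "euro"},
}
```

A worker group is then started against a single queue, for example celery -A worker worker -Q audio, so scaling the audio workers does not touch the other queues.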
Having each component run from its own Dockerfile points the way to separate Git repos for workers and APIs, which makes it simpler to automate the deployment of each component. The only things the API and the worker share are the task names and the credentials for the broker and database, and these are configured using environment variables.
Next Steps
- Figure out how to consume the results from the broker instead of using FastAPI's BackgroundTask.
- Add a simple ML model for gender identification from audio data (check Kaggle).
- Add unit and integration tests (add a Makefile to run all tests).
Local Development
To run all components you need to have Docker and Python 3.7 available locally, and to manage the Python virtual environment you need to install pipenv. Before starting, run pipenv sync -d to install all packages.
Local Monitoring
Flower is a monitoring UI for Celery that can be run with the command below. See the Flower documentation to learn more.
pipenv run flower
External Dependencies
Broker: RabbitMQ serves as the broker for the Celery framework, where tasks are registered and workers consume the queues. On the Celery website, you can find more about broker usage with Celery.
Database: Redis stores the result of each task, where the key is the task ID and the value is the result itself. The result schema depends on how the task returns its output.
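Both dependencies are wired into Celery through two URLs; a minimal sketch with local default credentials (adjust to your own instances):

```python
from celery import Celery

celery_app = Celery(
    "worker",
    broker="amqp://guest:guest@localhost:5672//",   # RabbitMQ holds the task queues
    backend="redis://localhost:6379/0",             # Redis stores results keyed by task ID
)
```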
Local Broker and Database Instances with Docker
The RabbitMQ server launched with the command below has a management UI where you can check configuration and monitor cluster usage. After running the command, go to http://localhost:8080.
pipenv run broker
To launch the Redis database, execute the command below.
pipenv run backend
Both commands use Pipenv shortcuts; the full commands can be found in the Pipfile.
Environment variables
If you already have these services deployed on your infrastructure, you can use these environment variables to connect to them. Place the respective values in the .env file (a sketch of how they map to connection URLs follows the table).
Variable | Description |
---|---|
REDIS_HOST | Redis host address |
REDIS_PORT | Redis host port |
REDIS_PASS | Redis password |
REDIS_DB | Redis database number |
RABBITMQ_HOST | RabbitMQ host address |
RABBITMQ_PORT | RabbitMQ host port |
RABBITMQ_USER | RabbitMQ username |
RABBITMQ_PASS | RabbitMQ password |
RABBITMQ_VHOST | RabbitMQ virtual host |
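A sketch of how these variables could be assembled into the broker and backend URLs (the defaults shown are assumptions for a local setup):

```python
import os

# Redis (result backend)
redis_host = os.getenv("REDIS_HOST", "localhost")
redis_port = os.getenv("REDIS_PORT", "6379")
redis_pass = os.getenv("REDIS_PASS", "")
redis_db = os.getenv("REDIS_DB", "0")

# RabbitMQ (broker)
rabbit_host = os.getenv("RABBITMQ_HOST", "localhost")
rabbit_port = os.getenv("RABBITMQ_PORT", "5672")
rabbit_user = os.getenv("RABBITMQ_USER", "guest")
rabbit_pass = os.getenv("RABBITMQ_PASS", "guest")
rabbit_vhost = os.getenv("RABBITMQ_VHOST", "/")

backend_url = f"redis://:{redis_pass}@{redis_host}:{redis_port}/{redis_db}"
broker_url = f"amqp://{rabbit_user}:{rabbit_pass}@{rabbit_host}:{rabbit_port}/{rabbit_vhost}"
```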
Components Available
Workers
The Celery workers are responsible for consuming the task queue and storing the results in the database. Using Redis as the database, the results are stored with the task id as the key and the task's return value as the value.
The workers can subscribe to specific queues and can execute different types of tasks. The following workers are available.
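With the Redis result backend, each result sits under a key prefixed with celery-task-meta-, so a finished task can be inspected either through Celery's API or directly in Redis. A small sketch:

```python
from celery import Celery
from celery.result import AsyncResult

celery_app = Celery("inspect", backend="redis://localhost:6379/0")

def task_state(task_id: str) -> dict:
    """Look up a task in the result backend by its id."""
    result = AsyncResult(task_id, app=celery_app)
    return {
        "status": result.status,                                   # PENDING, FAILURE or SUCCESS
        "result": result.result if result.successful() else None,  # the task's return value
    }

# The same payload can be read straight from Redis (JSON-serialized by default):
#   redis-cli GET celery-task-meta-<task_id>
```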
Worker Audio Length
This worker receives a URL, tries to download an audio file, and then calculates and returns the audio length.
To run this worker, execute pipenv run wAudio
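A hypothetical version of this task, assuming the downloaded files are plain WAV so the standard library is enough (the repo's actual implementation may differ):

```python
import io
import urllib.request
import wave

from celery import Celery

celery_app = Celery("worker", broker="amqp://guest:guest@localhost:5672//",
                    backend="redis://localhost:6379/0")

@celery_app.task(name="tasks.audio_length")
def audio_length(url: str) -> float:
    """Download a WAV file and return its duration in seconds."""
    with urllib.request.urlopen(url) as response:
        payload = io.BytesIO(response.read())
    with wave.open(payload) as audio:
        return audio.getnframes() / audio.getframerate()
```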
Worker Euromillions
This worker receives a date, tries to scrape a EuroMillions results page, and then extracts and returns the numbers.
To run this worker, execute pipenv run wEuro
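A hypothetical shape for this task; the URL and the parsing below are placeholders, since the real results page dictates the actual selectors:

```python
import re
import urllib.request

from celery import Celery

celery_app = Celery("worker", broker="amqp://guest:guest@localhost:5672//",
                    backend="redis://localhost:6379/0")

@celery_app.task(name="tasks.euro_results")
def euro_results(date: str) -> list:
    """Scrape the EuroMillions draw for `date` (DD-MM-YYYY) and return the numbers."""
    url = f"https://example.com/euromillions/{date}"  # placeholder results page
    html = urllib.request.urlopen(url).read().decode("utf-8")
    # Placeholder parsing: a real page would need proper HTML parsing.
    return [int(n) for n in re.findall(r'class="ball">(\d+)<', html)]
```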
API
All the heavy processing will be done by the workers, so the idea is to have one API serving as a gateway to all workers. The API must have one POST endpoint for each registered Celery task.
To run the API, execute pipenv run api
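A minimal sketch of one such endpoint (the request model and task name are illustrative): it publishes the task and immediately hands the task id back with a 201.

```python
from celery import Celery
from fastapi import FastAPI, status
from pydantic import BaseModel

app = FastAPI()
celery_app = Celery("api", broker="amqp://guest:guest@localhost:5672//",
                    backend="redis://localhost:6379/0")

class AudioRequest(BaseModel):
    url: str

@app.post("/audio/length", status_code=status.HTTP_201_CREATED)
def create_audio_length_task(request: AudioRequest):
    """Publish the task by name and return its id to the client."""
    task = celery_app.send_task("tasks.audio_length", args=[request.url])
    return {"task_id": task.id}
```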
API Endpoints
- The POST endpoint /audio/length receives in the request body a URL pointing to an audio file to be analyzed. In this case, it will just get the audio length in seconds. The endpoint sends a task to the queue and returns a task id to the client with HTTP response code 201 (HTTP_201_CREATED). The endpoint has an option to enable a callback method that makes the API check the task result on Celery and send it to the console (this can be used to send the result to any other system).
- The POST endpoint /euro/results receives in the request body a reference date (DD-MM-YYYY) used to scrape the EuroMillions balls and stars from a web page. In this case, it will just get the seven numbers drawn. The endpoint sends a task to the queue and returns a task id to the client with HTTP response code 201 (HTTP_201_CREATED). The endpoint has an option to enable a callback method that makes the API check the task result on Celery and send it to the console (this can be used to send the result to any other system).
- The GET endpoint /task receives a parameter with the task id and returns the task status and result (when it has finished successfully). A client needs to implement a strategy to retrieve the results from the API. This endpoint can be used to retrieve results for all task types. The tasks have, basically, three statuses: PENDING, FAILURE, and SUCCESS (a sketch of this endpoint follows the list).
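A sketch of how the status endpoint could map Celery state to a response (the query parameter and response keys are assumptions):

```python
from celery import Celery
from celery.result import AsyncResult
from fastapi import FastAPI

app = FastAPI()
celery_app = Celery("api", broker="amqp://guest:guest@localhost:5672//",
                    backend="redis://localhost:6379/0")

@app.get("/task")
def get_task(task_id: str):
    """Return the task status, plus the result once the task has succeeded."""
    result = AsyncResult(task_id, app=celery_app)
    response = {"task_id": task_id, "status": result.status}
    if result.successful():
        response["result"] = result.result
    return response
```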
Client
The client is a system that wants to extract features or insights from data and, to do so, needs to call the API and send requests.
To run the client, execute pipenv run client
For the PoC, the client has the following tasks:
- A list of files retrieved from The Open Speech Repository. The American English files are numbered from 10 to 61 in a non-sequential way. The client component has a sequential list from 10 to 65, which will make some URLs fail since the file does not exist.
- A list of dates, from the current day going back 15 days, used to retrieve the EuroMillions numbers drawn on each day. Since the draw does not happen every day, some of them will fail to retrieve the data from the website.
After the client makes the request, the API returns a task id, and the result needs to be retrieved from the API with a polling strategy.
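A sketch of that polling loop on the client side, assuming the endpoint shapes from the API section and a local API on port 8000:

```python
import time

import requests

API = "http://localhost:8000"  # assumed local API address

# Submit the work and keep the task id returned by the API.
task_id = requests.post(f"{API}/audio/length",
                        json={"url": "http://example.com/audio.wav"}).json()["task_id"]

# Poll the status endpoint until the task finishes one way or the other.
while True:
    task = requests.get(f"{API}/task", params={"task_id": task_id}).json()
    if task["status"] in ("SUCCESS", "FAILURE"):
        break
    time.sleep(1)

print(task)
```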
Running All Services with Docker Compose
The docker-compose file has all the services configured, so there is no need to have Redis or RabbitMQ instances already running.
To build the images for the components, run:
docker-compose build
Be aware that there is no control over the startup order, so you may find yourself sending requests to an API or worker that is not ready yet. It is safer to start the services one by one.
When you run the workers, both the broker and the backend will also start.
docker-compose run --rm audio
docker-compose run --rm euro
When you run the client, the API will also start.
docker-compose run --rm client
To shut down all services, run:
docker-compose down