Dynamic Task Routing in Celery
In my previous blog post we looked into custom queues and task routing. There, we had to configure, for each task, which queue Celery should route it to. While this approach works well for a simple setup, it does not scale for applications and micro-services where many Celery tasks need to be routed to a number of different worker queues.
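As a reminder, the static approach from that post maps each task name to its queue by hand. A rough sketch (the module path and task names are illustrative, not copied verbatim from the previous post):
from celery import Celery

app = Celery(__name__)
# Static, per-task routing: every task has to be listed explicitly,
# and every new task means another entry in this mapping.
app.conf.task_routes = {
    'tasks.fetch_bitcoin_price_index': {'queue': 'feeds'},
    'tasks.calculate_moving_average': {'queue': 'filters'},
}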
Step 1: Celery task_routes config
Instead of configuring per task in task_routes which queue a task should be routed to, you can tell Celery to use a custom router class by specifying the path to that class (see also the Celery routing docs at http://docs.celeryproject.org/en/latest/userguide/routing.html):
import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    # Use our custom router class instead of a per-task queue mapping.
    'task_routes': ('task_router.TaskRouter',),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json'],
})
Step 2: Define TaskRouter class
As per the task_routes value above, we need to define the custom TaskRouter class in the module task_router.py. Celery expects this class to provide a route_for_task method, which receives the task name as its first argument. Note how the method returns a dict that looks exactly like the one used for manual task routing.
class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        # Task names without a namespace prefix go to the default queue.
        if ':' not in task:
            return {'queue': 'default'}
        # A name of the form 'queue:taskname' is routed to the queue
        # named by its prefix.
        namespace, _ = task.split(':')
        return {'queue': namespace}
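As a quick sanity check, this is roughly how the router behaves for a namespaced and a plain task name (a hypothetical snippet, not part of the project code):
from task_router import TaskRouter

router = TaskRouter()
# A namespaced task name is routed to the queue named by its prefix.
print(router.route_for_task('feeds:fetch_bitcoin_price_index'))  # {'queue': 'feeds'}
# A plain task name falls back to the default queue.
print(router.route_for_task('fetch_bitcoin_price_index'))  # {'queue': 'default'}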
Our idea is to route a task based on its task name; in particular, we assume that our task names follow the pattern queue:taskname. In our previous blog post’s example we had a task named fetch_bitcoin_price_index that we wanted routed to a queue called feeds. We rename this task to feeds:fetch_bitcoin_price_index.
@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
    ...

@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
    ...
We need to run two Celery workers: one subscribes to the feeds queue, the other to the filters queue:
~$ celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
~$ celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters
Note the --queues command line argument. It determines which queues a worker subscribes to. To subscribe to more than one queue, pass a comma-separated list, like so: --queues=feeds,filters.
For further information, have a look at the Celery docs.
Step 3: Ready for action
Bring up the docker-compose stack and run example.py
:
# start up stack
~$ docker-compose up -d
# execute python example.py in container
~$ docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3
The script invokes a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker. When the task completes successfully, its result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.
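The chain in example.py might look roughly like this (a sketch under the assumption that both tasks live in tasks.py; the actual argument parsing in the repository may differ):
from celery import chain

from tasks import calculate_moving_average, fetch_bitcoin_price_index

# Because each task name starts with its queue, the TaskRouter sends the
# first task to the 'feeds' queue and the second to the 'filters' queue.
workflow = chain(
    fetch_bitcoin_price_index.s(start_date='2018-01-01', end_date='2018-05-29'),
    calculate_moving_average.s(window=3),
)
workflow.delay()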
Check the docker-compose logs to follow the task flow through the two workers:
~$ docker-compose logs -f
The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Point your browser at http://localhost:5555.
Summary
In this blog post you learned how to configure Celery to route tasks using a custom task router. This solution scales well when many tasks need to be routed across many queues and workers.