
Dynamic Task Routing in Celery

Published on Jun 5, 2018

In my previous blog post we looked into custom queues and task routing. There, we had to configure task_routes per task to tell Celery which queue to route each task to. While this approach works well for a simple setup, it does not scale well for applications and microservices where many Celery tasks need to be routed to a number of different worker queues.
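For reference, the manual routing from the previous post looked roughly like this (a sketch from memory rather than a verbatim copy; the exact task and queue names are assumptions based on the example used later in this post):

# Per-task routing: every task name has to be mapped to a queue by hand.
app.conf.task_routes = {
    'fetch_bitcoin_price_index': {'queue': 'feeds'},
    'calculate_moving_average': {'queue': 'filters'},
}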

Step 1: Celery task_routes config

Instead of configuring in task_routes, per task, which queue you want to route it to, you can tell Celery to use a custom router class by specifying the path to that class (also have a look at the Celery routing docs at http://docs.celeryproject.org/en/latest/userguide/routing.html):

import os

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': os.environ['CELERY_BROKER_URL'],
    'imports': (
        'tasks',
    ),
    'task_routes': ('task_router.TaskRouter',),
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Step 2: Define TaskRouter class

As per our task_routes value above, we need to define the custom TaskRouter class in the module task_router.py. Celery expects it to provide a route_for_task method, which receives the task name as its first argument. Note how the method returns a dict that looks exactly like the one used for manual task routing.

class TaskRouter:
    def route_for_task(self, task, *args, **kwargs):
        # Tasks without a namespace prefix go to the default queue.
        if ':' not in task:
            return {'queue': 'default'}

        # 'feeds:fetch_bitcoin_price_index' -> queue 'feeds'
        namespace, _ = task.split(':', 1)
        return {'queue': namespace}
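To get a feel for the routing logic, here is what the router returns for a couple of task names (a quick illustration, not part of the actual setup):

router = TaskRouter()
router.route_for_task('feeds:fetch_bitcoin_price_index')   # {'queue': 'feeds'}
router.route_for_task('filters:calculate_moving_average')  # {'queue': 'filters'}
router.route_for_task('some_unnamespaced_task')            # {'queue': 'default'}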

The idea is to route a task based on its name: we assume that our task names follow the pattern queue:taskname. In the previous blog post's example we had a task named fetch_bitcoin_price_index that we wanted to be routed to a queue called feeds. We rename this task to feeds:fetch_bitcoin_price_index.

@app.task(bind=True, name='feeds:fetch_bitcoin_price_index')
def fetch_bitcoin_price_index(self, start_date, end_date):
    ...


@app.task(bind=True, name='filters:calculate_moving_average')
def calculate_moving_average(self, args, window):
    ...
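With the router in place, callers no longer need to specify a queue; the task name alone determines where it ends up. For example (a sketch; the argument values are made up):

# No queue or routing_key needed here: the TaskRouter picks the 'feeds' queue
# based on the 'feeds:' prefix of the task name.
fetch_bitcoin_price_index.delay(start_date='2018-01-01', end_date='2018-05-29')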

We need to run two Celery workers. One subscribes to the feeds, the other one to the filters queue:

~$ celery worker --app=worker.app --hostname=worker.feeds@%h --queues=feeds
~$ celery worker --app=worker.app --hostname=worker.filters@%h --queues=filters

Note the --queues command line argument. It makes your worker subscribe to a particular queue. To subscribe to more than one queue, use a comma-separated list, like so: --queues=feeds,filters. For further information, have a look at the Celery docs.

Step 3: Ready for action

Bring up the docker-compose stack and run example.py:

# start up stack
~$ docker-compose up -d

# execute python example.py in container
~$ docker-compose exec worker-feeds python example.py --start_date=2018-01-01 --end_date=2018-05-29 --window=3

The script invokes a Celery chain that consists of two tasks: fetch_bitcoin_price_index fetches Bitcoin Price Index data from the Coindesk API and is routed via the feeds queue to the worker-feeds Celery worker. When the task completes successfully, its result is passed on to calculate_moving_average, which is routed via the filters queue to the worker-filters Celery worker.
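The chain in example.py looks roughly like this (a sketch under the assumption that the window size is passed as a keyword argument; the actual script may differ in detail):

from celery import chain

from tasks import calculate_moving_average, fetch_bitcoin_price_index

# The result of the first task is passed as the first argument of the second.
chain(
    fetch_bitcoin_price_index.s(start_date='2018-01-01', end_date='2018-05-29'),
    calculate_moving_average.s(window=3),
).delay()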

Check the docker-compose logs to follow the task flow through the two workers:

~$ docker-compose logs -f

The docker-compose.yml stack also comes with a Flower instance. Flower is a tool for monitoring Celery workers and tasks. Point your browser to http://localhost:5555.

Summary

In this blog post you learned how to configure Celery to route tasks dynamically using a custom task router. This approach scales well to applications with many tasks spread across many queues and workers.