How to Call a Celery Task From Outside its Code Base

The standard Celery docs and tutorials usually assume
that your Celery and your API source code live in the same code base.
In that scenario you simply import the task into your API module
and invoke the Celery task asynchronously.

from flask import Flask, jsonify, request
from tasks import fetch_data

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    # Queue the task; a worker picks it up from the broker.
    fetch_data.s(url=request.json['url']).delay()
    return jsonify({'url': request.json['url']}), 201

This only works as long as the task is registered in your current process.
You need an alternative strategy when your Celery and your API source code
are not part of the same code base, or when you have multiple
Celery microservices and need to call a Celery task
from within another Celery microservice.

Option 1: app.send_task

The Celery app instance has a send_task
method that can be used to call a task by its name.

from flask import Flask, jsonify, request
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    # Call the task by its registered name; no task import is needed.
    celery_app.send_task('fetch_data', kwargs={'url': request.json['url']})
    return jsonify({'url': request.json['url']}), 201

Option 2: app.signature

In our initial example, the .s(...) method on the Celery task creates a so-called Celery Signature object. A Celery Signature essentially wraps the arguments, keyword arguments, and execution options
of a single Celery task invocation so that it can be passed to functions or serialized and
sent across the wire.

An alternative strategy is therefore to create the Signature object explicitly via the Celery app's
signature method,
passing the task name and its kwargs. The resulting Signature object can then be executed asynchronously with its delay() method, just as in the standard example.

from flask import Flask, jsonify, request
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    # Build the signature from the task name, then send it to the broker.
    celery_app.signature('fetch_data', kwargs={'url': request.json['url']}).delay()
    return jsonify({'url': request.json['url']}), 201

Summary

Celery does not require access to a task's code base in order to invoke it.
The trick is to invoke a task by its name, either directly via celery_app.send_task(...)
or by creating a Signature object via celery_app.signature(...), which
is the equivalent of calling task.s(...) when you do have access
to the task's code base.

Which approach you prefer is ultimately a question of taste. There are minor
implications for how you test your task invocations, which we will cover in another blog post.

