Bjoern


How to call a Celery task from another app

Published on Jun 19, 2018

The standard Celery docs and tutorials usually assume that your Celery and your API source code live in the same code base. In that scenario you simply import the task into your API module and invoke the Celery task asynchronously.

from flask import Flask, jsonify, request
from tasks import fetch_data

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    fetch_data.s(url=request.json['url']).delay()
    return jsonify({'url': request.json['url']}), 201

This only works as long as the task is registered in your current process. You need an alternative strategy when your Celery and your API source code are not part of the same code base, or when you have multiple Celery microservices and need to call a Celery task from within another Celery microservice.

Option 1: app.send_task

The Celery app instance has a send_task method that can be used to call a task by its name. The name must match the name under which the task is registered on the worker.

from flask import Flask, jsonify, request
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    celery_app.send_task('fetch_data', kwargs={'url': request.json['url']})
    return jsonify({'url': request.json['url']}), 201

Option 2: app.signature

In our initial example, the .s(...) method on the Celery task creates a so-called Celery Signature object. A Celery Signature essentially wraps the arguments, keyword arguments, and execution options of a single Celery task invocation so that it can be passed to functions or serialized and sent across the wire.

An alternative strategy therefore is to explicitly create the Signature object via the Celery app’s signature method, passing the task name and its kwargs. The resulting Signature object can then be asynchronously executed by the delay() method in the same way we did it in the standard example.

from flask import Flask, jsonify, request
from worker import app as celery_app

app = Flask(__name__)

@app.route('/', methods=['POST'])
def index():
    celery_app.signature('fetch_data', kwargs={'url': request.json['url']}).delay()
    return jsonify({'url': request.json['url']}), 201

Summary

Celery does not require access to a task’s code base in order to invoke it. The trick is to invoke a task by its name, either directly via celery_app.send_task(...) or by creating a Signature object via celery_app.signature(...), which is the equivalent of calling task.s(...) when you do have access to the task’s code base.

Which approach you prefer is ultimately a question of taste. There are minor implications for how you test your task invocations, which we will cover in another blog post.