Bjoern

Hey, I'm Bjoern 👋
I write friendly tutorials for Celery, the Python task queue.

Filesystem as Celery message broker

Published on Jul 3, 2018

Having to deal with the full-blown Celery infrastructure can be overkill. If you want to run Celery on a Raspberry Pi, need to try something out quickly, or are simply after a simple dev setup, a message broker (plus a result store) feels like using a sledgehammer to crack a nut. In this blog post you’ll learn how to set up a dead-simple, no-frills system with Celery, Flask and nothing else.

Kombu’s file-system transport

Celery uses Kombu to send and receive messages. Kombu itself is a messaging library that provides a high-level interface for the AMQ protocol and supports different message brokers.

The message broker is the store that acts as the transport between the producer and the consumer of messages. Typically, RabbitMQ or Redis takes on this role. But Kombu also ships a file-system transport, whereby producer and consumer simply communicate via files. All you need to do is ensure that both producer and consumer have access to the same folder on a local (or network) drive.
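To get a feel for what Celery does under the hood, here is a minimal Kombu-only sketch. The broker folder name is just an example and is assumed to exist beforehand; both sides point their data folders at it:

from kombu import Connection

# Hypothetical shared folder acting as the "broker"; it must already exist.
transport_options = {
    'data_folder_in': 'broker',
    'data_folder_out': 'broker',
}

with Connection('filesystem://', transport_options=transport_options) as conn:
    queue = conn.SimpleQueue('example')
    queue.put({'hello': 'world'})                # producer: writes a message file
    message = queue.get(block=True, timeout=1)   # consumer: reads it back
    print(message.payload)                       # {'hello': 'world'}
    message.ack()
    queue.close()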

Celery configuration

Even though the documentation on the file-system transport is a bit sparse, setting it up is straightforward. Use filesystem:// (without any path) as the broker_url. In addition, supply the broker_transport_options config to specify the path where messages are exchanged. Note that data_folder_in and data_folder_out need to point to the same path.

from celery import Celery

app = Celery(__name__)
app.conf.update({
    'broker_url': 'filesystem://',  # no path here; the folders go into the transport options
    'broker_transport_options': {
        'data_folder_in': '/app/broker/out',
        'data_folder_out': '/app/broker/out',  # must point to the same folder as data_folder_in
        'data_folder_processed': '/app/broker/processed'
    },
    'imports': ('tasks',),
    'result_persistent': False,
    'task_serializer': 'json',
    'result_serializer': 'json',
    'accept_content': ['json']})

Make sure that both folders /app/broker/out and /app/broker/processed exist (the example application below takes care of it on startup).
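If you are not using the example repository, a couple of lines at startup are enough to create them; a minimal sketch:

import os

# Same paths as in broker_transport_options above; create them if they are missing.
for folder in ('/app/broker/out', '/app/broker/processed'):
    os.makedirs(folder, exist_ok=True)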

An example with Celery and Flask

Clone the example repository from GitHub and bring up the Flask and Celery apps via docker-compose up -d. Alternatively, without Docker, create and activate a virtual environment, cd into the app folder and start the Celery worker:

# start celery worker
~$ celery worker --app=worker.app --concurrency=1 --loglevel=INFO

Start the Flask process on port 8000:

# start flask app
~$ python app.py

Trigger the long_running_task Celery task via a POST on the Flask root endpoint:

# curl
~$ curl -d '{}' -H "Content-Type: application/json" -X POST http://localhost:8000

This runs the task asynchronously in the Celery worker process, via the simple file-system transport, without RabbitMQ, Redis or a SQL database. You should see something like this in the Celery logs:

[2018-07-03 16:44:52,105: INFO/ForkPoolWorker-1] Task long_running_task[a31af3f0-78b8-499f-ae1b-a277c2319bbf] succeeded in 0.15355589999671793s: 501.67632
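For reference, the moving parts could look roughly like this. The task name long_running_task, the module layout (worker.app, tasks) and port 8000 follow the post; the function bodies are assumptions, not the repository’s actual code:

# tasks.py -- a stand-in for the real long_running_task
import random
import time

from worker import app  # the Celery app configured above


@app.task(name='long_running_task')
def long_running_task():
    time.sleep(0.1)                  # pretend to do some work
    return random.uniform(0, 1000)   # a float result, like the 501.67632 in the log line


# app.py -- a minimal Flask app that triggers the task on POST /
from flask import Flask, jsonify

from tasks import long_running_task

flask_app = Flask(__name__)


@flask_app.route('/', methods=['POST'])
def trigger():
    task = long_running_task.delay()  # the message travels through /app/broker/out
    return jsonify({'task_id': task.id}), 202


if __name__ == '__main__':
    flask_app.run(host='0.0.0.0', port=8000)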

Hope you find this useful. What do you think?