Custom Celery task states
Celery tasks always have a state. If a task finished executing successfully, its state is SUCCESS. If a task execution resulted in an exception, its state is FAILURE. Celery knows six built-in states:
- PENDING (waiting for execution or unknown task id)
- STARTED (task has been started)
- SUCCESS (task executed successfully)
- FAILURE (task execution resulted in exception)
- RETRY (task is being retried)
- REVOKED (task has been revoked)
In case you wonder why you have never come across the STARTED state: it is not reported by default. You have to enable it explicitly in the Celery config by setting task_track_started = True.
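A minimal sketch of that setting, assuming your app loads its configuration from a module via app.config_from_object('celeryconfig'). The module name celeryconfig is only an example; use whatever config module your project already has.

```python
# celeryconfig.py (hypothetical module name), loaded with
# app.config_from_object('celeryconfig')

# Report STARTED as soon as a worker begins executing a task
# (off by default, so tasks normally jump from PENDING straight to a final state).
task_track_started = True
```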
The update_state method
The Celery task object provides an update_state method. This method lets you do three things:
- set the task’s state to one of the built-in states
- provide additional meta data
- set the task’s state to any custom state you define.
All you need to define your own state is a unique name. It is just a string and does not need to be registered anywhere. For example, if you have a long-running task, you can define a PROGRESS state and publish the progress made via the meta argument, a plain dictionary:
```python
import time

from worker import app


@app.task(bind=True)
def task(self):
    n = 30
    for i in range(0, n):
        # Publish a custom PROGRESS state with completed and total iterations
        self.update_state(state='PROGRESS', meta={'done': i, 'total': n})
        time.sleep(1)
    return n
```
This task runs for ~30 seconds and sends a task state update every ~1 second, broadcasting a custom PROGRESS state along with the number of completed and total iterations. Let's execute the task asynchronously and, while it is still running, poll its state and meta data once per second until it finishes:
```python
import time

import tasks

task = tasks.task.s().delay()
# Poll once per second until the task reaches a ready state
while not task.ready():
    print(f'State={task.state}, info={task.info}')
    time.sleep(1)
print(f'State={task.state}, info={task.info}')
```
This produces something like:
```
State=PENDING, info=None
State=PROGRESS, info={'done': 0, 'total': 30}
State=PROGRESS, info={'done': 1, 'total': 30}
State=PROGRESS, info={'done': 2, 'total': 30}
State=PROGRESS, info={'done': 3, 'total': 30}
...
State=SUCCESS, info=30
```
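If you would rather print something friendlier than the raw meta dict in the polling loop above, a small helper can turn the PROGRESS meta into a percentage. This is a sketch: the name describe_progress is hypothetical, and it assumes the {'done': ..., 'total': ...} layout published by the task above. It takes plain values, so you would call it as describe_progress(task.state, task.info).

```python
def describe_progress(state, info):
    """Turn a (state, info) pair into a human-readable progress line.

    Assumes the PROGRESS meta layout used above: {'done': int, 'total': int}.
    """
    if state == 'PROGRESS' and isinstance(info, dict):
        done, total = info['done'], info['total']
        # Guard against a zero total to avoid ZeroDivisionError
        percent = 100 * done // total if total else 0
        return f'{done}/{total} ({percent}%)'
    # Any other state: just echo the state name
    return state


print(describe_progress('PROGRESS', {'done': 3, 'total': 30}))  # 3/30 (10%)
print(describe_progress('PENDING', None))                        # PENDING
```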
This is a very simple example. But if we take a closer look, there are a few interesting lessons:
- any string can be a custom state
- a custom state is only temporary: it is eventually overridden by a Celery built-in state as soon as the task finishes successfully, raises an exception, is retried or is revoked (the same applies if we use update_state with a built-in state but custom meta data: Celery ultimately overwrites the custom meta data)
- while the task is in a custom state, the meta argument we published via update_state is available as the info property of the AsyncResult object (the object that .delay() returns on the calling side)
- when the task is in the built-in SUCCESS state, the info property returns the task result; when the task failed, info returns the exception instance and the stack trace is available via the traceback property (try it yourself by raising an exception in the implementation of the task function above)
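The rules above boil down to "what info means depends on the state". A minimal sketch of client-side code that follows them; the helper name summarize is hypothetical, and it works on plain values so you can pass it task.state and task.info.

```python
def summarize(state, info):
    """Interpret AsyncResult.info according to the task's state."""
    if state == 'SUCCESS':
        # info holds the task's return value
        return f'done, result={info!r}'
    if state == 'FAILURE':
        # info holds the exception instance raised by the task
        return f'failed: {type(info).__name__}: {info}'
    if isinstance(info, dict):
        # meta published via update_state, e.g. a custom PROGRESS state
        return f'{state}: {info}'
    # e.g. PENDING with info=None
    return state
```

For example, summarize('PROGRESS', {'done': 1, 'total': 30}) returns "PROGRESS: {'done': 1, 'total': 30}".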
Built-in state with manual task result handling
Say you want to provide some additional custom data for a failed task. Unfortunately, as we established above, Celery will overwrite the custom meta data, even if we use a built-in state. Fortunately, there is a way to prevent this: raising a celery.exceptions.Ignore exception. This tells the worker to ignore the task, meaning Celery records no final state of its own (so whatever we stored via update_state stays in place), but the message is still removed from the queue:
```python
from celery import states
from celery.exceptions import Ignore

from worker import app


@app.task(bind=True)
def task(self):
    try:
        raise ValueError('Some error')
    except Exception:
        # Store FAILURE with custom meta, then stop Celery from overwriting it
        self.update_state(state=states.FAILURE, meta={'custom': '...'})
        raise Ignore()
```
This works… at least, kind of. This time Celery does not overwrite the meta data:
```
~$ python
>>> import tasks
>>> task = tasks.task.s().delay()
>>> print(task.backend.get(task.backend.get_key_for_task(task.id)))
b'{"status": "FAILURE", "result": {"custom": "..."}, "traceback": null, "children": [], "task_id": "1df4b70c-1206-41e5-bcd3-786295d21267"}'
```
But it turns out that, depending on the built-in state, Celery expects the corresponding meta data dictionary to be in a particular format, and our custom meta data is incompatible with the FAILURE state:
```
>>> print(task.state)
Traceback (most recent call last):
  File "<stdin>", line 1, in <module>
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 471, in state
    return self._get_task_meta()['status']
  File "/usr/local/lib/python3.6/site-packages/celery/result.py", line 410, in _get_task_meta
    return self._maybe_set_cache(self.backend.get_task_meta(self.id))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 359, in get_task_meta
    meta = self._get_task_meta_for(task_id)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 674, in _get_task_meta_for
    return self.decode_result(meta)
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 278, in decode_result
    return self.meta_from_decoded(self.decode(payload))
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 274, in meta_from_decoded
    meta['result'] = self.exception_to_python(meta['result'])
  File "/usr/local/lib/python3.6/site-packages/celery/backends/base.py", line 248, in exception_to_python
    from_utf8(exc['exc_type']), __name__)
KeyError: 'exc_type'
```
We can fix this by adding the exc_type and exc_message keys to our custom meta dictionary, effectively mimicking Celery's default FAILURE meta structure:
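```python
import traceback

from celery import states
from celery.exceptions import Ignore

from worker import app


@app.task(bind=True)
def task(self):
    try:
        raise ValueError('some exception')
    except Exception as ex:
        self.update_state(
            state=states.FAILURE,
            meta={
                # Keys Celery needs to rebuild the exception on the client side
                'exc_type': type(ex).__name__,
                'exc_message': traceback.format_exc().split('\n'),
                'custom': '...',
            })
        raise Ignore()
```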
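If several tasks need this, the meta construction can be factored into a helper. A minimal sketch; failure_meta is a hypothetical name, and it must be called inside the except block so that traceback.format_exc() still sees the exception being handled. Inside the task you would then write self.update_state(state=states.FAILURE, meta=failure_meta(ex, custom='...')).

```python
import traceback


def failure_meta(ex, **extra):
    """Build a FAILURE meta dict that Celery can decode, plus extra fields.

    Call it from inside an ``except`` block so format_exc() sees ``ex``.
    """
    meta = dict(extra)
    # Set the required keys last so extra fields can never clobber them
    meta['exc_type'] = type(ex).__name__
    meta['exc_message'] = traceback.format_exc().split('\n')
    return meta
```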
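And this time we can get the task's state and info without Celery raising an exception. We also have access to the custom field, but only by reading the raw result from the backend via task.backend.get(...): for the FAILURE state, Celery turns the result dict into an exception instance, so the custom field does not show up in info. (backend.get and get_key_for_task are available on key-value result backends such as Redis.)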
```
~$ python
>>> import tasks
>>> task = tasks.task.s().delay()
>>> task.state
'FAILURE'
>>> task.info
ValueError('Traceback (most recent call last):', '  File "/app/tasks.py", line 16, in task', "    raise ValueError('some exception')", 'ValueError: some exception', '')
>>> print(task.backend.get(task.backend.get_key_for_task(task.id)))
b'{"status": "FAILURE", "result": {"exc_type": "ValueError", "exc_message": ["Traceback (most recent call last):", "  File \\"/app/tasks.py\\", line 16, in task", "    raise ValueError(\'some exception\')", "ValueError: some exception", ""], "custom": "..."}, "traceback": null, "children": [], "task_id": "d2f60111-aec6-4c58-83a7-24f0edb7ac5f"}'
```
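To get at the custom fields programmatically, the raw payload can be decoded with the standard json module. A sketch, assuming the JSON result serializer and a key-value backend as in the session above; custom_fields is a hypothetical helper you would call as custom_fields(task.backend.get(task.backend.get_key_for_task(task.id))).

```python
import json


def custom_fields(raw, reserved=('exc_type', 'exc_message')):
    """Return the non-Celery keys of a raw result payload (bytes or str)."""
    payload = json.loads(raw)  # json.loads accepts bytes as well as str
    result = payload.get('result')
    if not isinstance(result, dict):
        # e.g. a SUCCESS payload whose result is a plain value
        return {}
    return {key: value for key, value in result.items() if key not in reserved}
```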
Custom state
We can use the same Ignore() trick from above to stop Celery from overwriting the temporary custom state from the initial example:
```python
from celery.exceptions import Ignore

from worker import app


@app.task(bind=True)
def task(self):
    self.update_state(state='SOME-CUSTOM-STATE', meta={'custom': '...'})
    raise Ignore()
```
This time, the task remains in our custom state. Also, Celery does not assume any specific meta dict structure:
```
>>> import tasks
>>> task = tasks.task.s().delay()
>>> task.state
'SOME-CUSTOM-STATE'
>>> task.info
{'custom': '...'}
>>> task.result
{'custom': '...'}
```
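One consequence worth keeping in mind: AsyncResult.ready() only returns True for Celery's ready states (SUCCESS, FAILURE and REVOKED, i.e. celery.states.READY_STATES), so a polling loop like the one in the first example would never terminate for a task that parks itself in SOME-CUSTOM-STATE. A sketch of a loop condition that also treats your own final states as terminal; FINAL_CUSTOM_STATES is a hypothetical set you would define yourself, and the ready states are hard-coded here so the snippet runs without Celery.

```python
# Mirrors celery.states.READY_STATES; hard-coded so the sketch runs standalone.
READY_STATES = frozenset({'SUCCESS', 'FAILURE', 'REVOKED'})

# Hypothetical: the custom states your own tasks treat as final.
FINAL_CUSTOM_STATES = frozenset({'SOME-CUSTOM-STATE'})


def is_finished(state):
    """True once a task is in a built-in ready state or one of our final custom states."""
    return state in READY_STATES or state in FINAL_CUSTOM_STATES
```

In the polling loop you would then write while not is_finished(task.state): instead of while not task.ready():.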
Conclusion
Celery provides a lot of flexibility when it comes to custom task states and custom meta data. Transient custom states in combination with custom meta data can be used to implement task progress trackers. Or you might have a good reason to implement your own final custom task state, which Celery can equally cater for. You can even enrich the built-in FAILURE task state with additional data. For further information, I encourage you to read the docs and play around with a few code examples.