I am using a Chord in Celery to have a callback that gets called when a Group of parallel tasks finish executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of these to return before I process the results and update my database in the Chord callback. I would like the callback to execute when all of the API calls have finished, regardless of their status.
My problem is that the callback function only gets called if none of the group's subtasks raise an exception. If, however, one subtask raises an exception, then an optional error handler `on_error()` gets called with a string representation of the `task_id` of the chord. The remaining tasks in the group do continue executing, but the callback is never called.
I'll illustrate this with an example below:
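```python
from random import randint

from celery import group

# `app` is assumed to be your configured Celery application instance.

@app.task
def maybe_succeed():
    divisor = randint(0, 10)
    return 1 / divisor  # raises ZeroDivisionError when divisor == 0

@app.task
def master_task():
    g = group([maybe_succeed.s() for _ in range(100)])
    c = g | chord_callback.s()  # piping a group into a task builds a chord
    return c.delay()

@app.task
def chord_callback(results):
    print('Made it here!')
```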
In the above example, calling `master_task()` will run all of the tasks in the group; however, the callback will never get called, because at least one of the `maybe_succeed()` calls will fail (unless you're super lucky!).
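To put "super lucky" in numbers: assuming `randint(0, 10)` is uniform over its 11 possible values, each call fails with probability 1/11, so the chance that all 100 calls succeed (and the callback runs) is (10/11)^100. A quick check:

```python
# Each maybe_succeed() call draws randint(0, 10): 11 equally likely values,
# and only 0 makes 1 / divisor raise ZeroDivisionError.
p_single_ok = 10 / 11

# The chord callback runs only if all 100 calls succeed.
p_all_ok = p_single_ok ** 100
print(f"P(all 100 succeed) = {p_all_ok:.2e}")  # 7.26e-05
```

So roughly 7 runs in 100,000 would reach the callback.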
Right now, I'm dealing with this problem by catching all exceptions in my equivalent of `maybe_succeed()` so that the chord never fails. I guess this is a fine solution, though it doesn't feel right.
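For reference, here is a minimal sketch of that catch-everything workaround, written as a reusable decorator so each task body doesn't need its own try/except. `capture_errors` and the `{"ok": ..., "value"/"error": ...}` result shape are hypothetical choices, not Celery API. Plain functions are used so the snippet runs without a broker; in a real project you would presumably stack `@app.task` above `@capture_errors` (the `functools.wraps` call preserves `__name__` and `__module__`, which Celery uses by default when generating task names). The optional `divisor` argument is also an addition, only there to make the example deterministic.

```python
import functools
import random

def capture_errors(func):
    """Hypothetical helper: return a result dict instead of raising."""
    @functools.wraps(func)  # keeps __name__/__module__ of the wrapped function
    def wrapper(*args, **kwargs):
        try:
            return {"ok": True, "value": func(*args, **kwargs)}
        except Exception as exc:
            # Store repr() rather than the exception so the payload stays JSON-serializable
            return {"ok": False, "error": repr(exc)}
    return wrapper

@capture_errors
def maybe_succeed(divisor=None):
    # Optional argument added so the example can be made deterministic
    if divisor is None:
        divisor = random.randint(0, 10)
    return 1 / divisor

def chord_callback(results):
    """Split the wrapped results into successes and failures."""
    successes = [r["value"] for r in results if r["ok"]]
    failures = [r["error"] for r in results if not r["ok"]]
    return successes, failures

successes, failures = chord_callback([maybe_succeed(d) for d in (1, 0, 2)])
print(successes)  # [1.0, 0.5]
print(len(failures))  # 1
```

Because no header task ever raises, the chord always completes and the callback decides what to do with the failures.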
So, my question is:
Is there a way to have a Celery Chord callback execute regardless of the return status of its group's subtasks?
You could try calling the original callback in the errback:
```python
from celery import chord

# `celery` is the Celery application instance in this example.

@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y

@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')

@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')

@celery.task
def on_chord_error(task_id, extra_info):
    # task_id is supplied by Celery; extra_info comes from on_chord_error.s('extra info')
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    callme.delay(extra_info)  # fire the original callback from the errback

@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)
```
Which results in:
```
Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task: tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
  File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
    interval=interval, no_ack=no_ack, on_interval=on_interval,
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
    self.maybe_throw(callback=callback)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
```
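The control flow in the log (every header task runs, the chord body is skipped, the errback fires and then enqueues `callme` itself) can be mirrored in a toy synchronous model. This is plain Python written for illustration under simplifying assumptions, not how Celery implements chords: `run_chord` is a hypothetical name, the header is a list of zero-argument callables standing in for signatures like `plus.s(1, 1)`, and the errback receives the exception rather than a task id.

```python
from typing import Any, Callable, List, Optional

def run_chord(header: List[Callable[[], Any]],
              body: Callable[[List[Any]], Any],
              errback: Optional[Callable[[BaseException], Any]] = None) -> Any:
    """Toy, synchronous model of chord semantics (not Celery's implementation)."""
    results: List[Any] = []
    first_error: Optional[BaseException] = None
    for task in header:
        try:
            results.append(task())
        except Exception as exc:  # keep going, like the remaining group tasks do
            if first_error is None:
                first_error = exc
    if first_error is None:
        return body(results)  # body sees results only if nothing raised
    if errback is not None:
        return errback(first_error)  # otherwise only the errback runs
    return None

def plus(x: int, y: int) -> Callable[[], int]:
    return lambda: x + y  # stand-in for plus.s(x, y)

def failure() -> int:
    raise ValueError("BAD")

def callme(stuff: Any) -> str:
    return f"Callback arg: {stuff}"

def on_chord_error(exc: BaseException) -> str:
    # Mirrors the answer: the errback invokes the original callback itself
    return callme("extra info")

print(run_chord([plus(1, 1), plus(2, 2), failure, plus(3, 3)], callme, on_chord_error))
# prints "Callback arg: extra info"
```

Note that, as in the real log, the forwarded callback only receives `"extra info"`, not the results of the tasks that succeeded.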