How do you ensure a Celery chord callback gets called with failed subtasks?

2024/12/9 20:52:20

I am using a chord in Celery so that a callback runs when a group of parallel tasks finishes executing. Specifically, I have a group of functions that wrap calls to an external API. I want to wait for all of them to return before I process the results and update my database in the chord callback. I would like the callback to execute once all of the API calls have finished, regardless of their status.

My problem is that the callback function only gets called if none of the group's subtasks raises an exception. If a subtask does raise an exception, an optional error handler registered with on_error() gets called with a string representation of the chord's task_id. The remaining tasks in the group continue executing, but the callback is never called.

I'll illustrate this with an example below:

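from random import randint

from celery import group

# `app` is the Celery application instance.

@app.task
def maybe_succeed():
    # randint(0, 10) can return 0, in which case this raises ZeroDivisionError.
    divisor = randint(0, 10)
    return 1 / divisor

@app.task
def master_task():
    g = group([maybe_succeed.s() for i in range(100)])
    c = g | chord_callback.s()  # a group chained into a task forms a chord
    return c.delay()

@app.task
def chord_callback(results):
    print('Made it here!')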

In the above example, calling master_task() will run all of the tasks in the group. However, the callback will never get called, because at least one of the maybe_succeed() calls will fail (unless you're super lucky!).


Right now, I'm dealing with this problem by catching all exceptions in my equivalent of maybe_succeed() so that the chord will never fail. I guess this is a fine solution though it doesn't feel right.
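The workaround described above can be sketched roughly as follows. This is a minimal illustration in plain Python, not the actual task code: call_api, safe_call, summarize, and the shape of the result dict are all hypothetical names chosen for the example. In a real application, safe_call and summarize would presumably be decorated with @app.task and wired together as a chord.

```python
from random import Random


def call_api(rng):
    """Hypothetical stand-in for the external API call; may raise ZeroDivisionError."""
    divisor = rng.randint(0, 10)
    return 1 / divisor


def safe_call(rng):
    """Run call_api and report the outcome as data instead of raising.

    Because no exception escapes, a chord built from tasks like this one
    would not be marked as failed, so its callback would still run.
    """
    try:
        return {"ok": True, "value": call_api(rng)}
    except Exception as exc:  # deliberately broad: every failure becomes data
        return {"ok": False, "error": repr(exc)}


def summarize(results):
    """Hypothetical chord callback body: separate successes from failures."""
    succeeded = [r["value"] for r in results if r["ok"]]
    failed = [r["error"] for r in results if not r["ok"]]
    return succeeded, failed


rng = Random(0)
results = [safe_call(rng) for _ in range(100)]
succeeded, failed = summarize(results)
print(f"{len(succeeded)} succeeded, {len(failed)} failed")
```

The trade-off is that failures no longer show up as failed tasks in Celery's own state tracking, so the callback has to inspect each result itself.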

So, my question is: Is there a way to have a Celery Chord callback execute regardless of the return status of its group's subtasks?

Answer

You could try calling the original callback in the errback:

from celery import chord

# `celery` is the Celery application instance.

@celery.task
def plus(x, y):
    print(f'Running plus {x}, {y}')
    return x + y

@celery.task
def failure():
    print('Running failure')
    raise ValueError('BAD')

@celery.task
def callme(stuff):
    print('Callback')
    print(f'Callback arg: {stuff}')

@celery.task
def on_chord_error(task_id, extra_info):
    print('ON ERROR CALLBACK')
    print(f'Task ID: {task_id}')
    print(f'Extra info: {extra_info}')
    # Dispatch the original callback manually, since the chord will not.
    callme.delay(extra_info)

@celery.task
def chord_test():
    tasks = [plus.s(1, 1), plus.s(2, 2), failure.s(), plus.s(3, 3)]
    callback = callme.s().on_error(on_chord_error.s('extra info'))
    chord(tasks)(callback)

This results in the following worker log:

Received task: tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e]
Running plus 1, 1
Task tasks.plus[b0d084a5-0956-4f13-bf0d-580a3e3cd55e] succeeded in 0.020222999999532476s: 2
Received task: tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481]
Running plus 2, 2
Task tasks.plus[44a9d306-a0a5-4a7d-b71d-0fef56fe3481] succeeded in 0.019981499994173646s: 4
Task tasks.chord_test[b6173c52-aa62-4dad-84f2-f3df2e1efcd1] succeeded in 0.45647509998525493s: None
Received task: tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee]
Running failure
Task tasks.failure[3880e8bd-2a09-4735-bb5f-9a49e992dfee] raised unexpected: ValueError('BAD',)
Received task: tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473]
Running plus 3, 3
Task tasks.plus[b3290ce9-fc74-45f2-a820-40bd6dea8473] succeeded in 0.016270199994323775s: 6
celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7]  ETA:[2018-09-14 03:08:58.441070+00:00]
Chord 'dadece86-d399-4e64-b63a-f02a2a3de434' raised: ValueError('BAD',)
Traceback (most recent call last):
  File "/home/flask/.local/lib/python3.6/site-packages/celery/app/builtins.py", line 81, in unlock_chord
    ret = j(timeout=3.0, propagate=True)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 739, in join
    interval=interval, no_ack=no_ack, on_interval=on_interval,
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 213, in get
    self.maybe_throw(callback=callback)
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 329, in maybe_throw
    self.throw(value, self._to_remote_traceback(tb))
  File "/home/flask/.local/lib/python3.6/site-packages/celery/result.py", line 322, in throw
    self.on_ready.throw(*args, **kwargs)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/promises.py", line 217, in throw
    reraise(type(exc), exc, tb)
  File "/home/flask/.local/lib/python3.6/site-packages/vine/five.py", line 179, in reraise
    raise value
ValueError: BAD
Received task: tasks.on_chord_error[cf3056bc-34ea-4681-87e7-cded53acb958]
Task celery.chord_unlock[0f37fa4d-4f12-4c65-9e08-b69f0cf2afd7] succeeded in 0.12482409999938682s: None
ON ERROR CALLBACK
Task ID: fe3dae19-0641-47fa-9c4d-953b868992e7
Extra info: extra info
Received task: tasks.callme[d6dfd6c0-f0d9-474f-9d98-be43e031de69]
Callback
Callback arg: extra info
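
Note that in the log above, callme is invoked from the errback with the string 'extra info', whereas on the normal path a chord callback receives the list of subtask results. A callback reused this way may therefore need to handle both shapes of input. The following is only a sketch of that idea in plain Python; handle_chord_outcome and the returned dict layout are hypothetical and not part of the answer's code.

```python
def handle_chord_outcome(payload):
    """Hypothetical callback body that accepts either kind of input.

    On the normal path a chord callback receives a list of subtask results;
    when re-dispatched from the errback, as in the answer, it receives
    whatever the errback chooses to pass along (here, a string).
    """
    if isinstance(payload, list):
        return {"status": "complete", "results": payload}
    return {"status": "failed", "info": payload}


print(handle_chord_outcome([2, 4, 6]))
print(handle_chord_outcome("extra info"))
```

With this approach the results of the subtasks that did succeed are not passed to the callback on the error path, so they would have to be fetched separately if they are needed.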
