Who runs the callback when using apply_async method of a multiprocessing pool?

2024/11/19 9:35:34

I'm trying to understand a little bit of what's going on behind the scenes when using the apply_async method of a multiprocessing pool.

Who runs the callback method? Is it the main process that called apply_async?

Let's say I send out a whole bunch of apply_async commands with callbacks and then continue with my program. My program is still doing things when the apply_async calls start to finish. How does the callback get run by the "main process" while the main process is still busy with the script?

Here's an example.

import multiprocessing
import time

def callback(x):
    print '{} running callback with arg {}'.format(multiprocessing.current_process().name, x)

def func(x):
    print '{} running func with arg {}'.format(multiprocessing.current_process().name, x)
    return x

pool = multiprocessing.Pool()

args = range(20)

for a in args:
    pool.apply_async(func, (a,), callback=callback)

print '{} going to sleep for a minute'.format(multiprocessing.current_process().name)

t0 = time.time()
while time.time() - t0 < 60:
    pass

print 'Finished with the script'

The output is something like

PoolWorker-1 running func with arg 0

PoolWorker-2 running func with arg 1

PoolWorker-3 running func with arg 2

MainProcess going to sleep for a minute <-- main process is busy

PoolWorker-4 running func with arg 3

PoolWorker-1 running func with arg 4

PoolWorker-2 running func with arg 5

PoolWorker-3 running func with arg 6

PoolWorker-4 running func with arg 7

MainProcess running callback with arg 0 <-- main process running callback while it's still in the while loop!!

MainProcess running callback with arg 1

MainProcess running callback with arg 2

MainProcess running callback with arg 3

MainProcess running callback with arg 4

PoolWorker-1 running func with arg 8

...

Finished with the script

How is MainProcess running the callback while it's in the middle of that while loop??

There is this statement about the callback in the documentation for multiprocessing.Pool that seems like a hint but I don't understand it.

apply_async(func[, args[, kwds[, callback]]])

A variant of the apply() method which returns a result object.

If callback is specified then it should be a callable which accepts a single argument. When the result becomes ready callback is applied to it (unless the call failed). callback should complete immediately since otherwise the thread which handles the results will get blocked.

Answer

There is indeed a hint in the docs:

callback should complete immediately since otherwise the thread which handles the results will get blocked.

The callbacks are handled in the main process, but they're run in their own separate thread. When you create a Pool it actually creates a few Thread objects internally:

class Pool(object):
    Process = Process

    def __init__(self, processes=None, initializer=None, initargs=(),
                 maxtasksperchild=None):
        self._setup_queues()
        self._taskqueue = Queue.Queue()
        self._cache = {}
        ... # stuff we don't care about
        self._worker_handler = threading.Thread(
            target=Pool._handle_workers,
            args=(self, )
            )
        self._worker_handler.daemon = True
        self._worker_handler._state = RUN
        self._worker_handler.start()

        self._task_handler = threading.Thread(
            target=Pool._handle_tasks,
            args=(self._taskqueue, self._quick_put, self._outqueue,
                  self._pool, self._cache)
            )
        self._task_handler.daemon = True
        self._task_handler._state = RUN
        self._task_handler.start()

        self._result_handler = threading.Thread(
            target=Pool._handle_results,
            args=(self._outqueue, self._quick_get, self._cache)
            )
        self._result_handler.daemon = True
        self._result_handler._state = RUN
        self._result_handler.start()

The interesting thread for us is _result_handler; we'll get to why shortly.
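A quick way to check the claim that callbacks run in the main process but on a separate thread is to record the process ID and the thread from inside the callback. The following is a minimal sketch for Python 3 (the code quoted in this answer is Python 2). The names run_demo and on_result are hypothetical, and abs is used as the worker function only because it is a builtin that pickles without any extra setup.

```python
import multiprocessing
import os
import threading


def run_demo(n_tasks=4):
    """Submit n_tasks jobs and record where each callback executes."""
    records = []

    def on_result(value):
        # Called in the parent process, on one of the pool's helper threads.
        records.append({
            "value": value,
            "pid": os.getpid(),
            "thread": threading.current_thread().name,
            "is_main_thread": threading.current_thread() is threading.main_thread(),
        })

    with multiprocessing.Pool(2) as pool:
        # abs(-i) stands in for real work done in a child process.
        pending = [pool.apply_async(abs, (-i,), callback=on_result)
                   for i in range(n_tasks)]
        for result in pending:
            result.wait()  # returns only after the callback has finished
    return records


if __name__ == "__main__":
    print("parent pid:", os.getpid())
    for record in run_demo():
        print(record)
```

Every record should carry the parent's PID, with is_main_thread set to False.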

Switching gears for a second, when you run apply_async, it creates an ApplyResult object internally to manage getting the result from the child:

def apply_async(self, func, args=(), kwds={}, callback=None):
    assert self._state == RUN
    result = ApplyResult(self._cache, callback)
    self._taskqueue.put(([(result._job, None, func, args, kwds)], None))
    return result

class ApplyResult(object):

    def __init__(self, cache, callback):
        self._cond = threading.Condition(threading.Lock())
        self._job = job_counter.next()
        self._cache = cache
        self._ready = False
        self._callback = callback
        cache[self._job] = self

    def _set(self, i, obj):
        self._success, self._value = obj
        if self._callback and self._success:
            self._callback(self._value)
        self._cond.acquire()
        try:
            self._ready = True
            self._cond.notify()
        finally:
            self._cond.release()
        del self._cache[self._job]

As you can see, the _set method is what actually executes the callback that was passed in, provided the task succeeded. Also notice that at the end of __init__ the ApplyResult registers itself in the cache dict it was given (the pool's self._cache), keyed by its job number.
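Because _set invokes the callback directly, before it marks the result as ready, callbacks run one at a time and a slow callback delays every result behind it. That is what the documentation's warning is about. Here is a rough Python 3 sketch of the effect, assuming three trivial tasks and a deliberately slow callback; time_callbacks and slow_callback are hypothetical names.

```python
import multiprocessing
import time


def time_callbacks(n_tasks=3, callback_delay=0.2):
    """Return the wall-clock seconds until every result is marked ready."""

    def slow_callback(value):
        # Deliberately slow; a real callback should return quickly.
        time.sleep(callback_delay)

    start = time.monotonic()
    with multiprocessing.Pool(n_tasks) as pool:
        # The tasks themselves (abs) finish almost instantly.
        pending = [pool.apply_async(abs, (-i,), callback=slow_callback)
                   for i in range(n_tasks)]
        for result in pending:
            result.wait()
    return time.monotonic() - start


if __name__ == "__main__":
    print("elapsed: {:.2f}s".format(time_callbacks()))
```

Even with one worker per task, the elapsed time should be at least roughly n_tasks * callback_delay, since the callbacks are executed back to back rather than in parallel.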

Now, back to the _result_handler thread object. That object calls the _handle_results function, which looks like this:

    while 1:
        try:
            task = get()
        except (IOError, EOFError):
            debug('result handler got EOFError/IOError -- exiting')
            return

        if thread._state:
            assert thread._state == TERMINATE
            debug('result handler found thread._state=TERMINATE')
            break

        if task is None:
            debug('result handler got sentinel')
            break

        job, i, obj = task
        try:
            cache[job]._set(i, obj)  # Here is _set (and therefore our callback) being called!
        except KeyError:
            pass

        # More stuff

It's a loop that pulls each result sent back by a child process off the queue, looks up the matching entry in cache, and calls _set on it, which executes our callback. It can run even while your main thread is busy in a loop because it runs in a separate thread of the main process.
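The whole mechanism can be modeled without any child processes. The sketch below is a simplified Python 3 toy, not the library's code: ToyResult, handle_results, and run_toy are hypothetical names, and "worker results" are simply put on the queue by hand. It only illustrates the pattern of a handler thread looking up a result object in a cache and firing its callback.

```python
import queue
import threading


class ToyResult:
    """Simplified stand-in for ApplyResult: holds a callback, fires it in _set."""

    def __init__(self, job, cache, callback):
        self._event = threading.Event()
        self._job = job
        self._cache = cache
        self._callback = callback
        cache[job] = self  # register so the handler thread can find us

    def _set(self, value):
        self.value = value
        if self._callback:
            self._callback(value)  # executes on whichever thread calls _set
        self._event.set()
        del self._cache[self._job]

    def wait(self, timeout=None):
        return self._event.wait(timeout)


def handle_results(outqueue, cache):
    """Toy result loop: consume (job, value) pairs until a None sentinel."""
    while True:
        task = outqueue.get()
        if task is None:
            break
        job, value = task
        try:
            cache[job]._set(value)
        except KeyError:
            pass


def run_toy():
    outqueue, cache, seen = queue.Queue(), {}, []
    handler = threading.Thread(target=handle_results,
                               args=(outqueue, cache), daemon=True)
    handler.start()

    def callback(value):
        # Record the value and whether we are on the handler thread.
        seen.append((value, threading.current_thread() is handler))

    results = [ToyResult(job, cache, callback) for job in range(3)]
    for job in range(3):
        outqueue.put((job, job * 10))  # pretend a worker finished this job
    for result in results:
        result.wait()
    outqueue.put(None)  # sentinel: stop the handler
    handler.join()
    return seen


if __name__ == "__main__":
    print(run_toy())
```

The main thread only enqueues values and waits; every callback invocation happens on the handler thread, which is why the second element of each recorded tuple is True.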

