How to retrieve values from a function run in parallel processes?

2024/10/13 7:26:10

The Multiprocessing module is quite confusing for python beginners specially for those who have just migrated from MATLAB and are made lazy with its parallel computing toolbox. I have the following function which takes ~80 Secs to run and I want to shorten this time by using Multiprocessing module of Python.

from time import timexmax   = 100000000start = time()
for x in range(xmax):y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)
end  = time()
tt   = end-start #total time
print('Each iteration took: ', tt/xmax)
print('Total time:          ', tt)

This outputs as expected:

Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2
Each iteration took:  8.667453265190124e-07
Total time:           86.67453265190125

As any iteration of the loop is not dependent on others, I tried to adopt this Server Process from the official documentation to scan chunks of the range in separate processes. And finally I came up with vartec's answer to this question and could prepare the following code. I also updated the code based on Darkonaut's response to the current question.

from time import time 
import multiprocessing as mpdef chunker (rng, t): # this functions makes t chunks out of rngL  = rng[1] - rng[0]Lr = L % tLm = L // th  = rng[0]-1chunks = []for i in range(0, t):c  = [h+1, h + Lm]h += Lmchunks.append(c)chunks[t-1][1] += Lr + 1return chunksdef worker(lock, xrange, return_dict):'''worker function'''for x in range(xrange[0], xrange[1]):y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)return_dict['x'].append(x)return_dict['y'].append(y)with lock:                list_x = return_dict['x']list_y = return_dict['y']list_x.append(x)list_y.append(y)return_dict['x'] = list_xreturn_dict['y'] = list_yif __name__ == '__main__':start = time()manager = mp.Manager()return_dict = manager.dict()lock = manager.Lock()return_dict['x']=manager.list()return_dict['y']=manager.list()xmax = 100000000nw = mp.cpu_count()workers = list(range(0, nw))chunks = chunker([0, xmax], nw)jobs = []for i in workers:p = mp.Process(target=worker, args=(lock, chunks[i],return_dict))jobs.append(p)p.start()for proc in jobs:proc.join()end = time()tt   = end-start #total timeprint('Each iteration took: ', tt/xmax)print('Total time:          ', tt)print(return_dict['x'])print(return_dict['y'])

which considerably reduces the run time to ~17 Secs. But, my shared variable cannot retrieve any values. Please help me find out which part of the code is going wrong.

the output I get is:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[]
[]

from which I expect:

Each iteration took:  1.7742713451385497e-07
Total time:           17.742713451385498
[0, 1, 2]
[-15, -3, 11]
Answer

The issue in your example is that modifications to standard mutable structures within Manager.dict will not be propagated. I'm first showing you how to fix it with manager, just to show you better options afterwards.

multiprocessing.Manager is a bit heavy since it uses a separate Process just for the Manager and working on a shared object needs using locks for data consistency. If you run this on one machine, there are better options with multiprocessing.Pool, in case you don't have to run customized Process classes and if you have to, multiprocessing.Process together with multiprocessing.Queue would be the common way of doing it.

The quoting parts are from the multiprocessing docs.


Manager

If standard (non-proxy) list or dict objects are contained in a referent, modifications to those mutable values will not be propagated through the manager because the proxy has no way of knowing when the values contained within are modified. However, storing a value in a container proxy (which triggers a setitem on the proxy object) does propagate through the manager and so to effectively modify such an item, one could re-assign the modified value to the container proxy...

In your case this would look like:

def worker(xrange, return_dict, lock):"""worker function"""for x in range(xrange[0], xrange[1]):y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)with lock:list_x = return_dict['x']list_y = return_dict['y']list_x.append(x)list_y.append(y)return_dict['x'] = list_xreturn_dict['y'] = list_y

The lock here would be a manager.Lock instance you have to pass along as argument since the whole (now) locked operation is not by itself atomic. (Here is an easier example with Manager using Lock)

This approach is perhaps less convenient than employing nested Proxy Objects for most use cases but also demonstrates a level of control over the synchronization.

Since Python 3.6 proxy objects are nestable:

Changed in version 3.6: Shared objects are capable of being nested. For example, a shared container object such as a shared list can contain other shared objects which will all be managed and synchronized by the SyncManager.

Since Python 3.6 you can fill your manager.dict before starting multiprocessing with manager.list as values and then append directly in the worker without having to reassign.

return_dict['x'] = manager.list()
return_dict['y'] = manager.list()

EDIT:

Here is the full example with Manager:

import time
import multiprocessing as mp
from multiprocessing import Manager, Process
from contextlib import contextmanager
# mp_util.py from first link in code-snippet for "Pool"
# section below
from mp_utils import calc_batch_sizes, build_batch_ranges# def context_timer ... see code snippet in "Pool" section belowdef worker(batch_range, return_dict, lock):"""worker function"""for x in batch_range:y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)with lock:return_dict['x'].append(x)return_dict['y'].append(y)if __name__ == '__main__':N_WORKERS = mp.cpu_count()X_MAX = 100000000batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)batch_ranges = build_batch_ranges(batch_sizes)print(batch_ranges)with Manager() as manager:lock = manager.Lock()return_dict = manager.dict()return_dict['x'] = manager.list()return_dict['y'] = manager.list()tasks = [(batch_range, return_dict, lock)for batch_range in batch_ranges]with context_timer():pool = [Process(target=worker, args=args)for args in tasks]for p in pool:p.start()for p in pool:p.join()# Create standard container with data from manager before exiting# the manager.result = {k: list(v) for k, v in return_dict.items()}print(result)

Pool

Most often a multiprocessing.Pool will just do it. You have an additional challenge in your example since you want to distribute iteration over a range. Your chunker function doesn't manage to divide the range even so every process has about the same work to do:

chunker((0, 21), 4)
# Out: [[0, 4], [5, 9], [10, 14], [15, 21]]  # 4, 4, 4, 6!

For the code below please grab the code snippet for mp_utils.py from my answer here, it provides two functions to chunk ranges as even as possible.

With multiprocessing.Pool your worker function just has to return the result and Pool will take care of transporting the result back over internal queues back to the parent process. The result will be a list, so you will have to rearange your result again in a way you want it to have. Your example could then look like this:

import time
import multiprocessing as mp
from multiprocessing import Pool
from contextlib import contextmanager
from itertools import chainfrom mp_utils import calc_batch_sizes, build_batch_ranges@contextmanager
def context_timer():start_time = time.perf_counter()yieldend_time = time.perf_counter()total_time   = end_time-start_timeprint(f'\nEach iteration took: {total_time / X_MAX:.4f} s')print(f'Total time:          {total_time:.4f} s\n')def worker(batch_range):"""worker function"""result = []for x in batch_range:y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)result.append((x, y))return resultif __name__ == '__main__':N_WORKERS = mp.cpu_count()X_MAX = 100000000batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)batch_ranges = build_batch_ranges(batch_sizes)print(batch_ranges)with context_timer():with Pool(N_WORKERS) as pool:results = pool.map(worker, iterable=batch_ranges)print(f'results: {results}')x, y = zip(*chain.from_iterable(results))  # filter and sort resultsprint(f'results sorted: x: {x}, y: {y}')

Example Output:

[range(0, 12500000), range(12500000, 25000000), range(25000000, 37500000), 
range(37500000, 50000000), range(50000000, 62500000), range(62500000, 75000000), range(75000000, 87500000), range(87500000, 100000000)]
Condition met at:  -15 0
Condition met at:  -3 1
Condition met at:  11 2Each iteration took: 0.0000 s
Total time:          8.2408 sresults: [[(0, -15), (1, -3), (2, 11)], [], [], [], [], [], [], []]
results sorted: x: (0, 1, 2), y: (-15, -3, 11)Process finished with exit code 0

If you had multiple arguments for your worker you would build a "tasks"-list with argument-tuples and exchange pool.map(...) with pool.starmap(...iterable=tasks). See docs for further details on that.


Process & Queue

If you can't use multiprocessing.Pool for some reason, you have to take care of inter-process communication (IPC) yourself, by passing a multiprocessing.Queue as argument to your worker-functions in the child- processes and letting them enqueue their results to be send back to the parent.

You will also have to build your Pool-like structure so you can iterate over it to start and join the processes and you have to get() the results back from the queue. More about Queue.get usage I've written up here.

A solution with this approach could look like this:

def worker(result_queue, batch_range):"""worker function"""result = []for x in batch_range:y = ((x+5)**2+x-40)if y <= 0xf+1:print('Condition met at: ', y, x)result.append((x, y))result_queue.put(result)  # <--if __name__ == '__main__':N_WORKERS = mp.cpu_count()X_MAX = 100000000result_queue = mp.Queue()  # <--batch_sizes = calc_batch_sizes(X_MAX, n_workers=N_WORKERS)batch_ranges = build_batch_ranges(batch_sizes)print(batch_ranges)with context_timer():pool = [Process(target=worker, args=(result_queue, batch_range))for batch_range in batch_ranges]for p in pool:p.start()results = [result_queue.get() for _ in batch_ranges]for p in pool:p.join()print(f'results: {results}')x, y = zip(*chain.from_iterable(results))  # filter and sort resultsprint(f'results sorted: x: {x}, y: {y}')
https://en.xdnf.cn/q/69560.html

Related Q&A

SignalR Alternative for Python

What would be an alternative for SignalR in Python world?To be precise, I am using tornado with python 2.7.6 on Windows 8; and I found sockjs-tornado (Python noob; sorry for any inconveniences). But s…

Python Variable Scope and Classes

In Python, if I define a variable:my_var = (1,2,3)and try to access it in __init__ function of a class:class MyClass:def __init__(self):print my_varI can access it and print my_var without stating (glo…

How to check if valid excel file in python xlrd library

Is there any way with xlrd library to check if the file you use is a valid excel file? I know theres other libraries to check headers of files and I could use file extension check. But for the sake of…

ValueError: Invalid file path or buffer object type: class tkinter.StringVar

Here is a simplified version of some code that I have. In the first frame, the user selects a csv file using tk.filedialog and it is meant to be plotted on the same frame on the canvas. There is also a…

Is there a way to reopen a socket?

I create many "short-term" sockets in some code that look like that :nb=1000 for i in range(nb):sck = socket.socket(socket.AF_INET, socket.SOCK_STREAM)sck.connect((adr, prt)sck.send(question …

How django handles simultaneous requests with concurrency over global variables?

I have a django instance hosted via apache/mod_wsgi. I use pre_save and post_save signals to store the values before and after save for later comparisons. For that I use global variables to store the p…

Why cant I string.print()?

My understanding of the print() in both Python and Ruby (and other languages) is that it is a method on a string (or other types). Because it is so commonly used the syntax:print "hi"works.S…

Difference between R.scale() and sklearn.preprocessing.scale()

I am currently moving my data analysis from R to Python. When scaling a dataset in R i would use R.scale(), which in my understanding would do the following: (x-mean(x))/sd(x)To replace that function I…

Generate random timeseries data with dates

I am trying to generate random data(integers) with dates so that I can practice pandas data analytics commands on it and plot time series graphs. temp depth acceleration 2019-01-1 -0.218062 -1.21…

Spark select top values in RDD

The original dataset is:# (numbersofrating,title,avg_rating) newRDD =[(3,monster,4),(4,minions 3D,5),....] I want to select top N avg_ratings in newRDD.I use the following code,it has an error.selectne…