Python multiprocessing.Pool gets stuck

2024/10/6 6:23:09

I have a multiprocessing script that uses pool.imap_unordered.

I ran into a problem where the script gets stuck, yet CPU usage shows nothing happening (checked with the "top" command on Ubuntu).

Going into the screen session, I can see the process stuck mid-execution.

As I understand it, I do not use the problematic fork() method.

Also, the freezes do not occur when I run the script on a small amount of returned data, but they do occur on still fairly small amounts (a table under 5 MB as CSV).

Can someone suggest what exactly could help? A semaphore, a lock, or something else? I tried using more processes with less data each, which didn't help. It may be easier to switch to a different Python parallelism library...


import multiprocessing
import pandas as pd


def my_func(df):
    # modify df here
    # df = df.head(1)
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes=(multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
        final_df = pd.concat(out)

I also tried:

import multiprocessing
import pandas as pd


def my_func(df):
    # modify df here
    # df = df.head(1)
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.get_context("spawn").Pool(processes=(multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        print(df)
        print(groups)
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
        final_df = pd.concat(out)
Answer

You say your code doesn't work for big data. I don't have much to go on from that description, but I will work on the assumption that a lack of memory is the cause of your problem. If so, how large is your dataframe and how much memory do you have available for running user applications? There may be no solution other than getting more memory. But before you do that, we can ask: what can be done to limit memory utilization in your current processing, and will it be enough so that the program now runs? The following is a rather lengthy analysis of the situation, and I propose a couple of changes you could make that might help, assuming my analysis is correct.

One of the issues with method imap_unordered is that you have little control over how many tasks initially get submitted to the pool's task queue and continue to be submitted as tasks complete. If you had (1) a large dataframe, (2) a large number of tasks that need to be submitted, and (3) the entire dataframe being passed to your worker function for each task, then that dataframe would initially be replicated on the task queue a number of times over which you have very little control, and you could quickly run out of memory. The solution then would be to somehow limit how many tasks can be sitting on the task queue waiting to be processed at any point in time.
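For illustration only, here is one way such a limit could be imposed while keeping imap_unordered: wrap the group generator so that it yields the next group only after the result loop frees a slot. This is a sketch under the assumption that throttling the pool's internal task-feeding thread with a multiprocessing.BoundedSemaphore is acceptable; the bounded helper, the sizes, and the trivial my_func are illustrative and not from the original question.

import multiprocessing
import pandas as pd


def my_func(df):
    # placeholder worker: return the group unchanged
    return df


def bounded(iterable, semaphore):
    # Yield the next item only after a slot has been freed, so that at most
    # SEMAPHORE_SIZE tasks ever sit on the pool's input queue at once.
    for item in iterable:
        semaphore.acquire()
        yield item


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    POOL_SIZE = max(1, multiprocessing.cpu_count() - 1)
    SEMAPHORE_SIZE = 2 * POOL_SIZE
    semaphore = multiprocessing.BoundedSemaphore(SEMAPHORE_SIZE)
    groups = (g for _, g in df.groupby("a"))
    out = []
    with multiprocessing.Pool(processes=POOL_SIZE) as pool:
        for res in pool.imap_unordered(my_func, bounded(groups, semaphore)):
            out.append(res)
            semaphore.release()  # one result is in, let one more task be queued
    final_df = pd.concat(out)
    print(final_df)

Each release in the result loop corresponds to an earlier acquire in the generator, so the bounded semaphore is never over-released and at most SEMAPHORE_SIZE groups are queued or in flight at any time.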

But that is not exactly your situation, because you are not passing the entire df dataframe to my_func for each submitted task but rather a subset. Even if all the subsets created by the groupby method were sitting on the task queue together, the storage requirements of the task queue would be approximately equal to the size of the entire dataframe. Then, as these groups are processed and results are returned, the storage taken up by the task queue would decrease while the storage required for your out list would increase. The total storage requirement would probably not change all that much while the tasks are being processed: the task queue shrinks as out grows. But when you create final_df, you simultaneously have storage requirements for df, out, and final_df. So you currently need to be able to hold essentially three instances of your dataframe in memory (for example, a dataframe occupying 4 GB would need roughly 12 GB at that point).

The simplest thing you can do is to delete the initial df dataframe before you perform the concatenation of your out list. Will this resolve your memory problem? I don't know, but now you only need enough memory to hold two copies of your large dataframe.
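As a sketch of just that change applied to the question's imap_unordered version (the toy dataframe and my_func are the question's own):

import multiprocessing
import pandas as pd


def my_func(df):
    # modify df here
    return df


if __name__ == "__main__":
    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    with multiprocessing.Pool(processes=(multiprocessing.cpu_count() - 1)) as pool:
        groups = (g for _, g in df.groupby("a"))
        out = []
        for res in pool.imap_unordered(my_func, groups):
            out.append(res)
    del df                      # free the source dataframe first ...
    final_df = pd.concat(out)   # ... then build the result from the pieces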

The additional thing we could do is to control the rate at which tasks are queued up and thus limit the storage required by the task queue. The code below shows how we can do this by replacing method imap_unordered with apply_async, specifying a callback. We use a multiprocessing.BoundedSemaphore initialized to N, where N is the maximum number of tasks we want queued up at any time. Before apply_async can be called to submit the next task, the main process must first acquire the semaphore. It will be able to do this N times without blocking. As tasks complete, our callback function releases the semaphore, allowing a new task to be submitted. But even with this change you would still need adequate storage to hold two copies of your dataframe, so this change alone will probably not help. However, if your actual my_func returns something smaller than the df it was passed, then it should help. If your code now works, you can simply comment out the calls to semaphore.acquire() and semaphore.release() to remove this change and see if it still continues to work.

import multiprocessing
import pandas as pd


def my_func(df):
    # modify df here
    # df = df.head(1)
    return df


if __name__ == "__main__":
    # Use number of cores or possibly a smaller number if
    # we still have memory issues:
    POOL_SIZE = multiprocessing.cpu_count()

    # So that when a task completes there is always another task
    # on the input queue ready to run. If memory is still an issue,
    # then set SEMAPHORE_SIZE = POOL_SIZE
    SEMAPHORE_SIZE = 2 * POOL_SIZE

    semaphore = multiprocessing.BoundedSemaphore(SEMAPHORE_SIZE)
    out = []

    def my_callback(result_df):
        out.append(result_df)
        # Allow another task to be submitted:
        semaphore.release()

    df = pd.DataFrame({'a': [2, 2, 1, 1, 3, 3], 'b': [4, 5, 6, 4, 5, 6], 'c': [4, 5, 6, 4, 5, 6]})
    pool = multiprocessing.Pool(processes=POOL_SIZE)
    for _, group in df.groupby("a"):
        semaphore.acquire()
        pool.apply_async(my_func, args=(group,), callback=my_callback)
    # Wait for all tasks to complete:
    pool.close()
    pool.join()
    del df  # reclaim storage we do not need any more
    final_df = pd.concat(out)
    print(final_df)

Prints:

   a  b  c
2  1  6  6
3  1  4  4
0  2  4  4
1  2  5  5
4  3  5  5
5  3  6  6