Multiprocessing Pool hangs if child process killed

2024/10/4 15:39:19

I launched a pool of worker processes and submitted a bunch of tasks. The system ran low on memory, and the OOM killer killed one of the worker processes. The parent process just hung there, waiting for the tasks to finish, and never returned.

Here's a runnable example that reproduces the problem. Instead of waiting for the OOM killer to kill one of the worker processes, I find the process IDs of all the worker processes and have the first task kill one of them, chosen at random. (The call to ps won't work on all operating systems.)

```python
import os
import signal
from multiprocessing import Pool
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = Pool()

    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    # Skip the first and last entries; the last is likely the ps process
    # itself, which is also a child of this process.
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.imap_unordered(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.close()
    pool.join()
    print('Done.')


if __name__ == '__main__':
    main()
```
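Because the ps call isn't portable, another way to find the worker PIDs may be multiprocessing.active_children(), which returns the live child processes that this process started through multiprocessing; pool workers are among them, while a plain subprocess such as ps is not. A minimal sketch, assuming the parent has no other multiprocessing children at that moment; worker_pids is a hypothetical helper, and the fork start method (POSIX only) is pinned just so the snippet runs on its own.

```python
import multiprocessing
import os


def worker_pids(pool_size):
    """Hypothetical helper: return the PIDs of a pool's workers without ps.

    Assumes this process has no other multiprocessing children running.
    """
    # fork is POSIX-only; it is pinned here so the snippet runs stand-alone.
    ctx = multiprocessing.get_context('fork')
    with ctx.Pool(pool_size):
        # Pool workers are multiprocessing.Process children of this process,
        # so active_children() reports them; subprocesses like ps are not listed.
        pids = sorted(p.pid for p in multiprocessing.active_children())
    return pids


if __name__ == '__main__':
    print(f'Parent {os.getpid()} has workers {worker_pids(4)}')
```

In the script above, the ps call could then be swapped for something like sorted(p.pid for p in multiprocessing.active_children()) right after Pool() is created.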

When I run that on a system with eight CPUs, I see this output:

Starting.
Processing item 0 in process 303.
Processing item 1 in process 304.
Processing item 2 in process 305.
Processing item 3 in process 306.
Processing item 4 in process 307.
Processing item 5 in process 308.
Processing item 6 in process 309.
Processing item 7 in process 310.
Item 0 killing process 308.
Processing item 8 in process 303.
Received 1 from item 0.
Processing item 9 in process 315.
Item 1 finished.
Received 2 from item 1.
Item 2 finished.
Received 3 from item 2.
Item 3 finished.
Received 4 from item 3.
Item 4 finished.
Received 5 from item 4.
Item 6 finished.
Received 7 from item 6.
Item 7 finished.
Received 8 from item 7.
Item 8 finished.
Received 9 from item 8.
Item 9 finished.
Received 10 from item 9.

You can see that item 5 never returns, and the pool just waits forever.

How can I get the parent process to notice when a child process is killed?

Answer

This problem is described in Python bug 9205, but the fix went into the concurrent.futures module rather than the multiprocessing module. To take advantage of it, switch to the newer process pool, concurrent.futures.ProcessPoolExecutor.
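Roughly, the newer pool can notice because its management thread waits on each worker's sentinel (a handle that becomes ready when that process ends) as well as on the result queue, so a worker that dies without sending a result is still seen. Here is a simplified sketch of that idea with plain multiprocessing.Process objects; it is not the executor's actual code, worker and watch_workers are hypothetical names, and it uses the fork start method, so it is POSIX only.

```python
import os
import signal
import time
from multiprocessing import get_context
from multiprocessing.connection import wait


def worker(n):
    # Hypothetical task: item 0 simulates being killed by the OOM killer.
    if n == 0:
        os.kill(os.getpid(), signal.SIGKILL)
    time.sleep(0.1)


def watch_workers(count):
    """Start `count` workers and return {item: exitcode} as each one ends."""
    ctx = get_context('fork')  # POSIX only
    running = {}
    for n in range(count):
        p = ctx.Process(target=worker, args=(n,))
        p.start()
        running[p.sentinel] = (n, p)

    exit_codes = {}
    while running:
        # wait() returns the sentinels that are ready, i.e. processes that have
        # ended, whether they exited normally or were killed by a signal.
        for sentinel in wait(list(running)):
            n, p = running.pop(sentinel)
            p.join()
            exit_codes[n] = p.exitcode  # -N means "killed by signal N"
    return exit_codes


if __name__ == '__main__':
    for n, code in sorted(watch_workers(3).items()):
        print(f'Item {n} exit code: {code}')
```

Item 0 should report -9 (that is, -signal.SIGKILL) and the others 0. Here is the reproduction from the question, switched over to ProcessPoolExecutor: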

```python
import os
import signal
from concurrent.futures import ProcessPoolExecutor
from random import choice
from subprocess import run, PIPE
from time import sleep


def run_task(task):
    target_process_id, n = task
    print(f'Processing item {n} in process {os.getpid()}.')
    delay = n + 1
    sleep(delay)
    if n == 0:
        print(f'Item {n} killing process {target_process_id}.')
        os.kill(target_process_id, signal.SIGKILL)
    else:
        print(f'Item {n} finished.')
    return n, delay


def main():
    print('Starting.')
    pool = ProcessPoolExecutor()
    # Force the pool to launch some child processes. A lambda can't be pickled,
    # so submit a picklable no-op instead. (Newer Python versions may start
    # workers on demand, so fewer workers might exist at this point.)
    pool.submit(os.getpid)

    ps_output = run(['ps', '-opid', '--no-headers', '--ppid', str(os.getpid())],
                    stdout=PIPE, encoding='utf8')
    child_process_ids = [int(line) for line in ps_output.stdout.splitlines()]
    # Skip the first and last entries; the last is likely the ps process itself.
    target_process_id = choice(child_process_ids[1:-1])

    tasks = ((target_process_id, i) for i in range(10))
    for n, delay in pool.map(run_task, tasks):
        print(f'Received {delay} from item {n}.')

    print('Closing.')
    pool.shutdown()
    print('Done.')


if __name__ == '__main__':
    main()
```

Now when you run it, you get a clear error message.

Starting.
Processing item 0 in process 549.
Processing item 1 in process 550.
Processing item 2 in process 552.
Processing item 3 in process 551.
Processing item 4 in process 553.
Processing item 5 in process 554.
Processing item 6 in process 555.
Processing item 7 in process 556.
Item 0 killing process 556.
Processing item 8 in process 549.
Received 1 from item 0.
Traceback (most recent call last):
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 42, in <module>
    main()
  File "/home/don/.config/JetBrains/PyCharm2020.1/scratches/scratch2.py", line 33, in main
    for n, delay in pool.map(run_task, tasks):
  File "/usr/lib/python3.7/concurrent/futures/process.py", line 483, in _chain_from_iterable_of_lists
    for element in iterable:
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 598, in result_iterator
    yield fs.pop().result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 428, in result
    return self.__get_result()
  File "/usr/lib/python3.7/concurrent/futures/_base.py", line 384, in __get_result
    raise self._exception
concurrent.futures.process.BrokenProcessPool: A process in the process pool was terminated abruptly while the future was running or pending.
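Once the failure surfaces as an exception, the parent can decide what to do about it, for example record which items never produced a result so they can be resubmitted to a fresh pool. A minimal sketch, assuming that retry policy; work and run_all are hypothetical names, and the fork start method (POSIX only) is pinned through the mp_context parameter just so the snippet runs on its own.

```python
import os
import signal
from concurrent.futures import ProcessPoolExecutor, as_completed
from concurrent.futures.process import BrokenProcessPool
from multiprocessing import get_context


def work(n):
    # Hypothetical task: item 0 simulates the OOM killer by killing its own worker.
    if n == 0:
        os.kill(os.getpid(), signal.SIGKILL)
    return n * n


def run_all(items):
    """Return (results, failed): results maps item -> value, and failed lists
    the items that never produced a value because the pool broke."""
    results, failed = {}, []
    with ProcessPoolExecutor(max_workers=2, mp_context=get_context('fork')) as pool:
        futures = {pool.submit(work, n): n for n in items}
        for future in as_completed(futures):
            n = futures[future]
            try:
                results[n] = future.result()
            except BrokenProcessPool:
                # Pending futures fail too, not only the killed item.
                failed.append(n)
    return results, sorted(failed)


if __name__ == '__main__':
    results, failed = run_all(range(6))
    print('Finished:', results)
    print('Needs retry:', failed)
```

One dead worker breaks the whole executor: every future that hadn't finished fails with BrokenProcessPool, and further submit() calls raise it as well, so a retry needs a new ProcessPoolExecutor.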
https://en.xdnf.cn/q/70597.html
