Python multiprocessing pool: dynamically set number of processes during execution of tasks

2024/9/8 10:54:18

We submit large CPU-intensive jobs in Python 2.7 (each consisting of many independent parallel processes) on our development machine, and they run for days at a time. The machine becomes much less responsive when these jobs run with a large number of processes. Ideally, I would like to limit the number of CPUs used during the day, while we're developing code, and run as many processes as efficiently as possible overnight.

The Python multiprocessing library lets you specify the number of processes when you create a Pool. Is there a way to change this number dynamically each time a new task is started?

For instance, allow 20 processes to run between 19:00 and 07:00, and 10 processes between 07:00 and 19:00.

One way would be to check the number of active processes using significant CPU. This is how I would like it to work:

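from multiprocessing import Pool
import time

def big_task(x):
    # check_n_process is the function I would still need to write
    while check_n_process(processes=10) is False:
        time.sleep(60*60)
    x += 1
    return x

# create the pool after big_task is defined so the workers can find it
pool = Pool(processes=20)
x = 1
# the arguments must be a tuple: (x,) rather than (x)
multiple_results = [pool.apply_async(big_task, (x,)) for i in range(1000)]
print([res.get() for res in multiple_results])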

But I would need to write the 'check_n_process' function.

Any other ideas on how this problem could be solved?

(The code needs to run in Python 2.7 - a bash implementation is not feasible).

Answer

Python's multiprocessing.Pool does not provide a way to change the number of workers of a running Pool. A simple solution would be to rely on third-party tools.

The Pool provided by billiard used to provide such a feature.

Task queue frameworks like Celery or Luigi certainly allow a flexible workload, but they are far more complex.

If external dependencies are not an option, you can try the following approach. Building on this answer, you could set up a throttling mechanism based on a Semaphore.

from threading import Semaphore, Lock
from multiprocessing import Pool

class TaskManager(object):
    def __init__(self, pool_size):
        self.pool = Pool(processes=pool_size)
        self.workers = Semaphore(pool_size)
        # ensures the semaphore is not replaced while used
        self.workers_mutex = Lock()

    def change_pool_size(self, new_size):
        """Set the Pool to a new size."""
        with self.workers_mutex:
            self.workers = Semaphore(new_size)

    def new_task(self, task):
        """Start a new task, blocks if queue is full."""
        with self.workers_mutex:
            workers = self.workers
        # acquire outside the mutex, otherwise task_done could never release
        workers.acquire()
        self.pool.apply_async(big_task, args=[task], callback=self.task_done)

    def task_done(self, result):
        """Called once a task is done, frees a slot for a blocked new_task."""
        # apply_async passes the task's return value to the callback
        with self.workers_mutex:
            self.workers.release()

The manager blocks further attempts to schedule your big_task calls while the allowed number of workers are all busy. By controlling this mechanism you can throttle the number of processes running concurrently. Of course, this means that you give up the Pool's queueing mechanism.
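One caveat with swapping in a fresh Semaphore is that tasks started before the change still release into the new one, so the effective limit can drift for a while. As a rough alternative sketch (not part of the original answer), the counter below keeps a single "slots in use" count and lets the limit change underneath it. The class name AdjustableLimiter and its methods are hypothetical; it uses only threading.Condition and should run on both Python 2.7 and Python 3.

```python
import threading


class AdjustableLimiter(object):
    """Hypothetical counting limiter whose limit can change while slots are held."""

    def __init__(self, limit):
        self._cond = threading.Condition()
        self._limit = limit
        self._in_use = 0

    def set_limit(self, new_limit):
        """Change the limit; slots already held stay held until released."""
        with self._cond:
            self._limit = new_limit
            # wake every waiter so each one re-checks against the new limit
            self._cond.notify_all()

    def acquire(self):
        """Take a slot, blocking while the number in use has reached the limit."""
        with self._cond:
            while self._in_use >= self._limit:
                self._cond.wait()
            self._in_use += 1

    def try_acquire(self):
        """Take a slot without blocking; return True on success, else False."""
        with self._cond:
            if self._in_use >= self._limit:
                return False
            self._in_use += 1
            return True

    def release(self):
        """Give a slot back and wake one waiter, if any."""
        with self._cond:
            self._in_use -= 1
            self._cond.notify()
```

In a TaskManager like the one above, new_task would call acquire() before apply_async, the callback would call release(), and change_pool_size would call set_limit(new_size). When the limit is lowered, running tasks are not interrupted; new ones simply wait until enough of them have finished.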

task_manager = TaskManager(20)

while True:
    if seven_in_the_morning():
        task_manager.change_pool_size(10)
    if seven_in_the_evening():
        task_manager.change_pool_size(20)

    task = get_new_task()
    task_manager.new_task(task)  # blocks here if all workers are busy
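The seven_in_the_morning() and seven_in_the_evening() helpers are left undefined in the loop above. One possible way to express the same schedule, sketched here under the assumption that the machine's local clock is the reference, is a single function that maps the current hour to the desired size. The name desired_pool_size and the constants are made up for illustration; the 10/20 split and the 07-19 window come from the question.

```python
import datetime

# Daytime window from the question: 07:00 (inclusive) to 19:00 (exclusive).
DAY_START_HOUR = 7
DAY_END_HOUR = 19


def desired_pool_size(now=None, day_size=10, night_size=20):
    """Return how many processes should run at the given local time."""
    if now is None:
        now = datetime.datetime.now()
    if DAY_START_HOUR <= now.hour < DAY_END_HOUR:
        return day_size
    return night_size
```

The scheduling loop could then remember the last size it applied and call change_pool_size only when desired_pool_size() returns a different value, so that the throttle is not reset on every iteration.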
