Combining asyncio with a multi-worker ProcessPoolExecutor and async for

2024/10/14 23:17:18

My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor - however a slight change (I believe it's the async for) makes the excellent answers there unusable for me.

I am trying the following MWE:

import concurrent.futures
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return('EXECUTOR: Completed blocking task number ' + str(delay + 1))

async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i + 1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i + 1))

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))

The output from this, as expected, is not asynchronous:

MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I would like to adjust the code so that the tasks run in two concurrent processes and the output is printed as it becomes available. The desired output is:

MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2

I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as things stand, my await loop.run_in_executor() call waits for each task to finish before the next iteration starts. I don't know how to rewrite it so that the async for can move on to the next generated value while the executor is still working on the previous one. Note that I am not using asyncio.gather as in that example.

Answer

If you want to have a maximum of two processes running your tasks, the simplest way to achieve that is to create the executor with max_workers=2. Then you can submit tasks as fast as possible, i.e. proceed with the next iteration of async for without waiting for the previous task to finish. You can gather the results of all tasks at the end, to ensure that exceptions don't go unnoticed (and possibly to get the actual results).

The following code produces the expected output:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return('EXECUTOR: Completed blocking task number ' + str(delay + 1))

async def run_blocking(executor, task_no, delay):
    print('MASTER: Sending to executor blocking task number ' + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def non_blocking(loop):
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # spawn the task and let it run in the background
            tasks.append(asyncio.create_task(run_blocking(executor, i + 1, i)))
        # if there was an exception, retrieve it now
        await asyncio.gather(*tasks)

loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
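As a side note, on recent Python versions the asyncio.get_event_loop() pattern used above is discouraged in favour of asyncio.run(), and a ProcessPoolExecutor generally needs an if __name__ == '__main__': guard on platforms that use the spawn start method (Windows, and macOS on newer Pythons), because the worker processes re-import the main module. Below is a minimal sketch of the same approach adapted accordingly; the entry-point function name main() is just an illustrative choice, and the rest of the logic is taken unchanged from the answer above:

from concurrent.futures import ProcessPoolExecutor
import asyncio
import time

async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1

def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)

async def run_blocking(executor, task_no, delay):
    # obtain the loop from inside the coroutine instead of passing it around
    loop = asyncio.get_running_loop()
    print('MASTER: Sending to executor blocking task number ' + str(task_no))
    result = await loop.run_in_executor(executor, blocking, delay)
    print(result)
    print('MASTER: Well done executor - you seem to have completed '
          'blocking task number ' + str(task_no))

async def main():
    tasks = []
    with ProcessPoolExecutor(max_workers=2) as executor:
        async for i in mygen():
            # schedule without awaiting, so the loop can move on immediately
            tasks.append(asyncio.create_task(run_blocking(executor, i + 1, i)))
        # gather inside the with-block so the executor is not shut down
        # while tasks are still awaiting their results
        await asyncio.gather(*tasks)

if __name__ == '__main__':
    # the guard matters with ProcessPoolExecutor: under the spawn start
    # method each worker process re-imports this module
    asyncio.run(main())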
