My question is very similar to Combining asyncio with a multi-worker ProcessPoolExecutor; however, a slight change (I believe it's the async for) makes the excellent answers there unusable for me.
I am trying the following MWE:
import concurrent.futures
import asyncio
import time


async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1


def blocking(delay):
    time.sleep(delay+1)
    return('EXECUTOR: Completed blocking task number ' + str(delay+1))


async def non_blocking(loop):
    with concurrent.futures.ProcessPoolExecutor() as executor:
        async for i in mygen():
            print('MASTER: Sending to executor blocking task number ' + str(i+1))
            result = await loop.run_in_executor(executor, blocking, i)
            print(result)
            print('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i+1))


loop = asyncio.get_event_loop()
loop.run_until_complete(non_blocking(loop))
The output from this, as expected, is not asynchronous:
MASTER: Sending to executor blocking task number 1
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I would like to adjust the code so that the tasks are running in two concurrent processes and printing the output as it becomes available. Desired output is:
MASTER: Sending to executor blocking task number 1
MASTER: Sending to executor blocking task number 2
EXECUTOR: Completed blocking task number 1
MASTER: Well done executor - you seem to have completed blocking task number 1
EXECUTOR: Completed blocking task number 2
MASTER: Well done executor - you seem to have completed blocking task number 2
I understand from Combining asyncio with a multi-worker ProcessPoolExecutor that, as things stand, my await loop.run_in_executor() call is blocking: the coroutine waits for each result before the loop continues. I don't know how to replace it in a way that allows the async for to move to the next generated value while waiting for the executor to finish its work. Note that I am not using asyncio.gather as in their example.
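To make the behaviour I am after concrete, here is a minimal sketch of one possible restructuring. It assumes each run_in_executor call can be wrapped in its own task with asyncio.create_task, so the async for is not held up. The names run_one, main and the log parameter are hypothetical and only there for illustration; they are not taken from the linked answers.

```python
import asyncio
import concurrent.futures
import time


async def mygen(u: int = 2):
    i = 0
    while i < u:
        yield i
        i += 1


def blocking(delay):
    time.sleep(delay + 1)
    return 'EXECUTOR: Completed blocking task number ' + str(delay + 1)


async def run_one(loop, executor, i, log):
    # Hypothetical helper: waits for one executor job and reports on it.
    result = await loop.run_in_executor(executor, blocking, i)
    log(result)
    log('MASTER: Well done executor - you seem to have completed blocking task number ' + str(i + 1))


async def non_blocking(executor, log=print):
    loop = asyncio.get_running_loop()
    tasks = []
    async for i in mygen():
        log('MASTER: Sending to executor blocking task number ' + str(i + 1))
        # create_task schedules run_one without waiting for it,
        # so the async for can move on to the next generated value.
        tasks.append(asyncio.create_task(run_one(loop, executor, i, log)))
    # Wait for every outstanding job before the executor is shut down.
    for task in tasks:
        await task


def main():
    # Assumed entry point; call it under an `if __name__ == '__main__':` guard.
    with concurrent.futures.ProcessPoolExecutor() as executor:
        asyncio.run(non_blocking(executor))
```

Calling main() from a script should print both "Sending" lines first and then each completion as it arrives, which is the ordering I am looking for. I am not sure whether this is the idiomatic approach, or whether the tasks should be awaited differently.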