Suppose I have some tasks running asynchronously. They may be totally independent, but I still want to set points where the tasks will pause so they can run concurrently.
What is the correct way to run the tasks concurrently? I am currently using await asyncio.sleep(0), but I feel this is adding a lot of overhead.
    import asyncio

    async def do(name, amount):
        for i in range(amount):
            # Do some time-expensive work
            print(f'{name}: has done {i}')
            # Yield control so the other task can run
            await asyncio.sleep(0)
        return f'{name}: done'

    async def main():
        res = await asyncio.gather(do('Task1', 3), do('Task2', 2))
        print(*res, sep='\n')

    asyncio.run(main())
Output
Task1: has done 0
Task2: has done 0
Task1: has done 1
Task2: has done 1
Task1: has done 2
Task1: done
Task2: done
If we were using simple generators, an empty yield would pause the flow of a task without any overhead, but an empty await is not valid.
What is the correct way to set such breakpoints without overhead?
As mentioned in the comments, asyncio coroutines normally suspend automatically on calls that would block or sleep in equivalent synchronous code. In your case the coroutine is CPU-bound, so awaiting blocking calls is not enough; it needs to occasionally relinquish control to the event loop to allow the rest of the system to run.
Explicit yields are not uncommon in cooperative multitasking, and using await asyncio.sleep(0) for that purpose will work as intended. It does carry a risk, however: sleep too often, and you're slowing down the computation by unnecessary switches; sleep too seldom, and you're hogging the event loop by spending too much time in a single coroutine.
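One way to soften that trade-off is to yield on a time budget instead of on every iteration. The following is only a sketch under assumptions: the budget parameter and the sum(range(...)) stand-in for real work are hypothetical, and a suitable budget depends on how much latency the rest of your program can tolerate.

```python
import asyncio
import time


async def do(name, amount, budget=0.01):
    """Run `amount` steps, yielding at most once per `budget` seconds.

    `budget` is a hypothetical tuning knob, not an asyncio parameter.
    """
    yields = 0
    last_yield = time.monotonic()
    for _ in range(amount):
        sum(range(10_000))  # stand-in for some time-expensive work
        if time.monotonic() - last_yield >= budget:
            await asyncio.sleep(0)  # hand control back to the event loop
            yields += 1
            last_yield = time.monotonic()
    return name, yields


async def main():
    return await asyncio.gather(do('Task1', 2000), do('Task2', 1000))


if __name__ == '__main__':
    for name, yields in asyncio.run(main()):
        print(f'{name}: done after {yields} yields')
```

The number of yields now tracks elapsed time rather than the iteration count, so cheap iterations no longer pay for a task switch each time.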
The solution provided by asyncio is to offload CPU-bound code to a thread pool using run_in_executor. Awaiting it will automatically suspend the coroutine until the CPU-intensive task is done, without any intermediate polling. For example:
    import asyncio

    def do(id, amount):
        # Plain blocking function; it runs in a worker thread
        for i in range(amount):
            # Do some time-expensive work
            print(f'{id}: has done {i}')
        return f'{id}: done'

    async def main():
        loop = asyncio.get_running_loop()
        # None selects the loop's default executor
        res = await asyncio.gather(
            loop.run_in_executor(None, do, 'Task1', 5),
            loop.run_in_executor(None, do, 'Task2', 3))
        print(*res, sep='\n')

    asyncio.run(main())
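To check that the event loop really stays free while the executor works, you can run a small heartbeat coroutine next to the offloaded call. This is a sketch rather than a benchmark: crunch, heartbeat, and N are hypothetical names, and the 10 ms interval is arbitrary.

```python
import asyncio
import time

N = 2_000_000  # arbitrary workload size for the illustration


def crunch(n):
    """Blocking, CPU-bound stand-in for real work."""
    total = 0
    for i in range(n):
        total += i * i
    return total


async def heartbeat(stop, ticks):
    """Record a timestamp every 10 ms until `stop` is set."""
    while not stop.is_set():
        ticks.append(time.monotonic())
        await asyncio.sleep(0.01)


async def main():
    loop = asyncio.get_running_loop()
    stop = asyncio.Event()
    ticks = []
    beat = asyncio.create_task(heartbeat(stop, ticks))
    # The coroutine is suspended here; the loop keeps serving heartbeat
    result = await loop.run_in_executor(None, crunch, N)
    stop.set()
    await beat
    return result, len(ticks)


if __name__ == '__main__':
    result, n_ticks = asyncio.run(main())
    print(f'result={result}, heartbeat ticks={n_ticks}')
```

If crunch were called directly inside main, the heartbeat could not run until it returned. Note that on standard CPython builds, pure-Python work in a thread still competes for the interpreter lock, so the heartbeat may tick less regularly than its interval suggests.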