Correct usage of asyncio.Conditions wait_for() method

2024/9/8 10:42:35

I'm writing a project using Python's asyncio module, and I'd like to synchronize my tasks using its synchronization primitives. However, it doesn't seem to behave as I'd expect.

From the documentation, it seems that Condition.wait_for() offers a means by which to allow a coroutine to wait for a particular user-defined condition to evaluate as true. However, on attempting to use the method, it seems to behave in ways I wouldn't expect - my condition is only checked once, and if it is found to be false, the waiting task simply hangs forever, without ever checking again. I've written a short example below to demonstrate what I'm trying to do:

#!/usr/bin/env pythonimport asynciothing = Falsesetter_done = None
getter_done = Noneasync def main():setter_done = asyncio.Event()getter_done = asyncio.Event()setter = asyncio.ensure_future(set_thing())getter = asyncio.ensure_future(get_thing())#To avoid the loop exiting prematurely:await setter_done.wait()await getter_done.wait()async def set_thing():global thingglobal setter_donething = False#sleep for some arbitrary amount of time; simulate work happeningawait asyncio.sleep(10)thing = Trueprint("Thing was set to True!")setter_done.set()async def get_thing():global thingglobal getter_donedef check_thing():print("Checking...")return thingc = asyncio.Condition()await c.acquire()await c.wait_for(check_thing)c.release()print("Thing was found to be true!")getter_done.set()if __name__ == "__main__":loop = asyncio.get_event_loop()loop.run_until_complete(main())

I'd expect this to print something like the following:

Checking...
Thing was set to True!
Checking...
Thing was found to be True!

Instead, I get:

Checking...
Thing was set to True!
... (hangs indefinitely)
Answer

I'm posting a full answer with a lot of comments to help these with a similar issue. I've changed the code example to use classes instead of globals. It's a bit longer but I hope it's not too complicated.

Basically the Command class represents a task. It's asynchronous, so it could do a lot of things. In my case I create just two dummy commands (read "two tasks"), one pausing for 5 seconds and one for 8 seconds, and I wait for both of them to be over with a condition. Obviously, conditions aren't the only way to do what I've done, but in keeping with the original answer, I think that's interesting to provide a fully-working example. So here goes!

import asyncio
from typing import Setclass Command:"""A command, an asynchronous task, imagine an asynchronous action."""async def run(self):"""To be defined in sub-classes."""passasync def start(self, condition: asyncio.Condition,commands: Set['Command']):"""Start the task, calling run asynchronously.This method also keeps track of the running commands."""commands.add(self)await self.run()commands.remove(self)# At this point, we should ask the condition to update# as the number of running commands might have reached 0.async with condition:condition.notify()class Look(Command):"""A subclass of a command, running a dummy task."""async def run(self):print("Before looking...")await asyncio.sleep(5)print("After looking")class Scan(Command):"""A subclass of a command, running a dummy task."""async def run(self):print("Before scanning...")await asyncio.sleep(8)print("After scanning")async def main():"""Our main coroutine, starting commands."""condition = asyncio.Condition()commands = set()commands.add(Look())commands.add(Scan())asyncio.gather(*(cmd.start(condition, commands) for cmd in commands))# Wait for the number of commands to reach 0async with condition:await condition.wait_for(lambda: len(commands) == 0)print("There's no running command now, exiting.")asyncio.run(main())

So in practice (as usual, start from the end), we call main as a coroutine. In main we create two commands, Look and Scan, and call their start method. The start method is defined on every command and it's basically responsible for writing the command itself in a set before it runs, and remove it after it has run (that is, after it's fully finished). And then it should notify the condition to check the length of commands again. When there's no command left, the program ends. If you run this script (I ran it with Python 3.8) you should see something like:

Before scanning...
Before looking...
After looking
After scanning
There's no running command now, exiting.

Notice that both commands start at the same time (well, Look begins slightly before, as a matter of fact, but still, Scan begins before Look is done). But Look does end before Scan does (roughly 3 seconds). Our condition isn't checked until both commands are done.

Could events or locks or semaphores be used instead? Possibly, but I like to use a condition in that example. You can easily have a lot more tasks without a lot of modifications.

https://en.xdnf.cn/q/72568.html

Related Q&A

displaying newlines in the help message when using pythons optparse

Im using the optparse module for option/argument parsing. For backwards compatibility reasons, I cant use the argparse module. How can I format my epilog message so that newlines are preserved?In th…

How to use a learnable parameter in pytorch, constrained between 0 and 1?

I want to use a learnable parameter that only takes values between 0 and 1. How can I do this in pytorch? Currently I am using: self.beta = Parameter(torch.Tensor(1)) #initialize zeros(self.beta)But I…

generating a CSV file online on Google App Engine

I am using Google App Engine (python), I want my users to be able to download a CSV file generated using some data from the datastore (but I dont want them to download the whole thing, as I re-order th…

Python equivalence of Rs match() for indexing

So i essentially want to implement the equivalent of Rs match() function in Python, using Pandas dataframes - without using a for-loop. In R match() returns a vector of the positions of (first) matches…

Why doesnt Pydantic validate field assignments?

I want to use Pydantic to validate fields in my object, but it seems like validation only happens when I create an instance, but not when I later modify fields. from pydantic import BaseModel, validato…

Format OCR text annotation from Cloud Vision API in Python

I am using the Google Cloud Vision API for Python on a small program Im using. The function is working and I get the OCR results, but I need to format these before being able to work with them.This is …

Does pybtex support accent/special characters in .bib file?

from pybtex.database.input import bibtex parser = bibtex.Parser() bibdata = parser.parse_file("sample.bib")The above code snippet works really well in parsing a .bib file but it seems not to …

How do I count specific values across multiple columns in pandas

I have the DataFrame df = pd.DataFrame({colA:[?,2,3,4,?],colB:[1,2,?,3,4],colC:[?,2,3,4,5] })I would like to get the count the the number of ? in each column and return the following output - colA…

Split Python source into separate directories?

Here are some various Python packages my company "foo.com" uses:com.foo.bar.web com.foo.bar.lib com.foo.zig.web com.foo.zig.lib com.foo.zig.lib.lib1 com.foo.zig.lib.lib2Heres the traditional …

How can I use a raw_input with twisted?

I am aware that raw_input cannot be used in twisted. However here is my desired application.I have an piece of hardware that provides an interactive terminal serial port. I am trying to connect to th…