How to share state when using concurrent futures

2024/10/3 23:25:36

I am aware that with the traditional multiprocessing library I can declare a value and share its state between processes:

https://docs.python.org/3/library/multiprocessing.html?highlight=multiprocessing#sharing-state-between-processes

When using the newer concurrent.futures library, how can I share state between my processes?

import concurrent.futures
import time

counter = 0  # the value I want shared across all processes and threads

def get_user_object(batch):
    global counter
    # do some work
    counter = counter + 1  # only updates this process's own copy
    print(counter)

def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        threadingResult = executor.map(get_user_object, batches)

def run():
    data_pools = get_data()  # get_data() and PROCESSES are defined elsewhere
    start = time.time()
    with concurrent.futures.ProcessPoolExecutor(max_workers=PROCESSES) as executor:
        processResult = executor.map(do_multithreading, data_pools)
    end = time.time()
    print("TIME TAKEN:", end - start)

if __name__ == '__main__':
    run()

I want to keep this counter synchronized across all processes (and the threads running inside them).
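To see why a synchronized value is needed at all, here is a minimal sketch (increment() and run_naive() are hypothetical names) in which each task bumps a plain module-level global inside a ProcessPoolExecutor. Each worker process works on its own copy of the global, so the parent's counter is never updated.

```python
import concurrent.futures
import os

# A plain module-level global: every worker process gets its own copy.
counter = 0


def increment(_):
    global counter
    counter += 1  # updates only this worker process's copy
    return os.getpid(), counter


def run_naive(n_tasks=10):
    with concurrent.futures.ProcessPoolExecutor(max_workers=2) as executor:
        per_worker = list(executor.map(increment, range(n_tasks)))
    # The parent's copy was never touched by the workers.
    return counter, per_worker


if __name__ == "__main__":
    parent_value, per_worker = run_naive()
    print("parent counter:", parent_value)  # still 0
    print("(pid, local count):", per_worker)
```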

With the multiprocessing library, I might have used multiprocessing.Value and a Lock.
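For reference, a minimal sketch of that multiprocessing approach, assuming plain multiprocessing.Process workers (work() and run_with_processes() are hypothetical names): a shared multiprocessing.Value('i', 0) plus a multiprocessing.Lock that guards every update of counter.value.

```python
import multiprocessing as mp


def work(counter, lock, n_increments):
    for _ in range(n_increments):
        # "+=" on Value.value is a read followed by a write, so guard it.
        with lock:
            counter.value += 1


def run_with_processes(n_procs=4, n_increments=1000):
    counter = mp.Value('i', 0)  # shared C int, starts at 0
    lock = mp.Lock()
    procs = [mp.Process(target=work, args=(counter, lock, n_increments))
             for _ in range(n_procs)]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return counter.value


if __name__ == "__main__":
    print(run_with_processes())  # 4000
```

The lock matters because without it two processes can read the same old value and one of the increments is lost.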

Answer

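You can pass an initializer and initargs to ProcessPoolExecutor (both parameters were added in Python 3.7), just as you would to multiprocessing.Pool. The initializer runs once when each worker process starts, so it can store the shared multiprocessing.Value in a module-level global. Here's an example: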

import concurrent.futures
import multiprocessing as mp

def get_user_object(batch):
    # The Value's built-in lock makes the read-modify-write atomic across processes.
    with _COUNTER.get_lock():
        _COUNTER.value += 1
        print(_COUNTER.value, end=' ')

def init_globals(counter):
    # Runs once in each worker process: store the shared Value in a global.
    global _COUNTER
    _COUNTER = counter

def main():
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
            initializer=init_globals, initargs=(counter,)) as executor:
        for _ in executor.map(get_user_object, range(10)):
            pass
    print()

if __name__ == "__main__":
    import sys
    sys.exit(main())

Example run:

$ python3 glob_counter.py 
1 2 4 3 5 6 7 8 10 9 

Where:

  • for _ in executor.map(get_user_object, range(10)): iterates over the results. Here get_user_object() returns None, so there is nothing to process and the loop body is just pass. Consuming the iterator still matters: an exception raised in a worker is re-raised only when its result is retrieved.
  • The final print() outputs the trailing newline, because the print() calls inside get_user_object() end with a space instead of a newline (end=' ').
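To illustrate the point about iterating executor.map(), here is a small sketch (check(), consume() and ignore() are hypothetical helpers). It uses ThreadPoolExecutor only to keep the example short; ProcessPoolExecutor.map() follows the same rule: a regular exception raised in a worker is re-raised when its result is pulled from the iterator, and is never raised if the results are not consumed.

```python
import concurrent.futures


def check(n):
    # Fail on one input to show how a worker's exception propagates.
    if n == 3:
        raise ValueError(f"bad input: {n}")
    return n * n


def consume(executor_cls=concurrent.futures.ThreadPoolExecutor):
    """Iterate the map() results; the worker's exception is re-raised here."""
    seen = []
    try:
        with executor_cls(max_workers=4) as executor:
            for value in executor.map(check, range(6)):
                seen.append(value)
    except ValueError as exc:
        return seen, str(exc)
    return seen, None


def ignore(executor_cls=concurrent.futures.ThreadPoolExecutor):
    """Never iterate the map() results; the exception is silently dropped."""
    with executor_cls(max_workers=4) as executor:
        executor.map(check, range(6))  # tasks still run, results are discarded
    return "finished without an exception"


if __name__ == "__main__":
    print(consume())  # ([0, 1, 4], 'bad input: 3')
    print(ignore())   # finished without an exception
```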
https://en.xdnf.cn/q/70672.html
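The same initializer approach should carry over to the question's layout, where each worker process runs its own ThreadPoolExecutor. This is a sketch under stated assumptions: get_data() and PROCESSES from the question are replaced by a hypothetical list of batch lists and a processes argument, and the real per-batch work is left as a comment. The lock returned by get_lock() is a multiprocessing lock, so it also serializes the threads within each process.

```python
import concurrent.futures
import multiprocessing as mp

_COUNTER = None  # set in each worker process by init_globals()


def init_globals(counter):
    global _COUNTER
    _COUNTER = counter


def get_user_object(batch):
    # ... the real work on `batch` would go here (hypothetical) ...
    # A multiprocessing lock also excludes other threads in this process.
    with _COUNTER.get_lock():
        _COUNTER.value += 1


def do_multithreading(batches):
    with concurrent.futures.ThreadPoolExecutor(max_workers=25) as executor:
        # Consume the iterator so worker-thread exceptions are re-raised.
        list(executor.map(get_user_object, batches))


def run(data_pools, processes=4):
    counter = mp.Value('i', 0)
    with concurrent.futures.ProcessPoolExecutor(
            max_workers=processes,
            initializer=init_globals,
            initargs=(counter,)) as executor:
        list(executor.map(do_multithreading, data_pools))
    return counter.value


if __name__ == "__main__":
    # Hypothetical input: 4 pools of 50 batches each.
    pools = [list(range(50)) for _ in range(4)]
    print(run(pools))  # 200
```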
