lazy processpoolexecutor in Python?

2024/11/18 21:36:59

I have a large number of tasks that I want to execute and make the results available via a generator. However, using a ProcessPoolExecutor and as_completed will evaluate the results greedily and store them all in memory. Is there a way to block after a certain number of results are stored in the generator?

Answer

The idea for this is to split what you want to process in chunks, I'll be using almost the same example than in the ProcessPoolExecutor documentation:

import concurrent.futures
import math
import itertools as itPRIMES = [293,171,293,773,99,5419,293,171,293,773,99,5419,293,171,293,773,99,5419]def is_prime(n):if n % 2 == 0:return Falsesqrt_n = int(math.floor(math.sqrt(n)))for i in range(3, sqrt_n + 1, 2):if n % i == 0:return Falsereturn Truedef main():with concurrent.futures.ProcessPoolExecutor() as executor:for number, prime in zip(PRIMES, executor.map(is_prime, PRIMES)):print('%d is prime: %s' % (number, prime))def main_lazy():chunks = map(lambda x: it.islice(PRIMES, x, x+4), range(0, len(PRIMES), 4))with concurrent.futures.ProcessPoolExecutor() as executor:results = zip(PRIMES, it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), chunks)))for number, prime in (next(results) for _ in range(4)):print('%d is prime: %s' % (number, prime))if __name__ == "__main__":main_lazy()

Notice the differences between main and main_lazy, let's explain this a bit:

Instead of having a list of all what we want to process I split it into chunks of size 4 (it's useful to use itertools.islice), the idea is that instead of mapping with the executor the whole list we will be mapping the chunks. Then just using python3 lazy map we can map that executor call lazily to each of the chunks. So, we know that executor.map is not lazy so that chunk will be evaluated immediately when we request it, but till we don't request the other chunks the executor.map for that chunks will not be called. As you can see I'm only requesting the first 4 elements from the whole list of results, but since I also used itertools.chain it will just consume the ones from the first chunk, without calculating the rest of the iterable.

So, since you wanted to return a generator, it would be as easy as return the results from the main_lazy function, you can even abstract the chunk size (probably you would need a good function to get the propper chunks, but this is out of scope):

def main_lazy(chunk_size):chunks = map(lambda x: it.islice(PRIMES, x, x+chunk_size), range(0, len(PRIMES), chunk_size))with concurrent.futures.ProcessPoolExecutor() as executor:results = zip(PRIMES, it.chain.from_iterable(map(lambda x: executor.map(is_prime, x), chunks)))return results
https://en.xdnf.cn/q/71014.html

Related Q&A

error occurs when installing cryptography for scrapy in virtualenv on OS X [closed]

Closed. This question needs debugging details. It is not currently accepting answers.Edit the question to include desired behavior, a specific problem or error, and the shortest code necessary to repro…

Can Python do DI seamlessly without relying on a service locator?

Im coming from the C# world, so my views may be a little skewed. Im looking to do DI in Python, however Im noticing a trend with libraries where they all appear to rely on a service locator. That is, y…

Producing pdf report from python with bullet points

I would like to produce a dynamic pdf document from a python script that looks like the image below. Each sentence starts with a bullet point, and the text and number of lines depends on what the user …

Converting bits to bytes in Python

I am trying to convert a bit string into a byte string, in Python 3.x. In each byte, bits are filled from high order to low order. The last byte is filled with zeros if necessary. The bit string is ini…

Install python package from private pypiserver

I have setup a pypiserver behind an nginx proxy which uses htpasswd for authentication. I am currently able to upload sdists, but I cant figure out how to download them. I want to be able to download t…

Matplotlib Table- Assign different text alignments to different columns

I am creating a two column table and want the text to be as close as possible. How can I specify that the first column be right aligned and the second be left aligned?Ive tried by setting the general …

How would I make a random hexdigit code generator using .join and for loops?

I am new to programming and one assignment I have to do is create a random hexdigit colour code generator using for loops and .join. Is my program below even close to how you do it, or is it completely…

Multiline python regex

I have a file structured like this : A: some text B: more text even more text on several lines A: and we start again B: more text more multiline textIm trying to find the regex that will split my file …

How can I process xml asynchronously in python?

I have a large XML data file (>160M) to process, and it seems like SAX/expat/pulldom parsing is the way to go. Id like to have a thread that sifts through the nodes and pushes nodes to be processed …

python postgresql: reliably check for updates in a specific table

Situation: I have a live trading script which computes all sorts of stuff every x minutes in my main thread (Python). the order sending is performed through such thread. the reception and execution of …