Python: using threads to call subprocess.Popen multiple times

2024/10/8 0:35:21

I have a service that is running (Twisted jsonrpc server). When I make a call to "run_procs" the service will look at a bunch of objects and inspect their timestamp property to see if they should run. If they should, they get added to a thread_pool (list) and then every item in the thread_pool gets the start() method called.

I have used this setup for several other applications where I wanted to run a function within my class with threading. However, when I use a subprocess.Popen call in the function called by each thread, the calls run one at a time instead of running concurrently as I would expect.

Here is some sample code:

    class ProcService(jsonrpc.JSONRPC):
        self.thread_pool = []
        self.running_threads = []
        self.lock = threading.Lock()

        def clean_pool(self, thread_pool, join=False):
            for th in [x for x in thread_pool if not x.isAlive()]:
                if join: th.join()
                thread_pool.remove(th)
                del th
            return thread_pool

        def run_threads(self, parallel=10):
            while len(self.running_threads)+len(self.thread_pool) > 0:
                self.clean_pool(self.running_threads, join=True)
                n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
                if n > 0:
                    for th in self.thread_pool[0:n]: th.start()
                    self.running_threads.extend(self.thread_pool[0:n])
                    del self.thread_pool[0:n]
                time.sleep(.01)
            for th in self.running_threads+self.thread_pool: th.join()

        def jsonrpc_run_procs(self):
            for i, item in enumerate(self.items):
                if item.should_run():
                    self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
            self.run_threads(5)

        def run_proc(self, proc):
            self.lock.acquire()
            print "\nSubprocess started"
            p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data),
                                 shell=True,
                                 stdin=subprocess.PIPE,
                                 stdout=subprocess.PIPE,)
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.lock.release()

Any help/suggestions are appreciated.

* EDIT * OK. So now I want to read back the output from the stdout pipe. This works some of the time, but it also fails with select.error: (4, 'Interrupted system call'). I assume this is because sometimes the process has already terminated before I try to run the communicate method. The code in the run_proc method has been changed to:

    def run_proc(self, proc):
        self.lock.acquire()
        p = subprocess.Popen( #etc
        self.running_procs.append([p, proc.data.id])
        self.lock.release()

After I call self.run_threads(5), I call self.check_procs().

The check_procs method iterates over the list of running_procs, checking whether poll() is not None. How can I get the output from the pipe? I have tried both of the following:

Calling check_procs once:

    def check_procs(self):
        for proc_details in self.running_procs:
            proc = proc_details[0]
            while (proc.poll() == None):
                time.sleep(0.1)
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.running_procs.remove(proc_details)
            print proc_details[1], stdout_value
            del proc_details

Calling check_procs in a while loop, like:

    while len(self.running_procs) > 0:
        self.check_procs()

    def check_procs(self):
        for proc_details in self.running_procs:
            if (proc.poll() is not None):
                stdout_value = proc.communicate('through stdin to stdout')[0]
                self.running_procs.remove(proc_details)
                print proc_details[1], stdout_value
                del proc_details

Answer

I think the key code is:

    self.lock.acquire()
    print "\nSubprocess started"
    p = subprocess.Popen( # etc
    stdout_value = proc.communicate('through stdin to stdout')[0]
    self.lock.release()

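The explicit calls to acquire and release guarantee serialization: each thread holds the lock from before the Popen call until after communicate returns, so only one subprocess can run at a time. Wouldn't you observe serialization just as invariably if you did other things in this block instead of using subprocess?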
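The effect is easy to reproduce in isolation. What follows is only a minimal sketch, written for Python 3 (the question's code is Python 2), in which the child is simply a Python interpreter that sleeps briefly. The names `run_child` and `run_all` and the `DELAY` value are hypothetical, chosen for illustration. With `use_lock=True` each thread holds the lock for the whole lifetime of its child, so the total time is at least the sum of the children's run times; with `use_lock=False` the children can overlap.

```python
import subprocess
import sys
import threading
import time

DELAY = 0.2  # seconds each child process sleeps (illustrative value)
lock = threading.Lock()


def run_child(use_lock):
    """Start one child process and wait for it, optionally under the lock."""
    cmd = [sys.executable, "-c", "import time; time.sleep(%s)" % DELAY]
    if use_lock:
        # Holding the lock across Popen + communicate means only one
        # child can exist at a time: the threads are serialized.
        with lock:
            subprocess.Popen(cmd).communicate()
    else:
        subprocess.Popen(cmd).communicate()


def run_all(n, use_lock):
    """Run n children from n threads and return the elapsed wall time."""
    threads = [threading.Thread(target=run_child, args=(use_lock,))
               for _ in range(n)]
    start = time.perf_counter()
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return time.perf_counter() - start


if __name__ == "__main__":
    print("with lock:    %.2fs" % run_all(3, use_lock=True))
    print("without lock: %.2fs" % run_all(3, use_lock=False))
```

The exact timings depend on the machine, but the locked run cannot finish faster than three sleeps back to back.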

Edit: all silence here, so I'll add the suggestion to remove the locking and instead put each stdout_value on a Queue.Queue() instance. Queue is intrinsically thread-safe (it handles its own locking), so you can get (or get_nowait, etc.) results from it once they're ready and have been put there. In general, Queue is the best way to arrange thread communication (and often synchronization too) in Python, any time it can feasibly be arranged that way.
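As a small illustration of that point, here is a sketch of worker threads handing results to the main thread with no explicit lock. It assumes Python 3, where the module is named `queue` rather than Python 2's `Queue`; `worker` and `drain` are hypothetical helpers, not part of the question's code.

```python
import queue
import threading


def worker(q, n):
    # Queue.put does its own locking, so no threading.Lock is needed here.
    q.put(n * n)


def drain(q):
    """Collect everything currently in the queue without blocking."""
    items = []
    while True:
        try:
            items.append(q.get_nowait())
        except queue.Empty:
            # get_nowait raises queue.Empty instead of waiting.
            return items


q = queue.Queue()
threads = [threading.Thread(target=worker, args=(q, n)) for n in range(5)]
for th in threads:
    th.start()
for th in threads:
    th.join()  # after this, every put has completed

results = sorted(drain(q))  # arrival order is not guaranteed, so sort
print(results)
```

A blocking `q.get()` would also work while the workers are still running; `get_nowait` is used here only because all producers have already been joined.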

Specifically: add import Queue at the start; stop creating, acquiring, and releasing self.lock (just delete those three lines); add self.q = Queue.Queue() to the __init__; right after the call stdout_value = proc.communicate(... add one statement, self.q.put(stdout_value); then, e.g., finish the jsonrpc_run_procs method with

    while not self.q.empty():
        result = self.q.get()
        print 'One result is %r' % result

to confirm that all the results are there. (Normally the empty method of queues is not reliable, but in this case all threads putting to the queue are already finished, so you should be fine).
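Put together, the approach might look like the following sketch. It is written for Python 3 and stands alone rather than living inside the Twisted service. The child command (a Python one-liner that upper-cases its stdin) is a stand-in for `program_to_run.py`, and `run_proc` and `run_procs` here are simplified, hypothetical versions of the methods in the question.

```python
import queue
import subprocess
import sys
import threading

# Stand-in for program_to_run.py: echo stdin back in upper case.
CHILD_CODE = "import sys; sys.stdout.write(sys.stdin.read().upper())"


def run_proc(q, data):
    """Run one child, feed it data on stdin, and queue its stdout."""
    p = subprocess.Popen(
        [sys.executable, "-c", CHILD_CODE],
        stdin=subprocess.PIPE,
        stdout=subprocess.PIPE,
    )
    # communicate() writes stdin, reads stdout to EOF, and waits for exit.
    stdout_value = p.communicate(data.encode())[0]
    q.put((data, stdout_value.decode()))


def run_procs(items):
    """Start one thread per item, wait for all, then drain the queue."""
    q = queue.Queue()
    threads = [threading.Thread(target=run_proc, args=(q, item))
               for item in items]
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    results = {}
    # Safe to rely on empty() here: all producer threads have finished.
    while not q.empty():
        data, output = q.get()
        results[data] = output
    return results


if __name__ == "__main__":
    print(run_procs(["alpha", "beta", "gamma"]))
```

Each thread calls `communicate` exactly once on the process it created, so there is no separate polling pass over a shared list of processes.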

https://en.xdnf.cn/q/118760.html
