I have a running service (a Twisted jsonrpc server). When I make a call to "run_procs", the service looks at a bunch of objects and inspects their timestamp property to see if they should run. Those that should run are added to thread_pool (a list), and then start() is called on every item in thread_pool.
I have used this setup for several other applications where I wanted to run a function within my class with threading. However, when the function called by each thread makes a subprocess.Popen call, the calls run one at a time instead of concurrently, as I would expect.
Here is some sample code:

    class ProcService(jsonrpc.JSONRPC):
        self.thread_pool = []
        self.running_threads = []
        self.lock = threading.Lock()

        def clean_pool(self, thread_pool, join=False):
            for th in [x for x in thread_pool if not x.isAlive()]:
                if join: th.join()
                thread_pool.remove(th)
                del th
            return thread_pool

        def run_threads(self, parallel=10):
            while len(self.running_threads)+len(self.thread_pool) > 0:
                self.clean_pool(self.running_threads, join=True)
                n = min(max(parallel - len(self.running_threads), 0), len(self.thread_pool))
                if n > 0:
                    for th in self.thread_pool[0:n]: th.start()
                    self.running_threads.extend(self.thread_pool[0:n])
                    del self.thread_pool[0:n]
                time.sleep(.01)
            for th in self.running_threads+self.thread_pool: th.join()

        def jsonrpc_run_procs(self):
            for i, item in enumerate(self.items):
                if item.should_run():
                    self.thread_pool.append(threading.Thread(target=self.run_proc, args=tuple([item])))
            self.run_threads(5)

        def run_proc(self, proc):
            self.lock.acquire()
            print "\nSubprocess started"
            p = subprocess.Popen('%s/program_to_run.py %s' %(os.getcwd(), proc.data), shell=True, stdin=subprocess.PIPE, stdout=subprocess.PIPE,)
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.lock.release()

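For comparison, here is a minimal standalone sketch of the concurrent behavior I would expect, outside of Twisted. It is written in Python 3 syntax (the code above is Python 2), and everything in it is an assumption for illustration: `run_child` and `run_concurrently` are hypothetical names, and the inline child script is a stand-in for program_to_run.py that sleeps and then echoes its stdin. Unlike run_proc above, no lock is held around the Popen and communicate calls.

```python
import subprocess
import sys
import threading
import time


def run_child(seconds, results, index):
    # Hypothetical stand-in for program_to_run.py: sleep, then echo stdin to stdout.
    code = ("import sys, time; time.sleep(%s); "
            "sys.stdout.write(sys.stdin.read())" % seconds)
    p = subprocess.Popen([sys.executable, "-c", code],
                         stdin=subprocess.PIPE, stdout=subprocess.PIPE)
    # communicate() sends the input, waits for the child to exit, and returns its output.
    out, _ = p.communicate(b"through stdin to stdout")
    results[index] = out


def run_concurrently(count, seconds):
    # Start one thread per child process and time the whole batch.
    results = [None] * count
    threads = [threading.Thread(target=run_child, args=(seconds, results, i))
               for i in range(count)]
    start = time.time()
    for th in threads:
        th.start()
    for th in threads:
        th.join()
    return results, time.time() - start


if __name__ == "__main__":
    outputs, elapsed = run_concurrently(4, 0.5)
    print(outputs, "elapsed: %.2fs" % elapsed)
```

If the children really overlap, the elapsed time should be close to one child's sleep rather than the sum of all of them.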
Any help/suggestions are appreciated.
**EDIT:** OK, so now I want to read back the output from the stdout pipe. This works some of the time, but it also sometimes fails with `select.error: (4, 'Interrupted system call')`. I assume this is because the process has sometimes already terminated before I call the communicate method. The code in the run_proc method has been changed to:

    def run_proc(self, proc):
        self.lock.acquire()
        p = subprocess.Popen( #etc
        self.running_procs.append([p, proc.data.id])
        self.lock.release()

After I call self.run_threads(5), I call self.check_procs().
The check_procs method iterates over the running_procs list and checks each process for poll() is not None. How can I get the output from the pipe? I have tried both of the following:
Calling check_procs once:

    def check_procs(self):
        for proc_details in self.running_procs:
            proc = proc_details[0]
            while (proc.poll() == None):
                time.sleep(0.1)
            stdout_value = proc.communicate('through stdin to stdout')[0]
            self.running_procs.remove(proc_details)
            print proc_details[1], stdout_value
            del proc_details

Calling check_procs in a while loop, like:

    while len(self.running_procs) > 0:
        self.check_procs()

    def check_procs(self):
        for proc_details in self.running_procs:
            if (proc.poll() is not None):
                stdout_value = proc.communicate('through stdin to stdout')[0]
                self.running_procs.remove(proc_details)
                print proc_details[1], stdout_value
                del proc_details

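To make the goal concrete, here is a rough standalone sketch of the kind of collection step I am after, again in Python 3 syntax and without Twisted. The names `start_procs` and `collect_output` are hypothetical, and the inline child script (which upper-cases its stdin) is only a stand-in for the real program. The sketch relies on communicate() itself waiting for the process to exit, so it has no poll() loop, and it iterates over a copy of the list so that removing entries does not disturb the iteration. I have not confirmed whether this avoids the select.error I see under Python 2.

```python
import subprocess
import sys


def start_procs(ids):
    # Hypothetical stand-in child: read stdin, write it back upper-cased.
    code = "import sys; sys.stdout.write(sys.stdin.read().upper())"
    running = []
    for proc_id in ids:
        p = subprocess.Popen([sys.executable, "-c", code],
                             stdin=subprocess.PIPE, stdout=subprocess.PIPE)
        running.append([p, proc_id])
    return running


def collect_output(running_procs):
    outputs = {}
    # Iterate over a copy, because entries are removed from the original list.
    for proc_details in list(running_procs):
        p, proc_id = proc_details
        # communicate() blocks until the child exits, so no poll() loop is needed.
        stdout_value, _ = p.communicate(b"through stdin to stdout")
        outputs[proc_id] = stdout_value
        running_procs.remove(proc_details)
    return outputs


if __name__ == "__main__":
    procs = start_procs([1, 2, 3])
    for proc_id, value in sorted(collect_output(procs).items()):
        print(proc_id, value)
```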