I am attempting to create a program that uses multiple processes, and I would like to cleanly terminate all the spawned processes if an error occurs. Below I've written some pseudocode for what I think I need to do, but I don't know the best way to tell all the processes that an error has occurred and that they should terminate.
I think I should be using classes for this sort of thing but I'm quite new to Python so I'm just trying to get my head around the basics first.

    #imports
    exitFlag = True

    # Function for threads to process
    def url_thread_worker( ):
        # while exitFlag:
        try:
            # do something
        except:
            # we've ran into a problem, we need to kill all the spawned processes and cleanly exit the program
            exitFlag = False

    def processStarter( ):
        process_1 = multiprocessing.Process( name="Process-1", target=url_thread_worker, args=( ) )
        process_2 = multiprocessing.Process( name="Process-2", target=url_thread_worker, args=( ) )
        process_1.start()
        process_2.start()

    if __name__ == '__main__':
        processStarter( )

Thanks in advance
Here's my suggestion:

    import multiprocessing
    import threading
    import time


    def good_worker():
        print("[GoodWorker] Starting")
        time.sleep(4)
        print("[GoodWorker] all good")


    def bad_worker():
        print("[BadWorker] Starting")
        time.sleep(2)
        raise Exception("ups!")


    class MyProcManager(object):
        def __init__(self):
            self.procs = []
            self.errors_flag = False
            self._threads = []
            self._lock = threading.Lock()

        def terminate_all(self):
            with self._lock:
                for p in self.procs:
                    if p.is_alive():
                        print("Terminating %s" % p)
                        p.terminate()

        def launch_proc(self, func, args=(), kwargs=None):
            # Each process gets its own wrapper thread that waits for it to end.
            t = threading.Thread(target=self._proc_thread_runner,
                                 args=(func, args, kwargs or {}))
            self._threads.append(t)
            t.start()

        def _proc_thread_runner(self, func, args, kwargs):
            p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)
            self.procs.append(p)
            p.start()
            while p.exitcode is None:
                p.join()
            # A positive exit code means the process failed on its own.
            if p.exitcode > 0:
                self.errors_flag = True
                self.terminate_all()

        def wait(self):
            for t in self._threads:
                t.join()


    if __name__ == '__main__':
        proc_manager = MyProcManager()
        proc_manager.launch_proc(good_worker)
        proc_manager.launch_proc(good_worker)
        proc_manager.launch_proc(bad_worker)
        proc_manager.wait()
        if proc_manager.errors_flag:
            print("Errors flag is set: some process crashed")
        else:
            print("Everything closed cleanly")

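You need a wrapper thread for each process you run; the thread waits for that process to end.
When a process ends, check its exitcode: a value > 0 means it failed, for example by raising an unhandled exception (which gives exit code 1). In that case, call terminate_all() to close all the remaining active processes. A process that was terminated ends with a negative exitcode, so it does not trigger terminate_all() again.
The wrapper threads finish too, since each one only lives as long as the process it is waiting on.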
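To make the exit-code check above concrete, here is a small sketch that is separate from the manager class. The names crashing_worker, sleeping_worker and collect_exit_codes are made up for this illustration. It starts one process that raises and one that just sleeps, then terminates the sleeping one the way terminate_all() would, and reports both exit codes.

```python
import multiprocessing
import time


def crashing_worker():
    # An unhandled exception makes the child process exit with code 1.
    raise RuntimeError("simulated failure")


def sleeping_worker():
    # Sleeps long enough for the parent to terminate it first.
    time.sleep(30)


def collect_exit_codes():
    """Run both workers and return (crashed_code, terminated_code)."""
    crashed = multiprocessing.Process(target=crashing_worker)
    sleeper = multiprocessing.Process(target=sleeping_worker)
    crashed.start()
    sleeper.start()

    crashed.join()       # wait for the crash
    sleeper.terminate()  # stop the survivor, as terminate_all() would
    sleeper.join()
    return crashed.exitcode, sleeper.exitcode


if __name__ == "__main__":
    crashed_code, terminated_code = collect_exit_codes()
    print("crashed worker exit code: %s" % crashed_code)
    print("terminated worker exit code: %s" % terminated_code)
```

The crashed worker should report a positive code and the terminated one a negative code, which is why the `> 0` test only fires for processes that failed on their own.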
Also, in your own code you're free to call proc_manager.terminate_all() whenever you want. For example, you could check some flags in a different thread and call it from there.
Hope it's good for your case.
PS: In your original code you used something like a global exitFlag. You can never have a "global" flag like that in multiprocessing, because it simply isn't global: separate processes have separate memory spaces. That only works with threads, where state can be shared. If you need it in multiprocessing, you must use explicit communication between processes (Pipe and Queue do that) or something like shared memory objects.
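As a rough sketch of that explicit-communication idea, one option is a multiprocessing.Event passed to every process as the shared "exit flag". This is an alternative to the manager class above, not part of it, and the names worker and run_workers, the should_fail argument and the simulated error are all assumptions made for the example.

```python
import multiprocessing
import time


def worker(name, stop_event, should_fail):
    """Loop until the shared event is set; optionally simulate a failure."""
    for step in range(50):
        if stop_event.is_set():
            print("[%s] stop requested, exiting cleanly" % name)
            return
        try:
            if should_fail and step == 2:
                raise ValueError("simulated problem")
            time.sleep(0.1)  # stand-in for real work
        except Exception:
            print("[%s] error, asking every process to stop" % name)
            stop_event.set()  # visible to all processes sharing the event
            return


def run_workers():
    """Start two workers sharing one Event; return (event_set, exit_codes)."""
    stop_event = multiprocessing.Event()
    procs = [
        multiprocessing.Process(target=worker, args=("Process-1", stop_event, False)),
        multiprocessing.Process(target=worker, args=("Process-2", stop_event, True)),
    ]
    for p in procs:
        p.start()
    for p in procs:
        p.join()
    return stop_event.is_set(), [p.exitcode for p in procs]


if __name__ == "__main__":
    event_set, exit_codes = run_workers()
    print("stop event set: %s, exit codes: %s" % (event_set, exit_codes))
```

Unlike terminate(), this lets each process notice the flag at a point of its own choosing and clean up before returning, at the cost of having to check the event regularly.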