Clean Python multiprocess termination dependant on an exit flag

2024/9/22 17:38:08

I am attempting to create a program using multiple processes and I would like to cleanly terminate all the spawned processes if errors occur. below I've wrote out some pseudo type code for what I think I need to do but I don't know what the best way is to communicate to all the processes that an error has occured and they should terminate.

I think I should be using classes for this sort of thing but I'm quite new to Python so I'm just trying to get my head around the basics first.

#importsexitFlag = True# Function for threads to process   
def url_thread_worker( ):   
# while exitFlag:try:# do somethingexcept:# we've ran into a problem, we need to kill all the spawned processes and cleanly exit the programexitFlag = Falsedef processStarter( ):process_1 = multiprocessing.Process( name="Process-1", target=url_thread_worker, args=( ) ) process_2 = multiprocessing.Process( name="Process-2", target=url_thread_worker, args=( ) ) process_1.start()process_2.start()if __name__ == '__main__':processStarter( )

Thanks in advance

Answer

Here's my suggestion:

import multiprocessing
import threading
import timedef good_worker():   print "[GoodWorker] Starting"time.sleep(4)print "[GoodWorker] all good"def bad_worker():print "[BadWorker] Starting"time.sleep(2)raise Exception("ups!")class MyProcManager(object):def __init__(self):self.procs = []self.errors_flag = Falseself._threads = []self._lock = threading.Lock()def terminate_all(self):with self._lock:for p in self.procs:if p.is_alive():print "Terminating %s" % pp.terminate()def launch_proc(self, func, args=(), kwargs= {}):t = threading.Thread(target=self._proc_thread_runner,args=(func, args, kwargs))self._threads.append(t)t.start()def _proc_thread_runner(self, func, args, kwargs):p = multiprocessing.Process(target=func, args=args, kwargs=kwargs)self.procs.append(p)p.start()while p.exitcode is None:p.join()if p.exitcode > 0:self.errors_flag = Trueself.terminate_all()def wait(self):for t in self._threads:t.join()if __name__ == '__main__':proc_manager = MyProcManager()proc_manager.launch_proc(good_worker) proc_manager.launch_proc(good_worker) proc_manager.launch_proc(bad_worker) proc_manager.wait()if proc_manager.errors_flag:print "Errors flag is set: some process crashed"else:print "Everything closed cleanly"

You need to have a wrapper thread for each process run, that waits for its end. When a process ends, check for the exitcode: if > 0, means it raised some unhandled exception. Now call terminate_all() to close all remaining active processes. The wrapper threads will also finish as they are dependent on the process run.

Also, in your code you're completely free to call proc_manager.terminate_all() whenever you want. You can be checking for some flags in a different thread or something like that..

Hope it's good for your case.

PS: btw.. in your original code you did something like an global exit_flag: you can never have a "global" exit_flag in multiprocessing because it simply ain't global as you are using separated processes with separated memory spaces. That only works in threaded environments where state can be shared. If you need it in multiprocessing then you must have explicit communication between processes (Pipe and Queue accomplish that) or something like shared memory objects

https://en.xdnf.cn/q/71920.html

Related Q&A

Any limitations on platform constraints for wheels on PyPI?

Are there any limitations declared anywhere (PEPs or elsewhere) about how broad a scope the Linux wheels uploaded to PyPI should have? Specifically: is it considered acceptable practice to upload li…

Match set of dictionaries. Most elegant solution. Python

Given two lists of dictionaries, new one and old one. Dictionaries represent the same objects in both lists. I need to find differences and produce new list of dictionaries where will be objects from n…

Cookies using Python and Google App Engine

Im developing an app on the Google App Engine and have run into a problem. I want to add a cookie to each user session so that I will be able to differentiate amongst the current users. I want them all…

Matplotlib animations - how to export them to a format to use in a presentation?

So, I learned how to make cute little animations in matplotlib. For example, this:import numpy as np import matplotlib import matplotlib.pyplot as pltplt.ion()fig = plt.figure() ax = fig.add_subplot(…

Where is python interpreter located in virtualenv?

Where is python intrepreter located in virtual environment ? I am making a GUI project and I stuck while finding the python interpreter in my virtual environment.

Tkinter check which Entry last had focus

I am working on a program that has a virtual keyboard I created using Tkinter. The pages that have the keyboard enabled have entry widgets where the users need to input data. I am using pyautogui as …

Python Popen grep

Id like Popen to execute:grep -i --line-buffered "grave" data/*.txtWhen run from the shell, this gives me the wanted result. If I start, in the very same directory where I test grep, a python…

Url structure and form posts with Flask

In Flask you write the route above the method declaration like so: @app.route(/search/<location>/) def search():return render_template(search.html)However in HTML the form will post to the url in…

How can I simulate a key press in a Python subprocess?

The scenario is, I have a Python script which part of it is to execute an external program using the code below:subprocess.run(["someExternalProgram", "some options"], shell=True)An…

Difference between pd.merge() and dataframe.merge()

Im wondering what the difference is when you merge by pd.merge versus dataframe.merge(), examples below:pd.merge(dataframe1, dataframe2)anddataframe1.merge(dataframe2)