Running SimpleXMLRPCServer in separate thread and shutting down

2024/11/14 18:14:46

I have a class that I want to test through SimpleXMLRPCServer in Python. My unit test creates a new thread and starts SimpleXMLRPCServer in it. Then I run all the tests and finally shut the server down.

This is my ServerThread:

class ServerThread(Thread):
    running = True

    def run(self):
        self.server = #Creates and starts SimpleXMLRPCServer
        while (self.running):
            self.server.handle_request()

    def stop(self):
        self.running = False
        self.server.server_close()

The problem is that calling ServerThread.stop(), followed by Thread.stop() and Thread.join(), does not stop the thread properly if it is already waiting for a request in handle_request(). Since there doesn't seem to be any interrupt or timeout mechanism that I can use here, I am at a loss as to how to shut down the server thread cleanly.

Answer

I had the same problem, and after hours of research I solved it by switching from my own handle_request() loop to serve_forever() to start the server.

serve_forever() starts an internal loop like yours. This loop can be stopped by calling shutdown(). After stopping the loop it is possible to stop the server with server_close().

I don't know why this works and the handle_request() loop doesn't, but it does ;P
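A likely explanation: handle_request() blocks until a request arrives unless the server's timeout attribute is set, so the running flag is never re-checked, whereas serve_forever() wakes up periodically to see whether shutdown() was called. If you would rather keep your own loop, a minimal sketch could look like the one below. The class name PollingServerThread, the 0.2 second timeout, and the use of port 0 are my own assumptions for illustration, not part of the question's code.

```python
import threading
from xmlrpc.server import SimpleXMLRPCServer


class PollingServerThread(threading.Thread):
    """Hypothetical variant of the question's ServerThread that uses a timeout."""

    def __init__(self, host="127.0.0.1", port=0):
        super().__init__()
        # Port 0 asks the OS for any free port (an assumption for this sketch).
        self.server = SimpleXMLRPCServer((host, port), logRequests=False)
        # With a timeout set, handle_request() returns after at most this many
        # seconds even when no request arrives, so the loop can re-check the flag.
        self.server.timeout = 0.2
        self._stop_requested = threading.Event()

    def run(self):
        while not self._stop_requested.is_set():
            self.server.handle_request()
        # Close the socket from the serving thread once the loop has ended.
        self.server.server_close()

    def stop(self):
        self._stop_requested.set()


thread = PollingServerThread()
thread.start()
thread.stop()
thread.join(timeout=5)
stopped_cleanly = not thread.is_alive()
```

With this approach stop() only sets a flag, and the thread exits within roughly one timeout interval. The serve_forever()/shutdown() pair is still the simpler choice, since it does the same polling for you.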

Here is my code:

from threading import Thread
from xmlrpc.server import SimpleXMLRPCServer
from pyWebService.server.service.WebServiceRequestHandler import WebServiceRquestHandler


class WebServiceServer(Thread):
    def __init__(self, ip, port):
        super(WebServiceServer, self).__init__()
        self.running = True
        self.server = SimpleXMLRPCServer((ip, port), requestHandler=WebServiceRquestHandler)
        self.server.register_introspection_functions()

    def register_function(self, function):
        self.server.register_function(function)

    def run(self):
        self.server.serve_forever()

    def stop_server(self):
        self.server.shutdown()
        self.server.server_close()


print("starting server")
webService = WebServiceServer("localhost", 8010)
webService.start()
print("stopping server")
webService.stop_server()
webService.join()
print("server stopped")
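Since the code above depends on a project-specific request handler, here is a self-contained sketch of the same serve_forever()/shutdown() pattern as it might be used in a test. The add function, the 127.0.0.1 address, and the use of port 0 (let the OS pick a free port) are assumptions for illustration only.

```python
import threading
from xmlrpc.client import ServerProxy
from xmlrpc.server import SimpleXMLRPCServer


def add(a, b):
    """Hypothetical function exposed over XML-RPC for the test."""
    return a + b


# Port 0 lets the OS choose a free port; read the real one from server_address.
server = SimpleXMLRPCServer(("127.0.0.1", 0), logRequests=False)
server.register_function(add)
port = server.server_address[1]

# serve_forever() runs in the background thread until shutdown() is called.
thread = threading.Thread(target=server.serve_forever)
thread.start()

try:
    with ServerProxy(f"http://127.0.0.1:{port}") as proxy:
        result = proxy.add(2, 3)
finally:
    server.shutdown()      # ask the serve_forever() loop to exit and wait for it
    server.server_close()  # release the listening socket
    thread.join(timeout=5)
```

Note that shutdown() has to be called from a different thread than the one running serve_forever(), otherwise it deadlocks. That is why the server loop lives in its own thread here.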