I have a class that I wish to test via SimpleXMLRPCServer in python. The way I have my unit test set up is that I create a new thread, and start SimpleXMLRPCServer in that. Then I run all the test, and finally shut down.
This is my ServerThread:
class ServerThread(Thread):running = Truedef run(self):self.server = #Creates and starts SimpleXMLRPCServerwhile (self.running):self.server.handle_request()def stop(self):self.running = Falseself.server.server_close()
The problem is, that calling ServerThread.stop(), followed by Thread.stop() and Thread.join() will not cause the thread to stop properly if it's already waiting for a request in handle_request. And since there doesn't seem to be any interrupt or timeout mechanisms here that I can use, I am at a loss for how I can cleanly shut down the server thread.
I had the same problem and after hours of research i solved it by switching from using my own handle_request() loop to serve_forever() to start the server.
serve_forever() starts an internal loop like yours. This loop can be stopped by calling shutdown(). After stopping the loop it is possible to stop the server with server_close().
I don't know why this works and the handle_request() loop don't, but it does ;P
Here is my code:
from threading import Thread
from xmlrpc.server import SimpleXMLRPCServer
from pyWebService.server.service.WebServiceRequestHandler import WebServiceRquestHandlerclass WebServiceServer(Thread):def __init__(self, ip, port):super(WebServiceServer, self).__init__()self.running = Trueself.server = SimpleXMLRPCServer((ip, port),requestHandler=WebServiceRquestHandler)self.server.register_introspection_functions()def register_function(self, function):self.server.register_function(function)def run(self):self.server.serve_forever()def stop_server(self):self.server.shutdown()self.server.server_close()print("starting server")
webService = WebServiceServer("localhost", 8010)
webService.start()
print("stopping server")
webService.stop_server()
webService.join()
print("server stopped")