#!/usr/bin/env python3
####################################################################################################
#
# Start this script, which will be the producer:
#   python3 multiprocessing_socketserver.py
#
# Start a consumer:
#   nc localhost 1234
#
# Watch the logs, which should have all logs across all child processes.
#   tail -F multiprocessing_socketserver.py.log
#
# Watch the processes.
#   watch -n 1 'ps faux 2>/dev/null | grep multiprocessing_socketserver.py'
#
# Watch network connections.
#   watch -n 1 'netstat -tunap 2>/dev/null | grep 1234'
#
####################################################################################################
from logging import Formatter
from logging import INFO
from logging import getLogger
from logging.handlers import QueueHandler
from logging.handlers import QueueListener
from logging.handlers import TimedRotatingFileHandler
from multiprocessing import Event
from multiprocessing import Process
from multiprocessing import Queue
from os import getcwd
from os import getpid
from os import makedirs
from os.path import abspath
from os.path import basename
from os.path import exists
from os.path import join
from os.path import realpath
from socketserver import BaseRequestHandler
from socketserver import ForkingTCPServer
from time import sleep

# Toggle for the demonstrated fix; see the USE_FIX branch below.
USE_FIX = False
SERVER_ADDRESS = ('localhost', 1234)
# Populated by setup_logging(); child processes inherit it via fork().
LOG = None


def wrap_process(stop, func, *func_args, **func_kwargs):
    """
    Simply wraps a function intended to be ran in a new process so that all exceptions are logged.

    :param Event stop: An event to signal everything should stop when a KeyboardInterrupt happens.
    :param callable func: The function to call.
    :param list func_args: Arguments to the function.
    :param dict func_kwargs: Keyword arguments to the function.
    """
    # Fall back to a named logger when setup_logging() has not populated the global yet;
    # previously LOG was dereferenced unconditionally and a None LOG raised AttributeError
    # from the "finally" clause, masking the real outcome of "func".
    log = LOG if LOG is not None else getLogger(basename(__file__))
    try:
        func(*func_args, **func_kwargs)
    except KeyboardInterrupt:
        log.info('Shutting down.')
        if stop:
            stop.set()
    except BaseException as e:
        # Deliberately broad: this is the top-level boundary of a child process, so
        # everything is logged rather than lost when the process dies.
        log.exception(e)
    finally:
        log.warning('Process terminating.')


def master_logging_process(stop, logging_queue):
    """
    The function that runs in a separate process that performs the real log handling via a
    QueueListener.

    :param Event stop: An event to signal everything should stop when a KeyboardInterrupt happens.
    :param Queue logging_queue: The queue all other processes send their log records through.
    """
    print('Master logging process started with PID: {}'.format(getpid()))

    log_dir = realpath(abspath(join(getcwd())))
    if not exists(log_dir):
        makedirs(log_dir)

    formatter = Formatter('%(asctime)s'
                          ' %(levelname)-8s'
                          ' %(process)d %(threadName)-10s'
                          ' %(name)s(%(lineno)d)'
                          ': %(message)s')
    log_file = '{}/{}.log'.format(log_dir, basename(__file__))
    real_handler = TimedRotatingFileHandler(log_file, backupCount=24)
    real_handler.setFormatter(formatter)
    listener = QueueListener(logging_queue, real_handler)
    try:
        listener.start()
        # Since the QueueListener runs in a separate daemon thread, keep this process alive until
        # told to terminate via the "stop" event.
        while not stop.is_set():
            sleep(1)
    finally:
        # We must stop the listener first because it enqueues a sentinel object for stopping its
        # background thread. It also performs a "join()" on the thread, so the listener's "stop()"
        # call will block until that is done. After which we can stop the queue if it has a
        # callable "stop" attribute.
        listener.stop()
        if hasattr(logging_queue, 'stop'):
            logging_queue.stop()
        real_handler.close()


def setup_logging(stop):
    """
    Create the dedicated logging process and point the root logger at a QueueHandler that feeds
    it, so every process forked afterwards logs through the same queue.

    :param Event stop: An event to signal everything should stop when a KeyboardInterrupt happens.
    """
    global LOG
    LOG = getLogger(basename(__file__))
    LOG.setLevel(INFO)

    # Create a queue so child processes can send events to the master process.
    logging_queue = Queue()
    logging_queue.cancel_join_thread()

    # Create the master process that will accept all log events and send them to the real logging
    # handlers.
    p = Process(name='master_logging_process',
                target=wrap_process,
                args=(stop, master_logging_process, stop, logging_queue))
    p.start()

    # Reset the root handler so all sub-loggers send to a QueueHandler.
    getLogger().handlers = [QueueHandler(logging_queue)]


class MyClientHandler(BaseRequestHandler):
    """
    Since everything in this class runs in a sub-process, any log messages are ignored because the
    QueueHandler does not have a valid worker thread when using the straight forking-based server.
    """

    def __init__(self, request, client_address, server):
        LOG.info('Starting connection from %s to %s', client_address, server.server_address)
        BaseRequestHandler.__init__(self, request, client_address, server)

    def handle(self):
        """Pump items from the shared queue to the connected client until told to stop."""
        try:
            while not self.server.stop.is_set():
                # Diagnostic for the demo: inspects the (private) feeder thread of the
                # multiprocessing queue behind the root QueueHandler to show whether it
                # survived the fork().
                print('{}, LOG.root.handlers[0].queue._thread.is_alive() = {}'.format(
                    getpid(), LOG.root.handlers[0].queue._thread.is_alive()))
                i = self.server.q.get()
                msg = 'Consumed from socket: {}'.format(i)
                LOG.info(msg)
                self.request.sendall((msg + '\n').encode('UTF-8'))
        except BrokenPipeError as e:
            # The client went away; nothing to do but report it.
            print(str(e))
        except KeyboardInterrupt:
            pass

    def finish(self):
        LOG.warning('Closing request from client: %s', self.client_address)
        self.request.close()
        BaseRequestHandler.finish(self)


class MyForkingTCPServer(ForkingTCPServer):
    """
    Since this uses fork() under the hood, anything logged under the RequestHandlerClass is
    silently dropped.
    """

    def __init__(self, server_address, RequestHandlerClass, stop, q):
        # Stash the shared stop event and work queue before the base class starts accepting.
        self.stop = stop
        self.q = q
        ForkingTCPServer.__init__(self, server_address, RequestHandlerClass)
        LOG.info('Created new server: %s', self.__class__.__name__)


if USE_FIX:
    # NOTE(review): ProcessingTCPServer is NOT part of the standard library's socketserver
    # module — this import only works against a patched socketserver providing a
    # multiprocessing (spawn)-based mix-in. Left as-is since USE_FIX is False by default.
    from socketserver import ProcessingTCPServer

    class MyProcessingTCPServer(ProcessingTCPServer):
        """
        Since this does not use fork() under the hood, anything logged under the
        RequestHandlerClass is properly send to the root logger's QueueHandler.
        """

        def __init__(self, server_address, RequestHandlerClass, stop, q):
            self.stop = stop
            self.q = q
            ProcessingTCPServer.__init__(self, server_address, RequestHandlerClass)
            LOG.info('Created new server: %s', self.__class__.__name__)


def start_server(stop, q):
    """
    Create the TCP server (forking or "fixed" flavor per USE_FIX) and serve until the stop
    event is set.

    :param Event stop: An event to signal everything should stop.
    :param Queue q: The work queue the request handlers consume from.
    """
    if USE_FIX:
        server = MyProcessingTCPServer(SERVER_ADDRESS, MyClientHandler, stop, q)
    else:
        server = MyForkingTCPServer(SERVER_ADDRESS, MyClientHandler, stop, q)
    try:
        while not stop.is_set():
            server.serve_forever(1)
    finally:
        # shutdown() must happen before server_close(): stop the serve loop first, then
        # release the listening socket. (The original closed the socket first, and skipped
        # cleanup entirely when the loop exited via an exception.)
        server.shutdown()
        server.server_close()


def producer_client(stop, q):
    """
    Put a monotonically increasing integer on the queue every two seconds until stopped.

    :param Event stop: An event to signal everything should stop.
    :param Queue q: The work queue the request handlers consume from.
    """
    i = 0
    while not stop.is_set():
        i += 1
        q.put(i)
        LOG.info('Produced: %d', i)
        sleep(2)


def main():
    """Wire up logging, then start the consumer (server) and producer processes."""
    stop = Event()
    setup_logging(stop)

    q = Queue()
    q.cancel_join_thread()

    consumer = Process(name='consumer',
                       target=wrap_process,
                       args=(stop, start_server, stop, q))
    consumer.start()

    producer = Process(name='producer',
                       target=wrap_process,
                       args=(stop, producer_client, stop, q))
    producer.start()

    # Wait on the children so the parent does not just fall off the end of main(); on Ctrl+C,
    # signal everything to stop instead of dumping a KeyboardInterrupt traceback. (The original
    # never joined its children.)
    try:
        producer.join()
        consumer.join()
    except KeyboardInterrupt:
        stop.set()


if __name__ == '__main__':
    main()