#!/usr/bin/python3 import unittest from unittest.mock import patch import queue import multiprocessing import logging import logging.handlers import time class DeterministicQueueListener(logging.handlers.QueueListener): """ This fixes dropped log messages of the base QueueListener. It does away with the double dequeue loop and _stop flag. """ def _monitor(self): q = self.queue has_task_done = hasattr(q, 'task_done') while True: try: record = self.dequeue(True) if record is self._sentinel: break self.handle(record) if has_task_done: q.task_done() except queue.Empty: break class TestMultilogging(unittest.TestCase): """ Tests demonstrate that the standard lib QueueListener drops some log messages while a patched QueueListener with simplified _monitor thread handles all log messages. """ repeat = 20 @staticmethod def setup_and_log_to_queue_logger(log_queue, listener, id): """ Creates a logger with a QueueHandler that logs to a queue read by a QueueListener. Starts the listener, logs five messages, and stops the queue listener """ logger = logging.getLogger("test_logger_with_id_".format(id)) logger.setLevel(logging.DEBUG) queue_handler = logging.handlers.QueueHandler(log_queue) logger.addHandler(queue_handler) queue_listener = listener(log_queue) queue_listener.start() logger.info("one") logger.info("two") logger.info("three") logger.info("four") logger.info("five") queue_listener.stop() @patch.object(logging.handlers.QueueListener, 'handle') def test_handle_called_for_standard_lib_queue_listener_with_queue_queue(self, mock_handle): for i in range(self.repeat): log_queue = queue.Queue() self.setup_and_log_to_queue_logger(log_queue, logging.handlers.QueueListener, '{}_{}'.format(self.id(), i)) self.assertEqual(mock_handle.call_count, 5 * self.repeat, 'actual vs expected number of handled log messages') @patch.object(logging.handlers.QueueListener, 'handle') def test_handle_called_for_standard_lib_queue_listener_with_multiprocessing_queue(self, mock_handle): for i in range(self.repeat): log_queue = multiprocessing.Queue() self.setup_and_log_to_queue_logger(log_queue, logging.handlers.QueueListener, '{}_{}'.format(self.id(), i)) self.assertEqual(mock_handle.call_count, 5 * self.repeat, 'actual vs expected number of handled log messages') @patch.object(DeterministicQueueListener, 'handle') def test_handle_called_for_deterministic_queue_listener_with_multiprocessing_queue(self, mock_handle): for i in range(self.repeat): log_queue = multiprocessing.Queue() self.setup_and_log_to_queue_logger(log_queue, DeterministicQueueListener, '{}_{}'.format(self.id(), i)) self.assertEqual(mock_handle.call_count, 5 * self.repeat, 'actual vs expected number of handled log messages') @staticmethod def get_all_from_queue(log_queue): list = [] try: while True: yield log_queue.get_nowait() except queue.Empty: return list def test_no_messages_in_queue_after_stop(self): """ Five messages are logged then the QueueListener is stopped. This test then waits one second then gets everything off the queue. Failure of this test indicates that messages were not registered on the queue until _after_ the QueueListener stopped """ for i in range(self.repeat): log_queue = multiprocessing.Queue() self.setup_and_log_to_queue_logger(log_queue, logging.handlers.QueueListener, '{}_{}'.format(self.id(), i)) #time.sleep(1) items_from_queue = list(self.get_all_from_queue(log_queue)) self.assertTrue(items_from_queue == [] or items_from_queue == [logging.handlers.QueueListener._sentinel], "should have been no messages in queue. Found: {}".format( [m.msg if isinstance(m, logging.LogRecord) else str(m) for m in items_from_queue])) if __name__ == '__main__': unittest.main()