diff -r e4fe76719fc0 Lib/test/test_logging.py --- a/Lib/test/test_logging.py Sat Jan 05 23:20:01 2008 +0100 +++ b/Lib/test/test_logging.py Sun Jan 06 05:46:59 2008 +0100 @@ -25,183 +25,280 @@ Copyright (C) 2001-2002 Vinay Sajip. All """ import select -import os, sys, string, struct, types, cPickle, cStringIO +import unittest +import os, sys, string, struct, types, cPickle, cStringIO, re import socket, tempfile, threading, time +import weakref, gc, copy +import textwrap import logging, logging.handlers, logging.config -from test.test_support import run_with_locale - -BANNER = "-- %-10s %-6s ---------------------------------------------------\n" - -FINISH_UP = "Finish up, it's closing time. Messages should bear numbers 0 through 24." - -#---------------------------------------------------------------------------- -# Log receiver -#---------------------------------------------------------------------------- - -TIMEOUT = 10 - from SocketServer import ThreadingTCPServer, StreamRequestHandler -class LogRecordStreamHandler(StreamRequestHandler): +from test.test_support import run_with_locale, run_unittest + +tests = [] # list to accumulate all tests + + +class BaseTest(unittest.TestCase): """ - Handler for a streaming logging request. It basically logs the record - using whatever logging policy is configured locally. + Base class for logging tests. """ - def handle(self): + log_format = "%(name)s -> %(levelname)s: %(message)s" + expected_log_pat = r"^([\w.]+) -> ([\w]+): ([\d]+)$" + msgnum = 0 + + def setUp(self): """ - Handle multiple requests - each expected to be a 4-byte length, - followed by the LogRecord in pickle format. Logs the record - according to whatever policy is configured locally. + Setup the default logging stream to an internal StringIO instance, so + that we can examine log output as we want. """ - while 1: - try: - chunk = self.connection.recv(4) - if len(chunk) < 4: - break - slen = struct.unpack(">L", chunk)[0] - chunk = self.connection.recv(slen) - while len(chunk) < slen: - chunk = chunk + self.connection.recv(slen - len(chunk)) - obj = self.unPickle(chunk) - record = logging.makeLogRecord(obj) - self.handleLogRecord(record) - except: - raise + loggerDict = logging.getLogger().manager.loggerDict + logging._acquireLock() + try: + self.saved_handlers = logging._handlers.copy() + self.saved_handler_list = logging._handlerList[:] + self.saved_loggers = loggerDict.copy() + self.saved_level_names = logging._levelNames.copy() + finally: + logging._releaseLock() - def unPickle(self, data): - return cPickle.loads(data) + self.root_logger = logging.getLogger("") + self.original_logging_level = self.root_logger.getEffectiveLevel() - def handleLogRecord(self, record): - logname = "logrecv.tcp." + record.name - #If the end-of-messages sentinel is seen, tell the server to terminate - if record.msg == FINISH_UP: - self.server.abort = 1 - record.msg = record.msg + " (via " + logname + ")" - logger = logging.getLogger(logname) - logger.handle(record) + self.stream = cStringIO.StringIO() + self.root_logger.setLevel(logging.DEBUG) + self.root_hdlr = logging.StreamHandler(self.stream) + self.root_formatter = logging.Formatter(self.log_format) + self.root_hdlr.setFormatter(self.root_formatter) + self.root_logger.addHandler(self.root_hdlr) -# The server sets socketDataProcessed when it's done. -socketDataProcessed = threading.Event() + def tearDown(self): + """ + Remove our logging stream, and restore the original logging level. + """ + self.stream.close() + self.root_logger.removeHandler(self.root_hdlr) + self.root_logger.setLevel(self.original_logging_level) + logging._acquireLock() + try: + logging._levelNames.clear() + logging._levelNames.update(self.saved_level_names) + logging._handlers.clear() + logging._handlers.update(self.saved_handlers) + logging._handlerList[:] = self.saved_handler_list + loggerDict = logging.getLogger().manager.loggerDict + loggerDict.clear() + loggerDict.update(self.saved_loggers) + finally: + logging._releaseLock() -class LogRecordSocketReceiver(ThreadingTCPServer): + def assertLogLines(self, expected_values, stream=None): + """ + Match the collected log lines against the regular expression + self.expected_log_pat, and compare the extracted group values to + the expected_values list of tuples. + """ + stream = stream or self.stream + pat = re.compile(self.expected_log_pat) + stream.reset() + actual_lines = stream.readlines() + self.assertEquals(len(actual_lines), len(expected_values)) + for actual, expected in zip(actual_lines, expected_values): + match = pat.search(actual) + if not match: + self.fail("Log line does not match expected pattern:\n" + actual) + self.assertEquals(tuple(match.groups()), expected) + s = stream.read() + if s: + self.fail("Remaining output at end of log stream:\n" + s) + + def next_message(self): + """ + Generate a message consisting solely of an auto-incrementing integer. + """ + self.msgnum += 1 + return "%d" % self.msgnum + + +class BuiltinLevelsTest(BaseTest): """ - A simple-minded TCP socket-based logging receiver suitable for test - purposes. + Test builtin levels and their inheritance. """ - allow_reuse_address = 1 + def test_flat(self): + """ + Logging levels in a flat logger namespace. + """ + m = self.next_message - def __init__(self, host='localhost', - port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, - handler=LogRecordStreamHandler): - ThreadingTCPServer.__init__(self, (host, port), handler) - self.abort = 0 - self.timeout = 1 + ERR = logging.getLogger("ERR") + ERR.setLevel(logging.ERROR) + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + DEB = logging.getLogger("DEB") + DEB.setLevel(logging.DEBUG) - def serve_until_stopped(self): - while not self.abort: - rd, wr, ex = select.select([self.socket.fileno()], [], [], - self.timeout) - if rd: - self.handle_request() - #notify the main thread that we're about to exit - socketDataProcessed.set() - # close the listen socket - self.server_close() + # These should log + ERR.log(logging.FATAL, m()) + ERR.error(m()) - def process_request(self, request, client_address): - #import threading - t = threading.Thread(target = self.finish_request, - args = (request, client_address)) - t.start() + INF.log(logging.FATAL, m()) + INF.error(m()) + INF.warn(m()) + INF.info(m()) -def runTCP(tcpserver): - tcpserver.serve_until_stopped() + DEB.log(logging.FATAL, m()) + DEB.error(m()) + DEB.warn (m()) + DEB.info (m()) + DEB.debug(m()) -#---------------------------------------------------------------------------- -# Test 0 -#---------------------------------------------------------------------------- + # These should not log + ERR.warn(m()) + ERR.info(m()) + ERR.debug(m()) -msgcount = 0 + INF.debug(m()) -def nextmessage(): - global msgcount - rv = "Message %d" % msgcount - msgcount = msgcount + 1 - return rv + self.assertLogLines([ + ('ERR', 'CRITICAL', '1'), + ('ERR', 'ERROR', '2'), + ('INF', 'CRITICAL', '3'), + ('INF', 'ERROR', '4'), + ('INF', 'WARNING', '5'), + ('INF', 'INFO', '6'), + ('DEB', 'CRITICAL', '7'), + ('DEB', 'ERROR', '8'), + ('DEB', 'WARNING', '9'), + ('DEB', 'INFO', '10'), + ('DEB', 'DEBUG', '11'), + ]) -def test0(): - ERR = logging.getLogger("ERR") - ERR.setLevel(logging.ERROR) - INF = logging.getLogger("INF") - INF.setLevel(logging.INFO) - INF_ERR = logging.getLogger("INF.ERR") - INF_ERR.setLevel(logging.ERROR) - DEB = logging.getLogger("DEB") - DEB.setLevel(logging.DEBUG) + def test_nested_explicit(self): + """ + Logging levels in a nested namespace, all explicitly set. + """ + m = self.next_message - INF_UNDEF = logging.getLogger("INF.UNDEF") - INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") - UNDEF = logging.getLogger("UNDEF") + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) - GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") - CHILD = logging.getLogger("INF.BADPARENT") + # These should log + INF_ERR.log(logging.FATAL, m()) + INF_ERR.error(m()) - #These should log - ERR.log(logging.FATAL, nextmessage()) - ERR.error(nextmessage()) + # These should not log + INF_ERR.warn(m()) + INF_ERR.info(m()) + INF_ERR.debug(m()) - INF.log(logging.FATAL, nextmessage()) - INF.error(nextmessage()) - INF.warn(nextmessage()) - INF.info(nextmessage()) + self.assertLogLines([ + ('INF.ERR', 'CRITICAL', '1'), + ('INF.ERR', 'ERROR', '2'), + ]) - INF_UNDEF.log(logging.FATAL, nextmessage()) - INF_UNDEF.error(nextmessage()) - INF_UNDEF.warn (nextmessage()) - INF_UNDEF.info (nextmessage()) + def test_nested_inherited(self): + """ + Logging levels in a nested namespace, inherited from parent loggers. + """ + m = self.next_message - INF_ERR.log(logging.FATAL, nextmessage()) - INF_ERR.error(nextmessage()) + INF = logging.getLogger("INF") + INF.setLevel(logging.INFO) + INF_ERR = logging.getLogger("INF.ERR") + INF_ERR.setLevel(logging.ERROR) + INF_UNDEF = logging.getLogger("INF.UNDEF") + INF_ERR_UNDEF = logging.getLogger("INF.ERR.UNDEF") + UNDEF = logging.getLogger("UNDEF") - INF_ERR_UNDEF.log(logging.FATAL, nextmessage()) - INF_ERR_UNDEF.error(nextmessage()) + # These should log + INF_UNDEF.log(logging.FATAL, m()) + INF_UNDEF.error(m()) + INF_UNDEF.warn (m()) + INF_UNDEF.info (m()) + INF_ERR_UNDEF.log(logging.FATAL, m()) + INF_ERR_UNDEF.error(m()) - DEB.log(logging.FATAL, nextmessage()) - DEB.error(nextmessage()) - DEB.warn (nextmessage()) - DEB.info (nextmessage()) - DEB.debug(nextmessage()) + # These should not log + INF_UNDEF.debug(m()) + INF_ERR_UNDEF.warn(m()) + INF_ERR_UNDEF.info(m()) + INF_ERR_UNDEF.debug(m()) - UNDEF.log(logging.FATAL, nextmessage()) - UNDEF.error(nextmessage()) - UNDEF.warn (nextmessage()) - UNDEF.info (nextmessage()) + self.assertLogLines([ + ('INF.UNDEF', 'CRITICAL', '1'), + ('INF.UNDEF', 'ERROR', '2'), + ('INF.UNDEF', 'WARNING', '3'), + ('INF.UNDEF', 'INFO', '4'), + ('INF.ERR.UNDEF', 'CRITICAL', '5'), + ('INF.ERR.UNDEF', 'ERROR', '6'), + ]) - GRANDCHILD.log(logging.FATAL, nextmessage()) - CHILD.log(logging.FATAL, nextmessage()) + def test_nested_with_virtual_parent(self): + """ + Logging levels when some parent does not exist yet. + """ + m = self.next_message - #These should not log - ERR.warn(nextmessage()) - ERR.info(nextmessage()) - ERR.debug(nextmessage()) + INF = logging.getLogger("INF") + GRANDCHILD = logging.getLogger("INF.BADPARENT.UNDEF") + CHILD = logging.getLogger("INF.BADPARENT") + INF.setLevel(logging.INFO) - INF.debug(nextmessage()) - INF_UNDEF.debug(nextmessage()) + # These should log + GRANDCHILD.log(logging.FATAL, m()) + GRANDCHILD.info(m()) + CHILD.log(logging.FATAL, m()) + CHILD.info(m()) - INF_ERR.warn(nextmessage()) - INF_ERR.info(nextmessage()) - INF_ERR.debug(nextmessage()) - INF_ERR_UNDEF.warn(nextmessage()) - INF_ERR_UNDEF.info(nextmessage()) - INF_ERR_UNDEF.debug(nextmessage()) + # These should not log + GRANDCHILD.debug(m()) + CHILD.debug(m()) - INF.info(FINISH_UP) + self.assertLogLines([ + ('INF.BADPARENT.UNDEF', 'CRITICAL', '1'), + ('INF.BADPARENT.UNDEF', 'INFO', '2'), + ('INF.BADPARENT', 'CRITICAL', '3'), + ('INF.BADPARENT', 'INFO', '4'), + ]) -#---------------------------------------------------------------------------- -# Test 1 -#---------------------------------------------------------------------------- +tests.append(BuiltinLevelsTest) + + +class BasicFilterTest(BaseTest): + """ + Test the bundled Filter class. + """ + + def test_filter(self): + """ + Only messages satisfying the specified criteria pass through the + filter. + """ + filt = logging.Filter("spam.eggs") + hand = self.root_logger.handlers[0] + try: + hand.addFilter(filt) + spam = logging.getLogger("spam") + spam_eggs = logging.getLogger("spam.eggs") + spam_eggs_fish = logging.getLogger("spam.eggs.fish") + spam_bakedbeans = logging.getLogger("spam.bakedbeans") + spam.info(self.next_message()) + spam_eggs.info(self.next_message()) + spam_eggs_fish.info(self.next_message()) + spam_bakedbeans.info(self.next_message()) + self.assertLogLines([ + ('spam.eggs', 'INFO', '2'), + ('spam.eggs.fish', 'INFO', '3'), + ]) + finally: + hand.removeFilter(filt) + +tests.append(BasicFilterTest) + + # # First, we define our levels. There can be as many as you want - the only @@ -211,16 +308,16 @@ def test0(): # mapping dictionary to convert between your application levels and the # logging system. # -SILENT = 10 -TACITURN = 9 -TERSE = 8 -EFFUSIVE = 7 -SOCIABLE = 6 -VERBOSE = 5 -TALKATIVE = 4 -GARRULOUS = 3 -CHATTERBOX = 2 -BORING = 1 +SILENT = 120 +TACITURN = 119 +TERSE = 118 +EFFUSIVE = 117 +SOCIABLE = 116 +VERBOSE = 115 +TALKATIVE = 114 +GARRULOUS = 113 +CHATTERBOX = 112 +BORING = 111 LEVEL_RANGE = range(BORING, SILENT + 1) @@ -241,445 +338,563 @@ my_logging_levels = { BORING : 'Boring', } -# -# Now, to demonstrate filtering: suppose for some perverse reason we only -# want to print out all except GARRULOUS messages. Let's create a filter for -# this purpose... -# -class SpecificLevelFilter(logging.Filter): - def __init__(self, lvl): - self.level = lvl +class GarrulousFilter(logging.Filter): + """ + A filter which blocks garrulous messages. + """ + def filter(self, record): + return record.levelno != GARRULOUS - def filter(self, record): - return self.level != record.levelno - -class GarrulousFilter(SpecificLevelFilter): - def __init__(self): - SpecificLevelFilter.__init__(self, GARRULOUS) - -# -# Now, let's demonstrate filtering at the logger. This time, use a filter -# which excludes SOCIABLE and TACITURN messages. Note that GARRULOUS events -# are still excluded. -# class VerySpecificFilter(logging.Filter): + """ + A filter which blocks sociable and taciturn messages. + """ def filter(self, record): return record.levelno not in [SOCIABLE, TACITURN] -def message(s): - sys.stdout.write("%s\n" % s) -SHOULD1 = "This should only be seen at the '%s' logging level (or lower)" +class CustomLevelsAndFiltersTest(BaseTest): + """ + Test various filtering possibilities with custom logging levels. + """ + expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" -def test1(): -# -# Now, tell the logging system to associate names with our levels. -# - for lvl in my_logging_levels.keys(): - logging.addLevelName(lvl, my_logging_levels[lvl]) + def setUp(self): + BaseTest.setUp(self) + for k, v in my_logging_levels.items(): + logging.addLevelName(k, v) -# -# Now, define a test function which logs an event at each of our levels. -# + def log_at_all_levels(self, logger): + for lvl in LEVEL_RANGE: + logger.log(lvl, self.next_message()) - def doLog(log): - for lvl in LEVEL_RANGE: - log.log(lvl, SHOULD1, logging.getLevelName(lvl)) + def test_logger_filter(self): + """ + Filter at logger level. + """ + self.root_logger.setLevel(VERBOSE) + self.log_at_all_levels(self.root_logger) + self.assertLogLines([ + ('Verbose', '5'), + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ]) - log = logging.getLogger("") - hdlr = log.handlers[0] -# -# Set the logging level to each different value and call the utility -# function to log events. -# In the output, you should see that each time round the loop, the number of -# logging events which are actually output decreases. -# - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - # - # Now, we demonstrate level filtering at the handler level. Tell the - # handler defined above to filter at level 'SOCIABLE', and repeat the - # above loop. Compare the output from the two runs. - # - hdlr.setLevel(SOCIABLE) - message("-- Filtering at handler level to SOCIABLE --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) + def test_handler_filter(self): + """ + Filter at handler level. + """ + self.root_logger.handlers[0].setLevel(SOCIABLE) + try: + self.log_at_all_levels(self.root_logger) + self.assertLogLines([ + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ]) + finally: + self.root_logger.handlers[0].setLevel(0) - hdlr.setLevel(0) #turn off level filtering at the handler + def test_specific_filters(self): + """ + Set a specific filter object on the handler, and then add another + filter object on the logger itself. + """ + hdlr = self.root_logger.handlers[0] + spec = None + garr = GarrulousFilter() + hdlr.addFilter(garr) + try: + self.log_at_all_levels(self.root_logger) + first_lines = [ + # Notice how 'Garrulous' is missing + ('Boring', '1'), + ('Chatterbox', '2'), + ('Talkative', '4'), + ('Verbose', '5'), + ('Sociable', '6'), + ('Effusive', '7'), + ('Terse', '8'), + ('Taciturn', '9'), + ('Silent', '10'), + ] + self.assertLogLines(first_lines) - garr = GarrulousFilter() - hdlr.addFilter(garr) - message("-- Filtering using GARRULOUS filter --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) - spec = VerySpecificFilter() - log.addFilter(spec) - message("-- Filtering using specific filter for SOCIABLE, TACITURN --") - for lvl in LEVEL_RANGE: - message("-- setting logging level to '%s' -----" % - logging.getLevelName(lvl)) - log.setLevel(lvl) - doLog(log) + spec = VerySpecificFilter() + self.root_logger.addFilter(spec) + self.log_at_all_levels(self.root_logger) + self.assertLogLines(first_lines + [ + # Not only 'Garrulous' is still missing, but also 'Sociable' + # and 'Taciturn' + ('Boring', '11'), + ('Chatterbox', '12'), + ('Talkative', '14'), + ('Verbose', '15'), + ('Effusive', '17'), + ('Terse', '18'), + ('Silent', '20'), + ]) + finally: + if spec: + self.root_logger.removeFilter(spec) + hdlr.removeFilter(garr) - log.removeFilter(spec) - hdlr.removeFilter(garr) - #Undo the one level which clashes...for regression tests - logging.addLevelName(logging.DEBUG, "DEBUG") +tests.append(CustomLevelsAndFiltersTest) -#---------------------------------------------------------------------------- -# Test 2 -#---------------------------------------------------------------------------- -MSG = "-- logging %d at INFO, messages should be seen every 10 events --" -def test2(): - logger = logging.getLogger("") - sh = logger.handlers[0] - sh.close() - logger.removeHandler(sh) - mh = logging.handlers.MemoryHandler(10,logging.WARNING, sh) - logger.setLevel(logging.DEBUG) - logger.addHandler(mh) - message("-- logging at DEBUG, nothing should be seen yet --") - logger.debug("Debug message") - message("-- logging at INFO, nothing should be seen yet --") - logger.info("Info message") - message("-- logging at WARNING, 3 messages should be seen --") - logger.warn("Warn message") - for i in xrange(102): - message(MSG % i) - logger.info("Info index = %d", i) - mh.close() - logger.removeHandler(mh) - logger.addHandler(sh) +class MemoryHandlerTest(BaseTest): + """ + Tests for the MemoryHandler. + """ + expected_log_pat = r"^[\w.]+ -> ([\w]+): ([\d]+)$" -#---------------------------------------------------------------------------- -# Test 3 -#---------------------------------------------------------------------------- + def setUp(self): + BaseTest.setUp(self) + self.mem_hdlr = logging.handlers.MemoryHandler( + 10, logging.WARNING, self.root_hdlr) + self.mem_logger = logging.getLogger('mem') + self.mem_logger.propagate = 0 + self.mem_logger.addHandler(self.mem_hdlr) -FILTER = "a.b" + def tearDown(self): + self.mem_hdlr.close() -def doLog3(): - logging.getLogger("a").info("Info 1") - logging.getLogger("a.b").info("Info 2") - logging.getLogger("a.c").info("Info 3") - logging.getLogger("a.b.c").info("Info 4") - logging.getLogger("a.b.c.d").info("Info 5") - logging.getLogger("a.bb.c").info("Info 6") - logging.getLogger("b").info("Info 7") - logging.getLogger("b.a").info("Info 8") - logging.getLogger("c.a.b").info("Info 9") - logging.getLogger("a.bb").info("Info 10") + def test_flush(self): + """ + The memory handler flushes to its target handler based on specific + criteria (message count and message level). + """ + self.mem_logger.debug(self.next_message()) + self.assertLogLines([]) + self.mem_logger.info(self.next_message()) + self.assertLogLines([]) + # This will flush because the level is >= logging.WARNING + self.mem_logger.warn(self.next_message()) + lines = [ + ('DEBUG', '1'), + ('INFO', '2'), + ('WARNING', '3'), + ] + self.assertLogLines(lines) + for n in (4, 14): + for i in range(9): + self.mem_logger.debug(self.next_message()) + self.assertLogLines(lines) + # This will flush because it's the 10th message since the last flush. + self.mem_logger.debug(self.next_message()) + lines = lines + [('DEBUG', str(i)) for i in range(n, n + 10)] + self.assertLogLines(lines) -def test3(): - root = logging.getLogger() - root.setLevel(logging.DEBUG) - hand = root.handlers[0] - message("Unfiltered...") - doLog3() - message("Filtered with '%s'..." % FILTER) - filt = logging.Filter(FILTER) - hand.addFilter(filt) - doLog3() - hand.removeFilter(filt) + self.mem_logger.debug(self.next_message()) + self.assertLogLines(lines) -#---------------------------------------------------------------------------- -# Test 4 -#---------------------------------------------------------------------------- +tests.append(MemoryHandlerTest) -# config0 is a standard configuration. -config0 = """ -[loggers] -keys=root -[handlers] -keys=hand1 +class ExceptionFormatter(logging.Formatter): + """ + A special exception formatter. + """ + def formatException(self, ei): + return "Got a [%s]" % ei[0].__name__ -[formatters] -keys=form1 -[logger_root] -level=NOTSET -handlers=hand1 +class ConfigFileTest(BaseTest): + """ + Reading logging config from a .ini-style config file. + """ -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) + expected_log_pat = r"^([\w]+) \+\+ ([\w]+)$" -[formatter_form1] -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" + # config0 is a standard configuration. + config0 = """ + [loggers] + keys=root -# config1 adds a little to the standard configuration. -config1 = """ -[loggers] -keys=root,parser + [handlers] + keys=hand1 -[handlers] -keys=hand1 + [formatters] + keys=form1 -[formatters] -keys=form1 + [logger_root] + level=WARNING + handlers=hand1 -[logger_root] -level=NOTSET -handlers=hand1 + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) -[logger_parser] -level=DEBUG -handlers=hand1 -propagate=1 -qualname=compiler.parser + [formatter_form1] + format=%(levelname)s ++ %(message)s + datefmt= + """ -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) + # config1 adds a little to the standard configuration. + config1 = """ + [loggers] + keys=root,parser -[formatter_form1] -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" + [handlers] + keys=hand1 -# config2 has a subtle configuration error that should be reported -config2 = string.replace(config1, "sys.stdout", "sys.stbout") + [formatters] + keys=form1 -# config3 has a less subtle configuration error -config3 = string.replace( - config1, "formatter=form1", "formatter=misspelled_name") + [logger_root] + level=WARNING + handlers= -def test4(): - for i in range(4): - conf = globals()['config%d' % i] - sys.stdout.write('config%d: ' % i) - loggerDict = logging.getLogger().manager.loggerDict - logging._acquireLock() - try: - saved_handlers = logging._handlers.copy() - saved_handler_list = logging._handlerList[:] - saved_loggers = loggerDict.copy() - finally: - logging._releaseLock() + [logger_parser] + level=DEBUG + handlers=hand1 + propagate=1 + qualname=compiler.parser + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [formatter_form1] + format=%(levelname)s ++ %(message)s + datefmt= + """ + + # config2 has a subtle configuration error that should be reported + config2 = config1.replace("sys.stdout", "sys.stbout") + + # config3 has a less subtle configuration error + config3 = config1.replace("formatter=form1", "formatter=misspelled_name") + + # config4 specifies a custom formatter class to be loaded + config4 = """ + [loggers] + keys=root + + [handlers] + keys=hand1 + + [formatters] + keys=form1 + + [logger_root] + level=NOTSET + handlers=hand1 + + [handler_hand1] + class=StreamHandler + level=NOTSET + formatter=form1 + args=(sys.stdout,) + + [formatter_form1] + class=""" + __name__ + """.ExceptionFormatter + format=%(levelname)s:%(name)s:%(message)s + datefmt= + """ + + def apply_config(self, conf): try: fn = tempfile.mktemp(".ini") f = open(fn, "w") - f.write(conf) + f.write(textwrap.dedent(conf)) f.close() + logging.config.fileConfig(fn) + # "call again to make sure cleanup is correct" (???) + #logging.config.fileConfig(fn) + finally: + os.remove(fn) + + def test_config0_ok(self): + """ + A simple config file which overrides the default settings. + """ + old_stdout = sys.stdout + try: + sys.stdout = cStringIO.StringIO() + self.apply_config(self.config0) + logger = logging.getLogger() + # Won't output anything + logger.info(self.next_message()) + # Outputs a message + logger.error(self.next_message()) + self.assertLogLines([ + ('ERROR', '2'), + ], stream=sys.stdout) + # Original logger output is empty + self.assertLogLines([]) + finally: + sys.stdout = old_stdout + + def test_config1_ok(self): + """ + A config file defining a sub-parser as well. + """ + old_stdout = sys.stdout + try: + sys.stdout = cStringIO.StringIO() + self.apply_config(self.config1) + logger = logging.getLogger("compiler.parser") + # Both will output a message + logger.info(self.next_message()) + logger.error(self.next_message()) + self.assertLogLines([ + ('INFO', '1'), + ('ERROR', '2'), + ], stream=sys.stdout) + # Original logger output is empty + self.assertLogLines([]) + finally: + sys.stdout = old_stdout + + def test_config2_failure(self): + """ + A simple config file which overrides the default settings. + """ + self.assertRaises(StandardError, self.apply_config, self.config2) + + def test_config3_failure(self): + """ + A simple config file which overrides the default settings. + """ + self.assertRaises(StandardError, self.apply_config, self.config3) + + def test_config4_ok(self): + """ + A config file specifying a custom formatter class. + """ + old_stdout = sys.stdout + try: + sys.stdout = cStringIO.StringIO() + self.apply_config(self.config4) + logger = logging.getLogger() try: - logging.config.fileConfig(fn) - #call again to make sure cleanup is correct - logging.config.fileConfig(fn) - except: - t = sys.exc_info()[0] - message(str(t)) + raise RuntimeError() + except RuntimeError: + logging.exception("just testing") + sys.stdout.seek(0) + self.assertEquals(sys.stdout.getvalue(), + "ERROR:root:just testing\nGot a [RuntimeError]\n") + # Original logger output is empty + self.assertLogLines([]) + finally: + sys.stdout = old_stdout + + +tests.append(ConfigFileTest) + + + +TCP_LOG_END = "!!!END!!!" + +class LogRecordStreamHandler(StreamRequestHandler): + """ + Handler for a streaming logging request. It saves the log message in the + TCP server's 'log_output' attribute. + """ + + def handle(self): + """ + Handle multiple requests - each expected to be a 4-byte length, + followed by the LogRecord in pickle format. Logs the record + according to whatever policy is configured locally. + """ + while True: + chunk = self.connection.recv(4) + if len(chunk) < 4: + break + slen = struct.unpack(">L", chunk)[0] + chunk = self.connection.recv(slen) + while len(chunk) < slen: + chunk = chunk + self.connection.recv(slen - len(chunk)) + obj = self.unPickle(chunk) + record = logging.makeLogRecord(obj) + self.handleLogRecord(record) + + def unPickle(self, data): + return cPickle.loads(data) + + def handleLogRecord(self, record): + # If the end-of-messages sentinel is seen, tell the server to terminate + if TCP_LOG_END in record.msg: + self.server.abort = 1 + return + self.server.log_output += record.msg + "\n" + + +class LogRecordSocketReceiver(ThreadingTCPServer): + """ + A simple-minded TCP socket-based logging receiver suitable for test + purposes. + """ + allow_reuse_address = 1 + log_output = "" + + def __init__(self, host='localhost', + port=logging.handlers.DEFAULT_TCP_LOGGING_PORT, + handler=LogRecordStreamHandler): + ThreadingTCPServer.__init__(self, (host, port), handler) + self.abort = False + self.timeout = 0.1 + self.finished = threading.Event() + + def serve_until_stopped(self): + while not self.abort: + rd, wr, ex = select.select([self.socket.fileno()], [], [], + self.timeout) + if rd: + self.handle_request() + # Notify the main thread that we're about to exit + self.finished.set() + # close the listen socket + self.server_close() + + +class SocketHandlerTest(BaseTest): + """ + Test for SocketHandler objects. + """ + + def setUp(self): + """ + Set up a TCP server to receive log messages, and a SocketHandler + pointing to that server's address and port. + """ + BaseTest.setUp(self) + + # Find an unused port number + self.port = logging.handlers.DEFAULT_TCP_LOGGING_PORT + while self.port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100: + try: + self.tcpserver = LogRecordSocketReceiver(port=self.port) + except socket.error: + self.port += 1 else: - message('ok.') - os.remove(fn) + break + else: + raise RuntimeError("Could not find unused port") + + self.threads = [threading.Thread(target=self.tcpserver.serve_until_stopped)] + for thread in self.threads: + thread.start() + + self.sock_hdlr = logging.handlers.SocketHandler('localhost', self.port) + self.sock_hdlr.setFormatter(self.root_formatter) + self.root_logger.removeHandler(self.root_logger.handlers[0]) + self.root_logger.addHandler(self.sock_hdlr) + + def tearDown(self): + """ + Shutdown the TCP server. + """ + try: + self.tcpserver.abort = True + del self.tcpserver + self.root_logger.removeHandler(self.sock_hdlr) + self.sock_hdlr.close() + for thread in self.threads: + thread.join(2.0) finally: - logging._acquireLock() - try: - logging._handlers.clear() - logging._handlers.update(saved_handlers) - logging._handlerList[:] = saved_handler_list - loggerDict = logging.getLogger().manager.loggerDict - loggerDict.clear() - loggerDict.update(saved_loggers) - finally: - logging._releaseLock() + BaseTest.tearDown(self) -#---------------------------------------------------------------------------- -# Test 5 -#---------------------------------------------------------------------------- + def get_output(self): + """ + Get the log output as received by the TCP server. + """ + # Signal the TCP receiver and wait for it to terminate + self.root_logger.critical(TCP_LOG_END) + self.tcpserver.finished.wait(2.0) + return self.tcpserver.log_output -test5_config = """ -[loggers] -keys=root + def test_output(self): + """ + The log message sent to the SocketHandler is properly received. + """ + logger = logging.getLogger("tcp") + logger.error("spam") + logger.debug("eggs") + self.assertEquals(self.get_output(), "spam\neggs\n") -[handlers] -keys=hand1 +tests.append(SocketHandlerTest) -[formatters] -keys=form1 -[logger_root] -level=NOTSET -handlers=hand1 +class MemoryTest(BaseTest): + """ + Test memory persistence of logger objects. + """ -[handler_hand1] -class=StreamHandler -level=NOTSET -formatter=form1 -args=(sys.stdout,) + def setUp(self): + """ + Create a dict to remember potentially destroyed objects. + """ + BaseTest.setUp(self) + self._survivors = {} -[formatter_form1] -class=test.test_logging.FriendlyFormatter -format=%(levelname)s:%(name)s:%(message)s -datefmt= -""" + def _watchForSurvival(self, *args): + """ + Watch the given objects for survival, by creating weakrefs to them. + """ + for obj in args: + key = id(obj), repr(obj) + self._survivors[key] = weakref.ref(obj) -class FriendlyFormatter (logging.Formatter): - def formatException(self, ei): - return "%s... Don't panic!" % str(ei[0]) + def _assertSurvival(self): + """ + Assert that all objects watched for survival have survived. + """ + # Trigger cycle breaking + gc.collect() + dead = [] + for (id_, repr_), ref in self._survivors.items(): + if ref() is None: + dead.append(repr_) + if dead: + self.fail("%d objects should have survived " + "but have been destroyed: %s" % (len(dead), ", ".join(dead))) + def test_persistent_loggers(self): + """ + Logger objects are persistent and retain their configuration, even + if visible references are destroyed. + """ + self.root_logger.setLevel(logging.INFO) + foo = logging.getLogger("foo") + self._watchForSurvival(foo) + foo.setLevel(logging.DEBUG) + self.root_logger.debug(self.next_message()) + foo.debug(self.next_message()) + self.assertLogLines([ + ('foo', 'DEBUG', '2'), + ]) + del foo + # foo has survived + self._assertSurvival() + # foo has retained its settings + bar = logging.getLogger("foo") + bar.debug(self.next_message()) + self.assertLogLines([ + ('foo', 'DEBUG', '2'), + ('foo', 'DEBUG', '3'), + ]) -def test5(): - loggerDict = logging.getLogger().manager.loggerDict - logging._acquireLock() - try: - saved_handlers = logging._handlers.copy() - saved_handler_list = logging._handlerList[:] - saved_loggers = loggerDict.copy() - finally: - logging._releaseLock() - try: - fn = tempfile.mktemp(".ini") - f = open(fn, "w") - f.write(test5_config) - f.close() - logging.config.fileConfig(fn) - try: - raise KeyError - except KeyError: - logging.exception("just testing") - os.remove(fn) - hdlr = logging.getLogger().handlers[0] - logging.getLogger().handlers.remove(hdlr) - finally: - logging._acquireLock() - try: - logging._handlers.clear() - logging._handlers.update(saved_handlers) - logging._handlerList[:] = saved_handler_list - loggerDict = logging.getLogger().manager.loggerDict - loggerDict.clear() - loggerDict.update(saved_loggers) - finally: - logging._releaseLock() +tests.append(MemoryTest) - -#---------------------------------------------------------------------------- -# Test Harness -#---------------------------------------------------------------------------- -def banner(nm, typ): - sep = BANNER % (nm, typ) - sys.stdout.write(sep) - sys.stdout.flush() - -def test_main_inner(): - rootLogger = logging.getLogger("") - rootLogger.setLevel(logging.DEBUG) - hdlr = logging.StreamHandler(sys.stdout) - fmt = logging.Formatter(logging.BASIC_FORMAT) - hdlr.setFormatter(fmt) - rootLogger.addHandler(hdlr) - - # Find an unused port number - port = logging.handlers.DEFAULT_TCP_LOGGING_PORT - while port < logging.handlers.DEFAULT_TCP_LOGGING_PORT+100: - try: - tcpserver = LogRecordSocketReceiver(port=port) - except socket.error: - port += 1 - else: - break - else: - raise ImportError, "Could not find unused port" - - - #Set up a handler such that all events are sent via a socket to the log - #receiver (logrecv). - #The handler will only be added to the rootLogger for some of the tests - shdlr = logging.handlers.SocketHandler('localhost', port) - - #Configure the logger for logrecv so events do not propagate beyond it. - #The sockLogger output is buffered in memory until the end of the test, - #and printed at the end. - sockOut = cStringIO.StringIO() - sockLogger = logging.getLogger("logrecv") - sockLogger.setLevel(logging.DEBUG) - sockhdlr = logging.StreamHandler(sockOut) - sockhdlr.setFormatter(logging.Formatter( - "%(name)s -> %(levelname)s: %(message)s")) - sockLogger.addHandler(sockhdlr) - sockLogger.propagate = 0 - - #Set up servers - threads = [] - #sys.stdout.write("About to start TCP server...\n") - threads.append(threading.Thread(target=runTCP, args=(tcpserver,))) - - for thread in threads: - thread.start() - try: - banner("log_test0", "begin") - - rootLogger.addHandler(shdlr) - test0() - # XXX(nnorwitz): Try to fix timing related test failures. - # This sleep gives us some extra time to read messages. - # The test generally only fails on Solaris without this sleep. - time.sleep(2.0) - shdlr.close() - rootLogger.removeHandler(shdlr) - - banner("log_test0", "end") - - for t in range(1,6): - banner("log_test%d" % t, "begin") - globals()['test%d' % t]() - banner("log_test%d" % t, "end") - - finally: - #wait for TCP receiver to terminate - socketDataProcessed.wait() - # ensure the server dies - tcpserver.abort = 1 - for thread in threads: - thread.join(2.0) - banner("logrecv output", "begin") - sys.stdout.write(sockOut.getvalue()) - sockOut.close() - sockLogger.removeHandler(sockhdlr) - sockhdlr.close() - banner("logrecv output", "end") - sys.stdout.flush() - try: - hdlr.close() - except: - pass - rootLogger.removeHandler(hdlr) # Set the locale to the platform-dependent default. I have no idea # why the test does this, but in any case we save the current locale # first and restore it at the end. @run_with_locale('LC_ALL', '') def test_main(): - # Save and restore the original root logger level across the tests. - # Otherwise, e.g., if any test using cookielib runs after test_logging, - # cookielib's debug-level logger tries to log messages, leading to - # confusing: - # No handlers could be found for logger "cookielib" - # output while the tests are running. - root_logger = logging.getLogger("") - original_logging_level = root_logger.getEffectiveLevel() - try: - test_main_inner() - finally: - root_logger.setLevel(original_logging_level) + run_unittest(*tests) if __name__ == "__main__": - sys.stdout.write("test_logging\n") test_main()