Index: Lib/test/test_logging.py =================================================================== --- Lib/test/test_logging.py (revision 88640) +++ Lib/test/test_logging.py (working copy) @@ -45,6 +45,9 @@ import unittest import warnings import weakref +import time +import traceback +import imp try: import threading except ImportError: @@ -350,6 +353,12 @@ finally: handler.removeFilter(filterfunc) + def test_filter_noName(self): + filter = logging.Filter("") + self.assertEquals(filter.nlen, 0) + record = logging.LogRecord(name=None, level=None, pathname=None, + lineno=None, msg=None, args=None, exc_info=None) + self.assertEquals(filter.filter(record), 1) # # First, we define our levels. There can be as many as you want - the only @@ -1790,6 +1799,12 @@ logging.warning('should not appear in logged') self.assertEqual(logged, ['should appear in logged']) + + def test_setLogRecordFactory(self): + factory = logging._logRecordFactory + manager = logging.Manager(None) + manager.setLogRecordFactory(factory) + self.assertEquals(manager.logRecordFactory, factory) class ChildLoggerTest(BaseTest): @@ -1940,6 +1955,49 @@ f = logging.Formatter('asctime', style='$') self.assertFalse(f.usesTime()) + def test_constructor(self): + self.assertRaises(ValueError, logging.Formatter, None, None, 'x') + + def test_formatTime(self): + formatter = logging.Formatter() + record = logging.LogRecord(name=None, level=None, pathname=None, + lineno=None, msg=None, args=None, exc_info=None) + fmtTime = formatter.formatTime(record) + t = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(record.created)) + expectedFmtTime = "%s,%03d" % (t, record.msecs) + self.assertEquals(fmtTime, expectedFmtTime) + fmt = "%d-%m-%Y %H:%M:%S" + expected = time.strftime(fmt, time.localtime(record.created)) + fmtTime = formatter.formatTime(record, fmt) + self.assertEquals(fmtTime, expected) + + def test_formatException(self): + formatter = logging.Formatter() + try: + raise ValueError("") + except ValueError: + fmtException = formatter.formatException(sys.exc_info()) + expected = ''.join(traceback.format_exception(*sys.exc_info())).rstrip('\n') + self.assertEquals(fmtException, expected) + + def test_formatStack(self): + formatter = logging.Formatter() + x = 2 + self.assertEquals(formatter.formatStack(x), x) + + def test_format(self): + def _dummyUsesTime(): + return True + formatter = logging.Formatter() + formatter.usesTime = _dummyUsesTime + record = logging.LogRecord(name=None, level=None, pathname=None, + lineno=None, msg=None, args=None, exc_info=None) + record.stack_info = "test" + fmtStr = formatter.format(record) + self.assertTrue(hasattr(record, "asctime")) + self.assertEqual(record.asctime, formatter.formatTime(record, formatter.datefmt)) + self.assertIn(formatter.formatStack(record.stack_info), fmtStr) + class LastResortTest(BaseTest): def test_last_resort(self): # Test the last resort handler @@ -2037,6 +2095,504 @@ # test methods added below pass +class ModuleScopeTest(BaseTest): + """ + Tests statements made in the 'global' scope of the logging module, i.e. when + user imports the logging module. + Uses imp.reload() which may trip up other tests if these rely on references + to the original imported version of the logging module... + """ + def test__srcfile_py2exe(self): + setattr(sys, "frozen", None) + try: + expectedValue = "logging%s__init__%s" % (os.sep, logging.__file__[-4:]) + expectedValue = os.path.normcase(expectedValue) + imp.reload(logging) + self.assertEquals(logging._srcfile, expectedValue) + finally: + delattr(sys, "frozen") + imp.reload(logging) + +class UnboundLoggingFunctionsTest(BaseTest): + """ + Tests all unbound functions provided by the logging module. + """ + def test__checkLevel_raisesValueError_ok(self): + self.assertRaises(ValueError, logging._checkLevel, "NonExistantLevel") + + def test__checkLevel_raisesTypeError_ok(self): + self.assertRaises(TypeError, logging._checkLevel, object()) + + def test_setLoggerClass_and_getLoggerClass(self): + class MyLogger(logging.Logger): pass + class InvalidLogger(object): pass + loggerBackup = logging._loggerClass + try: + logging.setLoggerClass(MyLogger) + self.assertEquals(logging._loggerClass, MyLogger) + self.assertEquals(logging.getLoggerClass(), MyLogger) + self.assertRaises(TypeError, logging.setLoggerClass, InvalidLogger) + finally: + logging._loggerClass = loggerBackup + + def test_basicConfig_lockAcquiredAndReleased(self): + class DummyLogger(logging.Logger): + pass + self._acquireCalled = False + self._releaseCalled = False + def dummy_acquireLock(): + self._acquireCalled = True + def dummy_releaseLock(): + self._releaseCalled = True + _acquireBackup = logging._acquireLock + _releaseBackup = logging._releaseLock + rootBackup = logging.root + logging._acquireLock = dummy_acquireLock + try: + logging._releaseLock = dummy_releaseLock + try: + logging.root = DummyLogger("dummy root logger") + try: + logging.basicConfig() + self.assertTrue(self._acquireCalled) + self.assertTrue(self._releaseCalled) + finally: + logging.root = rootBackup + finally: + logging._releaseLock = _releaseBackup + finally: + logging._acquireLock = _acquireBackup + + def test_basicConfig_handlerTypes(self): + class DummyLogger(logging.Logger): + pass + rootBackup = logging.root + logging.root = DummyLogger("dummy root logger") + try: + # StreamHandler: + logging.basicConfig(level=20) + self.assertEqual(logging.root.level, 20) + self.assertIn(logging.StreamHandler, [type(obj) for obj in logging.root.handlers]) + logging.basicConfig(stream=io.StringIO()) + self.assertIn(logging.StreamHandler, [type(obj) for obj in logging.root.handlers]) + logging.root.handlers = [] + # FileHandler: + logging.basicConfig(filename="testFile") + self.assertEqual(logging.root.handlers[0].mode, 'a') + logging.root.handlers = [] + mode = 'w' + logging.basicConfig(filename="testFile", filemode=mode) + self.assertEqual(logging.root.handlers[0].mode, mode) + finally: + logging.root = rootBackup + + def test_moduleLogFunctions(self): + functions = [logging.critical, logging.error, logging.warning, + logging.info, logging.debug, logging.log] + class DummyLogger(object): + lastAttr = None + def dummyLog(*args, **kwargs): + pass + def __getattr__(self, attr): + self.lastAttr = attr + if attr == "handlers": + return [] + return self.dummyLog + self.basicConfigCalled = False + def dummyBasicConfig(**kwargs): + self.basicConfigCalled = True + rootBackup = logging.root + basicConfigBackup = logging.basicConfig + logging.basicConfig = dummyBasicConfig + try: + logging.root = DummyLogger() + try: + for function in functions: + self.basicConfigCalled = False + function(None, None) + self.assertTrue(self.basicConfigCalled) + self.assertEqual(logging.root.lastAttr, function.__name__) + logging.root.handlers = [] + finally: + logging.root = rootBackup + finally: + logging.basicConfig = basicConfigBackup + + def test_disable(self): + disableBackup = logging.root.manager.disable + logging.disable(10) + self.assertEqual(logging.root.manager.disable, 10) + logging.root.manager.disable = disableBackup + + def test_shutdown_checkFlushAndCloseCalled(self): + class MyHandler(logging.Handler): + def __init__(self): + self.flushCalled = False + self.closeCalled = False + def flush(self, *args, **kwargs): + self.flushCalled = True + def close(self, *args, **kwargs): + self.closeCalled = True + handler = MyHandler() + handlerList = [weakref.ref(handler)] + logging.shutdown(handlerList) + self.assertTrue(handler.flushCalled) + self.assertTrue(handler.closeCalled) + + def test_shutdown_suppressesIOValueError(self): + for exceptionClass in (IOError, ValueError): + class MyHandler(logging.Handler): + def __init__(self):pass + def flush(self, *args, **kwargs): + raise exceptionClass() + handler = MyHandler() + handlerList = [weakref.ref(handler)] + try: + logging.shutdown(handlerList) + except exceptionClass: + self.fail("%s was not caught by shutdown function" % exceptionClass.__name__) + + def test_shutdown_otherExceptions(self): + raiseExceptionsBackup = logging.raiseExceptions + try: + class MyException(Exception):pass + class MyHandler(logging.Handler): + def __init__(self):pass + def flush(self, *args, **kwargs): + raise MyException() + for logging.raiseExceptions in (True, False): + handler = MyHandler() + handlerList = [weakref.ref(handler)] + try: + logging.shutdown(handlerList) + except MyException: + if not logging.raiseExceptions: + self.fail("All exceptions should be suppressed, logging.raiseExceptions=%s" % + logging.raiseExceptions) + finally: + logging.raiseExceptions = raiseExceptionsBackup + + def test__showwarning(self): + # ToDo: test method completely - currently only testing that NullHandler is added + # if logger has no handlers + logger = logging.getLogger("py.warnings") + logger.addHandler(logging.NullHandler()) + for handler in logger.handlers: + if type(handler) == logging.NullHandler: + logger.handlers.remove(handler) + logging._showwarning("testmsg", Warning, "testfile", 12) + self.assertIn(logging.NullHandler, [type(handler) for handler in logger.handlers]) + +class LogRecordTest(BaseTest): + def test_argsSetToSingleDict_ok(self): + testDict = {"test": 123} + record = logging.LogRecord(name="", level="", pathname="", + lineno=0, msg="test msg", args=[testDict,], exc_info=()) + self.assertEqual(record.args, testDict) + +class BufferingFormatterTest(BaseTest): + def test_constructor(self): + class MyFormatter(logging.Formatter): + pass + formatter = logging.BufferingFormatter() + self.assertEquals(formatter.linefmt, logging._defaultFormatter) + formatter = logging.BufferingFormatter(linefmt=MyFormatter) + self.assertEquals(formatter.linefmt, MyFormatter) + def test_formatHeader(self): + formatter = logging.BufferingFormatter() + self.assertEquals(formatter.formatHeader(None), "") + def test_formatFooter(self): + formatter = logging.BufferingFormatter() + self.assertEquals(formatter.formatFooter(None), "") + def test_format(self): + formatter = logging.BufferingFormatter() + self.assertEquals(formatter.format([]), "") + record = logging.LogRecord(name="test record", level=10, pathname="testpath", + lineno=12, msg="testing testing", args=None, exc_info=None) + records = [record, ] + expected = formatter.formatHeader(records) + for record in records: + expected += formatter.linefmt.format(record) + expected += formatter.formatFooter(records) + self.assertEquals(formatter.format(records), expected) + +class HandlerTest(BaseTest): + def test_get_name(self): + handler = logging.Handler() + self.assertEquals(handler.get_name(), None) + handler.set_name("test_name") + self.assertEquals(handler.get_name(), "test_name") + + def test_set_name(self): + # check calls del on existing handler with same name + firstHandler = logging.Handler() + logging._handlers[None] = firstHandler + firstHandler.set_name(None) + self.assertNotIn(firstHandler, logging._handlers) + + def test_createLock(self): + threadBackup = logging.thread + try: + logging.thread = None + handler = logging.Handler() + handler.createLock() + self.assertEquals(handler.lock, None) + finally: + logging.thread = threadBackup + + def test_emit(self): + handler = logging.Handler() + self.assertRaises(NotImplementedError, handler.emit, None) + + def test_flush(self): + # No test here, just increasing coverage + handler = logging.Handler() + handler.flush() + + def test_handleError(self): + class HandlerTestException(Exception):pass + class TestHandler(logging.Handler): + def emit(self, record): + try: + raise HandlerTestException("Testing") + except: + self.last_exc_info = sys.exc_info() + self.handleError(record) + record = logging.LogRecord(name="test record", level=10, pathname="testpath", + lineno=12, msg="testing testing", args=None, exc_info=None) + handler = TestHandler() + stderrBackup = sys.stderr + try: + sys.stderr = io.StringIO() + logging.raiseExceptions = True + handler.emit(record) + tbStr = ''.join(traceback.format_exception(*handler.last_exc_info)) + self.assertIn(tbStr, sys.stderr.getvalue()) + sys.stderr.close() + sys.stderr = io.StringIO() + logging.raiseExceptions = False + handler.emit(record) + self.assertEqual(sys.stderr.getvalue(), "") + finally: + sys.stderr = stderrBackup + +class StreamHandlerTest(BaseTest): + handleErrorCalled = False + def _dummyHandleError(self, *args, **kwargs): + self.handleErrorCalled = True + + def test_emit_exceptionHandling(self): + self.handleErrorCalled = False + class DummyStream(io.StringIO): + exceptionClass = None + def write(self, *args, **kwargs): + raise self.exceptionClass("Testing exception handling") + # Test KeyboardInterrupt and SystemExit are re-raised: + exceptionsToTest = [KeyboardInterrupt, SystemExit] + record = logging.LogRecord(name="test record", level=10, pathname="testpath", + lineno=12, msg="testing testing", args=None, exc_info=None) + stream = DummyStream() + handler = logging.StreamHandler(stream) + handler.handleError = self._dummyHandleError + for exception in exceptionsToTest: + stream.exceptionClass = exception + self.assertRaises(exception, handler.emit, record) + self.assertFalse(self.handleErrorCalled) + # All other exceptions result in handleError() being called: + stream.exceptionClass = AttributeError + handler.emit(record) + self.assertTrue(self.handleErrorCalled) + +class FileHandlerTest(BaseTest): + handlerConstructorCalled = False + def _dummyHandlerConstructor(self, *args, **kwargs): + self.handlerConstructorCalled = True + + streamHandlerConstructorCalled = False + def _dummyStreamHandlerConstructor(self, *args, **kwargs): + self.streamHandlerConstructorCalled = True + + def test_constructor(self): + """ + Sets logging.codecs to None, ensures encoding is set to None. + Sets delay to non-zero value, ensures Handler __init__ is called, + not StreamHandler __init__ + """ + self.handlerConstructorCalled = False + self.streamHandlerConstructorCalled = False + codecsBackup = logging.codecs + baseClassConstructorBackup = logging.Handler.__init__ + streamHandlerConstructorBackup = logging.StreamHandler.__init__ + try: + logging.codecs = None + logging.Handler.__init__ = self._dummyHandlerConstructor + logging.StreamHandler.__init__ = self._dummyStreamHandlerConstructor + handler = logging.FileHandler(filename="testfile", encoding="%", delay=1) + self.assertIsNone(handler.encoding) + self.assertTrue(self.handlerConstructorCalled) + self.assertFalse(self.streamHandlerConstructorCalled) + finally: + logging.codecs = codecsBackup + logging.Handler.__init__ = baseClassConstructorBackup + logging.StreamHandler.__init__ = streamHandlerConstructorBackup + + def test_emit_opensStream(self): + self.openCalled = False + def _dummyOpen(*args, **kwargs): + self.openCalled = True + return io.StringIO() + record = logging.LogRecord(name="test record", level=10, pathname="testpath", + lineno=12, msg="testing testing", args=None, exc_info=None) + handler = logging.FileHandler("test") + handler.stream = None + handler._open = _dummyOpen + handler.emit(record) + self.assertTrue(self.openCalled) + +class LoggerTest(BaseTest): + def test_exception(self): + def dummyError(msg, *args, **kwargs): + self.assertIn('exc_info', kwargs.keys()) + logger = logging.Logger("test logger") + logger.error = dummyError + kwargs = {} + logger.exception(msg="test msg", **kwargs) + + def test_log_invalidLevel(self): + # 1. With raiseExceptions disabled: + self._logCalled = False + def dummy_log(level, msg, args, **kwargs): + self._logCalled = True + logger = logging.Logger("test logger") + logger._log = dummy_log + logger.log(level="badLevel", msg="test msg") + self.assertFalse(self._logCalled) + #2. With raiseExceptions enabled: + raiseExceptionsBackup = logging.raiseExceptions + logging.raiseExceptions = True + try: + self._logCalled = False + self.assertRaises(TypeError, logger.log, level="badLevel", msg="test msg") + self.assertFalse(self._logCalled) + finally: + logging.raiseExceptions = raiseExceptionsBackup + + def test_findCaller(self): + class DummyStringIO(io.StringIO): + contents = '' + def write(self, msg, *args, **kwargs): + self.contents += str(msg) + def getvalue(self): + return self.contents + logger = logging.Logger("test logger") + sioBackup = io.StringIO + io.StringIO = DummyStringIO + try: + f = logging.currentframe() + expected = ''.join(traceback.format_list(traceback.extract_stack(f))) + expected = 'Stack (most recent call last):\n' + expected + (co_filename, f_lineno, co_name, sinfo) = logger.findCaller(stack_info=True) + self.assertIn(sinfo, expected) + finally: + io.StringIO = sioBackup + + def test_makeRecord_extraDict(self): + logger = logging.Logger("test logger") + keys = ["message", "asctime"] + keys.extend(list(logging._logRecordFactory(name=None, level=None, pathname=None, lineno=None, + msg=None, args=None, exc_info=None).__dict__)) + for key in keys: + extra = {key:None} + self.assertRaises(KeyError, logger.makeRecord, name="", level=10, fn=None, lno=None, msg="", + args=None, exc_info=None, func=None, extra=extra) + extra = {"testAttr" : "testValue"} + rv = logger.makeRecord(name="", level=10, fn=None, lno=None, msg="", + args=None, exc_info=None, func=None, extra=extra) + self.assertTrue(hasattr(rv, "testAttr")) + self.assertEquals(getattr(rv, "testAttr"), "testValue") + + def test__log_exceptionHandling(self): + def dummyFindCaller(*args, **kwargs): + raise ValueError() + def dummyMakeRecord(name, level, fn, lno, msg, args, exc_info, func, extra, sinfo): + self.assertEquals(name, "Test Logger") + self.assertEquals(fn, "(unknown file)") + self.assertEquals(lno, 0) + self.assertEquals(func, "(unknown function)") + logger = logging.Logger("Test Logger") + logger.findCaller = dummyFindCaller + logger.makeRecord = dummyMakeRecord + logger._log(level=10, msg="test msg", args=None) + _srcfileBackup = logging._srcfile + logging._srcfile = None + try: + logger._log(level=10, msg="test msg", args=None) + finally: + logging._srcfile = _srcfileBackup + + def test_hasHandlers_breakOnPropagateFalse(self): + class TestLogger(logging.Logger): + parentAttributePolled = False + def __getattr__(self, name): + if name == "parent": + self.parentAttributePolled = True + logger = TestLogger("test logger") + logger.handlers = [] + del logger.parent + logger.propagate = False + self.assertFalse(logger.hasHandlers()) + self.assertFalse(logger.parentAttributePolled) + + def test_isEnabledFor(self): + logger = logging.getLogger("test logger") + logger.manager.disable = 10 + self.assertEquals(logger.isEnabledFor(5), 0) + +class LoggerAdapterTest(BaseTest): + def test_exception(self): + def dummyLog(level, msg, *args, **kwargs): + self.assertIn('exc_info', kwargs.keys()) + self.assertEqual(kwargs['exc_info'], 1) + logger = logging.Logger("test logger") + adapter = logging.LoggerAdapter(logger, {}) + adapter.log = dummyLog + kwargs = {} + adapter.exception(msg="test msg", **kwargs) + + def test_critical(self): + def dummyLog(level, msg, *args, **kwargs): + self.assertEquals(level, logging.CRITICAL) + logger = logging.Logger("test logger") + adapter = logging.LoggerAdapter(logger, {}) + adapter.log = dummyLog + adapter.critical('test msg') + + def test_isEnabledFor_managerDisableLevel(self): + logger = logging.Logger("test logger") + adapter = logging.LoggerAdapter(logger, {}) + logger.manager.disable = 20 + self.assertFalse(adapter.isEnabledFor(20)) + self.assertFalse(adapter.isEnabledFor(19)) + + def test_hasHandlers(self): + logger = logging.Logger("test logger") + adapter = logging.LoggerAdapter(logger, {}) + self.assertEqual(adapter.hasHandlers(), logger.hasHandlers()) + +class NullHandlerTest(BaseTest): + def test_handle(self): + handler = logging.NullHandler() + handler.handle(None) + + def test_emit(self): + handler = logging.NullHandler() + handler.emit(None) + + def test_createLock(self): + handler = logging.NullHandler() + handler.createLock() + self.assertEquals(handler.lock, None) + def secs(**kw): return datetime.timedelta(**kw) // datetime.timedelta(seconds=1) @@ -2097,7 +2653,17 @@ LogRecordFactoryTest, ChildLoggerTest, QueueHandlerTest, RotatingFileHandlerTest, LastResortTest, - TimedRotatingFileHandlerTest + TimedRotatingFileHandlerTest, + ModuleScopeTest, + UnboundLoggingFunctionsTest, + LogRecordTest, + BufferingFormatterTest, + HandlerTest, + StreamHandlerTest, + FileHandlerTest, + LoggerTest, + LoggerAdapterTest, + NullHandlerTest, ) if __name__ == "__main__":