diff --git a/Lib/test/test_faulthandler.py b/Lib/test/test_faulthandler.py --- a/Lib/test/test_faulthandler.py +++ b/Lib/test/test_faulthandler.py @@ -10,6 +10,7 @@ from test.script_helper import assert_python_ok import tempfile import unittest +from textwrap import dedent try: import threading @@ -51,6 +52,7 @@ build, and replace "Current thread 0x00007f8d8fbd9700" by "Current thread XXX". """ + code = dedent(code).strip() with support.SuppressCrashReport(): process = script_helper.spawn_python('-c', code) stdout, stderr = process.communicate() @@ -80,15 +82,15 @@ else: header = 'Stack (most recent call first)' regex = """ -^Fatal Python error: {name} + ^Fatal Python error: {name} -{header}: - File "", line {lineno} in -""".strip() - regex = regex.format( + {header}: + File "", line {lineno} in + """ + regex = dedent(regex.format( lineno=line_number, name=name_regex, - header=re.escape(header)) + header=re.escape(header))).strip() if other_regex: regex += '|' + other_regex output, exitcode = self.get_output(code, filename) @@ -100,29 +102,29 @@ "the first page of memory is a mapped read-only on AIX") def test_read_null(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._read_null() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._read_null() + """, 3, # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion '(?:Segmentation fault|Bus error|Illegal instruction)') def test_sigsegv(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigsegv() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigsegv() + """, 3, 'Segmentation fault') def test_sigabrt(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigabrt() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigabrt() + """, 3, 'Aborted') @@ -130,10 +132,10 @@ "SIGFPE cannot be caught on Windows") def test_sigfpe(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._sigfpe() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._sigfpe() + """, 3, 'Floating point exception') @@ -141,13 +143,13 @@ @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS') def test_sigbus(self): self.check_fatal_error(""" -import _testcapi -import faulthandler -import signal + import _testcapi + import faulthandler + import signal -faulthandler.enable() -_testcapi.raise_signal(signal.SIGBUS) -""".strip(), + faulthandler.enable() + _testcapi.raise_signal(signal.SIGBUS) + """, 6, 'Bus error') @@ -155,21 +157,21 @@ @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL') def test_sigill(self): self.check_fatal_error(""" -import _testcapi -import faulthandler -import signal + import _testcapi + import faulthandler + import signal -faulthandler.enable() -_testcapi.raise_signal(signal.SIGILL) -""".strip(), + faulthandler.enable() + _testcapi.raise_signal(signal.SIGILL) + """, 6, 'Illegal instruction') def test_fatal_error(self): self.check_fatal_error(""" -import faulthandler -faulthandler._fatal_error(b'xyz') -""".strip(), + import faulthandler + faulthandler._fatal_error(b'xyz') + """, 2, 'xyz') @@ -180,52 +182,52 @@ 'need faulthandler._stack_overflow()') def test_stack_overflow(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._stack_overflow() -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._stack_overflow() + """, 3, '(?:Segmentation fault|Bus error)', other_regex='unable to raise a stack overflow') def test_gil_released(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable() -faulthandler._read_null(True) -""".strip(), + import faulthandler + faulthandler.enable() + faulthandler._read_null(True) + """, 3, '(?:Segmentation fault|Bus error|Illegal instruction)') def test_enable_file(self): with temporary_filename() as filename: self.check_fatal_error(""" -import faulthandler -output = open({filename}, 'wb') -faulthandler.enable(output) -faulthandler._sigsegv() -""".strip().format(filename=repr(filename)), + import faulthandler + output = open({filename}, 'wb') + faulthandler.enable(output) + faulthandler._sigsegv() + """.format(filename=repr(filename)), 4, 'Segmentation fault', filename=filename) def test_enable_single_thread(self): self.check_fatal_error(""" -import faulthandler -faulthandler.enable(all_threads=False) -faulthandler._sigsegv() -""".strip(), + import faulthandler + faulthandler.enable(all_threads=False) + faulthandler._sigsegv() + """, 3, 'Segmentation fault', all_threads=False) def test_disable(self): code = """ -import faulthandler -faulthandler.enable() -faulthandler.disable() -faulthandler._sigsegv() -""".strip() + import faulthandler + faulthandler.enable() + faulthandler.disable() + faulthandler._sigsegv() + """ not_expected = 'Fatal Python error' stderr, exitcode = self.get_output(code) stder = '\n'.join(stderr) @@ -293,20 +295,20 @@ Raise an error if the output doesn't match the expected format. """ code = """ -import faulthandler + import faulthandler -def funcB(): - if {has_filename}: - with open({filename}, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=False) - else: - faulthandler.dump_traceback(all_threads=False) + def funcB(): + if {has_filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=False) + else: + faulthandler.dump_traceback(all_threads=False) -def funcA(): - funcB() + def funcA(): + funcB() -funcA() -""".strip() + funcA() + """ code = code.format( filename=repr(filename), has_filename=bool(filename), @@ -337,13 +339,13 @@ func_name = 'x' * (maxlen + 50) truncated = 'x' * maxlen + '...' code = """ -import faulthandler + import faulthandler -def {func_name}(): - faulthandler.dump_traceback(all_threads=False) + def {func_name}(): + faulthandler.dump_traceback(all_threads=False) -{func_name}() -""".strip() + {func_name}() + """ code = code.format( func_name=func_name, ) @@ -363,37 +365,37 @@ Raise an error if the output doesn't match the expected format. """ code = """ -import faulthandler -from threading import Thread, Event -import time + import faulthandler + from threading import Thread, Event + import time -def dump(): - if {filename}: - with open({filename}, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=True) - else: - faulthandler.dump_traceback(all_threads=True) + def dump(): + if {filename}: + with open({filename}, "wb") as fp: + faulthandler.dump_traceback(fp, all_threads=True) + else: + faulthandler.dump_traceback(all_threads=True) -class Waiter(Thread): - # avoid blocking if the main thread raises an exception. - daemon = True + class Waiter(Thread): + # avoid blocking if the main thread raises an exception. + daemon = True - def __init__(self): - Thread.__init__(self) - self.running = Event() - self.stop = Event() + def __init__(self): + Thread.__init__(self) + self.running = Event() + self.stop = Event() - def run(self): - self.running.set() - self.stop.wait() + def run(self): + self.running.set() + self.stop.wait() -waiter = Waiter() -waiter.start() -waiter.running.wait() -dump() -waiter.stop.set() -waiter.join() -""".strip() + waiter = Waiter() + waiter.start() + waiter.running.wait() + dump() + waiter.stop.set() + waiter.join() + """ code = code.format(filename=repr(filename)) output, exitcode = self.get_output(code, filename) output = '\n'.join(output) @@ -402,17 +404,17 @@ else: lineno = 10 regex = """ -^Thread 0x[0-9a-f]+ \(most recent call first\): -(?: File ".*threading.py", line [0-9]+ in [_a-z]+ -){{1,3}} File "", line 23 in run - File ".*threading.py", line [0-9]+ in _bootstrap_inner - File ".*threading.py", line [0-9]+ in _bootstrap + ^Thread 0x[0-9a-f]+ \(most recent call first\): + (?: File ".*threading.py", line [0-9]+ in [_a-z]+ + ){{1,3}} File "", line 23 in run + File ".*threading.py", line [0-9]+ in _bootstrap_inner + File ".*threading.py", line [0-9]+ in _bootstrap -Current thread XXX \(most recent call first\): - File "", line {lineno} in dump - File "", line 28 in $ -""".strip() - regex = regex.format(lineno=lineno) + Current thread XXX \(most recent call first\): + File "", line {lineno} in dump + File "", line 28 in $ + """ + regex = dedent(regex.format(lineno=lineno)).strip() self.assertRegex(output, regex) self.assertEqual(exitcode, 0) @@ -433,29 +435,29 @@ """ timeout_str = str(datetime.timedelta(seconds=TIMEOUT)) code = """ -import faulthandler -import time + import faulthandler + import time -def func(timeout, repeat, cancel, file, loops): - for loop in range(loops): - faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) - if cancel: - faulthandler.cancel_dump_traceback_later() - time.sleep(timeout * 5) - faulthandler.cancel_dump_traceback_later() + def func(timeout, repeat, cancel, file, loops): + for loop in range(loops): + faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file) + if cancel: + faulthandler.cancel_dump_traceback_later() + time.sleep(timeout * 5) + faulthandler.cancel_dump_traceback_later() -timeout = {timeout} -repeat = {repeat} -cancel = {cancel} -loops = {loops} -if {has_filename}: - file = open({filename}, "wb") -else: - file = None -func(timeout, repeat, cancel, file, loops) -if file is not None: - file.close() -""".strip() + timeout = {timeout} + repeat = {repeat} + cancel = {cancel} + loops = {loops} + if {has_filename}: + file = open({filename}, "wb") + else: + file = None + func(timeout, repeat, cancel, file, loops) + if file is not None: + file.close() + """ code = code.format( timeout=TIMEOUT, repeat=repeat, @@ -522,45 +524,45 @@ """ signum = signal.SIGUSR1 code = """ -import faulthandler -import os -import signal -import sys + import faulthandler + import os + import signal + import sys -def func(signum): - os.kill(os.getpid(), signum) + def func(signum): + os.kill(os.getpid(), signum) -def handler(signum, frame): - handler.called = True -handler.called = False + def handler(signum, frame): + handler.called = True + handler.called = False -exitcode = 0 -signum = {signum} -unregister = {unregister} -chain = {chain} + exitcode = 0 + signum = {signum} + unregister = {unregister} + chain = {chain} -if {has_filename}: - file = open({filename}, "wb") -else: - file = None -if chain: - signal.signal(signum, handler) -faulthandler.register(signum, file=file, - all_threads={all_threads}, chain={chain}) -if unregister: - faulthandler.unregister(signum) -func(signum) -if chain and not handler.called: - if file is not None: - output = file - else: - output = sys.stderr - print("Error: signal handler not called!", file=output) - exitcode = 1 -if file is not None: - file.close() -sys.exit(exitcode) -""".strip() + if {has_filename}: + file = open({filename}, "wb") + else: + file = None + if chain: + signal.signal(signum, handler) + faulthandler.register(signum, file=file, + all_threads={all_threads}, chain={chain}) + if unregister: + faulthandler.unregister(signum) + func(signum) + if chain and not handler.called: + if file is not None: + output = file + else: + output = sys.stderr + print("Error: signal handler not called!", file=output) + exitcode = 1 + if file is not None: + file.close() + sys.exit(exitcode) + """ code = code.format( filename=repr(filename), has_filename=bool(filename),