diff -r 91fbe0fff882 Lib/test/test_faulthandler.py --- a/Lib/test/test_faulthandler.py Mon Mar 16 11:52:32 2015 +0100 +++ b/Lib/test/test_faulthandler.py Thu Mar 19 01:00:30 2015 +0800 @@ -33,16 +33,13 @@ else: return '^' + regex + '$' -@contextmanager -def temporary_filename(): - filename = tempfile.mktemp() - try: - yield filename - finally: - support.unlink(filename) +def temporary_filename(use_filename, use_fd): + if use_filename or use_fd: + return tempfile.mktemp() + return None class FaultHandlerTests(unittest.TestCase): - def get_output(self, code, filename=None, fd=None): + def get_output(self, code, filename=None): """ Run the specified code in Python (in a new child process) and read the output from the standard error or from a file (if filename is set). @@ -52,12 +49,16 @@ build, and replace "Current thread 0x00007f8d8fbd9700" by "Current thread XXX". """ + try: + return self._get_output(code, filename) + finally: + if filename: + support.unlink(filename) + + def _get_output(self, code, filename=None): code = dedent(code).strip() - pass_fds = [] - if fd is not None: - pass_fds.append(fd) with support.SuppressCrashReport(): - process = script_helper.spawn_python('-c', code, pass_fds=pass_fds) + process = script_helper.spawn_python('-c', code) stdout, stderr = process.communicate() exitcode = process.wait() output = support.strip_python_stderr(stdout) @@ -67,20 +68,14 @@ with open(filename, "rb") as fp: output = fp.read() output = output.decode('ascii', 'backslashreplace') - elif fd is not None: - self.assertEqual(output, '') - os.lseek(fd, os.SEEK_SET, 0) - with open(fd, "rb", closefd=False) as fp: - output = fp.read() - output = output.decode('ascii', 'backslashreplace') output = re.sub('Current thread 0x[0-9a-f]+', 'Current thread XXX', output) return output.splitlines(), exitcode def check_fatal_error(self, code, line_number, name_regex, - filename=None, all_threads=True, other_regex=None, - fd=None): + all_threads=True, other_regex=None, + use_filename=False): """ Check that the fault handler for fatal errors is enabled and check the traceback from the child process output. @@ -103,7 +98,9 @@ header=re.escape(header))).strip() if other_regex: regex += '|' + other_regex - output, exitcode = self.get_output(code, filename=filename, fd=fd) + filename = temporary_filename(use_filename, False) + code = code.format(filename=filename) + output, exitcode = self.get_output(code, filename=filename) output = '\n'.join(output) self.assertRegex(output, regex) self.assertNotEqual(exitcode, 0) @@ -210,31 +207,27 @@ 'Segmentation fault') def test_enable_file(self): - with temporary_filename() as filename: - self.check_fatal_error(""" - import faulthandler - output = open({filename}, 'wb') - faulthandler.enable(output) - faulthandler._sigsegv() - """.format(filename=repr(filename)), - 4, - 'Segmentation fault', - filename=filename) + self.check_fatal_error(""" + import faulthandler + output = open({filename!r}, 'wb') + faulthandler.enable(output) + faulthandler._sigsegv() + """, + 4, + 'Segmentation fault', + use_filename=True) - @unittest.skipIf(sys.platform == "win32", - "subprocess doesn't support pass_fds on Windows") def test_enable_fd(self): - with tempfile.TemporaryFile('wb+') as fp: - fd = fp.fileno() - self.check_fatal_error(""" - import faulthandler - import sys - faulthandler.enable(%s) - faulthandler._sigsegv() - """ % fd, - 4, - 'Segmentation fault', - fd=fd) + self.check_fatal_error(""" + import faulthandler + import os + fd = os.open({filename!r}, os.O_WRONLY | os.O_CREAT) + faulthandler.enable(fd) + faulthandler._sigsegv() + """, + 5, + 'Segmentation fault', + use_filename=True) def test_enable_single_thread(self): self.check_fatal_error(""" @@ -322,49 +315,50 @@ output = subprocess.check_output(args, env=env) self.assertEqual(output.rstrip(), b"True") - def check_dump_traceback(self, *, filename=None, fd=None): + def check_dump_traceback(self, *, use_filename=False, use_fd=False): """ Explicitly call dump_traceback() function and check its output. Raise an error if the output doesn't match the expected format. """ code = """ import faulthandler + import os + + filename = {filename!r} + use_fd = {use_fd} - filename = {filename!r} - fd = {fd} + if use_fd: + assert filename + file = os.open(filename, os.O_WRONLY | os.O_CREAT) + elif filename: + file = open(filename, 'wb') + else: + file = None def funcB(): - if filename: - with open(filename, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=False) - elif fd is not None: - faulthandler.dump_traceback(fd, - all_threads=False) - else: - faulthandler.dump_traceback(all_threads=False) + faulthandler.dump_traceback(file, all_threads=False) def funcA(): funcB() funcA() + if use_fd: + os.close(file) + elif filename: + file.close() """ + filename = temporary_filename(use_filename, use_fd) code = code.format( filename=filename, - fd=fd, + use_fd=use_fd, ) - if filename: - lineno = 9 - elif fd is not None: - lineno = 12 - else: - lineno = 14 expected = [ 'Stack (most recent call first):', - ' File "", line %s in funcB' % lineno, - ' File "", line 17 in funcA', - ' File "", line 19 in ' + ' File "", line 16 in funcB', + ' File "", line 19 in funcA', + ' File "", line 21 in ' ] - trace, exitcode = self.get_output(code, filename, fd) + trace, exitcode = self.get_output(code, filename) self.assertEqual(trace, expected) self.assertEqual(exitcode, 0) @@ -372,14 +366,10 @@ self.check_dump_traceback() def test_dump_traceback_file(self): - with temporary_filename() as filename: - self.check_dump_traceback(filename=filename) + self.check_dump_traceback(use_filename=True) - @unittest.skipIf(sys.platform == "win32", - "subprocess doesn't support pass_fds on Windows") def test_dump_traceback_fd(self): - with tempfile.TemporaryFile('wb+') as fp: - self.check_dump_traceback(fd=fp.fileno()) + self.check_dump_traceback(use_fd=True) def test_truncate(self): maxlen = 500 @@ -406,7 +396,7 @@ self.assertEqual(exitcode, 0) @unittest.skipIf(not HAVE_THREADS, 'need threads') - def check_dump_traceback_threads(self, filename): + def check_dump_traceback_threads(self, use_filename=False, use_fd=False): """ Call explicitly dump_traceback(all_threads=True) and check the output. Raise an error if the output doesn't match the expected format. @@ -415,13 +405,21 @@ import faulthandler from threading import Thread, Event import time + import os + + filename = {filename!r} + use_fd = {use_fd} + + if use_fd: + assert filename + file = os.open(filename, os.O_WRONLY | os.O_CREAT) + elif filename: + file = open(filename, 'wb') + else: + file = None def dump(): - if {filename}: - with open({filename}, "wb") as fp: - faulthandler.dump_traceback(fp, all_threads=True) - else: - faulthandler.dump_traceback(all_threads=True) + faulthandler.dump_traceback(file, all_threads=True) class Waiter(Thread): # avoid blocking if the main thread raises an exception. @@ -442,40 +440,43 @@ dump() waiter.stop.set() waiter.join() + if use_fd: + os.close(file) + elif filename: + file.close() """ - code = code.format(filename=repr(filename)) + filename = temporary_filename(use_filename, use_fd) + code = code.format(filename=filename, use_fd=use_fd) output, exitcode = self.get_output(code, filename) output = '\n'.join(output) - if filename: - lineno = 8 - else: - lineno = 10 regex = """ ^Thread 0x[0-9a-f]+ \(most recent call first\): (?: File ".*threading.py", line [0-9]+ in [_a-z]+ - ){{1,3}} File "", line 23 in run + ){1,3} File "", line 31 in run File ".*threading.py", line [0-9]+ in _bootstrap_inner File ".*threading.py", line [0-9]+ in _bootstrap Current thread XXX \(most recent call first\): - File "", line {lineno} in dump - File "", line 28 in $ + File "", line 18 in dump + File "", line 36 in $ """ - regex = dedent(regex.format(lineno=lineno)).strip() + regex = dedent(regex).strip() self.assertRegex(output, regex) self.assertEqual(exitcode, 0) def test_dump_traceback_threads(self): - self.check_dump_traceback_threads(None) + self.check_dump_traceback_threads() def test_dump_traceback_threads_file(self): - with temporary_filename() as filename: - self.check_dump_traceback_threads(filename) + self.check_dump_traceback_threads(use_filename=True) + + def test_dump_traceback_threads_fd(self): + self.check_dump_traceback_threads(use_fd=True) @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'), 'need faulthandler.dump_traceback_later()') def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1, - *, filename=None, fd=None): + *, use_filename=None, use_fd=None): """ Check how many times the traceback is written in timeout x 2.5 seconds, or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending @@ -487,14 +488,14 @@ code = """ import faulthandler import time - import sys + import os timeout = {timeout} repeat = {repeat} cancel = {cancel} loops = {loops} filename = {filename!r} - fd = {fd} + use_fd = {use_fd} def func(timeout, repeat, cancel, file, loops): for loop in range(loops): @@ -504,23 +505,27 @@ time.sleep(timeout * 5) faulthandler.cancel_dump_traceback_later() - if filename: - file = open(filename, "wb") - elif fd is not None: - file = sys.stderr.fileno() + if use_fd: + assert filename + file = os.open(filename, os.O_WRONLY | os.O_CREAT) + elif filename: + file = open(filename, 'wb') else: file = None func(timeout, repeat, cancel, file, loops) - if filename: + if use_fd: + os.close(file) + elif filename: file.close() """ + filename=temporary_filename(use_filename, use_fd) code = code.format( timeout=TIMEOUT, repeat=repeat, cancel=cancel, loops=loops, filename=filename, - fd=fd, + use_fd=use_fd, ) trace, exitcode = self.get_output(code, filename) trace = '\n'.join(trace) @@ -530,7 +535,7 @@ if repeat: count *= 2 header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str - regex = expected_traceback(17, 26, header, min_count=count) + regex = expected_traceback(17, 27, header, min_count=count) self.assertRegex(trace, regex) else: self.assertEqual(trace, '') @@ -546,22 +551,18 @@ self.check_dump_traceback_later(cancel=True) def test_dump_traceback_later_file(self): - with temporary_filename() as filename: - self.check_dump_traceback_later(filename=filename) + self.check_dump_traceback_later(use_filename=True) - @unittest.skipIf(sys.platform == "win32", - "subprocess doesn't support pass_fds on Windows") def test_dump_traceback_later_fd(self): - with tempfile.TemporaryFile('wb+') as fp: - self.check_dump_traceback_later(fd=fp.fileno()) + self.check_dump_traceback_later(use_fd=True) def test_dump_traceback_later_twice(self): self.check_dump_traceback_later(loops=2) @unittest.skipIf(not hasattr(faulthandler, "register"), "need faulthandler.register") - def check_register(self, filename=False, all_threads=False, - unregister=False, chain=False, fd=None): + def check_register(self, all_threads=False, unregister=False, chain=False, + use_filename=False, use_fd=False): """ Register a handler displaying the traceback on a user signal. Raise the signal and check the written traceback. @@ -582,7 +583,7 @@ unregister = {unregister} chain = {chain} filename = {filename!r} - fd = {fd} + use_fd = {use_fd} def func(signum): os.kill(os.getpid(), signum) @@ -591,10 +592,11 @@ handler.called = True handler.called = False - if filename: + if use_fd: + assert filename + file = os.open(filename, os.O_WRONLY | os.O_CREAT) + elif filename: file = open(filename, "wb") - elif fd is not None: - file = sys.stderr.fileno() else: file = None if chain: @@ -613,17 +615,20 @@ exitcode = 1 else: exitcode = 0 - if filename: + if use_fd: + os.close(file) + elif filename: file.close() sys.exit(exitcode) """ + filename=temporary_filename(use_filename, use_fd) code = code.format( all_threads=all_threads, signum=signum, unregister=unregister, chain=chain, filename=filename, - fd=fd, + use_fd=use_fd, ) trace, exitcode = self.get_output(code, filename) trace = '\n'.join(trace) @@ -632,7 +637,7 @@ regex = 'Current thread XXX \(most recent call first\):\n' else: regex = 'Stack \(most recent call first\):\n' - regex = expected_traceback(14, 32, regex) + regex = expected_traceback(14, 33, regex) self.assertRegex(trace, regex) else: self.assertEqual(trace, '') @@ -648,14 +653,10 @@ self.check_register(unregister=True) def test_register_file(self): - with temporary_filename() as filename: - self.check_register(filename=filename) + self.check_register(use_filename=True) - @unittest.skipIf(sys.platform == "win32", - "subprocess doesn't support pass_fds on Windows") def test_register_fd(self): - with tempfile.TemporaryFile('wb+') as fp: - self.check_register(fd=fp.fileno()) + self.check_register(use_fd=True) def test_register_threads(self): self.check_register(all_threads=True)