Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(8)

Unified Diff: Lib/test/test_signal.py

Issue 10639: reindent.py converts newlines to platform default
Patch Set: Created 8 years, 8 months ago
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View side-by-side diff with in-line comments
Download patch
« no previous file with comments | « Lib/test/test_shutil.py ('k') | Lib/test/test_smtplib.py » ('j') | no next file with comments »
Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
--- a/Lib/test/test_signal.py Tue Jul 26 09:37:46 2011 +0300
+++ b/Lib/test/test_signal.py Mon Jul 25 09:47:18 2011 -0400
@@ -224,120 +224,92 @@
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class WakeupSignalTests(unittest.TestCase):
- def check_wakeup(self, test_body, *signals):
- # use a subprocess to have only one thread
- code = """if 1:
- import fcntl
- import os
- import signal
- import struct
+ TIMEOUT_FULL = 10
+ TIMEOUT_HALF = 5
- signals = {!r}
+ def handler(self, signum, frame):
+ pass
- def handler(signum, frame):
- pass
-
- def check_signum(signals):
- data = os.read(read, len(signals)+1)
- raised = struct.unpack('%uB' % len(data), data)
- if raised != signals:
- raise Exception("%r != %r" % (raised, signals))
-
- {}
-
- signal.signal(signal.SIGALRM, handler)
- read, write = os.pipe()
- for fd in (read, write):
- flags = fcntl.fcntl(fd, fcntl.F_GETFL, 0)
- flags = flags | os.O_NONBLOCK
- fcntl.fcntl(fd, fcntl.F_SETFL, flags)
- signal.set_wakeup_fd(write)
-
- test()
- check_signum(signals)
-
- os.close(read)
- os.close(write)
- """.format(signals, test_body)
-
- assert_python_ok('-c', code)
+ def check_signum(self, *signals):
+ data = os.read(self.read, len(signals)+1)
+ raised = struct.unpack('%uB' % len(data), data)
+ # We don't care of the signal delivery order (it's not portable or
+ # reliable)
+ raised = set(raised)
+ signals = set(signals)
+ self.assertEqual(raised, signals)
def test_wakeup_fd_early(self):
- self.check_wakeup("""def test():
- import select
- import time
+ import select
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
-
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the sleep,
- # before select is called
- time.sleep(TIMEOUT_FULL)
- mid_time = time.time()
- dt = mid_time - before_time
- if dt >= TIMEOUT_HALF:
- raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
- select.select([read], [], [], TIMEOUT_FULL)
- after_time = time.time()
- dt = after_time - mid_time
- if dt >= TIMEOUT_HALF:
- raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
- """, signal.SIGALRM)
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the sleep,
+ # before select is called
+ time.sleep(self.TIMEOUT_FULL)
+ mid_time = time.time()
+ self.assertTrue(mid_time - before_time < self.TIMEOUT_HALF)
+ select.select([self.read], [], [], self.TIMEOUT_FULL)
+ after_time = time.time()
+ self.assertTrue(after_time - mid_time < self.TIMEOUT_HALF)
+ self.check_signum(signal.SIGALRM)
def test_wakeup_fd_during(self):
- self.check_wakeup("""def test():
- import select
- import time
+ import select
- TIMEOUT_FULL = 10
- TIMEOUT_HALF = 5
-
- signal.alarm(1)
- before_time = time.time()
- # We attempt to get a signal during the select call
- try:
- select.select([read], [], [], TIMEOUT_FULL)
- except select.error:
- pass
- else:
- raise Exception("select.error not raised")
- after_time = time.time()
- dt = after_time - before_time
- if dt >= TIMEOUT_HALF:
- raise Exception("%s >= %s" % (dt, TIMEOUT_HALF))
- """, signal.SIGALRM)
+ signal.alarm(1)
+ before_time = time.time()
+ # We attempt to get a signal during the select call
+ self.assertRaises(select.error, select.select,
+ [self.read], [], [], self.TIMEOUT_FULL)
+ after_time = time.time()
+ self.assertTrue(after_time - before_time < self.TIMEOUT_HALF)
+ self.check_signum(signal.SIGALRM)
def test_signum(self):
- self.check_wakeup("""def test():
- signal.signal(signal.SIGUSR1, handler)
- os.kill(os.getpid(), signal.SIGUSR1)
- os.kill(os.getpid(), signal.SIGALRM)
- """, signal.SIGUSR1, signal.SIGALRM)
+ old_handler = signal.signal(signal.SIGUSR1, self.handler)
+ self.addCleanup(signal.signal, signal.SIGUSR1, old_handler)
+ os.kill(os.getpid(), signal.SIGUSR1)
+ os.kill(os.getpid(), signal.SIGALRM)
+ self.check_signum(signal.SIGUSR1, signal.SIGALRM)
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
+ 'need signal.pthread_kill()')
def test_pending(self):
- signals = (signal.SIGUSR1, signal.SIGUSR2)
- # when signals are unblocked, pending signal ared delivered in the
- # reverse order of their number
- signals = tuple(sorted(signals, reverse=True))
+ signum1 = signal.SIGUSR1
+ signum2 = signal.SIGUSR2
+ tid = threading.current_thread().ident
- self.check_wakeup("""def test():
- signum1 = signal.SIGUSR1
- signum2 = signal.SIGUSR2
+ old_handler = signal.signal(signum1, self.handler)
+ self.addCleanup(signal.signal, signum1, old_handler)
+ old_handler = signal.signal(signum2, self.handler)
+ self.addCleanup(signal.signal, signum2, old_handler)
- signal.signal(signum1, handler)
- signal.signal(signum2, handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
+ signal.pthread_kill(tid, signum1)
+ signal.pthread_kill(tid, signum2)
+ # Unblocking the 2 signals calls the C signal handler twice
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
- signal.pthread_sigmask(signal.SIG_BLOCK, (signum1, signum2))
- os.kill(os.getpid(), signum1)
- os.kill(os.getpid(), signum2)
- # Unblocking the 2 signals calls the C signal handler twice
- signal.pthread_sigmask(signal.SIG_UNBLOCK, (signum1, signum2))
- """, *signals)
+ self.check_signum(signum1, signum2)
+ def setUp(self):
+ import fcntl
+
+ self.alrm = signal.signal(signal.SIGALRM, self.handler)
+ self.read, self.write = os.pipe()
+ flags = fcntl.fcntl(self.write, fcntl.F_GETFL, 0)
+ flags = flags | os.O_NONBLOCK
+ fcntl.fcntl(self.write, fcntl.F_SETFL, flags)
+ self.old_wakeup = signal.set_wakeup_fd(self.write)
+
+ def tearDown(self):
+ signal.set_wakeup_fd(self.old_wakeup)
+ os.close(self.read)
+ os.close(self.write)
+ signal.signal(signal.SIGALRM, self.alrm)
@unittest.skipIf(sys.platform == "win32", "Not valid on Windows")
class SiginterruptTest(unittest.TestCase):
@@ -365,9 +337,6 @@
if interrupt is not None:
signal.siginterrupt(signal.SIGALRM, interrupt)
- print("ready")
- sys.stdout.flush()
-
# run the test twice
for loop in range(2):
# send a SIGALRM in a second (during the read)
@@ -384,15 +353,11 @@
""" % (interrupt,)
with spawn_python('-c', code) as process:
try:
- # wait until the child process is loaded and has started
- first_line = process.stdout.readline()
-
- stdout, stderr = process.communicate(timeout=5.0)
+ stdout, stderr = process.communicate(timeout=3.0)
except subprocess.TimeoutExpired:
process.kill()
return False
else:
- stdout = first_line + stdout
exitcode = process.wait()
if exitcode not in (2, 3):
raise Exception("Child error (exit code %s): %s"
@@ -522,6 +487,60 @@
Test pthread_sigmask(), pthread_kill(), sigpending() and sigwait()
functions.
"""
+ def setUp(self):
+ self.has_pthread_kill = hasattr(signal, 'pthread_kill')
+
+ def handler(self, signum, frame):
+ 1/0
+
+ def read_sigmask(self):
+ return signal.pthread_sigmask(signal.SIG_BLOCK, [])
+
+ def can_test_blocked_signals(self, skip):
+ """
+ Check if a blocked signal can be raised to the main thread without
+ calling its signal handler. We need pthread_kill() or exactly one
+ thread (the main thread).
+
+ Return True if it's possible. Otherwise, return False and print a
+ warning if skip is False, or raise a SkipTest exception if skip is
+ True.
+ """
+ if self.has_pthread_kill:
+ return True
+
+ # The fault handler timeout thread masks all signals. If the main
+ # thread masks also SIGUSR1, all threads mask this signal. In this
+ # case, if we send SIGUSR1 to the process, the signal is pending in the
+ # main or the faulthandler timeout thread. Unblock SIGUSR1 in the main
+ # thread calls the signal handler only if the signal is pending for the
+ # main thread. Stop the faulthandler timeout thread to workaround this
+ # problem.
+ import faulthandler
+ faulthandler.cancel_dump_tracebacks_later()
+
+ # Issue #11998: The _tkinter module loads the Tcl library which
+ # creates a thread waiting events in select(). This thread receives
+ # signals blocked by all other threads. We cannot test blocked
+ # signals
+ if '_tkinter' in sys.modules:
+ message = ("_tkinter is loaded and pthread_kill() is missing, "
+ "cannot test blocked signals (issue #11998)")
+ if skip:
+ self.skipTest(message)
+ else:
+ print("WARNING: %s" % message)
+ return False
+ return True
+
+ def kill(self, signum):
+ if self.has_pthread_kill:
+ tid = threading.get_ident()
+ signal.pthread_kill(tid, signum)
+ else:
+ pid = os.getpid()
+ os.kill(pid, signum)
+
@unittest.skipUnless(hasattr(signal, 'sigpending'),
'need signal.sigpending()')
def test_sigpending_empty(self):
@@ -532,198 +551,75 @@
@unittest.skipUnless(hasattr(signal, 'sigpending'),
'need signal.sigpending()')
def test_sigpending(self):
- code = """if 1:
- import os
- import signal
+ self.can_test_blocked_signals(True)
- def handler(signum, frame):
- 1/0
+ signum = signal.SIGUSR1
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
- signum = signal.SIGUSR1
- signal.signal(signum, handler)
-
- signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- os.kill(os.getpid(), signum)
- pending = signal.sigpending()
- if pending != {signum}:
- raise Exception('%s != {%s}' % (pending, signum))
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- """
- assert_python_ok('-c', code)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
+ self.kill(signum)
+ self.assertEqual(signal.sigpending(), {signum})
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
@unittest.skipUnless(hasattr(signal, 'pthread_kill'),
'need signal.pthread_kill()')
def test_pthread_kill(self):
- code = """if 1:
- import signal
- import threading
- import sys
+ signum = signal.SIGUSR1
+ current = threading.get_ident()
- signum = signal.SIGUSR1
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
- def handler(signum, frame):
- 1/0
-
- signal.signal(signum, handler)
-
- if sys.platform == 'freebsd6':
- # Issue #12392 and #12469: send a signal to the main thread
- # doesn't work before the creation of the first thread on
- # FreeBSD 6
- def noop():
- pass
- thread = threading.Thread(target=noop)
- thread.start()
- thread.join()
-
- tid = threading.get_ident()
- try:
- signal.pthread_kill(tid, signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- """
- assert_python_ok('-c', code)
-
- @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
- 'need signal.pthread_sigmask()')
- def wait_helper(self, blocked, test):
- """
- test: body of the "def test(signum):" function.
- blocked: number of the blocked signal
- """
- code = '''if 1:
- import signal
- import sys
-
- def handler(signum, frame):
- 1/0
-
- %s
-
- blocked = %s
- signum = signal.SIGALRM
-
- # child: block and wait the signal
- try:
- signal.signal(signum, handler)
- signal.pthread_sigmask(signal.SIG_BLOCK, [blocked])
-
- # Do the tests
- test(signum)
-
- # The handler must not be called on unblock
- try:
- signal.pthread_sigmask(signal.SIG_UNBLOCK, [blocked])
- except ZeroDivisionError:
- print("the signal handler has been called",
- file=sys.stderr)
- sys.exit(1)
- except BaseException as err:
- print("error: {}".format(err), file=sys.stderr)
- sys.stderr.flush()
- sys.exit(1)
- ''' % (test.strip(), blocked)
-
- # sig*wait* must be called with the signal blocked: since the current
- # process might have several threads running, use a subprocess to have
- # a single thread.
- assert_python_ok('-c', code)
+ with self.assertRaises(ZeroDivisionError):
+ signal.pthread_kill(current, signum)
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
+ @unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
+ 'need signal.pthread_sigmask()')
+ @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork()')
def test_sigwait(self):
- self.wait_helper(signal.SIGALRM, '''
def test(signum):
signal.alarm(1)
received = signal.sigwait([signum])
if received != signum:
- raise Exception('received %s, not %s' % (received, signum))
- ''')
+ print("sigwait() received %s, not %s"
+ % (received, signum),
+ file=sys.stderr)
+ os._exit(1)
- @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
- 'need signal.sigwaitinfo()')
- def test_sigwaitinfo(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- signal.alarm(1)
- info = signal.sigwaitinfo([signum])
- if info.si_signo != signum:
- raise Exception("info.si_signo != %s" % signum)
- ''')
+ signum = signal.SIGALRM
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- signal.alarm(1)
- info = signal.sigtimedwait([signum], (10, 1000))
- if info.si_signo != signum:
- raise Exception('info.si_signo != %s' % signum)
- ''')
+ # sigwait must be called with the signal blocked: since the current
+ # process might have several threads running, we fork() a child process
+ # to have a single thread.
+ pid = os.fork()
+ if pid == 0:
+ # child: block and wait the signal
+ try:
+ signal.signal(signum, self.handler)
+ signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_poll(self):
- # check that polling with sigtimedwait works
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- import os
- os.kill(os.getpid(), signum)
- info = signal.sigtimedwait([signum], (0, 0))
- if info.si_signo != signum:
- raise Exception('info.si_signo != %s' % signum)
- ''')
+ # Do the tests
+ test(signum)
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_timeout(self):
- self.wait_helper(signal.SIGALRM, '''
- def test(signum):
- received = signal.sigtimedwait([signum], (1, 0))
- if received is not None:
- raise Exception("received=%r" % (received,))
- ''')
-
- @unittest.skipUnless(hasattr(signal, 'sigtimedwait'),
- 'need signal.sigtimedwait()')
- def test_sigtimedwait_negative_timeout(self):
- signum = signal.SIGALRM
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, -1))
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (0, -1))
- self.assertRaises(ValueError, signal.sigtimedwait, [signum], (-1, 0))
-
- @unittest.skipUnless(hasattr(signal, 'sigwaitinfo'),
- 'need signal.sigwaitinfo()')
- def test_sigwaitinfo_interrupted(self):
- self.wait_helper(signal.SIGUSR1, '''
- def test(signum):
- import errno
-
- hndl_called = True
- def alarm_handler(signum, frame):
- hndl_called = False
-
- signal.signal(signal.SIGALRM, alarm_handler)
- signal.alarm(1)
- try:
- signal.sigwaitinfo([signal.SIGUSR1])
- except OSError as e:
- if e.errno == errno.EINTR:
- if not hndl_called:
- raise Exception("SIGALRM handler not called")
- else:
- raise Exception("Expected EINTR to be raised by sigwaitinfo")
+ # The handler must not be called on unblock
+ try:
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ except ZeroDivisionError:
+ print("the signal handler has been called",
+ file=sys.stderr)
+ os._exit(1)
+ except BaseException as err:
+ print("error: {}".format(err), file=sys.stderr)
+ os._exit(1)
else:
- raise Exception("Expected EINTR to be raised by sigwaitinfo")
- ''')
+ os._exit(0)
+ else:
+ # parent: check that the child correcty received the signal
+ self.assertEqual(os.waitpid(pid, 0), (pid, 0))
@unittest.skipUnless(hasattr(signal, 'sigwait'),
'need signal.sigwait()')
@@ -771,98 +667,46 @@
@unittest.skipUnless(hasattr(signal, 'pthread_sigmask'),
'need signal.pthread_sigmask()')
def test_pthread_sigmask(self):
- code = """if 1:
- import signal
- import os; import threading
-
- def handler(signum, frame):
- 1/0
-
- def kill(signum):
- os.kill(os.getpid(), signum)
-
- def read_sigmask():
- return signal.pthread_sigmask(signal.SIG_BLOCK, [])
-
+ test_blocked_signals = self.can_test_blocked_signals(False)
signum = signal.SIGUSR1
# Install our signal handler
- old_handler = signal.signal(signum, handler)
+ old_handler = signal.signal(signum, self.handler)
+ self.addCleanup(signal.signal, signum, old_handler)
# Unblock SIGUSR1 (and copy the old mask) to test our signal handler
old_mask = signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- try:
- kill(signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
+ self.addCleanup(signal.pthread_sigmask, signal.SIG_SETMASK, old_mask)
+ with self.assertRaises(ZeroDivisionError):
+ self.kill(signum)
# Block and then raise SIGUSR1. The signal is blocked: the signal
# handler is not called, and the signal is now pending
signal.pthread_sigmask(signal.SIG_BLOCK, [signum])
- kill(signum)
+ if test_blocked_signals:
+ self.kill(signum)
# Check the new mask
- blocked = read_sigmask()
- if signum not in blocked:
- raise Exception("%s not in %s" % (signum, blocked))
- if old_mask ^ blocked != {signum}:
- raise Exception("%s ^ %s != {%s}" % (old_mask, blocked, signum))
+ blocked = self.read_sigmask()
+ self.assertIn(signum, blocked)
+ self.assertEqual(old_mask ^ blocked, {signum})
# Unblock SIGUSR1
- try:
- # unblock the pending signal calls immediatly the signal handler
+ if test_blocked_signals:
+ with self.assertRaises(ZeroDivisionError):
+ # unblock the pending signal calls immediatly the signal handler
+ signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
+ else:
signal.pthread_sigmask(signal.SIG_UNBLOCK, [signum])
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
- try:
- kill(signum)
- except ZeroDivisionError:
- pass
- else:
- raise Exception("ZeroDivisionError not raised")
+ with self.assertRaises(ZeroDivisionError):
+ self.kill(signum)
# Check the new mask
- unblocked = read_sigmask()
- if signum in unblocked:
- raise Exception("%s in %s" % (signum, unblocked))
- if blocked ^ unblocked != {signum}:
- raise Exception("%s ^ %s != {%s}" % (blocked, unblocked, signum))
- if old_mask != unblocked:
- raise Exception("%s != %s" % (old_mask, unblocked))
- """
- assert_python_ok('-c', code)
-
- @unittest.skipIf(sys.platform == 'freebsd6',
- "issue #12392: send a signal to the main thread doesn't work "
- "before the creation of the first thread on FreeBSD 6")
- @unittest.skipUnless(hasattr(signal, 'pthread_kill'),
- 'need signal.pthread_kill()')
- def test_pthread_kill_main_thread(self):
- # Test that a signal can be sent to the main thread with pthread_kill()
- # before any other thread has been created (see issue #12392).
- code = """if True:
- import threading
- import signal
- import sys
-
- def handler(signum, frame):
- sys.exit(3)
-
- signal.signal(signal.SIGUSR1, handler)
- signal.pthread_kill(threading.get_ident(), signal.SIGUSR1)
- sys.exit(2)
- """
-
- with spawn_python('-c', code) as process:
- stdout, stderr = process.communicate()
- exitcode = process.wait()
- if exitcode != 3:
- raise Exception("Child error (exit code %s): %s" %
- (exitcode, stdout))
+ unblocked = self.read_sigmask()
+ self.assertNotIn(signum, unblocked)
+ self.assertEqual(blocked ^ unblocked, {signum})
+ self.assertSequenceEqual(old_mask, unblocked)
+ # Finally, restore the previous signal handler and the signal mask
def test_main():
« no previous file with comments | « Lib/test/test_shutil.py ('k') | Lib/test/test_smtplib.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+