Index: Python/thread_pthread.h =================================================================== --- Python/thread_pthread.h (revision 82754) +++ Python/thread_pthread.h (working copy) @@ -315,16 +315,17 @@ return (status == -1) ? errno : status; } -int -PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds) +PyLockStatus +PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds, + int intr_flag) { - int success; + PyLockStatus success; sem_t *thelock = (sem_t *)lock; int status, error = 0; struct timespec ts; - dprintf(("PyThread_acquire_lock_timed(%p, %lld) called\n", - lock, microseconds)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n", + lock, microseconds, intr_flag)); if (microseconds > 0) MICROSECONDS_TO_TIMESPEC(microseconds, ts); @@ -335,33 +336,38 @@ status = fix_status(sem_trywait(thelock)); else status = fix_status(sem_wait(thelock)); - } while (status == EINTR); /* Retry if interrupted by a signal */ + /* Retry if interrupted by a signal, unless the caller wants to be + notified. */ + } while (!intr_flag && status == EINTR); - if (microseconds > 0) { - if (status != ETIMEDOUT) - CHECK_STATUS("sem_timedwait"); + /* Don't check the status if we're stopping because of an interrupt. */ + if (!(intr_flag && status == EINTR)) { + if (microseconds > 0) { + if (status != ETIMEDOUT) + CHECK_STATUS("sem_timedwait"); + } + else if (microseconds == 0) { + if (status != EAGAIN) + CHECK_STATUS("sem_trywait"); + } + else { + CHECK_STATUS("sem_wait"); + } } - else if (microseconds == 0) { - if (status != EAGAIN) - CHECK_STATUS("sem_trywait"); + + if (status == 0) { + success = PY_LOCK_ACQUIRED; + } else if (intr_flag && status == EINTR) { + success = PY_LOCK_INTR; + } else { + success = PY_LOCK_FAILURE; } - else { - CHECK_STATUS("sem_wait"); - } - success = (status == 0) ? 1 : 0; - - dprintf(("PyThread_acquire_lock_timed(%p, %lld) -> %d\n", - lock, microseconds, success)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) -> %d\n", + lock, microseconds, intr_flag, success)); return success; } -int -PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) -{ - return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0); -} - void PyThread_release_lock(PyThread_type_lock lock) { @@ -435,21 +441,25 @@ free((void *)thelock); } -int -PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds) +PyLockStatus +PyThread_acquire_lock_timed(PyThread_type_lock lock, PY_TIMEOUT_T microseconds, + int intr_flag) { - int success; + PyLockStatus success; pthread_lock *thelock = (pthread_lock *)lock; int status, error = 0; - dprintf(("PyThread_acquire_lock_timed(%p, %lld) called\n", - lock, microseconds)); + dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) called\n", + lock, microseconds, intr_flag)); status = pthread_mutex_lock( &thelock->mut ); CHECK_STATUS("pthread_mutex_lock[1]"); - success = thelock->locked == 0; - if (!success && microseconds != 0) { + if (thelock->locked == 0) { + success = PY_LOCK_ACQUIRED; + } else if (microseconds == 0) { + success = PY_LOCK_FAILURE; + } else { struct timespec ts; if (microseconds > 0) MICROSECONDS_TO_TIMESPEC(microseconds, ts); @@ -457,7 +467,8 @@ /* mut must be locked by me -- part of the condition * protocol */ - while (thelock->locked) { + success = PY_LOCK_FAILURE; + while (success == PY_LOCK_FAILURE) { if (microseconds > 0) { status = pthread_cond_timedwait( &thelock->lock_released, @@ -472,25 +483,30 @@ &thelock->mut); CHECK_STATUS("pthread_cond_wait"); } + + if (intr_flag && status == 0 && thelock->locked) { + /* We were woken up, but didn't get the lock. We probably received + * a signal. Return PY_LOCK_INTR to allow the caller to handle + * it and retry. */ + success = PY_LOCK_INTR; + break; + } else if (status == 0 && !thelock->locked) { + success = PY_LOCK_ACQUIRED; + } else { + success = PY_LOCK_FAILURE; + } } - success = (status == 0); } - if (success) thelock->locked = 1; + if (success == PY_LOCK_ACQUIRED) thelock->locked = 1; status = pthread_mutex_unlock( &thelock->mut ); CHECK_STATUS("pthread_mutex_unlock[1]"); - if (error) success = 0; - dprintf(("PyThread_acquire_lock_timed(%p, %lld) -> %d\n", - lock, microseconds, success)); + if (error) success = PY_LOCK_FAILURE; + dprintf(("PyThread_acquire_lock_timed(%p, %lld, %d) -> %d\n", + lock, microseconds, intr_flag, success)); return success; } -int -PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) -{ - return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0); -} - void PyThread_release_lock(PyThread_type_lock lock) { @@ -514,6 +530,12 @@ #endif /* USE_SEMAPHORES */ +int +PyThread_acquire_lock(PyThread_type_lock lock, int waitflag) +{ + return PyThread_acquire_lock_timed(lock, waitflag ? -1 : 0, /*intr_flag=*/0); +} + /* set the thread stack size. * Return 0 if size is valid, -1 if size is invalid, * -2 if setting stack size is not supported. Index: Python/thread_nt.h =================================================================== --- Python/thread_nt.h (revision 82754) +++ Python/thread_nt.h (working copy) @@ -238,10 +238,13 @@ * and 0 if the lock was not acquired. This means a 0 is returned * if the lock has already been acquired by this thread! */ -int -PyThread_acquire_lock_timed(PyThread_type_lock aLock, PY_TIMEOUT_T microseconds) +PyLockStatus +PyThread_acquire_lock_timed(PyThread_type_lock aLock, + PY_TIMEOUT_T microseconds, int intr_flag) { - int success ; + /* Fow now, intr_flag does nothing on Windows, and lock acquires are + * uninterruptible. */ + PyLockStatus success; PY_TIMEOUT_T milliseconds; if (microseconds >= 0) { @@ -258,7 +261,13 @@ dprintf(("%ld: PyThread_acquire_lock_timed(%p, %lld) called\n", PyThread_get_thread_ident(), aLock, microseconds)); - success = aLock && EnterNonRecursiveMutex((PNRMUTEX) aLock, (DWORD) milliseconds) == WAIT_OBJECT_0 ; + if (aLock && EnterNonRecursiveMutex((PNRMUTEX)aLock, + (DWORD)milliseconds) == WAIT_OBJECT_0) { + success = PY_LOCK_ACQUIRED; + } + else { + success = PY_LOCK_FAILURE; + } dprintf(("%ld: PyThread_acquire_lock(%p, %lld) -> %d\n", PyThread_get_thread_ident(), aLock, microseconds, success)); @@ -268,7 +277,7 @@ int PyThread_acquire_lock(PyThread_type_lock aLock, int waitflag) { - return PyThread_acquire_lock_timed(aLock, waitflag ? -1 : 0); + return PyThread_acquire_lock_timed(aLock, waitflag ? -1 : 0, 0); } void Index: Include/pythread.h =================================================================== --- Include/pythread.h (revision 82754) +++ Include/pythread.h (working copy) @@ -9,6 +9,14 @@ extern "C" { #endif +/* Return status codes for Python lock acquisition. Chosen for maximum + * backwards compatibility, ie failure -> 0, success -> 1. */ +typedef enum PyLockStatus { + PY_LOCK_FAILURE = 0, + PY_LOCK_ACQUIRED = 1, + PY_LOCK_INTR +} PyLockStatus; + PyAPI_FUNC(void) PyThread_init_thread(void); PyAPI_FUNC(long) PyThread_start_new_thread(void (*)(void *), void *); PyAPI_FUNC(void) PyThread_exit_thread(void); @@ -49,11 +57,18 @@ even when the lock can't be acquired. If microseconds > 0, the call waits up to the specified duration. If microseconds < 0, the call waits until success (or abnormal failure) - + microseconds must be less than PY_TIMEOUT_MAX. Behaviour otherwise is - undefined. */ -PyAPI_FUNC(int) PyThread_acquire_lock_timed(PyThread_type_lock, - PY_TIMEOUT_T microseconds); + undefined. + + If intr_flag is true and the acquire is interrupted by a signal, then the + call will return PY_LOCK_INTR. The caller may reattempt to acquire the + lock. +*/ +PyAPI_FUNC(PyLockStatus) PyThread_acquire_lock_timed(PyThread_type_lock, + PY_TIMEOUT_T microseconds, + int intr_flag); + PyAPI_FUNC(void) PyThread_release_lock(PyThread_type_lock); PyAPI_FUNC(size_t) PyThread_get_stacksize(void); Index: PC/_subprocess.c =================================================================== --- PC/_subprocess.c (revision 82754) +++ PC/_subprocess.c (working copy) @@ -681,6 +681,7 @@ defint(d, "SW_HIDE", SW_HIDE); defint(d, "INFINITE", INFINITE); defint(d, "WAIT_OBJECT_0", WAIT_OBJECT_0); + defint(d, "WAIT_TIMEOUT", WAIT_TIMEOUT); defint(d, "CREATE_NEW_CONSOLE", CREATE_NEW_CONSOLE); defint(d, "CREATE_NEW_PROCESS_GROUP", CREATE_NEW_PROCESS_GROUP); Index: Doc/whatsnew/3.2.rst =================================================================== --- Doc/whatsnew/3.2.rst (revision 82754) +++ Doc/whatsnew/3.2.rst (working copy) @@ -143,7 +143,13 @@ Similarly, :meth:`threading.Semaphore.acquire` also gains a *timeout* argument. (Contributed by Torsten Landschoff; :issue:`850728`.) +* Regular and recursive lock acquisitions can now be interrupted by signals on + platforms using pthreads. This means that Python programs that deadlock while + acquiring locks can be successfully killed by repeatedly sending SIGINT to the + process (ie, by pressing Ctl+C in most shells). + (Contributed by Reid Kleckner; :issue:`8844`.) + Optimizations ============= Index: Doc/library/subprocess.rst =================================================================== --- Doc/library/subprocess.rst (revision 82754) +++ Doc/library/subprocess.rst (working copy) @@ -211,15 +211,21 @@ This module also defines four shortcut functions: -.. function:: call(*popenargs, **kwargs) +.. function:: call(*popenargs, timeout=None, **kwargs) Run command with arguments. Wait for command to complete, then return the :attr:`returncode` attribute. - The arguments are the same as for the :class:`Popen` constructor. Example:: + The arguments are the same as for the :class:`Popen` constructor, with the + exception of the *timeout* argument, which is given to :meth:`Popen.wait`. + Example:: >>> retcode = subprocess.call(["ls", "-l"]) + If the timeout expires, the child process will be killed and then waited for + again. The :exc:`TimeoutExpired` exception will be re-raised after the child + process has terminated. + .. warning:: Like :meth:`Popen.wait`, this will deadlock when using @@ -227,34 +233,43 @@ generates enough output to a pipe such that it blocks waiting for the OS pipe buffer to accept more data. + .. versionchanged:: 3.2 + *timeout* was added. -.. function:: check_call(*popenargs, **kwargs) +.. function:: check_call(*popenargs, timeout=None, **kwargs) + Run command with arguments. Wait for command to complete. If the exit code was zero then return, otherwise raise :exc:`CalledProcessError`. The :exc:`CalledProcessError` object will have the return code in the :attr:`returncode` attribute. - The arguments are the same as for the :class:`Popen` constructor. Example:: + The arguments are the same as for the :func:`call` function. Example:: >>> subprocess.check_call(["ls", "-l"]) 0 + As in the :func:`call` function, if the timeout expires, the child process + will be killed and the wait retried. The :exc:`TimeoutExpired` exception + will be re-raised after the child process has terminated. + .. warning:: See the warning for :func:`call`. + .. versionchanged:: 3.2 + *timeout* was added. -.. function:: check_output(*popenargs, **kwargs) +.. function:: check_output(*popenargs, timeout=None, **kwargs) + Run command with arguments and return its output as a byte string. If the exit code was non-zero it raises a :exc:`CalledProcessError`. The :exc:`CalledProcessError` object will have the return code in the - :attr:`returncode` - attribute and output in the :attr:`output` attribute. + :attr:`returncode` attribute and output in the :attr:`output` attribute. - The arguments are the same as for the :class:`Popen` constructor. Example:: + The arguments are the same as for the :func:`call` function. Example:: >>> subprocess.check_output(["ls", "-l", "/dev/null"]) b'crw-rw-rw- 1 root root 1, 3 Oct 18 2007 /dev/null\n' @@ -267,9 +282,18 @@ ... stderr=subprocess.STDOUT) b'ls: non_existent_file: No such file or directory\n' + As in the :func:`call` function, if the timeout expires, the child process + will be killed and the wait retried. The :exc:`TimeoutExpired` exception + will be re-raised after the child process has terminated. The output from + the child process so far will be in the :attr:`output` attribute of the + exception. + .. versionadded:: 3.1 + .. versionchanged:: 3.2 + *timeout* was added. + .. function:: getstatusoutput(cmd) Return ``(status, output)`` of executing *cmd* in a shell. @@ -319,7 +343,11 @@ check_call() will raise :exc:`CalledProcessError`, if the called process returns a non-zero return code. +All of the functions and methods that accept a *timeout* parameter, such as +:func:`call` and :meth:`Popen.communicate` will raise :exc:`TimeoutExpired` if +the timeout expires before the process exits. + Security ^^^^^^^^ @@ -340,11 +368,15 @@ attribute. -.. method:: Popen.wait() +.. method:: Popen.wait(timeout=None) Wait for child process to terminate. Set and return :attr:`returncode` attribute. + If the process does not terminate after *timeout* seconds, raise a + :exc:`TimeoutExpired` exception. It is safe to catch this exception and + retry the wait. + .. warning:: This will deadlock when using ``stdout=PIPE`` and/or @@ -352,11 +384,14 @@ a pipe such that it blocks waiting for the OS pipe buffer to accept more data. Use :meth:`communicate` to avoid that. + .. versionchanged:: 3.2 + *timeout* was added. -.. method:: Popen.communicate(input=None) +.. method:: Popen.communicate(input=None, timeout=None) + Interact with process: Send data to stdin. Read data from stdout and stderr, - until end-of-file is reached. Wait for process to terminate. The optional + until end-of-file is reached. Wait for process to terminate. The optional *input* argument should be a byte string to be sent to the child process, or ``None``, if no data should be sent to the child. @@ -367,12 +402,30 @@ ``None`` in the result tuple, you need to give ``stdout=PIPE`` and/or ``stderr=PIPE`` too. + If the process does not terminate after *timeout* seconds, a + :exc:`TimeoutExpired` exception will be raised. Catching this exception and + retrying communication will not lose any output. + + The child process is not killed if the timeout expires, so in order to + cleanup properly a well-behaved application should kill the child process and + finish communication:: + + proc = subprocess.Popen(...) + try: + outs, errs = proc.communicate(timeout=15) + except TimeoutExpired: + proc.kill() + outs, errs = proc.communicate() + .. note:: The data read is buffered in memory, so do not use this method if the data size is large or unlimited. + .. versionchanged:: 3.2 + *timeout* was added. + .. method:: Popen.send_signal(signal) Sends the signal *signal* to the child. Index: Doc/library/threading.rst =================================================================== --- Doc/library/threading.rst (revision 82754) +++ Doc/library/threading.rst (working copy) @@ -386,6 +386,9 @@ .. versionchanged:: 3.2 The *timeout* parameter is new. + .. versionchanged:: 3.2 + Lock acquires can now be interrupted by signals on POSIX. + .. method:: Lock.release() Release a lock. Index: Lib/test/test_threadsignals.py =================================================================== --- Lib/test/test_threadsignals.py (revision 82754) +++ Lib/test/test_threadsignals.py (working copy) @@ -6,6 +6,7 @@ import sys from test.support import run_unittest, import_module thread = import_module('_thread') +import time if sys.platform[:3] in ('win', 'os2') or sys.platform=='riscos': raise unittest.SkipTest("Can't test signal on %s" % sys.platform) @@ -66,7 +67,77 @@ def spawnSignallingThread(self): thread.start_new_thread(send_signals, ()) + def alarm_interrupt(self, sig, frame): + raise KeyboardInterrupt + def test_lock_acquire_interruption(self): + # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck + # in a deadlock. + oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) + try: + lock = thread.allocate_lock() + lock.acquire() + signal.alarm(1) + self.assertRaises(KeyboardInterrupt, lock.acquire) + finally: + signal.signal(signal.SIGALRM, oldalrm) + + def test_rlock_acquire_interruption(self): + # Mimic receiving a SIGINT (KeyboardInterrupt) with SIGALRM while stuck + # in a deadlock. + oldalrm = signal.signal(signal.SIGALRM, self.alarm_interrupt) + try: + rlock = thread.RLock() + # For reentrant locks, the initial acquisition must be in another + # thread. + def other_thread(): + rlock.acquire() + thread.start_new_thread(other_thread, ()) + # Wait until we can't acquire it without blocking... + while rlock.acquire(blocking=False): + rlock.release() + time.sleep(0.01) + signal.alarm(1) + self.assertRaises(KeyboardInterrupt, rlock.acquire) + finally: + signal.signal(signal.SIGALRM, oldalrm) + + def acquire_retries_on_intr(self, lock): + self.sig_recvd = False + def my_handler(signal, frame): + self.sig_recvd = True + old_handler = signal.signal(signal.SIGUSR1, my_handler) + try: + def other_thread(): + # Acquire the lock in a non-main thread, so this test works for + # RLocks. + lock.acquire() + # Wait until the main thread is blocked in the lock acquire, and + # then wake it up with this. + time.sleep(0.1) + os.kill(process_pid, signal.SIGUSR1) + # Let the main thread take the interrupt, handle it, and retry + # the lock acquisition. Then we'll let it run. + time.sleep(0.1) + lock.release() + thread.start_new_thread(other_thread, ()) + # Wait until we can't acquire it without blocking... + while lock.acquire(blocking=False): + lock.release() + time.sleep(0.01) + result = lock.acquire() # Block while we receive a signal. + self.assertTrue(self.sig_recvd) + self.assertTrue(result) + finally: + signal.signal(signal.SIGUSR1, old_handler) + + def test_lock_acquire_retries_on_intr(self): + self.acquire_retries_on_intr(thread.allocate_lock()) + + def test_rlock_acquire_retries_on_intr(self): + self.acquire_retries_on_intr(thread.RLock()) + + def test_main(): global signal_blackboard Index: Lib/test/test_subprocess.py =================================================================== --- Lib/test/test_subprocess.py (revision 82754) +++ Lib/test/test_subprocess.py (working copy) @@ -65,6 +65,15 @@ "import sys; sys.exit(47)"]) self.assertEqual(rc, 47) + def test_call_timeout(self): + # call() function with timeout argument; we want to test that the child + # process gets killed when the timeout expires. If the child isn't + # killed, this call will deadlock since subprocess.call waits for the + # child. + self.assertRaises(subprocess.TimeoutExpired, subprocess.call, + [sys.executable, "-c", "while True: pass"], + timeout=0.1) + def test_check_call_zero(self): # check_call() function with zero return code rc = subprocess.check_call([sys.executable, "-c", @@ -107,6 +116,18 @@ self.fail("Expected ValueError when stdout arg supplied.") self.assertIn('stdout', c.exception.args[0]) + def test_check_output_timeout(self): + # check_output() function with timeout arg + with self.assertRaises(subprocess.TimeoutExpired) as c: + output = subprocess.check_output( + [sys.executable, "-c", + "import sys; sys.stdout.write('BDFL')\n" + "sys.stdout.flush()\n" + "while True: pass"], + timeout=0.5) + self.fail("Expected TimeoutExpired.") + self.assertEqual(c.exception.output, b'BDFL') + def test_call_kwargs(self): # call() function with keyword args newenv = os.environ.copy() @@ -343,6 +364,41 @@ self.assertEqual(stdout, b"banana") self.assertStderrEqual(stderr, b"pineapple") + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stderr.write("pineapple\\n");' + 'time.sleep(2);' + 'sys.stderr.write("pear\\n");' + 'sys.stdout.write(sys.stdin.read())'], + universal_newlines=True, + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, b"banana", + timeout=1) + # Make sure we can keep waiting for it, and that we get the whole output + # after it completes. + (stdout, stderr) = p.communicate() + self.assertEqual(stdout, "banana") + self.assertStderrEqual(stderr.encode(), b"pineapple\npear\n") + + def test_communicate_timeout_large_ouput(self): + # Test a expring timeout while the child is outputting lots of data. + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));' + 'time.sleep(0.2);' + 'sys.stdout.write("a" * (64 * 1024));'], + stdout=subprocess.PIPE) + self.assertRaises(subprocess.TimeoutExpired, p.communicate, timeout=0.4) + (stdout, _) = p.communicate() + self.assertEqual(len(stdout), 4 * 64 * 1024) + # This test is Linux specific for simplicity to at least have # some coverage. It is not a platform specific bug. @unittest.skipUnless(os.path.isdir('/proc/%d/fd' % os.getpid()), @@ -504,6 +560,13 @@ self.assertEqual(p.wait(), 0) + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", "import time; time.sleep(1)"]) + self.assertRaises(subprocess.TimeoutExpired, p.wait, timeout=0.1) + self.assertEqual(p.wait(timeout=2), 0) + + def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise # TypeError. @@ -761,7 +824,7 @@ # The process should not terminate prematurely self.assertIsNone(p.poll()) # Retry if the process do not receive the signal. - count, maxcount = 0, 3 + count, maxcount = 0, 5 while count < maxcount and p.poll() is None: getattr(p, method)(*args) time.sleep(0.1) Index: Lib/subprocess.py =================================================================== --- Lib/subprocess.py (revision 82754) +++ Lib/subprocess.py (working copy) @@ -333,12 +333,13 @@ import sys mswindows = (sys.platform == "win32") +import builtins +import gc import io import os +import signal +import time import traceback -import gc -import signal -import builtins # Exception classes used by this module. class CalledProcessError(Exception): @@ -355,6 +356,19 @@ return "Command '%s' returned non-zero exit status %d" % (self.cmd, self.returncode) +class TimeoutExpired(Exception): + """This exception is raised when the timeout expires while waiting for a + child process. + """ + def __init__(self, cmd, output=None): + self.cmd = cmd + self.output = output + + def __str__(self): + return ("Command '%s' timed out after %s seconds" % + (self.cmd, self.timeout)) + + if mswindows: import threading import msvcrt @@ -427,15 +441,21 @@ raise -def call(*popenargs, **kwargs): - """Run command with arguments. Wait for command to complete, then - return the returncode attribute. +def call(*popenargs, timeout=None, **kwargs): + """Run command with arguments. Wait for command to complete or + timeout, then return the returncode attribute. The arguments are the same as for the Popen constructor. Example: retcode = call(["ls", "-l"]) """ - return Popen(*popenargs, **kwargs).wait() + p = Popen(*popenargs, **kwargs) + try: + return p.wait(timeout=timeout) + except TimeoutExpired: + p.kill() + p.wait() + raise def check_call(*popenargs, **kwargs): @@ -444,7 +464,7 @@ CalledProcessError. The CalledProcessError object will have the return code in the returncode attribute. - The arguments are the same as for the Popen constructor. Example: + The arguments are the same as for the call function. Example: check_call(["ls", "-l"]) """ @@ -457,7 +477,7 @@ return 0 -def check_output(*popenargs, **kwargs): +def check_output(*popenargs, timeout=None, **kwargs): r"""Run command with arguments and return its output as a byte string. If the exit code was non-zero it raises a CalledProcessError. The @@ -480,13 +500,15 @@ if 'stdout' in kwargs: raise ValueError('stdout argument not allowed, it will be overridden.') process = Popen(*popenargs, stdout=PIPE, **kwargs) - output, unused_err = process.communicate() + try: + output, unused_err = process.communicate(timeout=timeout) + except TimeoutExpired: + process.kill() + output, unused_err = process.communicate() + raise TimeoutExpired(process.args, output=output) retcode = process.poll() if retcode: - cmd = kwargs.get("args") - if cmd is None: - cmd = popenargs[0] - raise CalledProcessError(retcode, cmd, output=output) + raise CalledProcessError(retcode, process.args, output=output) return output @@ -613,6 +635,8 @@ _cleanup() self._child_created = False + self._input = None + self._communication_started = False if bufsize is None: bufsize = 0 # Restore default if not isinstance(bufsize, int): @@ -635,6 +659,7 @@ raise ValueError("creationflags is only supported on Windows " "platforms") + self.args = args self.stdin = None self.stdout = None self.stderr = None @@ -709,7 +734,7 @@ _active.append(self) - def communicate(self, input=None): + def communicate(self, input=None, timeout=None): """Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached. Wait for process to terminate. The optional input argument should be a @@ -718,9 +743,19 @@ communicate() returns a tuple (stdout, stderr).""" - # Optimization: If we are only using one pipe, or no pipe at - # all, using select() or threads is unnecessary. - if [self.stdin, self.stdout, self.stderr].count(None) >= 2: + if self._communication_started and input: + raise ValueError("Cannot send input after starting communication") + + if timeout is not None: + endtime = time.time() + timeout + else: + endtime = None + + # Optimization: If we are not worried about timeouts, we haven't + # started communicating, and we have one or zero pipes, using select() + # or threads is unnecessary. + if (endtime is None and not self._communication_started and + [self.stdin, self.stdout, self.stderr].count(None) >= 2): stdout = None stderr = None if self.stdin: @@ -736,13 +771,50 @@ self.wait() return (stdout, stderr) - return self._communicate(input) + try: + stdout, stderr = self._communicate(input, endtime) + finally: + self._communication_started = True + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = b''.join(stdout) + if stderr is not None: + stderr = b''.join(stderr) + # Translate newlines, if requested. + # This also turns bytes into strings. + if self.universal_newlines: + if stdout is not None: + stdout = self._translate_newlines(stdout, self.stdout.encoding) + if stderr is not None: + stderr = self._translate_newlines(stderr, self.stderr.encoding) + + sts = self.wait(timeout=self._remaining_time(endtime)) + + return (stdout, stderr) + + def poll(self): return self._internal_poll() + def _remaining_time(self, endtime): + """Convenience for _communicate when computing timeouts.""" + if endtime is None: + return None + else: + return endtime - time.time() + + + def _check_timeout(self, endtime): + """Convenience for checking if a timeout has expired.""" + if endtime is None: + return + if time.time() > endtime: + raise TimeoutExpired(self.args) + + if mswindows: # # Windows methods @@ -924,12 +996,17 @@ return self.returncode - def wait(self): + def wait(self, timeout=None): """Wait for child process to terminate. Returns returncode attribute.""" + if timeout is None: + timeout = _subprocess.INFINITE + else: + timeout = int(timeout * 1000) if self.returncode is None: - _subprocess.WaitForSingleObject(self._handle, - _subprocess.INFINITE) + result = _subprocess.WaitForSingleObject(self._handle, timeout) + if result == _subprocess.WAIT_TIMEOUT: + raise TimeoutExpired(self.args) self.returncode = _subprocess.GetExitCodeProcess(self._handle) return self.returncode @@ -938,23 +1015,24 @@ buffer.append(fh.read()) - def _communicate(self, input): - stdout = None # Return - stderr = None # Return + def _communicate(self, input, endtime): + # Start reader threads feeding into a list hanging off of this + # object, unless they've already been started. + if self.stdout and not hasattr(self, "_stdout_buff"): + self._stdout_buff = [] + self.stdout_thread = \ + threading.Thread(target=self._readerthread, + args=(self.stdout, self._stdout_buff)) + self.stdout_thread.daemon = True + self.stdout_thread.start() + if self.stderr and not hasattr(self, "_stderr_buff"): + self._stderr_buff = [] + self.stderr_thread = \ + threading.Thread(target=self._readerthread, + args=(self.stderr, self._stderr_buff)) + self.stderr_thread.daemon = True + self.stderr_thread.start() - if self.stdout: - stdout = [] - stdout_thread = threading.Thread(target=self._readerthread, - args=(self.stdout, stdout)) - stdout_thread.daemon = True - stdout_thread.start() - if self.stderr: - stderr = [] - stderr_thread = threading.Thread(target=self._readerthread, - args=(self.stderr, stderr)) - stderr_thread.daemon = True - stderr_thread.start() - if self.stdin: if input is not None: self.stdin.write(input) @@ -965,13 +1043,29 @@ if self.stderr: stderr_thread.join() - # All data exchanged. Translate lists into strings. + # Wait for the reader threads, or time out. If we time out, the + # threads remain reading and the fds left open in case the user + # calls communicate again. if stdout is not None: - stdout = stdout[0] + self.stdout_thread.join(self._remaining_time(endtime)) + if self.stdout_thread.isAlive(): + raise TimeoutExpired(self.args) if stderr is not None: - stderr = stderr[0] + self.stderr_thread.join(self._remaining_time(endtime)) + if self.stderr_thread.isAlive(): + raise TimeoutExpired(self.args) - self.wait() + # Collect the output from and close both pipes, now that we know + # both have been read successfully. + stdout = None + stderr = None + if self.stdout: + stdout = self._stdout_buff + self.stdout.close() + if self.stderr: + stderr = self._stderr_buff + self.stderr.close() + return (stdout, stderr) def send_signal(self, sig): @@ -1292,17 +1386,37 @@ return self.returncode - def wait(self): + def wait(self, timeout=None, endtime=None): """Wait for child process to terminate. Returns returncode attribute.""" - if self.returncode is None: + if self.returncode is not None: + return self.returncode + if timeout is not None: + # Enter a busy loop if we have a timeout. This busy + # loop was cribbed from Lib/threading.py in + # Thread.wait() at r71065. + endtime = time.time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + (pid, sts) = _eintr_retry_call(os.waitpid, + self.pid, os.WNOHANG) + assert pid == self.pid or pid == 0 + if pid == self.pid: + self._handle_exitstatus(sts) + break + remaining = endtime - time.time() + if remaining <= 0: + raise TimeoutExpired(self.args) + delay = min(delay * 2, remaining, .05) + time.sleep(delay) + elif self.returncode is None: pid, sts = _eintr_retry_call(os.waitpid, self.pid, 0) self._handle_exitstatus(sts) return self.returncode - def _communicate(self, input): - if self.stdin: + def _communicate(self, input, endtime): + if self.stdin and not self._communication_started: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. self.stdin.flush() @@ -1310,80 +1424,80 @@ self.stdin.close() if _has_poll: - stdout, stderr = self._communicate_with_poll(input) + stdout, stderr = self._communicate_with_poll(input, endtime) else: - stdout, stderr = self._communicate_with_select(input) + stdout, stderr = self._communicate_with_select(input, endtime) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = b''.join(stdout) - if stderr is not None: - stderr = b''.join(stderr) - - # Translate newlines, if requested. - # This also turns bytes into strings. - if self.universal_newlines: - if stdout is not None: - stdout = self._translate_newlines(stdout, - self.stdout.encoding) - if stderr is not None: - stderr = self._translate_newlines(stderr, - self.stderr.encoding) - - self.wait() + self.wait(timeout=self._remaining_time(endtime)) return (stdout, stderr) - def _communicate_with_poll(self, input): + def _communicate_with_poll(self, input, endtime): stdout = None # Return stderr = None # Return - fd2file = {} - fd2output = {} + if not self._communication_started: + self._fd2file = {} + poller = select.poll() def register_and_append(file_obj, eventmask): poller.register(file_obj.fileno(), eventmask) - fd2file[file_obj.fileno()] = file_obj + self._fd2file[file_obj.fileno()] = file_obj def close_unregister_and_remove(fd): poller.unregister(fd) - fd2file[fd].close() - fd2file.pop(fd) + self._fd2file[fd].close() + self._fd2file.pop(fd) if self.stdin and input: register_and_append(self.stdin, select.POLLOUT) + # Only create this mapping if we haven't already. + if not self._communication_started: + self._fd2output = {} + if self.stdout: + self._fd2output[self.stdout.fileno()] = [] + if self.stderr: + self._fd2output[self.stderr.fileno()] = [] + select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI if self.stdout: register_and_append(self.stdout, select_POLLIN_POLLPRI) - fd2output[self.stdout.fileno()] = stdout = [] + stdout = self._fd2output[self.stdout.fileno()] if self.stderr: register_and_append(self.stderr, select_POLLIN_POLLPRI) - fd2output[self.stderr.fileno()] = stderr = [] + stderr = self._fd2output[self.stderr.fileno()] - input_offset = 0 - while fd2file: + # Save the input here so that if we time out while communicating, + # we can continue sending input if we retry. + if not self._input: + self._input_offset = 0 + self._input = input + + while self._fd2file: try: - ready = poller.poll() + ready = poller.poll(self._remaining_time(endtime)) except select.error as e: if e.args[0] == errno.EINTR: continue raise + self._check_timeout(endtime) # XXX Rewrite these to use non-blocking I/O on the # file objects; they are no longer using C stdio! for fd, mode in ready: if mode & select.POLLOUT: - chunk = input[input_offset : input_offset + _PIPE_BUF] - input_offset += os.write(fd, chunk) - if input_offset >= len(input): + chunk = self._input[self._input_offset : + self._input_offset + _PIPE_BUF] + self._input_offset += os.write(fd, chunk) + if self._input_offset >= len(self._input): close_unregister_and_remove(fd) elif mode & select_POLLIN_POLLPRI: data = os.read(fd, 4096) if not data: close_unregister_and_remove(fd) - fd2output[fd].append(data) + self._fd2output[fd].append(data) else: # Ignore hang up or errors. close_unregister_and_remove(fd) @@ -1391,53 +1505,74 @@ return (stdout, stderr) - def _communicate_with_select(self, input): - read_set = [] - write_set = [] + def _communicate_with_select(self, input, endtime): + if not self._communication_started: + self._read_set = [] + self._write_set = [] + if self.stdin and input: + self._write_set.append(self.stdin) + if self.stdout: + self._read_set.append(self.stdout) + if self.stderr: + self._read_set.append(self.stderr) + + if not self._input: + self._input = input + self._input_offset = 0 + stdout = None # Return stderr = None # Return - if self.stdin and input: - write_set.append(self.stdin) if self.stdout: - read_set.append(self.stdout) - stdout = [] + if not self._communication_started: + self._stdout_buff = [] + stdout = self._stdout_buff if self.stderr: - read_set.append(self.stderr) - stderr = [] + if not self._communication_started: + self._stderr_buff = [] + stderr = self._stderr_buff - input_offset = 0 - while read_set or write_set: + while self._read_set or self._write_set: + remaining = self._remaining_time(endtime) try: - rlist, wlist, xlist = select.select(read_set, write_set, []) + rlist, wlist, xlist = select.select(self._read_set, + self._write_set, [], + remaining) except select.error as e: if e.args[0] == errno.EINTR: continue raise + self._check_timeout(endtime) + if not (rlist or wlist or xlist): + # According to the docs, returning three empty lists + # indicates that the timeout expired. + raise TimeoutExpired(self.args) + # XXX Rewrite these to use non-blocking I/O on the # file objects; they are no longer using C stdio! if self.stdin in wlist: - chunk = input[input_offset : input_offset + _PIPE_BUF] + chunk = self._input[self._input_offset : + self._input_offset + _PIPE_BUF] bytes_written = os.write(self.stdin.fileno(), chunk) - input_offset += bytes_written - if input_offset >= len(input): + self._input_offset += bytes_written + if self._input_offset >= len(self._input): self.stdin.close() - write_set.remove(self.stdin) + self._write_set.remove(self.stdin) if self.stdout in rlist: data = os.read(self.stdout.fileno(), 1024) if not data: self.stdout.close() - read_set.remove(self.stdout) + self._read_set.remove(self.stdout) stdout.append(data) if self.stderr in rlist: data = os.read(self.stderr.fileno(), 1024) if not data: self.stderr.close() - read_set.remove(self.stderr) + self._read_set.remove(self.stderr) stderr.append(data) return (stdout, stderr) Index: Modules/_threadmodule.c =================================================================== --- Modules/_threadmodule.c (revision 82754) +++ Modules/_threadmodule.c (working copy) @@ -46,7 +46,7 @@ int blocking = 1; double timeout = -1; PY_TIMEOUT_T microseconds; - int r; + PyLockStatus r; if (!PyArg_ParseTupleAndKeywords(args, kwds, "|id:acquire", kwlist, &blocking, &timeout)) @@ -76,11 +76,20 @@ microseconds = (PY_TIMEOUT_T) timeout; } - Py_BEGIN_ALLOW_THREADS - r = PyThread_acquire_lock_timed(self->lock_lock, microseconds); - Py_END_ALLOW_THREADS + do { + int intr_flag = (microseconds == 0); + Py_BEGIN_ALLOW_THREADS + r = PyThread_acquire_lock_timed(self->lock_lock, microseconds, intr_flag); + Py_END_ALLOW_THREADS - return PyBool_FromLong(r); + /* Run signal handlers if we were interrupted. Propagate exceptions from + * signal handlers, such as KeyboardInterrupt. */ + if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) { + return NULL; + } + } while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */ + + return PyBool_FromLong(r == PY_LOCK_ACQUIRED); } PyDoc_STRVAR(acquire_doc, @@ -92,7 +101,7 @@ the lock, and return None once the lock is acquired.\n\ With an argument, this will only block if the argument is true,\n\ and the return value reflects whether the lock is acquired.\n\ -The blocking operation is not interruptible."); +The blocking operation is interruptible."); static PyObject * lock_PyThread_release_lock(lockobject *self) @@ -217,7 +226,7 @@ double timeout = -1; PY_TIMEOUT_T microseconds; long tid; - int r = 1; + PyLockStatus r = PY_LOCK_ACQUIRED; if (!PyArg_ParseTupleAndKeywords(args, kwds, "|id:acquire", kwlist, &blocking, &timeout)) @@ -264,17 +273,27 @@ if (microseconds == 0) { Py_RETURN_FALSE; } - Py_BEGIN_ALLOW_THREADS - r = PyThread_acquire_lock_timed(self->rlock_lock, microseconds); - Py_END_ALLOW_THREADS + do { + int intr_flag = (microseconds == 0); + Py_BEGIN_ALLOW_THREADS + r = PyThread_acquire_lock_timed(self->rlock_lock, microseconds, + intr_flag); + Py_END_ALLOW_THREADS + + /* Run signal handlers if we were interrupted. Propagate exceptions + * from signal handlers, such as KeyboardInterrupt. */ + if (r == PY_LOCK_INTR && Py_MakePendingCalls() < 0) { + return NULL; + } + } while (r == PY_LOCK_INTR); /* Retry if we were interrupted. */ } - if (r) { + if (r == PY_LOCK_ACQUIRED) { assert(self->rlock_count == 0); self->rlock_owner = tid; self->rlock_count = 1; } - return PyBool_FromLong(r); + return PyBool_FromLong(r == PY_LOCK_ACQUIRED); } PyDoc_STRVAR(rlock_acquire_doc, @@ -286,7 +305,7 @@ immediately. If `blocking` is True and another thread holds\n\ the lock, the method will wait for the lock to be released,\n\ take it and then return True.\n\ -(note: the blocking operation is not interruptible.)\n\ +(note: the blocking operation is interruptible.)\n\ \n\ In all other cases, the method will return True immediately.\n\ Precisely, if the current thread already holds the lock, its\n\