Index: Lib/test/test_subprocess.py =================================================================== --- Lib/test/test_subprocess.py (revision 71065) +++ Lib/test/test_subprocess.py (working copy) @@ -336,6 +336,25 @@ self.assertEqual(remove_stderr_debug_decorations(stderr), "pineapple") + def test_communicate_timeout(self): + p = subprocess.Popen([sys.executable, "-c", + 'import sys,os,time;' + 'sys.stderr.write("pineapple");' + 'time.sleep(2);' + 'sys.stdout.write(sys.stdin.read())'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + (stdout, stderr) = p.communicate("banana", timeout=1) + self.assertEqual(stdout, "") + self.assertEqual(remove_stderr_debug_decorations(stderr), + "pineapple") + # Under POSIX, assert that the process was killed. We have no + # way of knowing if it was killed under Windows. + if not mswindows: + p.wait() # Make sure it terminates fully... + self.assertEqual(p.returncode, -9) + # This test is Linux specific for simplicity to at least have # some coverage. It is not a platform specific bug. if os.path.isdir('/proc/%d/fd' % os.getpid()): @@ -510,6 +529,16 @@ self.assertEqual(p.wait(), 0) + def test_wait_timeout(self): + p = subprocess.Popen([sys.executable, + "-c", "import time; time.sleep(1)"]) + # It is possible that there is a race in this test, and it might + # fail under rare conditions. If wanted to do this test + # correctly, we could stub out the time module in subprocess.py. + self.assertEqual(p.wait(timeout=0.1), None) + self.assertEqual(p.wait(timeout=2), 0) + + def test_invalid_bufsize(self): # an invalid type of the bufsize argument should raise # TypeError. Index: Lib/subprocess.py =================================================================== --- Lib/subprocess.py (revision 71065) +++ Lib/subprocess.py (working copy) @@ -369,6 +369,7 @@ import traceback import gc import signal +import time # Exception classes used by this module. class CalledProcessError(Exception): @@ -449,14 +450,19 @@ def call(*popenargs, **kwargs): - """Run command with arguments. Wait for command to complete, then - return the returncode attribute. + """Run command with arguments. Wait for command to complete or + timeout, then return the returncode attribute. The arguments are the same as for the Popen constructor. Example: retcode = call(["ls", "-l"]) """ - return Popen(*popenargs, **kwargs).wait() + if 'timeout' in kwargs: + timeout = kwargs['timeout'] + del kwargs['timeout'] + else: + timeout = None + return Popen(*popenargs, **kwargs).wait(timeout=timeout) def check_call(*popenargs, **kwargs): @@ -681,7 +687,7 @@ _active.append(self) - def communicate(self, input=None): + def communicate(self, input=None, timeout=None): """Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached. Wait for process to terminate. The optional input argument should be a @@ -690,9 +696,16 @@ communicate() returns a tuple (stdout, stderr).""" - # Optimization: If we are only using one pipe, or no pipe at - # all, using select() or threads is unnecessary. - if [self.stdin, self.stdout, self.stderr].count(None) >= 2: + if timeout is not None: + endtime = time.time() + timeout + else: + endtime = None + + # Optimization: If we are not worried about timeouts and are + # only using one pipe, or no pipe at all, using select() or + # threads is unnecessary. + if (endtime is None and + [self.stdin, self.stdout, self.stderr].count(None) >= 2): stdout = None stderr = None if self.stdin: @@ -708,13 +721,50 @@ self.wait() return (stdout, stderr) - return self._communicate(input) + stdout, stderr = self._communicate(input, endtime) + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if self.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = self._translate_newlines(stdout) + if stderr: + stderr = self._translate_newlines(stderr) + + sts = self.wait(timeout=self._remaining_time(endtime)) + if sts is None: + # Kill the process if we're still waiting for it. + print 'attempting to kill...' + try: + self.kill() + except OSError, e: + # If the process finishes before we can kill it, don't + # worry. + print 'unable to kill' + + return (stdout, stderr) + + def poll(self): return self._internal_poll() + def _remaining_time(self, endtime): + """Convenience for _communicate when computing timeouts.""" + if endtime is None: + return None + else: + return endtime - time.time() + + if mswindows: # # Windows methods @@ -886,11 +936,15 @@ return self.returncode - def wait(self): + def wait(self, timeout=None): """Wait for child process to terminate. Returns returncode attribute.""" + if timeout is None: + timeout = INFINITE + else: + timeout = int(timeout * 1000) if self.returncode is None: - obj = WaitForSingleObject(self._handle, INFINITE) + obj = WaitForSingleObject(self._handle, timeout) self.returncode = GetExitCodeProcess(self._handle) return self.returncode @@ -899,7 +953,7 @@ buffer.append(fh.read()) - def _communicate(self, input): + def _communicate(self, input, endtime): stdout = None # Return stderr = None # Return @@ -922,27 +976,23 @@ self.stdin.close() if self.stdout: - stdout_thread.join() + stdout_thread.join(self._remaining_time(endtime)) + self.stdout.close() + # Return no output if the timeout expired. + if stdout_thread.isAlive(): + stdout = [] if self.stderr: - stderr_thread.join() + stderr_thread.join(self._remaining_time(endtime)) + self.stderr.close() + # Return no output if the timeout expired. + if stderr_thread.isAlive(): + stderr = [] + # TODO: Somebody needs to research what happens to those + # threads if they are still running. Also, what happens if + # you close a file descriptor on Windows in one thread? + # Will it interrupt the other, or does the other keep its + # own handle? - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = stdout[0] - if stderr is not None: - stderr = stderr[0] - - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = self._translate_newlines(stdout) - if stderr: - stderr = self._translate_newlines(stderr) - - self.wait() return (stdout, stderr) def send_signal(self, sig): @@ -1164,16 +1214,35 @@ return self.returncode - def wait(self): + def wait(self, timeout=None): """Wait for child process to terminate. Returns returncode attribute.""" - if self.returncode is None: + if timeout is not None: + # Enter a busy loop if we have a timeout. This busy + # loop was cribbed from Lib/threading.py in + # Thread.wait() at r71065. + endtime = time.time() + timeout + delay = 0.0005 # 500 us -> initial delay of 1 ms + while True: + (pid, sts) = os.waitpid(self.pid, os.WNOHANG) + assert pid == self.pid or pid == 0 + if pid == self.pid: + self._handle_exitstatus(sts) + break + remaining = endtime - time.time() + if remaining <= 0: + # Alternatively, would it be better to throw an + # exception? + break + delay = min(delay * 2, remaining, .05) + time.sleep(delay) + elif self.returncode is None: pid, sts = os.waitpid(self.pid, 0) self._handle_exitstatus(sts) return self.returncode - def _communicate(self, input): + def _communicate(self, input, endtime): read_set = [] write_set = [] stdout = None # Return @@ -1196,13 +1265,28 @@ input_offset = 0 while read_set or write_set: + remaining = self._remaining_time(endtime) try: - rlist, wlist, xlist = select.select(read_set, write_set, []) + rlist, wlist, xlist = select.select(read_set, + write_set, [], + remaining) except select.error, e: if e.args[0] == errno.EINTR: continue raise + if not (rlist or wlist or xlist): + # According to the docs, returning three empty lists + # indicates that the timeout expired. Close the + # file handles and return what we have. + if self.stdin: + self.stdin.close() + if self.stdout: + self.stdout.close() + if self.stderr: + self.stderr.close() + return (stdout, stderr) + if self.stdin in wlist: # When select has indicated that the file is writable, # we can write up to PIPE_BUF bytes without risk @@ -1228,23 +1312,6 @@ read_set.remove(self.stderr) stderr.append(data) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) - - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = self._translate_newlines(stdout) - if stderr: - stderr = self._translate_newlines(stderr) - - self.wait() return (stdout, stderr) def send_signal(self, sig):