Index: Lib/subprocess.py =================================================================== --- Lib/subprocess.py (revision 73814) +++ Lib/subprocess.py (working copy) @@ -413,6 +413,7 @@ error = IOError else: import select + _has_poll = hasattr(select, 'poll') import errno import fcntl import pickle @@ -425,12 +426,10 @@ except: MAXFD = 256 -# True/False does not exist on 2.2.0 -#try: -# False -#except NameError: -# False = 0 -# True = 1 +# When select or poll has indicated that the file is writable, +# we can write up to _PIPE_BUF bytes without risk of blocking. +# POSIX defines PIPE_BUF as >= 512. +_PIPE_BUF = getattr(select, 'PIPE_BUF', 512) _active = [] @@ -1191,20 +1190,101 @@ def _communicate(self, input): - read_set = [] - write_set = [] - stdout = None # Return - stderr = None # Return - if self.stdin: # Flush stdio buffer. This might block, if the user has # been writing to .stdin in an uncontrolled fashion. self.stdin.flush() - if input: - write_set.append(self.stdin) - else: + if not input: self.stdin.close() + + if _has_poll: + stdout, stderr = self._communicate_with_poll(input) + else: + stdout, stderr = self._communicate_with_select(input) + + # All data exchanged. Translate lists into strings. + if stdout is not None: + stdout = ''.join(stdout) + if stderr is not None: + stderr = ''.join(stderr) + + # Translate newlines, if requested. We cannot let the file + # object do the translation: It is based on stdio, which is + # impossible to combine with select (unless forcing no + # buffering). + if self.universal_newlines and hasattr(file, 'newlines'): + if stdout: + stdout = self._translate_newlines(stdout) + if stderr: + stderr = self._translate_newlines(stderr) + + self.wait() + return (stdout, stderr) + + + def _communicate_with_poll(self, input): + stdout = None # Return + stderr = None # Return + fd2file = {} + fd2output = {} + + poller = select.poll() + def register_and_append(file_obj, eventmask): + poller.register(file_obj.fileno(), eventmask) + fd2file[file_obj.fileno()] = file_obj + + def close_unregister_and_remove(fd): + poller.unregister(fd) + fd2file[fd].close() + fd2file.pop(fd) + + if self.stdin and input: + register_and_append(self.stdin, select.POLLOUT) + + select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI if self.stdout: + register_and_append(self.stdout, select_POLLIN_POLLPRI) + fd2output[self.stdout.fileno()] = stdout = [] + if self.stderr: + register_and_append(self.stderr, select_POLLIN_POLLPRI) + fd2output[self.stderr.fileno()] = stderr = [] + + input_offset = 0 + while fd2file: + try: + ready = poller.poll() + except select.error, e: + if e.args[0] == errno.EINTR: + continue + raise + + for fd, mode in ready: + if mode & select.POLLOUT: + chunk = input[input_offset : input_offset + _PIPE_BUF] + input_offset += os.write(fd, chunk) + if input_offset >= len(input): + close_unregister_and_remove(fd) + elif mode & select_POLLIN_POLLPRI: + data = os.read(fd, 4096) + if not data: + close_unregister_and_remove(fd) + fd2output[fd].append(data) + else: + # Ignore hang up or errors. + close_unregister_and_remove(fd) + + return (stdout, stderr) + + + def _communicate_with_select(self, input): + read_set = [] + write_set = [] + stdout = None # Return + stderr = None # Return + + if self.stdin and input: + write_set.append(self.stdin) + if self.stdout: read_set.append(self.stdout) stdout = [] if self.stderr: @@ -1221,10 +1301,7 @@ raise if self.stdin in wlist: - # When select has indicated that the file is writable, - # we can write up to PIPE_BUF bytes without risk - # blocking. POSIX defines PIPE_BUF >= 512 - chunk = input[input_offset : input_offset + 512] + chunk = input[input_offset : input_offset + _PIPE_BUF] bytes_written = os.write(self.stdin.fileno(), chunk) input_offset += bytes_written if input_offset >= len(input): @@ -1245,25 +1322,9 @@ read_set.remove(self.stderr) stderr.append(data) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = ''.join(stdout) - if stderr is not None: - stderr = ''.join(stderr) + return (stdout, stderr) - # Translate newlines, if requested. We cannot let the file - # object do the translation: It is based on stdio, which is - # impossible to combine with select (unless forcing no - # buffering). - if self.universal_newlines and hasattr(file, 'newlines'): - if stdout: - stdout = self._translate_newlines(stdout) - if stderr: - stderr = self._translate_newlines(stderr) - self.wait() - return (stdout, stderr) - def send_signal(self, sig): """Send a signal to the process """ Index: Lib/test/test_subprocess.py =================================================================== --- Lib/test/test_subprocess.py (revision 73814) +++ Lib/test/test_subprocess.py (working copy) @@ -766,8 +766,24 @@ p.terminate() self.assertNotEqual(p.wait(), 0) + +unit_tests = [ProcessTestCase] + +if subprocess._has_poll: + class ProcessTestCaseNoPoll(ProcessTestCase): + def setUp(self): + subprocess._has_poll = False + ProcessTestCase.setUp(self) + + def tearDown(self): + subprocess._has_poll = True + ProcessTestCase.tearDown(self) + + unit_tests.append(ProcessTestCaseNoPoll) + + def test_main(): - test_support.run_unittest(ProcessTestCase) + test_support.run_unittest(*unit_tests) if hasattr(test_support, "reap_children"): test_support.reap_children()