diff -r b0ae96700301 Lib/subprocess.py --- a/Lib/subprocess.py Wed Oct 30 20:31:04 2013 +0100 +++ b/Lib/subprocess.py Fri Nov 01 11:37:41 2013 +0100 @@ -403,7 +403,7 @@ wShowWindow = 0 else: import select - _has_poll = hasattr(select, 'poll') + import selectors import _posixsubprocess # When select or poll has indicated that the file is writable, @@ -411,6 +411,14 @@ # POSIX defines PIPE_BUF as >= 512. _PIPE_BUF = getattr(select, 'PIPE_BUF', 512) + # poll/select have the advantage of not requiring any extra file + # descriptor, contrarily to epoll/kqueue (also, they require a single + # syscall). + if hasattr(selectors, 'PollSelector'): + _PopenSelector = selectors.PollSelector + else: + _PopenSelector = selectors.SelectSelector + __all__ = ["Popen", "PIPE", "STDOUT", "call", "check_call", "getstatusoutput", "getoutput", "check_output", "CalledProcessError", "DEVNULL"] @@ -1532,12 +1540,65 @@ if not input: self.stdin.close() - if _has_poll: - stdout, stderr = self._communicate_with_poll(input, endtime, - orig_timeout) - else: - stdout, stderr = self._communicate_with_select(input, endtime, - orig_timeout) + stdout = None + stderr = None + + # Only create this mapping if we haven't already. + if not self._communication_started: + self._fileobj2output = {} + if self.stdout: + self._fileobj2output[self.stdout] = [] + if self.stderr: + self._fileobj2output[self.stderr] = [] + + if self.stdout: + stdout = self._fileobj2output[self.stdout] + if self.stderr: + stderr = self._fileobj2output[self.stderr] + + self._save_input(input) + + with _PopenSelector() as selector: + if self.stdin and input: + selector.register(self.stdin, selectors.EVENT_WRITE) + if self.stdout: + selector.register(self.stdout, selectors.EVENT_READ) + if self.stderr: + selector.register(self.stderr, selectors.EVENT_READ) + + while selector.get_map(): + timeout = self._remaining_time(endtime) + if timeout is not None and timeout < 0: + raise TimeoutExpired(self.args, orig_timeout) + + ready = selector.select(timeout) + self._check_timeout(endtime, orig_timeout) + + # XXX Rewrite these to use non-blocking I/O on the file + # objects; they are no longer using C stdio! + + for key, events in ready: + if events & selectors.EVENT_WRITE: + chunk = self._input[self._input_offset : + self._input_offset + _PIPE_BUF] + try: + self._input_offset += os.write(key.fd, chunk) + except OSError as e: + if e.errno == errno.EPIPE: + selector.unregister(key.fileobj) + key.fileobj.close() + else: + raise + else: + if self._input_offset >= len(self._input): + selector.unregister(key.fileobj) + key.fileobj.close() + elif events & selectors.EVENT_READ: + data = os.read(key.fd, 4096) + if not data: + selector.unregister(key.fileobj) + key.fileobj.close() + self._fileobj2output[key.fileobj].append(data) self.wait(timeout=self._remaining_time(endtime)) @@ -1571,167 +1632,6 @@ self._input = self._input.encode(self.stdin.encoding) - def _communicate_with_poll(self, input, endtime, orig_timeout): - stdout = None # Return - stderr = None # Return - - if not self._communication_started: - self._fd2file = {} - - poller = select.poll() - def register_and_append(file_obj, eventmask): - poller.register(file_obj.fileno(), eventmask) - self._fd2file[file_obj.fileno()] = file_obj - - def close_unregister_and_remove(fd): - poller.unregister(fd) - self._fd2file[fd].close() - self._fd2file.pop(fd) - - if self.stdin and input: - register_and_append(self.stdin, select.POLLOUT) - - # Only create this mapping if we haven't already. - if not self._communication_started: - self._fd2output = {} - if self.stdout: - self._fd2output[self.stdout.fileno()] = [] - if self.stderr: - self._fd2output[self.stderr.fileno()] = [] - - select_POLLIN_POLLPRI = select.POLLIN | select.POLLPRI - if self.stdout: - register_and_append(self.stdout, select_POLLIN_POLLPRI) - stdout = self._fd2output[self.stdout.fileno()] - if self.stderr: - register_and_append(self.stderr, select_POLLIN_POLLPRI) - stderr = self._fd2output[self.stderr.fileno()] - - self._save_input(input) - - while self._fd2file: - timeout = self._remaining_time(endtime) - if timeout is not None and timeout < 0: - raise TimeoutExpired(self.args, orig_timeout) - try: - ready = poller.poll(timeout) - except OSError as e: - if e.args[0] == errno.EINTR: - continue - raise - self._check_timeout(endtime, orig_timeout) - - # XXX Rewrite these to use non-blocking I/O on the - # file objects; they are no longer using C stdio! - - for fd, mode in ready: - if mode & select.POLLOUT: - chunk = self._input[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - self._input_offset += os.write(fd, chunk) - except OSError as e: - if e.errno == errno.EPIPE: - close_unregister_and_remove(fd) - else: - raise - else: - if self._input_offset >= len(self._input): - close_unregister_and_remove(fd) - elif mode & select_POLLIN_POLLPRI: - data = os.read(fd, 4096) - if not data: - close_unregister_and_remove(fd) - self._fd2output[fd].append(data) - else: - # Ignore hang up or errors. - close_unregister_and_remove(fd) - - return (stdout, stderr) - - - def _communicate_with_select(self, input, endtime, orig_timeout): - if not self._communication_started: - self._read_set = [] - self._write_set = [] - if self.stdin and input: - self._write_set.append(self.stdin) - if self.stdout: - self._read_set.append(self.stdout) - if self.stderr: - self._read_set.append(self.stderr) - - self._save_input(input) - - stdout = None # Return - stderr = None # Return - - if self.stdout: - if not self._communication_started: - self._stdout_buff = [] - stdout = self._stdout_buff - if self.stderr: - if not self._communication_started: - self._stderr_buff = [] - stderr = self._stderr_buff - - while self._read_set or self._write_set: - timeout = self._remaining_time(endtime) - if timeout is not None and timeout < 0: - raise TimeoutExpired(self.args, orig_timeout) - try: - (rlist, wlist, xlist) = \ - select.select(self._read_set, self._write_set, [], - timeout) - except OSError as e: - if e.args[0] == errno.EINTR: - continue - raise - - # According to the docs, returning three empty lists indicates - # that the timeout expired. - if not (rlist or wlist or xlist): - raise TimeoutExpired(self.args, orig_timeout) - # We also check what time it is ourselves for good measure. - self._check_timeout(endtime, orig_timeout) - - # XXX Rewrite these to use non-blocking I/O on the - # file objects; they are no longer using C stdio! - - if self.stdin in wlist: - chunk = self._input[self._input_offset : - self._input_offset + _PIPE_BUF] - try: - bytes_written = os.write(self.stdin.fileno(), chunk) - except OSError as e: - if e.errno == errno.EPIPE: - self.stdin.close() - self._write_set.remove(self.stdin) - else: - raise - else: - self._input_offset += bytes_written - if self._input_offset >= len(self._input): - self.stdin.close() - self._write_set.remove(self.stdin) - - if self.stdout in rlist: - data = os.read(self.stdout.fileno(), 1024) - if not data: - self.stdout.close() - self._read_set.remove(self.stdout) - stdout.append(data) - - if self.stderr in rlist: - data = os.read(self.stderr.fileno(), 1024) - if not data: - self.stderr.close() - self._read_set.remove(self.stderr) - stderr.append(data) - - return (stdout, stderr) - - def send_signal(self, sig): """Send a signal to the process """ diff -r b0ae96700301 Lib/test/test_subprocess.py --- a/Lib/test/test_subprocess.py Wed Oct 30 20:31:04 2013 +0100 +++ b/Lib/test/test_subprocess.py Fri Nov 01 11:37:41 2013 +0100 @@ -11,6 +11,7 @@ import tempfile import time import re +import selectors import sysconfig import warnings import select @@ -2186,15 +2187,16 @@ os.rmdir(dir) -@unittest.skipUnless(getattr(subprocess, '_has_poll', False), - "poll system call not supported") +@unittest.skipUnless(hasattr(selectors, 'PollSelector'), + "Test needs selectors.PollSelector") class ProcessTestCaseNoPoll(ProcessTestCase): def setUp(self): - subprocess._has_poll = False + self.orig_selector = subprocess._PopenSelector + subprocess._PopenSelector = selectors.SelectSelector ProcessTestCase.setUp(self) def tearDown(self): - subprocess._has_poll = True + subprocess._PopenSelector = self.orig_selector ProcessTestCase.tearDown(self)