'''
AsyncPopen is a subclass of subprocess.Popen which uses unbuffered
non-blocking pipes.  It has a select() method which can be used to wait
till an attached pipe is ready for reading or writing.

One can use it as follows:

    p = AsyncPopen(..., stdin=PIPE, stdout=PIPE, stderr=PIPE)
    for f in p.select([p.stdin, p.stdout, p.stderr]):
        if f is p.stdin:
            # we can write to p.stdin or close it
        elif f is p.stdout:
            # we can read from p.stdout
        elif f is p.stderr:
            # we can read from p.stderr
        else:
            raise RuntimeError('should not get here')

The pipes never block: read() returns None when no data is available
yet (b'' means end of file) and write() returns None when the pipe
cannot accept data yet.

Note that on Windows the attributes stdin, stdout, stderr are file-like
objects which subclass io.RawIOBase but not io.FileIO.  Additionally, on
Windows closing stdin may raise BlockingIOError unless you first wait for
stdin to be ready using the select() method.
'''

import os
import subprocess
import sys
from subprocess import PIPE

__all__ = ['AsyncPopen', 'PIPE']


def _check_file_objects(popen, file_objects):
    '''
    Validate the ``file_objects`` argument of AsyncPopen.select().

    Returns ``file_objects`` as a set.  Raises ValueError if it contains
    anything other than ``popen.stdin``/``popen.stdout``/``popen.stderr``,
    if it contains None (a stream that is not attached), or if it is empty.
    '''
    file_objects = set(file_objects)
    if not file_objects <= {popen.stdin, popen.stdout, popen.stderr}:
        raise ValueError('unexpected object in file_objects')
    if None in file_objects:
        raise ValueError('None in file_objects')
    if not file_objects:
        raise ValueError('nothing to wait for')
    return file_objects


if sys.platform != 'win32':
    #
    # Unix
    #

    import select
    import fcntl

    def _make_nonblocking(fd):
        '''
        Set O_NONBLOCK on ``fd`` (an int or an object with fileno()).

        fcntl.fcntl() raises OSError on failure, so return values need no
        checking.
        '''
        flags = fcntl.fcntl(fd, fcntl.F_GETFL)
        fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    class AsyncPopen(subprocess.Popen):
        '''Popen whose PIPE streams are unbuffered and non-blocking.'''

        def __init__(self, args, *, executable=None, stdin=None, stdout=None,
                     stderr=None, preexec_fn=None, close_fds=False,
                     shell=False, cwd=None, env=None, startupinfo=None,
                     creationflags=0, restore_signals=True,
                     start_new_session=False, pass_fds=()):
            super().__init__(args, bufsize=0, stdin=stdin, stdout=stdout,
                             stderr=stderr, executable=executable,
                             preexec_fn=preexec_fn, close_fds=close_fds,
                             shell=shell, cwd=cwd, env=env,
                             startupinfo=startupinfo,
                             creationflags=creationflags,
                             restore_signals=restore_signals,
                             start_new_session=start_new_session,
                             pass_fds=pass_fds)
            # bufsize=0 gives raw (unbuffered) pipe objects, whose read() and
            # write() return None instead of blocking once O_NONBLOCK is set.
            for f in (self.stdin, self.stdout, self.stderr):
                if f is not None:
                    _make_nonblocking(f)

        def select(self, file_objects, timeout=None):
            '''
            Wait till one of the file objects in ``file_objects`` is ready.

            Only ``self.stdin``, ``self.stdout`` and ``self.stderr`` can be
            waited on.  ``timeout`` is in seconds; None waits indefinitely
            and a negative value is treated as 0.

            Returns list of ready file objects (empty on timeout).
            Raises ValueError if ``file_objects`` is empty, contains None,
            or contains anything other than the attached pipes.
            '''
            file_objects = _check_file_objects(self, file_objects)
            if timeout is not None:
                # poll() would wait forever and select() would raise on a
                # negative timeout; treat it as "already expired".
                timeout = max(timeout, 0)
            if hasattr(select, 'poll'):
                return self._select_with_poll(file_objects, timeout)
            return self._select_with_select(file_objects, timeout)

        def _select_with_select(self, file_objects, timeout=None):
            '''select.select() based implementation of select().'''
            writers = [self.stdin] if self.stdin in file_objects else []
            readers = [f for f in (self.stdout, self.stderr)
                       if f in file_objects]
            r, w, _ = select.select(readers, writers, [], timeout)
            return r + w

        def _select_with_poll(self, file_objects, timeout=None):
            '''select.poll() based implementation of select().'''
            poller = select.poll()
            by_fd = {}
            if self.stdin in file_objects:
                poller.register(self.stdin, select.POLLOUT)
                by_fd[self.stdin.fileno()] = self.stdin
            for f in (self.stdout, self.stderr):
                if f in file_objects:
                    poller.register(f, select.POLLIN)
                    by_fd[f.fileno()] = f
            if timeout is not None:
                # poll() takes milliseconds; round to the nearest one.
                timeout = int(timeout * 1000 + 0.5)
            return [by_fd[fd] for fd, _ in poller.poll(timeout)]

else:
    #
    # Windows
    #

    import io
    import itertools
    import msvcrt
    import tempfile
    import _winapi

    from multiprocessing.connection import _exhaustive_wait

    BUFSIZE = 8192
    _ERROR_IO_INCOMPLETE = 996  # Win32 ERROR_IO_INCOMPLETE
    _mmap_counter = itertools.count()

    def _pipe(duplex=True, overlapped=(True, True)):
        '''
        Create a named pipe and return a connected pair of handles (h1, h2).

        h1 is the server end made by CreateNamedPipe, h2 the client end
        opened with CreateFile.  When ``duplex`` is false, h1 is the read
        end and h2 the write end.  ``overlapped[0]`` / ``overlapped[1]``
        choose whether h1 / h2 use FILE_FLAG_OVERLAPPED.  Any handle
        already created is closed again if a later step fails.
        '''
        address = tempfile.mktemp(
            prefix=r'\\.\pipe\python-pipe-%d-%d-' % (os.getpid(),
                                                    next(_mmap_counter)))
        if duplex:
            openmode = _winapi.PIPE_ACCESS_DUPLEX
            access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE
            obsize, ibsize = BUFSIZE, BUFSIZE
        else:
            openmode = _winapi.PIPE_ACCESS_INBOUND
            access = _winapi.GENERIC_WRITE
            obsize, ibsize = 0, BUFSIZE

        openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE
        if overlapped[0]:
            openmode |= _winapi.FILE_FLAG_OVERLAPPED
        if overlapped[1]:
            flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED
        else:
            flags_and_attribs = 0

        h1 = h2 = None
        try:
            h1 = _winapi.CreateNamedPipe(
                address, openmode, _winapi.PIPE_WAIT, 1, obsize, ibsize,
                _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL)
            h2 = _winapi.CreateFile(
                address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING,
                flags_and_attribs, _winapi.NULL)
            ov = _winapi.ConnectNamedPipe(h1, overlapped=True)
            ov.GetOverlappedResult(True)
            return h1, h2
        except BaseException:
            if h1 is not None:
                _winapi.CloseHandle(h1)
            if h2 is not None:
                _winapi.CloseHandle(h2)
            raise

    class AsyncReader(io.RawIOBase):
        '''
        Non-blocking reader over an overlapped pipe handle.

        read() returns None when no data is available and b'' once the
        writer has closed the pipe (ERROR_BROKEN_PIPE).
        '''

        def __init__(self, handle):
            self._handle = handle

        @property
        def handle(self):
            '''The underlying Win32 handle (None once closed).'''
            return self._handle

        def readable(self):
            return True

        def close(self, *, CloseHandle=_winapi.CloseHandle):
            # CloseHandle is bound as a default so close() still works if
            # module globals have been cleared at interpreter shutdown.
            super().close()
            if self._handle is not None:
                CloseHandle(self._handle)
                self._handle = None

        def read(self, n=-1):
            '''
            Read up to ``n`` bytes without blocking.

            Returns the data, None if nothing is available yet, or b'' at
            end of file.  A negative or None ``n`` reads via readall().
            '''
            if self.closed:
                raise ValueError('file closed')
            if n is None or n < 0:
                return self.readall()
            try:
                ov, err = _winapi.ReadFile(self._handle, n, True)
                if err == _winapi.ERROR_IO_PENDING:
                    # No data right now: cancel instead of waiting.
                    ov.cancel()
                transferred, err = ov.GetOverlappedResult(True)
                if err == _winapi.ERROR_OPERATION_ABORTED:
                    return None
                assert err == 0
                return ov.getbuffer()
            except OSError as e:
                if e.winerror == _winapi.ERROR_BROKEN_PIPE:
                    return b''
                raise

        def readinto(self, buf):
            res = self.read(len(buf))
            if res is None:
                return None
            buf[:len(res)] = res
            return len(res)

    class AsyncWriter(io.RawIOBase):
        '''
        Non-blocking writer over an overlapped pipe handle.

        At most one write is in flight: write() returns None while the
        previous write is still pending and otherwise accepts at most
        BUFSIZE bytes.  close() raises BlockingIOError while a write is
        pending.
        '''

        def __init__(self, handle):
            self._handle = handle
            self._ov = None     # overlapped object of the in-flight write
            self._error = None  # OSError of a failed write, not yet reported

        @property
        def handle(self):
            '''The underlying Win32 handle (None once closed).'''
            return self._handle

        def writable(self):
            return True

        def close(self, *, _CloseHandle=_winapi.CloseHandle):
            if self._pending():
                raise BlockingIOError('use AsyncPopen.select() to '
                                      'wait till ready to close')
            super().close()
            if self._handle is not None:
                _CloseHandle(self._handle)
                self._handle = None

        def _pending(self):
            '''Return True while the last write has not completed yet.'''
            if self._ov is None:
                return False
            try:
                transferred, err = self._ov.GetOverlappedResult(False)
            except OSError as e:
                if e.winerror == _ERROR_IO_INCOMPLETE:
                    return True
                # The write failed (e.g. the reader went away).  Treating
                # this as "still pending" would make select() and close()
                # wait forever; remember it for the next write() instead.
                self._ov = None
                self._error = e
                return False
            if err == _ERROR_IO_INCOMPLETE:
                return True
            self._ov = None
            return False

        def write(self, buf):
            '''
            Start writing up to BUFSIZE bytes of ``buf``.

            Returns the number of bytes accepted, or None if the previous
            write is still in flight.  Raises the OSError of a previously
            failed write, if any.
            '''
            if self.closed:
                raise ValueError('file closed')
            if self._pending():
                return None
            if self._error is not None:
                error, self._error = self._error, None
                raise error
            data = bytes(buf[:BUFSIZE])
            self._ov, err = _winapi.WriteFile(self._handle, data, True)
            return len(data)

    class AsyncPopen(subprocess.Popen):
        '''Popen whose PIPE streams are AsyncReader/AsyncWriter objects.'''

        def __init__(self, args, *, executable=None, stdin=None, stdout=None,
                     stderr=None, preexec_fn=None, close_fds=False,
                     shell=False, cwd=None, env=None, startupinfo=None,
                     creationflags=0, restore_signals=True,
                     start_new_session=False, pass_fds=()):
            stdin_rfd = stdout_wfd = stderr_wfd = None
            stdin_wh = stdout_rh = stderr_rh = None
            # For each PIPE stream create a named pipe whose parent end is
            # overlapped and whose child end is a plain CRT fd for Popen.
            if stdin == PIPE:
                stdin_rh, stdin_wh = _pipe(False, (False, True))
                stdin_rfd = msvcrt.open_osfhandle(stdin_rh, os.O_RDONLY)
            if stdout == PIPE:
                stdout_rh, stdout_wh = _pipe(False, (True, False))
                stdout_wfd = msvcrt.open_osfhandle(stdout_wh, 0)
            if stderr == PIPE:
                stderr_rh, stderr_wh = _pipe(False, (True, False))
                stderr_wfd = msvcrt.open_osfhandle(stderr_wh, 0)
            try:
                # Streams that are not PIPE (None, DEVNULL, STDOUT, fds,
                # file objects) are passed through to Popen unchanged.
                super().__init__(
                    args,
                    stdin=stdin_rfd if stdin == PIPE else stdin,
                    stdout=stdout_wfd if stdout == PIPE else stdout,
                    stderr=stderr_wfd if stderr == PIPE else stderr,
                    executable=executable, preexec_fn=preexec_fn,
                    close_fds=close_fds, shell=shell, cwd=cwd, env=env,
                    startupinfo=startupinfo, creationflags=creationflags,
                    restore_signals=restore_signals,
                    start_new_session=start_new_session, pass_fds=pass_fds)
            finally:
                # The child ends are no longer needed in the parent.
                if stdin == PIPE:
                    os.close(stdin_rfd)
                    self.stdin = AsyncWriter(stdin_wh)
                if stdout == PIPE:
                    os.close(stdout_wfd)
                    self.stdout = AsyncReader(stdout_rh)
                if stderr == PIPE:
                    os.close(stderr_wfd)
                    self.stderr = AsyncReader(stderr_rh)

        def select(self, file_objects, timeout=None):
            '''
            Wait till one of the file objects in ``file_objects`` is ready.

            Only ``self.stdin``, ``self.stdout`` and ``self.stderr`` can be
            waited on.  ``timeout`` is in seconds; None waits indefinitely
            and a negative value is treated as 0.

            Returns list of ready file objects (empty on timeout).
            Raises ValueError if ``file_objects`` is empty, contains None,
            or contains anything other than the attached pipes.
            '''
            file_objects = _check_file_objects(self, file_objects)
            if timeout is not None:
                timeout = max(timeout, 0)
            ready = set()
            events = {}
            ovs_to_cancel = []
            try:
                if self.stdin in file_objects:
                    if self.stdin._pending():
                        events[self.stdin._ov.event] = self.stdin
                    else:
                        ready.add(self.stdin)
                for f in (self.stdout, self.stderr):
                    if f not in file_objects:
                        continue
                    # A zero-length overlapped read serves as a readiness
                    # probe: its event is waited on below and the read is
                    # cancelled afterwards.
                    try:
                        ov, err = _winapi.ReadFile(f._handle, 0, True)
                    except OSError:
                        # e.g. a broken pipe, which read() reports as EOF.
                        ready.add(f)
                    else:
                        ovs_to_cancel.append(ov)
                        events[ov.event] = f
                if events:
                    if ready:
                        ms = 0
                    elif timeout is None:
                        ms = _winapi.INFINITE
                    else:
                        ms = int(timeout * 1000 + 0.5)
                    ready.update(events[e]
                                 for e in _exhaustive_wait(events, ms))
            finally:
                for ov in ovs_to_cancel:
                    ov.cancel()
                for ov in ovs_to_cancel:
                    try:
                        ov.GetOverlappedResult(True)
                    except OSError:
                        # Best effort: we only wait for the cancelled probe
                        # to finish; its outcome is irrelevant.
                        pass
            return [f for f in (self.stdin, self.stdout, self.stderr)
                    if f is not None and f in ready]


if __name__ == '__main__':
    import collections

    # Set to True to run the same workload through the standard
    # subprocess.Popen.communicate() for comparison.
    USE_STDLIB_POPEN = False

    def communicate(p, input_data):
        '''
        Reimplementation of Popen.communicate() on top of AsyncPopen.select().

        Feeds ``input_data`` to ``p.stdin`` and collects ``p.stdout`` and
        ``p.stderr`` until every attached pipe is closed.  Returns
        ``(stdout_bytes, stderr_bytes)``, with None for a pipe not attached.
        '''
        buf = memoryview(input_data)
        collected = collections.defaultdict(list)
        registered = [f for f in (p.stdin, p.stdout, p.stderr)
                      if f is not None]
        while registered:
            for f in p.select(registered):
                if f is p.stdin:
                    if not buf:
                        f.close()
                        registered.remove(f)
                    else:
                        n = f.write(buf)
                        if n is not None:  # None: pipe full, retry later
                            buf = buf[n:]
                elif f in (p.stdout, p.stderr):
                    s = f.read(8192)
                    if s == b'':  # end of file
                        f.close()
                        registered.remove(f)
                    elif s is not None:  # None: nothing available yet
                        collected[f].append(s)
                else:
                    raise RuntimeError('should not get here')
        return (b''.join(collected[p.stdout]) if p.stdout is not None else None,
                b''.join(collected[p.stderr]) if p.stderr is not None else None)

    code = r'''
import os
os.write(2, b"starting\n")
while True:
    s = os.read(0, 1024)
    if not s:
        break
    s = s.upper()
    while s:
        n = os.write(1, s)
        s = s[n:]
os.write(2, b"exiting\n")
'''

    msg = b"x" * (1024**2 * 10)  # 10MB

    if USE_STDLIB_POPEN:
        p = subprocess.Popen([sys.executable, '-c', code],
                             stdin=PIPE, stdout=PIPE, stderr=PIPE)
        out, err = p.communicate(msg)
    else:
        p = AsyncPopen([sys.executable, '-c', code],
                       stdin=PIPE, stdout=PIPE, stderr=PIPE)
        out, err = communicate(p, msg)

    # Reap the child so it does not linger as a zombie.
    assert p.wait() == 0
    assert out == msg.upper()
    assert err == b'starting\nexiting\n'