diff -r 7e179ee91af0 Doc/library/subprocess.rst --- a/Doc/library/subprocess.rst Mon Mar 23 15:26:49 2015 +0200 +++ b/Doc/library/subprocess.rst Wed Mar 25 11:31:45 2015 -0700 @@ -665,6 +665,64 @@ .. versionchanged:: 3.3 *timeout* was added. +.. method:: Popen.write_nonblocking(input, timeout=0) + + Send data to the child process via the stdin pipe without blocking. Only + valid if ``stdin=PIPE`` specified during creation. This method accepts + ``bytes()`` objects as input and returns the number of bytes written to the + child ``stdin`` pipe. + + A write of 0 bytes or a ``BrokenPipeError`` does not necessarily mean that + the child process has ended, check the result of :meth:`poll` to determine + if the subprocess has ended. Note that while you may be able to send data to + a subprocess that had returned 0 bytes sent, a ``BrokenPipeError`` implies + that no subsequent data will read by the child process from ``stdin``. + + On Windows, due to the system calls we use to send data to the child process, + this method will return one of two values: 0 or the length of the data passed + to the method. If the return value is nonzero, the write has been sent to the + underlying system for sending to the subprocess pipe, but has not necessarily + completed. If the return value is 0, then either the caller passed 0 bytes, + or the previous write is still pending completion. + + The meaning of the ``timeout`` parameter varies depending on platform, but it + should always be provided as a nonnegative integer or float in seconds. If + :meth:`write_nonblocking` completes prior to the provided timeout on Windows, + that means that the write is completed and you can probably write more data + immediately. On other platforms, ``timeout`` is how long you are willing to + wait for the child ``stdin`` to be ready to be written to. + + Unlike :meth:`communicate`, :meth:`write_nonblocking` does not accept text + string arguments for writing to the child process when + ``universal_newlines=True``. This is because writing may result in multi-byte + encoded characters being split up on non-blocking write, which would prevent + subsequent attempts to re-send the partial input from sending the rest of the + character. + + .. versionadded:: 3.5 + +.. method:: Popen.read_nonblocking(bufsize=4096) + + Receive data from the child's ``stdout`` without blocking. This function will + attempt to read up to ``bufsize`` bytes from the child ``stdout`` as + possible without blocking, returning the results unmodified. The subprocess + must have been created with ``stdout=PIPE``, and the *bufsize* argument + must be greater than 0. + + Unlike results from :meth:`communicate`, bytes read from the child ``stdout`` + are returned unaltered, even when ``universal_newlines=True``, for the same + reasons why :meth:`write_nonblocking` does not accept text. + + If this method raises a ``BrokenPipeError``, the pipe was closed by the child + process and no more data is readable from the ``stdout`` pipe. + + .. versionadded:: 3.5 + +.. method:: Popen.read_stderr_nonblocking(bufsize=4096) + + Same functionality as :meth:`read_nonblocking()`, only applied to ``stderr``. + + .. versionadded:: 3.5 .. method:: Popen.send_signal(signal) diff -r 7e179ee91af0 Lib/subprocess.py --- a/Lib/subprocess.py Mon Mar 23 15:26:49 2015 +0200 +++ b/Lib/subprocess.py Wed Mar 25 11:31:45 2015 -0700 @@ -365,7 +365,10 @@ import builtins import warnings import errno -from time import monotonic as _time +try: + from time import monotonic as _time +except ImportError: + from time import time as _time # Exception classes used by this module. class SubprocessError(Exception): pass @@ -400,6 +403,11 @@ if mswindows: + from collections import namedtuple + import itertools + import locale + import _overlapped + import tempfile import threading import msvcrt import _winapi @@ -409,8 +417,10 @@ hStdOutput = None hStdError = None wShowWindow = 0 + _OverlappedIO = namedtuple('_OverlappedIO', 'name io event') else: import _posixsubprocess + import fcntl import select import selectors try: @@ -461,11 +471,67 @@ raise ValueError("already closed") def __repr__(self): - return "%s(%d)" % (self.__class__.__name__, int(self)) + return "Handle(%d)" % int(self) __del__ = Close __str__ = __repr__ + _WAIT = 0xffffffff + BUFSIZE = 8192 + _mmap_counter = itertools.count() + # Replacement for os.pipe() using handles instead of fds, originally lived + # in asyncio/windows_utils.py, but used here due to circular import + # prevention. + def _pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE): + """Like os.pipe() but with overlapped support and using handles not fds.""" + address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' % + (os.getpid(), next(_mmap_counter))) + + if duplex: + openmode = _winapi.PIPE_ACCESS_DUPLEX + access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE + obsize, ibsize = bufsize, bufsize + else: + openmode = _winapi.PIPE_ACCESS_INBOUND + access = _winapi.GENERIC_WRITE + obsize, ibsize = 0, bufsize + + openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + + if overlapped[0]: + openmode |= _winapi.FILE_FLAG_OVERLAPPED + + if overlapped[1]: + flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED + else: + flags_and_attribs = 0 + + h1 = h2 = None + try: + h1 = _winapi.CreateNamedPipe( + address, openmode, _winapi.PIPE_WAIT, + 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) + + h2 = _winapi.CreateFile( + address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, + flags_and_attribs, _winapi.NULL) + + ov = _winapi.ConnectNamedPipe(h1, overlapped=True) + ov.GetOverlappedResult(True) + return h1, h2 + except: + if h1 is not None: + _winapi.CloseHandle(h1) + if h2 is not None: + _winapi.CloseHandle(h2) + raise + +try: + MAXFD = os.sysconf("SC_OPEN_MAX") +except: + MAXFD = 256 + +_READ_BLOCK_SIZE = 32768 # This lists holds Popen instances for which the underlying process had not # exited at the time its __del__ method got called: those processes are wait()ed @@ -489,6 +555,14 @@ DEVNULL = -3 +def _eintr_retry_call(func, *args): + while True: + try: + return func(*args) + except InterruptedError: + continue + + # XXX This function is only used by multiprocessing and the test suite, # but it's here so that it can be imported when Python is compiled without # threads. @@ -595,8 +669,7 @@ if 'input' in kwargs: if 'stdin' in kwargs: raise ValueError('stdin and input arguments may not both be used.') - inputdata = kwargs['input'] - del kwargs['input'] + inputdata = kwargs.pop('input') kwargs['stdin'] = PIPE else: inputdata = None @@ -892,12 +965,10 @@ self.stdout.close() if self.stderr: self.stderr.close() - try: # Flushing a BufferedWriter may raise an error - if self.stdin: - self.stdin.close() - finally: - # Wait for the process to terminate, to avoid zombies. - self.wait() + if self.stdin: + self.stdin.close() + # Wait for the process to terminate, to avoid zombies. + self.wait() def __del__(self, _maxsize=sys.maxsize): if not self._child_created: @@ -914,22 +985,6 @@ self._devnull = os.open(os.devnull, os.O_RDWR) return self._devnull - def _stdin_write(self, input): - if input: - try: - self.stdin.write(input) - except BrokenPipeError: - # communicate() must ignore broken pipe error - pass - except OSError as e: - if e.errno == errno.EINVAL and self.poll() is not None: - # Issue #19612: On Windows, stdin.write() fails with EINVAL - # if the process already exited before the write - pass - else: - raise - self.stdin.close() - def communicate(self, input=None, timeout=None): """Interact with process: Send data to stdin. Read data from stdout and stderr, until end-of-file is reached. Wait for @@ -955,12 +1010,18 @@ stdout = None stderr = None if self.stdin: - self._stdin_write(input) + if input: + try: + self.stdin.write(input) + except OSError as e: + if e.errno != errno.EPIPE and e.errno != errno.EINVAL: + raise + self.stdin.close() elif self.stdout: - stdout = self.stdout.read() + stdout = _eintr_retry_call(self.stdout.read) self.stdout.close() elif self.stderr: - stderr = self.stderr.read() + stderr = _eintr_retry_call(self.stderr.read) self.stderr.close() self.wait() else: @@ -976,6 +1037,16 @@ sts = self.wait(timeout=self._remaining_time(endtime)) + stdout = b''.join(stdout or []) + stderr = b''.join(stderr or []) + if self.universal_newlines: + if self.stdout is not None: + stdout = self._translate_newlines(stdout, self.stdout.encoding) + if self.stderr is not None: + stderr = self._translate_newlines(stderr, self.stderr.encoding) + stdout = None if self.stdout is None else stdout + stderr = None if self.stderr is None else stderr + return (stdout, stderr) @@ -998,6 +1069,15 @@ if _time() > endtime: raise TimeoutExpired(self.args, orig_timeout) + def read_nonblocking(self, bufsize=4096): + """Reads any avialable data from the child stdout. + """ + return self._read_nonblocking(self.stdout, max(bufsize, 1)) + + def read_stderr_nonblocking(self, bufsize=4096): + """Reads any available data from the child stderr. + """ + return self._read_nonblocking(self.stderr, max(bufsize, 1)) if mswindows: # @@ -1021,7 +1101,7 @@ p2cread = Handle(p2cread) _winapi.CloseHandle(_) elif stdin == PIPE: - p2cread, p2cwrite = _winapi.CreatePipe(None, 0) + p2cread, p2cwrite = _pipe(overlapped=(False, True), duplex=True) p2cread, p2cwrite = Handle(p2cread), Handle(p2cwrite) elif stdin == DEVNULL: p2cread = msvcrt.get_osfhandle(self._get_devnull()) @@ -1039,7 +1119,7 @@ c2pwrite = Handle(c2pwrite) _winapi.CloseHandle(_) elif stdout == PIPE: - c2pread, c2pwrite = _winapi.CreatePipe(None, 0) + c2pread, c2pwrite = _pipe(overlapped=(True, False)) c2pread, c2pwrite = Handle(c2pread), Handle(c2pwrite) elif stdout == DEVNULL: c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) @@ -1057,7 +1137,7 @@ errwrite = Handle(errwrite) _winapi.CloseHandle(_) elif stderr == PIPE: - errread, errwrite = _winapi.CreatePipe(None, 0) + errread, errwrite = _pipe(overlapped=(True, False)) errread, errwrite = Handle(errread), Handle(errwrite) elif stderr == STDOUT: errwrite = c2pwrite @@ -1180,60 +1260,111 @@ return self.returncode - def _readerthread(self, fh, buffer): - buffer.append(fh.read()) - fh.close() + def _communicate(self, input, endtime, orig_timeout): + # Prepare the output buffers for convenient access and the ability + # to call .communicate() again. + output = {} + if self.stdout is not None: + if not hasattr(self, "_stdout_buff"): + self._stdout_buff = [] + output['stdout'] = self._stdout_buff + if self.stderr is not None: + if not hasattr(self, "_stderr_buff"): + self._stderr_buff = [] + output['stderr'] = self._stderr_buff + # Prepare our handles for reading + h_or_none = lambda x: msvcrt.get_osfhandle(x.fileno()) if x else None + handles = {'stdout': h_or_none(self.stdout), + 'stderr': h_or_none(self.stderr)} - def _communicate(self, input, endtime, orig_timeout): - # Start reader threads feeding into a list hanging off of this - # object, unless they've already been started. - if self.stdout and not hasattr(self, "_stdout_buff"): - self._stdout_buff = [] - self.stdout_thread = \ - threading.Thread(target=self._readerthread, - args=(self.stdout, self._stdout_buff)) - self.stdout_thread.daemon = True - self.stdout_thread.start() - if self.stderr and not hasattr(self, "_stderr_buff"): - self._stderr_buff = [] - self.stderr_thread = \ - threading.Thread(target=self._readerthread, - args=(self.stderr, self._stderr_buff)) - self.stderr_thread.daemon = True - self.stderr_thread.start() + # Also keep our pending_ios between calls if we need to re-call + # communicate() after a timeout. + pending_ios = getattr(self, '_pending_ios', []) + self._pending_ios = pending_ios - if self.stdin: - self._stdin_write(input) + if self.stdin is not None and not self._communication_started: + # Best part about overlapped IO? This right here. You can send + # an arbitrarily large chunk of data to your child process, and + # Windows will keep sending it in the background for you. You + # just need to wait for it to complete, either due to a full + # send, or the child dying. + if input is None: + if not self._input: + self.stdin.close() + else: + # Wait until any earlier overlapped write completes + # before closing stdin. + pending_ios.append( + _OverlappedIO('stdin', self._input, self._input.event)) + else: + if self.universal_newlines: + input = input.encode(self.stdin.encoding) + ov = _overlapped.Overlapped() + try: + ov.WriteFile(msvcrt.get_osfhandle(self.stdin.fileno()), input) + except BrokenPipeError: + # Ignore pipe errors and let anything else bubble up + # the stack. This preserves the pre-existing behavior. + pass + else: + # If we had any previously pending overlapped write, + # this write will still work just fine, so we can use + # this overlapped IO completion to signal closing the + # stdin pipe. + pending_ios.append(_OverlappedIO('stdin', ov, ov.event)) - # Wait for the reader threads, or time out. If we time out, the - # threads remain reading and the fds left open in case the user - # calls communicate again. - if self.stdout is not None: - self.stdout_thread.join(self._remaining_time(endtime)) - if self.stdout_thread.is_alive(): - raise TimeoutExpired(self.args, orig_timeout) - if self.stderr is not None: - self.stderr_thread.join(self._remaining_time(endtime)) - if self.stderr_thread.is_alive(): + def read(name): + if handles[name]: + ov = _overlapped.Overlapped() + ov.ReadFile(handles[name], _READ_BLOCK_SIZE) + pending_ios.append(_OverlappedIO(name, ov, ov.event)) + + # Prepare the stdout/stderr reads, as applicable + if not self._communication_started: + read('stdout') + read('stderr') + + # Time for an event loop! + while pending_ios: + timeout = self._remaining_time(endtime) + if timeout is not None and timeout < 0: raise TimeoutExpired(self.args, orig_timeout) - # Collect the output from and close both pipes, now that we know - # both have been read successfully. - stdout = None - stderr = None + # Prepare the real timeout and wait for the pending IOs + timeout = _WAIT if timeout is None else int(timeout * 1000) + _winapi.WaitForMultipleObjects( + [io.event for io in pending_ios], False, min(timeout, _WAIT)) + + # split the ios into completed and still pending + completed = [p for p in pending_ios if not p.io.pending] + pending_ios[:] = [p for p in pending_ios if p not in completed] + + # Process any completed io events + for comp in completed: + # If we finished the stdin write, go ahead and close the + # stdin file handle to prevent subsequent writes. + if comp.name == 'stdin': + self.stdin.close() + continue + + # As long as we've received a result from our read, there + # might be more data available on the other side. Keep + # trying to read from that handle. + result = comp.io.getresult() + if result: + output[comp.name].append(result) + read(comp.name) + + # Close stdout and stderr if self.stdout: - stdout = self._stdout_buff self.stdout.close() if self.stderr: - stderr = self._stderr_buff self.stderr.close() - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = stdout[0] - if stderr is not None: - stderr = stderr[0] + # Return the buffers (if any) to the communicate() method. + stdout = self._stdout_buff if self.stdout else None + stderr = self._stderr_buff if self.stderr else None return (stdout, stderr) @@ -1264,6 +1395,50 @@ kill = terminate + def write_nonblocking(self, input, timeout=0): + """Writes as much input to the child stdin as possible, returns the + number of bytes written. + """ + if self._input: + if self._input.error not in {0, _winapi.ERROR_IO_PENDING}: + raise Exception("got overlapped result error: %s" % self._input.error) + + # Don't accept any more IOs until the last one is done + if self._input.pending: + return 0 + + # Get the result and discard it. + self._input.getresult(True) + self._input = None + + if not input: + return 0 + + handle = msvcrt.get_osfhandle(self.stdin.fileno()) + self._input = ov = _overlapped.Overlapped() + ov.WriteFile(handle, input) + + if ov.error not in {0, _winapi.ERROR_IO_PENDING}: + self._input = None + raise Exception("got overlapped result error: %s"%ov.error) + + if timeout > 0: + _winapi.WaitForSingleObject(handle, min(int(timeout * 1000), _WAIT)) + + # return that we sent all the input, even if it is still pending + return len(input) + + def _read_nonblocking(self, pipe, bufsize): + # Handle circular dependency. + handle = msvcrt.get_osfhandle(pipe.fileno()) + avail = _winapi.PeekNamedPipe(handle, 0)[0] + if avail > 0: + # The use of overlapped IOs here is necessary as per + # documentation on ReadFile. + ov = _overlapped.Overlapped() + ov.ReadFile(handle, min(bufsize, avail)) + return ov.getresult(True) + else: # # POSIX methods @@ -1319,6 +1494,16 @@ errread, errwrite) + def _close_fds(self, fds_to_keep): + start_fd = 3 + for fd in sorted(fds_to_keep): + if fd >= start_fd: + os.closerange(start_fd, fd) + start_fd = fd + 1 + if start_fd <= MAXFD: + os.closerange(start_fd, MAXFD) + + def _execute_child(self, args, executable, preexec_fn, close_fds, pass_fds, cwd, env, startupinfo, creationflags, shell, @@ -1404,7 +1589,7 @@ # exception (limited in size) errpipe_data = bytearray() while True: - part = os.read(errpipe_read, 50000) + part = _eintr_retry_call(os.read, errpipe_read, 50000) errpipe_data += part if not part or len(errpipe_data) > 50000: break @@ -1414,9 +1599,10 @@ if errpipe_data: try: - os.waitpid(self.pid, 0) - except ChildProcessError: - pass + _eintr_retry_call(os.waitpid, self.pid, 0) + except OSError as e: + if e.errno != errno.ECHILD: + raise try: exception_name, hex_errno, err_msg = ( errpipe_data.split(b':', 2)) @@ -1499,8 +1685,10 @@ def _try_wait(self, wait_flags): """All callers to this function MUST hold self._waitpid_lock.""" try: - (pid, sts) = os.waitpid(self.pid, wait_flags) - except ChildProcessError: + (pid, sts) = _eintr_retry_call(os.waitpid, self.pid, wait_flags) + except OSError as e: + if e.errno != errno.ECHILD: + raise # This happens if SIGCLD is set to be ignored or waiting # for child processes has otherwise been disabled for our # process. This child is dead, we can't get the status. @@ -1612,15 +1800,18 @@ self._input_offset + _PIPE_BUF] try: self._input_offset += os.write(key.fd, chunk) - except BrokenPipeError: - selector.unregister(key.fileobj) - key.fileobj.close() + except OSError as e: + if e.errno == errno.EPIPE: + selector.unregister(key.fileobj) + key.fileobj.close() + else: + raise else: if self._input_offset >= len(self._input): selector.unregister(key.fileobj) key.fileobj.close() elif key.fileobj in (self.stdout, self.stderr): - data = os.read(key.fd, 32768) + data = os.read(key.fd, _READ_BLOCK_SIZE) if not data: selector.unregister(key.fileobj) key.fileobj.close() @@ -1628,22 +1819,7 @@ self.wait(timeout=self._remaining_time(endtime)) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = b''.join(stdout) - if stderr is not None: - stderr = b''.join(stderr) - - # Translate newlines, if requested. - # This also turns bytes into strings. - if self.universal_newlines: - if stdout is not None: - stdout = self._translate_newlines(stdout, - self.stdout.encoding) - if stderr is not None: - stderr = self._translate_newlines(stderr, - self.stderr.encoding) - + # Return the buffers (if any) to the communicate() method. return (stdout, stderr) @@ -1672,3 +1848,36 @@ """Kill the process with SIGKILL """ self.send_signal(signal.SIGKILL) + + def write_nonblocking(self, input, timeout=0): + """Writes as much input to the child stdin as possible, returns the + number of bytes written. + """ + with _PopenSelector() as selector: + selector.register(self.stdin, selectors.EVENT_WRITE) + + if selector.select(max(timeout, 0)): + return os.write(self.stdin.fileno(), input) + return 0 + + def _read_nonblocking(self, pipe, bufsize): + # Yes, fcntl non-blocking bit flipping is necessary for this to + # work. + flags = fcntl.fcntl(pipe, fcntl.F_GETFL) + fcntl.fcntl(pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + try: + with _PopenSelector() as selector: + selector.register(pipe, selectors.EVENT_READ) + + if selector.select(0): + ret = os.read(pipe.fileno(), bufsize) + if not ret: + # Zero byte read after being readable == closed + raise BrokenPipeError() + return ret + + return b'' + finally: + if not pipe.closed: + fcntl.fcntl(pipe, fcntl.F_SETFL, flags) diff -r 7e179ee91af0 Lib/test/subprocessdata/qcat.py --- a/Lib/test/subprocessdata/qcat.py Mon Mar 23 15:26:49 2015 +0200 +++ b/Lib/test/subprocessdata/qcat.py Wed Mar 25 11:31:45 2015 -0700 @@ -5,3 +5,5 @@ if __name__ == "__main__": for line in sys.stdin: sys.stdout.write(line) + if '--flush' in sys.argv: + sys.stdout.flush() diff -r 7e179ee91af0 Lib/test/test_subprocess.py --- a/Lib/test/test_subprocess.py Mon Mar 23 15:26:49 2015 +0200 +++ b/Lib/test/test_subprocess.py Wed Mar 25 11:31:45 2015 -0700 @@ -763,6 +763,32 @@ self.assertEqual(stdout, b"bananasplit") self.assertStderrEqual(stderr, b"") + def test_nonblocking_writes_before_communicate(self): + # stdin.write before communicate() + p = subprocess.Popen([sys.executable, "-c", + 'import sys, time;' + 'time.sleep(1);' + 'sys.stdout.write(sys.stdin.read())'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdin.close) + self.addCleanup(p.stderr.close) + self.addCleanup(p.stdout.close) + self.addCleanup(p.wait) + + # Make sure to induce a fully overlapped IO on Windows + oremain = b"b" * support.PIPE_MAX_SIZE + remain = memoryview(oremain) + while remain: + written = p.write_nonblocking(remain) + remain = remain[written:] + if not written: + time.sleep(.001) + (stdout, stderr) = p.communicate(b"split") + self.assertEqual(stdout, oremain + b"split") + self.assertStderrEqual(stderr, b"") + def test_universal_newlines(self): p = subprocess.Popen([sys.executable, "-c", 'import sys,os;' + SETBINARY + @@ -910,6 +936,26 @@ _bootlocale.getpreferredencoding = old_getpreferredencoding self.assertEqual(stdout, '1\n2\n3\n4') + def test_universal_newlines_only_takes_strings(self): + # Verify that when universal newlines are enabled that we can only + # send strings, not bytes. + p = subprocess.Popen( + [sys.executable, '-c', + "import sys; sys.stdout.write(sys.stdin.read());"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) + with p: + self.assertRaises(AttributeError, p.communicate, input=b'I should fail\n', timeout=1) + + def test_universal_newlines_no_strings_nonblocking(self): + # Verify that when universal newlines are enabled that we can only + # send strings, not bytes. + p = subprocess.Popen( + [sys.executable, '-c', + "import sys; sys.stdout.write(sys.stdin.read());"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) + with p: + self.assertRaises(TypeError, p.write_nonblocking, input='I should fail\n') + def test_no_leaking(self): # Make sure we leak no resources if not mswindows: @@ -1232,6 +1278,135 @@ fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) + def test_nonblocking(self, bsize=-1): + # We just want to verify that we can send and receive data incrementally + # from the child process. + qcat = support.findfile("qcat.py", subdir="subprocessdata") + subdata = b'zxcvbn' + data = subdata * 4 + os.linesep.encode('latin-1') + + p = subprocess.Popen([sys.executable, qcat, '--flush'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, bufsize=-1) + + self.addCleanup(p.wait) + self.addCleanup(p.kill) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stdin.close) + + same = None + + for i in range(3): + # Do it a few times to make sure that we can actually interact. + self.assertEqual(p.write_nonblocking(data), len(data)) + r = b'' + # Try for a bit for slow buildbots. + timeout = time.monotonic() + 3 + while len(r) != len(data) and time.monotonic() < timeout: + # Also read small chunks to make sure that we are okay reading + # small chunks. + re = p.read_nonblocking(8) + self.assertLessEqual(len(re), 8) + r += re + time.sleep(.01) + self.assertEqual(r, data) + + def test_nonblocking_no_buffer(self): + self.test_nonblocking(0) + + def test_nonblocking_sleeper(self): + # We want to make sure that even with a "hung" child process, we can + # at least send some data into the pipe buffers. + p = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.kill) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stdin.close) + + total = 0 + passes = 0 + timeout = time.monotonic() + 3 + while time.monotonic() < timeout: + passes += 1 + # A single write should fill up any pipe buffers with the child + # not reading at all (default buffer size is 8k), but we keep + # writing data until the pipe breaks due to the child process + # ending. + try: + total += p.write_nonblocking(32765*b'1' + b'\n') + except BrokenPipeError: + break + time.sleep(.001) + self.assertGreater(passes, 1) + self.assertGreater(total, 0) + + def test_nonblocking_write_timeout(self): + # Test to make sure that when writing to a child process we can use + # timeouts to stall our writing if the child isn't reading fast enough. + p = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.wait) + self.addCleanup(p.kill) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stdin.close) + + total = 0 + passes = 0 + waited = False + timeout = time.monotonic() + 1.5 + while time.monotonic() < timeout and not waited: + passes += 1 + # A single write should fill up any pipe buffers with the child + # not reading at all (default buffer size is 8k), but we keep + # writing data until the write blocks longer than .25 seconds. + s = time.monotonic() + total += p.write_nonblocking(32765*b'1' + b'\n', .25) + e = time.monotonic() + waited = waited or e - s > .25 + self.assertGreater(passes, 1) + self.assertGreater(total, 0) + self.assertTrue(waited) + + def test_nonblocking_large_write(self): + # Test whether we can send huge blocks of data to the child process + # without losing or mis-counting data written/read. + p = subprocess.Popen([sys.executable, '-c', + 'import sys; sys.stdout.write(str(len(sys.stdin.read())));'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.wait) + self.addCleanup(p.kill) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stdin.close) + + block = memoryview((2**20 - 1)*b'1' + b'\n') + total = 0 + passes = 0 + timeout = time.time() + .25 + while time.time() < timeout and block: + passes += 1 + sent = p.write_nonblocking(block) + block = block[sent:] + total += sent + p.stdin.close() + self.assertGreater(passes, 0) + self.assertGreater(total, 0) + + received = int(p.stdout.read()) + self.assertEqual(total, received) + + def test_exit_subprocess(self): + p = subprocess.Popen([sys.executable, '-c', + 'import sys,time; sys.stdout.write("foo\\n");sys.stdout.flush();sys.stdout.close();time.sleep(.5);'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + self.addCleanup(p.wait) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stdin.close) + p.wait() + self.assertEqual(p.read_nonblocking(8192), b'foo\n') + self.assertRaises(BrokenPipeError, lambda: p.read_nonblocking(8192)) + @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase): @@ -2515,6 +2690,8 @@ self.assertIsNone(proc.returncode) # EPIPE expected under POSIX; EINVAL under Windows self.assertRaises(OSError, proc.__exit__, None, None, None) + # Get the return code, because the exception we just caught prevented it + proc.wait() self.assertEqual(proc.returncode, 0) self.assertTrue(proc.stdin.closed)