diff -r d8659dbebfd1 Doc/library/subprocess.rst --- a/Doc/library/subprocess.rst Thu Mar 27 20:51:15 2014 -0700 +++ b/Doc/library/subprocess.rst Mon Apr 14 14:29:34 2014 -0700 @@ -689,6 +689,64 @@ .. versionchanged:: 3.3 *timeout* was added. +.. method:: Popen.write_nonblocking(input, timeout=0) + + Send data to the child process without blocking. This function will return + the number of bytes that were written to the child process. Note that even + zero-byte sends do not necessarily imply that the child process is not + reading or is not running (check the results of :meth:`poll` instead). + + .. note:: + + On Windows, due to the system calls we use to send data to the child + process, this method will return one of two values: 0 or the length of the + data passed to the method. If the return value is nonzero, the write has + been sent to the underlying system for sending to the subprocess pipe, but + has not necessarily completed. If the return value is 0, then either the + caller passed 0 bytes, or the previous write is still pending completion. + + .. note:: + + The meaning of the ``timeout`` parameter varies depending on platform. If + :meth:`write_nonblocking` completes prior to the provided timeout on + Windows, that means that the write is completed and you can probably write + more data immediately. On other platforms, ``timeout`` is how long you are + willing to wait for ``stdin`` to be ready to be written to. + + .. warning:: + + Unlike :meth:`communicate`, :meth:`write_nonblocking` does not accept text + string arguments for writing to the child process when + ``universal_newlines=True``. This is because writing may result in + multi-byte encoded characters being split up on non-blocking write, which + would prevent subsequent attempts to re-send the partial input from + finishing the character. + + .. versionadded:: 3.5 + +.. method:: Popen.read_nonblocking(bufsize=4096) + + Receive data from the child's ``stdout`` without blocking. This function will + attempt to read up to ``buffsize`` bytes from the child ``stdout`` as + possible without blocking, returning the results unmodified. + + .. warning:: + + Unlike results from :meth:`communicate`, bytes read from the child + ``stdout`` are returned unaltered, even when ``universal_newlines=True``. + + .. versionadded:: 3.5 + +.. method:: Popen.read_stderr_nonblocking(bufsize=4096) + + Same functionality as :meth:`read_nonblocking()`, only applied to ``stderr``. + + .. warning:: + + Unlike results from :meth:`communicate`, bytes read from the child + ``stdout`` are returned unaltered, even when ``universal_newlines=True``. + + .. versionadded:: 3.5 .. method:: Popen.send_signal(signal) diff -r d8659dbebfd1 Lib/asyncio/windows_utils.py --- a/Lib/asyncio/windows_utils.py Thu Mar 27 20:51:15 2014 -0700 +++ b/Lib/asyncio/windows_utils.py Mon Apr 14 14:29:34 2014 -0700 @@ -8,7 +8,6 @@ raise ImportError('win32 only') import socket -import itertools import msvcrt import os import subprocess @@ -22,11 +21,10 @@ # Constants/globals -BUFSIZE = 8192 PIPE = subprocess.PIPE STDOUT = subprocess.STDOUT -_mmap_counter = itertools.count() - +pipe = subprocess._pipe +BUFSIZE = subprocess.BUFSIZE # Replacement for socket.socketpair() @@ -71,53 +69,6 @@ return (ssock, csock) -# Replacement for os.pipe() using handles instead of fds - - -def pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE): - """Like os.pipe() but with overlapped support and using handles not fds.""" - address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' % - (os.getpid(), next(_mmap_counter))) - - if duplex: - openmode = _winapi.PIPE_ACCESS_DUPLEX - access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE - obsize, ibsize = bufsize, bufsize - else: - openmode = _winapi.PIPE_ACCESS_INBOUND - access = _winapi.GENERIC_WRITE - obsize, ibsize = 0, bufsize - - openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE - - if overlapped[0]: - openmode |= _winapi.FILE_FLAG_OVERLAPPED - - if overlapped[1]: - flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED - else: - flags_and_attribs = 0 - - h1 = h2 = None - try: - h1 = _winapi.CreateNamedPipe( - address, openmode, _winapi.PIPE_WAIT, - 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) - - h2 = _winapi.CreateFile( - address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, - flags_and_attribs, _winapi.NULL) - - ov = _winapi.ConnectNamedPipe(h1, overlapped=True) - ov.GetOverlappedResult(True) - return h1, h2 - except: - if h1 is not None: - _winapi.CloseHandle(h1) - if h2 is not None: - _winapi.CloseHandle(h2) - raise - # Wrapper for a pipe handle diff -r d8659dbebfd1 Lib/subprocess.py --- a/Lib/subprocess.py Thu Mar 27 20:51:15 2014 -0700 +++ b/Lib/subprocess.py Mon Apr 14 14:29:34 2014 -0700 @@ -392,6 +392,15 @@ if mswindows: + # There is a circular dependency that asyncio relies on subprocess, but + # asyncio requires the use of asyncio._overlapped to handle non-blocking + # writes to the child stdin. We handle the dependency the first time + # someone tries a non-blocking read or write, or calls communicate(). + _overlapped = None + from collections import namedtuple + import itertools + import locale + import tempfile import threading import msvcrt import _winapi @@ -401,8 +410,10 @@ hStdOutput = None hStdError = None wShowWindow = 0 + _OverlappedIO = namedtuple('_OverlappedIO', 'name io event') else: import _posixsubprocess + import fcntl import select import selectors @@ -454,6 +465,55 @@ __del__ = Close __str__ = __repr__ + BUFSIZE = 8192 + _mmap_counter = itertools.count() + # Replacement for os.pipe() using handles instead of fds, originally lived + # in asyncio/windows_utils.py, but used here due to circular import + # prevention. + def _pipe(*, duplex=False, overlapped=(True, True), bufsize=BUFSIZE): + """Like os.pipe() but with overlapped support and using handles not fds.""" + address = tempfile.mktemp(prefix=r'\\.\pipe\python-pipe-%d-%d-' % + (os.getpid(), next(_mmap_counter))) + + if duplex: + openmode = _winapi.PIPE_ACCESS_DUPLEX + access = _winapi.GENERIC_READ | _winapi.GENERIC_WRITE + obsize, ibsize = bufsize, bufsize + else: + openmode = _winapi.PIPE_ACCESS_INBOUND + access = _winapi.GENERIC_WRITE + obsize, ibsize = 0, bufsize + + openmode |= _winapi.FILE_FLAG_FIRST_PIPE_INSTANCE + + if overlapped[0]: + openmode |= _winapi.FILE_FLAG_OVERLAPPED + + if overlapped[1]: + flags_and_attribs = _winapi.FILE_FLAG_OVERLAPPED + else: + flags_and_attribs = 0 + + h1 = h2 = None + try: + h1 = _winapi.CreateNamedPipe( + address, openmode, _winapi.PIPE_WAIT, + 1, obsize, ibsize, _winapi.NMPWAIT_WAIT_FOREVER, _winapi.NULL) + + h2 = _winapi.CreateFile( + address, access, 0, _winapi.NULL, _winapi.OPEN_EXISTING, + flags_and_attribs, _winapi.NULL) + + ov = _winapi.ConnectNamedPipe(h1, overlapped=True) + ov.GetOverlappedResult(True) + return h1, h2 + except: + if h1 is not None: + _winapi.CloseHandle(h1) + if h2 is not None: + _winapi.CloseHandle(h2) + raise + try: MAXFD = os.sysconf("SC_OPEN_MAX") except: @@ -595,8 +655,7 @@ if 'input' in kwargs: if 'stdin' in kwargs: raise ValueError('stdin and input arguments may not both be used.') - inputdata = kwargs['input'] - del kwargs['input'] + inputdata = kwargs.pop('input') kwargs['stdin'] = PIPE else: inputdata = None @@ -952,6 +1011,16 @@ sts = self.wait(timeout=self._remaining_time(endtime)) + stdout = b''.join(stdout or []) + stderr = b''.join(stderr or []) + if self.universal_newlines: + if self.stdout is not None: + stdout = self._translate_newlines(stdout, self.stdout.encoding) + if self.stderr is not None: + stderr = self._translate_newlines(stderr, self.stderr.encoding) + + stdout = None if self.stdout is None else stdout + stderr = None if self.stderr is None else stderr return (stdout, stderr) @@ -974,6 +1043,15 @@ if _time() > endtime: raise TimeoutExpired(self.args, orig_timeout) + def read_nonblocking(self, bufsize=4096): + """Reads any avialable data from the child stdout. + """ + return self._read_nonblocking(self.stdout, max(bufsize, 1)) + + def read_stderr_nonblocking(self, bufsize=4096): + """Reads any available data from the child stderr. + """ + return self._read_nonblocking(self.stderr, max(bufsize, 1)) if mswindows: # @@ -997,7 +1075,7 @@ p2cread = Handle(p2cread) _winapi.CloseHandle(_) elif stdin == PIPE: - p2cread, p2cwrite = _winapi.CreatePipe(None, 0) + p2cread, p2cwrite = _pipe(overlapped=(False, True), duplex=True) p2cread, p2cwrite = Handle(p2cread), Handle(p2cwrite) elif stdin == DEVNULL: p2cread = msvcrt.get_osfhandle(self._get_devnull()) @@ -1015,7 +1093,7 @@ c2pwrite = Handle(c2pwrite) _winapi.CloseHandle(_) elif stdout == PIPE: - c2pread, c2pwrite = _winapi.CreatePipe(None, 0) + c2pread, c2pwrite = _pipe(overlapped=(True, False)) c2pread, c2pwrite = Handle(c2pread), Handle(c2pwrite) elif stdout == DEVNULL: c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) @@ -1033,7 +1111,7 @@ errwrite = Handle(errwrite) _winapi.CloseHandle(_) elif stderr == PIPE: - errread, errwrite = _winapi.CreatePipe(None, 0) + errread, errwrite = _pipe(overlapped=(True, False)) errread, errwrite = Handle(errread), Handle(errwrite) elif stderr == STDOUT: errwrite = c2pwrite @@ -1156,74 +1234,121 @@ return self.returncode - def _readerthread(self, fh, buffer): - buffer.append(fh.read()) - fh.close() + def _communicate(self, input, endtime, orig_timeout): + # handle circular dependency + global _overlapped + if not _overlapped: + from asyncio import _overlapped + # Prep the output buffers for convenient access and the ability to + # call .communicate() again. + output = {} + if self.stdout is not None: + if not hasattr(self, "_stdout_buff"): + self._stdout_buff = [] + output['stdout'] = self._stdout_buff + if self.stderr is not None: + if not hasattr(self, "_stderr_buff"): + self._stderr_buff = [] + output['stderr'] = self._stderr_buff - def _communicate(self, input, endtime, orig_timeout): - # Start reader threads feeding into a list hanging off of this - # object, unless they've already been started. - if self.stdout and not hasattr(self, "_stdout_buff"): - self._stdout_buff = [] - self.stdout_thread = \ - threading.Thread(target=self._readerthread, - args=(self.stdout, self._stdout_buff)) - self.stdout_thread.daemon = True - self.stdout_thread.start() - if self.stderr and not hasattr(self, "_stderr_buff"): - self._stderr_buff = [] - self.stderr_thread = \ - threading.Thread(target=self._readerthread, - args=(self.stderr, self._stderr_buff)) - self.stderr_thread.daemon = True - self.stderr_thread.start() + # Prep our handles for reading + h_or_none = lambda x: msvcrt.get_osfhandle(x.fileno()) if x else None + handles = {'stdout': h_or_none(self.stdout), + 'stderr': h_or_none(self.stderr)} - if self.stdin: - if input is not None: + # Also keep our pending_ios between calls if we need to re-call + # communicate() after a timeout. + pending_ios = getattr(self, '_pending_ios', []) + self._pending_ios = pending_ios + + if self.stdin is not None and not self._communication_started: + # Best part about overlapped IO? This right here. You can send + # an arbitrarily large chunk of data to your child process, and + # Windows will keep sending it in the background for you. You + # just need to wait for it to complete, either due to a full + # send, or the child dying. + if input is None: + writing = getattr(self, 'writing', None) + if not writing: + self.stdin.close() + else: + # Wait until any earlier overlapped write completes + # before closing stdin. + pending_ios.append(_OverlappedIO('stdin', writing, writing.event)) + else: + if self.universal_newlines: + input = input.encode(self.stdin.encoding) + ov = _overlapped.Overlapped() try: - self.stdin.write(input) - except OSError as e: - if e.errno == errno.EPIPE: - # communicate() should ignore pipe full error - pass - elif (e.errno == errno.EINVAL - and self.poll() is not None): - # Issue #19612: stdin.write() fails with EINVAL - # if the process already exited before the write - pass - else: - raise - self.stdin.close() + ov.WriteFile(msvcrt.get_osfhandle(self.stdin.fileno()), input) + except BrokenPipeError: + # Ignore pipe errors and let anything else bubble up + # the stack. This preserves the pre-existing behavior. + pass + else: + # If we had any previously pending overlapped write, + # this write will still work just fine, so we can use + # this overlapped IO completion to signal closing the + # stdin pipe. + pending_ios.append(_OverlappedIO('stdin', ov, ov.event)) - # Wait for the reader threads, or time out. If we time out, the - # threads remain reading and the fds left open in case the user - # calls communicate again. - if self.stdout is not None: - self.stdout_thread.join(self._remaining_time(endtime)) - if self.stdout_thread.is_alive(): - raise TimeoutExpired(self.args, orig_timeout) - if self.stderr is not None: - self.stderr_thread.join(self._remaining_time(endtime)) - if self.stderr_thread.is_alive(): + def read(name): + if handles[name]: + ov = _overlapped.Overlapped() + ov.ReadFile(handles[name], 32768) + pending_ios.append(_OverlappedIO(name, ov, ov.event)) + + # Prep the stdout/stderr reads, as applicable + if not self._communication_started: + read('stdout') + read('stderr') + + # Time for an event loop! + while pending_ios: + timeout = self._remaining_time(endtime) + if timeout is not None and timeout < 0: raise TimeoutExpired(self.args, orig_timeout) - # Collect the output from and close both pipes, now that we know - # both have been read successfully. - stdout = None - stderr = None + # Prep the real timeout and wait for the pending IOs + timeout = 100 if timeout is None else int(timeout * 1000) + # There is technically a race condition here that if the IO + # finished between our calls, this may not signal. While the + # situation is rare, we can cut our polling time to about 100ms, + # which should at least let us recover in those situations + # fairly quickly, and which should still be minimal overhead. + _winapi.WaitForMultipleObjects( + [e for _, _, e in pending_ios], False, min(timeout, 100)) + + # split the ios into completed and still pending + completed = [p for p in pending_ios if not p.io.pending] + pending_ios[:] = [p for p in pending_ios if p not in completed] + + # Process any completed io events + for comp in completed: + # If we finished the stdin write, go ahead and close the + # stdin file handle to prevent subsequent writes. + if comp.name == 'stdin': + self.stdin.close() + continue + + # As long as we've received a result from our read, there + # might be more data available on the other side. Keep + # trying to read from that handle. + result = comp.io.getresult() + if result: + output[comp.name].append(result) + read(comp.name) + + # Close stdout and stderr if self.stdout: - stdout = self._stdout_buff self.stdout.close() if self.stderr: - stderr = self._stderr_buff self.stderr.close() - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = stdout[0] - if stderr is not None: - stderr = stderr[0] + # Return the buffers (if any) to the communicate() method. + stdout = self._stdout_buff if self.stdout else None + stderr = self._stderr_buff if self.stderr else None return (stdout, stderr) @@ -1254,6 +1379,76 @@ kill = terminate + def write_nonblocking(self, input, timeout=0): + """Writes as much input to the child stdin as possible, returns the + number of bytes written. + """ + # Handle circular dependency. + global _overlapped + if not _overlapped: + from asyncio import _overlapped + + writing = getattr(self, '_writing', None) + if writing: + if writing.error not in {0, _winapi.ERROR_IO_PENDING}: + raise Exception("got overlapped result error: %s" % writing.error) + + # Don't accept any more IOs until the last one is done + if writing.pending: + return 0 + + # Get the result and discard it. + try: + writing.getresult(True) + except BrokenPipeError: + # This can happen even if the full write succeeded. + pass + del self._writing + + if not input: + return 0 + + handle = msvcrt.get_osfhandle(self.stdin.fileno()) + self._writing = ov = _overlapped.Overlapped() + try: + ov.WriteFile(handle, input) + except BrokenPipeError: + del self._writing + return 0 + + if ov.error not in {0, _winapi.ERROR_IO_PENDING}: + raise Exception("got overlapped result error: %s"%ov.error) + + # As in communicate(), there is technically a race condition between + # our attempt to write and using the Wait system call, so we need + # to cut our timeout down a bit so that we can at least return if we + # are unlucky enough to encounter the race condition. + WFSO = _winapi.WaitForSingleObject + TO = _winapi.WAIT_TIMEOUT + timeout = int(timeout * 1000) + while timeout > 0 and WFSO(handle, min(timeout, 100)) == TO: + timeout -= 100 + + # return that we sent all the input, even if it is still pending + return len(input) + + def _read_nonblocking(self, pipe, bufsize): + # Handle circular dependency. + global _overlapped + if not _overlapped: + from asyncio import _overlapped + + handle = msvcrt.get_osfhandle(pipe.fileno()) + try: + avail = _winapi.PeekNamedPipe(handle, 0)[0] + if avail > 0: + ov = _overlapped.Overlapped() + ov.ReadFile(handle, min(bufsize, avail)) + return ov.getresult(True) + except BrokenPipeError: + pass + return b'' + else: # # POSIX methods @@ -1614,22 +1809,7 @@ self.wait(timeout=self._remaining_time(endtime)) - # All data exchanged. Translate lists into strings. - if stdout is not None: - stdout = b''.join(stdout) - if stderr is not None: - stderr = b''.join(stderr) - - # Translate newlines, if requested. - # This also turns bytes into strings. - if self.universal_newlines: - if stdout is not None: - stdout = self._translate_newlines(stdout, - self.stdout.encoding) - if stderr is not None: - stderr = self._translate_newlines(stderr, - self.stderr.encoding) - + # Return the buffers (if any) to the communicate() method. return (stdout, stderr) @@ -1658,3 +1838,44 @@ """Kill the process with SIGKILL """ self.send_signal(signal.SIGKILL) + + def write_nonblocking(self, input, timeout=0): + """Writes as much input to the child stdin as possible, returns the + number of bytes written. + """ + try: + with _PopenSelector() as selector: + selector.register(self.stdin, selectors.EVENT_WRITE) + + if selector.select(max(timeout, 0)): + return os.write(self.stdin.fileno(), input) + return 0 + except BrokenPipeError: + # We also suppress any pipe errors on Linux. + return 0 + + def _read_nonblocking(self, pipe, bufsize): + # Yes, fcntl non-blocking bit flipping is necessary for this to + # work. + flags = fcntl.fcntl(pipe, fcntl.F_GETFL) + fcntl.fcntl(pipe, fcntl.F_SETFL, flags | os.O_NONBLOCK) + + try: + # Try to read up front because if we get the PollSelector, then + # a read event may never be triggered if the child process has + # already sent data. Commenting the next 3 lines breaks + # test_nonblocking. + ret = pipe.read(bufsize) + if ret: + return ret + + with _PopenSelector() as selector: + selector.register(pipe, selectors.EVENT_READ) + + if selector.select(0): + return pipe.read(bufsize) + finally: + if not pipe.closed: + fcntl.fcntl(pipe, fcntl.F_SETFL, flags) + + return b'' diff -r d8659dbebfd1 Lib/test/subprocessdata/qcat.py --- a/Lib/test/subprocessdata/qcat.py Thu Mar 27 20:51:15 2014 -0700 +++ b/Lib/test/subprocessdata/qcat.py Mon Apr 14 14:29:34 2014 -0700 @@ -5,3 +5,5 @@ if __name__ == "__main__": for line in sys.stdin: sys.stdout.write(line) + if '--flush' in sys.argv: + sys.stdout.flush() diff -r d8659dbebfd1 Lib/test/test_subprocess.py --- a/Lib/test/test_subprocess.py Thu Mar 27 20:51:15 2014 -0700 +++ b/Lib/test/test_subprocess.py Mon Apr 14 14:29:34 2014 -0700 @@ -759,10 +759,32 @@ self.addCleanup(p.stderr.close) self.addCleanup(p.stdin.close) p.stdin.write(b"banana") + p.stdin.flush() (stdout, stderr) = p.communicate(b"split") self.assertEqual(stdout, b"bananasplit") self.assertStderrEqual(stderr, b"") + def test_nonblocking_writes_before_communicate(self): + # stdin.write before communicate() + p = subprocess.Popen([sys.executable, "-c", + 'import sys, time;' + 'time.sleep(1);' + 'sys.stdout.write(sys.stdin.read())'], + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + self.addCleanup(p.stdout.close) + self.addCleanup(p.stderr.close) + self.addCleanup(p.stdin.close) + # Make sure to induce a fully overlapped IO on Windows + oremain = b"banana\n" * (100000 if subprocess.mswindows else 1) + remain = memoryview(oremain) + while remain: + remain = remain[p.write_nonblocking(remain):] + (stdout, stderr) = p.communicate(b"split\n") + self.assertEqual(stdout.replace(b'\r', b''), oremain + b"split\n") + self.assertStderrEqual(stderr, b"") + def test_universal_newlines(self): p = subprocess.Popen([sys.executable, "-c", 'import sys,os;' + SETBINARY + @@ -909,6 +931,28 @@ _bootlocale.getpreferredencoding = old_getpreferredencoding self.assertEqual(stdout, '1\n2\n3\n4') + def test_universal_newlines_only_takes_strings(self): + # Verify that when universal newlines are enabled that we can only + # send strings, not bytes. + p = subprocess.Popen( + [sys.executable, '-c', + "import sys; sys.stdout.write(sys.stdin.read());"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) + self.assertRaises(AttributeError, p.communicate, input=b'I should fail\n', timeout=1) + p.stdin.close() + p.stdout.close() + + def test_universal_newlines_no_strings_nonblocking(self): + # Verify that when universal newlines are enabled that we can only + # send strings, not bytes. + p = subprocess.Popen( + [sys.executable, '-c', + "import sys; sys.stdout.write(sys.stdin.read());"], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True) + self.assertRaises(TypeError, p.write_nonblocking, input='I should fail\n') + p.stdin.close() + p.stdout.close() + def test_no_leaking(self): # Make sure we leak no resources if not mswindows: @@ -1145,6 +1189,110 @@ fds_after_exception = os.listdir(fd_directory) self.assertEqual(fds_before_popen, fds_after_exception) + def test_nonblocking(self): + # We just want to verify that we can send and receive data incrementally + # from the child process. + qcat = support.findfile("qcat.py", subdir="subprocessdata") + subdata = b'zxcvbn' + data = subdata * 4 + os.linesep.encode('latin-1') + + p = subprocess.Popen([sys.executable, qcat, '--flush'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.stdin.close) + self.addCleanup(p.stdout.close) + self.addCleanup(p.kill) + + for i in range(3): + # Do it a few times to make sure that we can actually interact. + self.assertEqual(p.write_nonblocking(data), len(data)) + r = b'' + # Try for a bit for slow buildbots. + timeout = time.time() + 3 + while len(r) != len(data) and time.time() < timeout: + # Also read small chunks to make sure that we are okay reading + # small chunks. + re = p.read_nonblocking(8) + self.assertLessEqual(len(re), 8) + r += re + time.sleep(.01) + self.assertEqual(r, data) + + def test_nonblocking_sleeper(self): + # We want to make sure that even with a "hung" child process, we can + # at least send some data into the pipe buffers. + p = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.stdin.close) + self.addCleanup(p.stdout.close) + self.addCleanup(p.kill) + + total = 0 + passes = 0 + timeout = time.time() + 3 + while time.time() < timeout: + passes += 1 + # A single write should fill up any pipe buffers with the child + # not reading at all. + total += p.write_nonblocking(32765*b'1' + b'\n') + time.sleep(.001) + self.assertTrue(passes > 1) + self.assertTrue(total) + + def test_nonblocking_write_timeout(self): + # Test to make sure that when writing to a child process we can use + # timeouts to stall our writing if the child isn't reading fast enough. + p = subprocess.Popen([sys.executable, '-c', 'import time; time.sleep(1)'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + + self.addCleanup(p.stdin.close) + self.addCleanup(p.stdout.close) + self.addCleanup(p.kill) + + total = 0 + passes = 0 + waited = False + timeout = time.time() + 1.5 + while time.time() < timeout: + passes += 1 + # A single write should fill up any pipe buffers with the child + # not reading at all. + s = time.time() + total += p.write_nonblocking(32765*b'1' + b'\n', .25) + e = time.time() + waited = waited or e - s > .25 + self.assertTrue(passes > 1) + self.assertTrue(total) + self.assertTrue(waited) + + def test_nonblocking_large_write(self): + # Test whether clients can + p = subprocess.Popen([sys.executable, '-c', + 'import sys; sys.stdout.write(str(len(sys.stdin.read())));'], + stdin=subprocess.PIPE, stdout=subprocess.PIPE) + time.sleep(.1) + + self.addCleanup(p.stdin.close) + self.addCleanup(p.stdout.close) + self.addCleanup(p.kill) + + block = (2**20 - 1)*b'1' + b'\n' + total = 0 + passes = 0 + timeout = time.time() + .25 + while time.time() < timeout: + passes += 1 + total += p.write_nonblocking(block) + p.stdin.close() + self.assertTrue(passes > 1) + self.assertTrue(total) + time.sleep(.25) + + received = int(p.stdout.read()) + self.assertEqual(total, received) + + @unittest.skipIf(mswindows, "POSIX specific tests") class POSIXProcessTestCase(BaseTestCase):