Index: Lib/test/test_subprocess.py =================================================================== --- Lib/test/test_subprocess.py (revision 79884) +++ Lib/test/test_subprocess.py (working copy) @@ -914,7 +914,51 @@ if dir is not None: os.rmdir(dir) +# The following test covers #6610, handling child pipes when the standard +# fds (0, 1, 2) are closed, i.e., in a daemon process +# This test case backs up and closes the standard fds, restoring them later +@unittest.skipUnless(os.name == 'posix', "only relevant for UNIX") +class NoStandardFdsTest(BaseTestCase): + def setUp(self): + # acquire/release 16 fds to make sure fds are (a) high (b) available + dummy_fds = [os.open('/dev/null', os.O_RDONLY) for i in range(16)] + for dummy_fd in dummy_fds: + os.close(dummy_fd) + # duplicate standard fds to the highest available fds + for std_fd, dup_fd in enumerate(dummy_fds[-3:]): + os.dup2(std_fd, dup_fd) + self._dup_fds = dummy_fds[-3:] + ProcessTestCase.setUp(self) + def test_coprocess(self): + # coprocess: see Advanced Programming in the UNIX env., 2nd ed, 15.4 + with _NoStandardFds(self._dup_fds): + b = b'teolicy was here' + p = subprocess.Popen('/bin/cat', + stdin=subprocess.PIPE, + stdout=subprocess.PIPE, + stderr=subprocess.PIPE) + p.stdin.write(b) + p.stdin.close() + self.assertEqual(p.stdout.read(), b) + p.wait() + + def tearDown(self): + for dup_fd in self._dup_fds: + os.close(dup_fd) + ProcessTestCase.tearDown(self) + +# context manager that receives backups of the standard file descriptors, +# closes them on enter and restores them on exit +class _NoStandardFds(object): + def __init__(self, dup_fds): + self.dup_fds = dup_fds + def __enter__(self): + os.closerange(0, 3) + def __exit__(self, error_type, error_value, traceback): + for stdfd, dupfd in enumerate(self.dup_fds): + os.dup2(dupfd, stdfd) + @unittest.skipUnless(getattr(subprocess, '_has_poll', False), "poll system call not supported") class ProcessTestCaseNoPoll(ProcessTestCase): @@ -967,7 +1011,8 @@ ProcessTestCasePOSIXPurePython, CommandTests, ProcessTestCaseNoPoll, - HelperFunctionTests) + HelperFunctionTests, + NoStandardFdsTest) support.run_unittest(*unit_tests) support.reap_children()