diff -ruN py3k/Lib/test/subprocessdata/qcat.py py3k-subprocess2-unittest2/Lib/test/subprocessdata/qcat.py --- py3k/Lib/test/subprocessdata/qcat.py 1970-01-01 02:00:00.000000000 +0200 +++ py3k-subprocess2-unittest2/Lib/test/subprocessdata/qcat.py 2010-12-11 23:22:46.000000000 +0200 @@ -0,0 +1,7 @@ +"""When ran as a script, simulates cat with no arguments.""" + +import sys + +if __name__ == "__main__": + for line in sys.stdin: + sys.stdout.write(line) diff -ruN py3k/Lib/test/subprocessdata/qgrep.py py3k-subprocess2-unittest2/Lib/test/subprocessdata/qgrep.py --- py3k/Lib/test/subprocessdata/qgrep.py 1970-01-01 02:00:00.000000000 +0200 +++ py3k-subprocess2-unittest2/Lib/test/subprocessdata/qgrep.py 2010-12-11 23:23:50.000000000 +0200 @@ -0,0 +1,10 @@ +"""When called with a single argument, simulated fgrep with a single +argument and no options.""" + +import sys + +if __name__ == "__main__": + pattern = sys.argv[1] + for line in sys.stdin: + if pattern in line: + sys.stdout.write(line) --- py3k/Lib/test/test_subprocess.py 2010-12-10 20:09:56.000000000 +0200 +++ py3k-subprocess2-unittest2/Lib/test/test_subprocess.py 2010-12-12 00:30:16.000000000 +0200 @@ -10,6 +10,7 @@ import re import sysconfig import warnings +import select try: import gc except ImportError: @@ -980,6 +981,83 @@ exitcode = subprocess.call([program, "-c", "pass"], env=envb) self.assertEqual(exitcode, 0) + def test_pipe_cloexec(self): + sleeper = support.findfile("input_reader.py", subdir="subprocessdata") + fd_status = support.findfile("fd_status.py", subdir="subprocessdata") + + p1 = subprocess.Popen([sys.executable, sleeper], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + stderr=subprocess.PIPE, close_fds=False) + + self.addCleanup(p1.communicate, b'') + + p2 = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=False) + + output, error = p2.communicate() + result_fds = set(map(int, output.split(b','))) + unwanted_fds = set([p1.stdin.fileno(), p1.stdout.fileno(), + p1.stderr.fileno()]) + + self.assertFalse(result_fds & unwanted_fds, + "Expected no fds from %r to be open in child, " + "found %r" % + (unwanted_fds, result_fds & unwanted_fds)) + + def test_pipe_cloexec_real_tools(self): + qcat = support.findfile("qcat.py", subdir="subprocessdata") + qgrep = support.findfile("qgrep.py", subdir="subprocessdata") + + subdata = b'zxcvbn' + data = subdata * 4 + b'\n' + + p1 = subprocess.Popen([sys.executable, qcat], + stdin=subprocess.PIPE, stdout=subprocess.PIPE, + close_fds=False) + + p2 = subprocess.Popen([sys.executable, qgrep, subdata], + stdin=p1.stdout, stdout=subprocess.PIPE, + close_fds=False) + + self.addCleanup(p1.wait) + self.addCleanup(p2.wait) + self.addCleanup(p1.terminate) + self.addCleanup(p2.terminate) + + p1.stdin.write(data) + p1.stdin.close() + + readfiles, ignored1, ignored2 = select.select([p2.stdout], [], [], 10) + + self.assertTrue(readfiles, "The child hung") + self.assertEqual(p2.stdout.read(), data) + + def test_close_fds(self): + fd_status = support.findfile("fd_status.py", subdir="subprocessdata") + + fds = os.pipe() + self.addCleanup(os.close, fds[0]) + self.addCleanup(os.close, fds[1]) + + open_fds = set(fds) + + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=False) + output, ignored = p.communicate() + remaining_fds = set(map(int, output.split(b','))) + + self.assertEqual(remaining_fds & open_fds, open_fds, + "Some fds were closed") + + p = subprocess.Popen([sys.executable, fd_status], + stdout=subprocess.PIPE, close_fds=True) + output, ignored = p.communicate() + remaining_fds = set(map(int, output.split(b','))) + + self.assertFalse(remaining_fds & open_fds, + "Some fds were left open") + self.assertIn(1, remaining_fds, "Subprocess failed") + @unittest.skipUnless(mswindows, "Windows specific tests") class Win32ProcessTestCase(BaseTestCase):