diff -r 2d705c883872 Doc/library/pty.rst --- a/Doc/library/pty.rst Mon Jan 02 02:51:59 2017 -0500 +++ b/Doc/library/pty.rst Mon Jan 02 19:22:29 2017 +0100 @@ -41,9 +41,13 @@ .. function:: spawn(argv[, master_read[, stdin_read]]) - Spawn a process, and connect its controlling terminal with the current - process's standard io. This is often used to baffle programs which insist on - reading from the controlling terminal. + Spawn a slave process, and connect its controlling master terminal with the + current process's standard io. This is often used to baffle programs which + insist on reading from the controlling terminal. + + A loop copies STDIN of the master to the slave and data received from the + slave to STDOUT. It is not signaled to the slave if STDIN of the master + closes down. The functions *master_read* and *stdin_read* should be functions which read from a file descriptor. The defaults try to read 1024 bytes each time they are @@ -91,3 +95,14 @@ script.write(('Script done on %s\n' % time.asctime()).encode()) print('Script done, file is', filename) + +Caveats +------ + +.. sectionauthor:: Cornelius Diekmann + +Be aware that processes started by :func:`spawn` do not receive any information +about STDIN of their master shutting down. For example, if run on a terminal on +a Linux system, ``/bin/sh < /dev/null`` closes immediately. However, +``./python -c "import pty; pty.spawn(\"/bin/sh\")" < /dev/null`` does not close +because the spawned slave shell is not notified that STDIN is closed. diff -r 2d705c883872 Lib/pty.py --- a/Lib/pty.py Mon Jan 02 02:51:59 2017 -0500 +++ b/Lib/pty.py Mon Jan 02 19:22:29 2017 +0100 @@ -6,15 +6,18 @@ # UNIX Environment. Chapter 19. # Author: Steen Lumholt -- with additions by Guido. -from select import select +import select as lib_select import os import tty __all__ = ["openpty","fork","spawn"] +# These definitions may be overwritten with mocks by the test suite. STDIN_FILENO = 0 STDOUT_FILENO = 1 STDERR_FILENO = 2 +_select = lib_select.select +_execlp = os.execlp CHILD = 0 @@ -133,7 +136,7 @@ standard input -> pty master (stdin_read)""" fds = [master_fd, STDIN_FILENO] while True: - rfds, wfds, xfds = select(fds, [], []) + rfds, wfds, xfds = _select(fds, [], []) if master_fd in rfds: data = master_read(master_fd) if not data: # Reached EOF. @@ -144,6 +147,9 @@ data = stdin_read(STDIN_FILENO) if not data: fds.remove(STDIN_FILENO) + # Proposal for future behavior change: Signal EOF to + # slave if STDIN of master is gone. Solves issue29054. + # os.write(master_fd, b'\x04') else: _writen(master_fd, data) @@ -153,7 +159,7 @@ argv = (argv,) pid, master_fd = fork() if pid == CHILD: - os.execlp(argv[0], *argv) + _execlp(argv[0], *argv) try: mode = tty.tcgetattr(STDIN_FILENO) tty.setraw(STDIN_FILENO) diff -r 2d705c883872 Lib/test/test_pty.py --- a/Lib/test/test_pty.py Mon Jan 02 02:51:59 2017 -0500 +++ b/Lib/test/test_pty.py Mon Jan 02 19:22:29 2017 +0100 @@ -10,14 +10,30 @@ import select import signal import socket +import termios +import time import unittest TEST_STRING_1 = b"I wish to buy a fish license.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n" +# Set this to True if you are in a forked child which continues to run +# python. Yes, this is fragile! It is only carefully used for mocking +# a real exec-call. +_raw_debug = False + if verbose: def debug(msg): - print(msg) + if not _raw_debug: + print(msg) + else: + # Bypass python built-in print. Print debug information in + # a way we can call it from a forked child which happens to + # have the same STDOUT as the parent. Fragile! + stdout = sys.stdout.fileno() + assert stdout == 1 + msg = msg.encode('ascii') + b'\n' + os.write(stdout, msg) else: def debug(msg): pass @@ -34,6 +50,7 @@ # This is about the best we can do without getting some feedback # from someone more knowledgable. + debug("normalize_output: {}".format(data)) # OSF/1 (Tru64) apparently turns \n into \r\r\n. if data.endswith(b'\r\r\n'): return data.replace(b'\r\r\n', b'\n') @@ -45,6 +62,72 @@ return data +class _TestReadExactlyTooMuch(Exception): + """The protocol used for the test suite wants to read exactly the + specified amount of data. If we read more or less, there is an + error. Note that os.read() is nondeterministic so we need to be very + careful to make the test suite deterministic.""" + pass + +def _os_read_exactly(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough. + Throws _TestReadExactlyTooMuch if more data is in the pipe.""" + # Normal os.read() may give us less-than-expected which makes the + # tests fail SOMETIMES. + + # Beware, on my Linux system, if I put 'foo\n' into a pipe, I get + # back 'foo\r\n'. For a proper read, numbytes = len('foo\r\n') If + # you just wrap everything in normalize_output, you won't know how + # much data including line endings you expect. I use (in the slave) + # always print('foo', end='', flush=True) and read back exactly + # len('foo') bytes, i.e. no newlines! + + # XXX(diekmann): PtyTest fails *sometimes*. + # I did not touch it!. + # It may also need to use this function. + # At least we identified one possible reason why this test suite + # fails randomly. + ret = [] + numread = 0 + + while numread < numbytes: + if numread > 0: + # Yay, possible non-determinism caught and prevented + debug("[_os_read_exactly] Took more than one os.read() call." + "Read so far: {}".format(ret)) + r = os.read(fd, 1024) + ret.append(r) + numread += len(r) + + if numread != numbytes: + debug("Read more data than expected. Fix your test protocol.") + debug("Read so far: `{:s}' ({:d} bytes), expected to read only " + "{:d} bytes".format(str(ret), numread, numbytes)) + raise _TestReadExactlyTooMuch + + return b''.join(ret) + +def _os_read_only(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough. + Does not touch the pipe beyond numbytes.""" + ret = [] + numread = 0 + + while numread < numbytes: + if numread > 0: + # Yay, possible non-determinism caught and prevented + debug("[_os_read_only] Took more than one os.read() call") + r = os.read(fd, numbytes - numread) + ret.append(r) + numread += len(r) + + assert numread == numbytes + + return b''.join(ret) + +# We will access a lot of internal functions for mocking. +#pylint: disable=protected-access + # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. # XXX(nnorwitz): these tests leak fds when there is an error. @@ -61,6 +144,7 @@ signal.signal(signal.SIGALRM, self.old_alarm) def handle_sig(self, sig, frame): + #pylint: disable=unused-argument self.fail("isatty hung") def test_basic(self): @@ -197,14 +281,71 @@ # pty.fork() passed. +class PtyPosixIntegrationTest(unittest.TestCase): + """Test black-box functionality. May actually fork() and exec() + local programs! Should not be intrusive for your local machine. + """ + def setUp(self): + pass -class SmallPtyTests(unittest.TestCase): - """These tests don't spawn children or hang.""" + def tearDown(self): + pass + + @staticmethod + def _skip_not_available(executable): + assert '/' not in executable + # /bin/true on linux, /usr/bin/true on OSX and BSD + defpath = ['/bin', '/usr/bin'] + has_exec = lambda exe_path: (os.path.isfile(exe_path) and + os.access(exe_path, os.R_OK | os.X_OK)) + for p in defpath: + path = os.path.join(p, executable) + if has_exec(path): + debug('Found {} in {}'.format(executable, path)) + return path + + raise unittest.SkipTest("File `{:s}' seems not to be available on " + "your system (searched in {}).".format(executable, defpath)) + + def _spawn_binary_return_returncode(self, binary): + """Helper function to do pty.spawn() on binary and return the + return code, assuming successful termination.""" + binary = self._skip_not_available(binary) + + ret = pty.spawn(binary) + + # behavior of waitpid in module posix + self.assertLess(ret, 2**16) + killsig = ret & 0xff + self.assertEqual(killsig, 0) + + retcode = (ret & 0xff00) >> 8 + return retcode + + # Warning: test goes amok if you pipe data to STDIN! + # This is (sort of) expected! + def test_spawn_bin_true(self): + """Spawn /bin/true and read return code.""" + retcode = self._spawn_binary_return_returncode('true') + self.assertEqual(retcode, 0) + + # Warning: test goes amok if you pipe data to STDIN! + # This is (sort of) expected! + def test_spawn_bin_false(self): + """Spawn /bin/false and read return code.""" + retcode = self._spawn_binary_return_returncode('false') + self.assertEqual(retcode, 1) + + +class PtyMockingTestBase(unittest.TestCase): + """Base class for tests which reaplce STDIN and STDOUT of pty module + with their own pipes.""" def setUp(self): self.orig_stdin_fileno = pty.STDIN_FILENO self.orig_stdout_fileno = pty.STDOUT_FILENO - self.orig_pty_select = pty.select + self.orig_pty_select = pty._select + self.orig_pty_ececlp = pty._execlp self.fds = [] # A list of file descriptors to close. self.files = [] self.select_rfds_lengths = [] @@ -213,7 +354,8 @@ def tearDown(self): pty.STDIN_FILENO = self.orig_stdin_fileno pty.STDOUT_FILENO = self.orig_stdout_fileno - pty.select = self.orig_pty_select + pty._select = self.orig_pty_select + pty._execlp = self.orig_pty_ececlp for file in self.files: try: file.close() @@ -230,15 +372,596 @@ self.fds.extend(pipe_fds) return pipe_fds +class PtyMockingExecTestBase(PtyMockingTestBase): + """Base class for tests with the following properties: They spawn a + slave process and a master process in the background. These + processes are connected over STDIN/STDOUT pipes. These tests + replace the exec-call in pty.spawn() with python code. They run + fork, select, and all other calls on your system. They are + integration tests. + + Starting from the master (the main thread of this test suite), two + additional processes are forked in this test setup. + + master + | + create mock + STDIN/STDOUT + pipes + | + | + ...fork()..> background + | | + | + ........................pty.spawn()..>slave + _copy + and | <-- STDIN/STDOUT --> | + wait for | pipes | + slave | | + | | + _exit | + | + _exit + wait for + background + | + + + + The pyt.spawn() does not do a normal fork()+exec(). We have + replaced exec() by our python code for the test cases. This makes + the test suite very portable. + """ + + @staticmethod + def _exit_success(): + """Clean exit. sys.exit() may be hooked by the test suite.""" + # Only use this in the forked or spawned childs! + os._exit(0) + + @staticmethod + def _exit_failure(): + """Clean exit. sys.exit() may be hooked by the test suite.""" + # Only use this in the forked or spawned childs! + os._exit(1) + + @staticmethod + def _disable_terminal_echo(terminal_fd): + """Disable echoing of every character in the terminal.""" + old = termios.tcgetattr(terminal_fd) + old[3] = old[3] & ~ termios.ECHO + termios.tcsetattr(terminal_fd, termios.TCSADRAIN, old) + + + def __fork_background_process(self, master_fun, to_stdin, from_stdout): + # to_stdin: write to slave's STDIN + # from_stdout: read from slave's STDOUT + pid = os.fork() + if pid < 0: + raise RuntimeError("fork failed") + if pid > 0: + return pid + # WARNING: we have forked. + # You don't get nice python exceptions in your master_fun! + global _raw_debug + _raw_debug = True + debug("[background] started") + master_fun(to_stdin, from_stdout) + + # Forked! Does not return. + # Use _exit_success or _exit_failure to signal success. + + def _spawn_master_and_slave(self, master_fun, slave_fun, close_stdin=False): + """Spawn a slave and fork a master background process. + master_fun and slave_fun must be python functions. This + function forks them and connects their STDIN/STDOUT over a pipe + and checks that they cleanly exit. Warning: You don't get + python exception handling in master_fun and slave_fun and they + don't return anything.""" + read_from_stdout_fd, mock_stdout_fd = self._pipe() + pty.STDOUT_FILENO = mock_stdout_fd + mock_stdin_fd, write_to_stdin_fd = self._pipe() + pty.STDIN_FILENO = mock_stdin_fd + + # overwrite pty._execlp to directly run python code! + pty._execlp = slave_fun + + if close_stdin: + debug("Closing stdin") + os.close(write_to_stdin_fd) + + sys.stdout.flush() + # Without this flush, we interfere with the debug output from + # the child that will be spawned. It will work fine without the + # flush, unless you enable debug and have your STDOUT piped! + # The forked child will set _raw_debug and print to the same fd + # as we (the parent) print our debug information to. + + background_pid = self.__fork_background_process( + master_fun, write_to_stdin_fd, read_from_stdout_fd) + + retcode_slave = pty.spawn('ignored') + if debug and retcode_slave != 0: + debug("ERROR: spawned slave returned but failed.") + debug("Trying to dump what the salve wrote for debugging. " + "Will block the testsuite indefinitely.") + while True: + sdbg = os.read(read_from_stdout_fd, 1024) + debug(sdbg) + self.assertEqual(retcode_slave, 0) + + retcode_background = os.waitpid(background_pid, 0)[1] + self.assertEqual(retcode_background, 0) + + debug("background and slave are done") + + # We require that b'slave exits now' is left in the slave's + # STDOUT to confirm clean exit. We carefully read only the + # first letter first. + slave_wrote = _os_read_only(read_from_stdout_fd, 1) + if slave_wrote in [b'\x04']: #characters to ignore + # On OS X, we found that the EOF test leaves an EOF + # character in STDOUT which we ignore here. Tested on OS X + # 10.6.8 + debug("An EOF character was left in STDOUT, we ignore this.") + slave_wrote = _os_read_exactly(read_from_stdout_fd, len(b'slave exits now')) + else: + # Already got the first letter + self.assertEqual(slave_wrote, b's') + slave_wrote += _os_read_exactly(read_from_stdout_fd, len(b'lave exits now')) + self.assertEqual(slave_wrote, b'slave exits now') + +class PtyWhiteBoxIntegrationPingPong1000Test(PtyMockingExecTestBase): + """Master and Slave count to 1000 by turns.""" + + def _background_process(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + everything_is_awesome = True + + # handshake with slave, let the slave have the first word when it is ready + debug("[background] waiting for input from slave.") + read = _os_read_exactly(from_stdout, len(b'slave ready!')) + if read != b'slave ready!': + everything_is_awesome = False + debug("[background] handshake with slave part 1 of 2 done.") + + answer = b"Hello, I'm background process!\n" + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + debug("[background] handshake with slave completed.") + + # Ping-Pong count to 1000 with slave. + # Read Ping from slave, reply with pong. + # If something is wrong, this may block. + for i in range(1000): + len_expected = len("Ping {:d}".format(i)) + inp = _os_read_exactly(from_stdout, len_expected).decode('ascii') + #debug("background read: {:s}".format(inp)) + try: + ping, num = inp.split() + except ValueError as e: + debug("Failed to parse ping from slave: {:s} for input `{:s}'".format(str(e), inp)) + raise e + if ping != "Ping" or int(num) != i: + everything_is_awesome = False + answer = "Pong {:d}\n".format(i).encode('ascii') + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + #debug("background write {}".format(answer)) + + if everything_is_awesome: + self._exit_success() + else: + debug("things are not awesome") + self._exit_failure() + + def _mock_execlp(self, prog, argv): + """The child/slave process to be run in the newly spawned pty. + Ignores its arguments, runs the python code instead. + Requires that STDIN and STDOUT are set correctly.""" + #pylint: disable=unused-argument + # WARNING: we are in the forked slave process. + # You don't get nice python exceptions here! + + # Disable echoing, otherwise the terminal (remember, we are in a + # pty here!) will echo back everything it reads. Without + # disabling echo, the background process in master will read + # back what it writes to us (the slave) and parsing fails. + self._disable_terminal_echo(sys.stdin.fileno()) + + # Start handshaking with master. We initiate. + print("slave ready!", end='', flush=True) + inp = input() + if inp != "Hello, I'm background process!": + self._exit_failure() + # Handshake done. + + # Ping-Pong count to 1000 with master. We start. + for i in range(1000): + print("Ping {:d}".format(i), end='', flush=True) + pong, num = input().split() + if pong != "Pong" or int(num) != i: + self._exit_failure() + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + self._exit_success() + + def test_partner_count_to_thousand(self): + """Spawn a slave and fork a master background process. Let them + Ping-Pong count alternately to 1000.""" + self._spawn_master_and_slave(self._background_process, + self._mock_execlp) + +class PtyWhiteBoxIntegrationReadSlaveTest(PtyMockingExecTestBase): + """Read from (slow) slave, make sure we get everything.""" + + def _background_process(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + debug("[background] starting to read") + debug("[background] may take 1 second until client has sent everything.") + + if self.background_close_stdin: + debug("[background] closing stdin (only in forked background process). " + "Master copy process is not affected!") + os.close(to_stdin) + + bytes_transferred = 0 + for i in range(1000): + expecting = "long cat is long "*10 + "ID {:d}".format(i) + expecting = expecting.encode('ascii') + inp = _os_read_only(from_stdout, len(expecting)) + #debug(inp) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + bytes_transferred += len(inp) + + debug("[background] received {} bytes from the slave.".format(bytes_transferred)) + self._exit_success() + + def _mock_execlp(self, prog, argv): + #pylint: disable=unused-argument + # WARNING: we are in the forked slave process. + # You don't get nice python exceptions here! + + for i in range(1000): + print("long cat is long "*10 + "ID {:d}".format(i), end='', flush=True) + if i % 400 == 0: + time.sleep(0.1) #make slow, sometimes (two times). + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + self._exit_success() + + def test_read_large_from_slave(self): + """Spawn a slave and fork a master background process. Receive + several kBytes from the slave.""" + self.background_close_stdin = False + self._spawn_master_and_slave(self._background_process, + self._mock_execlp) + + def test_read_large_from_slave_close_stdin(self): + """Spawn a slave and fork a master background process. Close + STDIN and receive several kBytes from the slave.""" + self.background_close_stdin = False + self._spawn_master_and_slave(self._background_process, + self._mock_execlp, close_stdin=True) + + def test_read_large_from_slave_close_stdin_background(self): + """Spawn a slave and fork a master background process. Close + STDIN only in the background process and receive several kBytes + from the slave.""" + self.background_close_stdin = True + self._spawn_master_and_slave(self._background_process, + self._mock_execlp, close_stdin=False) + +class PtyWhiteBoxIntegrationTermiosTest(PtyMockingExecTestBase): + """Terminals are not just pipes. This testsuite uses some of their + magic services. It tests ISIG, which transforms sending 0x03 at the + master side (usually triggered by humans by pressing ctrl+c) to + sending an INTR signal to the slave. In addition, on Linux, it + tests prtty printing of control characters, for example ^C, and ^G, + which is not defined in the POSIX.1-2008 Standard but implemented. + """ + + def _background_handshake(self, to_stdin, from_stdout): + # Handshake with slave, let the slave have the first word when + # it is ready and initialized. The slave's terminal is set to + # echo everything back it receives. + debug("[background] waiting for input from slave.") + read = _os_read_exactly(from_stdout, len(b'slave ready!')) + if read != b'slave ready!': + self._exit_failure() + debug("[background] handshake with slave part 1 of 3 done.") + + answer = b"Hello, I'm background process!\n" + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + debug("[background] handshake with slave 2 of 3 done.") + debug("[background] reading echoed") + + # Slave terminal echoes back everything, rewriting line endings! + answer = answer[:-1] + b'\r\n' + read = _os_read_exactly(from_stdout, len(answer)) + if read != answer: + debug("Unexpected answer: {:s}".format(str(read))) + self._exit_failure() + debug("[background] handshake with slave 3 of 3 completed.") + + def _enable_echoctl(self): + if sys.platform == 'linux': + self.echoctl = True + else: + raise unittest.SkipTest('Test only available on Linux.') + + def _base_terminal_setup(self, additional_lflag=0): + """Set up terminal to sane defaults (with regard to my Linux + system). See POSIX.1‐2008, Chapter 11, General Terminal + Interface.""" + #WARNING: expected to be called in pty.spawn()-ed child only, + # otherwise, you mess up you terminal. + + # Warning: ECHOCTL is not defined in POSIX. Works on Linux in + # the year 2015. Did not work on Mac. + if self.echoctl: + echoctl = termios.ECHOCTL + else: + echoctl = 0 + + terminal_fd = sys.stdin.fileno() + old = termios.tcgetattr(terminal_fd) + # don't need iflag + old[0] = 0 + + # oflag: output processing: replace \n by \r\n + old[1] = termios.ONLCR | termios.OPOST + + # don't need cflag + old[2] = 0 + + # lflag: canonical mode (line-buffer), echoing, echoing of control chars ^C + old[3] = termios.ICANON | termios.ECHO | echoctl | additional_lflag + + termios.tcsetattr(terminal_fd, termios.TCSADRAIN, old) + + def _background_process_bell(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending bell escape sequence to slave") + + BELL = b'\a' + to_slave = b'Bell here -> '+BELL+b' <-Hello slave!\n' + os.write(to_stdin, to_slave) + + # Bell charchter gets `pretty-printed' when echoed by terminal + expecting = b'Bell here -> ^G <-Hello slave!\r\n' + inp = _os_read_only(from_stdout, len(expecting)) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + + debug("[background] got it back") + + self._exit_success() + + def _mock_execlp_bell(self, prog, argv): + #pylint: disable=unused-argument + # WARNING: we are in the forked slave process. + # You don't get nice python exceptions here! + + self._base_terminal_setup() + + # Start handshaking with master. We initiate. + print("slave ready!", end='', flush=True) + inp = input() + if inp != "Hello, I'm background process!": + self._exit_failure() + # Handshake done. + + command = input() + # note how background process gets ^G and slave gets \a + if command != 'Bell here -> \a <-Hello slave!': + self._exit_failure() + # terminal has automatically echoed the command, we can ignore it + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + self._exit_success() + + def test_magic_in_magic_out_bell_echoctl(self): + """Terminals: Magic comes in, magic goes out. The terminal bell.""" + self._enable_echoctl() + self._spawn_master_and_slave(self._background_process_bell, + self._mock_execlp_bell) + + def _background_process_eof(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending slave an EOF") + + EOF = b'\x04' + os.write(to_stdin, EOF) + + self._exit_success() + + def _mock_execlp_eof(self, prog, argv): + #pylint: disable=unused-argument + # WARNING: we are in the forked slave process. + # You don't get nice python exceptions here! + + self._base_terminal_setup() + + # Start handshaking with master. We initiate. + print("slave ready!", end='', flush=True) + inp = input() + if inp != "Hello, I'm background process!": + self._exit_failure() + # Handshake done. + + try: + input() + # unreachable if we got our EOF: + fail = True + except EOFError: + # we expect an EOF here + fail = False + + if fail: + self._exit_failure() + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + self._exit_success() + + def test_magic_in_magic_out_eof_echoctl(self): + """Terminals: Magic comes in, magic goes out. EOF.""" + self._enable_echoctl() + self._spawn_master_and_slave(self._background_process_eof, + self._mock_execlp_eof) + + def test_magic_in_magic_out_eof(self): + """Terminals: Magic comes in, magic goes out. EOF.""" + self.echoctl = False + self._spawn_master_and_slave(self._background_process_eof, + self._mock_execlp_eof) + + def _background_process_intr(self, to_stdin, from_stdout): + """Try to send SIGINT to child. Careful: Testsuite also watches + for SIGINT. We only set our signal handler in the forked + slave.""" + #WARNING: we have forked. You don't get nice python exceptions here! + + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending interupt escape sequence to slave.") + + INTR = b'\x03' + to_slave = b'This buffered stuff will be ignored'+INTR+b' Ohai slave!\n' + os.write(to_stdin, to_slave) + + if self.echoctl: + expecting = b'^C Ohai slave!\r\n' + else: + expecting = INTR+b' Ohai slave!\r\n' + inp = _os_read_only(from_stdout, len(expecting)) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + + debug("[background] got it back") + + self._exit_success() + + def __slave_SIGINT_handler(self, a, b): + self._sigint_received = True + pass + + def _mock_execlp_intr(self, prog, argv): + #pylint: disable=unused-argument + # WARNING: we are in the forked slave process and destry signal handlers. + + # unittest suite catches SIGINT and aborts if we don't install our handler. + self._sigint_received = False + signal.signal(signal.SIGINT, self.__slave_SIGINT_handler) + + # also tell terminal to send signals to child on special characters + self._base_terminal_setup(termios.ISIG) + + # Start handshaking with master. We initiate. + print("slave ready!", end='', flush=True) + inp = input() + if inp != "Hello, I'm background process!": + self._exit_failure() + # Handshake done. + + command = input() + # Yes, only this arrives at STDIN here! + if command != ' Ohai slave!': + print(command) + self._exit_failure() + # terminal has automatically echoed the command and ^C, we can ignore it + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + + if self._sigint_received: + self._exit_success() + else: + self._secit_failure() + + def test_magic_in_magic_out_intr_echoctls(self): + """Terminals: Magic comes in, magic goes out. Receiving an INTR, + i.e. pressing ctrl+c in master ^C.""" + self._enable_echoctl() + self._spawn_master_and_slave(self._background_process_intr, + self._mock_execlp_intr) + + def test_magic_in_magic_out_intr(self): + """Terminals: Magic comes in, magic goes out. Receiving an INTR, + i.e. pressing ctrl+c in master.""" + self.echoctl = False + self._spawn_master_and_slave(self._background_process_intr, + self._mock_execlp_intr) + +class _MockSelectOnNoWaitablesError(Exception): + """Raised when _mock_select is called with three empty lists as file + descriptors to wait on. Behavior of real select is + platform-dependent and likely infinite blocking on Linux.""" + pass + +class _MockSelectEternalWait(Exception): + """Used both as exception and placeholder value. Models that select + will block indefinitely.""" + pass + +class SmallPtyTests(PtyMockingTestBase): + """These tests don't spawn children or hang.""" + def _socketpair(self): socketpair = socket.socketpair() self.files.extend(socketpair) return socketpair def _mock_select(self, rfds, wfds, xfds): + """Simulates the behavior of select.select. Only implemented + for reader waiting list (first parameter).""" + assert wfds == [] and xfds == [] # This will raise IndexError when no more expected calls exist. self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + if len(rfds) == 0: + raise _MockSelectOnNoWaitablesError + rfds_result = self.select_rfds_results.pop(0) + # Placeholder to model eternal blocking + if rfds_result is _MockSelectEternalWait: + raise _MockSelectEternalWait + return rfds_result, [], [] + + def test__mock_select(self): + """Test the select proxy of the test class. Such meta testing. + """ + self.select_rfds_lengths.append(0) + with self.assertRaises(_MockSelectOnNoWaitablesError): + self._mock_select([], [], []) + + # Prepare two select calls. Second one will block forever. + self.select_rfds_lengths.append(3) + self.select_rfds_results.append("foo") + self.select_rfds_lengths.append(3) + self.select_rfds_results.append(_MockSelectEternalWait) + + # Call one + self.assertEqual(self._mock_select([1, 2, 3], [], []), + ("foo", [], [])) + + # Call two + with self.assertRaises(_MockSelectEternalWait): + self._mock_select([1, 2, 3], [], []) + + assert self.select_rfds_lengths == [], "select_rfds_lengths is cleaned" + assert self.select_rfds_results == [], "select_rfds_results is cleaned" def test__copy_to_each(self): """Test the normal data case on both master_fd and stdin.""" @@ -253,8 +976,10 @@ os.write(masters[1], b'from master') os.write(write_to_stdin_fd, b'from stdin') + # overwrite pty._select with our mock + pty._select = self._mock_select + # Expect two select calls, the last one will cause IndexError - pty.select = self._mock_select self.select_rfds_lengths.append(2) self.select_rfds_results.append([mock_stdin_fd, masters[0]]) self.select_rfds_lengths.append(2) @@ -268,8 +993,9 @@ self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" + def __copy_eof_helper(self, close_stdin, close_master): + """Helper to test the empty read EOF case on master_fd and/or + stdin.""" read_from_stdout_fd, mock_stdout_fd = self._pipe() pty.STDOUT_FILENO = mock_stdout_fd mock_stdin_fd, write_to_stdin_fd = self._pipe() @@ -277,20 +1003,83 @@ socketpair = self._socketpair() masters = [s.fileno() for s in socketpair] - socketpair[1].close() - os.close(write_to_stdin_fd) + # close fds or fill with dummy data in order to prevent blocking on + # one read call + if close_master: + socketpair[1].close() + else: + os.write(masters[1], b'from master') + if close_stdin: + os.close(write_to_stdin_fd) + else: + os.write(write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError - pty.select = self._mock_select + # overwrite pty._select with our mock + pty._select = self._mock_select + + # Expect two select calls. The first one returns master_fd and STDIN. + # The second one will cause: + # _MockSelectOnNoWaitablesError if both fds are closed. + # BrokenPipeError if only master_fd is closed. + # _MockSelectEternalWait otherwise. self.select_rfds_lengths.append(2) self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) + # We expect that all closed fds were removed from the fds list as all + # closed fds encountered an EOF before the second call. + remaining_fds_after_first_select = sum(0 if closed else 1 for closed + in [close_master, close_stdin]) + self.select_rfds_lengths.append(remaining_fds_after_first_select) + self.select_rfds_results.append(_MockSelectEternalWait) + if close_master and close_stdin: + with self.assertRaises(_MockSelectOnNoWaitablesError): + pty._copy(masters[0]) + #clean, result is never consumed + self.assertEqual(self.select_rfds_results.pop(0), + _MockSelectEternalWait) + elif close_master and not close_stdin: + with self.assertRaises(BrokenPipeError): + # We expect to find a closed master but some data in + # STDIN. The _copy loop will try to write the data from + # STDIN to master, which is a broken pipe. + # + # Only closed master and not closed STDIN is a problem: + # closed STDIN but not closed master is not a broken + # pipe. In this case, we just read from master and + # write to STDOUT + pty._copy(masters[0]) + #clean, we exited after first select call + self.assertEqual(self.select_rfds_lengths.pop(0), 1) + self.assertEqual(self.select_rfds_results.pop(0), + _MockSelectEternalWait) + + else: + with self.assertRaises(_MockSelectEternalWait): + pty._copy(masters[0]) + + # We expect that everything is consumed + self.assertEqual(self.select_rfds_results, []) + self.assertEqual(self.select_rfds_lengths, []) + + # We expect that calling again raises an error with self.assertRaises(IndexError): pty._copy(masters[0]) + def test__copy_eof_on_all(self): + """Test the empty read EOF case on both master_fd and stdin.""" + self.__copy_eof_helper(close_stdin=True, close_master=True) + + def test__copy_eof_on_stdin(self): + """Test the empty read EOF case on only stdin.""" + self.__copy_eof_helper(close_stdin=True, close_master=False) + + def test__copy_eof_on_master(self): + """Test the empty read EOF case on only master_fd.""" + self.__copy_eof_helper(close_stdin=False, close_master=True) + + def test__copy_eof_on_helper(self): + """Test that _copy will block forever in this setting.""" + self.__copy_eof_helper(close_stdin=False, close_master=False) def tearDownModule(): reap_children()