diff -r 135e1a8144ec Doc/library/pty.rst --- a/Doc/library/pty.rst Mon Jan 09 11:21:37 2017 +0100 +++ b/Doc/library/pty.rst Mon Jan 09 13:20:08 2017 +0100 @@ -2,8 +2,8 @@ ======================================== .. module:: pty - :platform: Linux - :synopsis: Pseudo-Terminal Handling for Linux. + :platform: Unix + :synopsis: Pseudo-Terminal Handling for Unix. .. moduleauthor:: Steen Lumholt .. sectionauthor:: Moshe Zadka @@ -16,9 +16,9 @@ concept: starting another process and being able to write to and read from its controlling terminal programmatically. -Because pseudo-terminal handling is highly platform dependent, there is code to -do it only for Linux. (The Linux code is supposed to work on other platforms, -but hasn't been tested yet.) +Pseudo-terminal handling is highly platform dependent, this code is mainly +tested on Linux, FreeBSD, and OS X (it is supposed to work on other POSIX +platforms). The :mod:`pty` module defines the following functions: @@ -41,9 +41,13 @@ .. function:: spawn(argv[, master_read[, stdin_read]]) - Spawn a process, and connect its controlling terminal with the current - process's standard io. This is often used to baffle programs which insist on - reading from the controlling terminal. + Spawn a child process, and connect its controlling terminal with the + current process's standard io. This is often used to baffle programs which + insist on reading from the controlling terminal. + + A loop copies STDIN of the current process to the child and data received + from the child to STDOUT of the current process. It is not signaled to the + child if STDIN of the current process closes down. The functions *master_read* and *stdin_read* should be functions which read from a file descriptor. The defaults try to read 1024 bytes each time they are @@ -91,3 +95,14 @@ script.write(('Script done on %s\n' % time.asctime()).encode()) print('Script done, file is', filename) + +Caveats +------ + +.. sectionauthor:: Cornelius Diekmann + +Be aware that processes started by :func:`spawn` do not receive any information +about STDIN of their parent shutting down. For example, if run on a terminal on +a Linux system, ``/bin/sh < /dev/null`` closes immediately. However, +``./python -c 'import pty; pty.spawn("/bin/sh")' < /dev/null`` does not close +because the spawned child shell is not notified that STDIN is closed. diff -r 135e1a8144ec Lib/pty.py --- a/Lib/pty.py Mon Jan 09 11:21:37 2017 +0100 +++ b/Lib/pty.py Mon Jan 09 13:20:08 2017 +0100 @@ -1,7 +1,7 @@ """Pseudo terminal utilities.""" # Bugs: No signal handling. Doesn't set slave termios and window size. -# Only tested on Linux. +# Only tested on Linux, FreeBSD, and OS X. # See: W. Richard Stevens. 1992. Advanced Programming in the # UNIX Environment. Chapter 19. # Author: Steen Lumholt -- with additions by Guido. @@ -133,19 +133,26 @@ standard input -> pty master (stdin_read)""" fds = [master_fd, STDIN_FILENO] while True: + # The expected path to leave this infinite loop is that the + # child exits and its slave_fd is destroyed. In this case, + # master_fd will become ready in select() and reading from + # master_fd either causes an IOError (Linux) or returns + # EOF (BSD). rfds, wfds, xfds = select(fds, [], []) + # Start with STDIN. We expect a BrokenPipeError if data is left + # in STDIN but child exited (thus master_fd became invalid). + if STDIN_FILENO in rfds: + data = stdin_read(STDIN_FILENO) + if not data: # Reached EOF. + fds.remove(STDIN_FILENO) + else: + _writen(master_fd, data) if master_fd in rfds: data = master_read(master_fd) if not data: # Reached EOF. - fds.remove(master_fd) + return else: os.write(STDOUT_FILENO, data) - if STDIN_FILENO in rfds: - data = stdin_read(STDIN_FILENO) - if not data: - fds.remove(STDIN_FILENO) - else: - _writen(master_fd, data) def spawn(argv, master_read=_read, stdin_read=_read): """Create a spawned process.""" @@ -153,7 +160,16 @@ argv = (argv,) pid, master_fd = fork() if pid == CHILD: - os.execlp(argv[0], *argv) + try: + #XXX issue17824 still open + os.execlp(argv[0], *argv) + except: + # If we wanted to be really clever, we would use + # the same method as subprocess() to pass the error + # back to the parent. For now just dump stack trace. + traceback.print_exc() + finally: + os._exit(1) try: mode = tty.tcgetattr(STDIN_FILENO) tty.setraw(STDIN_FILENO) @@ -163,6 +179,10 @@ try: _copy(master_fd, master_read, stdin_read) except OSError: + # Some OSes never return an EOF on pty, just raise + # an error instead. + pass + finally: if restore: tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode) diff -r 135e1a8144ec Lib/test/test_pty.py --- a/Lib/test/test_pty.py Mon Jan 09 11:21:37 2017 +0100 +++ b/Lib/test/test_pty.py Mon Jan 09 13:20:08 2017 +0100 @@ -1,7 +1,7 @@ from test.support import verbose, import_module, reap_children # Skip these tests if termios is not available -import_module('termios') +termios = import_module('termios') import errno import pty @@ -15,9 +15,21 @@ TEST_STRING_1 = b"I wish to buy a fish license.\n" TEST_STRING_2 = b"For my pet fish, Eric.\n" +# Set this to True if you are in a forked child which continues to run +# the same python interpreter as the main process. If set, debug output +# is flushed directly. It is only used in the background process of the +# integration tests for simpler interactive debugging. +_flush_debug = False + if verbose: def debug(msg): - print(msg) + if not _flush_debug: + print(msg) + else: + # Print debug information in a way we can call it from a + # forked child which happens to have the same STDOUT as the + # parent. Fragile! + print(msg, flush=True) else: def debug(msg): pass @@ -34,6 +46,7 @@ # This is about the best we can do without getting some feedback # from someone more knowledgable. + #debug("normalize_output: {}".format(data)) # OSF/1 (Tru64) apparently turns \n into \r\r\n. if data.endswith(b'\r\r\n'): return data.replace(b'\r\r\n', b'\n') @@ -44,6 +57,65 @@ return data +def _os_read_exactly(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough. + Does not touch the pipe beyond numbytes.""" + # Normal os.read() may give us less-than-expected which makes the + # tests fail SOMETIMES. + + # Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I + # get back 'foo\r\n'. For a proper read, numbytes = len('foo\r\n'). + # If you just wrap everything in normalize_output, you won't know + # how much data including line endings you expect. I use (in the + # slave) always print('foo', end='', flush=True) and read back + # exactly len('foo') bytes, i.e. no newlines! + + # XXX(diekmann): PtyTest fails *sometimes*. + # I did not touch it!. + # It may also need to use this function. + # At least we identified one possible reason why this test suite + # fails randomly. + ret = [] + numread = 0 + + while numread < numbytes: + if numread > 0: + # Possible non-determinism caught and prevented + debug("[_os_read_exactly] Took more than one os.read() call") + r = os.read(fd, numbytes - numread) + ret.append(r) + numread += len(r) + + assert numread == numbytes + + return b''.join(ret) + +class _ReadExactlyNotExhaustive(Exception): + """The protocol used for the test suite wants to read exactly the + specified amount of data. If we read more or less, there is an + error. Note that os.read() is nondeterministic so we need to be + very careful to make the test suite deterministic.""" + pass + +def _os_read_exhaust_exactly(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough. + Throws _ReadExactlyNotExhaustive if more data is in the pipe.""" + assert numbytes > 0 + first = _os_read_exactly(fd, numbytes - 1) + final = os.read(fd, 1024) #expect to read exactly 1 byte + ret = first + final + + if len(ret) != numbytes: + debug("Read more data than expected. Fix your test protocol.") + debug("Read so far: `{:s}' ({:d} bytes), expected to read only " + "{:d} bytes".format(str(ret), len(ret), numbytes)) + raise _ReadExactlyNotExhaustive + + return ret + + +# We will access internal functions for mocking. +#pylint: disable=protected-access # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. @@ -61,6 +133,7 @@ signal.signal(signal.SIGALRM, self.old_alarm) def handle_sig(self, sig, frame): + #pylint: disable=unused-argument self.fail("isatty hung") def test_basic(self): @@ -197,9 +270,44 @@ # pty.fork() passed. +class PtyPosixIntegrationTest(unittest.TestCase): + """Test black-box functionality. May actually fork() and exec() a + fresh python interpreter. Should not be intrusive for your local + machine. + """ + # Tests go amok if you pipe data to STDIN. This is expected and + # should never happen, unless the way the test suite is called is + # completely broken. -class SmallPtyTests(unittest.TestCase): - """These tests don't spawn children or hang.""" + def _spawn_py_get_retcode(self, python_src): + """Helper function to do pty.spawn() on the supplied python code + and return the return code, assuming successful termination.""" + cmd = [sys.executable, "-c"+python_src] + debug("executing: {:s}".format(' '.join(cmd))) + ret = pty.spawn(cmd) + + # behavior of waitpid in module posix + self.assertLess(ret, 2**16) + killsig = ret & 0xff + self.assertEqual(killsig, 0) + + retcode = (ret & 0xff00) >> 8 + return retcode + + def test_spawn_exitsuccess(self): + """Spawn the python-equivalent of /bin/true.""" + retcode = self._spawn_py_get_retcode('import sys; sys.exit()') + self.assertEqual(retcode, 0) + + def test_spawn_exitfailure(self): + """Spawn the python-equivalent of /bin/false.""" + retcode = self._spawn_py_get_retcode('import sys; sys.exit(1)') + self.assertEqual(retcode, 1) + + +class PtyMockingTestBase(unittest.TestCase): + """Base class for tests which replace STDIN and STDOUT of the pty + module with their own pipes.""" def setUp(self): self.orig_stdin_fileno = pty.STDIN_FILENO @@ -230,15 +338,633 @@ self.fds.extend(pipe_fds) return pipe_fds +class PtySpawnTestBase(PtyMockingTestBase): + """A base class for the following integration test setup: A child + process is spawned with pty.spawn(). The child runs a fresh python + interpreter; its python code is passed via command line argument as + string. A background process is forked, reusing the current + instance of the python interpreter. These processes are connected + over STDIN/STDOUT pipes. These tests run fork(), select(), + execlp(), and other calls on your system. + + Starting from the parent (the main thread of this test suite), two + additional processes are forked in this test setup. We call the + spawn()-ed child the 'slave' because it is connected to the slave + side of the pty. The background process is connected to the master + side of the pty. + + parent + | + create mock + STDIN/STDOUT + pipes + | + | + ...fork()..> background + | | + | + .......................pty.spawn(*)..>slave + _copy + and | <-- STDIN/STDOUT --> | + wait for | pipes | + slave | | + | | + _exit | + | + _exit + wait for + background + | + + + *) python -c "slave child python code here" + + + The code for the spawned slave is python code in a string. This + makes the test suite very portable. + """ + # We introduce this generic base class for the test setup to + # encapsulate multiple different types of tests in individual + # classes. + + # Helper functions for the background process + @staticmethod + def _exit_success(): + """Clean exit. sys.exit() may be hooked by the test suite.""" + # Only use this in the forked background process! + sys.stdout.flush() + sys.stderr.flush() + os._exit(0) + + @staticmethod + def _exit_failure(): + """Clean exit. sys.exit() may be hooked by the test suite.""" + # Only use this in the forked background process! + sys.stdout.flush() + sys.stderr.flush() + os._exit(1) + + # Helper building blocks for the spawned (slave) python shell + _EXEC_IMPORTS = """ +import sys +import time +import signal +import termios + +""" + + _EXEC_DISABLE_TERMINAL_ECHO = """ +def _disable_terminal_echo(terminal_fd): + "Disable echoing of every character in the terminal." + old = termios.tcgetattr(terminal_fd) + old[3] = old[3] & ~ termios.ECHO + termios.tcsetattr(terminal_fd, termios.TCSADRAIN, old) + +""" + + @staticmethod + def _fork_background_process(master_fun, to_stdin, from_stdout): + # to_stdin: write to slave's STDIN + # from_stdout: read from slave's STDOUT + pid = os.fork() + assert pid >= 0, "fork failure must raise OSError" + if pid > 0: + return pid + # WARNING: we have forked. + # You don't get nice python exceptions in your master_fun! + global _flush_debug + _flush_debug = True + debug("[background] started") + master_fun(to_stdin, from_stdout) + + # Forked! Does not return. + # Use _exit_success or _exit_failure to signal success. + + def _spawn_master_and_slave(self, master_fun, slave_src, close_stdin=False): + """Spawn a slave and fork a master background process. + master_fun must be a python function. slave_src must be python + code as string. This function forks them and connects their + STDIN/STDOUT over a pipe and checks that they cleanly exit. + Warning: You don't get python exception handling in master_fun + and it does not return anything.""" + read_from_stdout_fd, mock_stdout_fd = self._pipe() + pty.STDOUT_FILENO = mock_stdout_fd + mock_stdin_fd, write_to_stdin_fd = self._pipe() + pty.STDIN_FILENO = mock_stdin_fd + + if close_stdin: + debug("Closing stdin") + os.close(write_to_stdin_fd) + + sys.stdout.flush() + # Without this flush, we interfere with the debug output from + # the child that will be spawned. It will work fine without the + # flush, unless you enable debug and have your STDOUT piped! + # The forked child will set _flush_debug and print to the same + # fd as we (the parent) print our debug information to. + + background_pid = self._fork_background_process( + master_fun, write_to_stdin_fd, read_from_stdout_fd) + + # spawn the slave in a new python interpreter, passing the code + # with the -c option + retcode_slave = pty.spawn([sys.executable, '-c '+slave_src]) + if debug and retcode_slave != 0: + debug("ERROR: spawned slave returned but failed.") + debug("The child code was:") + debug("--- BEGIN child code ---") + debug(slave_src) + debug("--- END child code ---") + debug("Trying to dump what the salve wrote for debugging. " + "Will block the testsuite indefinitely.") + while True: + sdbg = os.read(read_from_stdout_fd, 1024) + debug(sdbg) + self.assertEqual(retcode_slave, 0) + + retcode_background = os.waitpid(background_pid, 0)[1] + self.assertEqual(retcode_background, 0) + + debug("background and slave are done") + + # We require that b'slave exits now' is left in the slave's + # STDOUT to confirm clean exit. We carefully read only the + # first letter first. + slave_wrote = _os_read_exactly(read_from_stdout_fd, 1) + if slave_wrote in [b'\x04']: #characters to ignore + # On OS X, we found that the EOF test leaves an EOF + # character in STDOUT which we ignore here. Tested on OS X + # 10.6.8 + debug("An EOF character was left in STDOUT, we ignore this.") + slave_wrote = _os_read_exhaust_exactly(read_from_stdout_fd, + len(b'slave exits now')) + else: + # Already got the first letter + self.assertEqual(slave_wrote, b's') + slave_wrote += _os_read_exhaust_exactly(read_from_stdout_fd, + len(b'lave exits now')) + self.assertEqual(slave_wrote, b'slave exits now') + + + +class PtyWhiteBoxIntegrationPingPong1000Test(PtySpawnTestBase): + """Master and Slave count to 1000 by turns.""" + + _EXEC_CHILD = """ +# Disable echoing, otherwise the terminal (remember, we are in a +# pty here!) will echo back everything it reads. Without +# disabling echo, the background process in master will read +# back what it writes to us (the slave) and parsing fails. +_disable_terminal_echo(sys.stdin.fileno()) + +# Start handshaking with master. We initiate. +print("slave ready!", end='', flush=True) +inp = input() +if inp != "Hello, I'm background process!": + sys.exit("failure handshake with background") +# Handshake done. + +# Ping-Pong count to 1000 with master. We start. +for i in range(1000): + print("Ping {:d}".format(i), end='', flush=True) + pong, num = input().split() + if pong != "Pong" or int(num) != i: + self._exit_failure() + +# Send final confirmation that all went well to master. +print("slave exits now", end='', flush=True) +sys.exit() #success +""" + + def _background_process(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + everything_is_awesome = True + + # handshake with slave, let the slave have the first word when + # it is ready + debug("[background] waiting for input from slave.") + read = _os_read_exhaust_exactly(from_stdout, len(b'slave ready!')) + if read != b'slave ready!': + everything_is_awesome = False + debug("[background] handshake with slave part 1 of 2 done.") + + answer = b"Hello, I'm background process!\n" + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + debug("[background] handshake with slave completed.") + + # Ping-Pong count to 1000 with slave. + # Read Ping from slave, reply with pong. + # If something is wrong, this may block. + for i in range(1000): + len_expected = len("Ping {:d}".format(i)) + inp = _os_read_exhaust_exactly(from_stdout, + len_expected).decode('ascii') + #debug("background read: {:s}".format(inp)) + ping, num = inp.split() + if ping != "Ping" or int(num) != i: + everything_is_awesome = False + answer = "Pong {:d}\n".format(i).encode('ascii') + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + #debug("background write {}".format(answer)) + + if everything_is_awesome: + self._exit_success() + else: + debug("things are not awesome") + self._exit_failure() + + def test_partner_count_to_thousand(self): + """Spawn a slave and fork a master background process. Let them + Ping-Pong count alternately to 1000.""" + child_code = self._EXEC_IMPORTS + self._EXEC_DISABLE_TERMINAL_ECHO + \ + self._EXEC_CHILD + self._spawn_master_and_slave(self._background_process, + child_code) + +class PtyWhiteBoxIntegrationReadSlaveTest(PtySpawnTestBase): + """Read from (slow) slave, make sure we get everything. Slow tests.""" + + def _background_process(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + debug("[background] starting to read") + + if self.background_close_stdin: + debug("[background] closing stdin (only in forked background " + "process). Master copy process is not affected!") + os.close(to_stdin) + + bytes_transferred = 0 + for i in range(500): + expecting = "long cat is long "*10 + "ID {:d}".format(i) + expecting = expecting.encode('ascii') + inp = _os_read_exactly(from_stdout, len(expecting)) + #debug(inp) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + bytes_transferred += len(inp) + + debug("[background] received {} bytes from the slave.". + format(bytes_transferred)) + self._exit_success() + + # a dynamic sleep time needs to be formatted. + _EXEC_CHILD_FMT = """ +for i in range(500): + print("long cat is long "*10 + "ID {{:d}}".format(i), end='', flush=True) + if {sleeptime:f} and i % 400 == 0: + time.sleep({sleeptime:f}) # make slow, once. + +# Send final confirmation that all went well to master. +print("slave exits now", end='', flush=True) +sys.exit() +""" + + def test_read_large_from_slave(self): + """Spawn a slave and fork a master background process. Receive + several kBytes from the slave.""" + self.background_close_stdin = False + child_code = self._EXEC_IMPORTS + \ + self._EXEC_CHILD_FMT.format(sleeptime=0.05) + debug("Test may take up to 1 second ...") + self._spawn_master_and_slave(self._background_process, child_code) + + def test_read_large_from_slave_close_stdin(self): + """Spawn a slave and fork a master background process. Close + STDIN and receive several kBytes from the slave.""" + self.background_close_stdin = False + # only sleep in one test to speed this up + child_code = self._EXEC_IMPORTS + \ + self._EXEC_CHILD_FMT.format(sleeptime=0) + self._spawn_master_and_slave(self._background_process, child_code, + close_stdin=True) + + def test_read_large_from_slave_close_stdin_background(self): + """Spawn a slave and fork a master background process. Close + STDIN only in the background process and receive several kBytes + from the slave.""" + self.background_close_stdin = True + child_code = self._EXEC_IMPORTS + \ + self._EXEC_CHILD_FMT.format(sleeptime=0) + self._spawn_master_and_slave(self._background_process, child_code, + close_stdin=False) + +class PtyWhiteBoxIntegrationTermiosTest(PtySpawnTestBase): + """Terminals are not just pipes. This testsuite uses some of their + magic services. It tests ISIG, which transforms sending 0x03 at the + master side (usually triggered by humans by pressing ctrl+c) to + sending an INTR signal to the slave. In addition, on Linux, it + tests pretty printing of control characters, for example ^C, and ^G, + which is not defined in the POSIX.1-2008 Standard but implemented. + + This class contains larger integration tests which verify the subtle + interplay of several modules. It depends on termios, tty, pty, + signal, and os. It uses the Linux-only control character pretty + printing feature because this is one of the simple features of + terminals which are easy to test without digging into full-fledged + os-level integration tests. + """ + + def _background_handshake(self, to_stdin, from_stdout): + # Handshake with slave, let the slave have the first word when + # it is ready and initialized. The slave's terminal is set to + # echo everything back it receives. + debug("[background] waiting for input from slave.") + read = _os_read_exhaust_exactly(from_stdout, len(b'slave ready!')) + if read != b'slave ready!': + self._exit_failure() + debug("[background] handshake with slave part 1 of 3 done.") + + answer = b"Hello, I'm background process!\n" + if os.write(to_stdin, answer) != len(answer): + everything_is_awesome = False + debug("[background] handshake with slave 2 of 3 done.") + debug("[background] reading echoed") + + # Slave terminal echoes back everything, rewriting line endings! + answer = answer[:-1] + b'\r\n' + read = _os_read_exhaust_exactly(from_stdout, len(answer)) + if read != answer: + debug("Unexpected answer: {:s}".format(str(read))) + self._exit_failure() + debug("[background] handshake with slave 3 of 3 completed.") + + def _enable_echoctl(self): + if sys.platform == 'linux': + self.echoctl = True + else: + raise unittest.SkipTest('Test only available on Linux.') + + _EXEC_BASE_TERMINAL_SETUP_FMT = r""" +def _base_terminal_setup(additional_lflag=0): + "Set up terminal to sane defaults (with regard to my Linux" + "system). See POSIX.1-2008, Chapter 11, General Terminal" + "Interface." + #WARNING: expected to be called in pty.spawn()-ed child only, + # otherwise, you mess up you terminal. + + # Warning: ECHOCTL is not defined in POSIX. Works on Linux 4.4 with + # Ubuntu GLIBC 2.23. Did not work on Mac. + if {echoctl:b}: + echoctl = termios.ECHOCTL + else: + echoctl = 0 + + terminal_fd = sys.stdin.fileno() + old = termios.tcgetattr(terminal_fd) + # don't need iflag + old[0] = 0 + + # oflag: output processing: replace \n by \r\n + old[1] = termios.ONLCR | termios.OPOST + + # don't need cflag + old[2] = 0 + + # lflag: canonical mode (line-buffer), echoing, echoing of control chars ^C + old[3] = termios.ICANON | termios.ECHO | echoctl | additional_lflag + + termios.tcsetattr(terminal_fd, termios.TCSADRAIN, old) +""" + + def _background_process_bell(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending bell escape sequence to slave") + + BELL = b'\a' + to_slave = b'Bell here -> '+BELL+b' <-Hello slave!\n' + os.write(to_stdin, to_slave) + + # Bell character gets `pretty-printed' when echoed by terminal + expecting = b'Bell here -> ^G <-Hello slave!\r\n' + inp = _os_read_exactly(from_stdout, len(expecting)) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + + debug("[background] got it back") + + self._exit_success() + + _EXEC_CHILD_BELL = r""" +_base_terminal_setup() + +# Start handshaking with master. We initiate. +print("slave ready!", end='', flush=True) +inp = input() +if inp != "Hello, I'm background process!": + sys.exit("failure handshake with background") +# Handshake done. + +command = input() +# note how background process gets ^G and slave gets \a +if command != 'Bell here -> \a <-Hello slave!': + sys.exit("failure getting bell") +# terminal has automatically echoed the command, we can ignore it + +# Send final confirmation that all went well to master. +print("slave exits now", end='', flush=True) +sys.exit() +""" + + def test_magic_in_magic_out_bell_echoctl(self): + """Terminals: Magic comes in, magic goes out. The terminal bell.""" + self._enable_echoctl() + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(echoctl=self.echoctl) + \ + self._EXEC_CHILD_BELL + self._spawn_master_and_slave(self._background_process_bell, child_code) + + def _background_process_eof(self, to_stdin, from_stdout): + #WARNING: we have forked. You don't get nice python exceptions here! + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending slave an EOF") + + EOF = b'\x04' + os.write(to_stdin, EOF) + + self._exit_success() + + _EXEC_CHILD_EOF = """ +_base_terminal_setup() + +# Start handshaking with master. We initiate. +print("slave ready!", end='', flush=True) +inp = input() +if inp != "Hello, I'm background process!": + self._exit_failure() +# Handshake done. + +try: + input() + # unreachable if we got our EOF: + sys.exit("failure, no EOF received") +except EOFError: + # we expect an EOF here, this is good + pass + +# Send final confirmation that all went well to master. +print("slave exits now", end='', flush=True) +sys.exit() +""" + + def test_magic_in_magic_out_eof_echoctl(self): + """Terminals: Magic comes in, magic goes out. EOF.""" + self._enable_echoctl() + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(echoctl=self.echoctl) + \ + self._EXEC_CHILD_EOF + self._spawn_master_and_slave(self._background_process_eof, child_code) + + def test_magic_in_magic_out_eof(self): + """Terminals: Magic comes in, magic goes out. EOF.""" + self.echoctl = False + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(echoctl=self.echoctl) + \ + self._EXEC_CHILD_EOF + self._spawn_master_and_slave(self._background_process_eof, child_code) + + def _background_process_intr(self, to_stdin, from_stdout): + """Try to send SIGINT to child. Careful: Testsuite also watches + for SIGINT. We only set our signal handler in the forked + slave.""" + #WARNING: we have forked. You don't get nice python exceptions here! + + self._background_handshake(to_stdin, from_stdout) + + debug("[background] sending interupt escape sequence to slave.") + + INTR = b'\x03' + to_slave = b'This buffered stuff will be ignored'+INTR+b' Ohai slave!\n' + os.write(to_stdin, to_slave) + + if self.echoctl: + expecting = b'^C Ohai slave!\r\n' + else: + expecting = INTR+b' Ohai slave!\r\n' + inp = _os_read_exactly(from_stdout, len(expecting)) + if inp != expecting: + debug("expecting {} but got {}".format(expecting, inp)) + self._exit_failure() + + debug("[background] got it back") + + self._exit_success() + + _EXEC_CHILD_INTR = """ +_sigint_received = False + +def _SIGINT_handler(a, b): + global _sigint_received + _sigint_received = True + +signal.signal(signal.SIGINT, _SIGINT_handler) + +# tell our controlling terminal to send signals to us on special characters +_base_terminal_setup(termios.ISIG) + +# Start handshaking with master. We initiate. +print("slave ready!", end='', flush=True) +inp = input() +if inp != "Hello, I'm background process!": + sys.exit("failure handshake with background process") +# Handshake done. + +command = input() +# Yes, only this arrives at STDIN here! +if command != ' Ohai slave!': + print(command) + sys.exit("failure getting interrupted input") +# terminal has automatically echoed the command and ^C, we can ignore it + +# Send final confirmation that all went well to master. +print("slave exits now", end='', flush=True) + +if _sigint_received: + sys.exit() +else: + sys.exit("failure, did not receive SIGINT") + """ + + def test_magic_in_magic_out_intr_echoctls(self): + """Terminals: Magic comes in, magic goes out. Receiving an INTR, + i.e. pressing ctrl+c in master ^C.""" + self._enable_echoctl() + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(echoctl=self.echoctl) + \ + self._EXEC_CHILD_INTR + self._spawn_master_and_slave(self._background_process_intr, child_code) + + def test_magic_in_magic_out_intr(self): + """Terminals: Magic comes in, magic goes out. Receiving an INTR, + i.e. pressing ctrl+c in master.""" + self.echoctl = False + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(echoctl=self.echoctl) + \ + self._EXEC_CHILD_INTR + self._spawn_master_and_slave(self._background_process_intr, child_code) + +class _MockSelectOnNoWaitablesError(Exception): + """Raised when _mock_select is called with three empty lists as file + descriptors to wait on. Behavior of real select is + platform-dependent and likely infinite blocking on Linux.""" + pass + +class _MockSelectEternalWait(Exception): + """Used both as exception and placeholder value. Models that select + will block indefinitely.""" + pass + +class SmallPtyTests(PtyMockingTestBase): + """These tests don't spawn children or hang.""" + def _socketpair(self): socketpair = socket.socketpair() self.files.extend(socketpair) return socketpair def _mock_select(self, rfds, wfds, xfds): + """Simulates the behavior of select.select. Only implemented + for reader waiting list (first parameter).""" + assert wfds == [] and xfds == [] # This will raise IndexError when no more expected calls exist. self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + if len(rfds) == 0: + raise _MockSelectOnNoWaitablesError + rfds_result = self.select_rfds_results.pop(0) + # Placeholder to model eternal blocking + if rfds_result is _MockSelectEternalWait: + raise _MockSelectEternalWait + return rfds_result, [], [] + + def test__mock_select(self): + """Test the select proxy of the test class. Such meta testing. + """ + self.select_rfds_lengths.append(0) + with self.assertRaises(_MockSelectOnNoWaitablesError): + self._mock_select([], [], []) + + # Prepare two select calls. Second one will block forever. + self.select_rfds_lengths.append(3) + self.select_rfds_results.append("foo") + self.select_rfds_lengths.append(3) + self.select_rfds_results.append(_MockSelectEternalWait) + + # Call one + self.assertEqual(self._mock_select([1, 2, 3], [], []), + ("foo", [], [])) + + # Call two + with self.assertRaises(_MockSelectEternalWait): + self._mock_select([1, 2, 3], [], []) + + assert self.select_rfds_lengths == [], "select_rfds_lengths is cleaned" + assert self.select_rfds_results == [], "select_rfds_results is cleaned" def test__copy_to_each(self): """Test the normal data case on both master_fd and stdin.""" @@ -253,8 +979,10 @@ os.write(masters[1], b'from master') os.write(write_to_stdin_fd, b'from stdin') + # monkey-patch pty.select with our mock + pty.select = self._mock_select + # Expect two select calls, the last one will cause IndexError - pty.select = self._mock_select self.select_rfds_lengths.append(2) self.select_rfds_results.append([mock_stdin_fd, masters[0]]) self.select_rfds_lengths.append(2) @@ -268,8 +996,9 @@ self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" + def __copy_eof_helper(self, close_stdin, close_master): + """Helper to test the empty read EOF case on master_fd and/or + stdin.""" read_from_stdout_fd, mock_stdout_fd = self._pipe() pty.STDOUT_FILENO = mock_stdout_fd mock_stdin_fd, write_to_stdin_fd = self._pipe() @@ -277,20 +1006,78 @@ socketpair = self._socketpair() masters = [s.fileno() for s in socketpair] - socketpair[1].close() - os.close(write_to_stdin_fd) + # close fds or fill with dummy data in order to prevent blocking on + # one read call + if close_master: + socketpair[1].close() + else: + os.write(masters[1], b'from master') + if close_stdin: + os.close(write_to_stdin_fd) + else: + os.write(write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError + # monkey-patch pty.select with our mock pty.select = self._mock_select + + # Expect at least one select() call. This call returns master_fd + # and STDIN. self.select_rfds_lengths.append(2) self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) + # If master_fd is closed, we expect the copy loop to + # exit. There are two possible ways the loop can exit in this + # first iteration: + # * Immediat, clean return if both fds are closed. + # * BrokenPipeError if only master_fd is closed. + # + # If master_fd is not closed, we expect a second select() call + # which always causes _MockSelectEternalWait. + if not close_master: + # We expect that STDIN is removed from the waiters if it + # reached EOF. + remaining_fds_after_first_select = 1 if close_stdin else 2 + self.select_rfds_lengths.append(remaining_fds_after_first_select) + self.select_rfds_results.append(_MockSelectEternalWait) + + # run the _copy tests + if close_master and close_stdin: + #returns nothing, clean exit + self.assertEqual(pty._copy(masters[0]), None) + elif close_master and not close_stdin: + with self.assertRaises(BrokenPipeError): + # We expect to find a closed master but some data in + # STDIN. The _copy loop will try to write the data from + # STDIN to master, which is a broken pipe. + pty._copy(masters[0]) + + else: + with self.assertRaises(_MockSelectEternalWait): + pty._copy(masters[0]) + + # We expect that everything is consumed + self.assertEqual(self.select_rfds_results, []) + self.assertEqual(self.select_rfds_lengths, []) + + # We expect that calling again raises an error with self.assertRaises(IndexError): pty._copy(masters[0]) + def test__copy_eof_on_all(self): + """Test the empty read EOF case on both master_fd and stdin.""" + self.__copy_eof_helper(close_stdin=True, close_master=True) + + def test__copy_eof_on_stdin(self): + """Test the empty read EOF case on only stdin.""" + self.__copy_eof_helper(close_stdin=True, close_master=False) + + def test__copy_eof_on_master(self): + """Test the empty read EOF case on only master_fd.""" + self.__copy_eof_helper(close_stdin=False, close_master=True) + + def test__copy_eof_on_helper(self): + """Test that _copy will block forever in this setting.""" + self.__copy_eof_helper(close_stdin=False, close_master=False) def tearDownModule(): reap_children()