diff --git a/Doc/library/pty.rst b/Doc/library/pty.rst index 0ab7660..0c6ef4d 100644 --- a/Doc/library/pty.rst +++ b/Doc/library/pty.rst @@ -2,8 +2,8 @@ ======================================== .. module:: pty - :platform: Linux - :synopsis: Pseudo-Terminal Handling for Linux. + :platform: Unix + :synopsis: Pseudo-Terminal Handling for Unix. .. moduleauthor:: Steen Lumholt .. sectionauthor:: Moshe Zadka @@ -16,9 +16,9 @@ The :mod:`pty` module defines operations for handling the pseudo-terminal concept: starting another process and being able to write to and read from its controlling terminal programmatically. -Because pseudo-terminal handling is highly platform dependent, there is code to -do it only for Linux. (The Linux code is supposed to work on other platforms, -but hasn't been tested yet.) +Pseudo-terminal handling is highly platform dependent. This code is mainly +tested on Linux, FreeBSD, and OS X (it is supposed to work on other POSIX +platforms). The :mod:`pty` module defines the following functions: @@ -41,9 +41,13 @@ The :mod:`pty` module defines the following functions: .. function:: spawn(argv[, master_read[, stdin_read]]) - Spawn a process, and connect its controlling terminal with the current - process's standard io. This is often used to baffle programs which insist on - reading from the controlling terminal. + Spawn a child process, and connect its controlling terminal with the + current process's standard io. This is often used to baffle programs which + insist on reading from the controlling terminal. + + A loop copies STDIN of the current process to the child and data received + from the child to STDOUT of the current process. It is not signaled to the + child if STDIN of the current process closes down. The functions *master_read* and *stdin_read* should be functions which read from a file descriptor. The defaults try to read 1024 bytes each time they are @@ -91,3 +95,14 @@ pseudo-terminal to record all input and output of a terminal session in a script.write(('Script done on %s\n' % time.asctime()).encode()) print('Script done, file is', filename) + +Caveats +------- + +.. sectionauthor:: Cornelius Diekmann + +Be aware that processes started by :func:`spawn` do not receive any information +about STDIN of their parent shutting down. For example, if run on a terminal on +a Linux system, ``/bin/sh < /dev/null`` closes immediately. However, +``./python -c 'import pty; pty.spawn("/bin/sh")' < /dev/null`` does not close +because the spawned child shell is not notified that STDIN is closed. diff --git a/Lib/pty.py b/Lib/pty.py index e841f12..18ef640 100644 --- a/Lib/pty.py +++ b/Lib/pty.py @@ -1,13 +1,14 @@ """Pseudo terminal utilities.""" # Bugs: No signal handling. Doesn't set slave termios and window size. -# Only tested on Linux. +# Only tested on Linux, FreeBSD, and OS X. # See: W. Richard Stevens. 1992. Advanced Programming in the # UNIX Environment. Chapter 19. # Author: Steen Lumholt -- with additions by Guido. from select import select import os +import sys import tty __all__ = ["openpty","fork","spawn"] @@ -133,11 +134,16 @@ def _copy(master_fd, master_read=_read, stdin_read=_read): standard input -> pty master (stdin_read)""" fds = [master_fd, STDIN_FILENO] while True: + # The expected path to leave this infinite loop is that the + # child exits and its slave_fd is destroyed. In this case, + # master_fd will become ready in select() and reading from + # master_fd either raises an OSError (Input/output error) on + # Linux or returns EOF on BSD. rfds, wfds, xfds = select(fds, [], []) if master_fd in rfds: data = master_read(master_fd) if not data: # Reached EOF. - fds.remove(master_fd) + return else: os.write(STDOUT_FILENO, data) if STDIN_FILENO in rfds: @@ -153,7 +159,16 @@ def spawn(argv, master_read=_read, stdin_read=_read): argv = (argv,) pid, master_fd = fork() if pid == CHILD: - os.execlp(argv[0], *argv) + try: + #XXX issue17824 still open + os.execlp(argv[0], *argv) + except: + # If we wanted to be really clever, we would use + # the same method as subprocess() to pass the error + # back to the parent. For now just dump stack trace. + sys.excepthook(*sys.exc_info()) + finally: + os._exit(1) try: mode = tty.tcgetattr(STDIN_FILENO) tty.setraw(STDIN_FILENO) @@ -163,6 +178,10 @@ def spawn(argv, master_read=_read, stdin_read=_read): try: _copy(master_fd, master_read, stdin_read) except OSError: + # Some OSes never return an EOF on pty, just raise + # an error instead. + pass + finally: if restore: tty.tcsetattr(STDIN_FILENO, tty.TCSAFLUSH, mode) diff --git a/Lib/test/test_pty.py b/Lib/test/test_pty.py index 15f88e4..4b8392b 100644 --- a/Lib/test/test_pty.py +++ b/Lib/test/test_pty.py @@ -1,7 +1,7 @@ from test.support import verbose, import_module, reap_children # Skip these tests if termios is not available -import_module('termios') +termios = import_module('termios') import errno import pty @@ -10,6 +10,8 @@ import sys import select import signal import socket +import textwrap +import subprocess import unittest TEST_STRING_1 = b"I wish to buy a fish license.\n" @@ -17,7 +19,10 @@ TEST_STRING_2 = b"For my pet fish, Eric.\n" if verbose: def debug(msg): - print(msg) + # Print debug information in a way we can call it from a forked + # child which uses the same STDOUT as the parent. Flush, so + # that we can debug deadlocks and blocking of the test suite. + print(msg, flush=True) else: def debug(msg): pass @@ -44,11 +49,86 @@ def normalize_output(data): return data +def _os_timeout_read(fd, n): + """Raw wrapper around os.read which raises a TimeoutError if no data + arrived within 10 seconds.""" + rd, _, _ = select.select([fd], [], [], 10) + if not rd: + raise TimeoutError + return os.read(fd, n) + +# Note that os.read() is nondeterministic so we need to be very careful +# to make the test suite deterministic. A normal call to os.read() may +# give us less than expected. Three wrappers with different focus +# around os.read() follow. +# +# Beware, on my Linux system, if I put 'foo\n' into a terminal fd, I get +# back 'foo\r\n' at the other end. The behavior depends on the termios +# setting. The newline translation may be OS-specific. To make the +# test suite deterministic and OS-independent, _os_readline and +# normalize_output can be used. +# +# In order to avoid newline translation and normalize_output completely, +# some test cases never emit newline characters and flush the fd +# manually. For example, using print('foo', end='', flush=True) in a +# forked child allows reading exactly len('foo') in the parent. For +# this, _os_read_exactly and _os_read_exhaust_exactly can be used. + +def _os_readline(fd): + """Use os.read() to read byte by byte until a newline is + encountered. May block forever if no newline is read.""" + buf = [] + while True: + r = os.read(fd, 1) + if not r: + raise EOFError + buf.append(r) + if r == b'\n': + break + return b''.join(buf) + +def _os_read_exactly(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough or + raises TimeoutError. Does not touch the channel beyond numbytes.""" + ret = [] + numread = 0 + + while numread < numbytes: + if numread > 0: + # Possible non-determinism caught and prevented + debug("[_os_read_exactly] More than one os.read() call") + r = _os_timeout_read(fd, numbytes - numread) + if not r: + raise EOFError + ret.append(r) + numread += len(r) + assert numread == numbytes + return b''.join(ret) + +def _os_read_exhaust_exactly(fd, numbytes): + """Read exactly numbytes out of fd. Blocks until we have enough or + raises TimeoutError. Raises ValueError if more data is in fd.""" + assert numbytes > 0 + first = _os_read_exactly(fd, numbytes - 1) + final = _os_timeout_read(fd, 1024) #expect to read exactly 1 byte + ret = first + final + + # The protocol used for the test suite expects exactly the specified + # amount of data in fd. If there is more data, there is an error. + if len(ret) != numbytes: + raise ValueError("Read more data than expected. Fix your protocol. " + "Read: {:s} ({:d} bytes), expected to read only " + "{:d} bytes".format(repr(ret), len(ret), numbytes)) + return ret + + +# We will access internal functions for mocking. +#pylint: disable=protected-access # Marginal testing of pty suite. Cannot do extensive 'do or fail' testing # because pty code is not too portable. # XXX(nnorwitz): these tests leak fds when there is an error. -class PtyTest(unittest.TestCase): +class PtyBasicTest(unittest.TestCase): def setUp(self): # isatty() and close() can hang on some platforms. Set an alarm # before running the test to make sure we don't hang forever. @@ -61,6 +141,7 @@ class PtyTest(unittest.TestCase): signal.signal(signal.SIGALRM, self.old_alarm) def handle_sig(self, sig, frame): + #pylint: disable=unused-argument self.fail("isatty hung") def test_basic(self): @@ -98,14 +179,14 @@ class PtyTest(unittest.TestCase): debug("Writing to slave_fd") os.write(slave_fd, TEST_STRING_1) - s1 = os.read(master_fd, 1024) + s1 = _os_readline(master_fd) self.assertEqual(b'I wish to buy a fish license.\n', normalize_output(s1)) debug("Writing chunked output") os.write(slave_fd, TEST_STRING_2[:5]) os.write(slave_fd, TEST_STRING_2[5:]) - s2 = os.read(master_fd, 1024) + s2 = _os_readline(master_fd) self.assertEqual(b'For my pet fish, Eric.\n', normalize_output(s2)) os.close(slave_fd) @@ -197,100 +278,778 @@ class PtyTest(unittest.TestCase): # pty.fork() passed. +class PtyPosixIntegrationTest(unittest.TestCase): + """Test black-box functionality. May actually fork() and exec() a + fresh python interpreter. Should not be intrusive for your local + machine. + """ + # Tests go amok if you pipe data to STDIN. This is expected and + # should never happen, unless the way the test suite is called is + # completely broken. + + def _spawn_py_get_retcode(self, python_src): + """Helper function to do pty.spawn() on the supplied python code + and return the return code, assuming successful termination.""" + cmd = [sys.executable, "-c", python_src] + debug("executing: {:s}".format(' '.join(cmd))) + ret = pty.spawn(cmd) + + # behavior of waitpid in module posix + self.assertLess(ret, 2**16) + killsig = ret & 0xff + self.assertEqual(killsig, 0) + + retcode = (ret & 0xff00) >> 8 + return retcode + + def test_spawn_exitsuccess(self): + # Spawn the python-equivalent of /bin/true. + retcode = self._spawn_py_get_retcode('import sys; sys.exit()') + self.assertEqual(retcode, 0) + + def test_spawn_exitfailure(self): + # Spawn the python-equivalent of /bin/false. + retcode = self._spawn_py_get_retcode('import sys; sys.exit(1)') + self.assertEqual(retcode, 1) + + def test_spawn_uncommon_exit_code(self): + # Test an uncommon exit code, which is less likely to be caused + # by a Python exception or other failure. + retcode = self._spawn_py_get_retcode('import sys; sys.exit(81)') + self.assertEqual(retcode, 81) + + +class PtySpawnTestBase(unittest.TestCase): + """A base class for the following integration test setup: A child + process is spawned with pty.spawn() and a background process is + forked from the test suite. The two processes are connected over + STDIN/STDOUT pipes to communicate. The tests run fork(), select(), + execlp(), and other calls on your system. + + Starting from the parent (the main thread of this test suite), the + following processes are forked for this setup: + * The 'slave'. We call the pty.spawn()-ed child the 'slave' because + it is connected to the slave side of the pty. It executes python + code passed as a string. This makes the test suite very + portable. The main test code runs here. + * The 'background' process, which interacts with the slave and + drives the tests. It is connected to the master side of the pty. + Being forked from the parent, it reuses the parent's instance of + the python interpreter. + * The 'spawn_runner', an internal helper to capture the + input/output of the slave. + The actual tests are executed between slave and background. + + Sequence diagram of the overall test setup: + + ┌──────┐ + │parent│ + └──────┘ + | + create + STDIN/STDOUT + pipes + | + | ┌──────────┐ + |---------------------os.fork()------------------->│background│ + | └──────────┘ + | ┌──────┐ | + |-_pty_spawn->│spawn │ master_fun + | │runner│ | + wait for └──────┘ | + slave | | + . | ┌─────┐ | + . |--pty.spawn-->│slave│ | + . . └─────┘ | + . . | | + . . slave_src | + . . | <- STDIN/STDOUT -> | + . . | pipes | + . . | | + . . | | + . . | exit + . . | . + . |< . . . . . . . exit . + |< . . . . . . .exit . + wait for . + background . + |< . . . . . . . . . . . . . . . . . . . . . . . . . . . . + + """ + # We introduce this generic base class for the test setup to + # encapsulate multiple different types of tests in individual + # classes. + + # Helper building blocks for the spawned (slave) python shell + _EXEC_IMPORTS = textwrap.dedent("""\ + import sys + import time + import signal + import tty, termios + """) + + @staticmethod + def _pty_spawn(stdin, stdout, args, pre_spawn_hook=''): + """Execute pty.spawn() in a fresh python interpreter, make stdin + and stdout available through pipes. Use the pre_spawn_hook to + allow monkey patching of the pty module.""" + # We cannot use the test.support.captured_output() functions + # because the pty module writes to filedescriptors directly. + # We cannot use test.support.script_helper because we need to + # interact with the spawned slave. + # We cannot monkey-patch pty.STDIN_FILENO to point to a pipe + # because the fallback path of pty.fork() relies on them. + # Invoking this functions creates two children: the spawn_runner + # as isolated child to execute pty.spawn and capture stdout and + # its grandchild (the slave, running args) created by pty.spawn. + spawn_runner = pre_spawn_hook + textwrap.dedent(""" + import pty, sys; + ret = pty.spawn(sys.argv[1:]); + retcode = (ret & 0xff00) >> 8; + sys.exit(retcode)""") + subprocess.run([sys.executable, "-c", spawn_runner]+args, + stdin=stdin, stdout=stdout, check=True) + + @staticmethod + def _fork_background_process(master_fun, io_fds): + pid = os.fork() + assert pid >= 0, "fork failure must raise OSError" + if pid > 0: + debug("forked child ({:d}) from parent ({:d})".format(pid, os.getpid())) + return pid + + # Forked. Run master_fun and pass return code back to parent, + # wrapped to catch all exceptions. + try: + debug("[background] started ({:d})".format(os.getpid())) + rc = master_fun(*io_fds) + if not isinstance(rc, int): + raise Exception("master_fun must return an int") + except: + debug("[background] Abort due to exception") + sys.excepthook(*sys.exc_info()) + rc = 1 + finally: + if rc != 0: + debug("[background] Abnormal termination ({:d})".format(os.getpid())) + # Destroy forked background process. + # Do not use sys.exit(), it is hooked by the test suite. + sys.stdout.flush() + sys.stderr.flush() + os._exit(rc) + + def _spawn_master_and_slave(self, master_fun, slave_src, close_stdin=False, + pre_spawn_hook=''): + """Spawn a slave and fork a master background process. + master_fun must be a python function. slave_src must be python + code as string. This function forks them and connects their + STDIN/STDOUT over a pipe and checks that they cleanly exit. + Control never returns from master_fun. Optionally, python code + supplied via the pre_spawn_hook will be executed before the + slave is spawned.""" + mock_stdin_fd, write_to_stdin_fd = os.pipe() + read_from_stdout_fd, mock_stdout_fd = os.pipe() + + if close_stdin: + debug("Closing stdin") + os.close(write_to_stdin_fd) + + sys.stdout.flush() + sys.stderr.flush() + # Without this flush, we interfere with the debug output from + # the background process (running master_fun). Parent and + # background process print debug information to the same fd. + + io_fds = (write_to_stdin_fd, read_from_stdout_fd) + background_pid = self._fork_background_process(master_fun, io_fds) + + # spawn the slave in a new python interpreter, passing the code + # with the -c option + try: + self._pty_spawn(mock_stdin_fd, mock_stdout_fd, + [sys.executable, '-c', slave_src], + pre_spawn_hook=pre_spawn_hook) + except subprocess.CalledProcessError as e: + debug("Slave failed.") + debug("killing background process ({:d})".format(background_pid)) + os.kill(background_pid, 9) + errmsg = ["Spawned slave returned but failed.", + "The failed slave code was:", + "--- BEGIN slave code ---", + slave_src, + "--- END slave code ---"] + rd, _, _ = select.select([read_from_stdout_fd], [], [], 0) + if rd: + errmsg.append("Dumping what the slave wrote last:") + rawoutput = os.read(read_from_stdout_fd, 1024*1024) + errmsg.append(repr(rawoutput)) + else: + errmsg.append("No output from child.") + self.fail('\n'.join(errmsg)) + + retcode_background = os.waitpid(background_pid, 0)[1] + self.assertEqual(retcode_background, 0) + debug("background and slave are done") + + # We require that b'slave exits now' is left in the slave's + # STDOUT to confirm clean exit. + expecting = b'slave exits now' + slave_wrote = _os_read_exhaust_exactly(read_from_stdout_fd, len(expecting)) + self.assertEqual(slave_wrote, expecting) + for fd in [mock_stdin_fd, read_from_stdout_fd, mock_stdout_fd]: + os.close(fd) + if not close_stdin: + os.close(write_to_stdin_fd) + + +class PtyPingTest(PtySpawnTestBase): + """Master and Slave count to 1000 by turns.""" + + _EXEC_CHILD = textwrap.dedent(""" + # Set terminal to a well-defined state. Disable echoing by + # setting it to raw mode. + tty.setraw(sys.stdin.fileno()) + + # Ping-Pong count to 1000 with master. We start. + for i in range(1000): + print("Ping {:d}".format(i), end='', flush=True) + pong, num = input().split() + if pong != "Pong" or int(num) != i: + sys.exit("Did not get Pong") + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + sys.exit() #success + """) + + @staticmethod + def _background_process(to_stdin, from_stdout): + debug("Staring Ping Pong") + # Ping-Pong count to 1000 with slave. + # Read Ping from slave, reply with pong. + # If something is wrong, this may block. + for i in range(1000): + expected = "Ping {:d}".format(i) + received = _os_read_exhaust_exactly(from_stdout, + len(expected)).decode('ascii') + if expected != received: + raise RuntimeError("Expected {:s}, received " + "{:s}".format(expected, received)) + answer = "Pong {:d}\n".format(i).encode('ascii') + pty._writen(to_stdin, answer) + + return 0 # success + + def test_alternate_ping(self): + # Let background and slave Ping-Pong count to 1000 by turns. + child_code = self._EXEC_IMPORTS + self._EXEC_CHILD + self._spawn_master_and_slave(self._background_process, child_code) + + # os.forkpty is only available on some flavours of UNIX. Replace it + # by a function which always fails. Used to guarantee that the + # fallback code path in pty.fork is also tested. + _DISABLE_OS_FORKPTY = textwrap.dedent(""" + import os + def _mock_disabled_osforkpty(): + if {}: os.write(2, b'os.forkpty successfully disabled.\\n') + raise OSError + os.forkpty = _mock_disabled_osforkpty + """.format(verbose)) + + def test_ping_disable_osforkpty(self): + # Disable os.forkpty(), trigger pty.fork() fallback code path. + child_code = self._EXEC_IMPORTS + self._EXEC_CHILD + self._spawn_master_and_slave(self._background_process, child_code, + pre_spawn_hook=self._DISABLE_OS_FORKPTY) + +class PtyReadAllTest(PtySpawnTestBase): + """Read from (slow) pty.spawn()ed child, make sure we get + everything. Slow tests.""" + + @staticmethod + def _background_process(to_stdin, from_stdout): + debug("[background] starting to read") + + bytes_transferred = 0 + for i in range(500): + expected = "long cat is long "*10 + "ID {:d}".format(i) + expected = expected.encode('ascii') + received = _os_read_exactly(from_stdout, len(expected)) + if expected != received: + raise RuntimeError("Expected {!r} but got {!r}".format(expected, received)) + bytes_transferred += len(received) + + debug("[background] received {} bytes from the slave.".format(bytes_transferred)) + return 0 # success + + # a dynamic sleep time needs to be formatted. + _EXEC_CHILD_FMT = textwrap.dedent(""" + tty.setraw(sys.stdin.fileno()) + + for i in range(500): + print("long cat is long "*10 + "ID {{:d}}".format(i), end='', flush=True) + if {sleeptime:f} and i % 400 == 0: + time.sleep({sleeptime:f}) # make slow, once. + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + sys.exit() + """) + + def test_read(self): + # Receive several kBytes from the slave. + child_code = self._EXEC_IMPORTS + \ + self._EXEC_CHILD_FMT.format(sleeptime=0.05) + debug("Test may take up to 1 second ...") + self._spawn_master_and_slave(self._background_process, child_code) + + def test_read_close_stdin(self): + # Close STDIN and receive several kBytes from the slave. + # only sleep in one test to speed this up + child_code = self._EXEC_IMPORTS + \ + self._EXEC_CHILD_FMT.format(sleeptime=0) + self._spawn_master_and_slave(self._background_process, child_code, + close_stdin=True) + +class PtyTermiosIntegrationTest(PtySpawnTestBase): + """Terminals are not just pipes. This integration testsuite asserts + that specific terminal functionality is operational. It tests ISIG, + which transforms sending 0x03 at the master side (usually triggered + by humans by pressing ctrl+c) to sending an INTR signal to the + child. In addition, on Linux, it tests pretty printing of control + characters, for example ^G, which is not defined in the POSIX.1-2008 + Standard but implemented. + + This class contains larger integration tests which verify the subtle + interplay of several modules. It depends on termios, pty, signal, + and os. It uses the Linux-only control character pretty printing + feature because this is one of the simple features of terminals + which are easy to test without digging into full-fledged os-level + integration tests. + """ + + @staticmethod + def _wait_for_slave(to_stdin, from_stdout): + # Expected to be called by _background_process. Wait for the + # slave to become ready and initialized. + debug("[background] waiting for slave process.") + read = _os_read_exhaust_exactly(from_stdout, len(b'slave ready!')) + if read != b'slave ready!': + raise ValueError('handshake with slave failed') + debug("[background] slave ready.") + + @unittest.skipUnless(sys.platform == 'linux', "ECHOCTL only supported by Linux") + def _enable_echoctl(self): + # Warning: ECHOCTL is not defined in POSIX. Works on + # Linux 4.4 with Ubuntu GLIBC 2.23. Did not work on Mac. + return " | termios.ECHOCTL" #pass as additional lflags + + _EXEC_BASE_TERMINAL_SETUP_FMT = textwrap.dedent(r""" + "Set up terminal to sane defaults (with regard to my Linux system). " + "See POSIX.1-2008, Chapter 11, General Terminal Interface." + + terminal_fd = sys.stdin.fileno() + old = termios.tcgetattr(terminal_fd) + # don't need iflag + old[0] = 0 + + # oflag: output processing: replace \n by \r\n + old[1] = termios.ONLCR | termios.OPOST + + # don't need cflag + old[2] = 0 + + # lflag: canonical mode (line-buffer), + # normal echoing, + # echoing of control chars in caret notation (for example ^C) + old[3] = termios.ICANON | termios.ECHO {add_lflags} + + termios.tcsetattr(terminal_fd, termios.TCSADRAIN, old) + """) + + @staticmethod + def _background_process_echo(to_stdin, from_stdout): + PtyTermiosIntegrationTest._wait_for_slave(to_stdin, from_stdout) + + answer = b"Hello, I'm background process!\n" + pty._writen(to_stdin, answer) + + # Slave terminal echoes back everything, rewriting line endings. + answer = answer[:-1] + b'\r\n' + read = _os_read_exactly(from_stdout, len(answer)) + if read != answer: + debug("Unexpected answer: {!r}".format(read)) + raise ValueError('Getting echoed data failed') + return 0 + + _EXEC_CHILD_ECHO = textwrap.dedent(r""" + print("slave ready!", end='', flush=True) + + inp = input() + if inp != "Hello, I'm background process!": + sys.exit("failure getting answer, got `{}'".format(inp)) + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + sys.exit() + """) + + def test_echo(self): + # Echo terminal input, and translate the echoed newline. + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(add_lflags="") + \ + self._EXEC_CHILD_ECHO + self._spawn_master_and_slave(self._background_process_echo, child_code) + + @staticmethod + def _background_process_bell(to_stdin, from_stdout): + PtyTermiosIntegrationTest._wait_for_slave(to_stdin, from_stdout) + + debug("[background] sending bell escape sequence to slave") + BELL = b'\a' + to_slave = b'Bell here -> '+BELL+b' <-Hello slave!\n' + pty._writen(to_stdin, to_slave) + + # Bell character gets `pretty-printed' when echoed by terminal + expected = b'Bell here -> ^G <-Hello slave!\r\n' + received = _os_read_exactly(from_stdout, len(expected)) + if received != expected: + raise RuntimeError("Expecting {!r} but got {!r}".format(expected, received)) + + debug("[background] got it back") + return 0 + + _EXEC_CHILD_BELL = textwrap.dedent(r""" + print("slave ready!", end='', flush=True) -class SmallPtyTests(unittest.TestCase): - """These tests don't spawn children or hang.""" + command = input() + # note how background process gets ^G and slave gets \a + if command != 'Bell here -> \a <-Hello slave!': + sys.exit("failure getting bell") + # terminal has automatically echoed the command, we can ignore it + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + sys.exit() + """) + + def test_bell_echoctl(self): + # Pretty printing of the bell character in caret notation. + lflags = self._enable_echoctl() + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(add_lflags=lflags) + \ + self._EXEC_CHILD_BELL + self._spawn_master_and_slave(self._background_process_bell, child_code) + + @staticmethod + def _background_process_eof(to_stdin, from_stdout): + PtyTermiosIntegrationTest._wait_for_slave(to_stdin, from_stdout) + + debug("[background] sending slave an EOF") + EOF = b'\x04' + pty._writen(to_stdin, EOF) + + # On OS X, we found that this test leaves an EOF character in + # STDOUT. Tested on OS X 10.6.8 and 10.11.2. Wipe EOF + # character which may remain here. + c = os.read(from_stdout, 1) + if c == b'\x04': # ignore EOF character + c = os.read(from_stdout, 1) + if c != b'!': + raise RuntimeError("Did not receive marker.") + + return 0 + + _EXEC_CHILD_EOF = textwrap.dedent(""" + print("slave ready!", end='', flush=True) + + try: + input() + # unreachable if we got our EOF: + sys.exit("failure, no EOF received") + except EOFError: + # we expect an EOF here, this is good + pass + + # Send an exclamation mark as marker. + print("!", end='', flush=True) + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + sys.exit() + """) + + def test_eof(self): + # Processing of the special EOF character. + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(add_lflags="") + \ + self._EXEC_CHILD_EOF + self._spawn_master_and_slave(self._background_process_eof, child_code) + + @staticmethod + def _background_process_intr(to_stdin, from_stdout): + """Try to send SIGINT to child. Careful: Testsuite also watches + for SIGINT. We only set our signal handler in the forked + slave.""" + PtyTermiosIntegrationTest._wait_for_slave(to_stdin, from_stdout) + + debug("[background] sending interrupt escape sequence to slave.") + INTR = b'\x03' + to_slave = b'This buffered stuff will be ignored'+INTR+b' Ohai slave!\n' + pty._writen(to_stdin, to_slave) + + expected = b' Ohai slave!\r\n' + # Race: The kernel may or may not echo parts before the INTR. In + # either case, due to canonical mode, the slave will only receive the + # string after INTR. + received = _os_readline(from_stdout) + if not received.endswith(expected): + raise RuntimeError("Expecting to end with {!r} but got {!r}".format(expected, received)) + + debug("[background] got it back") + return 0 + + _EXEC_CHILD_INTR = textwrap.dedent(""" + _sigint_received = False + + def _SIGINT_handler(a, b): + global _sigint_received + _sigint_received = True + + signal.signal(signal.SIGINT, _SIGINT_handler) + + print("slave ready!", end='', flush=True) + + command = input() + # Yes, only this arrives at STDIN here! + if command != ' Ohai slave!': + print(command) + sys.exit("failure getting interrupted input") + # terminal has automatically echoed the command and ^C, we can ignore it + + # Send final confirmation that all went well to master. + print("slave exits now", end='', flush=True) + + if _sigint_received: + sys.exit() + else: + sys.exit("failure, did not receive SIGINT") + """) + + def test_intr(self): + # Writing a x03 char to the master side is translated to sending + # an INTR signal to the slave. Simulates pressing ctrl+c in + # master. + # Tell our controlling terminal to send signals on special characters + lflags = " | termios.ISIG" + child_code = self._EXEC_IMPORTS + \ + self._EXEC_BASE_TERMINAL_SETUP_FMT.format(add_lflags=lflags) + \ + self._EXEC_CHILD_INTR + self._spawn_master_and_slave(self._background_process_intr, child_code) + + +class _MockSelectEternalWait(Exception): + """Used both as exception and placeholder value. Models that no + more select activity is expected and that a test can be + terminated.""" + pass + +class PtyCopyTests(unittest.TestCase): + """Whitebox mocking tests which don't spawn children or hang. Test + the _copy loop to transfer data between parent and child.""" def setUp(self): - self.orig_stdin_fileno = pty.STDIN_FILENO - self.orig_stdout_fileno = pty.STDOUT_FILENO - self.orig_pty_select = pty.select + save_and_restore = ['pty.STDIN_FILENO', + 'pty.STDOUT_FILENO', + 'pty.select'] + self.saved = dict() + for k in save_and_restore: + module, attr = k.split('.') + module = globals()[module] + self.saved[k] = getattr(module, attr) + self.fds = [] # A list of file descriptors to close. self.files = [] self.select_rfds_lengths = [] self.select_rfds_results = [] + # monkey-patch and replace with mock + pty.select = self._mock_select + self._mock_stdin_stdout() + def tearDown(self): - pty.STDIN_FILENO = self.orig_stdin_fileno - pty.STDOUT_FILENO = self.orig_stdout_fileno - pty.select = self.orig_pty_select + for k, v in self.saved.items(): + module, attr = k.split('.') + module = globals()[module] + setattr(module, attr, v) + for file in self.files: - try: - file.close() - except OSError: - pass + file.close() for fd in self.fds: - try: - os.close(fd) - except OSError: - pass + os.close(fd) def _pipe(self): pipe_fds = os.pipe() self.fds.extend(pipe_fds) return pipe_fds - def _socketpair(self): - socketpair = socket.socketpair() - self.files.extend(socketpair) - return socketpair + def _mock_stdin_stdout(self): + """Mock STDIN and STDOUT with two fresh pipes. Replaces + pty.STDIN_FILENO/pty.STDOUT_FILENO by one end of the pipe. + Makes the other end of the pipe available via self.""" + self.read_from_stdout_fd, pty.STDOUT_FILENO = self._pipe() + pty.STDIN_FILENO, self.write_to_stdin_fd = self._pipe() def _mock_select(self, rfds, wfds, xfds): + """Simulates the behavior of select.select. Only implemented + for reader waiting list (first parameter).""" + assert wfds == [] and xfds == [] # This will raise IndexError when no more expected calls exist. self.assertEqual(self.select_rfds_lengths.pop(0), len(rfds)) - return self.select_rfds_results.pop(0), [], [] + if len(rfds) == 0: + # called with three empty lists as file descriptors to wait + # on. Behavior of real select is platform-dependent and + # likely infinite blocking on Linux. + raise self.fail("mock select on no waitables") + rfds_result = self.select_rfds_results.pop(0) + + if rfds_result is _MockSelectEternalWait: + raise _MockSelectEternalWait + return rfds_result, [], [] + + def test__mock_stdin_stdout(self): + self.assertGreater(pty.STDIN_FILENO, 2, "replaced by our mock") + + def test__mock_select(self): + # Test the select proxy of this test class. Meta testing. + self.select_rfds_lengths.append(0) + with self.assertRaises(AssertionError): + self._mock_select([], [], []) + + # Prepare two select calls. Second one will block forever. + self.select_rfds_lengths.append(3) + self.select_rfds_results.append("foo") + self.select_rfds_lengths.append(3) + self.select_rfds_results.append(_MockSelectEternalWait) + + # Call one + self.assertEqual(self._mock_select([1, 2, 3], [], []), + ("foo", [], [])) + + # Call two + with self.assertRaises(_MockSelectEternalWait): + self._mock_select([1, 2, 3], [], []) + + # lists are cleaned + self.assertEqual(self.select_rfds_lengths, []) + self.assertEqual(self.select_rfds_results, []) + + def _socketpair(self): + socketpair = socket.socketpair() + self.files.extend(socketpair) + return socketpair def test__copy_to_each(self): - """Test the normal data case on both master_fd and stdin.""" - read_from_stdout_fd, mock_stdout_fd = self._pipe() - pty.STDOUT_FILENO = mock_stdout_fd - mock_stdin_fd, write_to_stdin_fd = self._pipe() - pty.STDIN_FILENO = mock_stdin_fd - socketpair = self._socketpair() - masters = [s.fileno() for s in socketpair] + # Test the normal data case on both master_fd and stdin. + masters = [s.fileno() for s in self._socketpair()] # Feed data. Smaller than PIPEBUF. These writes will not block. os.write(masters[1], b'from master') - os.write(write_to_stdin_fd, b'from stdin') + os.write(self.write_to_stdin_fd, b'from stdin') - # Expect two select calls, the last one will cause IndexError - pty.select = self._mock_select + # Expect two select calls, the last one will simulate eternal waiting self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) + self.select_rfds_results.append([pty.STDIN_FILENO, masters[0]]) self.select_rfds_lengths.append(2) + self.select_rfds_results.append(_MockSelectEternalWait) - with self.assertRaises(IndexError): + with self.assertRaises(_MockSelectEternalWait): pty._copy(masters[0]) # Test that the right data went to the right places. - rfds = select.select([read_from_stdout_fd, masters[1]], [], [], 0)[0] - self.assertEqual([read_from_stdout_fd, masters[1]], rfds) - self.assertEqual(os.read(read_from_stdout_fd, 20), b'from master') + rfds = select.select([self.read_from_stdout_fd, masters[1]], [], [], 0)[0] + self.assertEqual([self.read_from_stdout_fd, masters[1]], rfds) + self.assertEqual(os.read(self.read_from_stdout_fd, 20), b'from master') self.assertEqual(os.read(masters[1], 20), b'from stdin') - def test__copy_eof_on_all(self): - """Test the empty read EOF case on both master_fd and stdin.""" - read_from_stdout_fd, mock_stdout_fd = self._pipe() - pty.STDOUT_FILENO = mock_stdout_fd - mock_stdin_fd, write_to_stdin_fd = self._pipe() - pty.STDIN_FILENO = mock_stdin_fd + def _copy_eof_close_slave_helper(self, close_stdin): + """Helper to test the empty read EOF case on master_fd and/or + stdin.""" socketpair = self._socketpair() masters = [s.fileno() for s in socketpair] + # This side of the channel would usually be the slave_fd of the + # child. We simulate that the child has exited and its side of + # the channel is destroyed. socketpair[1].close() - os.close(write_to_stdin_fd) + self.files.remove(socketpair[1]) - # Expect two select calls, the last one will cause IndexError - pty.select = self._mock_select + # Optionally close fd or fill with dummy data in order to + # prevent blocking on one read call + if close_stdin: + os.close(self.write_to_stdin_fd) + self.fds.remove(self.write_to_stdin_fd) + else: + os.write(self.write_to_stdin_fd, b'from stdin') + + # Expect exactly one select() call. This call returns master_fd + # and STDIN. Since the slave side of masters is closed, we + # expect the _copy loop to exit immediately. self.select_rfds_lengths.append(2) - self.select_rfds_results.append([mock_stdin_fd, masters[0]]) - # We expect that both fds were removed from the fds list as they - # both encountered an EOF before the second select call. - self.select_rfds_lengths.append(0) + self.select_rfds_results.append([pty.STDIN_FILENO, masters[0]]) + + # Run the _copy test, which returns nothing and cleanly exits + self.assertIsNone(pty._copy(masters[0])) + + # We expect that everything is consumed + self.assertEqual(self.select_rfds_results, []) + self.assertEqual(self.select_rfds_lengths, []) + + # Test that STDIN was not touched. This test simulated the + # scenario where the child process immediately closed its end of + # the pipe. This means, nothing should be copied. + rfds = select.select([self.read_from_stdout_fd, pty.STDIN_FILENO], [], [], 0)[0] + # data or EOF is still sitting unconsumed in STDIN + self.assertEqual(rfds, [pty.STDIN_FILENO]) + unconsumed = os.read(pty.STDIN_FILENO, 20) + if close_stdin: + self.assertFalse(unconsumed) #EOF + else: + self.assertEqual(unconsumed, b'from stdin') + + def test__copy_eof_on_all(self): + # Test the empty read EOF case on both master_fd and stdin. + self._copy_eof_close_slave_helper(close_stdin=True) + + def test__copy_eof_on_master(self): + # Test the empty read EOF case on only master_fd. + self._copy_eof_close_slave_helper(close_stdin=False) + + def test__copy_eof_on_stdin(self): + # Test the empty read EOF case on stdin. + masters = [s.fileno() for s in self._socketpair()] + + # Fill with dummy data + os.write(masters[1], b'from master') + + os.close(self.write_to_stdin_fd) + self.fds.remove(self.write_to_stdin_fd) + + # Expect two select() calls. The first call returns master_fd + # and STDIN. + self.select_rfds_lengths.append(2) + self.select_rfds_results.append([pty.STDIN_FILENO, masters[0]]) + # The second call causes _MockSelectEternalWait. We expect that + # STDIN is removed from the waiters as it reached EOF. + self.select_rfds_lengths.append(1) + self.select_rfds_results.append(_MockSelectEternalWait) - with self.assertRaises(IndexError): + with self.assertRaises(_MockSelectEternalWait): pty._copy(masters[0]) + # We expect that everything is consumed + self.assertEqual(self.select_rfds_results, []) + self.assertEqual(self.select_rfds_lengths, []) def tearDownModule(): reap_children()