#!/usr/bin/env python3
"""Exercise ``array.array.fromfile`` on the read end of a pipe and of a pty.

For each of ``os.pipe`` and ``pty.openpty`` the script opens a descriptor
pair, writes ``'<name>: success'`` to the second descriptor, closes it, and
then reads the first descriptor to exhaustion through ``array.fromfile``.
Every chunk that could be read is printed; if nothing at all could be read,
``'<name>: failure'`` is printed instead.
"""
import array
import os
import pty
import traceback

BUFSIZE = 1024


def read_chunks(master_file, bufsize=BUFSIZE):
    """Yield the contents of *master_file* as ``bytes`` chunks.

    Each chunk is obtained with ``array.array('b').fromfile`` asking for
    *bufsize* items.  Reading stops after the first ``EOFError`` or
    ``IOError``, or as soon as a read produces no items.  An ``IOError`` is
    reported with ``traceback.print_exc()`` and otherwise treated like
    end-of-file; it is not re-raised.

    This is a generator so that the caller can print each chunk as soon as
    it is available, keeping output interleaved with any traceback in the
    same order as the reads happened.
    """
    while True:
        buf = array.array('b')
        finished = False
        try:
            buf.fromfile(master_file, bufsize)
        except EOFError:
            # A short read still leaves the items that were read in ``buf``,
            # so fall through and hand them to the caller before stopping.
            finished = True
        except IOError:
            # NOTE(review): presumably this is the path taken when the other
            # end of a pty has been closed -- confirm on the target platform.
            traceback.print_exc()
            finished = True
        if not buf:
            return
        # ``tobytes()`` replaces ``tostring()``, which was removed in 3.9.
        yield buf.tobytes()
        if finished:
            return


def exercise(pipe_func, name):
    """Run one write-then-read round trip and print the outcome.

    *pipe_func* is called with no arguments and must return a
    ``(master_fd, slave_fd)`` pair of file descriptors; *name* labels the
    output.  Returns ``True`` if at least one chunk was read back, ``False``
    otherwise.
    """
    master_fd, slave_fd = pipe_func()
    # The context manager closes the master end even if a later step raises.
    with os.fdopen(master_fd, 'rb') as master_file:
        # Simulate a subprocess writing some data to the
        # slave end of the pipe, and then exiting.
        try:
            os.write(slave_fd, ('%s: success' % name).encode())
        finally:
            os.close(slave_fd)

        success = False
        for chunk in read_chunks(master_file):
            success = True
            print(chunk.decode())
        if not success:
            print('%s: failure' % name)
    return success


def main():
    """Run the round trip for ``os.pipe`` and then for ``pty.openpty``."""
    for pipe_func, name in ((os.pipe, 'os.pipe'),
                            (pty.openpty, 'pty.openpty')):
        exercise(pipe_func, name)


if __name__ == '__main__':
    main()