Index: Lib/test/test_telnetlib.py =================================================================== --- Lib/test/test_telnetlib.py (revision 76132) +++ Lib/test/test_telnetlib.py (working copy) @@ -1,48 +1,23 @@ import socket +import select import threading import telnetlib import time -import queue -import sys -import io +import contextlib from unittest import TestCase from test import support HOST = support.HOST -EOF_sigil = object() -def server(evt, serv, dataq=None, test_done=None): - """ Open a tcp server in four steps - 1) set evt to true to let the parent know we are ready - 2) [optional] if is not False, write the list of data from dataq.get() - to the socket. - 3) [optional] if test_done is not None, it's an event; wait - for parent to set test_done before closing connection - 4) set evt to true to let the parent know we're done - """ +def server(evt, serv): serv.listen(5) evt.set() try: conn, addr = serv.accept() - if dataq: - data = b'' - new_data = dataq.get(True, 0.5) - dataq.task_done() - for item in new_data: - if item == EOF_sigil: - break - if type(item) in [int, float]: - time.sleep(item) - else: - data += item - written = conn.send(data) - data = data[written:] except socket.timeout: pass finally: - if test_done is not None: - test_done.wait() serv.close() evt.set() @@ -100,163 +75,159 @@ self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() -def _read_setUp(self): - self.evt = threading.Event() - self.dataq = queue.Queue() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(3) - self.port = support.bind_port(self.sock) - self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) - self.thread.start() - self.evt.wait() - self.evt.clear() - time.sleep(.1) +class SocketStub(object): + ''' a socket proxy that re-defines sendall() ''' + def __init__(self, reads=[]): + self.reads = reads + self.writes = [] + self.block = False + def sendall(self, data): + self.writes.append(data) + def recv(self, size): + out = b'' + while self.reads and len(out) < size: + out += self.reads.pop(0) + if len(out) > size: + self.reads.insert(0, out[size:]) + out = out[:size] + return out -def _read_tearDown(self): - self.evt.wait() - self.thread.join() +class TelnetAlike(telnetlib.Telnet): + def fileno(self): + raise NotImplementedError() + def close(self): pass + def sock_avail(self): + return (not self.sock.block) + def msg(self, msg, *args): + with support.captured_stdout() as out: + telnetlib.Telnet.msg(self, msg, *args) + self._messages += out.getvalue() + return +def new_select(*s_args): + block = False + for l in s_args: + for fob in l: + if isinstance(fob, TelnetAlike): + block = fob.sock.block + if block: + return [[], [], []] + else: + return s_args + +@contextlib.contextmanager +def test_socket(reads): + def new_conn(*ignored): + return SocketStub(reads) + try: + old_conn = socket.create_connection + socket.create_connection = new_conn + yield None + finally: + socket.create_connection = old_conn + return + +def test_telnet(reads=[], cls=TelnetAlike): + ''' return a telnetlib.Telnet object that uses a SocketStub with + reads queued up to be read ''' + for x in reads: + assert type(x) is bytes, x + with test_socket(reads): + telnet = cls('dummy', 0) + telnet._messages = '' # debuglevel output + return telnet + class ReadTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown + def setUp(self): + self.old_select = select.select + select.select = new_select + def tearDown(self): + select.select = self.old_select - # use a similar approach to testing timeouts as test_timeout.py - # these will never pass 100% but make the fuzz big enough that it is rare - block_long = 0.6 - block_short = 0.3 - def test_read_until_A(self): + def test_read_until(self): """ - read_until(expected, [timeout]) - Read until the expected string has been seen, or a timeout is - hit (default is no timeout); may block. + read_until(expected, timeout=None) + test the blocking version of read_util """ - want = [b'x' * 10, b'match', b'y' * 10, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + want = [b'xxxmatchyyy'] + telnet = test_telnet(want) data = telnet.read_until(b'match') - self.assertEqual(data, b''.join(want[:-2])) + self.assertEqual(data, b'xxxmatch', msg=(telnet.cookedq, telnet.rawq, telnet.sock.reads)) - def test_read_until_B(self): - # test the timeout - it does NOT raise socket.timeout - want = [b'hello', self.block_long, b'not seen', EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_until(b'not seen', self.block_short) - self.assertEqual(data, want[0]) - self.assertEqual(telnet.read_all(), b'not seen') + reads = [b'x' * 50, b'match', b'y' * 50] + expect = b''.join(reads[:-1]) + telnet = test_telnet(reads) + data = telnet.read_until(b'match') + self.assertEqual(data, expect) - def test_read_all_A(self): + + def test_read_all(self): """ read_all() Read all data until EOF; may block. """ - want = [b'x' * 500, b'y' * 500, b'z' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + reads = [b'x' * 500, b'y' * 500, b'z' * 500] + expect = b''.join(reads) + telnet = test_telnet(reads) data = telnet.read_all() - self.assertEqual(data, b''.join(want[:-1])) + self.assertEqual(data, expect) return - def _test_blocking(self, func): - self.dataq.put([self.block_long, EOF_sigil]) - self.dataq.join() - start = time.time() - data = func() - self.assertTrue(self.block_short <= time.time() - start) - - def test_read_all_B(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) - - def test_read_all_C(self): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - telnet.read_all() - telnet.read_all() # shouldn't raise - - def test_read_some_A(self): + def test_read_some(self): """ read_some() Read at least one byte or EOF; may block. """ # test 'at least one byte' - want = [b'x' * 500, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - data = telnet.read_all() + telnet = test_telnet([b'x' * 500]) + data = telnet.read_some() self.assertTrue(len(data) >= 1) - - def test_read_some_B(self): # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.assertEqual(b'', telnet.read_some()) + telnet = test_telnet() + data = telnet.read_some() + self.assertEqual(b'', data) - def test_read_some_C(self): - self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) - - def _test_read_any_eager_A(self, func_name): + def _read_eager(self, func_name): """ - read_very_eager() + read_*_eager() Read all data available already queued or on the socket, without blocking. """ - want = [self.block_long, b'x' * 100, b'y' * 100, EOF_sigil] - expects = want[1] + want[2] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + want = b'x' * 100 + telnet = test_telnet([want]) func = getattr(telnet, func_name) + telnet.sock.block = True + self.assertEqual(b'', func()) + telnet.sock.block = False data = b'' while True: try: data += func() - self.assertTrue(expects.startswith(data)) except EOFError: break - self.assertEqual(expects, data) + self.assertEqual(data, want) - def _test_read_any_eager_B(self, func_name): - # test EOF - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - func = getattr(telnet, func_name) - self.assertRaises(EOFError, func) + def test_read_eager(self): + # read_eager and read_very_eager make the same gaurantees + # (they behave differently but we only test the gaurantees) + self._read_eager('read_eager') + self._read_eager('read_very_eager') + # NB -- we need to test the IAC block which is mentioned in the + # docstring but not in the module docs - # read_eager and read_very_eager make the same gaurantees - # (they behave differently but we only test the gaurantees) - def test_read_very_eager_A(self): - self._test_read_any_eager_A('read_very_eager') - def test_read_very_eager_B(self): - self._test_read_any_eager_B('read_very_eager') - def test_read_eager_A(self): - self._test_read_any_eager_A('read_eager') - def test_read_eager_B(self): - self._test_read_any_eager_B('read_eager') - # NB -- we need to test the IAC block which is mentioned in the docstring - # but not in the module docs + def read_very_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) + self.assertEqual(b'', telnet.read_very_lazy()) + while telnet.sock.reads: + telnet.fill_rawq() + data = telnet.read_very_lazy() + self.assertEqual(want, data) + self.assertRaises(EOFError, telnet.read_very_lazy) - def _test_read_any_lazy_B(self, func_name): - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - func = getattr(telnet, func_name) - telnet.fill_rawq() - self.assertRaises(EOFError, func) - - def test_read_lazy_A(self): - want = [b'x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) + def test_read_lazy(self): + want = b'x' * 100 + telnet = test_telnet([want]) self.assertEqual(b'', telnet.read_lazy()) data = b'' while True: @@ -267,36 +238,9 @@ telnet.fill_rawq() except EOFError: break - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) + self.assertTrue(want.startswith(data)) + self.assertEqual(data, want) - def test_read_lazy_B(self): - self._test_read_any_lazy_B('read_lazy') - - def test_read_very_lazy_A(self): - want = [b'x' * 100, EOF_sigil] - self.dataq.put(want) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - time.sleep(self.block_short) - self.assertEqual(b'', telnet.read_very_lazy()) - data = b'' - while True: - try: - read_data = telnet.read_very_lazy() - except EOFError: - break - data += read_data - if not read_data: - telnet.fill_rawq() - self.assertEqual(b'', telnet.cookedq) - telnet.process_rawq() - self.assertTrue(want[0].startswith(data)) - self.assertEqual(data, want[0]) - - def test_read_very_lazy_B(self): - self._test_read_any_lazy_B('read_very_lazy') - class nego_collector(object): def __init__(self, sb_getter=None): self.seen = b'' @@ -309,91 +253,32 @@ sb_data = self.sb_getter() self.sb_seen += sb_data -class SocketProxy(object): - ''' a socket proxy that re-defines sendall() ''' - def __init__(self, real_sock): - self.socket = real_sock - self._raw_sent = b'' - def __getattr__(self, k): - return getattr(self.socket, k) - def sendall(self, data): - self._raw_sent += data - self.socket.sendall(data) +tl = telnetlib -class TelnetSockSendall(telnetlib.Telnet): - def open(self, *args, **opts): - telnetlib.Telnet.open(self, *args, **opts) - self.sock = SocketProxy(self.sock) - class WriteTests(TestCase): '''The only thing that write does is replace each tl.IAC for tl.IAC+tl.IAC''' - def setUp(self): - self.evt = threading.Event() - self.test_done = threading.Event() - self.dataq = queue.Queue() - self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - self.sock.settimeout(3) - self.port = support.bind_port(self.sock) - self.thread = threading.Thread(target=server, args=( - self.evt, self.sock, self.dataq, self.test_done)) - self.thread.start() - self.evt.wait() - self.evt.clear() - time.sleep(.1) - def tearDown(self): - self.test_done.set() - self.evt.wait() - self.thread.join() - - def _test_write(self, data): - self.telnet.sock._raw_sent = b'' - self.telnet.write(data) - after_write = self.telnet.sock._raw_sent - self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), - after_write) - def test_write(self): - self.telnet = TelnetSockSendall() data_sample = [b'data sample without IAC', b'data sample with' + tl.IAC + b' one IAC', b'a few' + tl.IAC + tl.IAC + b' iacs' + tl.IAC, tl.IAC, b''] - self.telnet.open(HOST, self.port) - for d in data_sample: - self.dataq.put([b'']) - self._test_write(d) - self.telnet.close() + for data in data_sample: + telnet = test_telnet() + telnet.write(data) + written = b''.join(telnet.sock.writes) + self.assertEqual(data.replace(tl.IAC,tl.IAC+tl.IAC), written) -tl = telnetlib - -class TelnetDebuglevel(tl.Telnet): - ''' Telnet-alike that captures messages written to stdout when - debuglevel > 0 - ''' - _messages = '' - def msg(self, msg, *args): - orig_stdout = sys.stdout - sys.stdout = fake_stdout = io.StringIO() - tl.Telnet.msg(self, msg, *args) - self._messages += fake_stdout.getvalue() - sys.stdout = orig_stdout - return - class OptionTests(TestCase): - setUp = _read_setUp - tearDown = _read_tearDown # RFC 854 commands cmds = [tl.AO, tl.AYT, tl.BRK, tl.EC, tl.EL, tl.GA, tl.IP, tl.NOP] def _test_command(self, data): """ helper for testing IAC + cmd """ - self.setUp() - self.dataq.put(data) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(data) + data_len = len(b''.join(data)) nego = nego_collector() telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() @@ -401,24 +286,16 @@ self.assertTrue(len(cmd) > 0) # we expect at least one command self.assertTrue(cmd[:1] in self.cmds) self.assertEqual(cmd[1:2], tl.NOOPT) - self.assertEqual(len(b''.join(data[:-1])), len(txt + cmd)) + self.assertEqual(data_len, len(txt + cmd)) nego.sb_getter = None # break the nego => telnet cycle - self.tearDown() def test_IAC_commands(self): - # reset our setup - self.dataq.put([EOF_sigil]) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() - self.tearDown() - for cmd in self.cmds: - self._test_command([tl.IAC, cmd, EOF_sigil]) - self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100, EOF_sigil]) - self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10, EOF_sigil]) + self._test_command([tl.IAC, cmd]) + self._test_command([b'x' * 100, tl.IAC, cmd, b'y'*100]) + self._test_command([b'x' * 10, tl.IAC, cmd, b'y'*10]) # all at once - self._test_command([tl.IAC + cmd for (cmd) in self.cmds] + [EOF_sigil]) - self.assertEqual(b'', telnet.read_sb_data()) + self._test_command([tl.IAC + cmd for (cmd) in self.cmds]) def test_SB_commands(self): # RFC 855, subnegotiations portion @@ -427,11 +304,8 @@ tl.IAC + tl.SB + tl.IAC + tl.IAC + b'aa' + tl.IAC + tl.SE, tl.IAC + tl.SB + b'bb' + tl.IAC + tl.IAC + tl.IAC + tl.SE, tl.IAC + tl.SB + b'cc' + tl.IAC + tl.IAC + b'dd' + tl.IAC + tl.SE, - EOF_sigil, ] - self.dataq.put(send) - telnet = telnetlib.Telnet(HOST, self.port) - self.dataq.join() + telnet = test_telnet(send) nego = nego_collector(telnet.read_sb_data) telnet.set_option_negotiation_callback(nego.do_nego) txt = telnet.read_all() @@ -441,19 +315,6 @@ self.assertEqual(b'', telnet.read_sb_data()) nego.sb_getter = None # break the nego => telnet cycle - def _test_debuglevel(self, data, expected_msg): - """ helper for testing debuglevel messages """ - self.setUp() - self.dataq.put(data + [EOF_sigil]) - telnet = TelnetDebuglevel(HOST, self.port) - telnet.set_debuglevel(1) - self.dataq.join() - txt = telnet.read_all() - self.assertTrue(expected_msg in telnet._messages, - msg=(telnet._messages, expected_msg)) - telnet.close() - self.tearDown() - def test_debuglevel_reads(self): # test all the various places that self.msg(...) is called given_a_expect_b = [ @@ -467,21 +328,18 @@ (tl.IAC + tl.WONT + bytes([1]), ": IAC WONT 1\n"), ] for a, b in given_a_expect_b: - self._test_debuglevel([a, EOF_sigil], b) + telnet = test_telnet([a]) + telnet.set_debuglevel(1) + txt = telnet.read_all() + self.assertTrue(b in telnet._messages) return def test_debuglevel_write(self): - self.setUp() - telnet = TelnetDebuglevel(HOST, self.port) + telnet = test_telnet() telnet.set_debuglevel(1) - self.dataq.put([b'', EOF_sigil]) - self.dataq.join() telnet.write(b'xxx') expected = "send b'xxx'\n" - self.assertTrue(expected in telnet._messages, - msg=(telnet._messages, expected)) - telnet.close() - self.tearDown() + self.assertTrue(expected in telnet._messages) def test_main(verbose=None): support.run_unittest(GeneralTests, ReadTests, WriteTests, OptionTests)