Index: Lib/test/test_telnetlib.py =================================================================== --- Lib/test/test_telnetlib.py (revision 71100) +++ Lib/test/test_telnetlib.py (working copy) @@ -2,23 +2,46 @@ import threading import telnetlib import time +import Queue from unittest import TestCase from test import test_support HOST = test_support.HOST +EOF_sigil = object() -def server(evt, serv): +def server(evt, serv, dataq=None): + """ Open a tcp server in three steps + 1) set evt to true to let the parent know we are ready + 2) [optional] write all the data in dataq to the socket + terminate when dataq.get() returns EOF_sigil + 3) set evt to true to let the parent know we're done + """ serv.listen(5) evt.set() try: conn, addr = serv.accept() + if dataq: + data = '' + new_data = dataq.get(True, 0.5) + while new_data is not EOF_sigil: + if type(new_data) == str: + data += new_data + elif type(new_data) in [int, float]: + time.sleep(new_data) + written = conn.send(data) + data = data[written:] + new_data = dataq.get(True, 0.5) except socket.timeout: pass finally: serv.close() evt.set() +def wibble_float(num): + ''' return a (low, high) tuple that are 1% more and 1% less of num ''' + return num * 0.99, num * 1.01 + class GeneralTests(TestCase): def setUp(self): @@ -26,13 +49,15 @@ self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(3) self.port = test_support.bind_port(self.sock) - threading.Thread(target=server, args=(self.evt,self.sock)).start() + self.thread = threading.Thread(target=server, args=(self.evt,self.sock)) + self.thread.start() self.evt.wait() self.evt.clear() time.sleep(.1) def tearDown(self): self.evt.wait() + self.thread.join() def testBasic(self): # connects @@ -71,9 +96,184 @@ self.assertEqual(telnet.sock.gettimeout(), 30) telnet.sock.close() +class ReadTests(TestCase): + def setUp(self): + # the blocking constant should be tuned! + self.blocking_timeout = 0.0 + self.evt = threading.Event() + self.dataq = Queue.Queue() + self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + self.sock.settimeout(3) + self.port = test_support.bind_port(self.sock) + self.thread = threading.Thread(target=server, args=(self.evt,self.sock, self.dataq)) + self.thread.start() + self.evt.wait() + self.evt.clear() + time.sleep(.1) + def tearDown(self): + self.evt.wait() + self.thread.join() + + def _test_blocking(self, func): + start = time.time() + self.dataq.put(self.blocking_timeout) + self.dataq.put(EOF_sigil) + data = func() + low, high = wibble_float(self.blocking_timeout) + self.assertTrue(time.time() - start >= low) + + def test_read_until_A(self): + """ + read_until(expected, [timeout]) + Read until the expected string has been seen, or a timeout is + hit (default is no timeout); may block. + """ + want = ['x' * 10, 'match', 'y' * 10, EOF_sigil] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + data = telnet.read_until('match') + self.assertEqual(data, ''.join(want[:-2])) + + def test_read_until_B(self): + # test the timeout - it does NOT raise socket.timeout + want = ['hello', self.blocking_timeout, EOF_sigil] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + start = time.time() + timeout = self.blocking_timeout / 2 + data = telnet.read_until('not seen', timeout) + low, high = wibble_float(timeout) + self.assertTrue(low <= time.time() - high) + self.assertEqual(data, want[0]) + + def test_read_all_A(self): + """ + read_all() + Read all data until EOF; may block. + """ + want = ['x' * 500, 'y' * 500, 'z' * 500, EOF_sigil] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + data = telnet.read_all() + self.assertEqual(data, ''.join(want[:-1])) + return + + def test_read_all_B(self): + self._test_blocking(telnetlib.Telnet(HOST, self.port).read_all) + + def test_read_some_A(self): + """ + read_some() + Read at least one byte or EOF; may block. + """ + # test 'at least one byte' + want = ['x' * 500, EOF_sigil] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + data = telnet.read_all() + self.assertTrue(len(data) >= 1) + + def test_read_some_B(self): + # test EOF + self.dataq.put(EOF_sigil) + telnet = telnetlib.Telnet(HOST, self.port) + self.assertEqual('', telnet.read_some()) + + def test_read_all_C(self): + self._test_blocking(telnetlib.Telnet(HOST, self.port).read_some) + + def _test_read_any_eager_A(self, func_name): + """ + read_very_eager() + Read all data available already queued or on the socket, + without blocking. + """ + # this never blocks so it should return eat part in turn + want = ['x' * 100, self.blocking_timeout/2, 'y' * 100, EOF_sigil] + expects = want[0] + want[2] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + func = getattr(telnet, func_name) + time.sleep(self.blocking_timeout/10) + data = '' + while True: + try: + data += func() + self.assertTrue(expects.startswith(data)) + time.sleep(self.blocking_timeout) + except EOFError: + break + self.assertEqual(expects, data) + + def _test_read_any_eager_B(self, func_name): + # test EOF + self.dataq.put(EOF_sigil) + time.sleep(self.blocking_timeout / 10) + telnet = telnetlib.Telnet(HOST, self.port) + func = getattr(telnet, func_name) + self.assertRaises(EOFError, func) + + # read_eager and read_very_eager make the same gaurantees + # (they behave differently but we only test the gaurantees) + def test_read_very_eager_A(self): + self._test_read_any_eager_A('read_very_eager') + def test_read_very_eager_B(self): + self._test_read_any_eager_B('read_very_eager') + def test_read_eager_A(self): + self._test_read_any_eager_A('read_eager') + def test_read_eager_B(self): + self._test_read_any_eager_B('read_eager') + # NB -- we need to test the IAC block which is mentioned in the docstring + # but not in the module docs + + def _test_read_any_lazy_A(self, func_name): + want = [self.blocking_timeout/2, 'x' * 100, EOF_sigil] + for item in want: + self.dataq.put(item) + telnet = telnetlib.Telnet(HOST, self.port) + func = getattr(telnet, func_name) + self.assertEqual('', func()) + data = '' + while True: + time.sleep(self.blocking_timeout) + try: + telnet.fill_rawq() + data += func() + if not data: + break + except EOFError: + break + self.assertTrue(want[1].startswith(data)) + return data, want[1] + + def _test_read_any_lazy_B(self, func_name): + self.dataq.put(EOF_sigil) + telnet = telnetlib.Telnet(HOST, self.port) + func = getattr(telnet, func_name) + time.sleep(self.blocking_timeout/10) + telnet.fill_rawq() + self.assertRaises(EOFError, func) + + # read_lazy and read_very_lazy make the samish gaurantees + def test_read_very_lazy_A(self): + data, want = self._test_read_any_lazy_A('read_very_lazy') + self.assertEqual(data, '') + def test_read_lazy(self): + data, want = self._test_read_any_lazy_A('read_lazy') + self.assertEqual(data, want) + def test_read_very_lazy_B(self): + self._test_read_any_lazy_B('read_very_lazy') + def test_read_lazy_B(self): + self._test_read_any_lazy_B('read_lazy') + def test_main(verbose=None): - test_support.run_unittest(GeneralTests) + test_support.run_unittest(GeneralTests, ReadTests) if __name__ == '__main__': test_main()