Index: Lib/test/test_poplib.py =================================================================== --- Lib/test/test_poplib.py (revision 66864) +++ Lib/test/test_poplib.py (working copy) @@ -1,43 +1,259 @@ +"""Test script for poplib module.""" + +# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL +# a real test suite + +import poplib +import threading +import asyncore +import asynchat import socket -import threading -import poplib +import os import time from unittest import TestCase from test import test_support +from test.test_support import HOST -HOST = test_support.HOST -def server(evt, serv): - serv.listen(5) - try: - conn, addr = serv.accept() - except socket.timeout: - pass - else: - conn.send("+ Hola mundo\n") - conn.close() - finally: - serv.close() - evt.set() +# the dummy data returned by server when LIST and RETR commands are issued +LIST_RESP = '1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' +RETR_RESP = """From: postmaster@python.org\ +\r\nContent-Type: text/plain\r\n\ +MIME-Version: 1.0\r\n\ +Subject: Dummy\r\n\ +\r\n\ +line1\r\n\ +line2\r\n\ +line3\r\n\ +.\r\n""" -class GeneralTests(TestCase): +class DummyPOP3Handler(asynchat.async_chat): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.set_terminator("\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + def collect_incoming_data(self, data): + self.in_buffer.append(data) + + def found_terminator(self): + line = ''.join(self.in_buffer) + self.in_buffer = [] + cmd = line.split(' ')[0].lower() + space = line.find(' ') + if space != -1: + arg = line[space + 1:] + else: + arg = "" + if hasattr(self, 'cmd_' + cmd): + method = getattr(self, 'cmd_' + cmd) + method(arg) + else: + self.push('-ERR unrecognized POP3 command "%s".' %cmd) + + def handle_error(self): + raise + + def push(self, data): + asynchat.async_chat.push(self, data + '\r\n') + + def cmd_echo(self, arg): + # sends back the received string (used by the test suite) + self.push(arg) + + def cmd_user(self, arg): + self.push('+OK password required') + + def cmd_pass(self, arg): + self.push('+OK 10 messages') + + def cmd_stat(self, arg): + self.push('+OK 10 100') + + def cmd_list(self, arg): + if arg: + self.push('+OK %s %s' %(arg, arg)) + else: + self.push('+OK') + asynchat.async_chat.push(self, LIST_RESP) + + cmd_uidl = cmd_list + + def cmd_retr(self, arg): + self.push('+OK %s bytes' %len(RETR_RESP)) + asynchat.async_chat.push(self, RETR_RESP) + + cmd_top = cmd_retr + + def cmd_dele(self, arg): + self.push('+OK message marked for deletion.') + + def cmd_noop(self, arg): + self.push('+OK done nothing.') + + def cmd_rpop(self, arg): + self.push('+OK done nothing.') + + +class DummyPOP3Server(asyncore.dispatcher, threading.Thread): + + handler = DummyPOP3Handler + + def __init__(self, address, af=socket.AF_INET): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(af, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.active = False + self.active_lock = threading.Lock() + self.host, self.port = self.socket.getsockname()[:2] + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all(ignore_all=True) + + def stop(self): + assert self.active + self.active = False + self.join() + + def handle_accept(self): + conn, addr = self.accept() + self.handler = self.handler(conn) + self.close() + + def handle_connect(self): + self.close() + handle_read = handle_connect + + def writable(self): + return 0 + + def handle_error(self): + raise + + +class TestPOP3Class(TestCase): + def setUp(self): + self.server = DummyPOP3Server((HOST, 0)) + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port) + + def tearDown(self): + self.client.quit() + self.server.stop() + + def test_getwelcome(self): + self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.') + + def test_exceptions(self): + self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') + + def test_user(self): + self.client.user('foo') + + def test_pass_(self): + self.client.pass_('bar') + + def test_stat(self): + self.assertEqual(self.client.stat(), (10, 100)) + + def test_list(self): + self.client.list() + self.client.list('1') + + def test_retr(self): + self.client.retr('foo') + + def test_dele(self): + self.client.dele('foo') + + def test_noop(self): + self.client.noop() + + def test_rpop(self): + self.client.rpop('foo') + + def test_top(self): + self.client.top(1, 1) + + def test_uidl(self): + self.client.uidl() + self.client.uidl('foo') + + +SUPPORTS_SSL = False +if hasattr(poplib, 'POP3_SSL'): + import ssl + + SUPPORTS_SSL = True + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") + + class DummyPOP3_SSLHandler(DummyPOP3Handler): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True) + self.set_terminator("\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + class TestPOP3_SSLClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3_SSL + + def setUp(self): + self.server = DummyPOP3Server((HOST, 0)) + self.server.handler = DummyPOP3_SSLHandler + self.server.start() + self.client = poplib.POP3_SSL(self.server.host, self.server.port) + + def test__all__(self): + self.assert_('POP3_SSL' in poplib.__all__) + + +class TestTimeouts(TestCase): + + def setUp(self): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(3) self.port = test_support.bind_port(self.sock) - threading.Thread(target=server, args=(self.evt,self.sock)).start() + threading.Thread(target=self.server, args=(self.evt,self.sock)).start() time.sleep(.1) def tearDown(self): self.evt.wait() - def testBasic(self): - # connects - pop = poplib.POP3(HOST, self.port) - pop.sock.close() + def server(self, evt, serv): + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + conn.send("+ Hola mundo\n") + conn.close() + finally: + serv.close() + evt.set() def testTimeoutDefault(self): self.assertTrue(socket.getdefaulttimeout() is None) @@ -65,8 +281,16 @@ pop.sock.close() -def test_main(verbose=None): - test_support.run_unittest(GeneralTests) +def test_main(): + tests = [TestPOP3Class, TestTimeouts] + if SUPPORTS_SSL: + tests.append(TestPOP3_SSLClass) + thread_info = test_support.threading_setup() + try: + test_support.run_unittest(*tests) + finally: + test_support.threading_cleanup(*thread_info) + if __name__ == '__main__': test_main()