Index: Lib/test/test_poplib.py =================================================================== --- Lib/test/test_poplib.py (révision 66888) +++ Lib/test/test_poplib.py (copie de travail) @@ -1,43 +1,284 @@ +"""Test script for poplib module.""" + +# Modified by Giampaolo Rodola' to give poplib.POP3 and poplib.POP3_SSL +# a real test suite + +import poplib +import threading +import asyncore +import asynchat import socket -import threading -import poplib +import os import time from unittest import TestCase -from test import support +from test import support as test_support -HOST = support.HOST +HOST = test_support.HOST +PORT = 0 -def server(evt, serv): - serv.listen(5) - try: - conn, addr = serv.accept() - except socket.timeout: - pass - else: - conn.send(b"+ Hola mundo\n") - conn.close() - finally: - serv.close() - evt.set() +# the dummy data returned by server when LIST and RETR commands are issued +LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' +RETR_RESP = b"""From: postmaster@python.org\ +\r\nContent-Type: text/plain\r\n\ +MIME-Version: 1.0\r\n\ +Subject: Dummy\r\n\ +\r\n\ +line1\r\n\ +line2\r\n\ +line3\r\n\ +.\r\n""" -class GeneralTests(TestCase): +class DummyPOP3Handler(asynchat.async_chat): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.set_terminator(b"\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + def collect_incoming_data(self, data): + self.in_buffer.append(data) + + def found_terminator(self): + line = b''.join(self.in_buffer) + line = str(line, 'ISO-8859-1') + self.in_buffer = [] + cmd = line.split(' ')[0].lower() + space = line.find(' ') + if space != -1: + arg = line[space + 1:] + else: + arg = "" + if hasattr(self, 'cmd_' + cmd): + method = getattr(self, 'cmd_' + cmd) + method(arg) + else: + self.push('-ERR unrecognized POP3 command "%s".' %cmd) + + def handle_error(self): + raise + + def push(self, data): + asynchat.async_chat.push(self, data.encode("ISO-8859-1") + b'\r\n') + + def cmd_echo(self, arg): + # sends back the received string (used by the test suite) + self.push(arg) + + def cmd_user(self, arg): + if arg != "guido": + self.push("-ERR no such user") + self.push('+OK password required') + + def cmd_pass(self, arg): + if arg != "python": + self.push("-ERR wrong password") + self.push('+OK 10 messages') + + def cmd_stat(self, arg): + self.push('+OK 10 100') + + def cmd_list(self, arg): + if arg: + self.push('+OK %s %s' %(arg, arg)) + else: + self.push('+OK') + asynchat.async_chat.push(self, LIST_RESP) + + cmd_uidl = cmd_list + + def cmd_retr(self, arg): + self.push('+OK %s bytes' %len(RETR_RESP)) + asynchat.async_chat.push(self, RETR_RESP) + + cmd_top = cmd_retr + + def cmd_dele(self, arg): + self.push('+OK message marked for deletion.') + + def cmd_noop(self, arg): + self.push('+OK done nothing.') + + def cmd_rpop(self, arg): + self.push('+OK done nothing.') + + +class DummyPOP3Server(asyncore.dispatcher, threading.Thread): + + handler = DummyPOP3Handler + + def __init__(self, address, af=socket.AF_INET): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(af, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.active = False + self.active_lock = threading.Lock() + self.host, self.port = self.socket.getsockname()[:2] + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all(ignore_all=True) + + def stop(self): + assert self.active + self.active = False + self.join() + + def handle_accept(self): + conn, addr = self.accept() + self.handler = self.handler(conn) + self.close() + + def handle_connect(self): + self.close() + handle_read = handle_connect + + def writable(self): + return 0 + + def handle_error(self): + raise + + +class TestPOP3Class(TestCase): + def assertOK(self, resp): + self.assertTrue(resp.startswith("+OK")) + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port) + + def tearDown(self): + self.client.quit() + self.server.stop() + + def test_getwelcome(self): + self.assertEqual(self.client.getwelcome(), '+OK dummy pop3 server ready.') + + def test_exceptions(self): + self.assertRaises(poplib.error_proto, self.client._shortcmd, 'echo -err') + + def test_user(self): + self.assertOK(self.client.user('guido')) + self.assertRaises(poplib.error_proto, self.client.user, 'invalid') + + def test_pass_(self): + self.assertOK(self.client.pass_('python')) + self.assertRaises(poplib.error_proto, self.client.user, 'invalid') + + def test_stat(self): + self.assertEqual(self.client.stat(), (10, 100)) + + def test_list(self): + self.assertEqual(self.client.list()[1:], + (['1 1', '2 2', '3 3', '4 4', '5 5'], + 20)) + self.assertTrue(self.client.list('1').endswith("OK 1 1")) + + def test_retr(self): + expected = ('+OK 116 bytes', + ['From: postmaster@python.org', 'Content-Type: text/plain', + 'MIME-Version: 1.0', 'Subject: Dummy', + '', 'line1', 'line2', 'line3'], + 105) + foo = self.client.retr('foo') + self.assertEqual(foo, expected) + + def test_dele(self): + self.assertOK(self.client.dele('foo')) + + def test_noop(self): + self.assertOK(self.client.noop()) + + def test_rpop(self): + self.assertOK(self.client.rpop('foo')) + + def test_top(self): + expected = ('+OK 116 bytes', + ['From: postmaster@python.org', 'Content-Type: text/plain', + 'MIME-Version: 1.0', 'Subject: Dummy', '', + 'line1', 'line2', 'line3'], + 105) + self.assertEqual(self.client.top(1, 1), expected) + + def test_uidl(self): + self.client.uidl() + self.client.uidl('foo') + + +SUPPORTS_SSL = False +if hasattr(poplib, 'POP3_SSL'): + import ssl + + SUPPORTS_SSL = True + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") + + class DummyPOP3_SSLHandler(DummyPOP3Handler): + + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True) + self.del_channel() + self.set_socket(ssl_socket) + self.set_terminator(b"\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready.') + + class TestPOP3_SSLClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3_SSL + + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.handler = DummyPOP3_SSLHandler + self.server.start() + self.client = poplib.POP3_SSL(self.server.host, self.server.port) + + def test__all__(self): + self.assert_('POP3_SSL' in poplib.__all__) + + +class TestTimeouts(TestCase): + + def setUp(self): self.evt = threading.Event() self.sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) self.sock.settimeout(3) - self.port = support.bind_port(self.sock) - threading.Thread(target=server, args=(self.evt,self.sock)).start() + self.port = test_support.bind_port(self.sock) + threading.Thread(target=self.server, args=(self.evt,self.sock)).start() time.sleep(.1) def tearDown(self): self.evt.wait() - def testBasic(self): - # connects - pop = poplib.POP3(HOST, self.port) - pop.sock.close() + def server(self, evt, serv): + serv.listen(5) + try: + conn, addr = serv.accept() + except socket.timeout: + pass + else: + conn.send(b"+ Hola mundo\n") + conn.close() + finally: + serv.close() + evt.set() def testTimeoutDefault(self): self.assertTrue(socket.getdefaulttimeout() is None) @@ -65,8 +306,16 @@ pop.sock.close() -def test_main(verbose=None): - support.run_unittest(GeneralTests) +def test_main(): + tests = [TestPOP3Class, TestTimeouts] + if SUPPORTS_SSL: + tests.append(TestPOP3_SSLClass) + thread_info = test_support.threading_setup() + try: + test_support.run_unittest(*tests) + finally: + test_support.threading_cleanup(*thread_info) + if __name__ == '__main__': test_main() Index: Lib/poplib.py =================================================================== --- Lib/poplib.py (révision 66888) +++ Lib/poplib.py (copie de travail) @@ -27,12 +27,6 @@ # POP SSL PORT POP3_SSL_PORT = 995 -# Line terminators (we always output CRLF, but accept any of CRLF, LFCR, LF) -CR = b'\r' -LF = b'\n' -CRLF = CR+LF - - class POP3: """This class supports both the minimal and optional command sets. @@ -75,20 +69,22 @@ above. """ - def __init__(self, host, port=POP3_PORT, - timeout=socket._GLOBAL_DEFAULT_TIMEOUT): + timeout=socket._GLOBAL_DEFAULT_TIMEOUT, + encoding="ISO-8859-1"): self.host = host self.port = port self.sock = socket.create_connection((host, port), timeout) - self.file = self.sock.makefile('rb') + self.encoding = encoding + self.file = self.sock.makefile('r', encoding=self.encoding) self._debugging = 0 self.welcome = self._getresp() def _putline(self, line): + line += '\r\n' if self._debugging > 1: print('*put*', repr(line)) - self.sock.sendall('%s%s' % (line, CRLF)) + self.sock.sendall(line.encode(self.encoding)) # Internal: send one command to the server (through _putline()) @@ -107,13 +103,6 @@ if self._debugging > 1: print('*get*', repr(line)) if not line: raise error_proto('-ERR EOF') octets = len(line) - # server can send any combination of CR & LF - # however, 'readline()' returns lines ending in LF - # so only possibilities are ...LF, ...CRLF, CR...LF - if line[-2:] == CRLF: - return line[:-2], octets - if line[0] == CR: - return line[1:-1], octets return line[:-1], octets @@ -123,8 +112,7 @@ def _getresp(self): resp, o = self._getline() if self._debugging > 1: print('*resp*', repr(resp)) - c = resp[:1] - if c != b'+': + if not resp[0].startswith('+'): raise error_proto(resp) return resp @@ -135,8 +123,8 @@ resp = self._getresp() list = []; octets = 0 line, o = self._getline() - while line != b'.': - if line[:2] == b'..': + while line != '.': + if line.startswith('..'): o = o-1 line = line[1:] octets = octets + o @@ -327,12 +315,12 @@ See the methods of the parent class POP3 for more documentation. """ - def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None): + def __init__(self, host, port = POP3_SSL_PORT, keyfile = None, certfile = None, encoding="ISO-8859-1"): self.host = host self.port = port self.keyfile = keyfile self.certfile = certfile - self.buffer = "" + self.encoding = encoding msg = "getaddrinfo returns an empty list" self.sock = None for res in socket.getaddrinfo(self.host, self.port, 0, socket.SOCK_STREAM): @@ -348,56 +336,11 @@ break if not self.sock: raise socket.error(msg) - self.file = self.sock.makefile('rb') - self.sslobj = ssl.wrap_socket(self.sock, self.keyfile, self.certfile) + self.sock = ssl.wrap_socket(self.sock, self.keyfile, self.certfile) + self.file = self.sock.makefile('r', encoding=self.encoding) self._debugging = 0 self.welcome = self._getresp() - def _fillBuffer(self): - localbuf = self.sslobj.read() - if len(localbuf) == 0: - raise error_proto('-ERR EOF') - self.buffer += localbuf - - def _getline(self): - line = "" - renewline = re.compile(r'.*?\n') - match = renewline.match(self.buffer) - while not match: - self._fillBuffer() - match = renewline.match(self.buffer) - line = match.group(0) - self.buffer = renewline.sub('' ,self.buffer, 1) - if self._debugging > 1: print('*get*', repr(line)) - - octets = len(line) - if line[-2:] == CRLF: - return line[:-2], octets - if line[0] == CR: - return line[1:-1], octets - return line[:-1], octets - - def _putline(self, line): - if self._debugging > 1: print('*put*', repr(line)) - line += CRLF - bytes = len(line) - while bytes > 0: - sent = self.sslobj.write(line) - if sent == bytes: - break # avoid copy - line = line[sent:] - bytes = bytes - sent - - def quit(self): - """Signoff: commit changes on server, unlock mailbox, close connection.""" - try: - resp = self._shortcmd('QUIT') - except error_proto as val: - resp = val - self.sock.close() - del self.sslobj, self.sock - return resp - __all__.append("POP3_SSL") if __name__ == "__main__":