# HG changeset patch # Parent 7884ffbe9ae76afc9f654a98f37cc5fcf02be45c diff --git a/Doc/library/poplib.rst b/Doc/library/poplib.rst --- a/Doc/library/poplib.rst +++ b/Doc/library/poplib.rst @@ -182,6 +182,11 @@ An :class:`POP3` instance has the follow the unique id for that message in the form ``'response mesgnum uid``, otherwise result is list ``(response, ['mesgnum uid', ...], octets)``. +.. method:: POP3.stls(keyfile=None, certfile=None, context=None) + + Start a TLS session on the active connection as specified in RFC 2595. + This is only allowed before user authentication + Instances of :class:`POP3_SSL` have no additional methods. The interface of this subclass is identical to its parent. diff --git a/Lib/poplib.py b/Lib/poplib.py --- a/Lib/poplib.py +++ b/Lib/poplib.py @@ -15,6 +15,12 @@ Based on the J. Myers POP3 draft, Jan. 9 import re, socket +try: + import ssl + HAVE_SSL = True +except ImportError: + HAVE_SSL = False + __all__ = ["POP3","error_proto"] # Exception raised when an error or invalid response is received: @@ -56,6 +62,7 @@ class POP3: TOP msg n top(msg, n) UIDL [msg] uidl(msg = None) CAPA capa() + STLS stls() Raises one exception: 'error_proto'. @@ -346,21 +353,54 @@ class POP3: raise error_proto('-ERR CAPA not supported by server') return caps -try: - import ssl -except ImportError: - pass -else: + + def stls(self, keyfile=None, certfile=None, context=None): + """Start a TLS session on the active connection as specified in RFC 2595. + """ + if not HAVE_SSL: + raise error_proto('-ERR TLS support missing') + if hasattr(self,'_tls_established') and self._tls_established: + raise error_proto('-ERR TLS session already established') + caps=self.capa() + if not b'STLS' in caps: + raise error_proto('-ERR STLS not supported by server') + try: + if context is not None and keyfile is not None: + raise ValueError("context and keyfile arguments are mutually " + "exclusive") + if context is not None and certfile is not None: + raise ValueError("context and certfile arguments are mutually " + "exclusive") + resp=self._shortcmd('STLS') + if context is None: + context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + context.options |= ssl.OP_NO_SSLv2 + if keyfile is not None or certfile is not None: + if certfile is not None: + context.load_cert_chain(certfile, keyfile) + else: + context.load_cert_chain(keyfile) + self.sock = context.wrap_socket(self.sock) + self.file = self.sock.makefile('rb') + self._tls_established = True + except error_proto as _err: + resp = _err + return resp + + +if HAVE_SSL: class POP3_SSL(POP3): """POP3 client class over SSL connection - Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None) + Instantiate with: POP3_SSL(hostname, port=995, keyfile=None, certfile=None, + context=None) hostname - the hostname of the pop3 over ssl server port - port number keyfile - PEM formatted file that countains your private key certfile - PEM formatted certificate chain file + context - a ssl.SSLContext See the methods of the parent class POP3 for more documentation. """ @@ -386,6 +426,10 @@ else: sock = ssl.wrap_socket(sock, self.keyfile, self.certfile) return sock + def stls(self): + raise ValueError("STLS not allowed on an ssl connection") + + __all__.append("POP3_SSL") if __name__ == "__main__": diff --git a/Lib/test/test_poplib.py b/Lib/test/test_poplib.py --- a/Lib/test/test_poplib.py +++ b/Lib/test/test_poplib.py @@ -125,7 +125,6 @@ class DummyPOP3Handler(asynchat.async_ch self.push('.') - class DummyPOP3Server(asyncore.dispatcher, threading.Thread): handler = DummyPOP3Handler @@ -247,7 +246,6 @@ class TestPOP3Class(TestCase): def test_capa(self): capa = self.client.capa() - print (capa) self.assertTrue(b'IMPLEMENTATION' in capa.keys()) def test_quit(self): @@ -303,6 +301,69 @@ if hasattr(poplib, 'POP3_SSL'): DummyPOP3Handler.handle_read(self) + class DummyPOP3_STLSHandler(DummyPOP3Handler): + + def __init__(self, conn): + DummyPOP3Handler.__init__(self, conn) + self.tls_active = False + self.tls_starting = False + + def _get_capas(self): + _capas = dict(self.CAPAS) + if not self.tls_active: + _capas['STLS'] = [] + return _capas + + def _do_tls_handshake(self): + try: + self.socket.do_handshake() + except ssl.SSLError as err: + if err.args[0] in (ssl.SSL_ERROR_WANT_READ, + ssl.SSL_ERROR_WANT_WRITE): + return + elif err.args[0] == ssl.SSL_ERROR_EOF: + return self.handle_close() + raise + except socket.error as err: + if err.args[0] == errno.ECONNABORTED: + return self.handle_close() + else: + self.tls_active = True + self.tls_starting = False + + def cmd_stls(self, arg): + if self.tls_active is False: + self.push('+OK Begin TLS negotiation') + tls_sock = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True, + do_handshake_on_connect=False, + suppress_ragged_eofs=False) + self.del_channel() + self.set_socket(tls_sock) + self.tls_active = True + self.tls_starting = True + self.in_buffer = [] + self._do_tls_handshake() + else: + self.push('-ERR Command not permitted when TLS active') + + def cmd_capa(self, arg): + self.push('+OK Capability list follows') + if self._get_capas(): + for cap, params in self._get_capas().items(): + _ln = [cap] + if params: + _ln.extend(params) + self.push(' '.join(_ln)) + self.push('.') + + def handle_read(self): + if self.tls_starting: + self._do_tls_handshake() + else: + DummyPOP3Handler.handle_read(self) + + class TestPOP3_SSLClass(TestPOP3Class): # repeat previous tests by using poplib.POP3_SSL @@ -333,6 +394,22 @@ if hasattr(poplib, 'POP3_SSL'): self.assertTrue(self.client.noop().startswith(b'+OK')) + class TestPOP3_TLSClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3.stls() + + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.handler = DummyPOP3_STLSHandler + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) + self.client.stls() + + def tearDown(self): + if self.client.file is not None and self.client.sock is not None: + self.client.quit() + self.server.stop() + + class TestTimeouts(TestCase): def setUp(self): @@ -391,6 +468,7 @@ def test_main(): tests = [TestPOP3Class, TestTimeouts] if SUPPORTS_SSL: tests.append(TestPOP3_SSLClass) + tests.append(TestPOP3_TLSClass) thread_info = test_support.threading_setup() try: test_support.run_unittest(*tests)