diff --git a/Lib/test/test_imaplib.py b/Lib/test/test_imaplib.py --- a/Lib/test/test_imaplib.py +++ b/Lib/test/test_imaplib.py @@ -172,6 +172,303 @@ self._send_tagged(tag, 'OK', 'LOGIN completed') +class NewIMAPTests(unittest.TestCase): + _imap_class = imaplib.IMAP4 + _server_class = socketserver.TCPServer + _client = None + + def _setup(self, imap_server, connect=True): + """ + Setups imap_server (which should inherit SimpleIMAPHandler) for tests. + Returns (client, server). + """ + class TestTCPServer(self._server_class): + def handle_error(self, request, client_address): + """ + End request and raise the error if one occurs. + """ + self.close_request(request) + self.server_close() + raise + + self.addCleanup(self._cleanup) + self._server = self._server_class((support.HOST, 0), imap_server) + self._thread = threading.Thread( + name=self._server.__class__.__name__, + target=self._server.serve_forever, + # Short poll interval to make the test finish quickly. + # Time between requests is short enough that we won't wake + # up spuriously too many times. + kwargs={'poll_interval': 0.01}) + self._thread.daemon = True # In case this function raises. + self._thread.start() + + if connect: + self._client = self._imap_class(*self._server.server_address) + + return self._client, self._server + + def _cleanup(self): + """ + Cleans up the test server. This method should not be called manually, + it's being add to cleanup queue in the _setup method already. + """ + # if logout was called already we'd raise an exception trying to + # shutdown the client once again + if self._client is not None and self._client.state != 'LOGOUT': + self._client.shutdown() + # cleanup the server + self._server.shutdown() + self._server.server_close() + self._thread.join() + + def test_issue5949(self): + class EOFHandler(socketserver.StreamRequestHandler): + def handle(self): + # EOF without sending a complete welcome message. + self.wfile.write(b'* OK') + _, server = self._setup(EOFHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self._imap_class, + *server.server_address) + + def test_line_termination(self): + class BadNewlineHandler(SimpleIMAPHandler): + def cmd_CAPABILITY(self, tag, args): + self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') + self._send_tagged(tag, 'OK', 'CAPABILITY completed') + _, server = self._setup(BadNewlineHandler, connect=False) + self.assertRaises(imaplib.IMAP4.abort, self._imap_class, + *server.server_address) + + def test_enable_raises_error_if_not_AUTH(self): + class EnableHandler(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + client, _ = self._setup(EnableHandler) + self.assertFalse(client.utf8_enabled) + self.assertRaisesRegex(imaplib.IMAP4.error, + 'command ENABLE illegal in state NONAUTH',client.enable, 'foo') + self.assertFalse(client.utf8_enabled) + + def test_enable_raises_error_if_no_capability(self): + client, _ = self._setup(SimpleIMAPHandler) + self.assertRaisesRegex(imaplib.IMAP4.error, + 'Server does not support ENABLE', client.enable, 'foo') + + def test_enable_UTF8_raises_error_if_not_supported(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertRaisesRegex(imaplib.IMAP4.error, + 'Server does not support ENABLE', client.enable, 'UTF8=ACCEPT') + + def test_enable_UTF8_True_append(self): + class UTF8AppendServer(SimpleIMAPHandler): + capabilities = 'ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + def cmd_APPEND(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'okay') + client, server = self._setup(UTF8AppendServer) + self.assertEqual(client._encoding, 'ascii') + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + code, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(code, 'OK') + self.assertEqual(client._encoding, 'utf-8') + msg_string = 'Subject: üñí©öðé' + typ, data = client.append(None, None, None, msg_string.encode('utf-8')) + self.assertEqual(typ, 'OK') + self.assertEqual(server.response, + ('UTF8 (%s)\r\n' % msg_string).encode('utf-8')) + + def test_search_disallows_charset_in_utf8_mode(self): + class UTF8Server(SimpleIMAPHandler): + capabilities = 'AUTH ENABLE UTF8=ACCEPT' + def cmd_ENABLE(self, tag, args): + self._send_tagged(tag, 'OK', 'ENABLE successful') + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + client, _ = self._setup(UTF8Server) + typ, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(typ, 'OK') + typ, _ = client.enable('UTF8=ACCEPT') + self.assertEqual(typ, 'OK') + self.assertTrue(client.utf8_enabled) + self.assertRaisesRegex(imaplib.IMAP4.error, + 'Non-None charset not valid in UTF8 mode', client.search, + 'foo', 'bar') + + def test_bad_auth_name(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_tagged(tag, 'NO', + 'unrecognized authentication type {}'.format(args[0])) + client, _ = self._setup(MyServer) + self.assertRaisesRegex(imaplib.IMAP4.error, + 'unrecognized authentication type METHOD', + client.authenticate, 'METHOD', lambda: 1) + + def test_invalid_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid') + client, _ = self._setup(MyServer) + self.assertRaisesRegex(imaplib.IMAP4.error, + '\[AUTHENTICATIONFAILED\] invalid', + client.authenticate, 'MYAUTH', lambda x: b'fake') + + def test_valid_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.server.response = yield + self._send_tagged(tag, 'OK', 'FAKEAUTH successful') + # with bytes as argument to authobject + client, server = self._setup(MyServer) + code, _ = client.authenticate('MYAUTH', lambda x: b'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + # we need to cheet client state to invoke auth once again + client.state = 'NONAUTH' + # plain text as argument to authobject + code, _ = client.authenticate('MYAUTH', lambda x: 'fake') + self.assertEqual(code, 'OK') + self.assertEqual(server.response, b'ZmFrZQ==\r\n') # b64 encoded 'fake' + + def test_login_cram_md5(self): + class AuthHandler(SimpleIMAPHandler): + capabilities = 'LOGINDISABLED AUTH=CRAM-MD5' + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm' + 'VzdG9uLm1jaS5uZXQ=') + r = yield + if (r == b'dGltIGYxY2E2YmU0NjRiOWVmYT' + b'FjY2E2ZmZkNmNmMmQ5ZjMy\r\n'): + self._send_tagged(tag, 'OK', 'CRAM-MD5 successful') + else: + self._send_tagged(tag, 'NO', 'No access') + client, _ = self._setup(AuthHandler) + self.assertTrue('AUTH=CRAM-MD5' in client.capabilities) + ret, _ = client.login_cram_md5("tim", "tanstaaftanstaaf") + self.assertEqual(ret, "OK") + # we need to cheet client state to invoke auth once again + client.state = 'NONAUTH' + ret, _ = client.login_cram_md5("tim", b"tanstaaftanstaaf") + self.assertEqual(ret, "OK") + + def test_aborted_authentication(self): + class MyServer(SimpleIMAPHandler): + def cmd_AUTHENTICATE(self, tag, args): + self._send_textline('+') + self.response = yield + self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] aborted') + client, _ = self._setup(MyServer) + self.assertRaisesRegex(imaplib.IMAP4.error, + '\[AUTHENTICATIONFAILED\] aborted', + client.authenticate, 'MYAUTH', lambda x: None) + + def test_linetoolong(self): + class TooLongHandler(SimpleIMAPHandler): + def handle(self): + # Send a very long response line + self.wfile.write(b'* OK ' + imaplib._MAXLINE * b'x' + b'\r\n') + _, server = self._setup(TooLongHandler, connect=False) + self.assertRaisesRegex(imaplib.IMAP4.error, + 'got more than 1000000 bytes', + self._imap_class, *server.server_address) + + def test_simple_with_statement(self): + _, server = self._setup(SimpleIMAPHandler, connect=False) + with self._imap_class(*server.server_address): + pass + + def test_with_statement(self): + _, server = self._setup(SimpleIMAPHandler) + with self._imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + self.assertIsNone(server.logged) + + def test_with_statement_logout(self): + # what happens if already logout in the block? + _, server = self._setup(SimpleIMAPHandler) + with self._imap_class(*server.server_address) as imap: + imap.login('user', 'pass') + self.assertEqual(server.logged, 'user') + imap.logout() + self.assertIsNone(server.logged) + self.assertIsNone(server.logged) + + # command tests + + def test_login(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + self.assertEqual(client.state, 'AUTH') + + def test_logout(self): + client, _ = self._setup(SimpleIMAPHandler) + typ, data = client.login('user', 'pass') + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'LOGIN completed') + typ, data = client.logout() + self.assertEqual(typ, 'BYE') + self.assertEqual(data[0], b'IMAP4ref1 Server logging out') + self.assertEqual(client.state, 'LOGOUT') + + def test_lsub(self): + class LsubCmd(SimpleIMAPHandler): + def cmd_LSUB(self, tag, args): + self._send_textline('* LSUB () "." directoryA') + return self._send_tagged(tag, 'OK', 'LSUB completed') + client, _ = self._setup(LsubCmd) + client.login('user', 'pass') + typ, data = client.lsub() + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'() "." directoryA') + + +@unittest.skipUnless(ssl, "SSL not available") +class NewIMAPSSLTests(NewIMAPTests): + _imap_class = imaplib.IMAP4_SSL + _server_class = SecureTCPServer + + def test_ssl_raises(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + with self.assertRaisesRegex(ssl.CertificateError, + "hostname '127.0.0.1' doesn't match 'localhost'"): + _, server = self._setup(SimpleIMAPHandler) + client = self._imap_class(*server.server_address, ssl_context=ssl_context) + client.shutdown() + + def test_ssl_verified(self): + ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23) + ssl_context.verify_mode = ssl.CERT_REQUIRED + ssl_context.check_hostname = True + ssl_context.load_verify_locations(CAFILE) + + _, server = self._setup(SimpleIMAPHandler) + client = self._imap_class("localhost", server.server_address[1], + ssl_context=ssl_context) + client.shutdown() + class ThreadedNetworkedTests(unittest.TestCase): server_class = socketserver.TCPServer imap_class = imaplib.IMAP4