Index: Lib/test/test_imaplib.py =================================================================== --- Lib/test/test_imaplib.py (revision 76683) +++ Lib/test/test_imaplib.py (working copy) @@ -1,9 +1,16 @@ import imaplib +import os.path +import socket +import SocketServer +import ssl +import sys import time from test import test_support +from test.test_support import reap_threads, verbose import unittest +CERTFILE = None class TestImaplib(unittest.TestCase): def test_that_Time2Internaldate_returns_a_result(self): @@ -16,10 +23,157 @@ for t in timevalues: imaplib.Time2Internaldate(t) +try: + import threading +except ImportError: + _have_threads = False +else: + _have_threads = True + class SecureTCPServer(SocketServer.TCPServer): + def get_request(self): + newsocket, fromaddr = self.socket.accept() + connstream = ssl.wrap_socket(newsocket, + server_side=True, + certfile=CERTFILE) + return connstream, fromaddr + + class ThreadedTCPServer(SocketServer.ThreadingMixIn, SocketServer.TCPServer): + pass + + class ThreadedSecureTCPServer(SocketServer.ThreadingMixIn, SecureTCPServer): + pass + + class SimpleIMAPHandler(SocketServer.StreamRequestHandler): + def _send(self, message): + if verbose: print "SENT:", message.strip() + self.wfile.write(message) + + def handle(self): + # Welcome message + self._send('* OK IMAP4rev1\r\n') + while 1: + line = self.rfile.readline() + if line == '': + return + if not line.endswith('\r\n'): + raise ValueError('unterminated client request') + if verbose: print 'GOT:', line.strip() + + splitline = line.split() + tag = splitline[0] + cmd = splitline[1] + args = splitline[2:] + + if hasattr(self, 'cmd_%s' % (cmd,)): + getattr(self, 'cmd_%s' % (cmd,))(tag, args) + else: + self._send('%s BAD %s unknown\r\n' % (tag, cmd)) + + def cmd_CAPABILITY(self, tag, args): + self._send('* CAPABILITY IMAP4rev1\r\n') + self._send('%s OK CAPABILITY completed\r\n' % (tag,)) + + class ThreadedNetworkedTests(unittest.TestCase): + def make_server(self, addr, svrcls, hdlr): + class MyServer(svrcls): + def handle_error(self, request, client_address): + self.close_request(request) + self.server_close() + raise + + if verbose: print "creating server" + server = MyServer(addr, hdlr) + self.assertEquals(server.server_address, server.socket.getsockname()) + return server + + @reap_threads + def run_server(self, svrcls, hdlr, testfunc): + server = self.make_server((test_support.HOST, 0), svrcls, hdlr) + # We had the OS pick a port, so pull the real address out of + # the server. + addr = server.server_address + if verbose: + print "server created" + print "ADDR =", addr + print "CLASS =", svrcls + print "HDLR =", hdlr + t = threading.Thread( + name='%s serving' % svrcls, + target=server.serve_forever, + # Short poll interval to make the test finish quickly. + # Time between requests is short enough that we won't wake + # up spuriously too many times. + kwargs={'poll_interval':0.01}) + t.daemon = True # In case this function raises. + t.start() + if verbose: print "server running" + for i in range(3): + if verbose: print "test client", i + testfunc(addr) + if verbose: print "waiting for server" + server.shutdown() + t.join() + if verbose: print "done" + + def connect_with_IMAP_SSL(self, addr): + server = imaplib.IMAP4_SSL(*addr) + server.shutdown() + + def connect_with_IMAP(self, addr): + server = imaplib.IMAP4(*addr) + server.shutdown() + + def test_connect(self): + self.run_server(ThreadedTCPServer, + SimpleIMAPHandler, + self.connect_with_IMAP) + self.run_server(ThreadedSecureTCPServer, + SimpleIMAPHandler, + self.connect_with_IMAP_SSL) + + def test_issue5949(self): + class EOFHandler(SocketServer.StreamRequestHandler): + def handle(self): + # EOF without sending a complete welcome message + self.wfile.write('* OK') + + def testfunc(addr): + self.assertRaises(imaplib.IMAP4.abort, + self.connect_with_IMAP, addr) + self.run_server(ThreadedTCPServer, + EOFHandler, + testfunc) + + def testfunc(addr): + self.assertRaises(imaplib.IMAP4.abort, + self.connect_with_IMAP_SSL, addr) + self.run_server(ThreadedSecureTCPServer, + EOFHandler, + testfunc) + def test_main(): - test_support.run_unittest(TestImaplib) + global CERTFILE + CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, + "keycert.pem") + if not os.path.exists(CERTFILE): + raise test_support.TestFailed("Can't read certificate files!") + if verbose: print "CERTFILE =", CERTFILE + + tests = [TestImaplib] + + thread_info = None + if _have_threads: + thread_info = test_support.threading_setup() + if thread_info and test_support.is_resource_enabled('network'): + tests.append(ThreadedNetworkedTests) + + test_support.run_unittest(*tests) + + if thread_info: + test_support.threading_cleanup(*thread_info) + if __name__ == "__main__": - unittest.main() + test_main()