Index: Lib/imaplib.py =================================================================== --- Lib/imaplib.py (révision 68762) +++ Lib/imaplib.py (copie de travail) @@ -213,7 +213,14 @@ return getattr(self, attr.lower()) raise AttributeError("Unknown IMAP4 command: '%s'" % attr) + def __enter__(self): + """Context management protocol. Returns self.""" + return self + def __exit__(self, *args): + """Context management protocol. + logout() will close properly even if not logged in.""" + self.logout() # Overridable methods Index: Lib/test/test_imaplib.py =================================================================== --- Lib/test/test_imaplib.py (révision 68762) +++ Lib/test/test_imaplib.py (copie de travail) @@ -1,10 +1,110 @@ import imaplib import time +import asyncore +import asynchat +import threading +import socket from test import test_support +from test.test_support import HOST import unittest +class DummyIMAP4Handler(asynchat.async_chat): + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + self.set_terminator("\r\n") + self.in_buffer = [] + self.previous_cmd = None + self.push(r'* OK Welcome') + + def collect_incoming_data(self, data): + self.in_buffer.append(data) + + def found_terminator(self): + line = ''.join(self.in_buffer) + line = line.split() + for index, part in enumerate(line[1:3]): + cmd = 'cmd_%s' % part.lower() + if hasattr(self, cmd): + cmd = getattr(self, cmd) + args = line[index+1:] + if (self.previous_cmd is not None and + line[1].startswith(self.previous_cmd)): + tag = line[1][len(self.previous_cmd):] + else: + tag = line[0] + cmd(tag, args) + self.previous_cmd = part + return + # this fake server needs more code + raise NotImplementedError(str(line)) + + def handle_error(self): + raise + + def push(self, data): + asynchat.async_chat.push(self, data + '\r\n') + + def cmd_capability(self, tag, args): + self.push('* CAPABILITY IMAP4REV1') + self.push('%s OK CAPABILITY' % tag) + + def cmd_logout(self, tag, args): + self.push('%s OK LOGOUT' % tag) + + def cmd_login(self, tag, args): + self.push('%s OK LOGIN' % tag) + +class DummyIMAP4Server(asyncore.dispatcher, threading.Thread): + + handler = DummyIMAP4Handler + + def __init__(self, address, af=socket.AF_INET): + threading.Thread.__init__(self) + asyncore.dispatcher.__init__(self) + self.create_socket(af, socket.SOCK_STREAM) + self.bind(address) + self.listen(5) + self.active = False + self.active_lock = threading.Lock() + self.host, self.port = self.socket.getsockname()[:2] + + def start(self): + assert not self.active + self.__flag = threading.Event() + threading.Thread.start(self) + self.__flag.wait() + + def run(self): + self.active = True + self.__flag.set() + while self.active and asyncore.socket_map: + self.active_lock.acquire() + asyncore.loop(timeout=0.1, count=1) + self.active_lock.release() + asyncore.close_all(ignore_all=True) + + def stop(self): + assert self.active + self.active = False + self.join() + + def handle_accept(self): + conn, addr = self.accept() + self.handler = self.handler(conn) + self.close() + + def handle_connect(self): + self.close() + handle_read = handle_connect + + def writable(self): + return 0 + + def handle_error(self): + raise + class TestImaplib(unittest.TestCase): def test_that_Time2Internaldate_returns_a_result(self): # We can check only that it successfully produces a result, @@ -16,7 +116,37 @@ for t in timevalues: imaplib.Time2Internaldate(t) + def imaped(func): + def _imap(self): + server = DummyIMAP4Server((HOST, 0)) + server.start() + self.host = server.host + self.port = server.port + try: + return func(self) + finally: + server.stop() + return _imap + @imaped + def test_simple_with_statement(self): + # simplest call + with imaplib.IMAP4(self.host, self.port) as imap: + pass + + @imaped + def test_with_statement(self): + # doing something + with imaplib.IMAP4(self.host, self.port) as imap: + imap.login('user', 'pass') + + @imaped + def test_with_statement_quit(self): + # what happens if already quit in the block ? + with imaplib.IMAP4(self.host, self.port) as imap: + imap.login('user', 'pass') + imap.logout() + def test_main(): test_support.run_unittest(TestImaplib)