diff -r 118d6f49d6d6 Lib/imaplib.py --- a/Lib/imaplib.py Fri Aug 01 17:49:24 2014 +0200 +++ b/Lib/imaplib.py Tue Aug 05 20:44:41 2014 +0200 @@ -65,7 +65,6 @@ 'CREATE': ('AUTH', 'SELECTED'), 'DELETE': ('AUTH', 'SELECTED'), 'DELETEACL': ('AUTH', 'SELECTED'), - 'EXAMINE': ('AUTH', 'SELECTED'), 'EXPUNGE': ('SELECTED',), 'FETCH': ('SELECTED',), 'GETACL': ('AUTH', 'SELECTED'), @@ -312,7 +311,7 @@ """ name = 'RECENT' typ, dat = self._untagged_response('OK', [None], name) - if dat[-1]: + if dat[-1] is not None: return typ, dat typ, dat = self.noop() # Prod server for response return self._untagged_response(typ, dat, name) diff -r 118d6f49d6d6 Lib/test/test_imaplib.py --- a/Lib/test/test_imaplib.py Fri Aug 01 17:49:24 2014 +0200 +++ b/Lib/test/test_imaplib.py Tue Aug 05 20:44:41 2014 +0200 @@ -93,11 +93,30 @@ IMAP4_SSL = None +def const(var): + return property(lambda self: var) + + class SimpleIMAPHandler(socketserver.StreamRequestHandler): timeout = 1 continuation = None capabilities = '' + # There is no real implementation of this commands but just answering OK is + # enough to test them. + simple_commands = [ + 'NOOP', + 'CHECK', + 'CLOSE', + 'COPY', + 'CREATE', + 'DELETE', + 'DELETEACL', + 'RENAME', + 'SUBSCRIBE', + 'UNSUBSCRIBE', + ] + def _send(self, message): if verbose: print("SENT: %r" % message.strip()) @@ -151,9 +170,48 @@ if continuation: self.continuation = continuation next(continuation) + elif cmd in self.simple_commands: + self._send_untagged_responses() + self._send_tagged(tag, 'OK', '%s completed' % cmd) else: self._send_tagged(tag, 'BAD', cmd + ' unknown') + def _send_untagged_responses(self): + # this reassembles the NOOP example from RFC 3501 + for update in [ + '22 EXPUNGE', + '23 EXISTS', + '3 RECENT', + '14 FETCH (FLAGS (\Seen \Deleted))' + ]: + self._send_textline('* %s' % update) + + def cmd_EXPUNGE(self, tag, args): + self._send_textline('* 42 EXPUNGE') + self._send_tagged(tag, 'OK', 'EXPUNGE completed') + + def cmd_FETCH(self, tag, args): + # This isn't even a plausible implementation (I got it from the example + # in RFC 3501 on page 34) + if '7' not in args: + for line in [ + '2 FETCH ....', + '3 FETCH ....', + '4 FETCH ....', + ]: + self._send_textline('* %s' % line) + self._send_tagged(tag, 'OK', 'FETCH completed') + else: + self._send_tagged(tag, 'NO', 'FETCH impossible') + + def cmd_GETACL(self, tag, args): + self._send_textline('* ACL INBOX Fred rwipsldexta') + self._send_tagged(tag, 'OK', 'GETACL complete') + + def cmd_LIST(self, tag, args): + self._send_textline('* LIST (\\Noselect) "/" ""') + self._send_tagged(tag, 'OK', 'LIST complete') + def cmd_CAPABILITY(self, tag, args): caps = ('IMAP4rev1 ' + self.capabilities if self.capabilities @@ -229,6 +287,19 @@ client.logout() @reap_threads + def test_getattr(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + for command in imaplib.Commands.keys(): + try: + self.assertEqual(getattr(client, command.upper()), + getattr(client, command.lower())) + except AttributeError: + self.fail(msg='imaplib.Commands entry %s is not an ' + 'attribute of imaplib.IMAP4' % command) + client.shutdown() + + @reap_threads def test_connect(self): with self.reaped_server(SimpleIMAPHandler) as server: client = self.imap_class(*server.server_address) @@ -260,6 +331,376 @@ self.imap_class, *server.server_address) @reap_threads + def test_read_readline_shutdown(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertEqual(client.read(0), b'') + self.assertEqual(client.readline(), b'') + client.shutdown() + self.assertRaises(ValueError, client.read, 0) + self.assertRaises(ValueError, client.readline) + + @reap_threads + def test_socket_getter(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertEqual(client.socket(), client.sock) + client.shutdown() + + @reap_threads + def test_noop(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertEqual(client.noop()[1], [b'NOOP completed', ]) + client.shutdown() + + @reap_threads + def test_recent(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + # the servers response b'3' is hardcoded in SimpleIMAPHandler + self.assertEqual(client.recent()[1], [b'3', ]) + # this will cover one more line: + client.noop() + self.assertEqual(client.recent()[1], [b'3', ]) + client.shutdown() + + @reap_threads + def test_check(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises(imaplib.IMAP4.error, client.CHECK) + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual(client.check()[1], [b'CHECK completed', ]) + client.shutdown() + + @reap_threads + def test_close(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises(imaplib.IMAP4.error, client.close) + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual(client.close()[1], [b'CLOSE completed', ]) + client.shutdown() + + @reap_threads + def test_copy(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.copy, b'1 2 3', 'example') + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.copy(b'1 2 3', 'example')[1], [b'COPY completed', ]) + client.shutdown() + + @reap_threads + def test_create(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises(imaplib.IMAP4.error, client.create, 'example') + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.create('example')[1], [b'CREATE completed', ]) + client.shutdown() + + @reap_threads + def test_delete(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises(imaplib.IMAP4.error, client.delete, 'example') + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.delete('example')[1], [b'DELETE completed', ]) + client.shutdown() + + @reap_threads + def test_expunge(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.expunge) + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual(client.expunge()[1], [b'42', ]) + client.shutdown() + + @reap_threads + def test_fetch(self): + with self.reaped_server(SimpleIMAPHandler) as server: + args = ['2:4', '(FLAGS BODY[HEADER.FIELDS (DATE FROM)])'] + result_prefixes = [b'2', b'3', b'4'] + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.fetch, *args) + # lets cheat a bit here: + client.state = 'SELECTED' + typ, data = client.fetch(*args) + for item in data: + self.assertIn(item[:1], result_prefixes) + self.assertEqual( + client.fetch('7', '(FLAGS BODY[HEADER.FIELDS (DATE FROM)])'), + ('NO', [b'FETCH impossible', ])) + client.shutdown() + + @reap_threads + def test_list(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.list) + # lets cheat a bit here: + client.state = 'SELECTED' + typ, data = client.list() + self.assertEqual(client.list()[1][0], b'(\\Noselect) "/" ""') + client.shutdown() + + @reap_threads + def test_lsub(self): + args = ['"#news."', '"comp.mail.*"'] + results = [ + '() "." #news.comp.mail.mime', + '() "." #news.comp.mail.misc' + ] + + class SimpleIMAPHandlerWithLsub(SimpleIMAPHandler): + lsub_response = results + def cmd_LSUB(self, tag, args): + for line in self.lsub_response: + self._send_textline('* LSUB %s' % line) + self._send_tagged(tag, 'OK', 'LSUB complete') + + with self.reaped_server(SimpleIMAPHandlerWithLsub) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.lsub, *args) + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.lsub(*args)[1], + [results[0].encode(), results[1].encode()]) + client.shutdown() + + @reap_threads + def test_rename(self): + args = ['folder', 'directory'] + result = ('OK', [b'RENAME completed', ]) + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.rename, *args) + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual(client.rename(*args), result) + client.shutdown() + + @reap_threads + def test_search(self): + args = [None, 'FLAGGED SINCE 1-Feb-1994 NOT FROM "Smith"'] + results = [b'2', b'84', b'882'] + + class SimpleIMAPHandlerWithSearch(SimpleIMAPHandler): + search_response = results + def cmd_SEARCH(self, tag, args): + numbers = '' + for number in self.search_response: + numbers += number.decode() + ' ' + self._send_textline('* SEARCH %s' % numbers) + self._send_tagged(tag, 'OK', 'SEARCH complete') + + with self.reaped_server(SimpleIMAPHandlerWithSearch) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.search, *args) + client.state = 'SELECTED' + self.assertEqual(client.search(*args), ('OK', [b'2 84 882 ', ])) + client.shutdown() + + @reap_threads + def test_select(self): + result = 42 + + class SimpleIMAPHandlerWithSelect(SimpleIMAPHandler): + inbox_exists_count = result + def cmd_SELECT(self, tag, args): + for line in [ + '%s EXISTS' % result, + '1 RECENT', + 'OK [UNSEEN 12] Message 12 is first unseen', + 'OK [UIDVALIDITY 3857529045] UIDs valid', + 'OK [UIDNEXT 4392] Predicted next UID', + 'FLAGS (\Answered \Flagged \Deleted \Seen \Draft)', + 'OK [PERMANENTFLAGS (\Deleted \Seen \*)] Limited', + ]: + self._send_textline('* %s' % line) + self._send_tagged(tag, 'OK', 'SELECT complete') + + with self.reaped_server(SimpleIMAPHandlerWithSelect) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.select) + client.state = 'AUTH' + self.assertEqual(client.select(), ('OK', [str(result).encode(), ])) + client.shutdown() + + @reap_threads + def test_status(self): + args = ['blurdybloop', '(UIDNEXT MESSAGES)'] + results = { + 'UIDNEXT': 44292, + 'MESSAGES': 231, + } + + class SimpleIMAPHandlerWithStatus(SimpleIMAPHandler): + blurdybloop_status = results + def cmd_STATUS(self, tag, args): + self._send_textline( + '* STATUS blurdybloop ' + '(MESSAGES %(MESSAGES)s UIDNEXT %(UIDNEXT)s)' % + self.blurdybloop_status) + self._send_tagged(tag, 'OK', 'STATUS complete') + + with self.reaped_server(SimpleIMAPHandlerWithStatus) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.status, *args) + client.state = 'SELECTED' + self.assertEqual( + client.status(*args), + ('OK', + [('blurdybloop (MESSAGES %(MESSAGES)s UIDNEXT %(UIDNEXT)s)' % + results).encode()])) + client.shutdown() + + @reap_threads + def test_store(self): + args = ['2:4', '+FLAGS', '(\Deleted)'] + results = { + '2': ['\\Deleted', '\\Seen'], + '3': ['\\Deleted', ], + '4': ['\\Deleted', '\\Flagged', '\\Seen'], + } + + class SimpleIMAPHandlerWithStore(SimpleIMAPHandler): + message_flags = results + def cmd_STORE(self, tag, args): + for message_id, flags in self.message_flags.items(): + self._send_textline('* %s FETCH (FLAGS (%s))' % + (message_id, ' '.join(flags))) + self._send_tagged(tag, 'OK', 'STORE complete') + + with self.reaped_server(SimpleIMAPHandlerWithStore) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.store, *args) + client.state = 'SELECTED' + typ, data = client.store(*args) + self.assertEqual(typ, 'OK') + for actual_result in data: + id = actual_result[:1].decode() + flags = results[id] + self.assertEqual("%s (FLAGS (%s))" % (id, ' '.join(flags)), + actual_result.decode()) + client.shutdown() + + @reap_threads + def test_uid(self): + fetch_args = ['FETCH', '4827313', 'FLAGS'] + fetch_answer = '* 23 FETCH (FLAGS (\Seen) UID 4827313)' + search_args = ['SEARCH', 'FLAGGED SINCE 1-Feb-1994 NOT FROM "Smith"', ] + search_answer = '* SEARCH 4827313' + + class SimpleIMAPHandlerWithUID(SimpleIMAPHandler): + uid_fetch_answer = fetch_answer + uid_search_answer = search_answer + def cmd_UID(self, tag, args): + if 'FETCH' in args: + self._send_textline(self.uid_fetch_answer) + self._send_tagged(tag, 'OK', 'UID FETCH complete') + else: + self._send_textline(self.uid_search_answer) + self._send_tagged(tag, 'OK', 'UID SEARCH complete') + + with self.reaped_server(SimpleIMAPHandlerWithUID) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.uid, *fetch_args) + client.state = 'SELECTED' + typ, data = client.uid(*fetch_args) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0].decode(), + fetch_answer[2:].replace('FETCH ', '')) + typ, data = client.uid(*search_args) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0].decode(), search_answer.split()[2]) + client.shutdown() + + @reap_threads + def test_subscribe(self): + arg = '#news.comp.mail.mime' + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.subscribe, arg) + client.state = 'SELECTED' + typ, data = client.subscribe(arg) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'SUBSCRIBE completed') + client.shutdown() + + @reap_threads + def test_unsubscribe(self): + arg = '#news.comp.mail.mime' + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.unsubscribe, arg) + client.state = 'SELECTED' + typ, data = client.unsubscribe(arg) + self.assertEqual(typ, 'OK') + self.assertEqual(data[0], b'UNSUBSCRIBE completed') + client.shutdown() + + @reap_threads + def test_response(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + client.noop() + # the servers response b'23' is hardcoded in SimpleIMAPHandler + self.assertEqual(client.response('exists')[1], [b'23', ]) + client.shutdown() + + @reap_threads + def test_getacl(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.getacl, 'INBOX') + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.getacl('INBOX')[1], [b'INBOX Fred rwipsldexta', ]) + client.shutdown() + + @reap_threads + def test_deleteacl(self): + with self.reaped_server(SimpleIMAPHandler) as server: + client = self.imap_class(*server.server_address) + self.assertRaises( + imaplib.IMAP4.error, client.deleteacl, 'example', 'john') + # lets cheat a bit here: + client.state = 'SELECTED' + self.assertEqual( + client.deleteacl('example', 'john')[1], + [b'DELETEACL completed', ]) + client.shutdown() + + @reap_threads def test_bad_auth_name(self): class MyServer(SimpleIMAPHandler):