Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(1)

Delta Between Two Patch Sets: Lib/test/test_imaplib.py

Issue 4972: context managerment support in imaplib, smtplib, ftplib
Left Patch Set: Created 7 years, 2 months ago
Right Patch Set: Created 5 years, 8 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/imaplib.py ('k') | no next file » | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFTRIGHT
1 from test import support 1 from test import support
2 # If we end up with a significant number of tests that don't require 2 # If we end up with a significant number of tests that don't require
3 # threading, this test module should be split. Right now we skip 3 # threading, this test module should be split. Right now we skip
4 # them all if we don't have threading. 4 # them all if we don't have threading.
5 threading = support.import_module('threading') 5 threading = support.import_module('threading')
6 6
7 from contextlib import contextmanager 7 from contextlib import contextmanager
8 import imaplib 8 import imaplib
9 import os.path 9 import os.path
10 import socketserver 10 import socketserver
11 import time 11 import time
12 import calendar 12 import calendar
13 13
14 from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale 14 from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale
15 import unittest 15 import unittest
16 from datetime import datetime, timezone, timedelta 16 from datetime import datetime, timezone, timedelta
17 try: 17 try:
18 import ssl 18 import ssl
19 except ImportError: 19 except ImportError:
20 ssl = None 20 ssl = None
21 HAS_SNI = False
22 else:
23 from ssl import HAS_SNI
21 24
22 CERTFILE = None 25 CERTFILE = None
26 CAFILE = None
23 27
24 28
25 class TestImaplib(unittest.TestCase): 29 class TestImaplib(unittest.TestCase):
26 30
27 def test_Internaldate2tuple(self): 31 def test_Internaldate2tuple(self):
28 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 32 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
29 tt = imaplib.Internaldate2tuple( 33 tt = imaplib.Internaldate2tuple(
30 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 34 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
31 self.assertEqual(time.mktime(tt), t0) 35 self.assertEqual(time.mktime(tt), t0)
32 tt = imaplib.Internaldate2tuple( 36 tt = imaplib.Internaldate2tuple(
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
87 91
88 class SecureTCPServer: 92 class SecureTCPServer:
89 pass 93 pass
90 94
91 IMAP4_SSL = None 95 IMAP4_SSL = None
92 96
93 97
94 class SimpleIMAPHandler(socketserver.StreamRequestHandler): 98 class SimpleIMAPHandler(socketserver.StreamRequestHandler):
95 99
96 timeout = 1 100 timeout = 1
101 continuation = None
102 capabilities = ''
103
104 def setup(self):
105 super().setup()
106 self.server.logged = None
97 107
98 def _send(self, message): 108 def _send(self, message):
99 if verbose: print("SENT: %r" % message.strip()) 109 if verbose: print("SENT: %r" % message.strip())
100 self.wfile.write(message) 110 self.wfile.write(message)
101 111
112 def _send_line(self, message):
113 self._send(message + b'\r\n')
114
115 def _send_textline(self, message):
116 self._send_line(message.encode('ASCII'))
117
118 def _send_tagged(self, tag, code, message):
119 self._send_textline(' '.join((tag, code, message)))
120
102 def handle(self): 121 def handle(self):
103 # Send a welcome message. 122 # Send a welcome message.
104 self._send(b'* OK IMAP4rev1\r\n') 123 self._send_textline('* OK IMAP4rev1')
105 while 1: 124 while 1:
106 # Gather up input until we receive a line terminator or we timeout. 125 # Gather up input until we receive a line terminator or we timeout.
107 # Accumulate read(1) because it's simpler to handle the differences 126 # Accumulate read(1) because it's simpler to handle the differences
108 # between naked sockets and SSL sockets. 127 # between naked sockets and SSL sockets.
109 line = b'' 128 line = b''
110 while 1: 129 while 1:
111 try: 130 try:
112 part = self.rfile.read(1) 131 part = self.rfile.read(1)
113 if part == b'': 132 if part == b'':
114 # Naked sockets return empty strings.. 133 # Naked sockets return empty strings..
115 return 134 return
116 line += part 135 line += part
117 except OSError: 136 except OSError:
118 # ..but SSLSockets raise exceptions. 137 # ..but SSLSockets raise exceptions.
119 return 138 return
120 if line.endswith(b'\r\n'): 139 if line.endswith(b'\r\n'):
121 break 140 break
122 141
123 if verbose: print('GOT: %r' % line.strip()) 142 if verbose: print('GOT: %r' % line.strip())
124 splitline = line.split() 143 if self.continuation:
125 tag = splitline[0].decode('ASCII') 144 try:
126 cmd = splitline[1].decode('ASCII') 145 self.continuation.send(line)
146 except StopIteration:
147 self.continuation = None
148 continue
149 splitline = line.decode('ASCII').split()
150 tag = splitline[0]
151 cmd = splitline[1]
127 args = splitline[2:] 152 args = splitline[2:]
128 153
129 if hasattr(self, 'cmd_'+cmd): 154 if hasattr(self, 'cmd_'+cmd):
130 getattr(self, 'cmd_'+cmd)(tag, args) 155 continuation = getattr(self, 'cmd_'+cmd)(tag, args)
156 if continuation:
157 self.continuation = continuation
158 next(continuation)
131 else: 159 else:
132 self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCI I')) 160 self._send_tagged(tag, 'BAD', cmd + ' unknown')
133 161
134 def cmd_CAPABILITY(self, tag, args): 162 def cmd_CAPABILITY(self, tag, args):
135 self._send(b'* CAPABILITY IMAP4rev1\r\n') 163 caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4 rev1'
136 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) 164 self._send_textline('* CAPABILITY ' + caps)
165 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
137 166
138 def cmd_LOGOUT(self, tag, args): 167 def cmd_LOGOUT(self, tag, args):
139 self._send('{} OK LOGOUT\r\n'.format(tag).encode('ASCII')) 168 self.server.logged = None
169 self._send_textline('* BYE IMAP4ref1 Server logging out')
170 self._send_tagged(tag, 'OK', 'LOGOUT completed')
140 171
141 def cmd_LOGIN(self, tag, args): 172 def cmd_LOGIN(self, tag, args):
142 self._send('{} OK LOGIN\r\n'.format(tag).encode('ASCII')) 173 self.server.logged = args[0]
174 self._send_tagged(tag, 'OK', 'LOGIN completed')
143 175
144 176
145 class BaseThreadedNetworkedTests(unittest.TestCase): 177 class BaseThreadedNetworkedTests(unittest.TestCase):
146 178
147 def make_server(self, addr, hdlr): 179 def make_server(self, addr, hdlr):
148 180
149 class MyServer(self.server_class): 181 class MyServer(self.server_class):
150 def handle_error(self, request, client_address): 182 def handle_error(self, request, client_address):
151 self.close_request(request) 183 self.close_request(request)
152 self.server_close() 184 self.server_close()
(...skipping 29 matching lines...) Expand all
182 if verbose: print("done") 214 if verbose: print("done")
183 215
184 @contextmanager 216 @contextmanager
185 def reaped_server(self, hdlr): 217 def reaped_server(self, hdlr):
186 server, thread = self.make_server((support.HOST, 0), hdlr) 218 server, thread = self.make_server((support.HOST, 0), hdlr)
187 try: 219 try:
188 yield server 220 yield server
189 finally: 221 finally:
190 self.reap_server(server, thread) 222 self.reap_server(server, thread)
191 223
224 @contextmanager
225 def reaped_pair(self, hdlr):
226 with self.reaped_server(hdlr) as server:
227 client = self.imap_class(*server.server_address)
228 try:
229 yield server, client
230 finally:
231 client.logout()
232
192 @reap_threads 233 @reap_threads
193 def test_connect(self): 234 def test_connect(self):
194 with self.reaped_server(SimpleIMAPHandler) as server: 235 with self.reaped_server(SimpleIMAPHandler) as server:
195 client = self.imap_class(*server.server_address) 236 client = self.imap_class(*server.server_address)
196 client.shutdown() 237 client.shutdown()
197 238
198 @reap_threads 239 @reap_threads
199 def test_issue5949(self): 240 def test_issue5949(self):
200 241
201 class EOFHandler(socketserver.StreamRequestHandler): 242 class EOFHandler(socketserver.StreamRequestHandler):
202 def handle(self): 243 def handle(self):
203 # EOF without sending a complete welcome message. 244 # EOF without sending a complete welcome message.
204 self.wfile.write(b'* OK') 245 self.wfile.write(b'* OK')
205 246
206 with self.reaped_server(EOFHandler) as server: 247 with self.reaped_server(EOFHandler) as server:
207 self.assertRaises(imaplib.IMAP4.abort, 248 self.assertRaises(imaplib.IMAP4.abort,
208 self.imap_class, *server.server_address) 249 self.imap_class, *server.server_address)
209 250
210 @reap_threads 251 @reap_threads
211 def test_line_termination(self): 252 def test_line_termination(self):
212 253
213 class BadNewlineHandler(SimpleIMAPHandler): 254 class BadNewlineHandler(SimpleIMAPHandler):
214 255
215 def cmd_CAPABILITY(self, tag, args): 256 def cmd_CAPABILITY(self, tag, args):
216 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 257 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
217 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode(' ASCII')) 258 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
218 259
219 with self.reaped_server(BadNewlineHandler) as server: 260 with self.reaped_server(BadNewlineHandler) as server:
220 self.assertRaises(imaplib.IMAP4.abort, 261 self.assertRaises(imaplib.IMAP4.abort,
221 self.imap_class, *server.server_address) 262 self.imap_class, *server.server_address)
222 263
223 @reap_threads 264 @reap_threads
265 def test_bad_auth_name(self):
266
267 class MyServer(SimpleIMAPHandler):
268
269 def cmd_AUTHENTICATE(self, tag, args):
270 self._send_tagged(tag, 'NO', 'unrecognized authentication '
271 'type {}'.format(args[0]))
272
273 with self.reaped_pair(MyServer) as (server, client):
274 with self.assertRaises(imaplib.IMAP4.error):
275 client.authenticate('METHOD', lambda: 1)
276
277 @reap_threads
278 def test_invalid_authentication(self):
279
280 class MyServer(SimpleIMAPHandler):
281
282 def cmd_AUTHENTICATE(self, tag, args):
283 self._send_textline('+')
284 self.response = yield
285 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
286
287 with self.reaped_pair(MyServer) as (server, client):
288 with self.assertRaises(imaplib.IMAP4.error):
289 code, data = client.authenticate('MYAUTH', lambda x: b'fake')
290
291 @reap_threads
292 def test_valid_authentication(self):
293
294 class MyServer(SimpleIMAPHandler):
295
296 def cmd_AUTHENTICATE(self, tag, args):
297 self._send_textline('+')
298 self.server.response = yield
299 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
300
301 with self.reaped_pair(MyServer) as (server, client):
302 code, data = client.authenticate('MYAUTH', lambda x: b'fake')
303 self.assertEqual(code, 'OK')
304 self.assertEqual(server.response,
305 b'ZmFrZQ==\r\n') #b64 encoded 'fake'
306
307 with self.reaped_pair(MyServer) as (server, client):
308 code, data = client.authenticate('MYAUTH', lambda x: 'fake')
309 self.assertEqual(code, 'OK')
310 self.assertEqual(server.response,
311 b'ZmFrZQ==\r\n') #b64 encoded 'fake'
312
313 @reap_threads
314 def test_login_cram_md5(self):
315
316 class AuthHandler(SimpleIMAPHandler):
317
318 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
319
320 def cmd_AUTHENTICATE(self, tag, args):
321 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
322 'VzdG9uLm1jaS5uZXQ=')
323 r = yield
324 if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n' :
325 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
326 else:
327 self._send_tagged(tag, 'NO', 'No access')
328
329 with self.reaped_pair(AuthHandler) as (server, client):
330 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
331 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
332 self.assertEqual(ret, "OK")
333
334 with self.reaped_pair(AuthHandler) as (server, client):
335 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
336 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
337 self.assertEqual(ret, "OK")
338
339
340 def test_linetoolong(self):
341 class TooLongHandler(SimpleIMAPHandler):
342 def handle(self):
343 # Send a very long response line
344 self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n')
345
346 with self.reaped_server(TooLongHandler) as server:
347 self.assertRaises(imaplib.IMAP4.error,
348 self.imap_class, *server.server_address)
349
350 @reap_threads
224 def test_simple_with_statement(self): 351 def test_simple_with_statement(self):
225 # simplest call 352 # simplest call
226 with self.reaped_server(SimpleIMAPHandler) as server: 353 with self.reaped_server(SimpleIMAPHandler) as server:
227 with self.imap_class(*server.server_address) as imap: 354 with self.imap_class(*server.server_address):
228 pass 355 pass
229 356
230 @reap_threads 357 @reap_threads
231 def test_with_statement(self): 358 def test_with_statement(self):
232 # doing something
233 with self.reaped_server(SimpleIMAPHandler) as server: 359 with self.reaped_server(SimpleIMAPHandler) as server:
234 with self.imap_class(*server.server_address) as imap: 360 with self.imap_class(*server.server_address) as imap:
235 imap.login('user', 'pass') 361 imap.login('user', 'pass')
236 362 self.assertEqual(server.logged, 'user')
237 @reap_threads 363 self.assertIsNone(server.logged)
238 def test_with_statement_quit(self): 364
239 # what happens if already quit in the block ? 365 @reap_threads
366 def test_with_statement_logout(self):
367 # what happens if already logout in the block?
240 with self.reaped_server(SimpleIMAPHandler) as server: 368 with self.reaped_server(SimpleIMAPHandler) as server:
241 with self.imap_class(*server.server_address) as imap: 369 with self.imap_class(*server.server_address) as imap:
242 imap.login('user', 'pass') 370 imap.login('user', 'pass')
371 self.assertEqual(server.logged, 'user')
243 imap.logout() 372 imap.logout()
ezio.melotti 2013/01/27 17:53:12 Shouldn't there be some asserts that check that af
storchaka 2013/01/27 19:57:19 Good idea.
373 self.assertIsNone(server.logged)
374 self.assertIsNone(server.logged)
244 375
245 376
246 class ThreadedNetworkedTests(BaseThreadedNetworkedTests): 377 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
247 378
248 server_class = socketserver.TCPServer 379 server_class = socketserver.TCPServer
249 imap_class = imaplib.IMAP4 380 imap_class = imaplib.IMAP4
250 381
251 382
252 @unittest.skipUnless(ssl, "SSL not available") 383 @unittest.skipUnless(ssl, "SSL not available")
253 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): 384 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
254 385
255 server_class = SecureTCPServer 386 server_class = SecureTCPServer
256 imap_class = IMAP4_SSL 387 imap_class = IMAP4_SSL
388
389 @reap_threads
390 @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module')
391 def test_ssl_verified(self):
392 ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
393 ssl_context.verify_mode = ssl.CERT_REQUIRED
394 ssl_context.check_hostname = True
395 ssl_context.load_verify_locations(CAFILE)
396
397 with self.assertRaisesRegex(ssl.CertificateError,
398 "hostname '127.0.0.1' doesn't match 'localho st'"):
399 with self.reaped_server(SimpleIMAPHandler) as server:
400 client = self.imap_class(*server.server_address,
401 ssl_context=ssl_context)
402 client.shutdown()
403
404 with self.reaped_server(SimpleIMAPHandler) as server:
405 client = self.imap_class("localhost", server.server_address[1],
406 ssl_context=ssl_context)
407 client.shutdown()
257 408
258 409
259 class RemoteIMAPTest(unittest.TestCase): 410 class RemoteIMAPTest(unittest.TestCase):
260 host = 'cyrus.andrew.cmu.edu' 411 host = 'cyrus.andrew.cmu.edu'
261 port = 143 412 port = 143
262 username = 'anonymous' 413 username = 'anonymous'
263 password = 'pass' 414 password = 'pass'
264 imap_class = imaplib.IMAP4 415 imap_class = imaplib.IMAP4
265 416
266 def setUp(self): 417 def setUp(self):
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
355 with transient_internet(self.host): 506 with transient_internet(self.host):
356 self.assertRaises(ValueError, self.imap_class, self.host, self.port, 507 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
357 certfile=CERTFILE, ssl_context=self.create_ssl_con text()) 508 certfile=CERTFILE, ssl_context=self.create_ssl_con text())
358 509
359 def test_ssl_context_keyfile_exclusive(self): 510 def test_ssl_context_keyfile_exclusive(self):
360 with transient_internet(self.host): 511 with transient_internet(self.host):
361 self.assertRaises(ValueError, self.imap_class, self.host, self.port, 512 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
362 keyfile=CERTFILE, ssl_context=self.create_ssl_cont ext()) 513 keyfile=CERTFILE, ssl_context=self.create_ssl_cont ext())
363 514
364 515
365 def test_main(): 516 def load_tests(*args):
366 tests = [TestImaplib] 517 tests = [TestImaplib]
367 518
368 if support.is_resource_enabled('network'): 519 if support.is_resource_enabled('network'):
369 if ssl: 520 if ssl:
370 global CERTFILE 521 global CERTFILE, CAFILE
371 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 522 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
372 "keycert.pem") 523 "keycert3.pem")
373 if not os.path.exists(CERTFILE): 524 if not os.path.exists(CERTFILE):
374 raise support.TestFailed("Can't read certificate files!") 525 raise support.TestFailed("Can't read certificate files!")
526 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
527 "pycacert.pem")
528 if not os.path.exists(CAFILE):
529 raise support.TestFailed("Can't read CA file!")
375 tests.extend([ 530 tests.extend([
376 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, 531 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
377 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest, 532 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest,
378 ]) 533 ])
379 534
380 support.run_unittest(*tests) 535 return unittest.TestSuite([unittest.makeSuite(test) for test in tests])
381 536
382 537
383 if __name__ == "__main__": 538 if __name__ == "__main__":
384 support.use_resources = ['network'] 539 unittest.main()
385 test_main()
LEFTRIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+