Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2)

Delta Between Two Patch Sets: Lib/test/test_imaplib.py

Issue 4972: context management support in imaplib, smtplib, ftplib
Left Patch Set: Created 7 years ago
Right Patch Set: Created 5 years, 7 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
Left: Side by side diff | Download
Right: Side by side diff | Download
« no previous file with change/comment | « Lib/imaplib.py ('k') | no next file » | no next file with change/comment »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
LEFT RIGHT
1 from test import support 1 from test import support
2 # If we end up with a significant number of tests that don't require 2 # If we end up with a significant number of tests that don't require
3 # threading, this test module should be split. Right now we skip 3 # threading, this test module should be split. Right now we skip
4 # them all if we don't have threading. 4 # them all if we don't have threading.
5 threading = support.import_module('threading') 5 threading = support.import_module('threading')
6 6
7 from contextlib import contextmanager 7 from contextlib import contextmanager
8 import imaplib 8 import imaplib
9 import os.path 9 import os.path
10 import socketserver 10 import socketserver
11 import time 11 import time
12 import calendar 12 import calendar
13 import sys
14 13
15 from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale 14 from test.support import reap_threads, verbose, transient_internet, run_with_tz, run_with_locale
16 import unittest 15 import unittest
17 from datetime import datetime, timezone, timedelta 16 from datetime import datetime, timezone, timedelta
18 try: 17 try:
19 import ssl 18 import ssl
20 except ImportError: 19 except ImportError:
21 ssl = None 20 ssl = None
21 HAS_SNI = False
22 else:
23 from ssl import HAS_SNI
22 24
23 CERTFILE = None 25 CERTFILE = None
26 CAFILE = None
24 27
25 28
26 class TestImaplib(unittest.TestCase): 29 class TestImaplib(unittest.TestCase):
27 30
28 def test_Internaldate2tuple(self): 31 def test_Internaldate2tuple(self):
29 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1)) 32 t0 = calendar.timegm((2000, 1, 1, 0, 0, 0, -1, -1, -1))
30 tt = imaplib.Internaldate2tuple( 33 tt = imaplib.Internaldate2tuple(
31 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")') 34 b'25 (INTERNALDATE "01-Jan-2000 00:00:00 +0000")')
32 self.assertEqual(time.mktime(tt), t0) 35 self.assertEqual(time.mktime(tt), t0)
33 tt = imaplib.Internaldate2tuple( 36 tt = imaplib.Internaldate2tuple(
(...skipping 54 matching lines...) Expand 10 before | Expand all | Expand 10 after
88 91
89 class SecureTCPServer: 92 class SecureTCPServer:
90 pass 93 pass
91 94
92 IMAP4_SSL = None 95 IMAP4_SSL = None
93 96
94 97
95 class SimpleIMAPHandler(socketserver.StreamRequestHandler): 98 class SimpleIMAPHandler(socketserver.StreamRequestHandler):
96 99
97 timeout = 1 100 timeout = 1
101 continuation = None
102 capabilities = ''
98 103
99 def setup(self): 104 def setup(self):
100 super().setup() 105 super().setup()
101 self.server.logged = None 106 self.server.logged = None
102 107
103 def _send(self, message): 108 def _send(self, message):
104 if verbose: print("SENT: %r" % message.strip()) 109 if verbose: print("SENT: %r" % message.strip())
105 self.wfile.write(message) 110 self.wfile.write(message)
106 111
112 def _send_line(self, message):
113 self._send(message + b'\r\n')
114
115 def _send_textline(self, message):
116 self._send_line(message.encode('ASCII'))
117
118 def _send_tagged(self, tag, code, message):
119 self._send_textline(' '.join((tag, code, message)))
120
107 def handle(self): 121 def handle(self):
108 # Send a welcome message. 122 # Send a welcome message.
109 self._send(b'* OK IMAP4rev1\r\n') 123 self._send_textline('* OK IMAP4rev1')
110 while 1: 124 while 1:
111 # Gather up input until we receive a line terminator or we timeout. 125 # Gather up input until we receive a line terminator or we timeout.
112 # Accumulate read(1) because it's simpler to handle the differences 126 # Accumulate read(1) because it's simpler to handle the differences
113 # between naked sockets and SSL sockets. 127 # between naked sockets and SSL sockets.
114 line = b'' 128 line = b''
115 while 1: 129 while 1:
116 try: 130 try:
117 part = self.rfile.read(1) 131 part = self.rfile.read(1)
118 if part == b'': 132 if part == b'':
119 # Naked sockets return empty strings.. 133 # Naked sockets return empty strings..
120 return 134 return
121 line += part 135 line += part
122 except OSError: 136 except OSError:
123 # ..but SSLSockets raise exceptions. 137 # ..but SSLSockets raise exceptions.
124 return 138 return
125 if line.endswith(b'\r\n'): 139 if line.endswith(b'\r\n'):
126 break 140 break
127 141
128 if verbose: print('GOT: %r' % line.strip()) 142 if verbose: print('GOT: %r' % line.strip())
129 splitline = line.split() 143 if self.continuation:
130 tag = splitline[0].decode('ASCII') 144 try:
131 cmd = splitline[1].decode('ASCII') 145 self.continuation.send(line)
146 except StopIteration:
147 self.continuation = None
148 continue
149 splitline = line.decode('ASCII').split()
150 tag = splitline[0]
151 cmd = splitline[1]
132 args = splitline[2:] 152 args = splitline[2:]
133 153
134 if hasattr(self, 'cmd_'+cmd): 154 if hasattr(self, 'cmd_'+cmd):
135 getattr(self, 'cmd_'+cmd)(tag, args) 155 continuation = getattr(self, 'cmd_'+cmd)(tag, args)
156 if continuation:
157 self.continuation = continuation
158 next(continuation)
136 else: 159 else:
137 self._send('{} BAD {} unknown\r\n'.format(tag, cmd).encode('ASCII')) 160 self._send_tagged(tag, 'BAD', cmd + ' unknown')
138 161
139 def cmd_CAPABILITY(self, tag, args): 162 def cmd_CAPABILITY(self, tag, args):
140 self._send(b'* CAPABILITY IMAP4rev1\r\n') 163 caps = 'IMAP4rev1 ' + self.capabilities if self.capabilities else 'IMAP4rev1'
141 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) 164 self._send_textline('* CAPABILITY ' + caps)
165 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
142 166
143 def cmd_LOGOUT(self, tag, args): 167 def cmd_LOGOUT(self, tag, args):
144 self.server.logged = None 168 self.server.logged = None
145 self._send('{} OK LOGOUT\r\n'.format(tag).encode('ASCII')) 169 self._send_textline('* BYE IMAP4ref1 Server logging out')
170 self._send_tagged(tag, 'OK', 'LOGOUT completed')
146 171
147 def cmd_LOGIN(self, tag, args): 172 def cmd_LOGIN(self, tag, args):
148 self.server.logged = args[0] 173 self.server.logged = args[0]
149 self._send('{} OK LOGIN\r\n'.format(tag).encode('ASCII')) 174 self._send_tagged(tag, 'OK', 'LOGIN completed')
150 175
151 176
152 class BaseThreadedNetworkedTests(unittest.TestCase): 177 class BaseThreadedNetworkedTests(unittest.TestCase):
153 178
154 def make_server(self, addr, hdlr): 179 def make_server(self, addr, hdlr):
155 180
156 class MyServer(self.server_class): 181 class MyServer(self.server_class):
157 def handle_error(self, request, client_address): 182 def handle_error(self, request, client_address):
158 self.close_request(request) 183 self.close_request(request)
159 self.server_close() 184 self.server_close()
(...skipping 29 matching lines...) Expand all
189 if verbose: print("done") 214 if verbose: print("done")
190 215
191 @contextmanager 216 @contextmanager
192 def reaped_server(self, hdlr): 217 def reaped_server(self, hdlr):
193 server, thread = self.make_server((support.HOST, 0), hdlr) 218 server, thread = self.make_server((support.HOST, 0), hdlr)
194 try: 219 try:
195 yield server 220 yield server
196 finally: 221 finally:
197 self.reap_server(server, thread) 222 self.reap_server(server, thread)
198 223
224 @contextmanager
225 def reaped_pair(self, hdlr):
226 with self.reaped_server(hdlr) as server:
227 client = self.imap_class(*server.server_address)
228 try:
229 yield server, client
230 finally:
231 client.logout()
232
199 @reap_threads 233 @reap_threads
200 def test_connect(self): 234 def test_connect(self):
201 with self.reaped_server(SimpleIMAPHandler) as server: 235 with self.reaped_server(SimpleIMAPHandler) as server:
202 client = self.imap_class(*server.server_address) 236 client = self.imap_class(*server.server_address)
203 client.shutdown() 237 client.shutdown()
204 238
205 @reap_threads 239 @reap_threads
206 def test_issue5949(self): 240 def test_issue5949(self):
207 241
208 class EOFHandler(socketserver.StreamRequestHandler): 242 class EOFHandler(socketserver.StreamRequestHandler):
209 def handle(self): 243 def handle(self):
210 # EOF without sending a complete welcome message. 244 # EOF without sending a complete welcome message.
211 self.wfile.write(b'* OK') 245 self.wfile.write(b'* OK')
212 246
213 with self.reaped_server(EOFHandler) as server: 247 with self.reaped_server(EOFHandler) as server:
214 self.assertRaises(imaplib.IMAP4.abort, 248 self.assertRaises(imaplib.IMAP4.abort,
215 self.imap_class, *server.server_address) 249 self.imap_class, *server.server_address)
216 250
217 @reap_threads 251 @reap_threads
218 def test_line_termination(self): 252 def test_line_termination(self):
219 253
220 class BadNewlineHandler(SimpleIMAPHandler): 254 class BadNewlineHandler(SimpleIMAPHandler):
221 255
222 def cmd_CAPABILITY(self, tag, args): 256 def cmd_CAPABILITY(self, tag, args):
223 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n') 257 self._send(b'* CAPABILITY IMAP4rev1 AUTH\n')
224 self._send('{} OK CAPABILITY completed\r\n'.format(tag).encode('ASCII')) 258 self._send_tagged(tag, 'OK', 'CAPABILITY completed')
225 259
226 with self.reaped_server(BadNewlineHandler) as server: 260 with self.reaped_server(BadNewlineHandler) as server:
227 self.assertRaises(imaplib.IMAP4.abort, 261 self.assertRaises(imaplib.IMAP4.abort,
228 self.imap_class, *server.server_address) 262 self.imap_class, *server.server_address)
229 263
230 @reap_threads 264 @reap_threads
265 def test_bad_auth_name(self):
266
267 class MyServer(SimpleIMAPHandler):
268
269 def cmd_AUTHENTICATE(self, tag, args):
270 self._send_tagged(tag, 'NO', 'unrecognized authentication '
271 'type {}'.format(args[0]))
272
273 with self.reaped_pair(MyServer) as (server, client):
274 with self.assertRaises(imaplib.IMAP4.error):
275 client.authenticate('METHOD', lambda: 1)
276
277 @reap_threads
278 def test_invalid_authentication(self):
279
280 class MyServer(SimpleIMAPHandler):
281
282 def cmd_AUTHENTICATE(self, tag, args):
283 self._send_textline('+')
284 self.response = yield
285 self._send_tagged(tag, 'NO', '[AUTHENTICATIONFAILED] invalid')
286
287 with self.reaped_pair(MyServer) as (server, client):
288 with self.assertRaises(imaplib.IMAP4.error):
289 code, data = client.authenticate('MYAUTH', lambda x: b'fake')
290
291 @reap_threads
292 def test_valid_authentication(self):
293
294 class MyServer(SimpleIMAPHandler):
295
296 def cmd_AUTHENTICATE(self, tag, args):
297 self._send_textline('+')
298 self.server.response = yield
299 self._send_tagged(tag, 'OK', 'FAKEAUTH successful')
300
301 with self.reaped_pair(MyServer) as (server, client):
302 code, data = client.authenticate('MYAUTH', lambda x: b'fake')
303 self.assertEqual(code, 'OK')
304 self.assertEqual(server.response,
305 b'ZmFrZQ==\r\n') #b64 encoded 'fake'
306
307 with self.reaped_pair(MyServer) as (server, client):
308 code, data = client.authenticate('MYAUTH', lambda x: 'fake')
309 self.assertEqual(code, 'OK')
310 self.assertEqual(server.response,
311 b'ZmFrZQ==\r\n') #b64 encoded 'fake'
312
313 @reap_threads
314 def test_login_cram_md5(self):
315
316 class AuthHandler(SimpleIMAPHandler):
317
318 capabilities = 'LOGINDISABLED AUTH=CRAM-MD5'
319
320 def cmd_AUTHENTICATE(self, tag, args):
321 self._send_textline('+ PDE4OTYuNjk3MTcwOTUyQHBvc3RvZmZpY2Uucm'
322 'VzdG9uLm1jaS5uZXQ=')
323 r = yield
324 if r == b'dGltIGYxY2E2YmU0NjRiOWVmYTFjY2E2ZmZkNmNmMmQ5ZjMy\r\n':
325 self._send_tagged(tag, 'OK', 'CRAM-MD5 successful')
326 else:
327 self._send_tagged(tag, 'NO', 'No access')
328
329 with self.reaped_pair(AuthHandler) as (server, client):
330 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
331 ret, data = client.login_cram_md5("tim", "tanstaaftanstaaf")
332 self.assertEqual(ret, "OK")
333
334 with self.reaped_pair(AuthHandler) as (server, client):
335 self.assertTrue('AUTH=CRAM-MD5' in client.capabilities)
336 ret, data = client.login_cram_md5("tim", b"tanstaaftanstaaf")
337 self.assertEqual(ret, "OK")
338
339
340 def test_linetoolong(self):
341 class TooLongHandler(SimpleIMAPHandler):
342 def handle(self):
343 # Send a very long response line
344 self.wfile.write(b'* OK ' + imaplib._MAXLINE*b'x' + b'\r\n')
345
346 with self.reaped_server(TooLongHandler) as server:
347 self.assertRaises(imaplib.IMAP4.error,
348 self.imap_class, *server.server_address)
349
350 @reap_threads
231 def test_simple_with_statement(self): 351 def test_simple_with_statement(self):
232 # simplest call 352 # simplest call
233 with self.reaped_server(SimpleIMAPHandler) as server: 353 with self.reaped_server(SimpleIMAPHandler) as server:
234 with self.imap_class(*server.server_address) as imap: 354 with self.imap_class(*server.server_address):
235 pass 355 pass
236 356
237 @reap_threads 357 @reap_threads
238 def test_with_statement(self): 358 def test_with_statement(self):
239 # doing something
240 with self.reaped_server(SimpleIMAPHandler) as server: 359 with self.reaped_server(SimpleIMAPHandler) as server:
241 with self.imap_class(*server.server_address) as imap: 360 with self.imap_class(*server.server_address) as imap:
242 imap.login('user', 'pass') 361 imap.login('user', 'pass')
243 self.assertEqual(server.logged, b'user') 362 self.assertEqual(server.logged, 'user')
244 self.assertIsNone(server.logged) 363 self.assertIsNone(server.logged)
245 364
246 @reap_threads 365 @reap_threads
247 def test_with_statement_logout(self): 366 def test_with_statement_logout(self):
248 # what happens if already logout in the block ? 367 # what happens if already logout in the block?
249 with self.reaped_server(SimpleIMAPHandler) as server: 368 with self.reaped_server(SimpleIMAPHandler) as server:
250 with self.imap_class(*server.server_address) as imap: 369 with self.imap_class(*server.server_address) as imap:
251 imap.login('user', 'pass') 370 imap.login('user', 'pass')
252 self.assertEqual(server.logged, b'user') 371 self.assertEqual(server.logged, 'user')
253 imap.logout() 372 imap.logout()
254 self.assertIsNone(server.logged) 373 self.assertIsNone(server.logged)
255 self.assertIsNone(server.logged) 374 self.assertIsNone(server.logged)
256 375
257 376
258 class ThreadedNetworkedTests(BaseThreadedNetworkedTests): 377 class ThreadedNetworkedTests(BaseThreadedNetworkedTests):
259 378
260 server_class = socketserver.TCPServer 379 server_class = socketserver.TCPServer
261 imap_class = imaplib.IMAP4 380 imap_class = imaplib.IMAP4
262 381
263 382
264 @unittest.skipUnless(ssl, "SSL not available") 383 @unittest.skipUnless(ssl, "SSL not available")
265 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests): 384 class ThreadedNetworkedTestsSSL(BaseThreadedNetworkedTests):
266 385
267 server_class = SecureTCPServer 386 server_class = SecureTCPServer
268 imap_class = IMAP4_SSL 387 imap_class = IMAP4_SSL
388
389 @reap_threads
390 @unittest.skipUnless(HAS_SNI, 'No SNI support in ssl module')
391 def test_ssl_verified(self):
392 ssl_context = ssl.SSLContext(ssl.PROTOCOL_SSLv23)
393 ssl_context.verify_mode = ssl.CERT_REQUIRED
394 ssl_context.check_hostname = True
395 ssl_context.load_verify_locations(CAFILE)
396
397 with self.assertRaisesRegex(ssl.CertificateError,
398 "hostname '127.0.0.1' doesn't match 'localhost'"):
399 with self.reaped_server(SimpleIMAPHandler) as server:
400 client = self.imap_class(*server.server_address,
401 ssl_context=ssl_context)
402 client.shutdown()
403
404 with self.reaped_server(SimpleIMAPHandler) as server:
405 client = self.imap_class("localhost", server.server_address[1],
406 ssl_context=ssl_context)
407 client.shutdown()
269 408
270 409
271 class RemoteIMAPTest(unittest.TestCase): 410 class RemoteIMAPTest(unittest.TestCase):
272 host = 'cyrus.andrew.cmu.edu' 411 host = 'cyrus.andrew.cmu.edu'
273 port = 143 412 port = 143
274 username = 'anonymous' 413 username = 'anonymous'
275 password = 'pass' 414 password = 'pass'
276 imap_class = imaplib.IMAP4 415 imap_class = imaplib.IMAP4
277 416
278 def setUp(self): 417 def setUp(self):
(...skipping 88 matching lines...) Expand 10 before | Expand all | Expand 10 after
367 with transient_internet(self.host): 506 with transient_internet(self.host):
368 self.assertRaises(ValueError, self.imap_class, self.host, self.port, 507 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
369 certfile=CERTFILE, ssl_context=self.create_ssl_context()) 508 certfile=CERTFILE, ssl_context=self.create_ssl_context())
370 509
371 def test_ssl_context_keyfile_exclusive(self): 510 def test_ssl_context_keyfile_exclusive(self):
372 with transient_internet(self.host): 511 with transient_internet(self.host):
373 self.assertRaises(ValueError, self.imap_class, self.host, self.port, 512 self.assertRaises(ValueError, self.imap_class, self.host, self.port,
374 keyfile=CERTFILE, ssl_context=self.create_ssl_context()) 513 keyfile=CERTFILE, ssl_context=self.create_ssl_context())
375 514
376 515
377 def test_main(): 516 def load_tests(*args):
378 tests = [TestImaplib] 517 tests = [TestImaplib]
379 518
380 if support.is_resource_enabled('network'): 519 if support.is_resource_enabled('network'):
381 if ssl: 520 if ssl:
382 global CERTFILE 521 global CERTFILE, CAFILE
383 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, 522 CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
384 "keycert.pem") 523 "keycert3.pem")
385 if not os.path.exists(CERTFILE): 524 if not os.path.exists(CERTFILE):
386 raise support.TestFailed("Can't read certificate files!") 525 raise support.TestFailed("Can't read certificate files!")
526 CAFILE = os.path.join(os.path.dirname(__file__) or os.curdir,
527 "pycacert.pem")
528 if not os.path.exists(CAFILE):
529 raise support.TestFailed("Can't read CA file!")
387 tests.extend([ 530 tests.extend([
388 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL, 531 ThreadedNetworkedTests, ThreadedNetworkedTestsSSL,
389 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest, 532 RemoteIMAPTest, RemoteIMAP_SSLTest, RemoteIMAP_STARTTLSTest,
390 ]) 533 ])
391 534
392 support.run_unittest(*tests) 535 return unittest.TestSuite([unittest.makeSuite(test) for test in tests])
393 536
394 537
395 if __name__ == "__main__": 538 if __name__ == "__main__":
396 support.use_resources = ['network'] 539 unittest.main()
397 test_main()
LEFT RIGHT

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+