Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(2)

Side by Side Diff: Lib/test/test_smtplib.py

Issue 10639: reindent.py converts newlines to platform default
Patch Set: Created 8 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/test/test_signal.py ('k') | Lib/test/test_socketserver.py » ('j') | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 import asyncore 1 import asyncore
2 import email.mime.text 2 import email.mime.text
3 import email.utils 3 import email.utils
4 import socket 4 import socket
5 import smtpd 5 import smtpd
6 import smtplib 6 import smtplib
7 import io 7 import io
8 import re 8 import re
9 import sys 9 import sys
10 import time 10 import time
(...skipping 274 matching lines...) Expand 10 before | Expand all | Expand 10 after
285 smtp.sendmail('John', 'Sally', m) 285 smtp.sendmail('John', 'Sally', m)
286 # XXX (see comment in testSend) 286 # XXX (see comment in testSend)
287 time.sleep(0.01) 287 time.sleep(0.01)
288 smtp.quit() 288 smtp.quit()
289 289
290 self.client_evt.set() 290 self.client_evt.set()
291 self.serv_evt.wait() 291 self.serv_evt.wait()
292 self.output.flush() 292 self.output.flush()
293 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 293 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
294 self.assertEqual(self.output.getvalue(), mexpect) 294 self.assertEqual(self.output.getvalue(), mexpect)
295
296 def testSendNullSender(self):
297 m = 'A test message'
298 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
299 smtp.sendmail('<>', 'Sally', m)
300 # XXX (see comment in testSend)
301 time.sleep(0.01)
302 smtp.quit()
303
304 self.client_evt.set()
305 self.serv_evt.wait()
306 self.output.flush()
307 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
308 self.assertEqual(self.output.getvalue(), mexpect)
309 debugout = smtpd.DEBUGSTREAM.getvalue()
310 sender = re.compile("^sender: <>$", re.MULTILINE)
311 self.assertRegex(debugout, sender)
312 295
313 def testSendMessage(self): 296 def testSendMessage(self):
314 m = email.mime.text.MIMEText('A test message') 297 m = email.mime.text.MIMEText('A test message')
315 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3) 298 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
316 smtp.send_message(m, from_addr='John', to_addrs='Sally') 299 smtp.send_message(m, from_addr='John', to_addrs='Sally')
317 # XXX (see comment in testSend) 300 # XXX (see comment in testSend)
318 time.sleep(0.01) 301 time.sleep(0.01)
319 smtp.quit() 302 smtp.quit()
320 303
321 self.client_evt.set() 304 self.client_evt.set()
322 self.serv_evt.wait() 305 self.serv_evt.wait()
323 self.output.flush() 306 self.output.flush()
324 # Add the X-Peer header that DebuggingServer adds 307 # Add the X-Peer header that DebuggingServer adds
325 m['X-Peer'] = socket.gethostbyname('localhost') 308 m['X-Peer'] = socket.gethostbyname('localhost')
326 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 309 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
327 self.assertEqual(self.output.getvalue(), mexpect) 310 self.assertEqual(self.output.getvalue(), mexpect)
328 311
329 def testSendMessageWithAddresses(self): 312 def testSendMessageWithAddresses(self):
330 m = email.mime.text.MIMEText('A test message') 313 m = email.mime.text.MIMEText('A test message')
331 m['From'] = 'foo@bar.com' 314 m['From'] = 'foo@bar.com'
332 m['To'] = 'John' 315 m['To'] = 'John'
333 m['CC'] = 'Sally, Fred' 316 m['CC'] = 'Sally, Fred'
334 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.c om>' 317 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.c om>'
335 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3) 318 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
336 smtp.send_message(m) 319 smtp.send_message(m)
337 # XXX (see comment in testSend) 320 # XXX (see comment in testSend)
338 time.sleep(0.01) 321 time.sleep(0.01)
339 smtp.quit() 322 smtp.quit()
340 # make sure the Bcc header is still in the message.
341 self.assertEqual(m['Bcc'], 'John Root <root@localhost>, "Dinsdale" '
342 '<warped@silly.walks.com>')
343 323
344 self.client_evt.set() 324 self.client_evt.set()
345 self.serv_evt.wait() 325 self.serv_evt.wait()
346 self.output.flush() 326 self.output.flush()
347 # Add the X-Peer header that DebuggingServer adds 327 # Add the X-Peer header that DebuggingServer adds
348 m['X-Peer'] = socket.gethostbyname('localhost') 328 m['X-Peer'] = socket.gethostbyname('localhost')
349 # The Bcc header should not be transmitted. 329 # The Bcc header is deleted before serialization.
350 del m['Bcc'] 330 del m['Bcc']
351 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 331 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
352 self.assertEqual(self.output.getvalue(), mexpect) 332 self.assertEqual(self.output.getvalue(), mexpect)
353 debugout = smtpd.DEBUGSTREAM.getvalue() 333 debugout = smtpd.DEBUGSTREAM.getvalue()
354 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 334 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
355 self.assertRegex(debugout, sender) 335 self.assertRegex(debugout, sender)
356 for addr in ('John', 'Sally', 'Fred', 'root@localhost', 336 for addr in ('John', 'Sally', 'Fred', 'root@localhost',
357 'warped@silly.walks.com'): 337 'warped@silly.walks.com'):
358 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 338 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
359 re.MULTILINE) 339 re.MULTILINE)
(...skipping 18 matching lines...) Expand all
378 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END) 358 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
379 self.assertEqual(self.output.getvalue(), mexpect) 359 self.assertEqual(self.output.getvalue(), mexpect)
380 debugout = smtpd.DEBUGSTREAM.getvalue() 360 debugout = smtpd.DEBUGSTREAM.getvalue()
381 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE) 361 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
382 self.assertRegex(debugout, sender) 362 self.assertRegex(debugout, sender)
383 for addr in ('John', 'Dinsdale'): 363 for addr in ('John', 'Dinsdale'):
384 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr), 364 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
385 re.MULTILINE) 365 re.MULTILINE)
386 self.assertRegex(debugout, to_addr) 366 self.assertRegex(debugout, to_addr)
387 367
388 def testSendMessageWithSpecifiedAddresses(self):
389 # Make sure addresses specified in call override those in message.
390 m = email.mime.text.MIMEText('A test message')
391 m['From'] = 'foo@bar.com'
392 m['To'] = 'John, Dinsdale'
393 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
394 smtp.send_message(m, from_addr='joe@example.com', to_addrs='foo@example. net')
395 # XXX (see comment in testSend)
396 time.sleep(0.01)
397 smtp.quit()
398
399 self.client_evt.set()
400 self.serv_evt.wait()
401 self.output.flush()
402 # Add the X-Peer header that DebuggingServer adds
403 m['X-Peer'] = socket.gethostbyname('localhost')
404 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
405 self.assertEqual(self.output.getvalue(), mexpect)
406 debugout = smtpd.DEBUGSTREAM.getvalue()
407 sender = re.compile("^sender: joe@example.com$", re.MULTILINE)
408 self.assertRegex(debugout, sender)
409 for addr in ('John', 'Dinsdale'):
410 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
411 re.MULTILINE)
412 self.assertNotRegex(debugout, to_addr)
413 recip = re.compile(r"^recips: .*'foo@example.net'.*$", re.MULTILINE)
414 self.assertRegex(debugout, recip)
415
416 def testSendMessageWithMultipleFrom(self):
417 # Sender overrides To
418 m = email.mime.text.MIMEText('A test message')
419 m['From'] = 'Bernard, Bianca'
420 m['Sender'] = 'the_rescuers@Rescue-Aid-Society.com'
421 m['To'] = 'John, Dinsdale'
422 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
423 smtp.send_message(m)
424 # XXX (see comment in testSend)
425 time.sleep(0.01)
426 smtp.quit()
427
428 self.client_evt.set()
429 self.serv_evt.wait()
430 self.output.flush()
431 # Add the X-Peer header that DebuggingServer adds
432 m['X-Peer'] = socket.gethostbyname('localhost')
433 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
434 self.assertEqual(self.output.getvalue(), mexpect)
435 debugout = smtpd.DEBUGSTREAM.getvalue()
436 sender = re.compile("^sender: the_rescuers@Rescue-Aid-Society.com$", re. MULTILINE)
437 self.assertRegex(debugout, sender)
438 for addr in ('John', 'Dinsdale'):
439 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
440 re.MULTILINE)
441 self.assertRegex(debugout, to_addr)
442
443 def testSendMessageResent(self):
444 m = email.mime.text.MIMEText('A test message')
445 m['From'] = 'foo@bar.com'
446 m['To'] = 'John'
447 m['CC'] = 'Sally, Fred'
448 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.c om>'
449 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
450 m['Resent-From'] = 'holy@grail.net'
451 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
452 m['Resent-Bcc'] = 'doe@losthope.net'
453 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
454 smtp.send_message(m)
455 # XXX (see comment in testSend)
456 time.sleep(0.01)
457 smtp.quit()
458
459 self.client_evt.set()
460 self.serv_evt.wait()
461 self.output.flush()
462 # The Resent-Bcc headers are deleted before serialization.
463 del m['Bcc']
464 del m['Resent-Bcc']
465 # Add the X-Peer header that DebuggingServer adds
466 m['X-Peer'] = socket.gethostbyname('localhost')
467 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
468 self.assertEqual(self.output.getvalue(), mexpect)
469 debugout = smtpd.DEBUGSTREAM.getvalue()
470 sender = re.compile("^sender: holy@grail.net$", re.MULTILINE)
471 self.assertRegex(debugout, sender)
472 for addr in ('my_mom@great.cooker.com', 'Jeff', 'doe@losthope.net'):
473 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
474 re.MULTILINE)
475 self.assertRegex(debugout, to_addr)
476
477 def testSendMessageMultipleResentRaises(self):
478 m = email.mime.text.MIMEText('A test message')
479 m['From'] = 'foo@bar.com'
480 m['To'] = 'John'
481 m['CC'] = 'Sally, Fred'
482 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.c om>'
483 m['Resent-Date'] = 'Thu, 1 Jan 1970 17:42:00 +0000'
484 m['Resent-From'] = 'holy@grail.net'
485 m['Resent-To'] = 'Martha <my_mom@great.cooker.com>, Jeff'
486 m['Resent-Bcc'] = 'doe@losthope.net'
487 m['Resent-Date'] = 'Thu, 2 Jan 1970 17:42:00 +0000'
488 m['Resent-To'] = 'holy@grail.net'
489 m['Resent-From'] = 'Martha <my_mom@great.cooker.com>, Jeff'
490 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
491 with self.assertRaises(ValueError):
492 smtp.send_message(m)
493 smtp.close()
494 368
495 class NonConnectingTests(unittest.TestCase): 369 class NonConnectingTests(unittest.TestCase):
496 370
497 def setUp(self): 371 def setUp(self):
498 smtplib.socket = mock_socket 372 smtplib.socket = mock_socket
499 373
500 def tearDown(self): 374 def tearDown(self):
501 smtplib.socket = socket 375 smtplib.socket = socket
502 376
503 def testNotConnected(self): 377 def testNotConnected(self):
(...skipping 29 matching lines...) Expand all
533 def tearDown(self): 407 def tearDown(self):
534 smtplib.socket = socket 408 smtplib.socket = socket
535 sys.stdout = self.old_stdout 409 sys.stdout = self.old_stdout
536 410
537 def testFailingHELO(self): 411 def testFailingHELO(self):
538 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP, 412 self.assertRaises(smtplib.SMTPConnectError, smtplib.SMTP,
539 HOST, self.port, 'localhost', 3) 413 HOST, self.port, 'localhost', 3)
540 414
541 415
542 sim_users = {'Mr.A@somewhere.com':'John A', 416 sim_users = {'Mr.A@somewhere.com':'John A',
543 'Ms.B@xn--fo-fka.com':'Sally B', 417 'Ms.B@somewhere.com':'Sally B',
544 'Mrs.C@somewhereesle.com':'Ruth C', 418 'Mrs.C@somewhereesle.com':'Ruth C',
545 } 419 }
546 420
547 sim_auth = ('Mr.A@somewhere.com', 'somepassword') 421 sim_auth = ('Mr.A@somewhere.com', 'somepassword')
548 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn' 422 sim_cram_md5_challenge = ('PENCeUxFREJoU0NnbmhNWitOMjNGNn'
549 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=') 423 'dAZWx3b29kLmlubm9zb2Z0LmNvbT4=')
550 sim_auth_credentials = { 424 sim_auth_credentials = {
551 'login': 'TXIuQUBzb21ld2hlcmUuY29t', 425 'login': 'TXIuQUBzb21ld2hlcmUuY29t',
552 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=', 426 'plain': 'AE1yLkFAc29tZXdoZXJlLmNvbQBzb21lcGFzc3dvcmQ=',
553 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ' 427 'cram-md5': ('TXIUQUBZB21LD2HLCMUUY29TIDG4OWQ0MJ'
554 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'), 428 'KWZGQ4ODNMNDA4NTGXMDRLZWMYZJDMODG1'),
555 } 429 }
556 sim_auth_login_password = 'C29TZXBHC3N3B3JK' 430 sim_auth_login_password = 'C29TZXBHC3N3B3JK'
557 431
558 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'], 432 sim_lists = {'list-1':['Mr.A@somewhere.com','Mrs.C@somewhereesle.com'],
559 'list-2':['Ms.B@xn--fo-fka.com',], 433 'list-2':['Ms.B@somewhere.com',],
560 } 434 }
561 435
562 # Simulated SMTP channel & server 436 # Simulated SMTP channel & server
563 class SimSMTPChannel(smtpd.SMTPChannel): 437 class SimSMTPChannel(smtpd.SMTPChannel):
564 438
565 # For testing failures in QUIT when using the context manager API. 439 # For testing failures in QUIT when using the context manager API.
566 quit_response = None 440 quit_response = None
567 441
568 def __init__(self, extra_features, *args, **kw): 442 def __init__(self, extra_features, *args, **kw):
569 self._extrafeatures = ''.join( 443 self._extrafeatures = ''.join(
570 [ "250-{0}\r\n".format(x) for x in extra_features ]) 444 [ "250-{0}\r\n".format(x) for x in extra_features ])
571 super(SimSMTPChannel, self).__init__(*args, **kw) 445 super(SimSMTPChannel, self).__init__(*args, **kw)
572 446
573 def smtp_EHLO(self, arg): 447 def smtp_EHLO(self, arg):
574 resp = ('250-testhost\r\n' 448 resp = ('250-testhost\r\n'
575 '250-EXPN\r\n' 449 '250-EXPN\r\n'
576 '250-SIZE 20000000\r\n' 450 '250-SIZE 20000000\r\n'
577 '250-STARTTLS\r\n' 451 '250-STARTTLS\r\n'
578 '250-DELIVERBY\r\n') 452 '250-DELIVERBY\r\n')
579 resp = resp + self._extrafeatures + '250 HELP' 453 resp = resp + self._extrafeatures + '250 HELP'
580 self.push(resp) 454 self.push(resp)
581 455
582 def smtp_VRFY(self, arg): 456 def smtp_VRFY(self, arg):
583 # For max compatibility smtplib should be sending the raw address. 457 raw_addr = email.utils.parseaddr(arg)[1]
584 if arg in sim_users: 458 quoted_addr = smtplib.quoteaddr(arg)
585 self.push('250 %s %s' % (sim_users[arg], smtplib.quoteaddr(arg))) 459 if raw_addr in sim_users:
460 self.push('250 %s %s' % (sim_users[raw_addr], quoted_addr))
586 else: 461 else:
587 self.push('550 No such user: %s' % arg) 462 self.push('550 No such user: %s' % arg)
588 463
589 def smtp_EXPN(self, arg): 464 def smtp_EXPN(self, arg):
590 list_name = arg.lower() 465 list_name = email.utils.parseaddr(arg)[1].lower()
591 if list_name in sim_lists: 466 if list_name in sim_lists:
592 user_list = sim_lists[list_name] 467 user_list = sim_lists[list_name]
593 for n, user_email in enumerate(user_list): 468 for n, user_email in enumerate(user_list):
594 quoted_addr = smtplib.quoteaddr(user_email) 469 quoted_addr = smtplib.quoteaddr(user_email)
595 if n < len(user_list) - 1: 470 if n < len(user_list) - 1:
596 self.push('250-%s %s' % (sim_users[user_email], quoted_addr) ) 471 self.push('250-%s %s' % (sim_users[user_email], quoted_addr) )
597 else: 472 else:
598 self.push('250 %s %s' % (sim_users[user_email], quoted_addr) ) 473 self.push('250 %s %s' % (sim_users[user_email], quoted_addr) )
599 else: 474 else:
600 self.push('550 No access for you!') 475 self.push('550 No access for you!')
(...skipping 111 matching lines...) Expand 10 before | Expand all | Expand 10 after
712 def testVRFY(self): 587 def testVRFY(self):
713 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =15) 588 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =15)
714 589
715 for email, name in sim_users.items(): 590 for email, name in sim_users.items():
716 expected_known = (250, bytes('%s %s' % 591 expected_known = (250, bytes('%s %s' %
717 (name, smtplib.quoteaddr(email)), 592 (name, smtplib.quoteaddr(email)),
718 "ascii")) 593 "ascii"))
719 self.assertEqual(smtp.vrfy(email), expected_known) 594 self.assertEqual(smtp.vrfy(email), expected_known)
720 595
721 u = 'nobody@nowhere.com' 596 u = 'nobody@nowhere.com'
722 expected_unknown = (550, ('No such user: %s' % u).encode('ascii')) 597 expected_unknown = (550, ('No such user: %s'
598 % smtplib.quoteaddr(u)).encode('ascii'))
723 self.assertEqual(smtp.vrfy(u), expected_unknown) 599 self.assertEqual(smtp.vrfy(u), expected_unknown)
724 smtp.quit() 600 smtp.quit()
725 601
726 def testEXPN(self): 602 def testEXPN(self):
727 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =15) 603 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =15)
728 604
729 for listname, members in sim_lists.items(): 605 for listname, members in sim_lists.items():
730 users = [] 606 users = []
731 for m in members: 607 for m in members:
732 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m))) 608 users.append('%s %s' % (sim_users[m], smtplib.quoteaddr(m)))
(...skipping 61 matching lines...) Expand 10 before | Expand all | Expand 10 after
794 #test infrastructure can support it. 670 #test infrastructure can support it.
795 671
796 672
797 def test_main(verbose=None): 673 def test_main(verbose=None):
798 support.run_unittest(GeneralTests, DebuggingServerTests, 674 support.run_unittest(GeneralTests, DebuggingServerTests,
799 NonConnectingTests, 675 NonConnectingTests,
800 BadHELOServerTests, SMTPSimTests) 676 BadHELOServerTests, SMTPSimTests)
801 677
802 if __name__ == '__main__': 678 if __name__ == '__main__':
803 test_main() 679 test_main()
OLDNEW
« no previous file with comments | « Lib/test/test_signal.py ('k') | Lib/test/test_socketserver.py » ('j') | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+