Rietveld Code Review Tool
Help | Bug tracker | Discussion group | Source code | Sign in
(16654)

Side by Side Diff: Lib/test/test_smtplib.py

Issue 10321: Add support for Message objects and binary data to smtplib.sendmail
Patch Set: Created 8 years, 10 months ago
Left:
Right:
Use n/p to move between diff chunks; N/P to move between comments. Please Sign in to add in-line comments.
Jump to:
View unified diff | Download patch
« no previous file with comments | « Lib/smtplib.py ('k') | no next file » | no next file with comments »
Toggle Intra-line Diffs ('i') | Expand Comments ('e') | Collapse Comments ('c') | Show Comments Hide Comments ('s')
OLDNEW
1 import asyncore 1 import asyncore
2 import email.mime.text
2 import email.utils 3 import email.utils
3 import socket 4 import socket
4 import smtpd 5 import smtpd
5 import smtplib 6 import smtplib
6 import io 7 import io
8 import re
7 import sys 9 import sys
8 import time 10 import time
9 import select 11 import select
10 12
11 import unittest 13 import unittest
12 from test import support, mock_socket 14 from test import support, mock_socket
13 15
14 try: 16 try:
15 import threading 17 import threading
16 except ImportError: 18 except ImportError:
(...skipping 32 matching lines...) Expand 10 before | Expand all | Expand 10 after
49 evt.set() 51 evt.set()
50 52
51 class GeneralTests(unittest.TestCase): 53 class GeneralTests(unittest.TestCase):
52 54
53 def setUp(self): 55 def setUp(self):
54 smtplib.socket = mock_socket 56 smtplib.socket = mock_socket
55 self.port = 25 57 self.port = 25
56 58
57 def tearDown(self): 59 def tearDown(self):
58 smtplib.socket = socket 60 smtplib.socket = socket
61
62 # This method is no longer used but is retained for backward compatibility,
63 # so test to make sure it still works.
64 def testQuoteData(self):
65 teststr = "abc\n.jkl\rfoo\r\n..blue"
66 expected = "abc\r\n..jkl\r\nfoo\r\n...blue"
67 self.assertEqual(expected, smtplib.quotedata(teststr))
59 68
60 def testBasic1(self): 69 def testBasic1(self):
61 mock_socket.reply_with(b"220 Hola mundo") 70 mock_socket.reply_with(b"220 Hola mundo")
62 # connects 71 # connects
63 smtp = smtplib.SMTP(HOST, self.port) 72 smtp = smtplib.SMTP(HOST, self.port)
64 smtp.close() 73 smtp.close()
65 74
66 def testBasic2(self): 75 def testBasic2(self):
67 mock_socket.reply_with(b"220 Hola mundo") 76 mock_socket.reply_with(b"220 Hola mundo")
68 # connects, include port in host name 77 # connects, include port in host name
(...skipping 73 matching lines...) Expand 10 before | Expand all | Expand 10 after
142 MSG_END = '------------ END MESSAGE ------------\n' 151 MSG_END = '------------ END MESSAGE ------------\n'
143 152
144 # NOTE: Some SMTP objects in the tests below are created with a non-default 153 # NOTE: Some SMTP objects in the tests below are created with a non-default
145 # local_hostname argument to the constructor, since (on some systems) the FQDN 154 # local_hostname argument to the constructor, since (on some systems) the FQDN
146 # lookup caused by the default local_hostname sometimes takes so long that the 155 # lookup caused by the default local_hostname sometimes takes so long that the
147 # test server times out, causing the test to fail. 156 # test server times out, causing the test to fail.
148 157
149 # Test behavior of smtpd.DebuggingServer 158 # Test behavior of smtpd.DebuggingServer
150 @unittest.skipUnless(threading, 'Threading required for this test.') 159 @unittest.skipUnless(threading, 'Threading required for this test.')
151 class DebuggingServerTests(unittest.TestCase): 160 class DebuggingServerTests(unittest.TestCase):
161
162 maxDiff = None
152 163
153 def setUp(self): 164 def setUp(self):
154 self.real_getfqdn = socket.getfqdn 165 self.real_getfqdn = socket.getfqdn
155 socket.getfqdn = mock_socket.getfqdn 166 socket.getfqdn = mock_socket.getfqdn
156 # temporarily replace sys.stdout to capture DebuggingServer output 167 # temporarily replace sys.stdout to capture DebuggingServer output
157 self.old_stdout = sys.stdout 168 self.old_stdout = sys.stdout
158 self.output = io.StringIO() 169 self.output = io.StringIO()
159 sys.stdout = self.output 170 sys.stdout = self.output
160 171
161 self._threads = support.threading_setup() 172 self._threads = support.threading_setup()
162 self.serv_evt = threading.Event() 173 self.serv_evt = threading.Event()
163 self.client_evt = threading.Event() 174 self.client_evt = threading.Event()
175 # Capture SMTPChannel debug output
176 self.old_DEBUGSTREAM = smtpd.DEBUGSTREAM
177 smtpd.DEBUGSTREAM = io.StringIO()
164 # Pick a random unused port by passing 0 for the port number 178 # Pick a random unused port by passing 0 for the port number
165 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1)) 179 self.serv = smtpd.DebuggingServer((HOST, 0), ('nowhere', -1))
166 # Keep a note of what port was assigned 180 # Keep a note of what port was assigned
167 self.port = self.serv.socket.getsockname()[1] 181 self.port = self.serv.socket.getsockname()[1]
168 serv_args = (self.serv, self.serv_evt, self.client_evt) 182 serv_args = (self.serv, self.serv_evt, self.client_evt)
169 self.thread = threading.Thread(target=debugging_server, args=serv_args) 183 self.thread = threading.Thread(target=debugging_server, args=serv_args)
170 self.thread.start() 184 self.thread.start()
171 185
172 # wait until server thread has assigned a port number 186 # wait until server thread has assigned a port number
173 self.serv_evt.wait() 187 self.serv_evt.wait()
174 self.serv_evt.clear() 188 self.serv_evt.clear()
175 189
176 def tearDown(self): 190 def tearDown(self):
177 socket.getfqdn = self.real_getfqdn 191 socket.getfqdn = self.real_getfqdn
178 # indicate that the client is finished 192 # indicate that the client is finished
179 self.client_evt.set() 193 self.client_evt.set()
180 # wait for the server thread to terminate 194 # wait for the server thread to terminate
181 self.serv_evt.wait() 195 self.serv_evt.wait()
182 self.thread.join() 196 self.thread.join()
183 support.threading_cleanup(*self._threads) 197 support.threading_cleanup(*self._threads)
184 # restore sys.stdout 198 # restore sys.stdout
185 sys.stdout = self.old_stdout 199 sys.stdout = self.old_stdout
200 # restore DEBUGSTREAM
201 smtpd.DEBUGSTREAM.close()
202 smtpd.DEBUGSTREAM = self.old_DEBUGSTREAM
186 203
187 def testBasic(self): 204 def testBasic(self):
188 # connect 205 # connect
189 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3) 206 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
190 smtp.quit() 207 smtp.quit()
191 208
192 def testNOOP(self): 209 def testNOOP(self):
193 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3) 210 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
194 expected = (250, b'Ok') 211 expected = (250, b'Ok')
195 self.assertEqual(smtp.noop(), expected) 212 self.assertEqual(smtp.noop(), expected)
(...skipping 43 matching lines...) Expand 10 before | Expand all | Expand 10 after
239 # in asyncore. This sleep might help, but should really be fixed 256 # in asyncore. This sleep might help, but should really be fixed
240 # properly by using an Event variable. 257 # properly by using an Event variable.
241 time.sleep(0.01) 258 time.sleep(0.01)
242 smtp.quit() 259 smtp.quit()
243 260
244 self.client_evt.set() 261 self.client_evt.set()
245 self.serv_evt.wait() 262 self.serv_evt.wait()
246 self.output.flush() 263 self.output.flush()
247 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END) 264 mexpect = '%s%s\n%s' % (MSG_BEGIN, m, MSG_END)
248 self.assertEqual(self.output.getvalue(), mexpect) 265 self.assertEqual(self.output.getvalue(), mexpect)
266
267 def testSendBinary(self):
268 m = b'A test message'
269 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
270 smtp.sendmail('John', 'Sally', m)
271 # XXX (see comment in testSend)
272 time.sleep(0.01)
273 smtp.quit()
274
275 self.client_evt.set()
276 self.serv_evt.wait()
277 self.output.flush()
278 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.decode('ascii'), MSG_END)
279 self.assertEqual(self.output.getvalue(), mexpect)
280
281 def testSendMessage(self):
282 m = email.mime.text.MIMEText('A test message')
283 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
284 smtp.send_message(m, from_addr='John', to_addrs='Sally')
285 # XXX (see comment in testSend)
286 time.sleep(0.01)
287 smtp.quit()
288
289 self.client_evt.set()
290 self.serv_evt.wait()
291 self.output.flush()
292 # Add the X-Peer header that DebuggingServer adds
293 # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
294 m['X-Peer'] = '127.0.0.1'
295 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
296 self.assertEqual(self.output.getvalue(), mexpect)
297
298 def testSendMessageWithAddresses(self):
299 m = email.mime.text.MIMEText('A test message')
300 m['From'] = 'foo@bar.com'
301 m['To'] = 'John'
302 m['CC'] = 'Sally, Fred'
303 m['Bcc'] = 'John Root <root@localhost>, "Dinsdale" <warped@silly.walks.c om>'
304 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
305 smtp.send_message(m)
306 # XXX (see comment in testSend)
307 time.sleep(0.01)
308 smtp.quit()
309
310 self.client_evt.set()
311 self.serv_evt.wait()
312 self.output.flush()
313 # Add the X-Peer header that DebuggingServer adds
314 # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
315 m['X-Peer'] = '127.0.0.1'
316 # The Bcc header is deleted before serialization.
317 del m['Bcc']
318 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
319 self.assertEqual(self.output.getvalue(), mexpect)
320 debugout = smtpd.DEBUGSTREAM.getvalue()
321 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
322 self.assertRegexpMatches(debugout, sender)
323 for addr in ('John', 'Sally', 'Fred', 'root@localhost',
324 'warped@silly.walks.com'):
325 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
326 re.MULTILINE)
327 self.assertRegexpMatches(debugout, to_addr)
328
329 def testSendMessageWithSomeAddresses(self):
330 # Make sure nothing breaks if not all of the three 'to' headers exist
331 m = email.mime.text.MIMEText('A test message')
332 m['From'] = 'foo@bar.com'
333 m['To'] = 'John, Dinsdale'
334 smtp = smtplib.SMTP(HOST, self.port, local_hostname='localhost', timeout =3)
335 smtp.send_message(m)
336 # XXX (see comment in testSend)
337 time.sleep(0.01)
338 smtp.quit()
339
340 self.client_evt.set()
341 self.serv_evt.wait()
342 self.output.flush()
343 # Add the X-Peer header that DebuggingServer adds
344 # XXX: I'm not sure hardcoding this IP will work on linux-vserver.
345 m['X-Peer'] = '127.0.0.1'
346 mexpect = '%s%s\n%s' % (MSG_BEGIN, m.as_string(), MSG_END)
347 self.assertEqual(self.output.getvalue(), mexpect)
348 debugout = smtpd.DEBUGSTREAM.getvalue()
349 sender = re.compile("^sender: foo@bar.com$", re.MULTILINE)
350 self.assertRegexpMatches(debugout, sender)
351 for addr in ('John', 'Dinsdale'):
352 to_addr = re.compile(r"^recips: .*'{}'.*$".format(addr),
353 re.MULTILINE)
354 self.assertRegexpMatches(debugout, to_addr)
249 355
250 356
251 class NonConnectingTests(unittest.TestCase): 357 class NonConnectingTests(unittest.TestCase):
252 358
253 def setUp(self): 359 def setUp(self):
254 smtplib.socket = mock_socket 360 smtplib.socket = mock_socket
255 361
256 def tearDown(self): 362 def tearDown(self):
257 smtplib.socket = socket 363 smtplib.socket = socket
258 364
(...skipping 259 matching lines...) Expand 10 before | Expand all | Expand 10 after
518 #test infrastructure can support it. 624 #test infrastructure can support it.
519 625
520 626
521 def test_main(verbose=None): 627 def test_main(verbose=None):
522 support.run_unittest(GeneralTests, DebuggingServerTests, 628 support.run_unittest(GeneralTests, DebuggingServerTests,
523 NonConnectingTests, 629 NonConnectingTests,
524 BadHELOServerTests, SMTPSimTests) 630 BadHELOServerTests, SMTPSimTests)
525 631
526 if __name__ == '__main__': 632 if __name__ == '__main__':
527 test_main() 633 test_main()
OLDNEW
« no previous file with comments | « Lib/smtplib.py ('k') | no next file » | no next file with comments »

RSS Feeds Recent Issues | This issue
This is Rietveld 894c83f36cb7+