Index: Lib/test/test_smtplib.py =================================================================== --- Lib/test/test_smtplib.py (revision 61623) +++ Lib/test/test_smtplib.py (working copy) @@ -11,89 +11,64 @@ from unittest import TestCase from test import test_support +import mock_socket # PORT is used to communicate the port number assigned to the server # to the test client HOST = "localhost" PORT = None -def server(evt, buf): - serv = socket.socket(socket.AF_INET, socket.SOCK_STREAM) - serv.settimeout(15) - serv.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) - serv.bind(("", 0)) - global PORT - PORT = serv.getsockname()[1] - serv.listen(5) - evt.set() - try: - conn, addr = serv.accept() - except socket.timeout: - pass - else: - n = 500 - while buf and n > 0: - r, w, e = select.select([], [conn], []) - if w: - sent = conn.send(buf) - buf = buf[sent:] - - n -= 1 - - conn.close() - finally: - serv.close() - PORT = None - evt.set() - +def mock_getfqdn(): + """Fake out socket.getfqdn() to speed up tests.""" + return "localhost" + + class GeneralTests(TestCase): - + def setUp(self): - self.evt = threading.Event() - servargs = (self.evt, "220 Hola mundo\n") - threading.Thread(target=server, args=servargs).start() - self.evt.wait() - self.evt.clear() - + smtplib.socket = mock_socket + self.port = 25 + def tearDown(self): - self.evt.wait() + smtplib.socket = socket def testBasic1(self): + mock_socket.reply_with("220 Hola mundo\n") # connects - smtp = smtplib.SMTP(HOST, PORT) + smtp = smtplib.SMTP(HOST, self.port) smtp.sock.close() def testBasic2(self): # connects, include port in host name - smtp = smtplib.SMTP("%s:%s" % (HOST, PORT)) + smtp = smtplib.SMTP("%s:%s" % (HOST, self.port)) smtp.sock.close() def testLocalHostName(self): # check that supplied local_hostname is used - smtp = smtplib.SMTP(HOST, PORT, local_hostname="testhost") + smtp = smtplib.SMTP(HOST, self.port, local_hostname="testhost") self.assertEqual(smtp.local_hostname, "testhost") smtp.sock.close() def testTimeoutDefault(self): # default - smtp = smtplib.SMTP(HOST, PORT) + smtp = smtplib.SMTP(HOST, self.port) self.assertTrue(smtp.sock.gettimeout() is None) smtp.sock.close() def testTimeoutValue(self): # a value - smtp = smtplib.SMTP(HOST, PORT, timeout=30) + smtp = smtplib.SMTP(HOST, self.port, timeout=30) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.sock.close() def testTimeoutNone(self): # None, having other default - previous = socket.getdefaulttimeout() - socket.setdefaulttimeout(30) + previous = mock_socket.getdefaulttimeout() + mock_socket.setdefaulttimeout(30) try: - smtp = smtplib.SMTP(HOST, PORT, timeout=None) + smtp = smtplib.SMTP(HOST, self.port, timeout=None) finally: - socket.setdefaulttimeout(previous) + mock_socket.setdefaulttimeout(previous) self.assertEqual(smtp.sock.gettimeout(), 30) smtp.sock.close() @@ -146,6 +121,8 @@ class DebuggingServerTests(TestCase): def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_getfqdn # temporarily replace sys.stdout to capture DebuggingServer output self.old_stdout = sys.stdout self.output = StringIO.StringIO() @@ -161,6 +138,7 @@ self.serv_evt.clear() def tearDown(self): + socket.getfqdn = self.real_getfqdn # indicate that the client is finished self.client_evt.set() # wait for the server thread to terminate @@ -230,6 +208,12 @@ class NonConnectingTests(TestCase): + def setUp(self): + smtplib.socket = mock_socket + + def tearDown(self): + smtplib.socket = socket + def testNotConnected(self): # Test various operations on an unconnected SMTP object that # should raise exceptions (at present the attempt in SMTP.send @@ -242,9 +226,9 @@ def testNonnumericPort(self): # check that non-numeric port raises socket.error - self.assertRaises(socket.error, smtplib.SMTP, + self.assertRaises(mock_socket.error, smtplib.SMTP, "localhost", "bogus") - self.assertRaises(socket.error, smtplib.SMTP, + self.assertRaises(mock_socket.error, smtplib.SMTP, "localhost:bogus") @@ -252,18 +236,14 @@ class BadHELOServerTests(TestCase): def setUp(self): + smtplib.socket = mock_socket + mock_socket.reply_with("199 no hello for you!\n") self.old_stdout = sys.stdout self.output = StringIO.StringIO() sys.stdout = self.output - self.evt = threading.Event() - servargs = (self.evt, "199 no hello for you!\n") - threading.Thread(target=server, args=servargs).start() - self.evt.wait() - self.evt.clear() - def tearDown(self): - self.evt.wait() + smtplib.socket = socket sys.stdout = self.old_stdout def testFailingHELO(self): @@ -331,6 +311,8 @@ class SMTPSimTests(TestCase): def setUp(self): + self.real_getfqdn = socket.getfqdn + socket.getfqdn = mock_getfqdn self.serv_evt = threading.Event() self.client_evt = threading.Event() serv_args = (SimSMTPServer, self.serv_evt, self.client_evt) @@ -341,6 +323,7 @@ self.serv_evt.clear() def tearDown(self): + socket.getfqdn = self.real_getfqdn # indicate that the client is finished self.client_evt.set() # wait for the server thread to terminate