diff -r 5c8d71516235 Lib/test/test_asyncore.py --- a/Lib/test/test_asyncore.py Mon May 26 00:58:56 2014 -0700 +++ b/Lib/test/test_asyncore.py Mon May 26 10:48:45 2014 -0400 @@ -5,14 +5,11 @@ import socket import sys import time -import warnings import errno import struct from test import support -from test.support import TESTFN, run_unittest, unlink, HOST, HOSTv6 from io import BytesIO -from io import StringIO try: import threading @@ -94,7 +91,7 @@ """Helper function to bind a socket according to its family.""" if HAS_UNIX_SOCKETS and sock.family == socket.AF_UNIX: # Make sure the path doesn't exist. - unlink(addr) + support.unlink(addr) sock.bind(addr) @@ -257,40 +254,29 @@ d = asyncore.dispatcher() # capture output of dispatcher.log() (to stderr) - fp = StringIO() - stderr = sys.stderr l1 = "Lovely spam! Wonderful spam!" l2 = "I don't like spam!" - try: - sys.stderr = fp + with support.captured_stderr() as stderr: d.log(l1) d.log(l2) - finally: - sys.stderr = stderr - lines = fp.getvalue().splitlines() + lines = stderr.getvalue().splitlines() self.assertEqual(lines, ['log: %s' % l1, 'log: %s' % l2]) def test_log_info(self): d = asyncore.dispatcher() # capture output of dispatcher.log_info() (to stdout via print) - fp = StringIO() - stdout = sys.stdout l1 = "Have you got anything without spam?" l2 = "Why can't she have egg bacon spam and sausage?" l3 = "THAT'S got spam in it!" - try: - sys.stdout = fp + with support.captured_stdout() as stdout: d.log_info(l1, 'EGGS') d.log_info(l2) d.log_info(l3, 'SPAM') - finally: - sys.stdout = stdout - lines = fp.getvalue().splitlines() + lines = stdout.getvalue().splitlines() expected = ['EGGS: %s' % l1, 'info: %s' % l2, 'SPAM: %s' % l3] - self.assertEqual(lines, expected) def test_unhandled(self): @@ -298,18 +284,13 @@ d.ignore_log_types = () # capture output of dispatcher.log_info() (to stdout via print) - fp = StringIO() - stdout = sys.stdout - try: - sys.stdout = fp + with support.captured_stdout() as stdout: d.handle_expt() d.handle_read() d.handle_write() d.handle_connect() - finally: - sys.stdout = stdout - lines = fp.getvalue().splitlines() + lines = stdout.getvalue().splitlines() expected = ['warning: unhandled incoming priority event', 'warning: unhandled read event', 'warning: unhandled write event', @@ -360,7 +341,7 @@ data = b"Suppose there isn't a 16-ton weight?" d = dispatcherwithsend_noread() d.create_socket() - d.connect((HOST, port)) + d.connect((support.HOST, port)) # give time for socket to connect time.sleep(0.1) @@ -388,14 +369,14 @@ class FileWrapperTest(unittest.TestCase): def setUp(self): self.d = b"It's not dead, it's sleeping!" - with open(TESTFN, 'wb') as file: + with open(support.TESTFN, 'wb') as file: file.write(self.d) def tearDown(self): - unlink(TESTFN) + support.unlink(support.TESTFN) def test_recv(self): - fd = os.open(TESTFN, os.O_RDONLY) + fd = os.open(support.TESTFN, os.O_RDONLY) w = asyncore.file_wrapper(fd) os.close(fd) @@ -409,20 +390,20 @@ def test_send(self): d1 = b"Come again?" d2 = b"I want to buy some cheese." - fd = os.open(TESTFN, os.O_WRONLY | os.O_APPEND) + fd = os.open(support.TESTFN, os.O_WRONLY | os.O_APPEND) w = asyncore.file_wrapper(fd) os.close(fd) w.write(d1) w.send(d2) w.close() - with open(TESTFN, 'rb') as file: + with open(support.TESTFN, 'rb') as file: self.assertEqual(file.read(), self.d + d1 + d2) @unittest.skipUnless(hasattr(asyncore, 'file_dispatcher'), 'asyncore.file_dispatcher required') def test_dispatcher(self): - fd = os.open(TESTFN, os.O_RDONLY) + fd = os.open(support.TESTFN, os.O_RDONLY) data = [] class FileDispatcher(asyncore.file_dispatcher): def handle_read(self): @@ -793,12 +774,12 @@ class TestAPI_UseIPv4Sockets(BaseTestAPI): family = socket.AF_INET - addr = (HOST, 0) + addr = (support.HOST, 0) @unittest.skipUnless(support.IPV6_ENABLED, 'IPv6 support required') class TestAPI_UseIPv6Sockets(BaseTestAPI): family = socket.AF_INET6 - addr = (HOSTv6, 0) + addr = (support.HOSTv6, 0) @unittest.skipUnless(HAS_UNIX_SOCKETS, 'Unix sockets required') class TestAPI_UseUnixSockets(BaseTestAPI): @@ -807,7 +788,7 @@ addr = support.TESTFN def tearDown(self): - unlink(self.addr) + support.unlink(self.addr) BaseTestAPI.tearDown(self) class TestAPI_UseIPv4Select(TestAPI_UseIPv4Sockets, unittest.TestCase):