diff -r a206f952668e Lib/test/test_csv.py --- a/Lib/test/test_csv.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_csv.py Sat Aug 10 13:51:13 2013 +0300 @@ -905,78 +905,77 @@ dialect = sniffer.sniff(self.sample9) self.assertTrue(dialect.doublequote) -if not hasattr(sys, "gettotalrefcount"): - if support.verbose: print("*** skipping leakage tests ***") -else: - class NUL: - def write(s, *args): - pass - writelines = write +class NUL: + def write(s, *args): + pass + writelines = write - class TestLeaks(unittest.TestCase): - def test_create_read(self): - delta = 0 - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - csv.reader(["a,b,c\r\n"]) - csv.reader(["a,b,c\r\n"]) - csv.reader(["a,b,c\r\n"]) - delta = rc-lastrc - lastrc = rc - # if csv.reader() leaks, last delta should be 3 or more - self.assertEqual(delta < 3, True) +@unittest.skipUnless(hasattr(sys, "gettotalrefcount"), + 'requires sys.gettotalrefcount') +class TestLeaks(unittest.TestCase): + def test_create_read(self): + delta = 0 + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + csv.reader(["a,b,c\r\n"]) + csv.reader(["a,b,c\r\n"]) + csv.reader(["a,b,c\r\n"]) + delta = rc-lastrc + lastrc = rc + # if csv.reader() leaks, last delta should be 3 or more + self.assertEqual(delta < 3, True) - def test_create_write(self): - delta = 0 - lastrc = sys.gettotalrefcount() - s = NUL() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - csv.writer(s) - csv.writer(s) - csv.writer(s) - delta = rc-lastrc - lastrc = rc - # if csv.writer() leaks, last delta should be 3 or more - self.assertEqual(delta < 3, True) + def test_create_write(self): + delta = 0 + lastrc = sys.gettotalrefcount() + s = NUL() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + csv.writer(s) + csv.writer(s) + csv.writer(s) + delta = rc-lastrc + lastrc = rc + # if csv.writer() leaks, last delta should be 3 or more + self.assertEqual(delta < 3, True) - def test_read(self): - delta = 0 - rows = ["a,b,c\r\n"]*5 - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - rdr = csv.reader(rows) - for row in rdr: - pass - delta = rc-lastrc - lastrc = rc - # if reader leaks during read, delta should be 5 or more - self.assertEqual(delta < 5, True) + def test_read(self): + delta = 0 + rows = ["a,b,c\r\n"]*5 + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + rdr = csv.reader(rows) + for row in rdr: + pass + delta = rc-lastrc + lastrc = rc + # if reader leaks during read, delta should be 5 or more + self.assertEqual(delta < 5, True) - def test_write(self): - delta = 0 - rows = [[1,2,3]]*5 - s = NUL() - lastrc = sys.gettotalrefcount() - for i in range(20): - gc.collect() - self.assertEqual(gc.garbage, []) - rc = sys.gettotalrefcount() - writer = csv.writer(s) - for row in rows: - writer.writerow(row) - delta = rc-lastrc - lastrc = rc - # if writer leaks during write, last delta should be 5 or more - self.assertEqual(delta < 5, True) + def test_write(self): + delta = 0 + rows = [[1,2,3]]*5 + s = NUL() + lastrc = sys.gettotalrefcount() + for i in range(20): + gc.collect() + self.assertEqual(gc.garbage, []) + rc = sys.gettotalrefcount() + writer = csv.writer(s) + for row in rows: + writer.writerow(row) + delta = rc-lastrc + lastrc = rc + # if writer leaks during write, last delta should be 5 or more + self.assertEqual(delta < 5, True) class TestUnicode(unittest.TestCase): diff -r a206f952668e Lib/test/test_ftplib.py --- a/Lib/test/test_ftplib.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_ftplib.py Sat Aug 10 13:51:13 2013 +0300 @@ -16,7 +16,7 @@ except ImportError: ssl = None -from unittest import TestCase +from unittest import TestCase, skipUnless from test import support from test.support import HOST threading = support.import_module('threading') @@ -756,6 +756,7 @@ self.assertEqual(ftplib.parse257('257 "/foo/b""ar" created'), '/foo/b"ar') +@skipUnless(support.IPV6_ENABLED, "IPv6 not enabled") class TestIPv6Environment(TestCase): def setUp(self): @@ -796,6 +797,7 @@ retr() +@skipUnless(ssl, "SSL not available") class TestTLS_FTPClassMixin(TestFTPClass): """Repeat TestFTPClass tests starting the TLS layer for both control and data connections first. @@ -811,6 +813,7 @@ self.client.prot_p() +@skipUnless(ssl, "SSL not available") class TestTLS_FTPClass(TestCase): """Specific TLS_FTP class tests.""" @@ -1003,12 +1006,9 @@ def test_main(): - tests = [TestFTPClass, TestTimeouts, TestNetrcDeprecation] - if support.IPV6_ENABLED: - tests.append(TestIPv6Environment) - - if ssl is not None: - tests.extend([TestTLS_FTPClassMixin, TestTLS_FTPClass]) + tests = [TestFTPClass, TestTimeouts, TestNetrcDeprecation, + TestIPv6Environment, + TestTLS_FTPClassMixin, TestTLS_FTPClass] thread_info = support.threading_setup() try: diff -r a206f952668e Lib/test/test_math.py --- a/Lib/test/test_math.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_math.py Sat Aug 10 13:51:13 2013 +0300 @@ -980,38 +980,37 @@ # still fails this part of the test on some platforms. For now, we only # *run* test_exceptions() in verbose mode, so that this isn't normally # tested. + @unittest.skipUnless(verbose, 'requires vebose mode') + def test_exceptions(self): + try: + x = math.exp(-1000000000) + except: + # mathmodule.c is failing to weed out underflows from libm, or + # we've got an fp format with huge dynamic range + self.fail("underflowing exp() should not have raised " + "an exception") + if x != 0: + self.fail("underflowing exp() should have returned 0") - if verbose: - def test_exceptions(self): - try: - x = math.exp(-1000000000) - except: - # mathmodule.c is failing to weed out underflows from libm, or - # we've got an fp format with huge dynamic range - self.fail("underflowing exp() should not have raised " - "an exception") - if x != 0: - self.fail("underflowing exp() should have returned 0") + # If this fails, probably using a strict IEEE-754 conforming libm, and x + # is +Inf afterwards. But Python wants overflows detected by default. + try: + x = math.exp(1000000000) + except OverflowError: + pass + else: + self.fail("overflowing exp() didn't trigger OverflowError") - # If this fails, probably using a strict IEEE-754 conforming libm, and x - # is +Inf afterwards. But Python wants overflows detected by default. - try: - x = math.exp(1000000000) - except OverflowError: - pass - else: - self.fail("overflowing exp() didn't trigger OverflowError") - - # If this fails, it could be a puzzle. One odd possibility is that - # mathmodule.c's macros are getting confused while comparing - # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE - # as a result (and so raising OverflowError instead). - try: - x = math.sqrt(-1.0) - except ValueError: - pass - else: - self.fail("sqrt(-1) didn't raise ValueError") + # If this fails, it could be a puzzle. One odd possibility is that + # mathmodule.c's macros are getting confused while comparing + # Inf (HUGE_VAL) to a NaN, and artificially setting errno to ERANGE + # as a result (and so raising OverflowError instead). + try: + x = math.sqrt(-1.0) + except ValueError: + pass + else: + self.fail("sqrt(-1) didn't raise ValueError") @requires_IEEE_754 def test_testfile(self): diff -r a206f952668e Lib/test/test_mmap.py --- a/Lib/test/test_mmap.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_mmap.py Sat Aug 10 13:51:13 2013 +0300 @@ -617,67 +617,69 @@ self.assertEqual(m.read_byte(), b) m.close() - if os.name == 'nt': - def test_tagname(self): - data1 = b"0123456789" - data2 = b"abcdefghij" - assert len(data1) == len(data2) + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_tagname(self): + data1 = b"0123456789" + data2 = b"abcdefghij" + assert len(data1) == len(data2) - # Test same tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") - m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="foo") - m2[:] = data2 - self.assertEqual(m1[:], data2) - self.assertEqual(m2[:], data2) - m2.close() - m1.close() + # Test same tag + m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1[:] = data1 + m2 = mmap.mmap(-1, len(data2), tagname="foo") + m2[:] = data2 + self.assertEqual(m1[:], data2) + self.assertEqual(m2[:], data2) + m2.close() + m1.close() - # Test different tag - m1 = mmap.mmap(-1, len(data1), tagname="foo") - m1[:] = data1 - m2 = mmap.mmap(-1, len(data2), tagname="boo") - m2[:] = data2 - self.assertEqual(m1[:], data1) - self.assertEqual(m2[:], data2) - m2.close() - m1.close() + # Test different tag + m1 = mmap.mmap(-1, len(data1), tagname="foo") + m1[:] = data1 + m2 = mmap.mmap(-1, len(data2), tagname="boo") + m2[:] = data2 + self.assertEqual(m1[:], data1) + self.assertEqual(m2[:], data2) + m2.close() + m1.close() - def test_crasher_on_windows(self): - # Should not crash (Issue 1733986) - m = mmap.mmap(-1, 1000, tagname="foo") - try: - mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size - except: - pass - m.close() + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_crasher_on_windows(self): + # Should not crash (Issue 1733986) + m = mmap.mmap(-1, 1000, tagname="foo") + try: + mmap.mmap(-1, 5000, tagname="foo")[:] # same tagname, but larger size + except: + pass + m.close() - # Should not crash (Issue 5385) - with open(TESTFN, "wb") as fp: - fp.write(b"x"*10) - f = open(TESTFN, "r+b") - m = mmap.mmap(f.fileno(), 0) - f.close() - try: - m.resize(0) # will raise OSError - except: - pass - try: - m[:] - except: - pass - m.close() + # Should not crash (Issue 5385) + with open(TESTFN, "wb") as fp: + fp.write(b"x"*10) + f = open(TESTFN, "r+b") + m = mmap.mmap(f.fileno(), 0) + f.close() + try: + m.resize(0) # will raise OSError + except: + pass + try: + m[:] + except: + pass + m.close() - def test_invalid_descriptor(self): - # socket file descriptors are valid, but out of range - # for _get_osfhandle, causing a crash when validating the - # parameters to _get_osfhandle. - s = socket.socket() - try: - with self.assertRaises(OSError): - m = mmap.mmap(s.fileno(), 10) - finally: - s.close() + @unittest.skipUnless(os.name == 'nt', 'requires Windows') + def test_invalid_descriptor(self): + # socket file descriptors are valid, but out of range + # for _get_osfhandle, causing a crash when validating the + # parameters to _get_osfhandle. + s = socket.socket() + try: + with self.assertRaises(OSError): + m = mmap.mmap(s.fileno(), 10) + finally: + s.close() def test_context_manager(self): with mmap.mmap(-1, 10) as m: diff -r a206f952668e Lib/test/test_nntplib.py --- a/Lib/test/test_nntplib.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_nntplib.py Sat Aug 10 13:51:13 2013 +0300 @@ -199,23 +199,23 @@ resp, caps = self.server.capabilities() _check_caps(caps) - if _have_ssl: - def test_starttls(self): - file = self.server.file - sock = self.server.sock - try: - self.server.starttls() - except nntplib.NNTPPermanentError: - self.skipTest("STARTTLS not supported by server.") - else: - # Check that the socket and internal pseudo-file really were - # changed. - self.assertNotEqual(file, self.server.file) - self.assertNotEqual(sock, self.server.sock) - # Check that the new socket really is an SSL one - self.assertIsInstance(self.server.sock, ssl.SSLSocket) - # Check that trying starttls when it's already active fails. - self.assertRaises(ValueError, self.server.starttls) + @unittest.skipUnless(_have_ssl, 'requires SSL support') + def test_starttls(self): + file = self.server.file + sock = self.server.sock + try: + self.server.starttls() + except nntplib.NNTPPermanentError: + self.skipTest("STARTTLS not supported by server.") + else: + # Check that the socket and internal pseudo-file really were + # changed. + self.assertNotEqual(file, self.server.file) + self.assertNotEqual(sock, self.server.sock) + # Check that the new socket really is an SSL one + self.assertIsInstance(self.server.sock, ssl.SSLSocket) + # Check that trying starttls when it's already active fails. + self.assertRaises(ValueError, self.server.starttls) def test_zlogin(self): # This test must be the penultimate because further commands will be @@ -301,24 +301,24 @@ cls.server.quit() -if _have_ssl: - class NetworkedNNTP_SSLTests(NetworkedNNTPTests): +@unittest.skipUnless(_have_ssl, 'requires SSL support') +class NetworkedNNTP_SSLTests(NetworkedNNTPTests): - # Technical limits for this public NNTP server (see http://www.aioe.org): - # "Only two concurrent connections per IP address are allowed and - # 400 connections per day are accepted from each IP address." + # Technical limits for this public NNTP server (see http://www.aioe.org): + # "Only two concurrent connections per IP address are allowed and + # 400 connections per day are accepted from each IP address." - NNTP_HOST = 'nntp.aioe.org' - GROUP_NAME = 'comp.lang.python' - GROUP_PAT = 'comp.lang.*' + NNTP_HOST = 'nntp.aioe.org' + GROUP_NAME = 'comp.lang.python' + GROUP_PAT = 'comp.lang.*' - NNTP_CLASS = nntplib.NNTP_SSL + NNTP_CLASS = nntplib.NNTP_SSL - # Disabled as it produces too much data - test_list = None + # Disabled as it produces too much data + test_list = None - # Disabled as the connection will already be encrypted. - test_starttls = None + # Disabled as the connection will already be encrypted. + test_starttls = None # @@ -1400,9 +1400,7 @@ def test_main(): tests = [MiscTests, NNTPv1Tests, NNTPv2Tests, CapsAfterLoginNNTPv2Tests, - SendReaderNNTPv2Tests, NetworkedNNTPTests] - if _have_ssl: - tests.append(NetworkedNNTP_SSLTests) + SendReaderNNTPv2Tests, NetworkedNNTPTests, NetworkedNNTP_SSLTests] support.run_unittest(*tests) diff -r a206f952668e Lib/test/test_os.py --- a/Lib/test/test_os.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_os.py Sat Aug 10 13:51:13 2013 +0300 @@ -1107,6 +1107,7 @@ self._test_internal_execvpe(bytes) +@unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32ErrorTests(unittest.TestCase): def test_rename(self): self.assertRaises(OSError, os.rename, support.TESTFN, support.TESTFN+".bak") @@ -1249,138 +1250,133 @@ self.file2 = self.file1 + "2" self._test_link(self.file1, self.file2) -if sys.platform != 'win32': - class Win32ErrorTests(unittest.TestCase): - pass +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class PosixUidGidTests(unittest.TestCase): + @unittest.skipUnless(hasattr(os, 'setuid'), 'test needs os.setuid()') + def test_setuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.setuid, 0) + self.assertRaises(OverflowError, os.setuid, 1<<32) - class PosixUidGidTests(unittest.TestCase): - if hasattr(os, 'setuid'): - def test_setuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.setuid, 0) - self.assertRaises(OverflowError, os.setuid, 1<<32) + @unittest.skipUnless(hasattr(os, 'setgid'), 'test needs os.setgid()') + def test_setgid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setgid, 0) + self.assertRaises(OverflowError, os.setgid, 1<<32) - if hasattr(os, 'setgid'): - def test_setgid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setgid, 0) - self.assertRaises(OverflowError, os.setgid, 1<<32) + @unittest.skipUnless(hasattr(os, 'seteuid'), 'test needs os.seteuid()') + def test_seteuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.seteuid, 0) + self.assertRaises(OverflowError, os.seteuid, 1<<32) - if hasattr(os, 'seteuid'): - def test_seteuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.seteuid, 0) - self.assertRaises(OverflowError, os.seteuid, 1<<32) + @unittest.skipUnless(hasattr(os, 'setegid'), 'test needs os.setegid()') + def test_setegid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setegid, 0) + self.assertRaises(OverflowError, os.setegid, 1<<32) - if hasattr(os, 'setegid'): - def test_setegid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setegid, 0) - self.assertRaises(OverflowError, os.setegid, 1<<32) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid(self): + if os.getuid() != 0: + self.assertRaises(OSError, os.setreuid, 0, 0) + self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) + self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) - if hasattr(os, 'setreuid'): - def test_setreuid(self): - if os.getuid() != 0: - self.assertRaises(OSError, os.setreuid, 0, 0) - self.assertRaises(OverflowError, os.setreuid, 1<<32, 0) - self.assertRaises(OverflowError, os.setreuid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setreuid'), 'test needs os.setreuid()') + def test_setreuid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) - def test_setreuid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setreuid(-1,-1);sys.exit(0)']) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid(self): + if os.getuid() != 0 and not HAVE_WHEEL_GROUP: + self.assertRaises(OSError, os.setregid, 0, 0) + self.assertRaises(OverflowError, os.setregid, 1<<32, 0) + self.assertRaises(OverflowError, os.setregid, 0, 1<<32) - if hasattr(os, 'setregid'): - def test_setregid(self): - if os.getuid() != 0 and not HAVE_WHEEL_GROUP: - self.assertRaises(OSError, os.setregid, 0, 0) - self.assertRaises(OverflowError, os.setregid, 1<<32, 0) - self.assertRaises(OverflowError, os.setregid, 0, 1<<32) + @unittest.skipUnless(hasattr(os, 'setregid'), 'test needs os.setregid()') + def test_setregid_neg1(self): + # Needs to accept -1. We run this in a subprocess to avoid + # altering the test runner's process state (issue8045). + subprocess.check_call([ + sys.executable, '-c', + 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) - def test_setregid_neg1(self): - # Needs to accept -1. We run this in a subprocess to avoid - # altering the test runner's process state (issue8045). - subprocess.check_call([ - sys.executable, '-c', - 'import os,sys;os.setregid(-1,-1);sys.exit(0)']) +@unittest.skipIf(sys.platform == "win32", "Posix specific tests") +class Pep383Tests(unittest.TestCase): + def setUp(self): + if support.TESTFN_UNENCODABLE: + self.dir = support.TESTFN_UNENCODABLE + elif support.TESTFN_NONASCII: + self.dir = support.TESTFN_NONASCII + else: + self.dir = support.TESTFN + self.bdir = os.fsencode(self.dir) - class Pep383Tests(unittest.TestCase): - def setUp(self): - if support.TESTFN_UNENCODABLE: - self.dir = support.TESTFN_UNENCODABLE - elif support.TESTFN_NONASCII: - self.dir = support.TESTFN_NONASCII - else: - self.dir = support.TESTFN - self.bdir = os.fsencode(self.dir) + bytesfn = [] + def add_filename(fn): + try: + fn = os.fsencode(fn) + except UnicodeEncodeError: + return + bytesfn.append(fn) + add_filename(support.TESTFN_UNICODE) + if support.TESTFN_UNENCODABLE: + add_filename(support.TESTFN_UNENCODABLE) + if support.TESTFN_NONASCII: + add_filename(support.TESTFN_NONASCII) + if not bytesfn: + self.skipTest("couldn't create any non-ascii filename") - bytesfn = [] - def add_filename(fn): - try: - fn = os.fsencode(fn) - except UnicodeEncodeError: - return - bytesfn.append(fn) - add_filename(support.TESTFN_UNICODE) - if support.TESTFN_UNENCODABLE: - add_filename(support.TESTFN_UNENCODABLE) - if support.TESTFN_NONASCII: - add_filename(support.TESTFN_NONASCII) - if not bytesfn: - self.skipTest("couldn't create any non-ascii filename") + self.unicodefn = set() + os.mkdir(self.dir) + try: + for fn in bytesfn: + support.create_empty_file(os.path.join(self.bdir, fn)) + fn = os.fsdecode(fn) + if fn in self.unicodefn: + raise ValueError("duplicate filename") + self.unicodefn.add(fn) + except: + shutil.rmtree(self.dir) + raise - self.unicodefn = set() - os.mkdir(self.dir) - try: - for fn in bytesfn: - support.create_empty_file(os.path.join(self.bdir, fn)) - fn = os.fsdecode(fn) - if fn in self.unicodefn: - raise ValueError("duplicate filename") - self.unicodefn.add(fn) - except: - shutil.rmtree(self.dir) - raise + def tearDown(self): + shutil.rmtree(self.dir) - def tearDown(self): - shutil.rmtree(self.dir) + def test_listdir(self): + expected = self.unicodefn + found = set(os.listdir(self.dir)) + self.assertEqual(found, expected) + # test listdir without arguments + current_directory = os.getcwd() + try: + os.chdir(os.sep) + self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) + finally: + os.chdir(current_directory) - def test_listdir(self): - expected = self.unicodefn - found = set(os.listdir(self.dir)) - self.assertEqual(found, expected) - # test listdir without arguments - current_directory = os.getcwd() - try: - os.chdir(os.sep) - self.assertEqual(set(os.listdir()), set(os.listdir(os.sep))) - finally: - os.chdir(current_directory) + def test_open(self): + for fn in self.unicodefn: + f = open(os.path.join(self.dir, fn), 'rb') + f.close() - def test_open(self): - for fn in self.unicodefn: - f = open(os.path.join(self.dir, fn), 'rb') - f.close() + @unittest.skipUnless(hasattr(os, 'statvfs'), + "need os.statvfs()") + def test_statvfs(self): + # issue #9645 + for fn in self.unicodefn: + # should not fail with file not found error + fullname = os.path.join(self.dir, fn) + os.statvfs(fullname) - @unittest.skipUnless(hasattr(os, 'statvfs'), - "need os.statvfs()") - def test_statvfs(self): - # issue #9645 - for fn in self.unicodefn: - # should not fail with file not found error - fullname = os.path.join(self.dir, fn) - os.statvfs(fullname) - - def test_stat(self): - for fn in self.unicodefn: - os.stat(os.path.join(self.dir, fn)) -else: - class PosixUidGidTests(unittest.TestCase): - pass - class Pep383Tests(unittest.TestCase): - pass + def test_stat(self): + for fn in self.unicodefn: + os.stat(os.path.join(self.dir, fn)) @unittest.skipUnless(sys.platform == "win32", "Win32 specific tests") class Win32KillTests(unittest.TestCase): @@ -1818,6 +1814,8 @@ SUPPORT_HEADERS_TRAILERS = not sys.platform.startswith("linux") and \ not sys.platform.startswith("solaris") and \ not sys.platform.startswith("sunos") + requires_headers_trailers = unittest.skipUnless(SUPPORT_HEADERS_TRAILERS, + 'requires headers and trailers support') @classmethod def setUpClass(cls): @@ -1936,51 +1934,53 @@ # --- headers / trailers tests - if SUPPORT_HEADERS_TRAILERS: + @requires_headers_trailers + def test_headers(self): + total_sent = 0 + sent = os.sendfile(self.sockno, self.fileno, 0, 4096, + headers=[b"x" * 512]) + total_sent += sent + offset = 4096 + nbytes = 4096 + while 1: + sent = self.sendfile_wrapper(self.sockno, self.fileno, + offset, nbytes) + if sent == 0: + break + total_sent += sent + offset += sent - def test_headers(self): - total_sent = 0 - sent = os.sendfile(self.sockno, self.fileno, 0, 4096, - headers=[b"x" * 512]) - total_sent += sent - offset = 4096 - nbytes = 4096 - while 1: - sent = self.sendfile_wrapper(self.sockno, self.fileno, - offset, nbytes) - if sent == 0: - break - total_sent += sent - offset += sent + expected_data = b"x" * 512 + self.DATA + self.assertEqual(total_sent, len(expected_data)) + self.client.close() + self.server.wait() + data = self.server.handler_instance.get_data() + self.assertEqual(hash(data), hash(expected_data)) - expected_data = b"x" * 512 + self.DATA - self.assertEqual(total_sent, len(expected_data)) + @requires_headers_trailers + def test_trailers(self): + TESTFN2 = support.TESTFN + "2" + with open(TESTFN2, 'wb') as f: + f.write(b"abcde") + with open(TESTFN2, 'rb')as f: + self.addCleanup(os.remove, TESTFN2) + os.sendfile(self.sockno, f.fileno(), 0, 4096, + trailers=[b"12345"]) self.client.close() self.server.wait() data = self.server.handler_instance.get_data() - self.assertEqual(hash(data), hash(expected_data)) + self.assertEqual(data, b"abcde12345") - def test_trailers(self): - TESTFN2 = support.TESTFN + "2" - with open(TESTFN2, 'wb') as f: - f.write(b"abcde") - with open(TESTFN2, 'rb')as f: - self.addCleanup(os.remove, TESTFN2) - os.sendfile(self.sockno, f.fileno(), 0, 4096, - trailers=[b"12345"]) - self.client.close() - self.server.wait() - data = self.server.handler_instance.get_data() - self.assertEqual(data, b"abcde12345") - - if hasattr(os, "SF_NODISKIO"): - def test_flags(self): - try: - os.sendfile(self.sockno, self.fileno, 0, 4096, - flags=os.SF_NODISKIO) - except OSError as err: - if err.errno not in (errno.EBUSY, errno.EAGAIN): - raise + @requires_headers_trailers + @unittest.skipUnless(hasattr(os, 'SF_NODISKIO'), + 'test needs os.SF_NODISKIO') + def test_flags(self): + try: + os.sendfile(self.sockno, self.fileno, 0, 4096, + flags=os.SF_NODISKIO) + except OSError as err: + if err.errno not in (errno.EBUSY, errno.EAGAIN): + raise def supports_extended_attributes(): diff -r a206f952668e Lib/test/test_poplib.py --- a/Lib/test/test_poplib.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_poplib.py Sat Aug 10 13:51:13 2013 +0300 @@ -11,7 +11,7 @@ import time import errno -from unittest import TestCase +from unittest import TestCase, skipUnless from test import support as test_support threading = test_support.import_module('threading') @@ -24,6 +24,7 @@ SUPPORTS_SSL = True CERTFILE = os.path.join(os.path.dirname(__file__) or os.curdir, "keycert.pem") +requires_ssl = skipUnless(SUPPORTS_SSL, 'SSL not supported') # the dummy data returned by server when LIST and RETR commands are issued LIST_RESP = b'1 1\r\n2 2\r\n3 3\r\n4 4\r\n5 5\r\n.\r\n' @@ -312,105 +313,107 @@ self.assertIsNone(self.client.sock) self.assertIsNone(self.client.file) - if SUPPORTS_SSL: + @requires_ssl + def test_stls_capa(self): + capa = self.client.capa() + self.assertTrue('STLS' in capa.keys()) - def test_stls_capa(self): - capa = self.client.capa() - self.assertTrue('STLS' in capa.keys()) + @requires_ssl + def test_stls(self): + expected = b'+OK Begin TLS negotiation' + resp = self.client.stls() + self.assertEqual(resp, expected) - def test_stls(self): - expected = b'+OK Begin TLS negotiation' - resp = self.client.stls() - self.assertEqual(resp, expected) + @requires_ssl + def test_stls_context(self): + expected = b'+OK Begin TLS negotiation' + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + resp = self.client.stls(context=ctx) + self.assertEqual(resp, expected) - def test_stls_context(self): - expected = b'+OK Begin TLS negotiation' - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - resp = self.client.stls(context=ctx) - self.assertEqual(resp, expected) +@requires_ssl +class DummyPOP3_SSLHandler(DummyPOP3Handler): -if SUPPORTS_SSL: + def __init__(self, conn): + asynchat.async_chat.__init__(self, conn) + ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, + server_side=True, + do_handshake_on_connect=False) + self.del_channel() + self.set_socket(ssl_socket) + # Must try handshake before calling push() + self.tls_active = True + self.tls_starting = True + self._do_tls_handshake() + self.set_terminator(b"\r\n") + self.in_buffer = [] + self.push('+OK dummy pop3 server ready. ') - class DummyPOP3_SSLHandler(DummyPOP3Handler): - def __init__(self, conn): - asynchat.async_chat.__init__(self, conn) - ssl_socket = ssl.wrap_socket(self.socket, certfile=CERTFILE, - server_side=True, - do_handshake_on_connect=False) - self.del_channel() - self.set_socket(ssl_socket) - # Must try handshake before calling push() - self.tls_active = True - self.tls_starting = True - self._do_tls_handshake() - self.set_terminator(b"\r\n") - self.in_buffer = [] - self.push('+OK dummy pop3 server ready. ') +@requires_ssl +class TestPOP3_SSLClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3_SSL + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.handler = DummyPOP3_SSLHandler + self.server.start() + self.client = poplib.POP3_SSL(self.server.host, self.server.port) - class TestPOP3_SSLClass(TestPOP3Class): - # repeat previous tests by using poplib.POP3_SSL + def test__all__(self): + self.assertIn('POP3_SSL', poplib.__all__) - def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) - self.server.handler = DummyPOP3_SSLHandler - self.server.start() - self.client = poplib.POP3_SSL(self.server.host, self.server.port) + def test_context(self): + ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, keyfile=CERTFILE, context=ctx) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, certfile=CERTFILE, context=ctx) + self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, + self.server.port, keyfile=CERTFILE, + certfile=CERTFILE, context=ctx) - def test__all__(self): - self.assertIn('POP3_SSL', poplib.__all__) + self.client.quit() + self.client = poplib.POP3_SSL(self.server.host, self.server.port, + context=ctx) + self.assertIsInstance(self.client.sock, ssl.SSLSocket) + self.assertIs(self.client.sock.context, ctx) + self.assertTrue(self.client.noop().startswith(b'+OK')) - def test_context(self): - ctx = ssl.SSLContext(ssl.PROTOCOL_TLSv1) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, certfile=CERTFILE, context=ctx) - self.assertRaises(ValueError, poplib.POP3_SSL, self.server.host, - self.server.port, keyfile=CERTFILE, - certfile=CERTFILE, context=ctx) + def test_stls(self): + self.assertRaises(poplib.error_proto, self.client.stls) + test_stls_context = test_stls + + def test_stls_capa(self): + capa = self.client.capa() + self.assertFalse('STLS' in capa.keys()) + + +@requires_ssl +class TestPOP3_TLSClass(TestPOP3Class): + # repeat previous tests by using poplib.POP3.stls() + + def setUp(self): + self.server = DummyPOP3Server((HOST, PORT)) + self.server.start() + self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) + self.client.stls() + + def tearDown(self): + if self.client.file is not None and self.client.sock is not None: self.client.quit() - self.client = poplib.POP3_SSL(self.server.host, self.server.port, - context=ctx) - self.assertIsInstance(self.client.sock, ssl.SSLSocket) - self.assertIs(self.client.sock.context, ctx) - self.assertTrue(self.client.noop().startswith(b'+OK')) + self.server.stop() - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) + def test_stls(self): + self.assertRaises(poplib.error_proto, self.client.stls) - test_stls_context = test_stls + test_stls_context = test_stls - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse('STLS' in capa.keys()) - - - class TestPOP3_TLSClass(TestPOP3Class): - # repeat previous tests by using poplib.POP3.stls() - - def setUp(self): - self.server = DummyPOP3Server((HOST, PORT)) - self.server.start() - self.client = poplib.POP3(self.server.host, self.server.port, timeout=3) - self.client.stls() - - def tearDown(self): - if self.client.file is not None and self.client.sock is not None: - self.client.quit() - self.server.stop() - - def test_stls(self): - self.assertRaises(poplib.error_proto, self.client.stls) - - test_stls_context = test_stls - - def test_stls_capa(self): - capa = self.client.capa() - self.assertFalse(b'STLS' in capa.keys()) + def test_stls_capa(self): + capa = self.client.capa() + self.assertFalse(b'STLS' in capa.keys()) class TestTimeouts(TestCase): diff -r a206f952668e Lib/test/test_posix.py --- a/Lib/test/test_posix.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_posix.py Sat Aug 10 13:51:13 2013 +0300 @@ -54,47 +54,55 @@ posix_func() self.assertRaises(TypeError, posix_func, 1) - if hasattr(posix, 'getresuid'): - def test_getresuid(self): - user_ids = posix.getresuid() - self.assertEqual(len(user_ids), 3) - for val in user_ids: - self.assertGreaterEqual(val, 0) + @unittest.skipUnless(hasattr(posix, 'getresuid'), + 'test needs os.getresuid()') + def test_getresuid(self): + user_ids = posix.getresuid() + self.assertEqual(len(user_ids), 3) + for val in user_ids: + self.assertGreaterEqual(val, 0) - if hasattr(posix, 'getresgid'): - def test_getresgid(self): - group_ids = posix.getresgid() - self.assertEqual(len(group_ids), 3) - for val in group_ids: - self.assertGreaterEqual(val, 0) + @unittest.skipUnless(hasattr(posix, 'getresgid'), + 'test needs os.getresgid()') + def test_getresgid(self): + group_ids = posix.getresgid() + self.assertEqual(len(group_ids), 3) + for val in group_ids: + self.assertGreaterEqual(val, 0) - if hasattr(posix, 'setresuid'): - def test_setresuid(self): - current_user_ids = posix.getresuid() - self.assertIsNone(posix.setresuid(*current_user_ids)) - # -1 means don't change that value. - self.assertIsNone(posix.setresuid(-1, -1, -1)) + @unittest.skipUnless(hasattr(posix, 'setresuid'), + 'test needs os.setresuid()') + def test_setresuid(self): + current_user_ids = posix.getresuid() + self.assertIsNone(posix.setresuid(*current_user_ids)) + # -1 means don't change that value. + self.assertIsNone(posix.setresuid(-1, -1, -1)) - def test_setresuid_exception(self): - # Don't do this test if someone is silly enough to run us as root. - current_user_ids = posix.getresuid() - if 0 not in current_user_ids: - new_user_ids = (current_user_ids[0]+1, -1, -1) - self.assertRaises(OSError, posix.setresuid, *new_user_ids) + @unittest.skipUnless(hasattr(posix, 'setresuid'), + 'test needs os.setresuid()') + def test_setresuid_exception(self): + # Don't do this test if someone is silly enough to run us as root. + current_user_ids = posix.getresuid() + if 0 not in current_user_ids: + new_user_ids = (current_user_ids[0]+1, -1, -1) + self.assertRaises(OSError, posix.setresuid, *new_user_ids) - if hasattr(posix, 'setresgid'): - def test_setresgid(self): - current_group_ids = posix.getresgid() - self.assertIsNone(posix.setresgid(*current_group_ids)) - # -1 means don't change that value. - self.assertIsNone(posix.setresgid(-1, -1, -1)) + @unittest.skipUnless(hasattr(posix, 'setresgid'), + 'test needs os.setresgid()') + def test_setresgid(self): + current_group_ids = posix.getresgid() + self.assertIsNone(posix.setresgid(*current_group_ids)) + # -1 means don't change that value. + self.assertIsNone(posix.setresgid(-1, -1, -1)) - def test_setresgid_exception(self): - # Don't do this test if someone is silly enough to run us as root. - current_group_ids = posix.getresgid() - if 0 not in current_group_ids: - new_group_ids = (current_group_ids[0]+1, -1, -1) - self.assertRaises(OSError, posix.setresgid, *new_group_ids) + @unittest.skipUnless(hasattr(posix, 'setresgid'), + 'test needs os.setresgid()') + def test_setresgid_exception(self): + # Don't do this test if someone is silly enough to run us as root. + current_group_ids = posix.getresgid() + if 0 not in current_group_ids: + new_group_ids = (current_group_ids[0]+1, -1, -1) + self.assertRaises(OSError, posix.setresgid, *new_group_ids) @unittest.skipUnless(hasattr(posix, 'initgroups'), "test needs os.initgroups()") diff -r a206f952668e Lib/test/test_set.py --- a/Lib/test/test_set.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_set.py Sat Aug 10 13:51:13 2013 +0300 @@ -625,10 +625,10 @@ myset >= myobj self.assertTrue(myobj.le_called) - # C API test only available in a debug build - if hasattr(set, "test_c_api"): - def test_c_api(self): - self.assertEqual(set().test_c_api(), True) + @unittest.skipUnless(hasattr(set, "test_c_api"), + 'C API test only available in a debug build') + def test_c_api(self): + self.assertEqual(set().test_c_api(), True) class SetSubclass(set): pass diff -r a206f952668e Lib/test/test_socketserver.py --- a/Lib/test/test_socketserver.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_socketserver.py Sat Aug 10 13:51:13 2013 +0300 @@ -26,8 +26,9 @@ TEST_STR = b"hello world\n" HOST = test.support.HOST -HAVE_UNIX_SOCKETS = hasattr(socket, "AF_UNIX") -HAVE_FORKING = hasattr(os, "fork") +requires_unix_sockets = unittest.skipUnless(hasattr(socket, "AF_UNIX"), + 'requires Unix sockets"') +requires_forking = unittest.skipUnless(hasattr(os, "fork"), 'requires forking') def signal_alarm(n): """Call signal.alarm when it exists (i.e. not on Windows).""" @@ -44,14 +45,15 @@ else: raise RuntimeError("timed out on %r" % (sock,)) -if HAVE_UNIX_SOCKETS: - class ForkingUnixStreamServer(socketserver.ForkingMixIn, - socketserver.UnixStreamServer): - pass +@requires_unix_sockets +class ForkingUnixStreamServer(socketserver.ForkingMixIn, + socketserver.UnixStreamServer): + pass - class ForkingUnixDatagramServer(socketserver.ForkingMixIn, - socketserver.UnixDatagramServer): - pass +@requires_unix_sockets +class ForkingUnixDatagramServer(socketserver.ForkingMixIn, + socketserver.UnixDatagramServer): + pass @contextlib.contextmanager @@ -175,31 +177,33 @@ socketserver.StreamRequestHandler, self.stream_examine) - if HAVE_FORKING: - def test_ForkingTCPServer(self): - with simple_subprocess(self): - self.run_server(socketserver.ForkingTCPServer, - socketserver.StreamRequestHandler, - self.stream_examine) - - if HAVE_UNIX_SOCKETS: - def test_UnixStreamServer(self): - self.run_server(socketserver.UnixStreamServer, + @requires_forking + def test_ForkingTCPServer(self): + with simple_subprocess(self): + self.run_server(socketserver.ForkingTCPServer, socketserver.StreamRequestHandler, self.stream_examine) - def test_ThreadingUnixStreamServer(self): - self.run_server(socketserver.ThreadingUnixStreamServer, + @requires_unix_sockets + def test_UnixStreamServer(self): + self.run_server(socketserver.UnixStreamServer, + socketserver.StreamRequestHandler, + self.stream_examine) + + @requires_unix_sockets + def test_ThreadingUnixStreamServer(self): + self.run_server(socketserver.ThreadingUnixStreamServer, + socketserver.StreamRequestHandler, + self.stream_examine) + + @requires_unix_sockets + @requires_forking + def test_ForkingUnixStreamServer(self): + with simple_subprocess(self): + self.run_server(ForkingUnixStreamServer, socketserver.StreamRequestHandler, self.stream_examine) - if HAVE_FORKING: - def test_ForkingUnixStreamServer(self): - with simple_subprocess(self): - self.run_server(ForkingUnixStreamServer, - socketserver.StreamRequestHandler, - self.stream_examine) - def test_UDPServer(self): self.run_server(socketserver.UDPServer, socketserver.DatagramRequestHandler, @@ -210,12 +214,12 @@ socketserver.DatagramRequestHandler, self.dgram_examine) - if HAVE_FORKING: - def test_ForkingUDPServer(self): - with simple_subprocess(self): - self.run_server(socketserver.ForkingUDPServer, - socketserver.DatagramRequestHandler, - self.dgram_examine) + @requires_forking + def test_ForkingUDPServer(self): + with simple_subprocess(self): + self.run_server(socketserver.ForkingUDPServer, + socketserver.DatagramRequestHandler, + self.dgram_examine) @contextlib.contextmanager def mocked_select_module(self): @@ -252,22 +256,24 @@ # Alas, on Linux (at least) recvfrom() doesn't return a meaningful # client address so this cannot work: - # if HAVE_UNIX_SOCKETS: - # def test_UnixDatagramServer(self): - # self.run_server(socketserver.UnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # def test_UnixDatagramServer(self): + # self.run_server(socketserver.UnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) # - # def test_ThreadingUnixDatagramServer(self): - # self.run_server(socketserver.ThreadingUnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # def test_ThreadingUnixDatagramServer(self): + # self.run_server(socketserver.ThreadingUnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) # - # if HAVE_FORKING: - # def test_ForkingUnixDatagramServer(self): - # self.run_server(socketserver.ForkingUnixDatagramServer, - # socketserver.DatagramRequestHandler, - # self.dgram_examine) + # @requires_unix_sockets + # @requires_forking + # def test_ForkingUnixDatagramServer(self): + # self.run_server(socketserver.ForkingUnixDatagramServer, + # socketserver.DatagramRequestHandler, + # self.dgram_examine) @reap_threads def test_shutdown(self): diff -r a206f952668e Lib/test/test_zlib.py --- a/Lib/test/test_zlib.py Thu Aug 08 18:28:53 2013 +0100 +++ b/Lib/test/test_zlib.py Sat Aug 10 13:51:13 2013 +0300 @@ -7,6 +7,13 @@ zlib = support.import_module('zlib') +requires_Compress_copy = unittest.skipUnless( + hasattr(zlib.compressobj(), "copy"), + 'requires Compress.copy()') +requires_Decompress_copy = unittest.skipUnless( + hasattr(zlib.decompressobj(), "copy"), + 'requires Decompress.copy()') + class VersionTestCase(unittest.TestCase): @@ -525,67 +532,69 @@ data = zlib.compress(input2) self.assertEqual(dco.flush(), input1[1:]) - if hasattr(zlib.compressobj(), "copy"): - def test_compresscopy(self): - # Test copying a compression object - data0 = HAMLET_SCENE - data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") - c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) - bufs0 = [] - bufs0.append(c0.compress(data0)) + @requires_Compress_copy + def test_compresscopy(self): + # Test copying a compression object + data0 = HAMLET_SCENE + data1 = bytes(str(HAMLET_SCENE, "ascii").swapcase(), "ascii") + c0 = zlib.compressobj(zlib.Z_BEST_COMPRESSION) + bufs0 = [] + bufs0.append(c0.compress(data0)) - c1 = c0.copy() - bufs1 = bufs0[:] + c1 = c0.copy() + bufs1 = bufs0[:] - bufs0.append(c0.compress(data0)) - bufs0.append(c0.flush()) - s0 = b''.join(bufs0) + bufs0.append(c0.compress(data0)) + bufs0.append(c0.flush()) + s0 = b''.join(bufs0) - bufs1.append(c1.compress(data1)) - bufs1.append(c1.flush()) - s1 = b''.join(bufs1) + bufs1.append(c1.compress(data1)) + bufs1.append(c1.flush()) + s1 = b''.join(bufs1) - self.assertEqual(zlib.decompress(s0),data0+data0) - self.assertEqual(zlib.decompress(s1),data0+data1) + self.assertEqual(zlib.decompress(s0),data0+data0) + self.assertEqual(zlib.decompress(s1),data0+data1) - def test_badcompresscopy(self): - # Test copying a compression object in an inconsistent state - c = zlib.compressobj() - c.compress(HAMLET_SCENE) - c.flush() - self.assertRaises(ValueError, c.copy) + @requires_Compress_copy + def test_badcompresscopy(self): + # Test copying a compression object in an inconsistent state + c = zlib.compressobj() + c.compress(HAMLET_SCENE) + c.flush() + self.assertRaises(ValueError, c.copy) - if hasattr(zlib.decompressobj(), "copy"): - def test_decompresscopy(self): - # Test copying a decompression object - data = HAMLET_SCENE - comp = zlib.compress(data) - # Test type of return value - self.assertIsInstance(comp, bytes) + @requires_Decompress_copy + def test_decompresscopy(self): + # Test copying a decompression object + data = HAMLET_SCENE + comp = zlib.compress(data) + # Test type of return value + self.assertIsInstance(comp, bytes) - d0 = zlib.decompressobj() - bufs0 = [] - bufs0.append(d0.decompress(comp[:32])) + d0 = zlib.decompressobj() + bufs0 = [] + bufs0.append(d0.decompress(comp[:32])) - d1 = d0.copy() - bufs1 = bufs0[:] + d1 = d0.copy() + bufs1 = bufs0[:] - bufs0.append(d0.decompress(comp[32:])) - s0 = b''.join(bufs0) + bufs0.append(d0.decompress(comp[32:])) + s0 = b''.join(bufs0) - bufs1.append(d1.decompress(comp[32:])) - s1 = b''.join(bufs1) + bufs1.append(d1.decompress(comp[32:])) + s1 = b''.join(bufs1) - self.assertEqual(s0,s1) - self.assertEqual(s0,data) + self.assertEqual(s0,s1) + self.assertEqual(s0,data) - def test_baddecompresscopy(self): - # Test copying a compression object in an inconsistent state - data = zlib.compress(HAMLET_SCENE) - d = zlib.decompressobj() - d.decompress(data) - d.flush() - self.assertRaises(ValueError, d.copy) + @requires_Decompress_copy + def test_baddecompresscopy(self): + # Test copying a compression object in an inconsistent state + data = zlib.compress(HAMLET_SCENE) + d = zlib.decompressobj() + d.decompress(data) + d.flush() + self.assertRaises(ValueError, d.copy) # Memory use of the following functions takes into account overallocation