diff -r eafff38a56cc Lib/test/test_tarfile.py --- a/Lib/test/test_tarfile.py Fri May 10 05:22:14 2013 +0300 +++ b/Lib/test/test_tarfile.py Sun May 12 13:49:44 2013 +0300 @@ -2,7 +2,6 @@ import os import io import shutil -import io from hashlib import md5 import errno @@ -10,12 +9,12 @@ import tarfile from test import support +from test.support import requires_bz2, requires_lzma # Check for our compression modules. try: import gzip - gzip.GzipFile -except (ImportError, AttributeError): +except ImportError: gzip = None try: import bz2 @@ -40,25 +39,55 @@ md5_sparse = "a54fbc4ca4f4399a90e1b27164012fc6" -class ReadTest(unittest.TestCase): +class TarTest: + tarname = tarname + suffix = '' + open = io.FileIO - tarname = tarname - mode = "r:" + @property + def mode(self): + return self.prefix + self.suffix + +@unittest.skipUnless(gzip, 'requires gzip') +class GzipTest: + tarname = gzipname + suffix = 'gz' + open = gzip.GzipFile if gzip else None + +@requires_bz2 +class Bz2Test: + tarname = bz2name + suffix = 'bz2' + open = bz2.BZ2File if bz2 else None + +@requires_lzma +class LzmaTest: + tarname = xzname + suffix = 'xz' + open = lzma.LZMAFile if lzma else None + + +class ReadTest(TarTest): + + prefix = "r:" def setUp(self): - self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") + self.tar = tarfile.open(self.tarname, mode=self.mode, + encoding="iso8859-1") def tearDown(self): self.tar.close() -class UstarReadTest(ReadTest): +class UstarReadTest(ReadTest, unittest.TestCase): def test_fileobj_regular_file(self): tarinfo = self.tar.getmember("ustar/regtype") with self.tar.extractfile(tarinfo) as fobj: data = fobj.read() - self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), + self.assertEqual(len(data), tarinfo.size, + "regular file extraction failed") + self.assertEqual(md5sum(data), md5_regtype, "regular file extraction failed") def test_fileobj_readlines(self): @@ -70,12 +99,13 @@ with self.tar.extractfile(tarinfo) as fobj: fobj2 = io.TextIOWrapper(fobj) lines2 = fobj2.readlines() - self.assertTrue(lines1 == lines2, + self.assertEqual(lines1, lines2, "fileobj.readlines() failed") - self.assertTrue(len(lines2) == 114, + self.assertEqual(len(lines2), 114, "fileobj.readlines() failed") - self.assertTrue(lines2[83] == - "I will gladly admit that Python is not the fastest running scripting language.\n", + self.assertEqual(lines2[83], + "I will gladly admit that Python is not the fastest " + "running scripting language.\n", "fileobj.readlines() failed") def test_fileobj_iter(self): @@ -85,8 +115,8 @@ lines1 = fobj1.readlines() with self.tar.extractfile(tarinfo) as fobj2: lines2 = list(io.TextIOWrapper(fobj2)) - self.assertTrue(lines1 == lines2, - "fileobj.__iter__() failed") + self.assertEqual(lines1, lines2, + "fileobj.__iter__() failed") def test_fileobj_seek(self): self.tar.extract("ustar/regtype", TEMPDIR) @@ -110,12 +140,12 @@ self.assertEqual(2048, fobj.tell(), "seek() to positive relative position failed") s = fobj.read(10) - self.assertTrue(s == data[2048:2058], + self.assertEqual(s, data[2048:2058], "read() after seek failed") fobj.seek(0, 2) self.assertEqual(tarinfo.size, fobj.tell(), "seek() to file's end failed") - self.assertTrue(fobj.read() == b"", + self.assertEqual(fobj.read(), b"", "read() at file's end did not return empty string") fobj.seek(-tarinfo.size, 2) self.assertEqual(0, fobj.tell(), @@ -124,13 +154,13 @@ s1 = fobj.readlines() fobj.seek(512) s2 = fobj.readlines() - self.assertTrue(s1 == s2, + self.assertEqual(s1, s2, "readlines() after seek failed") fobj.seek(0) self.assertEqual(len(fobj.readline()), fobj.tell(), "tell() after readline() failed") fobj.seek(512) - self.assertTrue(len(fobj.readline()) + 512 == fobj.tell(), + self.assertEqual(len(fobj.readline()) + 512, fobj.tell(), "tell() after seek() and readline() failed") fobj.seek(0) line = fobj.readline() @@ -154,24 +184,36 @@ # test link members each point to a regular member whose data is # supposed to be exported. def _test_fileobj_link(self, lnktype, regtype): - with self.tar.extractfile(lnktype) as a, self.tar.extractfile(regtype) as b: + with self.tar.extractfile(lnktype) as a, \ + self.tar.extractfile(regtype) as b: self.assertEqual(a.name, b.name) def test_fileobj_link1(self): self._test_fileobj_link("ustar/lnktype", "ustar/regtype") def test_fileobj_link2(self): - self._test_fileobj_link("./ustar/linktest2/lnktype", "ustar/linktest1/regtype") + self._test_fileobj_link("./ustar/linktest2/lnktype", + "ustar/linktest1/regtype") def test_fileobj_symlink1(self): self._test_fileobj_link("ustar/symtype", "ustar/regtype") def test_fileobj_symlink2(self): - self._test_fileobj_link("./ustar/linktest2/symtype", "ustar/linktest1/regtype") + self._test_fileobj_link("./ustar/linktest2/symtype", + "ustar/linktest1/regtype") def test_issue14160(self): self._test_fileobj_link("symtype2", "ustar/regtype") +class GzipUstarReadTest(GzipTest, UstarReadTest): + pass + +class Bz2UstarReadTest(Bz2Test, UstarReadTest): + pass + +class LzmaUstarReadTest(LzmaTest, UstarReadTest): + pass + class CommonReadTest(ReadTest): @@ -203,37 +245,24 @@ def test_ignore_zeros(self): # Test TarFile's ignore_zeros option. - if self.mode.endswith(":gz"): - _open = gzip.GzipFile - elif self.mode.endswith(":bz2"): - _open = bz2.BZ2File - elif self.mode.endswith(":xz"): - _open = lzma.LZMAFile - else: - _open = io.FileIO - for char in (b'\0', b'a'): # Test if EOFHeaderError ('\0') and InvalidHeaderError ('a') # are ignored correctly. - with _open(tmpname, "w") as fobj: + with self.open(tmpname, "w") as fobj: fobj.write(char * 1024) fobj.write(tarfile.TarInfo("foo").tobuf()) tar = tarfile.open(tmpname, mode="r", ignore_zeros=True) try: self.assertListEqual(tar.getnames(), ["foo"], - "ignore_zeros=True should have skipped the %r-blocks" % char) + "ignore_zeros=True should have skipped the %r-blocks" % + char) finally: tar.close() -class MiscReadTest(CommonReadTest): - +class MiscReadTestBase(CommonReadTest): def test_no_name_argument(self): - if self.mode.endswith(("bz2", "xz")): - # BZ2File and LZMAFile have no name attribute. - self.skipTest("no name attribute") - with open(self.tarname, "rb") as fobj: tar = tarfile.open(fileobj=fobj, mode=self.mode) self.assertEqual(tar.name, os.path.abspath(fobj.name)) @@ -269,16 +298,7 @@ tar.close() # Open the testtar and seek to the offset of the second member. - if self.mode.endswith(":gz"): - _open = gzip.GzipFile - elif self.mode.endswith(":bz2"): - _open = bz2.BZ2File - elif self.mode.endswith(":xz"): - _open = lzma.LZMAFile - else: - _open = io.FileIO - - with _open(self.tarname) as fobj: + with self.open(self.tarname) as fobj: fobj.seek(offset) # Test if the tarfile starts with the second member. @@ -294,8 +314,6 @@ def test_fail_comp(self): # For Gzip and Bz2 Tests: fail with a ReadError on an uncompressed file. - if self.mode == "r:": - return self.assertRaises(tarfile.ReadError, tarfile.open, tarname, self.mode) with open(tarname, "rb") as fobj: self.assertRaises(tarfile.ReadError, tarfile.open, @@ -306,7 +324,7 @@ # Old V7 tars create directory members using an AREGTYPE # header with a "/" appended to the filename field. tarinfo = self.tar.getmember("misc/dirtype-old-v7") - self.assertTrue(tarinfo.type == tarfile.DIRTYPE, + self.assertEqual(tarinfo.type, tarfile.DIRTYPE, "v7 dirtype failed") def test_xstar_type(self): @@ -320,15 +338,15 @@ def test_check_members(self): for tarinfo in self.tar: - self.assertTrue(int(tarinfo.mtime) == 0o7606136617, + self.assertEqual(int(tarinfo.mtime), 0o7606136617, "wrong mtime for %s" % tarinfo.name) if not tarinfo.name.startswith("ustar/"): continue - self.assertTrue(tarinfo.uname == "tarfile", + self.assertEqual(tarinfo.uname, "tarfile", "wrong uname for %s" % tarinfo.name) def test_find_members(self): - self.assertTrue(self.tar.getmembers()[-1].name == "misc/eof", + self.assertEqual(self.tar.getmembers()[-1].name, "misc/eof", "could not find all members") @unittest.skipUnless(hasattr(os, "link"), @@ -365,7 +383,8 @@ path = os.path.join(DIR, tarinfo.name) if sys.platform != "win32": # Win32 has no support for fine grained permissions. - self.assertEqual(tarinfo.mode & 0o777, os.stat(path).st_mode & 0o777) + self.assertEqual(tarinfo.mode & 0o777, + os.stat(path).st_mode & 0o777) def format_mtime(mtime): if isinstance(mtime, float): return "{} ({})".format(mtime, mtime.hex()) @@ -423,10 +442,29 @@ self.assertEqual(m1.offset, m2.offset) self.assertEqual(m1.get_info(), m2.get_info()) +class MiscReadTest(MiscReadTestBase, unittest.TestCase): + test_fail_comp = None -class StreamReadTest(CommonReadTest): +class GzipMiscReadTest(GzipTest, MiscReadTestBase, unittest.TestCase): + def test_non_existent_targz_file(self): + # Test for issue11513: prevent non-existent gzipped tarfiles raising + # multiple exceptions. + with self.assertRaisesRegex(OSError, "xxx") as ex: + tarfile.open("xxx", self.mode) + self.assertEqual(ex.exception.errno, errno.ENOENT) - mode="r|" +class Bz2MiscReadTest(Bz2Test, MiscReadTestBase, unittest.TestCase): + def test_no_name_argument(self): + self.skipTest("BZ2File have no name attribute") + +class LzmaMiscReadTest(LzmaTest, MiscReadTestBase, unittest.TestCase): + def test_no_name_argument(self): + self.skipTest("LZMAFile have no name attribute") + + +class StreamReadTest(CommonReadTest, unittest.TestCase): + + prefix="r|" def test_read_through(self): # Issue #11224: A poorly designed _FileInFile.read() method @@ -439,7 +477,8 @@ try: buf = fobj.read(512) except tarfile.StreamError: - self.fail("simple read-through using TarFile.extractfile() failed") + self.fail("simple read-through using " + "TarFile.extractfile() failed") if not buf: break @@ -447,7 +486,9 @@ tarinfo = self.tar.next() # get "regtype" (can't use getmember) with self.tar.extractfile(tarinfo) as fobj: data = fobj.read() - self.assertTrue((len(data), md5sum(data)) == (tarinfo.size, md5_regtype), + self.assertEqual(len(data), tarinfo.size, + "regular file extraction failed") + self.assertEqual(md5sum(data), md5_regtype, "regular file extraction failed") def test_provoke_stream_error(self): @@ -465,24 +506,34 @@ t2 = tar2.next() if t1 is None: break - self.assertTrue(t2 is not None, "stream.next() failed.") + self.assertIsNotNone(t2, "stream.next() failed.") if t2.islnk() or t2.issym(): - self.assertRaises(tarfile.StreamError, tar2.extractfile, t2) + with self.assertRaises(tarfile.StreamError): + tar2.extractfile(t2) continue v1 = tar1.extractfile(t1) v2 = tar2.extractfile(t2) if v1 is None: continue - self.assertTrue(v2 is not None, "stream.extractfile() failed") - self.assertEqual(v1.read(), v2.read(), "stream extraction failed") + self.assertIsNotNone(v2, "stream.extractfile() failed") + self.assertEqual(v1.read(), v2.read(), + "stream extraction failed") finally: tar1.close() +class GzipStreamReadTest(GzipTest, StreamReadTest): + pass -class DetectReadTest(unittest.TestCase): +class Bz2StreamReadTest(Bz2Test, StreamReadTest): + pass +class LzmaStreamReadTest(LzmaTest, StreamReadTest): + pass + + +class DetectReadTest(TarTest, unittest.TestCase): def _testfunc_file(self, name, mode): try: tar = tarfile.open(name, mode) @@ -501,47 +552,20 @@ tar.close() def _test_modes(self, testfunc): - testfunc(tarname, "r") - testfunc(tarname, "r:") - testfunc(tarname, "r:*") - testfunc(tarname, "r|") - testfunc(tarname, "r|*") - - if gzip: - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:gz") - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|gz") - self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r:") - self.assertRaises(tarfile.ReadError, tarfile.open, gzipname, mode="r|") - - testfunc(gzipname, "r") - testfunc(gzipname, "r:*") - testfunc(gzipname, "r:gz") - testfunc(gzipname, "r|*") - testfunc(gzipname, "r|gz") - - if bz2: - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r:") - self.assertRaises(tarfile.ReadError, tarfile.open, bz2name, mode="r|") - - testfunc(bz2name, "r") - testfunc(bz2name, "r:*") - testfunc(bz2name, "r:bz2") - testfunc(bz2name, "r|*") - testfunc(bz2name, "r|bz2") - - if lzma: - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r:xz") - self.assertRaises(tarfile.ReadError, tarfile.open, tarname, mode="r|xz") - self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r:") - self.assertRaises(tarfile.ReadError, tarfile.open, xzname, mode="r|") - - testfunc(xzname, "r") - testfunc(xzname, "r:*") - testfunc(xzname, "r:xz") - testfunc(xzname, "r|*") - testfunc(xzname, "r|xz") + if self.suffix: + with self.assertRaises(tarfile.ReadError): + tarfile.open(tarname, mode="r:" + self.suffix) + with self.assertRaises(tarfile.ReadError): + tarfile.open(tarname, mode="r|" + self.suffix) + with self.assertRaises(tarfile.ReadError): + tarfile.open(self.tarname, mode="r:") + with self.assertRaises(tarfile.ReadError): + tarfile.open(self.tarname, mode="r|") + testfunc(self.tarname, "r") + testfunc(self.tarname, "r:" + self.suffix) + testfunc(self.tarname, "r:*") + testfunc(self.tarname, "r|" + self.suffix) + testfunc(self.tarname, "r|*") def test_detect_file(self): self._test_modes(self._testfunc_file) @@ -549,14 +573,15 @@ def test_detect_fileobj(self): self._test_modes(self._testfunc_fileobj) +class GzipDetectReadTest(GzipTest, DetectReadTest): + pass + +class Bz2DetectReadTest(Bz2Test, DetectReadTest): def test_detect_stream_bz2(self): # Originally, tarfile's stream detection looked for the string # "BZh91" at the start of the file. This is incorrect because # the '9' represents the blocksize (900kB). If the file was # compressed using another blocksize autodetection fails. - if not bz2: - return - with open(tarname, "rb") as fobj: data = fobj.read() @@ -566,13 +591,17 @@ self._testfunc_file(tmpname, "r|*") +class LzmaDetectReadTest(LzmaTest, DetectReadTest): + pass -class MemberReadTest(ReadTest): + +class MemberReadTest(ReadTest, unittest.TestCase): def _test_member(self, tarinfo, chksum=None, **kwargs): if chksum is not None: - self.assertTrue(md5sum(self.tar.extractfile(tarinfo).read()) == chksum, - "wrong md5sum for %s" % tarinfo.name) + with self.tar.extractfile(tarinfo) as f: + self.assertEqual(md5sum(f.read()), chksum, + "wrong md5sum for %s" % tarinfo.name) kwargs["mtime"] = 0o7606136617 kwargs["uid"] = 1000 @@ -582,7 +611,7 @@ kwargs["uname"] = "tarfile" kwargs["gname"] = "tarfile" for k, v in kwargs.items(): - self.assertTrue(getattr(tarinfo, k) == v, + self.assertEqual(getattr(tarinfo, k), v, "wrong value in %s field of %s" % (k, tarinfo.name)) def test_find_regtype(self): @@ -642,7 +671,8 @@ self._test_member(tarinfo, size=86016, chksum=md5_sparse) def test_find_umlauts(self): - tarinfo = self.tar.getmember("ustar/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + tarinfo = self.tar.getmember("ustar/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self._test_member(tarinfo, size=7011, chksum=md5_regtype) def test_find_ustar_longname(self): @@ -655,12 +685,14 @@ def test_find_pax_umlauts(self): self.tar.close() - self.tar = tarfile.open(self.tarname, mode=self.mode, encoding="iso8859-1") - tarinfo = self.tar.getmember("pax/umlauts-\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.tar = tarfile.open(self.tarname, mode=self.mode, + encoding="iso8859-1") + tarinfo = self.tar.getmember("pax/umlauts-" + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") self._test_member(tarinfo, size=7011, chksum=md5_regtype) -class LongnameTest(ReadTest): +class LongnameTest: def test_read_longname(self): # Test reading of longname (bug #1471427). @@ -669,7 +701,8 @@ tarinfo = self.tar.getmember(longname) except KeyError: self.fail("longname not found") - self.assertTrue(tarinfo.type != tarfile.DIRTYPE, "read longname as dirtype") + self.assertNotEqual(tarinfo.type, tarfile.DIRTYPE, + "read longname as dirtype") def test_read_longlink(self): longname = self.subdir + "/" + "123/" * 125 + "longname" @@ -678,7 +711,7 @@ tarinfo = self.tar.getmember(longlink) except KeyError: self.fail("longlink not found") - self.assertTrue(tarinfo.linkname == longname, "linkname wrong") + self.assertEqual(tarinfo.linkname, longname, "linkname wrong") def test_truncated_longname(self): longname = self.subdir + "/" + "123/" * 125 + "longname" @@ -686,7 +719,8 @@ offset = tarinfo.offset self.tar.fileobj.seek(offset) fobj = io.BytesIO(self.tar.fileobj.read(3 * 512)) - self.assertRaises(tarfile.ReadError, tarfile.open, name="foo.tar", fileobj=fobj) + with self.assertRaises(tarfile.ReadError): + tarfile.open(name="foo.tar", fileobj=fobj) def test_header_offset(self): # Test if the start offset of the TarInfo object includes @@ -695,11 +729,12 @@ offset = self.tar.getmember(longname).offset with open(tarname, "rb") as fobj: fobj.seek(offset) - tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), "iso8859-1", "strict") + tarinfo = tarfile.TarInfo.frombuf(fobj.read(512), + "iso8859-1", "strict") self.assertEqual(tarinfo.type, self.longnametype) -class GNUReadTest(LongnameTest): +class GNUReadTest(LongnameTest, ReadTest, unittest.TestCase): subdir = "gnu" longnametype = tarfile.GNUTYPE_LONGNAME @@ -721,7 +756,7 @@ if self._fs_supports_holes(): s = os.stat(filename) - self.assertTrue(s.st_blocks * 512 < s.st_size) + self.assertLess(s.st_blocks * 512, s.st_size) def test_sparse_file_old(self): self._test_sparse_file("gnu/sparse") @@ -753,7 +788,7 @@ return False -class PaxReadTest(LongnameTest): +class PaxReadTest(LongnameTest, ReadTest, unittest.TestCase): subdir = "pax" longnametype = tarfile.XHDTYPE @@ -764,17 +799,20 @@ tarinfo = tar.getmember("pax/regtype1") self.assertEqual(tarinfo.uname, "foo") self.assertEqual(tarinfo.gname, "bar") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") tarinfo = tar.getmember("pax/regtype2") self.assertEqual(tarinfo.uname, "") self.assertEqual(tarinfo.gname, "bar") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") tarinfo = tar.getmember("pax/regtype3") self.assertEqual(tarinfo.uname, "tarfile") self.assertEqual(tarinfo.gname, "tarfile") - self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") + self.assertEqual(tarinfo.pax_headers.get("VENDOR.umlauts"), + "\xc4\xd6\xdc\xe4\xf6\xfc\xdf") finally: tar.close() @@ -794,7 +832,7 @@ tar.close() -class WriteTestBase(unittest.TestCase): +class WriteTestBase(TarTest): # Put all write tests in here that are supposed to be tested # in all possible mode combinations. @@ -803,12 +841,12 @@ tar = tarfile.open(fileobj=fobj, mode=self.mode) tar.addfile(tarfile.TarInfo("foo")) tar.close() - self.assertTrue(fobj.closed is False, "external fileobjs must never closed") + self.assertFalse(fobj.closed, "external fileobjs must never closed") -class WriteTest(WriteTestBase): +class WriteTest(WriteTestBase, unittest.TestCase): - mode = "w:" + prefix = "w:" def test_100_char_name(self): # The name field in a tar header stores strings of at most 100 chars. @@ -825,7 +863,7 @@ tar = tarfile.open(tmpname) try: - self.assertTrue(tar.getnames()[0] == name, + self.assertEqual(tar.getnames()[0], name, "failed to store 100 char filename") finally: tar.close() @@ -840,7 +878,7 @@ tar.add(path) finally: tar.close() - self.assertTrue(os.path.getsize(tmpname) > 0, + self.assertGreater(os.path.getsize(tmpname), 0, "tarfile is empty") # The test_*_size tests test for bug #1167128. @@ -873,25 +911,26 @@ finally: os.rmdir(path) + @unittest.skipUnless(hasattr(os, "link"), + "Missing hardlink implementation") def test_link_size(self): - if hasattr(os, "link"): - link = os.path.join(TEMPDIR, "link") - target = os.path.join(TEMPDIR, "link_target") - with open(target, "wb") as fobj: - fobj.write(b"aaa") - os.link(target, link) + link = os.path.join(TEMPDIR, "link") + target = os.path.join(TEMPDIR, "link_target") + with open(target, "wb") as fobj: + fobj.write(b"aaa") + os.link(target, link) + try: + tar = tarfile.open(tmpname, self.mode) try: - tar = tarfile.open(tmpname, self.mode) - try: - # Record the link target in the inodes list. - tar.gettarinfo(target) - tarinfo = tar.gettarinfo(link) - self.assertEqual(tarinfo.size, 0) - finally: - tar.close() + # Record the link target in the inodes list. + tar.gettarinfo(target) + tarinfo = tar.gettarinfo(link) + self.assertEqual(tarinfo.size, 0) finally: - os.remove(target) - os.remove(link) + tar.close() + finally: + os.remove(target) + os.remove(link) @support.skip_unless_symlink def test_symlink_size(self): @@ -912,15 +951,18 @@ dstname = os.path.abspath(tmpname) tar = tarfile.open(tmpname, self.mode) try: - self.assertTrue(tar.name == dstname, "archive name must be absolute") + self.assertEqual(tar.name, dstname, + "archive name must be absolute") tar.add(dstname) - self.assertTrue(tar.getnames() == [], "added the archive to itself") + self.assertEqual(tar.getnames(), [], + "added the archive to itself") cwd = os.getcwd() os.chdir(TEMPDIR) tar.add(dstname) os.chdir(cwd) - self.assertTrue(tar.getnames() == [], "added the archive to itself") + self.assertEqual(tar.getnames(), [], + "added the archive to itself") finally: tar.close() @@ -1087,48 +1129,49 @@ tar = tarfile.open(tmpname, "r") try: for t in tar: - self.assertTrue(t.name == "." or t.name.startswith("./")) + if t.name != ".": + self.assertTrue(t.name.startswith("./"), t.name) finally: tar.close() finally: os.chdir(cwd) +class GzipWriteTest(GzipTest, WriteTest): + pass -class StreamWriteTest(WriteTestBase): +class Bz2WriteTest(Bz2Test, WriteTest): + pass - mode = "w|" +class LzmaWriteTest(LzmaTest, WriteTest): + pass + + +class StreamWriteTest(WriteTestBase, unittest.TestCase): + + prefix = "w|" + decompressor = None def test_stream_padding(self): # Test for bug #1543303. tar = tarfile.open(tmpname, self.mode) tar.close() - - if self.mode.endswith("gz"): - with gzip.GzipFile(tmpname) as fobj: - data = fobj.read() - elif self.mode.endswith("bz2"): - dec = bz2.BZ2Decompressor() + if self.decompressor: + dec = self.decompressor() with open(tmpname, "rb") as fobj: data = fobj.read() data = dec.decompress(data) - self.assertTrue(len(dec.unused_data) == 0, - "found trailing data") - elif self.mode.endswith("xz"): - with lzma.LZMAFile(tmpname) as fobj: + self.assertFalse(dec.unused_data, "found trailing data") + else: + with self.open(tmpname) as fobj: data = fobj.read() - else: - with open(tmpname, "rb") as fobj: - data = fobj.read() + self.assertEqual(data.count(b"\0"), tarfile.RECORDSIZE, + "incorrect zero padding") - self.assertTrue(data.count(b"\0") == tarfile.RECORDSIZE, - "incorrect zero padding") - + @unittest.skipUnless(sys.platform != "win32" and hasattr(os, "umask"), + "Missing umask implementar") def test_file_mode(self): # Test for issue #8464: Create files with correct # permissions. - if sys.platform == "win32" or not hasattr(os, "umask"): - return - if os.path.exists(tmpname): os.remove(tmpname) @@ -1141,15 +1184,22 @@ finally: os.umask(original_umask) +class GzipStreamWriteTest(GzipTest, StreamWriteTest): + pass + +class Bz2StreamWriteTest(Bz2Test, StreamWriteTest): + decompressor = bz2.BZ2Decompressor if bz2 else None + +class LzmaStreamWriteTest(LzmaTest, StreamWriteTest): + decompressor = lzma.LZMADecompressor if lzma else None + class GNUWriteTest(unittest.TestCase): # This testcase checks for correct creation of GNU Longname # and Longlink extended headers (cp. bug #812325). def _length(self, s): - blocks, remainder = divmod(len(s) + 1, 512) - if remainder: - blocks += 1 + blocks = len(s) // 512 + 1 return blocks * 512 def _calc_size(self, name, link=None): @@ -1179,7 +1229,7 @@ v1 = self._calc_size(name, link) v2 = tar.offset - self.assertTrue(v1 == v2, "GNU longname/longlink creation failed") + self.assertEqual(v1, v2, "GNU longname/longlink creation failed") finally: tar.close() @@ -1226,6 +1276,7 @@ ("longlnk/" * 127) + "longlink_") +@unittest.skipUnless(hasattr(os, "link"), "Missing hardlink implementation") class HardlinkTest(unittest.TestCase): # Test the creation of LNKTYPE (hardlink) members in an archive. @@ -1250,18 +1301,18 @@ # The same name will be added as a REGTYPE every # time regardless of st_nlink. tarinfo = self.tar.gettarinfo(self.foo) - self.assertTrue(tarinfo.type == tarfile.REGTYPE, + self.assertEqual(tarinfo.type, tarfile.REGTYPE, "add file as regular failed") def test_add_hardlink(self): tarinfo = self.tar.gettarinfo(self.bar) - self.assertTrue(tarinfo.type == tarfile.LNKTYPE, + self.assertEqual(tarinfo.type, tarfile.LNKTYPE, "add file as hardlink failed") def test_dereference_hardlink(self): self.tar.dereference = True tarinfo = self.tar.gettarinfo(self.bar) - self.assertTrue(tarinfo.type == tarfile.REGTYPE, + self.assertEqual(tarinfo.type, tarfile.REGTYPE, "dereferencing hardlink failed") @@ -1284,10 +1335,10 @@ try: if link: l = tar.getmembers()[0].linkname - self.assertTrue(link == l, "PAX longlink creation failed") + self.assertEqual(link, l, "PAX longlink creation failed") else: n = tar.getmembers()[0].name - self.assertTrue(name == n, "PAX longname creation failed") + self.assertEqual(name, n, "PAX longname creation failed") finally: tar.close() @@ -1313,8 +1364,8 @@ self.assertEqual(tar.getmembers()[0].pax_headers, pax_headers) # Test if all the fields are strings. for key, val in tar.pax_headers.items(): - self.assertTrue(type(key) is not bytes) - self.assertTrue(type(val) is not bytes) + self.assertIsNot(type(key), bytes) + self.assertIsNot(type(val), bytes) if key in tarfile.PAX_NUMBER_FIELDS: try: tarfile.PAX_NUMBER_FIELDS[key](val) @@ -1328,7 +1379,8 @@ # TarInfo. pax_headers = {"path": "foo", "uid": "123"} - tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, encoding="iso8859-1") + tar = tarfile.open(tmpname, "w", format=tarfile.PAX_FORMAT, + encoding="iso8859-1") try: t = tarfile.TarInfo() t.name = "\xe4\xf6\xfc" # non-ASCII @@ -1362,7 +1414,8 @@ self._test_unicode_filename("utf-8") def _test_unicode_filename(self, encoding): - tar = tarfile.open(tmpname, "w", format=self.format, encoding=encoding, errors="strict") + tar = tarfile.open(tmpname, "w", format=self.format, + encoding=encoding, errors="strict") try: name = "\xe4\xf6\xfc" tar.addfile(tarfile.TarInfo(name)) @@ -1376,11 +1429,8 @@ tar.close() def test_unicode_filename_error(self): - if self.format == tarfile.PAX_FORMAT: - # PAX_FORMAT ignores encoding in write mode. - return - - tar = tarfile.open(tmpname, "w", format=self.format, encoding="ascii", errors="strict") + tar = tarfile.open(tmpname, "w", format=self.format, + encoding="ascii", errors="strict") try: tarinfo = tarfile.TarInfo() @@ -1394,13 +1444,14 @@ tar.close() def test_unicode_argument(self): - tar = tarfile.open(tarname, "r", encoding="iso8859-1", errors="strict") + tar = tarfile.open(tarname, "r", + encoding="iso8859-1", errors="strict") try: for t in tar: - self.assertTrue(type(t.name) is str) - self.assertTrue(type(t.linkname) is str) - self.assertTrue(type(t.uname) is str) - self.assertTrue(type(t.gname) is str) + self.assertIs(type(t.name), str) + self.assertIs(type(t.linkname), str) + self.assertIs(type(t.uname), str) + self.assertIs(type(t.gname), str) finally: tar.close() @@ -1409,7 +1460,8 @@ t.uname = "\xe4\xf6\xfc" t.gname = "\xe4\xf6\xfc" - tar = tarfile.open(tmpname, mode="w", format=self.format, encoding="iso8859-1") + tar = tarfile.open(tmpname, mode="w", format=self.format, + encoding="iso8859-1") try: tar.addfile(t) finally: @@ -1438,9 +1490,11 @@ def test_bad_pax_header(self): # Test for issue #8633. GNU tar <= 1.23 creates raw binary fields # without a hdrcharset=BINARY header. - for encoding, name in (("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), + for encoding, name in ( + ("utf-8", "pax/bad-pax-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/bad-pax-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: + with tarfile.open(tarname, encoding=encoding, + errors="surrogateescape") as tar: try: t = tar.getmember(name) except KeyError: @@ -1451,18 +1505,23 @@ format = tarfile.PAX_FORMAT + # PAX_FORMAT ignores encoding in write mode. + test_unicode_filename_error = None + def test_binary_header(self): # Test a POSIX.1-2008 compatible header with a hdrcharset=BINARY field. - for encoding, name in (("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), + for encoding, name in ( + ("utf-8", "pax/hdrcharset-\udce4\udcf6\udcfc"), ("iso8859-1", "pax/hdrcharset-\xe4\xf6\xfc"),): - with tarfile.open(tarname, encoding=encoding, errors="surrogateescape") as tar: + with tarfile.open(tarname, encoding=encoding, + errors="surrogateescape") as tar: try: t = tar.getmember(name) except KeyError: self.fail("unable to read POSIX.1-2008 binary header") -class AppendTest(unittest.TestCase): +class AppendTestBase: # Test append mode (cp. patch #1652681). def setUp(self): @@ -1470,10 +1529,6 @@ if os.path.exists(self.tarname): os.remove(self.tarname) - def _add_testfile(self, fileobj=None): - with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: - tar.addfile(tarfile.TarInfo("bar")) - def _create_testtar(self, mode="w:"): with tarfile.open(tarname, encoding="iso8859-1") as src: t = src.getmember("ustar/regtype") @@ -1482,6 +1537,17 @@ with tarfile.open(self.tarname, mode) as tar: tar.addfile(t, f) + def test_append_compressed(self): + self._create_testtar("w:" + self.suffix) + self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") + +class AppendTest(AppendTestBase, unittest.TestCase): + test_append_compressed = None + + def _add_testfile(self, fileobj=None): + with tarfile.open(self.tarname, "a", fileobj=fileobj) as tar: + tar.addfile(tarfile.TarInfo("bar")) + def _test(self, names=["bar"], fileobj=None): with tarfile.open(self.tarname, fileobj=fileobj) as tar: self.assertEqual(tar.getnames(), names) @@ -1515,24 +1581,6 @@ self._add_testfile() self._test(names=["foo", "bar"]) - def test_append_gz(self): - if gzip is None: - return - self._create_testtar("w:gz") - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") - - def test_append_bz2(self): - if bz2 is None: - return - self._create_testtar("w:bz2") - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") - - def test_append_lzma(self): - if lzma is None: - self.skipTest("lzma module not available") - self._create_testtar("w:xz") - self.assertRaises(tarfile.ReadError, tarfile.open, tmpname, "a") - # Append mode is supposed to fail if the tarfile to append to # does not end with a zero block. def _test_error(self, data): @@ -1557,6 +1605,15 @@ def test_invalid(self): self._test_error(b"a" * 512) +class GzipAppendTest(GzipTest, AppendTestBase, unittest.TestCase): + pass + +class Bz2AppendTest(Bz2Test, AppendTestBase, unittest.TestCase): + pass + +class LzmaAppendTest(LzmaTest, AppendTestBase, unittest.TestCase): + pass + class LimitsTest(unittest.TestCase): @@ -1620,36 +1677,54 @@ class MiscTest(unittest.TestCase): def test_char_fields(self): - self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), b"foo\0\0\0\0\0") - self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), b"foo") - self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), "foo") - self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), "foo") + self.assertEqual(tarfile.stn("foo", 8, "ascii", "strict"), + b"foo\0\0\0\0\0") + self.assertEqual(tarfile.stn("foobar", 3, "ascii", "strict"), + b"foo") + self.assertEqual(tarfile.nts(b"foo\0\0\0\0\0", "ascii", "strict"), + "foo") + self.assertEqual(tarfile.nts(b"foo\0bar\0", "ascii", "strict"), + "foo") def test_read_number_fields(self): # Issue 13158: Test if GNU tar specific base-256 number fields # are decoded correctly. self.assertEqual(tarfile.nti(b"0000001\x00"), 1) self.assertEqual(tarfile.nti(b"7777777\x00"), 0o7777777) - self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), 0o10000000) - self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), 0xffffffff) - self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), -1) - self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), -100) - self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), -0x100000000000000) + self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\x00\x20\x00\x00"), + 0o10000000) + self.assertEqual(tarfile.nti(b"\x80\x00\x00\x00\xff\xff\xff\xff"), + 0xffffffff) + self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\xff"), + -1) + self.assertEqual(tarfile.nti(b"\xff\xff\xff\xff\xff\xff\xff\x9c"), + -100) + self.assertEqual(tarfile.nti(b"\xff\x00\x00\x00\x00\x00\x00\x00"), + -0x100000000000000) def test_write_number_fields(self): self.assertEqual(tarfile.itn(1), b"0000001\x00") self.assertEqual(tarfile.itn(0o7777777), b"7777777\x00") - self.assertEqual(tarfile.itn(0o10000000), b"\x80\x00\x00\x00\x00\x20\x00\x00") - self.assertEqual(tarfile.itn(0xffffffff), b"\x80\x00\x00\x00\xff\xff\xff\xff") - self.assertEqual(tarfile.itn(-1), b"\xff\xff\xff\xff\xff\xff\xff\xff") - self.assertEqual(tarfile.itn(-100), b"\xff\xff\xff\xff\xff\xff\xff\x9c") - self.assertEqual(tarfile.itn(-0x100000000000000), b"\xff\x00\x00\x00\x00\x00\x00\x00") + self.assertEqual(tarfile.itn(0o10000000), + b"\x80\x00\x00\x00\x00\x20\x00\x00") + self.assertEqual(tarfile.itn(0xffffffff), + b"\x80\x00\x00\x00\xff\xff\xff\xff") + self.assertEqual(tarfile.itn(-1), + b"\xff\xff\xff\xff\xff\xff\xff\xff") + self.assertEqual(tarfile.itn(-100), + b"\xff\xff\xff\xff\xff\xff\xff\x9c") + self.assertEqual(tarfile.itn(-0x100000000000000), + b"\xff\x00\x00\x00\x00\x00\x00\x00") def test_number_field_limits(self): - self.assertRaises(ValueError, tarfile.itn, -1, 8, tarfile.USTAR_FORMAT) - self.assertRaises(ValueError, tarfile.itn, 0o10000000, 8, tarfile.USTAR_FORMAT) - self.assertRaises(ValueError, tarfile.itn, -0x10000000001, 6, tarfile.GNU_FORMAT) - self.assertRaises(ValueError, tarfile.itn, 0x10000000000, 6, tarfile.GNU_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(-1, 8, tarfile.USTAR_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(0o10000000, 8, tarfile.USTAR_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(-0x10000000001, 6, tarfile.GNU_FORMAT) + with self.assertRaises(ValueError): + tarfile.itn(0x10000000000, 6, tarfile.GNU_FORMAT) class ContextManagerTest(unittest.TestCase): @@ -1710,11 +1785,12 @@ self.assertTrue(tar.closed, "context manager failed") -class LinkEmulationTest(ReadTest): +@unittest.skipIf(hasattr(os, "link"), "requires os.link to be missing") +class LinkEmulationTest(ReadTest, unittest.TestCase): # Test for issue #8741 regression. On platforms that do not support - # symbolic or hard links tarfile tries to extract these types of members as - # the regular files they point to. + # symbolic or hard links tarfile tries to extract these types of members + # as the regular files they point to. def _test_link_extraction(self, name): self.tar.extract(name, TEMPDIR) data = open(os.path.join(TEMPDIR, name), "rb").read() @@ -1744,44 +1820,7 @@ self._test_link_extraction("./ustar/linktest2/symtype") -class GzipMiscReadTest(MiscReadTest): - tarname = gzipname - mode = "r:gz" - - def test_non_existent_targz_file(self): - # Test for issue11513: prevent non-existent gzipped tarfiles raising - # multiple exceptions. - with self.assertRaisesRegex(OSError, "xxx") as ex: - tarfile.open("xxx", self.mode) - self.assertEqual(ex.exception.errno, errno.ENOENT) - -class GzipUstarReadTest(UstarReadTest): - tarname = gzipname - mode = "r:gz" -class GzipStreamReadTest(StreamReadTest): - tarname = gzipname - mode = "r|gz" -class GzipWriteTest(WriteTest): - mode = "w:gz" -class GzipStreamWriteTest(StreamWriteTest): - mode = "w|gz" - - -class Bz2MiscReadTest(MiscReadTest): - tarname = bz2name - mode = "r:bz2" -class Bz2UstarReadTest(UstarReadTest): - tarname = bz2name - mode = "r:bz2" -class Bz2StreamReadTest(StreamReadTest): - tarname = bz2name - mode = "r|bz2" -class Bz2WriteTest(WriteTest): - mode = "w:bz2" -class Bz2StreamWriteTest(StreamWriteTest): - mode = "w|bz2" - -class Bz2PartialReadTest(unittest.TestCase): +class Bz2PartialReadTest(Bz2Test, unittest.TestCase): # Issue5068: The _BZ2Proxy.read() method loops forever # on an empty or partial bzipped file. @@ -1790,7 +1829,8 @@ hit_eof = False def read(self, n): if self.hit_eof: - raise AssertionError("infinite loop detected in tarfile.open()") + raise AssertionError("infinite loop detected in " + "tarfile.open()") self.hit_eof = self.tell() == len(self.getvalue()) return super(MyBytesIO, self).read(n) def seek(self, *args): @@ -1811,102 +1851,23 @@ self._test_partial_input("r:bz2") -class LzmaMiscReadTest(MiscReadTest): - tarname = xzname - mode = "r:xz" -class LzmaUstarReadTest(UstarReadTest): - tarname = xzname - mode = "r:xz" -class LzmaStreamReadTest(StreamReadTest): - tarname = xzname - mode = "r|xz" -class LzmaWriteTest(WriteTest): - mode = "w:xz" -class LzmaStreamWriteTest(StreamWriteTest): - mode = "w|xz" - - -def test_main(): +def setUpModule(): support.unlink(TEMPDIR) os.makedirs(TEMPDIR) - tests = [ - UstarReadTest, - MiscReadTest, - StreamReadTest, - DetectReadTest, - MemberReadTest, - GNUReadTest, - PaxReadTest, - WriteTest, - StreamWriteTest, - GNUWriteTest, - PaxWriteTest, - UstarUnicodeTest, - GNUUnicodeTest, - PAXUnicodeTest, - AppendTest, - LimitsTest, - MiscTest, - ContextManagerTest, - ] - - if hasattr(os, "link"): - tests.append(HardlinkTest) - else: - tests.append(LinkEmulationTest) - with open(tarname, "rb") as fobj: data = fobj.read() - if gzip: - # Create testtar.tar.gz and add gzip-specific tests. - support.unlink(gzipname) - with gzip.open(gzipname, "wb") as tar: - tar.write(data) + # Create compressed tarfiles. + for c in GzipTest, Bz2Test, LzmaTest: + if c.open: + support.unlink(c.tarname) + with c.open(c.tarname, "wb") as tar: + tar.write(data) - tests += [ - GzipMiscReadTest, - GzipUstarReadTest, - GzipStreamReadTest, - GzipWriteTest, - GzipStreamWriteTest, - ] - - if bz2: - # Create testtar.tar.bz2 and add bz2-specific tests. - support.unlink(bz2name) - with bz2.BZ2File(bz2name, "wb") as tar: - tar.write(data) - - tests += [ - Bz2MiscReadTest, - Bz2UstarReadTest, - Bz2StreamReadTest, - Bz2WriteTest, - Bz2StreamWriteTest, - Bz2PartialReadTest, - ] - - if lzma: - # Create testtar.tar.xz and add lzma-specific tests. - support.unlink(xzname) - with lzma.LZMAFile(xzname, "w") as tar: - tar.write(data) - - tests += [ - LzmaMiscReadTest, - LzmaUstarReadTest, - LzmaStreamReadTest, - LzmaWriteTest, - LzmaStreamWriteTest, - ] - - try: - support.run_unittest(*tests) - finally: - if os.path.exists(TEMPDIR): - shutil.rmtree(TEMPDIR) +def teatDownModule(): + if os.path.exists(TEMPDIR): + shutil.rmtree(TEMPDIR) if __name__ == "__main__": - test_main() + unittest.main()