diff -r f27af2243e2a Lib/test/test_zipimport.py --- a/Lib/test/test_zipimport.py Fri Sep 20 21:25:53 2013 +0300 +++ b/Lib/test/test_zipimport.py Mon Sep 23 20:53:18 2013 -0700 @@ -1,3 +1,4 @@ +import io import sys import os import marshal @@ -55,6 +56,27 @@ TEMP_ZIP = os.path.abspath("junk95142" + os.extsep + "zip") +def _write_zip_package(zipname, files, + data_to_prepend=b"", compression=ZIP_STORED): + z = ZipFile(zipname, "w") + try: + for name, (mtime, data) in files.items(): + zinfo = ZipInfo(name, time.localtime(mtime)) + zinfo.compress_type = compression + z.writestr(zinfo, data) + finally: + z.close() + + if data_to_prepend: + # Prepend data to the start of the zipfile + with open(zipname, "rb") as f: + zip_data = f.read() + + with open(zipname, "wb") as f: + f.write(data_to_prepend) + f.write(zip_data) + + class UncompressedZipImportTestCase(ImportHooksBaseTestCase): compression = ZIP_STORED @@ -67,26 +89,9 @@ ImportHooksBaseTestCase.setUp(self) def doTest(self, expected_ext, files, *modules, **kw): - z = ZipFile(TEMP_ZIP, "w") + _write_zip_package(TEMP_ZIP, files, data_to_prepend=kw.get('stuff'), + compression=self.compression) try: - for name, (mtime, data) in files.items(): - zinfo = ZipInfo(name, time.localtime(mtime)) - zinfo.compress_type = self.compression - z.writestr(zinfo, data) - z.close() - - stuff = kw.get("stuff", None) - if stuff is not None: - # Prepend 'stuff' to the start of the zipfile - f = open(TEMP_ZIP, "rb") - data = f.read() - f.close() - - f = open(TEMP_ZIP, "wb") - f.write(stuff) - f.write(data) - f.close() - sys.path.insert(0, TEMP_ZIP) mod = __import__(".".join(modules), globals(), locals(), @@ -101,7 +106,8 @@ self.assertEqual(file, os.path.join(TEMP_ZIP, *modules) + expected_ext) finally: - z.close() + while TEMP_ZIP in sys.path: + sys.path.remove(TEMP_ZIP) os.remove(TEMP_ZIP) def testAFakeZlib(self): @@ -387,6 +393,63 @@ compression = ZIP_DEFLATED +class ZipFileModifiedAfterImportTestCase(ImportHooksBaseTestCase): + def setUp(self): + zipimport._zip_directory_cache.clear() + ImportHooksBaseTestCase.setUp(self) + + def tearDown(self): + while TEMP_ZIP in sys.path: + sys.path.remove(TEMP_ZIP) + if os.path.exists(TEMP_ZIP): + os.remove(TEMP_ZIP) + + def testZipFileChangesAfterFirstImport(self): + """Alter the zip file after caching its index and try an import.""" + packdir = TESTPACK + os.sep + files = {packdir + "__init__" + pyc_ext: (NOW, test_pyc), + packdir + TESTMOD + ".py": (NOW, "test_value = 38\n"), + "ziptest_a.py": (NOW, "test_value = 23\n"), + "ziptest_b.py": (NOW, "test_value = 42\n"), + "ziptest_c.py": (NOW, "test_value = 1337\n")} + zipfile_path = TEMP_ZIP + _write_zip_package(zipfile_path, files) + self.assertTrue(os.path.exists(zipfile_path)) + sys.path.insert(0, zipfile_path) + + # Import something out of the zipfile and confirm it is correct. + testmod = __import__(TESTPACK + "." + TESTMOD, + globals(), locals(), ["__dummy__"]) + self.assertEqual(testmod.test_value, 38) + # Import something else out of the zipfile and confirm it is correct. + ziptest_b = __import__("ziptest_b", globals(), locals(), ["test_value"]) + self.assertEqual(ziptest_b.test_value, 42) + + # Truncate and fill the zip file with non-zip garbage. + with io.open(zipfile_path, "rb") as orig_zip_file: + orig_zip_file_contents = orig_zip_file.read() + with io.open(zipfile_path, "wb") as byebye_valid_zip_file: + byebye_valid_zip_file.write("Tear down this wall!\n"*1987) + # Now that the zipfile has been replaced, import something else from it + # which should fail as the file contents are now garbage. + with self.assertRaises(ImportError): + ziptest_a = __import__("ziptest_a", globals(), locals(), + ["test_value"]) + self.assertEqual(ziptest_a.test_value, 23) + + # Now lets make it a valid zipfile that has some garbage at the start. + # This alters all of the offsets within the file + with io.open(zipfile_path, "wb") as new_zip_file: + new_zip_file.write(b"X"*1991) # The year Python was created. + new_zip_file.write(orig_zip_file_contents) + + # Now that the zip file has been "restored" to a valid but different + # zipfile the zipimporter should *successfully* re-read the new zip + # file's end of file central index and be able to import from it again. + ziptest_c = __import__("ziptest_c", globals(), locals(), ["test_value"]) + self.assertEqual(ziptest_c.test_value, 1337) + + class BadFileZipImportTestCase(unittest.TestCase): def assertZipFailure(self, filename): self.assertRaises(zipimport.ZipImportError, @@ -464,6 +527,7 @@ UncompressedZipImportTestCase, CompressedZipImportTestCase, BadFileZipImportTestCase, + ZipFileModifiedAfterImportTestCase, ) finally: test_support.unlink(TESTMOD)