diff -r b7c0137cccbe Lib/test/test_zipfile.py --- a/Lib/test/test_zipfile.py Thu Mar 26 23:50:57 2015 +0100 +++ b/Lib/test/test_zipfile.py Thu Apr 09 16:18:15 2015 +0100 @@ -361,6 +361,364 @@ self.assertIn('[closed]', repr(zipopen)) self.assertIn('[closed]', repr(zipfp)) + def zip_remove_file_from_existing_test(self, f, compression): + self.make_test_archive(f, compression) + + with zipfile.ZipFile(f, "a", compression) as zipfp: + + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + + zipfp.remove(TESTFN) + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 2) + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + # Check removed file + self.assertNotIn(TESTFN, names) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 2) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertNotIn(TESTFN, names) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + with zipfile.ZipFile(f, "r", compression) as zipfp: + + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 2) + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + # Check removed file + self.assertNotIn(TESTFN, names) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 2) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertNotIn(TESTFN, names) + for i in infos: + self.assertEqual(i.file_size, 
len(self.data)) + + # check getinfo + for nm in ("another.name", "strfile"): + info = zipfp.getinfo(nm) + self.assertEqual(info.filename, nm) + self.assertEqual(info.file_size, len(self.data)) + + # Check that testzip doesn't raise an exception + zipfp.testzip() + + def test_remove_file_from_existing(self): + for f in get_files(self): + self.zip_remove_file_from_existing_test(f, self.compression) + + def test_rename_file_in_existing(self): + for f in get_files(self): + self.zip_rename_file_in_existing_test(f, self.compression) + + def zip_rename_file_in_existing_test(self, f, compression): + self.make_test_archive(f, compression) + + with zipfile.ZipFile(f, "a", compression) as zipfp: + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + TESTFN_NEW = ''.join(["new", TESTFN]) + zipfp.rename(TESTFN, TESTFN_NEW) + + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 3) + # Check renamed file + self.assertIn(TESTFN_NEW, names) + self.assertNotIn(TESTFN, names) + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + self.assertEqual(zipfp.read(TESTFN_NEW), self.data) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 3) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertIn(TESTFN_NEW, names) + + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + with zipfile.ZipFile(f, "r", compression) as zipfp: + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + self.assertEqual(zipfp.read(TESTFN_NEW), self.data) + # Check the namelist + names = zipfp.namelist() + 
self.assertEqual(len(names), 3) + # Check renamed file + self.assertIn(TESTFN_NEW, names) + self.assertNotIn(TESTFN, names) + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 3) + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertIn(TESTFN_NEW, names) + self.assertNotIn(TESTFN, names) + + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + # check getinfo + for nm in ("another.name", "strfile", TESTFN_NEW): + info = zipfp.getinfo(nm) + self.assertEqual(info.filename, nm) + self.assertEqual(info.file_size, len(self.data)) + + # Check that testzip doesn't raise an exception + zipfp.testzip() + + def test_remove_nonexistent_file(self): + for f in get_files(self): + self.zip_remove_nonexistent_file_test(f, self.compression) + + def zip_remove_nonexistent_file_test(self, f, compression): + self.make_test_archive(f, compression) + + with zipfile.ZipFile(f, "a", compression) as zipfp: + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + with self.assertRaises(KeyError): + zipfp.remove("non.existent.file") + + def test_rename_nonexistent_file(self): + for f in get_files(self): + self.zip_rename_nonexistent_file_test(f, self.compression) + + def zip_rename_nonexistent_file_test(self, f, compression): + self.make_test_archive(f, compression) + + with zipfile.ZipFile(f, "a", compression) as zipfp: + self.assertEqual(zipfp.read(TESTFN), self.data) + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + TESTFN_NEW = ''.join(["new", TESTFN]) + with self.assertRaises(KeyError): + zipfp.rename("non.existent.file", TESTFN_NEW) + + def test_rename_and_remove_wrong_permissions(self): + for f in get_files(self): 
+ self.zip_rename_and_remove_wrong_permissions(f, self.compression) + + def zip_rename_and_remove_wrong_permissions(self, f, compression): + self.make_test_archive(f, compression) + + with zipfile.ZipFile(f, "r", compression) as zipfp: + with self.assertRaises(RuntimeError): + zipfp.rename("another.name", "test") + with self.assertRaises(RuntimeError): + zipfp.remove("another.name") + + def test_clone(self): + for f in get_files(self): + self.zip_clone_test(f, self.compression) + + def zip_clone_test(self, f, compression): + TESTFN3 = TESTFN + "3" + self.make_test_archive(f, compression) + with zipfile.ZipFile(f) as f: + with f.clone(TESTFN3) as zipfp: + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 3) + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + self.assertEqual(zipfp.read(TESTFN), self.data) + + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertIn(TESTFN, names) + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 3) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + def test_clone_with_filenames(self): + for f in get_files(self): + self.zip_clone_with_filenames_test(f, self.compression) + + def zip_clone_with_filenames_test(self, f, compression): + TESTFN3 = TESTFN + "3" + self.make_test_archive(f, compression) + with zipfile.ZipFile(f) as f: + with f.clone(TESTFN3, ["another.name", "strfile"]) as zipfp: + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 2) + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertNotIn(TESTFN, names) + # Check infolist + infos = zipfp.infolist() + 
names = [i.filename for i in infos] + self.assertEqual(len(names), 2) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + + def test_clone_with_fileinfos(self): + for f in get_files(self): + self.zip_clone_with_fileinfos_test(f, self.compression) + + def zip_clone_with_fileinfos_test(self, f, compression): + TESTFN3 = TESTFN + "3" + self.make_test_archive(f, compression) + with zipfile.ZipFile(f) as f: + fileinfos = [info for info in f.infolist() + if info.filename in ["another.name", "strfile"]] + with f.clone(TESTFN3, fileinfos) as zipfp: + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 2) + # Check remaining data + self.assertEqual(zipfp.read("another.name"), self.data) + self.assertEqual(zipfp.read("strfile"), self.data) + + # Check present files + self.assertIn("another.name", names) + self.assertIn("strfile", names) + self.assertNotIn(TESTFN, names) + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 2) + for i in infos: + self.assertEqual(i.file_size, len(self.data)) + unlink(TESTFN3) + + def test_hidden_files(self): + f = findfile("zip_hiddenfiles.zip") + hidden_data = [b'This is a prefix.\n', + b'Intermediate data\n', + b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x0cgYF\xf39@\x12\x0c\x00\x00\x00\x0c\x00\x00\x00\x04\x00\x00\x00fourHidden file\n'] + + with zipfile.ZipFile(f) as f: + hidden_files = f._hidden_files() + self.assertEqual(len(hidden_files), 3) + + for file, hidden in zip(hidden_files, hidden_data): + data = file.read(file.length) + self.assertEqual(data, hidden) + + def test_clone_with_hidden_files(self): + TESTFN3 = TESTFN + "3" + f = findfile("zip_hiddenfiles.zip") + hidden_data = [b'This is a prefix.\n', + b'Intermediate data\n', + b'PK\x03\x04\x14\x00\x00\x00\x00\x00\x0cgYF\xf39@\x12\x0c\x00\x00\x00\x0c\x00\x00\x00\x04\x00\x00\x00fourHidden file\n'] + + with zipfile.ZipFile(f) as f: + original_files = {fileinfo.filename: 
f.read(fileinfo.filename) + for fileinfo in f.infolist()} + with f.clone(TESTFN3, filenames_or_infolist=f.infolist()) as zipfp: + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 4) + # check the hidden files persisted + hidden_files = zipfp._hidden_files() + self.assertEqual(len(hidden_files), 3) + + for file, hidden in zip(hidden_files, hidden_data): + data = file.read(file.length) + self.assertEqual(data, hidden) + names = zipfp.namelist() + self.assertIn("one", names) + self.assertIn("two", names) + self.assertIn("three", names) + self.assertIn("five", names) + self.assertNotIn("four", names) + + # check data + new_files = {fileinfo.filename: zipfp.read(fileinfo.filename) + for fileinfo in zipfp.infolist()} + for name, data in new_files.items(): + self.assertEqual(data, original_files[name]) + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 4) + + def test_clone_ignore_hidden_files(self): + TESTFN3 = TESTFN + "3" + f = findfile("zip_hiddenfiles.zip") + with zipfile.ZipFile(f) as f: + original_files = {fileinfo.filename: f.read(fileinfo.filename) + for fileinfo in f.infolist()} + with f.clone(TESTFN3, ignore_hidden_files=True) as zipfp: + # Check the namelist + names = zipfp.namelist() + self.assertEqual(len(names), 4) + + # check the hidden files were not copied into the clone + hidden_files = zipfp._hidden_files() + self.assertEqual(len(hidden_files), 0) + + names = zipfp.namelist() + self.assertIn("one", names) + self.assertIn("two", names) + self.assertIn("three", names) + self.assertIn("five", names) + self.assertNotIn("four", names) + + # check data + new_files = {fileinfo.filename: zipfp.read(fileinfo.filename) + for fileinfo in zipfp.infolist()} + for name, data in new_files.items(): + self.assertEqual(data, original_files[name]) + + # Check infolist + infos = zipfp.infolist() + names = [i.filename for i in infos] + self.assertEqual(len(names), 4) + def tearDown(self): 
unlink(TESTFN) unlink(TESTFN2) diff -r b7c0137cccbe Lib/test/zip_hiddenfiles.zip Binary file Lib/test/zip_hiddenfiles.zip has changed diff -r b7c0137cccbe Lib/zipfile.py --- a/Lib/zipfile.py Thu Mar 26 23:50:57 2015 +0100 +++ b/Lib/zipfile.py Thu Apr 09 16:18:15 2015 +0100 @@ -14,7 +14,8 @@ import struct import binascii import threading - +import tempfile +import operator try: import zlib # We may need its compression method @@ -815,7 +816,7 @@ def readable(self): return True - def read(self, n=-1): + def read(self, n=-1, decompress=True): """Read and return up to n bytes. If the argument is omitted, None, or negative, data is read and returned until EOF is reached.. """ @@ -824,7 +825,7 @@ self._readbuffer = b'' self._offset = 0 while not self._eof: - buf += self._read1(self.MAX_N) + buf += self._read1(self.MAX_N, decompress=decompress) return buf end = n + self._offset @@ -838,7 +839,7 @@ self._readbuffer = b'' self._offset = 0 while n > 0 and not self._eof: - data = self._read1(n) + data = self._read1(n, decompress=decompress) if n < len(data): self._readbuffer = data self._offset = n @@ -856,7 +857,7 @@ self._running_crc = crc32(newdata, self._running_crc) & 0xffffffff # Check the CRC if we're at the end of the file if self._eof and self._running_crc != self._expected_crc: - raise BadZipFile("Bad CRC-32 for file %r" % self.name) + raise BadZipFile("Bad CRC-32 for file %r" % self.name) def read1(self, n): """Read up to n bytes with at most one read() system call.""" @@ -895,7 +896,7 @@ break return buf - def _read1(self, n): + def _read1(self, n, decompress=True): # Read up to n compressed bytes with at most one read() system call, # decrypt and decompress them. 
if self._eof or n <= 0: @@ -910,7 +911,7 @@ else: data = self._read2(n) - if self._compress_type == ZIP_STORED: + if self._compress_type == ZIP_STORED or not decompress: self._eof = self._compress_left <= 0 elif self._compress_type == ZIP_DEFLATED: n = max(n, self.MIN_READ_SIZE) @@ -928,7 +929,9 @@ self._left -= len(data) if self._left <= 0: self._eof = True - self._update_crc(data) + # We can only check the crc if we are decompressing + if decompress: + self._update_crc(data) return data def _read2(self, n): @@ -956,7 +959,8 @@ class ZipFile: - """ Class with methods to open, read, write, close, list zip files. + """ Class with methods to open, read, write, rename, remove, close, list + the contents of zip files. z = ZipFile(file, mode="r", compression=ZIP_STORED, allowZip64=True) @@ -985,6 +989,8 @@ self._allowZip64 = allowZip64 self._didModify = False + self.requires_commit = False + self.removed_filelist = [] self.debug = 0 # Level of printing: 0 through 3 self.NameToInfo = {} # Find file info given name self.filelist = [] # List of ZipInfo instances for archive @@ -1420,6 +1426,25 @@ raise LargeZipFile(requires_zip64 + " would require ZIP64 extensions") + def _renamecheck(self, filename): + """Check for errors before renaming an archive member to filename.""" + if filename in self.NameToInfo: + import warnings + warnings.warn('Duplicate name: %r' % filename, stacklevel=3) + if self.mode not in ('w', 'x', 'a'): + raise RuntimeError("rename() requires mode 'w', 'x', or 'a'") + if not self.fp: + raise RuntimeError( + "Attempt to modify ZIP archive that was already closed") + + def _removecheck(self): + """Check for errors before removing a member from the archive.""" + if self.mode not in ('w', 'x', 'a'): + raise RuntimeError("remove() requires mode 'w', 'x', or 'a'") + if not self.fp: + raise RuntimeError( + "Attempt to modify ZIP archive that was already closed") + def write(self, filename, arcname=None, compress_type=None): """Put the bytes from filename into the 
archive under the name arcname.""" @@ -1585,6 +1610,383 @@ self.filelist.append(zinfo) self.NameToInfo[zinfo.filename] = zinfo + def _hidden_files(self): + """Find any files that are hidden between members of this archive""" + # Establish the file boundaries, start - end, for each file + # initial file boundaries are the start and end of the zip up to the + # central directory + file_boundaries = [{"start": 0, "end": 0}, + {"start": self.start_dir, "end": self.start_dir}] + for fileinfo in self.filelist + self.removed_filelist: + + # establish the end_offset (NOTE(review): compress_size may already include the 12-byte encryption header - confirm the += 12 below) + end_offset = fileinfo.header_offset + end_offset += sizeFileHeader + end_offset += len(fileinfo.orig_filename) + end_offset += len(fileinfo.extra) + end_offset += fileinfo.compress_size + is_encrypted = fileinfo.flag_bits & 0x1 + if is_encrypted: + end_offset += 12 + + # add to the file boundaries + file_boundaries.append({"start": fileinfo.header_offset, + "end": end_offset}) + + # Look for data in between the file boundaries + file_boundaries.sort(key=operator.itemgetter("start")) + current = file_boundaries.pop(0) + hidden_files = [] + for next in file_boundaries: + if current["end"] > next["start"]: + # next is contained within current |--c.s---n.s--n.e---c.e--| + continue + elif current["end"] != next["start"]: + # There is some data in between + file = _SharedFile(self.fp, current["end"], self._fpclose, self._lock) + file.length = next["start"] - current["end"] + hidden_files.append(file) + current = next + + return hidden_files + + def _write_hidden(self, data): + """Write data to the file that contains the zipfile without adding it as + a managed entry of the zip""" + with self._lock: + if self._seekable: + self.fp.seek(self.start_dir) + self.fp.write(data) + self.fp.flush() + self.start_dir = self.fp.tell() + + def remove(self, zinfo_or_arcname): + """ + Remove a member from the archive. + + Args: + zinfo_or_arcname (ZipInfo, str): ZipInfo object or filename of the + member. 
+ + Raises: + RuntimeError: If attempting to modify a ZIP archive that is closed. + """ + + if not self.fp: + raise RuntimeError( + "Attempt to modify ZIP archive that was already closed") + + self._removecheck() + + if isinstance(zinfo_or_arcname, ZipInfo): + zinfo = zinfo_or_arcname + # perform an existence check + self.getinfo(zinfo.filename) + else: + zinfo = self.getinfo(zinfo_or_arcname) + + self.filelist.remove(zinfo) + self.removed_filelist.append(zinfo) + del self.NameToInfo[zinfo.filename] + self._didModify = True + self.requires_commit = True + + def rename(self, zinfo_or_arcname, filename): + """ + Rename a member in the archive. + + Args: + zinfo_or_arcname (ZipInfo, str): ZipInfo object or filename of the + member. + filename (str): the new name for the member. + + Raises: + RuntimeError: If attempting to modify a ZIP archive that is closed. + """ + + if not self.fp: + raise RuntimeError( + "Attempt to modify ZIP archive that was already closed") + + self._renamecheck(filename) + + # Terminate the file name at the first null byte. Null bytes in file + # names are used as tricks by viruses in archives. + null_byte = filename.find(chr(0)) + if null_byte >= 0: + filename = filename[0:null_byte] + # This is used to ensure paths in generated ZIP files always use + # forward slashes as the directory separator, as required by the + # ZIP format specification. 
+ if os.sep != "/" and os.sep in filename: + filename = filename.replace(os.sep, "/") + + if isinstance(zinfo_or_arcname, ZipInfo): + zinfo = zinfo_or_arcname + # perform an existence check + self.getinfo(zinfo.filename) + else: + zinfo = self.getinfo(zinfo_or_arcname) + + self.NameToInfo[filename] = self.NameToInfo.pop(zinfo.filename) + zinfo.filename = filename + + self._didModify = True + self.requires_commit = True + + def _reset(self): + # Reset modification and commit flags + self._didModify = False + self.requires_commit = False + self.removed_filelist = [] + # Reread contents + self._RealGetContents() + # seek to start of directory ready for subsequent writes + self.fp.seek(self.start_dir) + + def read_compressed(self, name, pwd=None): + """Return file bytes compressed for name.""" + with self.open(name, "r", pwd) as fp: + return fp.read(decompress=False) + + def write_compressed(self, zinfo, data, compress_type=None): + """Write a file into the archive using the already compressed bytes. + The contents is 'data', which is the already compressed bytes. + 'zinfo' is a ZipInfo instance proving the required metadata to + sucessfully write this file. 
+ """ + if not self.fp: + raise RuntimeError( + "Attempt to write to ZIP archive that was already closed") + + with self._lock: + try: + self.fp.seek(self.start_dir) + except (AttributeError, io.UnsupportedOperation): + # Some file-like objects can provide tell() but not seek() + pass + + # ensure the two match as the header is about to be re-written + zinfo.orig_filename = zinfo.filename + + zinfo.header_offset = self.fp.tell() # update start of header + if compress_type is not None: + zinfo.compress_type = compress_type + if zinfo.compress_type == ZIP_LZMA: + # Compressed data includes an end-of-stream (EOS) marker + zinfo.flag_bits |= 0x02 + + # we don't care about the compression type used + self._writecheck(zinfo) + self._didModify = True + + zinfo.compress_size = len(data) # Compressed size + + zip64 = zinfo.file_size > ZIP64_LIMIT or \ + zinfo.compress_size > ZIP64_LIMIT + if zip64 and not self._allowZip64: + raise LargeZipFile("Filesize would require ZIP64 extensions") + self.fp.write(zinfo.FileHeader(zip64)) + self.fp.write(data) + if zinfo.flag_bits & 0x08: + # Write CRC and file sizes after the file data + fmt = '