#!/usr/bin/env python3.3
"""
Demonstrates a bug in zipfile module.

When, using hex editing, the size of a ZIP_STORED file is set to larger
than the zip itself, when that file is opened, ZipExtFile.read, read(n) or
readlines, will hang in an infinite loop consuming 100% cpu.

read1 works fine, it dutifully reads in the entire binary data of the zip
and returns it, making no note of the obvious corruption, but not hanging.

ZIP_DEFLATED, ZIP_BZIP2 and ZIP_LZMA are all unfazed by this simple trick,
as long as their compressed data stream is untampered with, but tampering
with their data stream can make them hang as well. A naive and blind
attempt worked on ZIP_DEFLATED, first try.
"""

import re
from multiprocessing import Process
from zipfile import ZipFile, ZIP_STORED, ZIP_DEFLATED, ZIP_BZIP2, ZIP_LZMA


def test(mode=ZIP_STORED, corruptstream=False, cmd="read"):
    """Build a one-member zip, corrupt its size fields, then read it back.

    The archive is written to ``bar123.zip`` in the current directory and
    is overwritten on every call.

    Args:
        mode: Compression constant used for the single member
            ``spam.baz``.
        corruptstream: When true, additionally overwrite the two bytes in
            front of every ``PK\\x01\\x02`` signature with
            ``\\x99\\xa0``.
        cmd: Name of the reader method to call on the opened member
            (the harness uses ``"read"`` and ``"read1"``).

    The function returns nothing; it either finishes, raises whatever the
    zipfile module raises for the damaged archive, or never returns.
    """
    zipfilename = 'bar123.zip'
    data = ''.join(a + b
                   for a in 'abcdefghijklmnopqrstuvwxyz'
                   for b in '0123456789').encode()

    with ZipFile(zipfilename, 'w', compression=ZIP_DEFLATED) as z:
        z.writestr('spam.baz', data, compress_type=mode)

    with ZipFile(zipfilename, 'r') as z:
        compress_size = z.filelist[0].compress_size

    # Now we will corrupt the binary data of the zip.
    with open(zipfilename, 'rb') as fh:
        zipbytes = fh.read()

    # First we'll replace the uncompressed size. This is a blind byte
    # search: every occurrence of the 4-byte little-endian value is
    # rewritten, wherever it appears in the archive.
    realsize = len(data).to_bytes(4, 'little')
    malsize = (len(data) + 5000).to_bytes(4, 'little')
    zipbytes = zipbytes.replace(realsize, malsize)

    # Next we replace the compressed size in the same way.
    realsize = compress_size.to_bytes(4, 'little')
    malsize = (compress_size + 5000).to_bytes(4, 'little')
    zipbytes = zipbytes.replace(realsize, malsize)

    # Now we'll corrupt the compressed data (if desired).
    if corruptstream:
        # The replacement is a normal (non-raw) bytes literal on purpose:
        # re.sub does not expand "\x.." escapes inside a replacement
        # template, so a raw template would not produce these bytes.
        # These six bytes contain no backslash, so they are inserted
        # verbatim.
        zipbytes = re.sub(rb'(..)(PK\x01\x02)',
                          b'\x99\xa0PK\x01\x02',
                          zipbytes)

    # Now we write out our maliciously corrupted zip.
    with open(zipfilename, 'wb') as fh:
        fh.write(zipbytes)

    # And open it: one bounded read, then one "read everything" call on a
    # freshly opened member.
    with ZipFile(zipfilename, 'r') as z:
        with z.open(z.filelist[0]) as f:
            getattr(f, cmd)(1000)
        with z.open(z.filelist[0]) as f:
            getattr(f, cmd)(-1 if cmd == 'read' else 2 ** 31 - 1)


# Despite its name this is a reproducer, not a unit test; the marker keeps
# pytest/nose-style collectors from running it on their own.
test.__test__ = False


# A simple testing harness to test code which is expected to hang.
class Tester(Process):
    """Run one ``test`` scenario in a child process so it can be killed.

    ``args`` is a ``(mode, mode_name, corrupt, cmd)`` tuple as found in
    ``attempts``. The process name is set to ``"<mode_name>-<corrupt>"``.
    """

    # Not a test class either, even though the name starts with "Test".
    __test__ = False

    def __init__(self, args):
        super().__init__()
        self.args = args
        self.name = args[1] + '-' + str(args[2])

    def run(self):
        """Execute the scenario and report its outcome on stdout."""
        # Use the tuple stored on the instance; a module-level ``args``
        # exists only in a forked child of the driver loop.
        mode, _mode_name, corrupt, cmd = self.args
        try:
            test(mode, corrupt, cmd)
        except Exception as e:
            print("An exception occured: {}/{}".format(type(e).__name__, e))
        else:
            print("okay.")


attempts = [
    (ZIP_STORED, 'ZIP_STORED', False, 'read'),
    (ZIP_DEFLATED, 'ZIP_DEFLATED', False, 'read'),
    (ZIP_DEFLATED, 'ZIP_DEFLATED', True, 'read'),
    (ZIP_BZIP2, 'ZIP_BZIP2', False, 'read'),
    (ZIP_BZIP2, 'ZIP_BZIP2', True, 'read'),
    (ZIP_LZMA, 'ZIP_LZMA', False, 'read'),
    (ZIP_LZMA, 'ZIP_LZMA', True, 'read'),
    (ZIP_STORED, 'ZIP_STORED', False, 'read1'),
    (ZIP_DEFLATED, 'ZIP_DEFLATED', False, 'read1'),
    (ZIP_DEFLATED, 'ZIP_DEFLATED', True, 'read1'),
    (ZIP_BZIP2, 'ZIP_BZIP2', False, 'read1'),
    (ZIP_BZIP2, 'ZIP_BZIP2', True, 'read1'),
    (ZIP_LZMA, 'ZIP_LZMA', False, 'read1'),
    (ZIP_LZMA, 'ZIP_LZMA', True, 'read1'),
]


def main():
    """Run every scenario in ``attempts`` with a five second time limit."""
    # Test these in a subprocess so we can terminate with extreme prejudice.
    for args in attempts:
        print("Testing {}/{}/{}:".format(args[1],
                                         'corrupt' if args[2] else 'normal',
                                         args[3]),
              end=" ")
        p = Tester(args)
        p.start()
        # We'll give it 5 seconds; a hung reader would never finish.
        p.join(5)
        if p.is_alive():
            print("IT HANGED.")
            p.terminate()
            # Reap the killed child before starting the next scenario.
            p.join()
        print('')


# The guard keeps a child that re-imports this module (spawn/forkserver
# start methods) from launching the whole harness again.
if __name__ == '__main__':
    main()