"""Benchmark random access to a zip archive from a multiprocessing pool.

``ZipFileSource`` exposes the members of a zip archive as an indexable
sequence of ``bytes``. Two access strategies are compared:

* ``"lock"`` -- keep one open ``ZipFile`` per process and serialise reads
  with a ``multiprocessing.Lock``.
* ``"file"`` -- open (and close) the archive on every access.
"""
import os
import time
import numpy as np
import multiprocessing
import zipfile


class ZipFileSource(object):
    """Indexable, read-only view over the members of a zip archive.

    ``source[i]`` returns the bytes of the i-th entry of the archive's
    ``infolist()``, as returned by ``ZipFile.read``.

    Args:
        path: Path to the zip archive.
        access_mode: ``"lock"`` or ``"file"``; see :meth:`set_access_mode`.

    Raises:
        ValueError: if ``access_mode`` is not a supported mode.
    """

    _ACCESS_MODES = ("lock", "file")

    def __init__(self, path, access_mode="lock"):
        self.path = path
        # A multiprocessing lock, so it also serialises access between
        # processes that inherit this object through fork().
        self.lock = multiprocessing.Lock()
        # Set before set_access_mode() so that __del__ and _close_archive()
        # work even if set_access_mode() raises.
        self.archive = None
        self._archive_pid = None
        self.set_access_mode(access_mode)
        self.files = self.access(lambda archive: archive.infolist())

    def set_access_mode(self, access_mode):
        """Select how archive members are read.

        * ``"lock"``: keep a single ``ZipFile`` open and serialise reads
          through ``self.lock``. The handle is reopened whenever the calling
          process is not the one that opened it. A ``ZipFile`` inherited
          across ``fork()`` shares its OS file offset with the other
          processes, while each process keeps its own buffered-reader state.
          Concurrent readers therefore get corrupted data even under the
          lock.
        * ``"file"``: open a fresh ``ZipFile`` for every access.

        Any archive held open by a previous ``"lock"`` mode is closed first.

        Raises:
            ValueError: if ``access_mode`` is not ``"lock"`` or ``"file"``.
        """
        if access_mode not in self._ACCESS_MODES:
            raise ValueError(
                f"access_mode must be one of {self._ACCESS_MODES}, "
                f"got {access_mode!r}")

        def access_lock(f):
            with self.lock:
                return f(self._process_archive())

        def access_file(f):
            with zipfile.ZipFile(self.path, 'r') as archive:
                return f(archive)

        # Release the handle of a previous "lock" mode instead of leaking it.
        self._close_archive()
        if access_mode == "lock":
            print("lock")
            self._open_archive()
            self.access = access_lock
        else:
            print("file")
            self.access = access_file

    def _open_archive(self):
        """Open ``self.path`` and record which process owns the handle."""
        self.archive = zipfile.ZipFile(self.path, 'r')
        self._archive_pid = os.getpid()

    def _close_archive(self):
        """Close the held archive, if any; safe to call repeatedly."""
        # getattr: may run from __del__ on a partially initialised object.
        archive = getattr(self, "archive", None)
        if archive is not None:
            archive.close()
        self.archive = None
        self._archive_pid = None

    def _process_archive(self):
        """Return an open archive handle owned by the calling process."""
        if self.archive is None or self._archive_pid != os.getpid():
            # Handle was inherited from another process (or closed):
            # drop this process's copy and open a private one.
            self._close_archive()
            self._open_archive()
        return self.archive

    def __del__(self):
        self._close_archive()

    def __len__(self):
        """Return the number of entries in the archive."""
        return len(self.files)

    def __getitem__(self, index):
        """Return the bytes of the ``index``-th archive member.

        Raises:
            IndexError: if ``index`` is out of range.
        """
        if index >= len(self):
            raise IndexError(f'Index {index} >= len {len(self)}')
        member = self.files[index]
        return self.access(lambda archive: archive.read(member))


dirname = os.path.join(os.path.dirname(__file__), 'test_filesource_data')
# Module-level so that pool workers can reach it through ``read_small``.
fs_small = ZipFileSource(os.path.join(dirname, 'foo_bar_small.zip'),
                         access_mode="lock")


def read_small(i):
    """Return byte 42 of member ``i``, wrapping ``i`` modulo the archive size."""
    return fs_small[i % len(fs_small)][42]


if __name__ == '__main__':
    n_frames = 1000
    frames = np.arange(n_frames, dtype='u4')

    def f(nb_processes, access_mode):
        """Sum ``read_small`` over all frames using a pool; print timing."""
        # Set the mode *before* creating the pool. With the fork start
        # method, workers inherit fs_small as it is when the pool is created,
        # so setting it afterwards would only affect the parent.
        # NOTE(review): under the spawn start method, workers re-import this
        # module and always start in "lock" mode.
        fs_small.set_access_mode(access_mode)
        # The context manager terminates the workers instead of leaking them.
        with multiprocessing.Pool(processes=nb_processes) as pool:
            start = time.perf_counter()
            res = 0
            for i in pool.imap_unordered(read_small, frames):
                res += i
            delta = time.perf_counter() - start
        print(f"access_mode = {access_mode}, nb processes = {nb_processes}, res = {res}, {delta*1e3/len(frames)} ms/frame")
        return res

    f(1, "file")
    f(4, "file")
    f(4, "lock")
    f(4, "lock")