#! /usr/bin/env python3
"""Build small ``.pkg.tar.xz`` archives and extract them concurrently.

The script creates a few one-file packages in a temporary directory,
extracts the files found under ``SUBDIR`` from every package into a shared
work directory using a thread pool, and checks the extracted file names.
"""

import os
import tarfile
import tempfile
from contextlib import contextmanager
from concurrent.futures import ThreadPoolExecutor, ProcessPoolExecutor

PKG_DIR = 'packages'
WORK_DIR = 'work'
SUBDIR = 'etc'


@contextmanager
def temp_cwd():
    """Context manager that temporarily creates and changes the CWD.

    Yields the path of the temporary directory as reported by
    ``os.getcwd()``.  On exit the previous working directory is restored
    and the temporary directory is removed.
    """
    saved_dir = os.getcwd()
    with tempfile.TemporaryDirectory() as temp_path:
        os.chdir(temp_path)
        try:
            yield os.getcwd()
        finally:
            # Leave the directory before TemporaryDirectory deletes it.
            os.chdir(saved_dir)


# Use safe_makedirs() to fix the problem.
@contextmanager
def safe_makedirs():
    """Temporarily replace ``os.makedirs`` with a variant forcing exist_ok.

    While the context is active every call to ``os.makedirs`` behaves as if
    ``exist_ok=True`` had been passed.  The patch is process-wide and the
    original function is restored on exit.
    """
    saved_makedirs = os.makedirs

    def _makedirs(*args, **kwds):
        kwds['exist_ok'] = True
        # Keep only (name, mode): a positional exist_ok would clash with
        # the keyword forced above.
        return saved_makedirs(*args[:2], **kwds)

    try:
        os.makedirs = _makedirs
        yield
    finally:
        os.makedirs = saved_makedirs


def etc_files_filter(members):
    """Yield the regular-file members located under ``SUBDIR``.

    'members' is an iterable of ``tarfile.TarInfo`` objects.  Only names
    starting with ``SUBDIR + '/'`` are accepted, so sibling paths that
    merely share the prefix (e.g. ``etcetera/x``) are skipped.
    """
    prefix = SUBDIR + '/'
    for tarinfo in members:
        if tarinfo.isfile() and tarinfo.name.startswith(prefix):
            yield tarinfo


def extract_from(pkg, pkg_dir, work_dir):
    """Extract the ``SUBDIR`` files of package 'pkg' into 'work_dir'.

    'pkg' is the archive file name inside 'pkg_dir'.  The archive is opened
    as an xz-compressed tar file and closed again once extraction is done.
    """
    pkg_path = os.path.join(pkg_dir, pkg)
    with tarfile.open(pkg_path, mode='r:xz', debug=1) as tar:
        # NOTE(review): member paths are trusted here; for archives from an
        # untrusted source an extraction filter should be considered.
        tar.extractall(work_dir,
                       members=list(etc_files_filter(tar.getmembers())))
    print('extracted from', pkg)


def extract_from_pkgs(pkg_dir, work_dir):
    """Extract every package found in 'pkg_dir' using a thread pool.

    Each package is handled by extract_from() on one of four worker
    threads.  The first exception raised by a worker, in submission order,
    is re-raised in the caller.
    """
    packages = os.listdir(pkg_dir)
    # NOTE(review): wrapping the pool in ``with safe_makedirs():`` is the
    # workaround referred to above - presumably for concurrent directory
    # creation failing during extraction; confirm before enabling.
    with ThreadPoolExecutor(max_workers=4) as executor:
        futures = [executor.submit(extract_from, pkg, pkg_dir, work_dir)
                   for pkg in packages]
        for f in futures:
            # result() re-raises any exception raised in the worker.
            f.result()


class TestCase:
    """Scenario that builds packages under 'tmpdir' and extracts them."""

    def __init__(self, tmpdir):
        self.tmpdir = tmpdir
        self.pkg_dir = os.path.join(tmpdir, PKG_DIR)
        self.work_dir = os.path.join(tmpdir, WORK_DIR)

    def add_files(self, files, dir_path=''):
        """'files' is a dictionary of file names mapped to their content.

        Every file is written below ``dir_path/SUBDIR``; missing parent
        directories are created.
        """
        for fname, content in files.items():
            path = os.path.join(dir_path, SUBDIR, fname)
            os.makedirs(os.path.dirname(path), exist_ok=True)
            with open(path, 'w') as f:
                f.write(content)

    def add_package(self, name, files, version='1.0', release='1'):
        """Add a package to 'pkg_dir'.

        The archive is named ``name-version-release-machine.pkg.tar.xz``
        and contains the ``SUBDIR`` tree built from 'files'.
        """
        os.makedirs(self.pkg_dir, exist_ok=True)
        pkg_name = os.path.join(
            self.pkg_dir,
            '%s-%s-%s-%s.pkg.tar.xz' % (name, version, release,
                                        os.uname().machine))
        # Build the tree in a scratch directory so that the archive members
        # are stored relative to SUBDIR.
        with temp_cwd():
            self.add_files(files)
            with tarfile.open(pkg_name, 'w|xz') as tar:
                tar.add(SUBDIR)

    def check_result(self, expected):
        """Assert that the extracted file names equal 'expected'."""
        res = sorted(os.listdir(os.path.join(self.work_dir, SUBDIR)))
        assert res == expected, '%s != %s' % (res, expected)

    def use_case(self):
        """Create three one-file packages, extract them, check the result."""
        fnames = ['a', 'b', 'c']
        for n in fnames:
            self.add_package('%s_package' % n, {n: '%s content' % n})
        extract_from_pkgs(self.pkg_dir, self.work_dir)
        self.check_result(fnames)


def main():
    """Run the use case inside a throw-away working directory."""
    with temp_cwd() as tmpdir:
        TestCase(tmpdir).use_case()


if __name__ == '__main__':
    main()