diff --git a/Lib/importlib/abc.py b/Lib/importlib/abc.py
--- a/Lib/importlib/abc.py
+++ b/Lib/importlib/abc.py
@@ -1,6 +1,5 @@
 """Abstract base classes related to import."""
-from . import _bootstrap
-from . import machinery
+from . import _bootstrap  # Don't import .machinery to avoid circular import.
 try:
     import _frozen_importlib
 except ModuleNotFoundError as exc:
@@ -52,8 +51,8 @@
         This method is used by importlib.invalidate_caches().
         """
 
-_register(MetaPathFinder, machinery.BuiltinImporter, machinery.FrozenImporter,
-          machinery.PathFinder, machinery.WindowsRegistryFinder)
+_register(MetaPathFinder, _bootstrap.BuiltinImporter, _bootstrap.FrozenImporter,
+          _bootstrap.PathFinder, _bootstrap.WindowsRegistryFinder)
 
 
 class PathEntryFinder(Finder):
@@ -78,7 +77,7 @@
         This method is used by PathFinder.invalidate_caches().
         """
 
-_register(PathEntryFinder, machinery.FileFinder)
+_register(PathEntryFinder, _bootstrap.FileFinder)
 
 
 class Loader(metaclass=abc.ABCMeta):
@@ -187,8 +186,8 @@
 
     load_module = _bootstrap._LoaderBasics.load_module
 
-_register(InspectLoader, machinery.BuiltinImporter, machinery.FrozenImporter,
-            machinery.ExtensionFileLoader, _bootstrap.NamespaceLoader)
+_register(InspectLoader, _bootstrap.BuiltinImporter, _bootstrap.FrozenImporter,
+            _bootstrap.ExtensionFileLoader, _bootstrap.NamespaceLoader)
 
 
 class ExecutionLoader(InspectLoader):
@@ -243,8 +242,8 @@
     """Abstract base class partially implementing the ResourceLoader and
     ExecutionLoader ABCs."""
 
-_register(FileLoader, machinery.SourceFileLoader,
-            machinery.SourcelessFileLoader)
+_register(FileLoader, _bootstrap.SourceFileLoader,
+            _bootstrap.SourcelessFileLoader)
 
 
 class SourceLoader(_bootstrap.SourceLoader, ResourceLoader, ExecutionLoader):
@@ -291,4 +290,4 @@
         silently.
         """
 
-_register(SourceLoader, machinery.SourceFileLoader)
+_register(SourceLoader, _bootstrap.SourceFileLoader)
diff --git a/Lib/importlib/machinery.py b/Lib/importlib/machinery.py
--- a/Lib/importlib/machinery.py
+++ b/Lib/importlib/machinery.py
@@ -1,7 +1,11 @@
 """The machinery of importlib: finders, loaders, hooks, etc."""
 
-import _imp
+import datetime
+import importlib.abc
+import os
+import zipfile
 
+# Exported API
 from ._bootstrap import (SOURCE_SUFFIXES, DEBUG_BYTECODE_SUFFIXES,
                          OPTIMIZED_BYTECODE_SUFFIXES, BYTECODE_SUFFIXES,
                          EXTENSION_SUFFIXES)
@@ -16,5 +20,184 @@
 
 
 def all_suffixes():
-    """Returns a list of all recognized module suffixes for this process"""
+    """Returns a list of all recognized module suffixes for this process."""
     return SOURCE_SUFFIXES + BYTECODE_SUFFIXES + EXTENSION_SUFFIXES
+
+
+def _norm_path(path):
+    """Return 'path' with any os.sep separators replaced by '/'."""
+    if os.sep != '/' and os.sep in path:
+        path = path.replace(os.sep, '/')
+    return path
+
+
+def _split_path(full_path, zip_path):
+    """Remove zip_path from full_path along with any path separators."""
+    path = full_path[len(zip_path):]
+    for sep in filter(None, (os.sep, os.altsep)):
+        if path.startswith(sep):
+            path = path[len(sep):]
+            break
+    return path
+
+
+def _join_path(head, tail):
+    """Join two path parts together, returning both the OS-dependent version and
+    a normalized one."""
+    real_path = os.path.join(head, tail)
+    norm_path = _norm_path(real_path)
+    return real_path, norm_path
+
+
+class ZipPathHook:
+
+    """A sys.path_hook implementation for zip files.
+
+    All returned finders have their zip files cached by the instance which
+    instantiated them.
+
+    """
+
+    def __init__(self):
+        """Create the zip file cache."""
+        self._cache = {}
+
+    def __call__(self, path):
+        """If 'path' contains a subpath to a zip file, return a ZipFinder."""
+        full_path = path
+        while path:
+            if os.path.isfile(path):
+                if not zipfile.is_zipfile(path):
+                    raise ImportError('{} is not a zip file'.format(path))
+                break
+            parent = os.path.dirname(path)
+            # dirname() of a filesystem root is the root itself; stop there.
+            path = parent if parent != path else ''
+        else:
+            raise ImportError(
+                '{} does not contain any zipfile path'.format(full_path))
+        zip_path = path
+        pkg_path = _split_path(full_path, zip_path)
+        self._cache_zip_file(zip_path)
+        return ZipFinder(zip_path, pkg_path, self)
+
+    def _cache_zip_file(self, zip_path):
+        zip_file = zipfile.ZipFile(zip_path)
+        self._cache_zip_object(zip_path, zip_file)
+
+    def _cache_zip_object(self, zip_path, zip_object):
+        self._cache[zip_path] = zip_object, frozenset(zip_object.namelist())
+
+    def _zip(self, path):
+        """Return the cached (zip file, name set) pair for 'path'.
+
+        If the zip file is not in the cache, open it.
+        """
+        try:
+            return self._cache[path]
+        except KeyError:
+            self._cache_zip_file(path)
+            return self._cache[path]
+
+    def invalidate_caches(self, path=None):
+        """Close and drop the zip file for 'path' (all of them if None)."""
+        for path in (list(self._cache.keys()) if path is None else [path]):
+            try:
+                self._cache.pop(path)[0].close()
+            except KeyError:
+                pass
+
+
+class ZipFinder(importlib.abc.PathEntryFinder):
+
+    """A path entry finder for zip files."""
+
+    def __init__(self, zip_path, pkg_path, hook):
+        self._zip_path = zip_path
+        if pkg_path.endswith(os.sep):
+            pkg_path = pkg_path[:-len(os.sep)]
+        elif os.altsep is not None and pkg_path.endswith(os.altsep):
+            pkg_path = pkg_path[:-len(os.altsep)]
+        self._pkg_path = pkg_path
+        self._hook = hook
+
+    def invalidate_caches(self):
+        """Invalidate the cached zip file stored in the ZipPathHook which
+        instantiated this finder."""
+        self._hook.invalidate_caches(self._zip_path)
+
+    def _path_set(self):
+        return self._hook._zip(self._zip_path)[1]
+
+    def find_loader(self, fullname):
+        """Return a (loader, portions) pair for the module 'fullname'.
+
+        The loader is None when no source file for the module is found;
+        portions lists the matching directory in the zip file, if any.
+        """
+        module_name = fullname.rpartition('.')[-1]
+        portions = []
+        real_dir, norm_dir = _join_path(self._pkg_path, module_name)
+        path_set = self._path_set()
+        if norm_dir + '/' in path_set:
+            portions.append(real_dir)
+            for suffix in SOURCE_SUFFIXES:
+                init_name = '__init__' + suffix
+                real_pkg_path, norm_pkg_path = _join_path(real_dir, init_name)
+                if norm_pkg_path in path_set:
+                    return ZipLoader(fullname, self._zip_path, real_pkg_path,
+                                     self._hook), portions
+        else:
+            for suffix in SOURCE_SUFFIXES:
+                module_file_name = module_name + suffix
+                real_module_path, norm_module_path = _join_path(
+                    self._pkg_path, module_file_name)
+                if norm_module_path in path_set:
+                    return ZipLoader(fullname, self._zip_path,
+                                     real_module_path, self._hook), portions
+        return None, portions
+
+
+class ZipLoader(importlib.abc.SourceLoader):
+
+    """A source loader for a single module stored in a zip file."""
+
+    def __init__(self, name, zip_path, file_path, hook):
+        self._name = name
+        self._zip_path = zip_path
+        self._file_path = file_path
+        self._hook = hook
+
+    def _zip_file(self):
+        return self._hook._zip(self._zip_path)[0]
+
+    def _norm_path(self, path):
+        if path.startswith(self._zip_path):
+            path = _norm_path(_split_path(path, self._zip_path))
+        return path
+
+    def get_data(self, path):
+        path = self._norm_path(path)
+        zip_file = self._zip_file()
+        try:
+            return zip_file.read(path)
+        except KeyError:
+            raise IOError('{!r} does not exist in the archive'.format(path))
+
+    def get_filename(self, fullname):
+        if fullname != self._name:
+            msg = 'cannot handle {}, only {}'.format(fullname, self._name)
+            raise ImportError(msg)
+        return os.path.join(self._zip_path, self._file_path)
+
+    def path_stats(self, path):
+        path = self._norm_path(path)
+        zip_file = self._zip_file()
+        try:
+            info = zip_file.getinfo(path)
+        except KeyError:
+            raise IOError('{!r} does not exist in the archive'.format(path))
+        stats = {'size': info.file_size}
+        mtime = datetime.datetime(*info.date_time)
+        stats['mtime'] = int(mtime.timestamp())
+        return stats
\ No newline at end of file
diff --git a/Lib/test/test_importlib/machinery/test_zip_file.py b/Lib/test/test_importlib/machinery/test_zip_file.py
new file mode 100755
--- /dev/null
+++ b/Lib/test/test_importlib/machinery/test_zip_file.py
@@ -0,0 +1,235 @@
+import datetime
+from importlib import abc
+from importlib import machinery
+import io
+import os
+import tempfile
+import unittest
+import zipfile
+
+
+class PathHookTests(unittest.TestCase):
+
+    @classmethod
+    def setUpClass(cls):
+        # Close the handle right away so the path can be reopened and unlinked.
+        with tempfile.NamedTemporaryFile(delete=False) as temp_zip:
+            cls.zip_path = temp_zip.name
+        with zipfile.ZipFile(cls.zip_path, 'w') as zip_file:
+            zip_file.writestr('module.py', 'path = "module.py"')
+            zip_file.writestr('module2.py', 'path= "module2.py"')
+
+    @classmethod
+    def tearDownClass(cls):
+        os.unlink(cls.zip_path)
+
+    def setUp(self):
+        self.hook = machinery.ZipPathHook()
+
+    def tearDown(self):
+        self.hook.invalidate_caches()
+
+    def test_zip_file(self):
+        # Path to a zip file should return a finder.
+        finder = self.hook(self.zip_path)
+        self.assertIsInstance(finder, abc.PathEntryFinder)
+        self.assertEqual(finder._zip_path, self.zip_path)
+        self.assertEqual(finder._pkg_path, '')
+
+    def test_file_path(self):
+        # Path to a non-zip file should raise ImportError.
+        with self.assertRaises(ImportError):
+            self.hook(__file__)
+
+    def test_embedded_zip_file(self):
+        # Path with a zip file as a prefix should return a finder.
+        path = os.path.join(self.zip_path, 'pkg', 'subpkg')
+        finder = self.hook(path)
+        self.assertIsInstance(finder, abc.PathEntryFinder)
+        self.assertEqual(finder._zip_path, self.zip_path)
+        self.assertEqual(finder._pkg_path, os.path.join('pkg', 'subpkg'))
+
+    def test_embedded_file_path(self):
+        # Path with an embedded non-zip file should raise ImportError.
+        path = os.path.join(__file__, 'pkg', 'subpkg')
+        with self.assertRaises(ImportError):
+            self.hook(path)
+
+    def test_missing_absolute_path(self):
+        # A missing absolute path should raise ImportError, not loop forever
+        # once the search reaches the filesystem root.
+        path = os.path.join(os.path.abspath(os.sep), 'no', 'such', 'zip')
+        with self.assertRaises(ImportError):
+            self.hook(path)
+
+    def populate_cache(self):
+        path1 = 'some bogus path'
+        self.hook._cache[path1] = zipfile.ZipFile(io.BytesIO(), 'w'), None
+        path2 = 'some other bogus path'
+        self.hook._cache[path2] = zipfile.ZipFile(io.BytesIO(), 'w'), None
+        return path1, path2
+
+    def test_invalidate_caches_path(self):
+        # Giving a path to invalidate only invalidates that one path.
+        path1, path2 = self.populate_cache()
+        self.hook.invalidate_caches(path1)
+        self.assertNotIn(path1, self.hook._cache)
+        self.assertIn(path2, self.hook._cache)
+
+    def test_invalidate_caches_all(self):
+        # Not specifying a path invalidates all zip files.
+        self.populate_cache()
+        self.hook.invalidate_caches()
+        self.assertFalse(self.hook._cache)
+
+    def test_reopen_after_invalidation(self):
+        # A finder should transparently reopen its zip file after the cache
+        # entry has been invalidated.
+        finder = self.hook(self.zip_path)
+        finder.invalidate_caches()
+        loader, portions = finder.find_loader('module')
+        self.assertIsInstance(loader, abc.Loader)
+
+
+class UnclosableBytesIO(io.BytesIO):
+
+    """zipfile.ZipFile does not write out the TOC for a zip file until it is
+    closed, but at that point you can't read from an io.BytesIO instance. This
+    class blocks the real close to give a chance to read the contents before
+    actually closing the object.
+
+    """
+
+    def close(self):
+        pass
+
+    def _close(self):
+        super().close()
+
+
+def create_zip():
+    data_container = UnclosableBytesIO()
+    try:
+        with zipfile.ZipFile(data_container, 'w') as zip_writer:
+            paths = ('module.py', 'pkg/', 'pkg/__init__.py',
+                     'pkg/module.py', 'ns/', 'ns/module.py')
+            for path in paths:
+                data = ''
+                if not path.endswith('/'):
+                    data = 'loc = {!r}'.format(path)
+                zip_writer.writestr(zipfile.ZipInfo(path), data)
+        data_copy = io.BytesIO(data_container.getvalue())
+    finally:
+        data_container._close()
+    return zipfile.ZipFile(data_copy, 'r')
+
+
+class FinderTests(unittest.TestCase):
+
+    zip_path = '/path/to.code.zip'
+
+    def setUp(self):
+        zip_reader = create_zip()
+        self.hook = machinery.ZipPathHook()
+        self.hook._cache_zip_object(self.zip_path, zip_reader)
+
+    def test_invalidate_caches(self):
+        # Invalidates zip file back in path hook.
+        finder = machinery.ZipFinder(self.zip_path, '', self.hook)
+        finder.invalidate_caches()
+        self.assertFalse(self.zip_path in self.hook._cache)
+
+    def test_find_namespace(self):
+        # Should return (None, [directory]).
+        finder = machinery.ZipFinder(self.zip_path, '', self.hook)
+        loader, portions = finder.find_loader('ns')
+        self.assertIsNone(loader)
+        self.assertEqual(portions, ['ns'])
+
+    def test_find_package(self):
+        # Should return (loader, _) for the package.
+        finder = machinery.ZipFinder(self.zip_path, '', self.hook)
+        loader, portions = finder.find_loader('pkg')
+        self.assertIsInstance(loader, abc.Loader)
+        self.assertEqual(loader._name, 'pkg')
+        self.assertEqual(loader._zip_path, self.zip_path)
+        self.assertEqual(loader._file_path, os.path.join('pkg', '__init__.py'))
+
+    def test_find_module(self):
+        # Should return (loader, []) for the module.
+        finder = machinery.ZipFinder(self.zip_path, '', self.hook)
+        loader, portions = finder.find_loader('module')
+        self.assertIsInstance(loader, abc.Loader)
+        self.assertEqual(loader._name, 'module')
+        self.assertEqual(loader._zip_path, self.zip_path)
+        self.assertEqual(loader._file_path, 'module.py')
+        self.assertEqual(len(portions), 0)
+
+    def test_find_submodule(self):
+        # Should return (loader, []) for the submodule, even if the package
+        # name does not directly map to the directory structure
+        # (e.g. __path__ was manipulated).
+        # Also implicitly testing trailing os.sep in package path.
+        finder = machinery.ZipFinder(self.zip_path, 'pkg/', self.hook)
+        loader, portions = finder.find_loader('pkg.module')
+        self.assertIsInstance(loader, abc.Loader)
+        self.assertEqual(loader._name, 'pkg.module')
+        self.assertEqual(loader._zip_path, self.zip_path)
+        self.assertEqual(loader._file_path, os.path.join('pkg', 'module.py'))
+
+
+class LoaderTests(unittest.TestCase):
+
+    zip_path = '/path/to/code.zip'
+
+    @classmethod
+    def setUpClass(cls):
+        zip_reader = create_zip()
+        cls.hook = machinery.ZipPathHook()
+        cls.hook._cache_zip_object(cls.zip_path, zip_reader)
+
+    def test_get_filename(self):
+        # Only if the module name matches the one given to the loader should
+        # get_filename() return what the constructor was given.
+        loader = machinery.ZipLoader('module', self.zip_path, 'module.py',
+                                     self.hook)
+        with self.assertRaises(ImportError):
+            loader.get_filename('does not exist')
+        self.assertEqual(loader.get_filename('module'),
+                         os.path.join(self.zip_path, 'module.py'))
+
+    def test_get_data_full_path(self):
+        # Should work even when zip file path is a prefix.
+        path = 'pkg/module.py'
+        loader = machinery.ZipLoader('pkg.module', self.zip_path,
+                                     path, self.hook)
+        full_path = os.path.join(self.zip_path, 'pkg/module.py')
+        self.assertEqual(loader.get_data(full_path),
+                         ('loc = {!r}'.format(path)).encode('ascii'))
+
+    def test_get_data_relative_path(self):
+        # Should work when path is for only within the zip file.
+        path = 'pkg/module.py'
+        loader = machinery.ZipLoader('pkg.module', self.zip_path,
+                                     path, self.hook)
+        self.assertEqual(loader.get_data(path),
+                         ('loc = {!r}'.format(path)).encode('ascii'))
+
+    def test_get_data_bad_path(self):
+        # IOError should be raised when a non-existent path is requested.
+        loader = machinery.ZipLoader('pkg.module', self.zip_path,
+                                     'pkg/module.py', self.hook)
+        with self.assertRaises(IOError):
+            loader.get_data('does not exist')
+
+    def test_path_stats(self):
+        path = 'module.py'
+        loader = machinery.ZipLoader('module', self.zip_path, path, self.hook)
+        stats = loader.path_stats(path)
+        self.assertEqual(stats['size'], len(loader.get_data(path)))
+        expected_timestamp = datetime.datetime(1980, 1, 1, 0, 0, 0)
+        self.assertEqual(stats['mtime'], int(expected_timestamp.timestamp()))
+
+
+if __name__ == '__main__':
+    unittest.main()