import os import pkg_resources import sys from zope.interface import implementer from pyramid.interfaces import IPackageOverrides, PHASE1_CONFIG from pyramid.exceptions import ConfigurationError from pyramid.threadlocal import get_current_registry from pyramid.config.actions import action_method class OverrideProvider(pkg_resources.DefaultProvider): def __init__(self, module): pkg_resources.DefaultProvider.__init__(self, module) self.module_name = module.__name__ def _get_overrides(self): reg = get_current_registry() overrides = reg.queryUtility(IPackageOverrides, self.module_name) return overrides def get_resource_filename(self, manager, resource_name): """ Return a true filesystem path for resource_name, co-ordinating the extraction with manager, if the resource must be unpacked to the filesystem. """ overrides = self._get_overrides() if overrides is not None: filename = overrides.get_filename(resource_name) if filename is not None: return filename return pkg_resources.DefaultProvider.get_resource_filename( self, manager, resource_name ) def get_resource_stream(self, manager, resource_name): """ Return a readable file-like object for resource_name.""" overrides = self._get_overrides() if overrides is not None: stream = overrides.get_stream(resource_name) if stream is not None: return stream return pkg_resources.DefaultProvider.get_resource_stream( self, manager, resource_name ) def get_resource_string(self, manager, resource_name): """ Return a string containing the contents of resource_name.""" overrides = self._get_overrides() if overrides is not None: string = overrides.get_string(resource_name) if string is not None: return string return pkg_resources.DefaultProvider.get_resource_string( self, manager, resource_name ) def has_resource(self, resource_name): overrides = self._get_overrides() if overrides is not None: result = overrides.has_resource(resource_name) if result is not None: return result return pkg_resources.DefaultProvider.has_resource(self, resource_name) def resource_isdir(self, resource_name): overrides = self._get_overrides() if overrides is not None: result = overrides.isdir(resource_name) if result is not None: return result return pkg_resources.DefaultProvider.resource_isdir( self, resource_name ) def resource_listdir(self, resource_name): overrides = self._get_overrides() if overrides is not None: result = overrides.listdir(resource_name) if result is not None: return result return pkg_resources.DefaultProvider.resource_listdir( self, resource_name ) @implementer(IPackageOverrides) class PackageOverrides(object): # pkg_resources arg in kw args below for testing def __init__(self, package, pkg_resources=pkg_resources): loader = self._real_loader = getattr(package, '__loader__', None) if isinstance(loader, self.__class__): self._real_loader = None # We register ourselves as a __loader__ *only* to support the # setuptools _find_adapter adapter lookup; this class doesn't # actually support the PEP 302 loader "API". This is # excusable due to the following statement in the spec: # ... Loader objects are not # required to offer any useful functionality (any such functionality, # such as the zipimport get_data() method mentioned above, is # optional)... # A __loader__ attribute is basically metadata, and setuptools # uses it as such. package.__loader__ = self # we call register_loader_type for every instantiation of this # class; that's OK, it's idempotent to do it more than once. pkg_resources.register_loader_type(self.__class__, OverrideProvider) self.overrides = [] self.overridden_package_name = package.__name__ def insert(self, path, source): if not path or path.endswith('/'): override = DirectoryOverride(path, source) else: override = FileOverride(path, source) self.overrides.insert(0, override) return override def filtered_sources(self, resource_name): for override in self.overrides: o = override(resource_name) if o is not None: yield o def get_filename(self, resource_name): for source, path in self.filtered_sources(resource_name): result = source.get_filename(path) if result is not None: return result def get_stream(self, resource_name): for source, path in self.filtered_sources(resource_name): result = source.get_stream(path) if result is not None: return result def get_string(self, resource_name): for source, path in self.filtered_sources(resource_name): result = source.get_string(path) if result is not None: return result def has_resource(self, resource_name): for source, path in self.filtered_sources(resource_name): if source.exists(path): return True def isdir(self, resource_name): for source, path in self.filtered_sources(resource_name): result = source.isdir(path) if result is not None: return result def listdir(self, resource_name): for source, path in self.filtered_sources(resource_name): result = source.listdir(path) if result is not None: return result @property def real_loader(self): if self._real_loader is None: raise NotImplementedError() return self._real_loader def get_data(self, path): """ See IPEP302Loader. """ return self.real_loader.get_data(path) def is_package(self, fullname): """ See IPEP302Loader. """ return self.real_loader.is_package(fullname) def get_code(self, fullname): """ See IPEP302Loader. """ return self.real_loader.get_code(fullname) def get_source(self, fullname): """ See IPEP302Loader. """ return self.real_loader.get_source(fullname) class DirectoryOverride: def __init__(self, path, source): self.path = path self.pathlen = len(self.path) self.source = source def __call__(self, resource_name): if resource_name.startswith(self.path): new_path = resource_name[self.pathlen :] return self.source, new_path class FileOverride: def __init__(self, path, source): self.path = path self.source = source def __call__(self, resource_name): if resource_name == self.path: return self.source, '' class PackageAssetSource(object): """ An asset source relative to a package. If this asset source is a file, then we expect the ``prefix`` to point to the new name of the file, and the incoming ``resource_name`` will be the empty string, as returned by the ``FileOverride``. """ def __init__(self, package, prefix): self.package = package if hasattr(package, '__name__'): self.pkg_name = package.__name__ else: self.pkg_name = package self.prefix = prefix def get_path(self, resource_name): return '%s%s' % (self.prefix, resource_name) def get_filename(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return pkg_resources.resource_filename(self.pkg_name, path) def get_stream(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return pkg_resources.resource_stream(self.pkg_name, path) def get_string(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return pkg_resources.resource_string(self.pkg_name, path) def exists(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return True def isdir(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return pkg_resources.resource_isdir(self.pkg_name, path) def listdir(self, resource_name): path = self.get_path(resource_name) if pkg_resources.resource_exists(self.pkg_name, path): return pkg_resources.resource_listdir(self.pkg_name, path) class FSAssetSource(object): """ An asset source relative to a path in the filesystem. """ def __init__(self, prefix): self.prefix = prefix def get_path(self, resource_name): if resource_name: path = os.path.join(self.prefix, resource_name) else: path = self.prefix return path def get_filename(self, resource_name): path = self.get_path(resource_name) if os.path.exists(path): return path def get_stream(self, resource_name): path = self.get_filename(resource_name) if path is not None: return open(path, 'rb') def get_string(self, resource_name): stream = self.get_stream(resource_name) if stream is not None: with stream: return stream.read() def exists(self, resource_name): path = self.get_filename(resource_name) if path is not None: return True def isdir(self, resource_name): path = self.get_filename(resource_name) if path is not None: return os.path.isdir(path) def listdir(self, resource_name): path = self.get_filename(resource_name) if path is not None: return os.listdir(path) class AssetsConfiguratorMixin(object): def _override( self, package, path, override_source, PackageOverrides=PackageOverrides ): pkg_name = package.__name__ override = self.registry.queryUtility(IPackageOverrides, name=pkg_name) if override is None: override = PackageOverrides(package) self.registry.registerUtility( override, IPackageOverrides, name=pkg_name ) override.insert(path, override_source) @action_method def override_asset(self, to_override, override_with, _override=None): """ Add a :app:`Pyramid` asset override to the current configuration state. ``to_override`` is an :term:`asset specification` to the asset being overridden. ``override_with`` is an :term:`asset specification` to the asset that is performing the override. This may also be an absolute path. See :ref:`assets_chapter` for more information about asset overrides.""" if to_override == override_with: raise ConfigurationError( 'You cannot override an asset with itself' ) package = to_override path = '' if ':' in to_override: package, path = to_override.split(':', 1) # *_isdir = override is package or directory overridden_isdir = path == '' or path.endswith('/') if os.path.isabs(override_with): override_source = FSAssetSource(override_with) if not os.path.exists(override_with): raise ConfigurationError( 'Cannot override asset with an absolute path that does ' 'not exist' ) override_isdir = os.path.isdir(override_with) override_package = None override_prefix = override_with else: override_package = override_with override_prefix = '' if ':' in override_with: override_package, override_prefix = override_with.split(':', 1) __import__(override_package) to_package = sys.modules[override_package] override_source = PackageAssetSource(to_package, override_prefix) override_isdir = override_prefix == '' or override_with.endswith( '/' ) if overridden_isdir and (not override_isdir): raise ConfigurationError( 'A directory cannot be overridden with a file (put a ' 'slash at the end of override_with if necessary)' ) if (not overridden_isdir) and override_isdir: raise ConfigurationError( 'A file cannot be overridden with a directory (put a ' 'slash at the end of to_override if necessary)' ) override = _override or self._override # test jig def register(): __import__(package) from_package = sys.modules[package] override(from_package, path, override_source) intr = self.introspectable( 'asset overrides', (package, override_package, path, override_prefix), '%s -> %s' % (to_override, override_with), 'asset override', ) intr['to_override'] = to_override intr['override_with'] = override_with self.action( None, register, introspectables=(intr,), order=PHASE1_CONFIG ) override_resource = override_asset # bw compat