| | |
| | | import json |
| | | import os |
| | | |
| | | from os.path import ( |
| | | getmtime, |
| | | normcase, |
| | | normpath, |
| | | join, |
| | | isdir, |
| | | exists, |
| | | ) |
| | | from os.path import getmtime, normcase, normpath, join, isdir, exists |
| | | |
| | | from pkg_resources import ( |
| | | resource_exists, |
| | | resource_filename, |
| | | resource_isdir, |
| | | ) |
| | | from pkg_resources import resource_exists, resource_filename, resource_isdir |
| | | |
| | | from pyramid.asset import ( |
| | | abspath_from_asset_spec, |
| | | resolve_asset_spec, |
| | | ) |
| | | from pyramid.asset import abspath_from_asset_spec, resolve_asset_spec |
| | | |
| | | from pyramid.compat import ( |
| | | lru_cache, |
| | | text_, |
| | | ) |
| | | from pyramid.compat import lru_cache, text_ |
| | | |
| | | from pyramid.httpexceptions import ( |
| | | HTTPNotFound, |
| | | HTTPMovedPermanently, |
| | | ) |
| | | from pyramid.httpexceptions import HTTPNotFound, HTTPMovedPermanently |
| | | |
| | | from pyramid.path import caller_package |
| | | |
| | | from pyramid.response import ( |
| | | _guess_type, |
| | | FileResponse, |
| | | ) |
| | | from pyramid.response import _guess_type, FileResponse |
| | | |
| | | from pyramid.traversal import traversal_path_info |
| | | |
| | | slash = text_('/') |
| | | |
| | | |
| | | class static_view(object): |
| | | """ An instance of this class is a callable which can act as a |
| | |
| | | to override the assets it contains. |
| | | """ |
| | | |
| | | def __init__(self, root_dir, cache_max_age=3600, package_name=None, |
| | | use_subpath=False, index='index.html'): |
| | | def __init__( |
| | | self, |
| | | root_dir, |
| | | cache_max_age=3600, |
| | | package_name=None, |
| | | use_subpath=False, |
| | | index='index.html', |
| | | ): |
| | | # package_name is for bw compat; it is preferred to pass in a |
| | | # package-relative path as root_dir |
| | | # (e.g. ``anotherpackage:foo/static``). |
| | |
| | | if path is None: |
| | | raise HTTPNotFound('Out of bounds: %s' % request.url) |
| | | |
| | | if self.package_name: # package resource |
| | | if self.package_name: # package resource |
| | | resource_path = '%s/%s' % (self.docroot.rstrip('/'), path) |
| | | if resource_isdir(self.package_name, resource_path): |
| | | if not request.path_url.endswith('/'): |
| | | self.add_slash_redirect(request) |
| | | resource_path = '%s/%s' % ( |
| | | resource_path.rstrip('/'), self.index |
| | | resource_path.rstrip('/'), |
| | | self.index, |
| | | ) |
| | | |
| | | if not resource_exists(self.package_name, resource_path): |
| | | raise HTTPNotFound(request.url) |
| | | filepath = resource_filename(self.package_name, resource_path) |
| | | |
| | | else: # filesystem file |
| | | else: # filesystem file |
| | | |
| | | # os.path.normpath converts / to \ on windows |
| | | filepath = normcase(normpath(join(self.norm_docroot, path))) |
| | |
| | | |
| | | content_type, content_encoding = _guess_type(filepath) |
| | | return FileResponse( |
| | | filepath, request, self.cache_max_age, |
| | | content_type, content_encoding=None) |
| | | filepath, |
| | | request, |
| | | self.cache_max_age, |
| | | content_type, |
| | | content_encoding=None, |
| | | ) |
| | | |
| | | def add_slash_redirect(self, request): |
| | | url = request.path_url + '/' |
| | |
| | | url = url + '?' + qs |
| | | raise HTTPMovedPermanently(url) |
| | | |
| | | |
# Separators that must never appear inside a single path segment: the URL
# separator plus the platform's own separator (``os.sep``).
_seps = {'/', os.sep}


def _contains_slash(item):
    """Return ``True`` if ``item`` contains any separator from ``_seps``.

    Always returns a real ``bool``: ``False`` when no separator is found,
    instead of falling off the end of the function with an implicit ``None``.
    """
    return any(sep in item for sep in _seps)
| | | |
| | | |
# Bound ``set.intersection``: called with an iterable of path segments, it
# returns the set of segments equal to '..', '.' or '' (an empty, falsy set
# when none are present).
_has_insecure_pathelement = {'..', '.', ''}.intersection
| | | |
| | | |
| | | @lru_cache(1000) |
| | | def _secure_path(path_tuple): |
| | |
| | | return None |
| | | if any([_contains_slash(item) for item in path_tuple]): |
| | | return None |
| | | encoded = slash.join(path_tuple) # will be unicode |
| | | encoded = slash.join(path_tuple) # will be unicode |
| | | return encoded |
| | | |
| | | |
| | | class QueryStringCacheBuster(object): |
| | | """ |
| | |
| | | |
| | | .. versionadded:: 1.6 |
| | | """ |
| | | |
| | | def __init__(self, param='x'): |
| | | self.param = param |
| | | |
| | |
| | | else: |
| | | kw['_query'] = tuple(query) + ((self.param, token),) |
| | | return subpath, kw |
| | | |
| | | |
class QueryStringConstantCacheBuster(QueryStringCacheBuster):
    """
    A ``QueryStringCacheBuster`` whose token is a fixed value supplied at
    construction time.

    ``token`` is stored on the instance and handed back unchanged by every
    call to ``tokenize``.  ``param`` is forwarded to
    ``QueryStringCacheBuster.__init__`` and defaults to ``'x'``.

    .. versionadded:: 1.6
    """

    def __init__(self, token, param='x'):
        """Initialise the base class with ``param`` and remember ``token``."""
        super(QueryStringConstantCacheBuster, self).__init__(param=param)
        self._token = token

    def tokenize(self, request, subpath, kw):
        """Return the stored token; ``request``, ``subpath`` and ``kw`` are
        accepted but not used."""
        return self._token
| | | |
| | | |
| | | class ManifestCacheBuster(object): |
| | | """ |
| | |
| | | |
| | | .. versionadded:: 1.6 |
| | | """ |
| | | exists = staticmethod(exists) # testing |
| | | getmtime = staticmethod(getmtime) # testing |
| | | |
| | | exists = staticmethod(exists) # testing |
| | | getmtime = staticmethod(getmtime) # testing |
| | | |
| | | def __init__(self, manifest_spec, reload=False): |
| | | package_name = caller_package().__name__ |
| | | self.manifest_path = abspath_from_asset_spec( |
| | | manifest_spec, package_name) |
| | | manifest_spec, package_name |
| | | ) |
| | | self.reload = reload |
| | | |
| | | self._mtime = None |