Michael Merickel
2018-10-26 572e03c1e47385f093ffb701e21c226f99247837
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
# -*- coding: utf-8 -*-
import json
import os
 
from os.path import getmtime, normcase, normpath, join, isdir, exists
 
from pkg_resources import resource_exists, resource_filename, resource_isdir
 
from pyramid.asset import abspath_from_asset_spec, resolve_asset_spec
 
from pyramid.compat import lru_cache, text_
 
from pyramid.httpexceptions import HTTPNotFound, HTTPMovedPermanently
 
from pyramid.path import caller_package
 
from pyramid.response import _guess_type, FileResponse
 
from pyramid.traversal import traversal_path_info
 
slash = text_('/')
 
 
class static_view(object):
    """ An instance of this class is a callable which can act as a
    :app:`Pyramid` :term:`view callable`; this view will serve
    static files from a directory on disk based on the ``root_dir``
    you provide to its constructor.
 
    The directory may contain subdirectories (recursively); the static
    view implementation will descend into these directories as
    necessary based on the components of the URL in order to resolve a
    path into a response.
 
    You may pass an absolute or relative filesystem path or a
    :term:`asset specification` representing the directory
    containing static files as the ``root_dir`` argument to this
    class' constructor.
 
    If the ``root_dir`` path is relative, and the ``package_name``
    argument is ``None``, ``root_dir`` will be considered relative to
    the directory in which the Python file which *calls* ``static``
    resides.  If the ``package_name`` name argument is provided, and a
    relative ``root_dir`` is provided, the ``root_dir`` will be
    considered relative to the Python :term:`package` specified by
    ``package_name`` (a dotted path to a Python package).
 
    ``cache_max_age`` influences the ``Expires`` and ``Max-Age``
    response headers returned by the view (default is 3600 seconds or
    one hour).
 
    ``use_subpath`` influences whether ``request.subpath`` will be used as
    ``PATH_INFO`` when calling the underlying WSGI application which actually
    serves the static files.  If it is ``True``, the static application will
    consider ``request.subpath`` as ``PATH_INFO`` input.  If it is ``False``,
    the static application will consider request.environ[``PATH_INFO``] as
    ``PATH_INFO`` input. By default, this is ``False``.
 
    .. note::
 
       If the ``root_dir`` is relative to a :term:`package`, or is a
       :term:`asset specification` the :app:`Pyramid`
       :class:`pyramid.config.Configurator` method can be used to override
       assets within the named ``root_dir`` package-relative directory.
       However, if the ``root_dir`` is absolute, configuration will not be able
       to override the assets it contains.
    """
 
    def __init__(
        self,
        root_dir,
        cache_max_age=3600,
        package_name=None,
        use_subpath=False,
        index='index.html',
    ):
        # package_name is for bw compat; it is preferred to pass in a
        # package-relative path as root_dir
        # (e.g. ``anotherpackage:foo/static``).
        self.cache_max_age = cache_max_age
        if package_name is None:
            package_name = caller_package().__name__
        package_name, docroot = resolve_asset_spec(root_dir, package_name)
        self.use_subpath = use_subpath
        self.package_name = package_name
        self.docroot = docroot
        self.norm_docroot = normcase(normpath(docroot))
        self.index = index
 
    def __call__(self, context, request):
        if self.use_subpath:
            path_tuple = request.subpath
        else:
            path_tuple = traversal_path_info(request.environ['PATH_INFO'])
        path = _secure_path(path_tuple)
 
        if path is None:
            raise HTTPNotFound('Out of bounds: %s' % request.url)
 
        if self.package_name:  # package resource
            resource_path = '%s/%s' % (self.docroot.rstrip('/'), path)
            if resource_isdir(self.package_name, resource_path):
                if not request.path_url.endswith('/'):
                    self.add_slash_redirect(request)
                resource_path = '%s/%s' % (
                    resource_path.rstrip('/'),
                    self.index,
                )
 
            if not resource_exists(self.package_name, resource_path):
                raise HTTPNotFound(request.url)
            filepath = resource_filename(self.package_name, resource_path)
 
        else:  # filesystem file
 
            # os.path.normpath converts / to \ on windows
            filepath = normcase(normpath(join(self.norm_docroot, path)))
            if isdir(filepath):
                if not request.path_url.endswith('/'):
                    self.add_slash_redirect(request)
                filepath = join(filepath, self.index)
            if not exists(filepath):
                raise HTTPNotFound(request.url)
 
        content_type, content_encoding = _guess_type(filepath)
        return FileResponse(
            filepath,
            request,
            self.cache_max_age,
            content_type,
            content_encoding=None,
        )
 
    def add_slash_redirect(self, request):
        url = request.path_url + '/'
        qs = request.query_string
        if qs:
            url = url + '?' + qs
        raise HTTPMovedPermanently(url)
 
 
_seps = set(['/', os.sep])
 
 
def _contains_slash(item):
    for sep in _seps:
        if sep in item:
            return True
 
 
_has_insecure_pathelement = set(['..', '.', '']).intersection
 
 
@lru_cache(1000)
def _secure_path(path_tuple):
    if _has_insecure_pathelement(path_tuple):
        # belt-and-suspenders security; this should never be true
        # unless someone screws up the traversal_path code
        # (request.subpath is computed via traversal_path too)
        return None
    if any([_contains_slash(item) for item in path_tuple]):
        return None
    encoded = slash.join(path_tuple)  # will be unicode
    return encoded
 
 
class QueryStringCacheBuster(object):
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
    a token for cache busting in the query string of an asset URL.
 
    The optional ``param`` argument determines the name of the parameter added
    to the query string and defaults to ``'x'``.
 
    To use this class, subclass it and provide a ``tokenize`` method which
    accepts ``request, pathspec, kw`` and returns a token.
 
    .. versionadded:: 1.6
    """
 
    def __init__(self, param='x'):
        self.param = param
 
    def __call__(self, request, subpath, kw):
        token = self.tokenize(request, subpath, kw)
        query = kw.setdefault('_query', {})
        if isinstance(query, dict):
            query[self.param] = token
        else:
            kw['_query'] = tuple(query) + ((self.param, token),)
        return subpath, kw
 
 
class QueryStringConstantCacheBuster(QueryStringCacheBuster):
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which adds
    an arbitrary token for cache busting in the query string of an asset URL.
 
    The ``token`` parameter is the token string to use for cache busting and
    will be the same for every request.
 
    The optional ``param`` argument determines the name of the parameter added
    to the query string and defaults to ``'x'``.
 
    .. versionadded:: 1.6
    """
 
    def __init__(self, token, param='x'):
        super(QueryStringConstantCacheBuster, self).__init__(param=param)
        self._token = token
 
    def tokenize(self, request, subpath, kw):
        return self._token
 
 
class ManifestCacheBuster(object):
    """
    An implementation of :class:`~pyramid.interfaces.ICacheBuster` which
    uses a supplied manifest file to map an asset path to a cache-busted
    version of the path.
 
    The ``manifest_spec`` can be an absolute path or a :term:`asset
    specification` pointing to a package-relative file.
 
    The manifest file is expected to conform to the following simple JSON
    format:
 
    .. code-block:: json
 
       {
           "css/main.css": "css/main-678b7c80.css",
           "images/background.png": "images/background-a8169106.png",
       }
 
    By default, it is a JSON-serialized dictionary where the keys are the
    source asset paths used in calls to
    :meth:`~pyramid.request.Request.static_url`. For example:
 
    .. code-block:: pycon
 
       >>> request.static_url('myapp:static/css/main.css')
       "http://www.example.com/static/css/main-678b7c80.css"
 
    The file format and location can be changed by subclassing and overriding
    :meth:`.parse_manifest`.
 
    If a path is not found in the manifest it will pass through unchanged.
 
    If ``reload`` is ``True`` then the manifest file will be reloaded when
    changed. It is not recommended to leave this enabled in production.
 
    If the manifest file cannot be found on disk it will be treated as
    an empty mapping unless ``reload`` is ``False``.
 
    .. versionadded:: 1.6
    """
 
    exists = staticmethod(exists)  # testing
    getmtime = staticmethod(getmtime)  # testing
 
    def __init__(self, manifest_spec, reload=False):
        package_name = caller_package().__name__
        self.manifest_path = abspath_from_asset_spec(
            manifest_spec, package_name
        )
        self.reload = reload
 
        self._mtime = None
        if not reload:
            self._manifest = self.get_manifest()
 
    def get_manifest(self):
        with open(self.manifest_path, 'rb') as fp:
            return self.parse_manifest(fp.read())
 
    def parse_manifest(self, content):
        """
        Parse the ``content`` read from the ``manifest_path`` into a
        dictionary mapping.
 
        Subclasses may override this method to use something other than
        ``json.loads`` to load any type of file format and return a conforming
        dictionary.
 
        """
        return json.loads(content.decode('utf-8'))
 
    @property
    def manifest(self):
        """ The current manifest dictionary."""
        if self.reload:
            if not self.exists(self.manifest_path):
                return {}
            mtime = self.getmtime(self.manifest_path)
            if self._mtime is None or mtime > self._mtime:
                self._manifest = self.get_manifest()
                self._mtime = mtime
        return self._manifest
 
    def __call__(self, request, subpath, kw):
        subpath = self.manifest.get(subpath, subpath)
        return (subpath, kw)