- New APIs: ``pyramid.response.FileResponse`` and
``pyramid.response.FileIter``, for usage in views that must serve files
"manually".
| | |
| | | be preferred over using ``pyramid.view.view_config`` with |
| | | ``context=HTTPForbidden`` as was previously recommended. |
| | | |
| | | - New APIs: ``pyramid.response.FileResponse`` and |
| | | ``pyramid.response.FileIter``, for usage in views that must serve files |
| | | "manually". |
| | | |
| | | Backwards Incompatibilities |
| | | --------------------------- |
| | | |
| | |
| | | Nice-to-Have |
| | | ------------ |
| | | |
| | | - Expose _FileIter and _FileResponse somehow for the benefit of |
| | | manual-static-view-creators. |
| | | |
| | | - Add docs about upgrading between Pyramid versions (e.g. how to see |
| | | deprecation warnings). |
| | | |
| | |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | .. autoclass:: FileResponse |
| | | :members: |
| | | |
| | | .. autoclass:: FileIter |
| | | |
| | | Functions |
| | | ~~~~~~~~~ |
| | | |
| | |
| | | can be used to replace the respective default values of |
| | | ``request.application_url`` partially. |
| | | |
| | | - New APIs: :class:`pyramid.response.FileResponse` and |
| | | :class:`pyramid.response.FileIter`, for usage in views that must serve |
| | | files "manually". |
| | | |
| | | Backwards Incompatibilities |
| | | --------------------------- |
| | | |
| | |
| | | import mimetypes |
| | | from os.path import ( |
| | | getmtime, |
| | | getsize, |
| | | ) |
| | | |
| | | import venusian |
| | | |
| | | from webob import Response as _Response |
| | | from zope.interface import implementer |
| | | from pyramid.interfaces import IResponse |
| | | |
| | | _BLOCK_SIZE = 4096 * 64 # 256K |
| | | |
@implementer(IResponse)
class Response(_Response):
    """ WebOb ``Response`` subclass that is declared to implement
    :class:`pyramid.interfaces.IResponse`.  It adds no behavior of its
    own. """
| | | |
class FileResponse(Response):
    """
    A Response object that can be used to serve a static file from disk
    simply.

    ``path`` is a file path on disk.

    ``request`` must be a Pyramid :term:`request` object if passed.  Note
    that a request *must* be passed if the response is meant to attempt to
    use the ``wsgi.file_wrapper`` feature of the web server that you're using
    to serve your Pyramid application.

    ``cache_max_age`` if passed, is the number of seconds that should be used
    to HTTP cache this response.

    ``content_type`` if passed, is used as the content type of the response
    instead of one guessed from ``path``.

    ``content_encoding`` if passed along with ``content_type``, is used as
    the content encoding of the response.  It is ignored when
    ``content_type`` is left as ``None``, because both values are then
    guessed from ``path``.
    """
    def __init__(self, path, request=None, cache_max_age=None,
                 content_type=None, content_encoding=None):
        super(FileResponse, self).__init__(conditional_response=True)
        self.last_modified = getmtime(path)
        if content_type is None:
            # no explicit type given: guess both the type and the encoding
            # from the file name, falling back to a generic binary type
            content_type, content_encoding = mimetypes.guess_type(
                path, strict=False)
            if content_type is None:
                content_type = 'application/octet-stream'
        self.content_type = content_type
        self.content_encoding = content_encoding
        content_length = getsize(path)
        f = open(path, 'rb')
        app_iter = None
        try:
            if request is not None:
                environ = request.environ
                if 'wsgi.file_wrapper' in environ:
                    # let the server stream the file itself if it offers to
                    app_iter = environ['wsgi.file_wrapper'](f, _BLOCK_SIZE)
            if app_iter is None:
                app_iter = FileIter(f, _BLOCK_SIZE)
        except Exception:
            # nothing owns the open file yet, so close it before propagating
            f.close()
            raise
        self.app_iter = app_iter
        # assignment of content_length must come after assignment of app_iter
        self.content_length = content_length
        if cache_max_age is not None:
            self.cache_expires = cache_max_age
| | | |
class FileIter(object):
    """ An iterator that hands out a file's content in fixed-size blocks,
    for use as a WSGI app_iter.

    ``file`` is a Python file pointer (or at least an object with a ``read``
    method that takes a size hint).

    ``block_size`` is an optional block size for iteration.
    """
    def __init__(self, file, block_size=_BLOCK_SIZE):
        self.file = file
        self.block_size = block_size

    def __iter__(self):
        # the instance acts as its own iterator
        return self

    def next(self):
        chunk = self.file.read(self.block_size)
        if chunk:
            return chunk
        # an empty read ends the iteration
        raise StopIteration

    __next__ = next # py3

    def close(self):
        # closes the wrapped file object
        self.file.close()
| | | |
| | | |
| | | class response_adapter(object): |
| | | """ Decorator activated via a :term:`scan` which treats the function |
| | | being decorated as a :term:`response adapter` for the set of types or |
| | |
| | | normcase, |
| | | normpath, |
| | | join, |
| | | getmtime, |
| | | getsize, |
| | | isdir, |
| | | exists, |
| | | ) |
| | |
| | | ) |
| | | |
| | | from pyramid.path import caller_package |
| | | from pyramid.response import Response |
| | | from pyramid.response import FileResponse |
| | | from pyramid.traversal import traversal_path_info |
| | | |
| | | slash = text_('/') |
| | |
| | | # that seems to affect Python 2.6, Python 2.6.1, and 2.6.2 (a fix |
| | | # has been applied on the Python 2 trunk). |
| | | init_mimetypes(mimetypes) |
| | | |
| | | _BLOCK_SIZE = 4096 * 64 # 256K |
| | | |
class _FileResponse(Response):
    """
    Conditional response that serves the file found at ``path``.

    The content type and encoding are guessed from ``path``; the body is
    produced by the ``wsgi.file_wrapper`` callable found in
    ``request.environ`` when there is one, otherwise by ``_FileIter``.
    ``cache_max_age``, when not ``None``, is assigned to ``cache_expires``.
    """
    def __init__(self, path, cache_max_age, request):
        super(_FileResponse, self).__init__(conditional_response=True)
        self.last_modified = getmtime(path)
        guessed_type, guessed_encoding = mimetypes.guess_type(
            path, strict=False)
        # fall back to a generic binary type when nothing could be guessed
        self.content_type = (
            'application/octet-stream' if guessed_type is None
            else guessed_type)
        self.content_encoding = guessed_encoding
        size = getsize(path)
        fp = open(path, 'rb')
        environ = request.environ
        if 'wsgi.file_wrapper' not in environ:
            body = _FileIter(fp, _BLOCK_SIZE)
        else:
            body = environ['wsgi.file_wrapper'](fp, _BLOCK_SIZE)
        self.app_iter = body
        # assignment of content_length must come after assignment of app_iter
        self.content_length = size
        if cache_max_age is not None:
            self.cache_expires = cache_max_age
| | | |
class _FileIter(object):
    """ Iterator that reads ``block_size`` at a time from ``file`` until a
    read comes back empty. """
    def __init__(self, file, block_size):
        self.file = file
        self.block_size = block_size

    def __iter__(self):
        # the instance acts as its own iterator
        return self

    def next(self):
        chunk = self.file.read(self.block_size)
        if chunk:
            return chunk
        # an empty read ends the iteration
        raise StopIteration

    __next__ = next # py3

    def close(self):
        # closes the wrapped file object
        self.file.close()
| | | |
| | | class static_view(object): |
| | | """ An instance of this class is a callable which can act as a |
| | |
| | | if not exists(filepath): |
| | | return HTTPNotFound(request.url) |
| | | |
| | | return _FileResponse(filepath ,self.cache_max_age, request) |
| | | return FileResponse(filepath, request, self.cache_max_age) |
| | | |
| | | def add_slash_redirect(self, request): |
| | | url = request.path_url + '/' |
| | |
| | | import io |
| | | import unittest |
| | | from pyramid import testing |
| | | |
| | |
| | | inst = self._getTargetClass()() |
| | | self.assertTrue(IResponse.providedBy(inst)) |
| | | |
class TestFileIter(unittest.TestCase):
    # Exercises pyramid.response.FileIter against in-memory byte streams.

    def _makeOne(self, file, block_size):
        from pyramid.response import FileIter
        return FileIter(file, block_size)

    def test___iter__(self):
        inst = self._makeOne(io.BytesIO(b'abc'), 1)
        self.assertEqual(inst.__iter__(), inst)

    def test_iteration(self):
        data = b'abcdef'
        inst = self._makeOne(io.BytesIO(b'abcdef'), 1)
        chunks = []
        for chunk in inst:
            # a block size of 1 must yield one byte per step
            self.assertEqual(len(chunk), 1)
            chunks.append(chunk)
        self.assertEqual(b''.join(chunks), data)

    def test_close(self):
        f = io.BytesIO(b'abc')
        self._makeOne(f, 1).close()
        self.assertTrue(f.closed)
| | | |
class Dummy(object):
    """ Empty placeholder object. """
| | | |
| | |
| | | import datetime |
| | | import unittest |
| | | import io |
| | | |
| | | # 5 years from now (more or less) |
| | | fiveyrsfuture = datetime.datetime.utcnow() + datetime.timedelta(5*365) |
| | |
| | | self.assertTrue(b'<html>static</html>' in response.body) |
| | | |
| | | def test_resource_is_file_with_wsgi_file_wrapper(self): |
| | | from pyramid.static import _BLOCK_SIZE |
| | | from pyramid.response import _BLOCK_SIZE |
| | | inst = self._makeOne('pyramid.tests:fixtures/static') |
| | | request = self._makeRequest({'PATH_INFO':'/index.html'}) |
| | | class _Wrapper(object): |
| | |
| | | module = DummyMimetypes() |
| | | result = self._callFUT(module) |
| | | self.assertEqual(result, False) |
| | | |
class Test_FileIter(unittest.TestCase):
    # Exercises pyramid.static._FileIter against in-memory byte streams.

    def _makeOne(self, file, block_size):
        from pyramid.static import _FileIter
        return _FileIter(file, block_size)

    def test___iter__(self):
        inst = self._makeOne(io.BytesIO(b'abc'), 1)
        self.assertEqual(inst.__iter__(), inst)

    def test_iteration(self):
        data = b'abcdef'
        inst = self._makeOne(io.BytesIO(b'abcdef'), 1)
        chunks = []
        for chunk in inst:
            # a block size of 1 must yield one byte per step
            self.assertEqual(len(chunk), 1)
            chunks.append(chunk)
        self.assertEqual(b''.join(chunks), data)

    def test_close(self):
        f = io.BytesIO(b'abc')
        self._makeOne(f, 1).close()
        self.assertTrue(f.closed)
| | | |
class DummyContext:
    """ Empty placeholder context object. """