Chris McDonough
2011-09-06 f2ef797a1514a30e8dbb66e363100ef8c624811b
first cut; still missing features as documented in TODO.txt
9 files modified
1092 ■■■■ changed files
TODO.txt 14 ●●●● patch | view | raw | blame | history
pyramid/static.py 259 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_config/test_views.py 15 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_integration.py 209 ●●●● patch | view | raw | blame | history
pyramid/tests/test_static.py 511 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_traversal.py 3 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_view.py 62 ●●●●● patch | view | raw | blame | history
pyramid/traversal.py 3 ●●●● patch | view | raw | blame | history
pyramid/view.py 16 ●●●●● patch | view | raw | blame | history
TODO.txt
@@ -81,10 +81,16 @@
- 1.3: - Eliminate non-deployment-non-scaffold-related Paste dependency:
  ``paste.urlparser.StaticURLParser`` (cutnpaste or reimplement, possibly
  using chrisrossi's happy stuff as a base).  paste.urlparser/paste.fileapp
  features missing from happy.static: ``wsgi.file_wrapper`` support
  (FileApp.get), 'HEAD' method support (FileApp.get), ETAG and if-none-match
  support (DataApp.get), handling file permission exceptions (FileApp.get),
  using chrisrossi's happy stuff as a base).   Still need:
  ``wsgi.file_wrapper`` support (FileApp.get)
  'HEAD' method support (FileApp.get)
  handling file permission exceptions (FileApp.get).
  Features we won't supportL ETAG and if-none-match
  support (DataApp.get); replace with if-modified-since handling.
- 1.3: use zope.registry rather than zope.component.
pyramid/static.py
@@ -1,82 +1,95 @@
import os
import pkg_resources
from datetime import datetime, timedelta
from os.path import normcase, normpath, join, getmtime, getsize, isdir, exists
from pkg_resources import resource_exists, resource_filename, resource_isdir
import mimetypes
from paste import httpexceptions
from paste import request
from paste.httpheaders import ETAG
from paste.urlparser import StaticURLParser
from repoze.lru import lru_cache
from pyramid.asset import resolve_asset_spec
from pyramid.httpexceptions import HTTPNotFound
from pyramid.httpexceptions import HTTPMovedPermanently
from pyramid.path import caller_package
from pyramid.request import call_app_with_subpath_as_path_info
from pyramid.response import Response
from pyramid.traversal import traversal_path
from pyramid.traversal import quote_path_segment
class PackageURLParser(StaticURLParser):
    """ This probably won't work with zipimported resources """
    def __init__(self, package_name, resource_name, root_resource=None,
                 cache_max_age=None):
        self.package_name = package_name
        self.resource_name = os.path.normpath(resource_name)
        if root_resource is None:
            root_resource = self.resource_name
        self.root_resource = root_resource
        self.cache_max_age = cache_max_age
DEFAULT_CHUNKSIZE = 1<<16 # 64 kilobytes
    def __call__(self, environ, start_response):
        path_info = environ.get('PATH_INFO', '')
        if not path_info:
            return self.add_slash(environ, start_response)
        if path_info == '/':
            # @@: This should obviously be configurable
            filename = 'index.html'
        else:
            filename = request.path_info_pop(environ)
        resource = os.path.normcase(os.path.normpath(
                    self.resource_name + '/' + filename))
        if not resource.startswith(self.root_resource):
            # Out of bounds
            return self.not_found(environ, start_response)
        if not pkg_resources.resource_exists(self.package_name, resource):
            return self.not_found(environ, start_response)
        if pkg_resources.resource_isdir(self.package_name, resource):
            # @@: Cache?
            return self.__class__(
                self.package_name, resource, root_resource=self.resource_name,
                cache_max_age=self.cache_max_age)(environ, start_response)
        pi = environ.get('PATH_INFO')
        if pi and pi != '/':
            return self.error_extra_path(environ, start_response)
        full = pkg_resources.resource_filename(self.package_name, resource)
        if_none_match = environ.get('HTTP_IF_NONE_MATCH')
        if if_none_match:
            mytime = os.stat(full).st_mtime
            if str(mytime) == if_none_match:
                headers = []
                ETAG.update(headers, mytime)
                start_response('304 Not Modified', headers)
                return [''] # empty body
def init_mimetypes(mimetypes):
    # this is a function so it can be unittested
    if hasattr(mimetypes, 'init'):
        mimetypes.init()
        return True
    return False
        fa = self.make_app(full)
        if self.cache_max_age:
            fa.cache_control(max_age=self.cache_max_age)
        return fa(environ, start_response)
# See http://bugs.python.org/issue5853 which is a recursion bug
# that seems to effect Python 2.6, Python 2.6.1, and 2.6.2 (a fix
# has been applied on the Python 2 trunk).
init_mimetypes(mimetypes)
    def not_found(self, environ, start_response, debug_message=None):
        comment=('SCRIPT_NAME=%r; PATH_INFO=%r; looking in package %s; '
                 'subdir %s ;debug: %s' % (environ.get('SCRIPT_NAME'),
                                           environ.get('PATH_INFO'),
                                           self.package_name,
                                           self.resource_name,
                                           debug_message or '(none)'))
        exc = httpexceptions.HTTPNotFound(
            'The resource at %s could not be found'
            % request.construct_url(environ),
            comment=comment)
        return exc.wsgi_application(environ, start_response)
class FileResponse(Response):
    """
    Serves a static filelike object.
    """
    def __init__(self, path, request, expires, chunksize=DEFAULT_CHUNKSIZE):
        super(FileResponse, self).__init__()
        self.request = request
        self.last_modified = datetime.utcfromtimestamp(getmtime(path))
    def __repr__(self):
        return '<%s %s:%s at %s>' % (self.__class__.__name__, self.package_name,
                                     self.root_resource, id(self))
        # Check 'If-Modified-Since' request header
        # Browser might already have in cache
        modified_since = request.if_modified_since
        if modified_since is not None:
            if self.last_modified <= modified_since:
                self.status = 304
                return
        # Provide partial response if requested
        content_length = getsize(path)
        request_range = self._get_range(content_length)
        if request_range is not None:
            start, end = request_range
            if start >= content_length:
                self.status_int = 416 # Request range not satisfiable
                return
            self.status_int = 206 # Partial Content
            self.headers['Content-Range'] = 'bytes %d-%d/%d' % (
                start, end-1, content_length)
            content_length = end - start
        self.date = datetime.utcnow()
        self.app_iter = _file_iter(path, chunksize, request_range)
        self.content_type = mimetypes.guess_type(path, strict=False)[0]
        self.content_length = content_length
        if expires is not None:
            self.expires = self.date + expires
    def _get_range(self, content_length):
        # WebOb earlier than 0.9.7 has broken range parser implementation.
        # The current released version at this time is 0.9.6, so we do this
        # ourselves.  (It is fixed on trunk, though.)
        request = self.request
        range_header = request.headers.get('Range', None)
        if range_header is None:
            return None
        # Refuse to parse multiple byte ranges.  They are just plain silly.
        if ',' in range_header:
            return None
        unit, range_s = range_header.split('=', 1)
        if unit != 'bytes':
            # Other units are not supported
            return None
        if range_s.startswith('-'):
            start = content_length - int(range_s[1:])
            return start, content_length
        start, end = map(int, range_s.split('-'))
        return start, end + 1
class static_view(object):
    """ An instance of this class is a callable which can act as a
@@ -120,24 +133,110 @@
         package-relative directory.  However, if the ``root_dir`` is
         absolute, configuration will not be able to
         override the assets it contains.  """
    FileResponse = FileResponse # override point
    def __init__(self, root_dir, cache_max_age=3600, package_name=None,
                 use_subpath=False):
                 use_subpath=False, index='index.html',
                 chunksize=DEFAULT_CHUNKSIZE):
        # package_name is for bw compat; it is preferred to pass in a
        # package-relative path as root_dir
        # (e.g. ``anotherpackage:foo/static``).
        if isinstance(cache_max_age, int):
            cache_max_age = timedelta(seconds=cache_max_age)
        self.expires = cache_max_age
        if package_name is None:
            package_name = caller_package().__name__
        package_name, root_dir = resolve_asset_spec(root_dir, package_name)
        if package_name is None:
            app = StaticURLParser(root_dir, cache_max_age=cache_max_age)
        else:
            app = PackageURLParser(
                package_name, root_dir, cache_max_age=cache_max_age)
        self.app = app
        package_name, docroot = resolve_asset_spec(root_dir, package_name)
        self.use_subpath = use_subpath
        self.package_name = package_name
        self.docroot = docroot
        self.norm_docroot = normcase(normpath(docroot))
        self.chunksize = chunksize
        self.index = index
    def __call__(self, context, request):
        if self.use_subpath:
            return call_app_with_subpath_as_path_info(request, self.app)
        return request.get_response(self.app)
            path_tuple = request.subpath
        else:
            path_tuple = traversal_path(request.path_info)
        path = secure_path(path_tuple)
        if path is None:
            # belt-and-suspenders security; this should never be true
            # unless someone screws up the traversal_path code
            # (request.subpath is computed via traversal_path too)
            return HTTPNotFound('Out of bounds: %s' % request.url)
        if self.package_name: # package resource
            resource_path ='%s/%s' % (self.docroot.rstrip('/'), path)
            if resource_isdir(self.package_name, resource_path):
                if not request.path_url.endswith('/'):
                    return self.add_slash_redirect(request)
                resource_path = '%s/%s' % (resource_path.rstrip('/'),self.index)
            if not resource_exists(self.package_name, resource_path):
                return HTTPNotFound(request.url)
            filepath = resource_filename(self.package_name, resource_path)
        else: # filesystem file
            # os.path.normpath converts / to \ on windows
            filepath = normcase(normpath(join(self.norm_docroot, path)))
            if isdir(filepath):
                if not request.path_url.endswith('/'):
                    return self.add_slash_redirect(request)
                filepath = join(filepath, self.index)
            if not exists(filepath):
                return HTTPNotFound(request.url)
        return self.FileResponse(filepath, request,self.expires,self.chunksize)
    def add_slash_redirect(self, request):
        url = request.path_url + '/'
        qs = request.query_string
        if qs:
            url = url + '?' + qs
        return HTTPMovedPermanently(url)
def _file_iter(path, chunksize, content_range=None):
    file = open(path, 'rb')
    if content_range is not None:
        class ByteReader(object):
            def __init__(self, n_bytes):
                self.bytes_left = n_bytes
            def __call__(self):
                b = file.read(min(self.bytes_left, chunksize))
                self.bytes_left -= len(b)
                return b
        start, end = content_range
        file.seek(start)
        get_bytes = ByteReader(end - start)
    else:
        def get_bytes():
            return file.read(chunksize)
    try:
        buf = get_bytes()
        while buf:
            yield buf
            buf = get_bytes()
    finally:
        if hasattr(file, 'close'):
            file.close()
@lru_cache(1000)
def secure_path(path_tuple):
    if '' in path_tuple:
        return None
    for item in path_tuple:
        for val in ['.', '/']:
            if item.startswith(val):
                return None
    return '/'.join([quote_path_segment(x) for x in path_tuple])
pyramid/tests/test_config/test_views.py
@@ -1415,19 +1415,19 @@
    def test_add_static_view_here_no_utility_registered(self):
        from pyramid.renderers import null_renderer
        from zope.interface import Interface
        from pyramid.static import PackageURLParser
        from pyramid.interfaces import IView
        from pyramid.interfaces import IViewClassifier
        config = self._makeOne(autocommit=True)
        config.add_static_view('static', 'files',
                               renderer=null_renderer)
        config.add_static_view('static', 'files', renderer=null_renderer)
        request_type = self._getRouteRequestIface(config, 'static/')
        self._assertRoute(config, 'static/', 'static/*subpath')
        wrapped = config.registry.adapters.lookup(
            (IViewClassifier, request_type, Interface), IView, name='')
        request = self._makeRequest(config)
        from pyramid.request import Request
        request = Request.blank('/static/minimal.pt')
        request.subpath = ('minimal.pt', )
        result = wrapped(None, request)
        self.assertEqual(result.__class__, PackageURLParser)
        self.assertEqual(result.status, '200 OK')
    def test_add_static_view_package_relative(self):
        from pyramid.interfaces import IStaticURLInfo
@@ -3346,7 +3346,6 @@
        self.assertEqual(config.route_args, ('view/', 'view/*subpath'))
        self.assertEqual(config.view_kw['permission'], NO_PERMISSION_REQUIRED)
        self.assertEqual(config.view_kw['view'].__class__, static_view)
        self.assertEqual(config.view_kw['view'].app.cache_max_age, 1)
    def test_add_viewname_with_permission(self):
        config = DummyConfig()
@@ -3416,10 +3415,6 @@
        self.environ = environ
        self.params = {}
        self.cookies = {}
    def copy(self):
        return self
    def get_response(self, app):
        return app
class DummyContext:
    pass
pyramid/tests/test_integration.py
@@ -42,82 +42,177 @@
        self.assertEqual(view.__original_view__, wsgiapptest)
here = os.path.dirname(__file__)
staticapp = static_view(os.path.join(here, 'fixtures'), use_subpath=True)
class TestStaticApp(unittest.TestCase):
    def test_basic(self):
        from webob import Request
        context = DummyContext()
class TestStaticAppBase(object):
    def _makeRequest(self, extra=None):
        if extra is None:
            extra = {}
        from pyramid.request import Request
        from StringIO import StringIO
        request = Request({'PATH_INFO':'',
                           'SCRIPT_NAME':'',
                           'SERVER_NAME':'localhost',
                           'SERVER_PORT':'80',
                           'REQUEST_METHOD':'GET',
                           'wsgi.version':(1,0),
                           'wsgi.url_scheme':'http',
                           'wsgi.input':StringIO()})
        request.subpath = ('minimal.pt',)
        result = staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        kw = {'PATH_INFO':'',
              'SCRIPT_NAME':'',
              'SERVER_NAME':'localhost',
              'SERVER_PORT':'80',
              'REQUEST_METHOD':'GET',
              'wsgi.version':(1,0),
              'wsgi.url_scheme':'http',
              'wsgi.input':StringIO()}
        kw.update(extra)
        request = Request(kw)
        return request
    def _assertBody(self, body, filename):
        self.assertEqual(
            result.body.replace('\r', ''),
            open(os.path.join(here, 'fixtures/minimal.pt'), 'r').read())
            body.replace('\r', ''),
            open(filename, 'r').read()
            )
class TestStaticAppTests(TestStaticAppBase):
    def test_basic(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('minimal.pt',)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self._assertBody(result.body, os.path.join(here, 'fixtures/minimal.pt'))
    def test_not_modified(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('minimal.pt',)
        request.if_modified_since = pow(2, 32)-1
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '304 Not Modified') # CR only
    def test_file_in_subdir(self):
        from webob import Request
        request = self._makeRequest()
        context = DummyContext()
        from StringIO import StringIO
        request = Request({'PATH_INFO':'',
                           'SCRIPT_NAME':'',
                           'SERVER_NAME':'localhost',
                           'SERVER_PORT':'80',
                           'REQUEST_METHOD':'GET',
                           'wsgi.version':(1,0),
                           'wsgi.url_scheme':'http',
                           'wsgi.input':StringIO()})
        request.subpath = ('static', 'index.html',)
        result = staticapp(context, request)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self.assertEqual(
            result.body.replace('\r', ''),
            open(os.path.join(here, 'fixtures/static/index.html'), 'r').read())
        self._assertBody(result.body,
                         os.path.join(here, 'fixtures/static/index.html'))
    def test_redirect_to_subdir(self):
        from webob import Request
    def test_directory_noslash_redir(self):
        request = self._makeRequest({'PATH_INFO':'/static'})
        context = DummyContext()
        from StringIO import StringIO
        request = Request({'PATH_INFO':'',
                           'SCRIPT_NAME':'',
                           'SERVER_NAME':'localhost',
                           'SERVER_PORT':'80',
                           'REQUEST_METHOD':'GET',
                           'wsgi.version':(1,0),
                           'wsgi.url_scheme':'http',
                           'wsgi.input':StringIO()})
        request.subpath = ('static',)
        result = staticapp(context, request)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '301 Moved Permanently')
        self.assertEqual(result.location, 'http://localhost/static/')
    def test_redirect_to_subdir_with_existing_script_name(self):
        from webob import Request
    def test_directory_noslash_redir_preserves_qs(self):
        request = self._makeRequest({'PATH_INFO':'/static',
                                     'QUERY_STRING':'a=1&b=2'})
        context = DummyContext()
        from StringIO import StringIO
        request = Request({'PATH_INFO':'/static',
                           'SCRIPT_NAME':'/script_name',
                           'SERVER_NAME':'localhost',
                           'SERVER_PORT':'80',
                           'REQUEST_METHOD':'GET',
                           'wsgi.version':(1,0),
                           'wsgi.url_scheme':'http',
                           'wsgi.input':StringIO()})
        request.subpath = ('static',)
        result = staticapp(context, request)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '301 Moved Permanently')
        self.assertEqual(result.location,
        self.assertEqual(result.location, 'http://localhost/static/?a=1&b=2')
    def test_directory_noslash_redir_with_scriptname(self):
        request = self._makeRequest({'SCRIPT_NAME':'/script_name',
                                     'PATH_INFO':'/static'})
        context = DummyContext()
        request.subpath = ('static',)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '301 Moved Permanently')
        self.assertEqual(result.location,
                         'http://localhost/script_name/static/')
    def test_directory_withslash(self):
        request = self._makeRequest({'PATH_INFO':'/static/'})
        context = DummyContext()
        request.subpath = ('static',)
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self._assertBody(result.body,
                         os.path.join(here, 'fixtures/static/index.html'))
    def test_range_inclusive(self):
        request = self._makeRequest({'HTTP_RANGE':'bytes=1-2'})
        context = DummyContext()
        request.subpath = ('static', 'index.html')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '206 Partial Content')
        self.assertEqual(result.body, 'ht')
    def test_range_tilend(self):
        request = self._makeRequest({'HTTP_RANGE':'bytes=-5'})
        context = DummyContext()
        request.subpath = ('static', 'index.html')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '206 Partial Content')
        self.assertEqual(result.body, 'tml>\n') # CR only
    def test_range_notbytes(self):
        request = self._makeRequest({'HTTP_RANGE':'kilohertz=10'})
        context = DummyContext()
        request.subpath = ('static', 'index.html')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self._assertBody(result.body,
                         os.path.join(here, 'fixtures/static/index.html'))
    def test_range_multiple(self):
        request = self._makeRequest({'HTTP_RANGE':'bytes=10,11'})
        context = DummyContext()
        request.subpath = ('static', 'index.html')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self._assertBody(result.body,
                         os.path.join(here, 'fixtures/static/index.html'))
    def test_range_oob(self):
        request = self._makeRequest({'HTTP_RANGE':'bytes=1000-1002'})
        context = DummyContext()
        request.subpath = ('static', 'index.html')
        result = self.staticapp(context, request)
        self.assertEqual(result.status_int, 416)
    def test_notfound(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('static', 'wontbefound.x')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '404 Not Found')
    def test_oob_doubledot(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('..', 'test_integration.py')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '404 Not Found')
    def test_oob_slash(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('/', 'test_integration.py')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '404 Not Found')
    def test_oob_empty(self):
        request = self._makeRequest()
        context = DummyContext()
        request.subpath = ('', 'test_integration.py')
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '404 Not Found')
class TestStaticAppUsingAbsPath(unittest.TestCase, TestStaticAppTests):
    staticapp = static_view(os.path.join(here, 'fixtures'), use_subpath=True)
class TestStaticAppUsingResourcePath(unittest.TestCase, TestStaticAppTests):
    staticapp = static_view('pyramid.tests:fixtures', use_subpath=True)
class TestStaticAppNoSubpath(unittest.TestCase, TestStaticAppBase):
    staticapp = static_view(os.path.join(here, 'fixtures'), use_subpath=False)
    def test_basic(self):
        request = self._makeRequest({'PATH_INFO':'/minimal.pt'})
        context = DummyContext()
        result = self.staticapp(context, request)
        self.assertEqual(result.status, '200 OK')
        self._assertBody(result.body, os.path.join(here, 'fixtures/minimal.pt'))
class IntegrationBase(unittest.TestCase):
    root_factory = None
pyramid/tests/test_static.py
@@ -1,16 +1,16 @@
import unittest
from pyramid.testing import cleanUp
import datetime
class TestPackageURLParser(unittest.TestCase):
class Test_static_view_use_subpath_False(unittest.TestCase):
    def _getTargetClass(self):
        from pyramid.static import PackageURLParser
        return PackageURLParser
        from pyramid.static import static_view
        return static_view
    def _makeOne(self, *arg, **kw):
        return self._getTargetClass()(*arg, **kw)
    def _makeEnviron(self, **kw):
    def _makeRequest(self, kw=None):
        from pyramid.request import Request
        environ = {
            'wsgi.url_scheme':'http',
            'wsgi.version':(1,0),
@@ -20,332 +20,251 @@
            'SCRIPT_NAME':'',
            'REQUEST_METHOD':'GET',
            }
        environ.update(kw)
        return environ
        if kw is not None:
            environ.update(kw)
        return Request(environ=environ)
    
    def test_ctor_allargs(self):
        import os.path
        inst = self._makeOne('package', 'resource/name', root_resource='root',
                             cache_max_age=100)
        self.assertEqual(inst.package_name, 'package')
        self.assertEqual(inst.resource_name, os.path.join('resource', 'name'))
        self.assertEqual(inst.root_resource, 'root')
        self.assertEqual(inst.cache_max_age, 100)
    def test_ctor_defaultargs(self):
        import os.path
        inst = self._makeOne('package', 'resource/name')
        inst = self._makeOne('package:resource_name')
        self.assertEqual(inst.package_name, 'package')
        self.assertEqual(inst.resource_name, os.path.join('resource', 'name'))
        self.assertEqual(inst.root_resource, os.path.join('resource', 'name'))
        self.assertEqual(inst.cache_max_age, None)
        self.assertEqual(inst.docroot, 'resource_name')
        self.assertEqual(inst.expires, datetime.timedelta(seconds=3600))
        self.assertEqual(inst.index, 'index.html')
    def test_call_adds_slash_path_info_empty(self):
        environ = self._makeEnviron(PATH_INFO='')
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('301 Moved Permanently' in body)
        self.assertTrue('http://example.com:6543/' in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':''})
        context = DummyContext()
        response = inst(context, request)
        response.prepare(request.environ)
        self.assertEqual(response.status, '301 Moved Permanently')
        self.assertTrue('http://example.com:6543/' in response.body)
        
    def test_path_info_slash_means_index_html(self):
        environ = self._makeEnviron()
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>static</html>' in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
    def test_resource_out_of_bounds(self):
        environ = self._makeEnviron()
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        inst.root_resource = 'abcdef'
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('404 Not Found' in body)
        self.assertTrue('http://example.com:6543/' in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/subdir/../../minimal.pt'})
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
    def test_resource_doesnt_exist(self):
        environ = self._makeEnviron(PATH_INFO='/notthere')
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('404 Not Found' in body)
        self.assertTrue('http://example.com:6543/' in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/notthere'})
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
    def test_resource_isdir(self):
        environ = self._makeEnviron(PATH_INFO='/subdir/')
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>subdir</html>' in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/subdir/'})
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>subdir</html>' in response.body)
    def test_resource_is_file(self):
        environ = self._makeEnviron(PATH_INFO='/index.html')
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>static</html>' in body)
    def test_resource_has_extra_path_info(self):
        environ = self._makeEnviron(PATH_INFO='/static/index.html/more')
        inst = self._makeOne('pyramid.tests', 'fixtures')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue("The trailing path '/more' is not allowed" in body)
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/index.html'})
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
    def test_resource_is_file_with_cache_max_age(self):
        environ = self._makeEnviron(PATH_INFO='/index.html')
        inst = self._makeOne('pyramid.tests', 'fixtures/static',
                             cache_max_age=600)
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>static</html>' in body)
        self.assertEqual(len(sr.headerlist), 8)
        header_names = [ x[0] for x in sr.headerlist ]
        inst = self._makeOne('pyramid.tests:fixtures/static', cache_max_age=600)
        request = self._makeRequest({'PATH_INFO':'/index.html'})
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
        self.assertEqual(len(response.headerlist), 5)
        header_names = [ x[0] for x in response.headerlist ]
        header_names.sort()
        self.assertEqual(header_names,
                         ['Accept-Ranges', 'Cache-Control',
                          'Content-Length', 'Content-Range',
                          'Content-Type', 'ETag', 'Expires', 'Last-Modified'])
                         ['Content-Length', 'Content-Type', 'Date', 'Expires',
                          'Last-Modified'])
    def test_resource_is_file_with_no_cache_max_age(self):
        environ = self._makeEnviron(PATH_INFO='/index.html')
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>static</html>' in body)
        self.assertEqual(len(sr.headerlist), 6)
        header_names = [ x[0] for x in sr.headerlist ]
        inst = self._makeOne('pyramid.tests:fixtures/static',
                             cache_max_age=None)
        request = self._makeRequest({'PATH_INFO':'/index.html'})
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
        self.assertEqual(len(response.headerlist), 4)
        header_names = [ x[0] for x in response.headerlist ]
        header_names.sort()
        self.assertEqual(header_names,
                         ['Accept-Ranges', 'Content-Length', 'Content-Range',
                          'Content-Type', 'ETag', 'Last-Modified'])
        self.assertEqual(
            header_names,
            ['Content-Length', 'Content-Type', 'Date', 'Last-Modified'])
    def test_with_root_resource(self):
        environ = self._makeEnviron(PATH_INFO='/static/index.html')
        inst = self._makeOne('pyramid.tests', 'fixtures',
                             root_resource='fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        body = response[0]
        self.assertTrue('<html>static</html>' in body)
    def test_if_none_match(self):
        class DummyEq(object):
            def __eq__(self, other):
                return True
        dummy_eq = DummyEq()
        environ = self._makeEnviron(HTTP_IF_NONE_MATCH=dummy_eq)
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        response = inst(environ, sr)
        self.assertEqual(len(sr.headerlist), 1)
        self.assertEqual(sr.status, '304 Not Modified')
        self.assertEqual(sr.headerlist[0][0], 'ETag')
        self.assertEqual(response[0], '')
    def test_if_none_match_miss(self):
        class DummyEq(object):
            def __eq__(self, other):
                return False
        dummy_eq = DummyEq()
        environ = self._makeEnviron(HTTP_IF_NONE_MATCH=dummy_eq)
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        sr = DummyStartResponse()
        inst(environ, sr)
        self.assertEqual(len(sr.headerlist), 6)
        self.assertEqual(sr.status, '200 OK')
    def test_repr(self):
        import os.path
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        self.assertTrue(
            repr(inst).startswith(
            '<PackageURLParser pyramid.tests:%s at'
                % os.path.join('fixtures', 'static')))
    def test_resource_notmodified(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/index.html'})
        request.if_modified_since = pow(2, 32) -1
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '304 Not Modified')
    def test_not_found(self):
        inst = self._makeOne('pyramid.tests', 'fixtures/static')
        environ = self._makeEnviron()
        sr = DummyStartResponse()
        response = inst.not_found(environ, sr, 'debug_message')
        body = response[0]
        self.assertTrue('404 Not Found' in body)
        self.assertEqual(sr.status, '404 Not Found')
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':'/notthere.html'})
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
class Test_static_view(unittest.TestCase):
    def setUp(self):
        cleanUp()
    def tearDown(self):
        cleanUp()
class Test_static_view_use_subpath_True(unittest.TestCase):
    def _getTargetClass(self):
        from pyramid.static import static_view
        return static_view
    def _makeOne(self, path, package_name=None, use_subpath=False):
        return self._getTargetClass()(path, package_name=package_name,
                                      use_subpath=use_subpath)
    def _makeEnviron(self, **extras):
    def _makeOne(self, *arg, **kw):
        kw['use_subpath'] = True
        return self._getTargetClass()(*arg, **kw)
    def _makeRequest(self, kw=None):
        from pyramid.request import Request
        environ = {
            'wsgi.url_scheme':'http',
            'wsgi.version':(1,0),
            'SERVER_NAME':'localhost',
            'SERVER_PORT':'8080',
            'SERVER_NAME':'example.com',
            'SERVER_PORT':'6543',
            'PATH_INFO':'/',
            'SCRIPT_NAME':'',
            'REQUEST_METHOD':'GET',
            }
        environ.update(extras)
        return environ
    def test_abspath_subpath(self):
        import os.path
        path = os.path.dirname(__file__)
        view = self._makeOne(path, use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(response.directory, os.path.normcase(path))
    def test_relpath_subpath(self):
        path = 'fixtures'
        view = self._makeOne(path, use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(response.root_resource, 'fixtures')
        self.assertEqual(response.resource_name, 'fixtures')
        self.assertEqual(response.package_name, 'pyramid.tests')
        self.assertEqual(response.cache_max_age, 3600)
    def test_relpath_notsubpath(self):
        path = 'fixtures'
        view = self._makeOne(path)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertTrue(not hasattr(request, 'copied'))
        self.assertEqual(response.root_resource, 'fixtures')
        self.assertEqual(response.resource_name, 'fixtures')
        self.assertEqual(response.package_name, 'pyramid.tests')
        self.assertEqual(response.cache_max_age, 3600)
    def test_relpath_withpackage_subpath(self):
        view = self._makeOne('another:fixtures', use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(response.root_resource, 'fixtures')
        self.assertEqual(response.resource_name, 'fixtures')
        self.assertEqual(response.package_name, 'another')
        self.assertEqual(response.cache_max_age, 3600)
    def test_relpath_withpackage_name_subpath(self):
        view = self._makeOne('fixtures', package_name='another',
                             use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(response.root_resource, 'fixtures')
        self.assertEqual(response.resource_name, 'fixtures')
        self.assertEqual(response.package_name, 'another')
        self.assertEqual(response.cache_max_age, 3600)
    def test_no_subpath_preserves_path_info_and_script_name_subpath(self):
        view = self._makeOne('fixtures', package_name='another',
                             use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ()
        request.environ = self._makeEnviron(PATH_INFO='/path_info',
                                            SCRIPT_NAME='/script_name')
        view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(request.environ['PATH_INFO'], '/')
        self.assertEqual(request.environ['SCRIPT_NAME'],
                         '/script_name/path_info')
    def test_with_subpath_path_info_ends_with_slash_subpath(self):
        view = self._makeOne('fixtures', package_name='another',
                             use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ('subpath',)
        request.environ = self._makeEnviron(PATH_INFO='/path_info/subpath/')
        view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(request.environ['PATH_INFO'], '/subpath/')
        self.assertEqual(request.environ['SCRIPT_NAME'], '/path_info')
    def test_with_subpath_original_script_name_preserved(self):
        view = self._makeOne('fixtures', package_name='another',
                             use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ('subpath',)
        request.environ = self._makeEnviron(PATH_INFO='/path_info/subpath/',
                                            SCRIPT_NAME='/scriptname')
        view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(request.environ['PATH_INFO'], '/subpath/')
        self.assertEqual(request.environ['SCRIPT_NAME'],
                         '/scriptname/path_info')
    def test_with_subpath_new_script_name_fixes_trailing_slashes(self):
        view = self._makeOne('fixtures', package_name='another',
                             use_subpath=True)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ('sub', 'path')
        request.environ = self._makeEnviron(PATH_INFO='/path_info//sub//path//')
        view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(request.environ['PATH_INFO'], '/sub/path/')
        self.assertEqual(request.environ['SCRIPT_NAME'], '/path_info')
class DummyStartResponse:
    def __call__(self, status, headerlist, exc_info=None):
        self.status = status
        self.headerlist = headerlist
        self.exc_info = exc_info
        if kw is not None:
            environ.update(kw)
        return Request(environ=environ)
    
    def test_ctor_defaultargs(self):
        inst = self._makeOne('package:resource_name')
        self.assertEqual(inst.package_name, 'package')
        self.assertEqual(inst.docroot, 'resource_name')
        self.assertEqual(inst.expires, datetime.timedelta(seconds=3600))
        self.assertEqual(inst.index, 'index.html')
    def test_call_adds_slash_path_info_empty(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest({'PATH_INFO':''})
        request.subpath = ()
        context = DummyContext()
        response = inst(context, request)
        response.prepare(request.environ)
        self.assertEqual(response.status, '301 Moved Permanently')
        self.assertTrue('http://example.com:6543/' in response.body)
    def test_path_info_slash_means_index_html(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ()
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
    def test_resource_out_of_bounds(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ('subdir', '..', '..', 'minimal.pt')
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
    def test_resource_doesnt_exist(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ('notthere,')
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
    def test_resource_isdir(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ('subdir',)
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>subdir</html>' in response.body)
    def test_resource_is_file(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ('index.html',)
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
    def test_resource_is_file_with_cache_max_age(self):
        inst = self._makeOne('pyramid.tests:fixtures/static', cache_max_age=600)
        request = self._makeRequest()
        request.subpath = ('index.html',)
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
        self.assertEqual(len(response.headerlist), 5)
        header_names = [ x[0] for x in response.headerlist ]
        header_names.sort()
        self.assertEqual(header_names,
                         ['Content-Length', 'Content-Type', 'Date', 'Expires',
                          'Last-Modified'])
    def test_resource_is_file_with_no_cache_max_age(self):
        inst = self._makeOne('pyramid.tests:fixtures/static',
                             cache_max_age=None)
        request = self._makeRequest()
        request.subpath = ('index.html',)
        context = DummyContext()
        response = inst(context, request)
        self.assertTrue('<html>static</html>' in response.body)
        self.assertEqual(len(response.headerlist), 4)
        header_names = [ x[0] for x in response.headerlist ]
        header_names.sort()
        self.assertEqual(
            header_names,
            ['Content-Length', 'Content-Type', 'Date', 'Last-Modified'])
    def test_resource_notmodified(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.if_modified_since = pow(2, 32) -1
        request.subpath = ('index.html',)
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '304 Not Modified')
    def test_not_found(self):
        inst = self._makeOne('pyramid.tests:fixtures/static')
        request = self._makeRequest()
        request.subpath = ('notthere.html',)
        context = DummyContext()
        response = inst(context, request)
        self.assertEqual(response.status, '404 Not Found')
class Test_patch_mimetypes(unittest.TestCase):
    def _callFUT(self, module):
        from pyramid.static import init_mimetypes
        return init_mimetypes(module)
    def test_has_init(self):
        class DummyMimetypes(object):
            def init(self):
                self.initted = True
        module = DummyMimetypes()
        result = self._callFUT(module)
        self.assertEqual(result, True)
        self.assertEqual(module.initted, True)
    def test_missing_init(self):
        class DummyMimetypes(object):
            pass
        module = DummyMimetypes()
        result = self._callFUT(module)
        self.assertEqual(result, False)
class DummyContext:
    pass
class DummyRequest:
    def __init__(self, environ=None):
        if environ is None:
            environ = {}
        self.environ = environ
    def get_response(self, application):
        return application
    def copy(self):
        self.copied = True
        return self
pyramid/tests/test_traversal.py
@@ -19,6 +19,9 @@
    def test_twodots(self):
        self.assertEqual(self._callFUT('foo/../bar'), (u'bar',))
    def test_twodots_at_start(self):
        self.assertEqual(self._callFUT('../../bar'), (u'bar',))
    def test_element_urllquoted(self):
        self.assertEqual(self._callFUT('/foo/space%20thing/bar'),
                         (u'foo', u'space thing', u'bar'))
pyramid/tests/test_view.py
@@ -548,27 +548,6 @@
        result = self._callFUT(context, request)
        self.assertEqual(result, 'abc')
class Test_patch_mimetypes(unittest.TestCase):
    def _callFUT(self, module):
        from pyramid.view import init_mimetypes
        return init_mimetypes(module)
    def test_has_init(self):
        class DummyMimetypes(object):
            def init(self):
                self.initted = True
        module = DummyMimetypes()
        result = self._callFUT(module)
        self.assertEqual(result, True)
        self.assertEqual(module.initted, True)
    def test_missing_init(self):
        class DummyMimetypes(object):
            pass
        module = DummyMimetypes()
        result = self._callFUT(module)
        self.assertEqual(result, False)
class Test_static(unittest.TestCase):
    def setUp(self):
        from zope.deprecation import __show__
@@ -578,38 +557,14 @@
        from zope.deprecation import __show__
        __show__.on()
    def _getTargetClass(self):
    def _makeOne(self, path, package_name):
        from pyramid.view import static
        return static
    def _makeOne(self, path, package_name=None):
        return self._getTargetClass()(path, package_name=package_name)
        return static(path, package_name)
        
    def _makeEnviron(self, **extras):
        environ = {
            'wsgi.url_scheme':'http',
            'wsgi.version':(1,0),
            'SERVER_NAME':'localhost',
            'SERVER_PORT':'8080',
            'REQUEST_METHOD':'GET',
            }
        environ.update(extras)
        return environ
    def test_relpath_subpath(self):
    def test_it(self):
        path = 'fixtures'
        view = self._makeOne(path)
        context = DummyContext()
        request = DummyRequest()
        request.subpath = ['__init__.py']
        request.environ = self._makeEnviron()
        response = view(context, request)
        self.assertEqual(request.copied, True)
        self.assertEqual(response.root_resource, 'fixtures')
        self.assertEqual(response.resource_name, 'fixtures')
        self.assertEqual(response.package_name, 'pyramid.tests')
        self.assertEqual(response.cache_max_age, 3600)
        view = self._makeOne(path, None)
        self.assertEqual(view.docroot, 'fixtures')
class ExceptionResponse(Exception):
    status = '404 Not Found'
@@ -632,13 +587,6 @@
            environ = {}
        self.environ = environ
        
    def get_response(self, application):
        return application
    def copy(self):
        self.copied = True
        return self
from pyramid.interfaces import IResponse
from zope.interface import implements
pyramid/traversal.py
@@ -479,7 +479,8 @@
        if not segment or segment=='.':
            continue
        elif segment == '..':
            del clean[-1]
            if clean:
                del clean[-1]
        else:
            try:
                segment = segment.decode('utf-8')
pyramid/view.py
@@ -1,4 +1,3 @@
import mimetypes
import venusian
from zope.interface import providedBy
@@ -13,21 +12,6 @@
from pyramid.path import caller_package
from pyramid.static import static_view
from pyramid.threadlocal import get_current_registry
def init_mimetypes(mimetypes):
    # this is a function so it can be unittested
    if hasattr(mimetypes, 'init'):
        mimetypes.init()
        return True
    return False
# See http://bugs.python.org/issue5853 which is a recursion bug
# that seems to effect Python 2.6, Python 2.6.1, and 2.6.2 (a fix
# has been applied on the Python 2 trunk).  This workaround should
# really be in Paste if anywhere, but it's easiest to just do it
# here and get it over with to avoid needing to deal with any
# fallout.
init_mimetypes(mimetypes)
_marker = object()