Merge branch 'fix.view-lookup'
| | |
| | | import operator |
| | | import os |
| | | import sys |
| | | import threading |
| | | import venusian |
| | | |
| | | from webob.exc import WSGIHTTPException as WebobWSGIHTTPException |
| | |
| | | info=info, event=event) |
| | | _registry.registerSelfAdapter = registerSelfAdapter |
| | | |
| | | if not hasattr(_registry, '_lock'): |
| | | _registry._lock = threading.Lock() |
| | | |
| | | if not hasattr(_registry, '_clear_view_lookup_cache'): |
| | | def _clear_view_lookup_cache(): |
| | | _registry._view_lookup_cache = {} |
| | | _registry._clear_view_lookup_cache = _clear_view_lookup_cache |
| | | |
| | | |
| | | # API |
| | | |
| | | def _get_introspector(self): |
| | |
| | | multiview, |
| | | (IExceptionViewClassifier, request_iface, context), |
| | | IMultiView, name=name) |
| | | |
| | | self.registry._clear_view_lookup_cache() |
| | | renderer_type = getattr(renderer, 'type', None) # gard against None |
| | | intrspc = self.introspector |
| | | if ( |
| | |
| | | import operator |
| | | import threading |
| | | |
| | | from zope.interface import implementer |
| | | |
| | |
| | | |
| | | _settings = None |
| | | |
def __init__(self, *arg, **kw):
    """Prepare the per-instance lock and view lookup cache, then hand all
    arguments to ``Components.__init__`` unchanged."""
    # Every registry instance owns its own lock; it is taken whenever the
    # view lookup cache is mutated.
    self._lock = threading.Lock()
    # Begin with an empty view lookup cache.
    self._clear_view_lookup_cache()
    Components.__init__(self, *arg, **kw)
| | | |
def _clear_view_lookup_cache(self):
    """Drop all cached view lookups by rebinding the cache attribute to a
    new, empty dict (the previous dict object is left untouched)."""
    self._view_lookup_cache = dict()
| | | |
def __nonzero__(self):
    """Always report the instance as truthy.

    Per the original intent, this exists so that truth testing is not
    decided by ``dict.__len__`` (which would make an empty instance falsy).
    """
    # defeat bool determination via dict.__len__
    return True

# Python 3 looks up ``__bool__`` and ignores ``__nonzero__``; alias it so the
# override is effective on both major Python versions.
__bool__ = __nonzero__
| | |
| | | exc_info = None |
| | | matchdict = None |
| | | matched_route = None |
| | | request_iface = IRequest |
| | | |
| | | ResponseClass = Response |
| | | |
| | |
| | | IRequestFactory, |
| | | IRoutesMapper, |
| | | ITraverser, |
| | | IView, |
| | | IViewClassifier, |
| | | ITweens, |
| | | ) |
| | | |
| | |
| | | NewResponse, |
| | | ) |
| | | |
| | | from pyramid.exceptions import PredicateMismatch |
| | | from pyramid.httpexceptions import HTTPNotFound |
| | | from pyramid.request import Request |
| | | from pyramid.view import _call_view |
| | | from pyramid.request import apply_request_extensions |
| | | from pyramid.threadlocal import manager |
| | | |
| | |
| | | |
| | | # find a view callable |
| | | context_iface = providedBy(context) |
| | | view_callable = adapters.lookup( |
| | | (IViewClassifier, request.request_iface, context_iface), |
| | | IView, name=view_name, default=None) |
| | | response = _call_view( |
| | | registry, |
| | | request, |
| | | context, |
| | | context_iface, |
| | | view_name |
| | | ) |
| | | |
| | | # invoke the view callable |
| | | if view_callable is None: |
| | | if response is None: |
| | | if self.debug_notfound: |
| | | msg = ( |
| | | 'debug_notfound of url %s; path_info: %r, ' |
| | |
| | | else: |
| | | msg = request.path_info |
| | | raise HTTPNotFound(msg) |
| | | else: |
| | | try: |
| | | response = view_callable(context, request) |
| | | except PredicateMismatch: |
| | | # look for other views that meet the predicate |
| | | # criteria |
| | | for iface in context_iface.__sro__[1:]: |
| | | previous_view_callable = view_callable |
| | | view_callable = adapters.lookup( |
| | | (IViewClassifier, request.request_iface, iface), |
| | | IView, name=view_name, default=None) |
| | | # intermediate bases may lookup same view_callable |
| | | if view_callable is previous_view_callable: |
| | | continue |
| | | if view_callable is not None: |
| | | try: |
| | | response = view_callable(context, request) |
| | | break |
| | | except PredicateMismatch: |
| | | pass |
| | | else: |
| | | raise |
| | | |
| | | return response |
| | | |
| | | def invoke_subrequest(self, request, use_tweens=False): |
| | |
| | | request = self.request_factory(environ) |
| | | response = self.invoke_subrequest(request, use_tweens=True) |
| | | return response(request.environ, start_response) |
| | | |
| | |
| | | import textwrap |
| | | import re |
| | | |
| | | from zope.interface import Interface |
| | | |
| | | from pyramid.paster import bootstrap |
| | | from pyramid.compat import (string_types, configparser) |
| | | from pyramid.interfaces import ( |
| | | IRouteRequest, |
| | | IViewClassifier, |
| | | IView, |
| | | ) |
| | | from pyramid.interfaces import IRouteRequest |
| | | from pyramid.config import not_ |
| | | |
| | | from pyramid.scripts.common import parse_vars |
| | | from pyramid.static import static_view |
| | | from zope.interface import Interface |
| | | from pyramid.view import _find_views |
| | | |
| | | |
| | | PAD = 3 |
| | |
| | | (route.name, _get_pattern(route), UNKNOWN_KEY, ANY_KEY) |
| | | ] |
| | | |
| | | view_callable = registry.adapters.lookup( |
| | | (IViewClassifier, request_iface, Interface), |
| | | IView, |
| | | name='', |
| | | default=None |
| | | ) |
| | | view_callables = _find_views(registry, request_iface, Interface, '') |
| | | if view_callables: |
| | | view_callable = view_callables[0] |
| | | else: |
| | | view_callable = None |
| | | view_module = _get_view_module(view_callable) |
| | | |
| | | # Introspectables can be turned off, so there could be a chance |
| | |
| | | from pyramid.paster import bootstrap |
| | | from pyramid.request import Request |
| | | from pyramid.scripts.common import parse_vars |
| | | from pyramid.view import _find_views |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PViewsCommand(argv, quiet) |
| | |
| | | from pyramid.interfaces import IRootFactory |
| | | from pyramid.interfaces import IRouteRequest |
| | | from pyramid.interfaces import IRoutesMapper |
| | | from pyramid.interfaces import IView |
| | | from pyramid.interfaces import IViewClassifier |
| | | from pyramid.interfaces import ITraverser |
| | | from pyramid.traversal import DefaultRootFactory |
| | | from pyramid.traversal import ResourceTreeTraverser |
| | |
| | | IRouteRequest, |
| | | name=route.name, |
| | | default=IRequest) |
| | | view = adapters.lookup( |
| | | (IViewClassifier, request_iface, context_iface), |
| | | IView, name='', default=None) |
| | | if view is None: |
| | | views = _find_views( |
| | | request.registry, |
| | | request_iface, |
| | | context_iface, |
| | | '' |
| | | ) |
| | | if not views: |
| | | continue |
| | | view = views[0] |
| | | view.__request_attrs__ = {} |
| | | view.__request_attrs__['matchdict'] = match |
| | | view.__request_attrs__['matched_route'] = route |
| | |
| | | # find a view callable |
| | | context_iface = providedBy(context) |
| | | if routes_multiview is None: |
| | | view = adapters.lookup( |
| | | (IViewClassifier, request_iface, context_iface), |
| | | IView, name=view_name, default=None) |
| | | views = _find_views( |
| | | request.registry, |
| | | request_iface, |
| | | context_iface, |
| | | view_name, |
| | | ) |
| | | if views: |
| | | view = views[0] |
| | | else: |
| | | view = None |
| | | else: |
| | | view = RoutesMultiView(infos, context_iface, root_factory, request) |
| | | |
| | | # routes are not registered with a view name |
| | | if view is None: |
| | | view = adapters.lookup( |
| | | (IViewClassifier, request_iface, context_iface), |
| | | IView, name='', default=None) |
| | | views = _find_views( |
| | | request.registry, |
| | | request_iface, |
| | | context_iface, |
| | | '', |
| | | ) |
| | | if views: |
| | | view = views[0] |
| | | else: |
| | | view = None |
| | | # we don't want a multiview here |
| | | if IMultiView.providedBy(view): |
| | | view = None |
| | |
| | | """ |
| | | reg = _get_registry(request) |
| | | provides = [IViewClassifier] + map_(providedBy, (request, context)) |
| | | # XXX not sure what to do here about using _find_views or analogue; |
| | | # for now let's just keep it as-is |
| | | view = reg.adapters.lookup(provides, ISecuredView, name=name) |
| | | if view is None: |
| | | view = reg.adapters.lookup(provides, IView, name=name) |
| | |
| | | from pyramid.config import Configurator |
| | | from pyramid.decorator import reify |
| | | from pyramid.path import caller_package |
| | | from pyramid.response import Response, _get_response_factory |
| | | from pyramid.response import _get_response_factory |
| | | from pyramid.registry import Registry |
| | | |
| | | from pyramid.security import ( |
| | |
| | | charset = 'UTF-8' |
| | | script_name = '' |
| | | _registry = None |
| | | request_iface = IRequest |
| | | |
| | | def __init__(self, params=None, environ=None, headers=None, path='/', |
| | | cookies=None, post=None, **kw): |
| | |
| | | import unittest |
| | | import warnings |
| | | |
| | | import os |
| | | |
| | |
| | | |
| | | from pyramid.exceptions import ConfigurationExecutionError |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | class ConfiguratorTests(unittest.TestCase): |
| | | def _makeOne(self, *arg, **kw): |
| | |
| | | self.assertEqual(kw, |
| | | {'info': '', 'provided': 'provided', |
| | | 'required': 'required', 'name': 'abc', 'event': True}) |
| | | |
def test__fix_registry_adds__lock(self):
    """``_fix_registry`` must leave a ``_lock`` attribute on the registry."""
    registry = DummyRegistry()
    configurator = self._makeOne(registry)
    configurator._fix_registry()
    lock_present = hasattr(registry, '_lock')
    self.assertTrue(lock_present)
| | | |
def test__fix_registry_adds_clear_view_lookup_cache(self):
    """``_fix_registry`` installs ``_clear_view_lookup_cache`` on the
    registry; the cache attribute itself only appears once that hook is
    called."""
    registry = DummyRegistry()
    configurator = self._makeOne(registry)

    # Precondition: the hook is absent before the registry is fixed up.
    hook_before = hasattr(registry, '_clear_view_lookup_cache')
    self.assertFalse(hook_before)

    configurator._fix_registry()

    # Fixing the registry does not create the cache by itself ...
    cache_before = hasattr(registry, '_view_lookup_cache')
    self.assertFalse(cache_before)

    # ... calling the installed hook does, and the cache starts out empty.
    registry._clear_view_lookup_cache()
    self.assertEqual(registry._view_lookup_cache, {})
| | | |
| | | def test_setup_registry_calls_fix_registry(self): |
| | | reg = DummyRegistry() |
| | |
| | | class DummyRequest: |
| | | subpath = () |
| | | matchdict = None |
| | | request_iface = IRequest |
| | | def __init__(self, environ=None): |
| | | if environ is None: |
| | | environ = {} |
| | |
| | | request.params = {'param':'1'} |
| | | self.assertEqual(wrapper(ctx, request), 'view8') |
| | | |
def test_view_with_most_specific_predicate(self):
    """A POST handled by the router must yield ``'hello'`` (the views
    registered with a context and request method) rather than
    ``'unknown'`` (the view registered for the route without predicates)."""
    from pyramid.renderers import null_renderer as nr
    from pyramid.router import Router

    class OtherBase(object):
        pass

    class Int1(object):
        pass

    class Int2(object):
        pass

    class Resource(OtherBase, Int1, Int2):
        def __init__(self, request):
            pass

    def unknown(context, request):
        return 'unknown'

    def view(context, request):
        return 'hello'

    config = self._makeOne(autocommit=True)
    config.add_route('root', '/', factory=Resource)
    config.add_view(unknown, route_name='root', renderer=nr)
    # Same view callable, once per context base, each with its own method.
    for context_class, method in ((Int1, 'GET'), (Int2, 'POST')):
        config.add_view(
            view,
            renderer=nr,
            route_name='root',
            context=context_class,
            request_method=method,
        )

    request = self._makeRequest(config)
    request.method = 'POST'
    request.params = {}

    response = Router(config.registry).handle_request(request)
    self.assertEqual(response, 'hello')
| | | |
def test_view_with_most_specific_predicate_with_mismatch(self):
    """Like the plain most-specific-predicate test, except the view
    registered without a context also carries ``request_method`` and
    ``xhr`` predicates; the POST must still yield ``'hello'``."""
    from pyramid.renderers import null_renderer as nr
    from pyramid.router import Router

    class OtherBase(object):
        pass

    class Int1(object):
        pass

    class Int2(object):
        pass

    class Resource(OtherBase, Int1, Int2):
        def __init__(self, request):
            pass

    def unknown(context, request):
        return 'unknown'

    def view(context, request):
        return 'hello'

    config = self._makeOne(autocommit=True)
    config.add_route('root', '/', factory=Resource)

    # Context-less view, restricted to XHR POST requests.
    config.add_view(
        unknown,
        route_name='root',
        renderer=nr,
        request_method=('POST',),
        xhr=True,
    )

    # Same view callable, once per context base, each with its own method.
    for context_class, method in ((Int1, 'GET'), (Int2, 'POST')):
        config.add_view(
            view,
            renderer=nr,
            route_name='root',
            context=context_class,
            request_method=method,
        )

    request = self._makeRequest(config)
    request.method = 'POST'
    request.params = {}

    response = Router(config.registry).handle_request(request)
    self.assertEqual(response, 'hello')
| | | |
| | | def test_add_view_multiview___discriminator__(self): |
| | | from pyramid.renderers import null_renderer |
| | | from zope.interface import Interface |
| | |
| | | self.settings = {} |
| | | |
| | | from zope.interface import implementer |
| | | from pyramid.interfaces import IResponse |
| | | from pyramid.interfaces import ( |
| | | IResponse, |
| | | IRequest, |
| | | ) |
| | | |
| | | @implementer(IResponse) |
| | | class DummyResponse(object): |
| | | content_type = None |
| | |
| | | class DummyRequest: |
| | | subpath = () |
| | | matchdict = None |
| | | request_iface = IRequest |
| | | |
| | | def __init__(self, environ=None): |
| | | if environ is None: |
| | |
| | | registry = self._makeOne() |
| | | self.assertEqual(registry.__nonzero__(), True) |
| | | |
def test__lock(self):
    """A freshly built registry exposes a truthy ``_lock`` attribute."""
    lock = self._makeOne()._lock
    self.assertTrue(lock)
| | | |
def test_clear_view_cache_lookup(self):
    """``_clear_view_lookup_cache`` leaves the registry with an empty
    view lookup cache even when an entry was present beforehand."""
    registry = self._makeOne()
    # Seed the cache with an arbitrary entry so the reset is observable.
    seeded_cache = registry._view_lookup_cache
    seeded_cache[1] = 2
    registry._clear_view_lookup_cache()
    self.assertEqual(registry._view_lookup_cache, {})
| | | |
| | | def test_package_name(self): |
| | | package_name = 'testing' |
| | | registry = self._getTargetClass()(package_name) |
| | |
| | | |
| | | from pyramid import testing |
| | | |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | class BaseTest(object): |
| | | def setUp(self): |
| | | self.config = testing.setUp() |
| | |
| | | testing.tearDown() |
| | | |
| | | def _registerView(self, reg, app, name): |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.interfaces import IViewClassifier |
| | | for_ = (IViewClassifier, IRequest, IContext) |
| | | from pyramid.interfaces import IView |
| | |
| | | return environ |
| | | |
def _makeRequest(self, **environ):
    """Build a request for use in tests.

    The keyword arguments are passed to ``self._makeEnviron`` to produce
    the environ. The resulting ``pyramid.request.Request`` is given a new
    ``Registry`` and is marked as directly providing ``IRequest``.
    """
    from pyramid.interfaces import IRequest
    from zope.interface import directlyProvides
    # Only pyramid's Request is imported: a preceding ``from webob import
    # Request`` was rebound by this import before ever being used.
    from pyramid.request import Request
    from pyramid.registry import Registry
    environ = self._makeEnviron(**environ)
    request = Request(environ)
    request.registry = Registry()
    directlyProvides(request, IRequest)
    return request
| | | |
| | | def _makeContext(self): |
| | |
| | | |
| | | class DummyRequest: |
| | | exception = None |
| | | request_iface = IRequest |
| | | |
| | | def __init__(self, environ=None): |
| | | if environ is None: |
| | |
| | | from pyramid.interfaces import ( |
| | | IExceptionViewClassifier, |
| | | IRequest, |
| | | IView, |
| | | ) |
| | | |
| | | from zope.interface import providedBy |
| | | from pyramid.view import _call_view |
| | | |
| | | def excview_tween_factory(handler, registry): |
| | | """ A :term:`tween` factory which produces a tween that catches an |
| | | exception raised by downstream tweens (or the main Pyramid request |
| | | handler) and, if possible, converts it into a Response using an |
| | | :term:`exception view`.""" |
| | | adapters = registry.adapters |
| | | |
| | | def excview_tween(request): |
| | | attrs = request.__dict__ |
| | |
| | | # https://github.com/Pylons/pyramid/issues/700 |
| | | request_iface = attrs.get('request_iface', IRequest) |
| | | provides = providedBy(exc) |
| | | for_ = (IExceptionViewClassifier, request_iface.combined, provides) |
| | | view_callable = adapters.lookup(for_, IView, default=None) |
| | | if view_callable is None: |
| | | response = _call_view( |
| | | registry, |
| | | request, |
| | | exc, |
| | | provides, |
| | | '', |
| | | view_classifier=IExceptionViewClassifier, |
| | | request_iface=request_iface.combined |
| | | ) |
| | | if response is None: |
| | | raise |
| | | response = view_callable(exc, request) |
| | | |
| | | return response |
| | | |
| | |
| | | import itertools |
| | | import venusian |
| | | |
| | | from zope.interface import providedBy |
| | | |
| | | from pyramid.interfaces import ( |
| | | IRoutesMapper, |
| | | IMultiView, |
| | | ISecuredView, |
| | | IView, |
| | | IViewClassifier, |
| | | IRequest, |
| | | ) |
| | | |
| | | from pyramid.compat import ( |
| | | map_, |
| | | decode_path_info, |
| | | ) |
| | | from pyramid.compat import decode_path_info |
| | | |
| | | from pyramid.exceptions import PredicateMismatch |
| | | |
| | | from pyramid.httpexceptions import ( |
| | | HTTPFound, |
| | |
| | | disallowed. |
| | | |
| | | If ``secure`` is ``False``, no permission checking is done.""" |
| | | provides = [IViewClassifier] + map_(providedBy, (request, context)) |
| | | try: |
| | | reg = request.registry |
| | | except AttributeError: |
| | | reg = get_current_registry() |
| | | view = reg.adapters.lookup(provides, IView, name=name) |
| | | if view is None: |
| | | return None |
| | | |
| | | if not secure: |
| | | # the view will have a __call_permissive__ attribute if it's |
| | | # secured; otherwise it won't. |
| | | view = getattr(view, '__call_permissive__', view) |
| | | registry = getattr(request, 'registry', None) |
| | | if registry is None: |
| | | registry = get_current_registry() |
| | | |
| | | # if this view is secured, it will raise a Forbidden |
| | | # appropriately if the executing user does not have the proper |
| | | # permission |
| | | return view(context, request) |
| | | context_iface = providedBy(context) |
| | | |
| | | response = _call_view( |
| | | registry, |
| | | request, |
| | | context, |
| | | context_iface, |
| | | name, |
| | | secure = secure, |
| | | ) |
| | | |
| | | return response # NB: might be None |
| | | |
| | | |
| | | def render_view_to_iterable(context, request, name='', secure=True): |
| | | """ Call the :term:`view callable` configured with a :term:`view |
| | |
| | | settings['_info'] = info.codeinfo # fbo "action_method" |
| | | return wrapped |
| | | |
def _find_views(
    registry,
    request_iface,
    context_iface,
    view_name,
    view_types=None,
    view_classifier=None,
):
    """Return the list of view callables registered for a lookup.

    Every pairing of an entry from ``request_iface.__sro__`` with an entry
    from ``context_iface.__sro__`` is tried, in ``itertools.product`` order,
    and for each pairing every type in ``view_types`` is queried through
    ``registry.adapters.registered`` under the name ``view_name``. All
    non-``None`` results are collected in that order.

    ``view_types`` defaults to ``(IView, ISecuredView, IMultiView)`` and
    ``view_classifier`` defaults to ``IViewClassifier``.

    Non-empty results are stored in ``registry._view_lookup_cache``. The
    cache key covers every input that influences the result, including the
    classifier and the view types, so lookups that differ only in those
    arguments never share an entry. An empty list is returned, and not
    cached, when nothing is registered.
    """
    if view_types is None:
        view_types = (IView, ISecuredView, IMultiView)
    else:
        # Normalize so the value is hashable for the cache key and can be
        # iterated once per interface pairing below.
        view_types = tuple(view_types)
    if view_classifier is None:
        view_classifier = IViewClassifier

    cache = registry._view_lookup_cache
    cache_key = (
        view_classifier,
        request_iface,
        context_iface,
        view_name,
        view_types,
    )
    views = cache.get(cache_key)
    if views is None:
        registered = registry.adapters.registered
        views = []
        for req_type, ctx_type in itertools.product(
            request_iface.__sro__, context_iface.__sro__
        ):
            source_ifaces = (view_classifier, req_type, ctx_type)
            for view_type in view_types:
                view_callable = registered(
                    source_ifaces,
                    view_type,
                    name=view_name,
                )
                if view_callable is not None:
                    views.append(view_callable)
        if views:
            # do not cache view lookup misses. rationale: don't allow the
            # cache to grow without bound if somebody tries to hit the site
            # with many missing URLs. we could use an LRU cache instead, but
            # then purposeful misses by an attacker would just blow out the
            # cache anyway. downside: misses will almost always consume more
            # CPU than hits in steady state.
            with registry._lock:
                cache[cache_key] = views

    return views
| | | |
def _call_view(
    registry,
    request,
    context,
    context_iface,
    view_name,
    view_types=None,
    view_classifier=None,
    secure=True,
    request_iface=None,
):
    """Call the first matching view and return its response.

    Candidates come from ``_find_views`` and are tried in the order it
    returns them. A candidate that raises ``PredicateMismatch`` is skipped
    and the next one is tried; the first candidate that returns gives the
    result.

    When ``request_iface`` is ``None`` it is read from
    ``request.request_iface``, falling back to ``IRequest``. When ``secure``
    is false, a candidate's ``__call_permissive__`` attribute is called in
    its place if it has one.

    Returns ``None`` when there are no candidates. If every candidate
    raised ``PredicateMismatch``, the last such exception is re-raised.
    """
    if request_iface is None:
        request_iface = getattr(request, 'request_iface', IRequest)

    candidates = _find_views(
        registry,
        request_iface,
        context_iface,
        view_name,
        view_types=view_types,
        view_classifier=view_classifier,
    )

    last_mismatch = None
    for candidate in candidates:
        # look for views that meet the predicate criteria
        try:
            if secure:
                target = candidate
            else:
                # a secured view carries a __call_permissive__ attribute;
                # an unsecured one does not, so fall back to the view itself
                target = getattr(candidate, '__call_permissive__', candidate)
            # a secured view raises Forbidden by itself when the executing
            # user lacks the required permission
            return target(context, request)
        except PredicateMismatch as mismatch:
            last_mismatch = mismatch

    if last_mismatch is None:
        # nothing was registered for this lookup
        return None
    raise last_mismatch