from zope.interface import implementer, providedBy
|
|
from pyramid.interfaces import (
|
IDebugLogger,
|
IExecutionPolicy,
|
IRequest,
|
IRequestExtensions,
|
IRootFactory,
|
IRouteRequest,
|
IRouter,
|
IRequestFactory,
|
IRoutesMapper,
|
ITraverser,
|
ITweens,
|
)
|
|
from pyramid.events import (
|
ContextFound,
|
NewRequest,
|
NewResponse,
|
BeforeTraversal,
|
)
|
|
from pyramid.httpexceptions import HTTPNotFound
|
from pyramid.request import Request
|
from pyramid.view import _call_view
|
from pyramid.request import apply_request_extensions
|
from pyramid.threadlocal import RequestContext
|
|
from pyramid.traversal import DefaultRootFactory, ResourceTreeTraverser
|
|
|
@implementer(IRouter)
|
class Router(object):
|
|
debug_notfound = False
|
debug_routematch = False
|
|
def __init__(self, registry):
|
q = registry.queryUtility
|
self.logger = q(IDebugLogger)
|
self.root_factory = q(IRootFactory, default=DefaultRootFactory)
|
self.routes_mapper = q(IRoutesMapper)
|
self.request_factory = q(IRequestFactory, default=Request)
|
self.request_extensions = q(IRequestExtensions)
|
self.execution_policy = q(
|
IExecutionPolicy, default=default_execution_policy
|
)
|
self.orig_handle_request = self.handle_request
|
tweens = q(ITweens)
|
if tweens is not None:
|
self.handle_request = tweens(self.handle_request, registry)
|
self.root_policy = self.root_factory # b/w compat
|
self.registry = registry
|
settings = registry.settings
|
if settings is not None:
|
self.debug_notfound = settings['debug_notfound']
|
self.debug_routematch = settings['debug_routematch']
|
|
def handle_request(self, request):
|
attrs = request.__dict__
|
registry = attrs['registry']
|
|
request.request_iface = IRequest
|
context = None
|
routes_mapper = self.routes_mapper
|
debug_routematch = self.debug_routematch
|
adapters = registry.adapters
|
has_listeners = registry.has_listeners
|
notify = registry.notify
|
logger = self.logger
|
|
has_listeners and notify(NewRequest(request))
|
# find the root object
|
root_factory = self.root_factory
|
if routes_mapper is not None:
|
info = routes_mapper(request)
|
match, route = info['match'], info['route']
|
if route is None:
|
if debug_routematch:
|
msg = 'no route matched for url %s' % request.url
|
logger and logger.debug(msg)
|
else:
|
attrs['matchdict'] = match
|
attrs['matched_route'] = route
|
|
if debug_routematch:
|
msg = (
|
'route matched for url %s; '
|
'route_name: %r, '
|
'path_info: %r, '
|
'pattern: %r, '
|
'matchdict: %r, '
|
'predicates: %r'
|
% (
|
request.url,
|
route.name,
|
request.path_info,
|
route.pattern,
|
match,
|
', '.join([p.text() for p in route.predicates]),
|
)
|
)
|
logger and logger.debug(msg)
|
|
request.request_iface = registry.queryUtility(
|
IRouteRequest, name=route.name, default=IRequest
|
)
|
|
root_factory = route.factory or self.root_factory
|
|
# Notify anyone listening that we are about to start traversal
|
#
|
# Notify before creating root_factory in case we want to do something
|
# special on a route we may have matched. See
|
# https://github.com/Pylons/pyramid/pull/1876 for ideas of what is
|
# possible.
|
has_listeners and notify(BeforeTraversal(request))
|
|
# Create the root factory
|
root = root_factory(request)
|
attrs['root'] = root
|
|
# We are about to traverse and find a context
|
traverser = adapters.queryAdapter(root, ITraverser)
|
if traverser is None:
|
traverser = ResourceTreeTraverser(root)
|
tdict = traverser(request)
|
|
context, view_name, subpath, traversed, vroot, vroot_path = (
|
tdict['context'],
|
tdict['view_name'],
|
tdict['subpath'],
|
tdict['traversed'],
|
tdict['virtual_root'],
|
tdict['virtual_root_path'],
|
)
|
|
attrs.update(tdict)
|
|
# Notify anyone listening that we have a context and traversal is
|
# complete
|
has_listeners and notify(ContextFound(request))
|
|
# find a view callable
|
context_iface = providedBy(context)
|
response = _call_view(
|
registry, request, context, context_iface, view_name
|
)
|
|
if response is None:
|
if self.debug_notfound:
|
msg = (
|
'debug_notfound of url %s; path_info: %r, '
|
'context: %r, view_name: %r, subpath: %r, '
|
'traversed: %r, root: %r, vroot: %r, '
|
'vroot_path: %r'
|
% (
|
request.url,
|
request.path_info,
|
context,
|
view_name,
|
subpath,
|
traversed,
|
root,
|
vroot,
|
vroot_path,
|
)
|
)
|
logger and logger.debug(msg)
|
else:
|
msg = request.path_info
|
raise HTTPNotFound(msg)
|
|
return response
|
|
def invoke_subrequest(self, request, use_tweens=False):
|
"""Obtain a response object from the Pyramid application based on
|
information in the ``request`` object provided. The ``request``
|
object must be an object that implements the Pyramid request
|
interface (such as a :class:`pyramid.request.Request` instance). If
|
``use_tweens`` is ``True``, the request will be sent to the
|
:term:`tween` in the tween stack closest to the request ingress. If
|
``use_tweens`` is ``False``, the request will be sent to the main
|
router handler, and no tweens will be invoked.
|
|
See the API for pyramid.request for complete documentation.
|
"""
|
request.registry = self.registry
|
request.invoke_subrequest = self.invoke_subrequest
|
extensions = self.request_extensions
|
if extensions is not None:
|
apply_request_extensions(request, extensions=extensions)
|
with RequestContext(request):
|
return self.invoke_request(request, _use_tweens=use_tweens)
|
|
def request_context(self, environ):
|
"""
|
Create a new request context from a WSGI environ.
|
|
The request context is used to push/pop the threadlocals required
|
when processing the request. It also contains an initialized
|
:class:`pyramid.interfaces.IRequest` instance using the registered
|
:class:`pyramid.interfaces.IRequestFactory`. The context may be
|
used as a context manager to control the threadlocal lifecycle:
|
|
.. code-block:: python
|
|
with router.request_context(environ) as request:
|
...
|
|
Alternatively, the context may be used without the ``with`` statement
|
by manually invoking its ``begin()`` and ``end()`` methods.
|
|
.. code-block:: python
|
|
ctx = router.request_context(environ)
|
request = ctx.begin()
|
try:
|
...
|
finally:
|
ctx.end()
|
|
"""
|
request = self.request_factory(environ)
|
request.registry = self.registry
|
request.invoke_subrequest = self.invoke_subrequest
|
extensions = self.request_extensions
|
if extensions is not None:
|
apply_request_extensions(request, extensions=extensions)
|
return RequestContext(request)
|
|
def invoke_request(self, request, _use_tweens=True):
|
"""
|
Execute a request through the request processing pipeline and
|
return the generated response.
|
|
"""
|
registry = self.registry
|
has_listeners = registry.has_listeners
|
notify = registry.notify
|
|
if _use_tweens:
|
handle_request = self.handle_request
|
else:
|
handle_request = self.orig_handle_request
|
|
try:
|
response = handle_request(request)
|
|
if request.response_callbacks:
|
request._process_response_callbacks(response)
|
|
has_listeners and notify(NewResponse(request, response))
|
|
return response
|
|
finally:
|
if request.finished_callbacks:
|
request._process_finished_callbacks()
|
|
def __call__(self, environ, start_response):
|
"""
|
Accept ``environ`` and ``start_response``; create a
|
:term:`request` and route the request to a :app:`Pyramid`
|
view based on introspection of :term:`view configuration`
|
within the application registry; call ``start_response`` and
|
return an iterable.
|
"""
|
response = self.execution_policy(environ, self)
|
return response(environ, start_response)
|
|
|
def default_execution_policy(environ, router):
|
with router.request_context(environ) as request:
|
try:
|
return router.invoke_request(request)
|
except Exception:
|
return request.invoke_exception_view(reraise=True)
|