import inspect
|
|
from zope.interface import implementer, provider
|
|
from pyramid.security import NO_PERMISSION_REQUIRED, Authenticated
|
from pyramid.csrf import check_csrf_origin, check_csrf_token
|
from pyramid.response import Response
|
|
from pyramid.interfaces import (
|
IAuthenticationPolicy,
|
IAuthorizationPolicy,
|
IDefaultCSRFOptions,
|
IDefaultPermission,
|
IDebugLogger,
|
IResponse,
|
IViewMapper,
|
IViewMapperFactory,
|
)
|
|
from pyramid.compat import is_bound_method, is_unbound_method
|
|
from pyramid.exceptions import ConfigurationError
|
from pyramid.httpexceptions import HTTPForbidden, HTTPUnauthorized
|
from pyramid.util import object_description, takes_one_arg
|
from pyramid.view import render_view_to_response
|
from pyramid import renderers
|
|
|
def view_description(view):
|
try:
|
return view.__text__
|
except AttributeError:
|
# custom view mappers might not add __text__
|
return object_description(view)
|
|
|
def requestonly(view, attr=None):
|
return takes_one_arg(view, attr=attr, argname='request')
|
|
|
@implementer(IViewMapper)
|
@provider(IViewMapperFactory)
|
class DefaultViewMapper(object):
|
def __init__(self, **kw):
|
self.attr = kw.get('attr')
|
|
def __call__(self, view):
|
if is_unbound_method(view) and self.attr is None:
|
raise ConfigurationError(
|
(
|
'Unbound method calls are not supported, please set the '
|
'class as your `view` and the method as your `attr`'
|
)
|
)
|
|
if inspect.isclass(view):
|
view = self.map_class(view)
|
else:
|
view = self.map_nonclass(view)
|
return view
|
|
def map_class(self, view):
|
ronly = requestonly(view, self.attr)
|
if ronly:
|
mapped_view = self.map_class_requestonly(view)
|
else:
|
mapped_view = self.map_class_native(view)
|
mapped_view.__text__ = 'method %s of %s' % (
|
self.attr or '__call__',
|
object_description(view),
|
)
|
return mapped_view
|
|
def map_nonclass(self, view):
|
# We do more work here than appears necessary to avoid wrapping the
|
# view unless it actually requires wrapping (to avoid function call
|
# overhead).
|
mapped_view = view
|
ronly = requestonly(view, self.attr)
|
if ronly:
|
mapped_view = self.map_nonclass_requestonly(view)
|
elif self.attr:
|
mapped_view = self.map_nonclass_attr(view)
|
if inspect.isroutine(mapped_view):
|
# This branch will be true if the view is a function or a method.
|
# We potentially mutate an unwrapped object here if it's a
|
# function. We do this to avoid function call overhead of
|
# injecting another wrapper. However, we must wrap if the
|
# function is a bound method because we can't set attributes on a
|
# bound method.
|
if is_bound_method(view):
|
_mapped_view = mapped_view
|
|
def mapped_view(context, request):
|
return _mapped_view(context, request)
|
|
if self.attr is not None:
|
mapped_view.__text__ = 'attr %s of %s' % (
|
self.attr,
|
object_description(view),
|
)
|
else:
|
mapped_view.__text__ = object_description(view)
|
return mapped_view
|
|
def map_class_requestonly(self, view):
|
# its a class that has an __init__ which only accepts request
|
attr = self.attr
|
|
def _class_requestonly_view(context, request):
|
inst = view(request)
|
request.__view__ = inst
|
if attr is None:
|
response = inst()
|
else:
|
response = getattr(inst, attr)()
|
return response
|
|
return _class_requestonly_view
|
|
def map_class_native(self, view):
|
# its a class that has an __init__ which accepts both context and
|
# request
|
attr = self.attr
|
|
def _class_view(context, request):
|
inst = view(context, request)
|
request.__view__ = inst
|
if attr is None:
|
response = inst()
|
else:
|
response = getattr(inst, attr)()
|
return response
|
|
return _class_view
|
|
def map_nonclass_requestonly(self, view):
|
# its a function that has a __call__ which accepts only a single
|
# request argument
|
attr = self.attr
|
|
def _requestonly_view(context, request):
|
if attr is None:
|
response = view(request)
|
else:
|
response = getattr(view, attr)(request)
|
return response
|
|
return _requestonly_view
|
|
def map_nonclass_attr(self, view):
|
# its a function that has a __call__ which accepts both context and
|
# request, but still has an attr
|
def _attr_view(context, request):
|
response = getattr(view, self.attr)(context, request)
|
return response
|
|
return _attr_view
|
|
|
def wraps_view(wrapper):
|
def inner(view, info):
|
wrapper_view = wrapper(view, info)
|
return preserve_view_attrs(view, wrapper_view)
|
|
return inner
|
|
|
def preserve_view_attrs(view, wrapper):
|
if view is None:
|
return wrapper
|
|
if wrapper is view:
|
return view
|
|
original_view = getattr(view, '__original_view__', None)
|
|
if original_view is None:
|
original_view = view
|
|
wrapper.__wraps__ = view
|
wrapper.__original_view__ = original_view
|
wrapper.__module__ = view.__module__
|
wrapper.__doc__ = view.__doc__
|
|
try:
|
wrapper.__name__ = view.__name__
|
except AttributeError:
|
wrapper.__name__ = repr(view)
|
|
# attrs that may not exist on "view", but, if so, must be attached to
|
# "wrapped view"
|
for attr in (
|
'__permitted__',
|
'__call_permissive__',
|
'__permission__',
|
'__predicated__',
|
'__predicates__',
|
'__accept__',
|
'__order__',
|
'__text__',
|
):
|
try:
|
setattr(wrapper, attr, getattr(view, attr))
|
except AttributeError:
|
pass
|
|
return wrapper
|
|
|
def mapped_view(view, info):
|
mapper = info.options.get('mapper')
|
if mapper is None:
|
mapper = getattr(view, '__view_mapper__', None)
|
if mapper is None:
|
mapper = info.registry.queryUtility(IViewMapperFactory)
|
if mapper is None:
|
mapper = DefaultViewMapper
|
|
mapped_view = mapper(**info.options)(view)
|
return mapped_view
|
|
|
mapped_view.options = ('mapper', 'attr')
|
|
|
def owrapped_view(view, info):
|
wrapper_viewname = info.options.get('wrapper')
|
viewname = info.options.get('name')
|
if not wrapper_viewname:
|
return view
|
|
def _owrapped_view(context, request):
|
response = view(context, request)
|
request.wrapped_response = response
|
request.wrapped_body = response.body
|
request.wrapped_view = view
|
wrapped_response = render_view_to_response(
|
context, request, wrapper_viewname
|
)
|
if wrapped_response is None:
|
raise ValueError(
|
'No wrapper view named %r found when executing view '
|
'named %r' % (wrapper_viewname, viewname)
|
)
|
return wrapped_response
|
|
return _owrapped_view
|
|
|
owrapped_view.options = ('name', 'wrapper')
|
|
|
def http_cached_view(view, info):
|
if info.settings.get('prevent_http_cache', False):
|
return view
|
|
seconds = info.options.get('http_cache')
|
|
if seconds is None:
|
return view
|
|
options = {}
|
|
if isinstance(seconds, (tuple, list)):
|
try:
|
seconds, options = seconds
|
except ValueError:
|
raise ConfigurationError(
|
'If http_cache parameter is a tuple or list, it must be '
|
'in the form (seconds, options); not %s' % (seconds,)
|
)
|
|
def wrapper(context, request):
|
response = view(context, request)
|
prevent_caching = getattr(
|
response.cache_control, 'prevent_auto', False
|
)
|
if not prevent_caching:
|
response.cache_expires(seconds, **options)
|
return response
|
|
return wrapper
|
|
|
http_cached_view.options = ('http_cache',)
|
|
|
def secured_view(view, info):
|
for wrapper in (_secured_view, _authdebug_view):
|
view = wraps_view(wrapper)(view, info)
|
return view
|
|
|
secured_view.options = ('permission',)
|
|
|
def _secured_view(view, info):
|
permission = explicit_val = info.options.get('permission')
|
if permission is None:
|
permission = info.registry.queryUtility(IDefaultPermission)
|
if permission == NO_PERMISSION_REQUIRED:
|
# allow views registered within configurations that have a
|
# default permission to explicitly override the default
|
# permission, replacing it with no permission at all
|
permission = None
|
|
wrapped_view = view
|
authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
|
authz_policy = info.registry.queryUtility(IAuthorizationPolicy)
|
|
# no-op on exception-only views without an explicit permission
|
if explicit_val is None and info.exception_only:
|
return view
|
|
if authn_policy and authz_policy and (permission is not None):
|
|
def permitted(context, request):
|
principals = authn_policy.effective_principals(request)
|
return authz_policy.permits(context, principals, permission)
|
|
def secured_view(context, request):
|
result = permitted(context, request)
|
if result:
|
return view(context, request)
|
view_name = getattr(view, '__name__', view)
|
msg = getattr(
|
request,
|
'authdebug_message',
|
'Unauthorized: %s failed permission check' % view_name,
|
)
|
if Authenticated in result.principals:
|
raise HTTPForbidden(msg, result=result)
|
raise HTTPUnauthorized(msg)
|
|
wrapped_view = secured_view
|
wrapped_view.__call_permissive__ = view
|
wrapped_view.__permitted__ = permitted
|
wrapped_view.__permission__ = permission
|
|
return wrapped_view
|
|
|
def _authdebug_view(view, info):
|
wrapped_view = view
|
settings = info.settings
|
permission = explicit_val = info.options.get('permission')
|
if permission is None:
|
permission = info.registry.queryUtility(IDefaultPermission)
|
authn_policy = info.registry.queryUtility(IAuthenticationPolicy)
|
authz_policy = info.registry.queryUtility(IAuthorizationPolicy)
|
logger = info.registry.queryUtility(IDebugLogger)
|
|
# no-op on exception-only views without an explicit permission
|
if explicit_val is None and info.exception_only:
|
return view
|
|
if settings and settings.get('debug_authorization', False):
|
|
def authdebug_view(context, request):
|
view_name = getattr(request, 'view_name', None)
|
|
if authn_policy and authz_policy:
|
if permission is NO_PERMISSION_REQUIRED:
|
msg = 'Allowed (NO_PERMISSION_REQUIRED)'
|
elif permission is None:
|
msg = 'Allowed (no permission registered)'
|
else:
|
principals = authn_policy.effective_principals(request)
|
msg = str(
|
authz_policy.permits(context, principals, permission)
|
)
|
else:
|
msg = 'Allowed (no authorization policy in use)'
|
|
view_name = getattr(request, 'view_name', None)
|
url = getattr(request, 'url', None)
|
msg = (
|
'debug_authorization of url %s (view name %r against '
|
'context %r): %s' % (url, view_name, context, msg)
|
)
|
if logger:
|
logger.debug(msg)
|
if request is not None:
|
request.authdebug_message = msg
|
return view(context, request)
|
|
wrapped_view = authdebug_view
|
|
return wrapped_view
|
|
|
def rendered_view(view, info):
|
# one way or another this wrapper must produce a Response (unless
|
# the renderer is a NullRendererHelper)
|
renderer = info.options.get('renderer')
|
if renderer is None:
|
# register a default renderer if you want super-dynamic
|
# rendering. registering a default renderer will also allow
|
# override_renderer to work if a renderer is left unspecified for
|
# a view registration.
|
def viewresult_to_response(context, request):
|
result = view(context, request)
|
if result.__class__ is Response: # common case
|
response = result
|
else:
|
response = info.registry.queryAdapterOrSelf(result, IResponse)
|
if response is None:
|
if result is None:
|
append = (
|
' You may have forgotten to return a value '
|
'from the view callable.'
|
)
|
elif isinstance(result, dict):
|
append = (
|
' You may have forgotten to define a '
|
'renderer in the view configuration.'
|
)
|
else:
|
append = ''
|
|
msg = (
|
'Could not convert return value of the view '
|
'callable %s into a response object. '
|
'The value returned was %r.' + append
|
)
|
|
raise ValueError(msg % (view_description(view), result))
|
|
return response
|
|
return viewresult_to_response
|
|
if renderer is renderers.null_renderer:
|
return view
|
|
def rendered_view(context, request):
|
result = view(context, request)
|
if result.__class__ is Response: # potential common case
|
response = result
|
else:
|
# this must adapt, it can't do a simple interface check
|
# (avoid trying to render webob responses)
|
response = info.registry.queryAdapterOrSelf(result, IResponse)
|
if response is None:
|
attrs = getattr(request, '__dict__', {})
|
if 'override_renderer' in attrs:
|
# renderer overridden by newrequest event or other
|
renderer_name = attrs.pop('override_renderer')
|
view_renderer = renderers.RendererHelper(
|
name=renderer_name,
|
package=info.package,
|
registry=info.registry,
|
)
|
else:
|
view_renderer = renderer.clone()
|
if '__view__' in attrs:
|
view_inst = attrs.pop('__view__')
|
else:
|
view_inst = getattr(view, '__original_view__', view)
|
response = view_renderer.render_view(
|
request, result, view_inst, context
|
)
|
return response
|
|
return rendered_view
|
|
|
rendered_view.options = ('renderer',)
|
|
|
def decorated_view(view, info):
|
decorator = info.options.get('decorator')
|
if decorator is None:
|
return view
|
return decorator(view)
|
|
|
decorated_view.options = ('decorator',)
|
|
|
def csrf_view(view, info):
|
explicit_val = info.options.get('require_csrf')
|
defaults = info.registry.queryUtility(IDefaultCSRFOptions)
|
if defaults is None:
|
default_val = False
|
token = 'csrf_token'
|
header = 'X-CSRF-Token'
|
safe_methods = frozenset(["GET", "HEAD", "OPTIONS", "TRACE"])
|
callback = None
|
else:
|
default_val = defaults.require_csrf
|
token = defaults.token
|
header = defaults.header
|
safe_methods = defaults.safe_methods
|
callback = defaults.callback
|
|
enabled = (
|
explicit_val is True
|
or
|
# fallback to the default val if not explicitly enabled
|
# but only if the view is not an exception view
|
(explicit_val is not False and default_val and not info.exception_only)
|
)
|
# disable if both header and token are disabled
|
enabled = enabled and (token or header)
|
wrapped_view = view
|
if enabled:
|
|
def csrf_view(context, request):
|
if request.method not in safe_methods and (
|
callback is None or callback(request)
|
):
|
check_csrf_origin(request, raises=True)
|
check_csrf_token(request, token, header, raises=True)
|
return view(context, request)
|
|
wrapped_view = csrf_view
|
return wrapped_view
|
|
|
csrf_view.options = ('require_csrf',)
|
|
VIEW = 'VIEW'
|
INGRESS = 'INGRESS'
|