import unittest from zope.interface import implementer from pyramid import testing from pyramid.interfaces import IResponse class TestRouter(unittest.TestCase): def setUp(self): self.config = testing.setUp() self.registry = self.config.registry def tearDown(self): testing.tearDown() def _registerRouteRequest(self, name): from pyramid.interfaces import IRouteRequest from pyramid.request import route_request_iface iface = route_request_iface(name) self.registry.registerUtility(iface, IRouteRequest, name=name) return iface def _connectRoute(self, name, path, factory=None): from pyramid.interfaces import IRoutesMapper from pyramid.urldispatch import RoutesMapper mapper = self.registry.queryUtility(IRoutesMapper) if mapper is None: mapper = RoutesMapper() self.registry.registerUtility(mapper, IRoutesMapper) return mapper.connect(name, path, factory) def _registerLogger(self): from pyramid.interfaces import IDebugLogger logger = DummyLogger() self.registry.registerUtility(logger, IDebugLogger) return logger def _registerSettings(self, **kw): settings = { 'debug_authorization': False, 'debug_notfound': False, 'debug_routematch': False, } settings.update(kw) self.registry.settings = settings def _registerTraverserFactory( self, context, view_name='', subpath=None, traversed=None, virtual_root=None, virtual_root_path=None, raise_error=None, **kw ): from pyramid.interfaces import ITraverser if virtual_root is None: virtual_root = context if subpath is None: subpath = [] if traversed is None: traversed = [] if virtual_root_path is None: virtual_root_path = [] class DummyTraverserFactory: def __init__(self, root): self.root = root def __call__(self, request): if raise_error: raise raise_error values = { 'root': self.root, 'context': context, 'view_name': view_name, 'subpath': subpath, 'traversed': traversed, 'virtual_root': virtual_root, 'virtual_root_path': virtual_root_path, } kw.update(values) return kw self.registry.registerAdapter( DummyTraverserFactory, (None,), ITraverser, name='' ) def _registerView(self, app, name, classifier, req_iface, ctx_iface): from pyramid.interfaces import IView self.registry.registerAdapter( app, (classifier, req_iface, ctx_iface), IView, name ) def _registerEventListener(self, iface): L = [] def listener(event): L.append(event) self.registry.registerHandler(listener, (iface,)) return L def _registerRootFactory(self, val): rootfactory = DummyRootFactory(val) from pyramid.interfaces import IRootFactory self.registry.registerUtility(rootfactory, IRootFactory) return rootfactory def _getTargetClass(self): from pyramid.router import Router return Router def _makeOne(self): klass = self._getTargetClass() return klass(self.registry) def _makeEnviron(self, **extras): environ = { 'wsgi.url_scheme': 'http', 'SERVER_NAME': 'localhost', 'SERVER_PORT': '8080', 'REQUEST_METHOD': 'GET', 'PATH_INFO': '/', } environ.update(extras) return environ def test_ctor_registry_has_no_settings(self): self.registry.settings = None router = self._makeOne() self.assertEqual(router.debug_notfound, False) self.assertEqual(router.debug_routematch, False) self.assertFalse('debug_notfound' in router.__dict__) self.assertFalse('debug_routematch' in router.__dict__) def test_root_policy(self): context = DummyContext() self._registerTraverserFactory(context) rootfactory = self._registerRootFactory('abc') router = self._makeOne() self.assertEqual(router.root_policy, rootfactory) def test_request_factory(self): from pyramid.interfaces import IRequestFactory class DummyRequestFactory(object): pass self.registry.registerUtility(DummyRequestFactory, IRequestFactory) router = self._makeOne() self.assertEqual(router.request_factory, DummyRequestFactory) def test_tween_factories(self): from pyramid.interfaces import ITweens from pyramid.config.tweens import Tweens from pyramid.response import Response from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IResponse tweens = Tweens() self.registry.registerUtility(tweens, ITweens) L = [] def tween_factory1(handler, registry): L.append((handler, registry)) def wrapper(request): request.environ['handled'].append('one') return handler(request) wrapper.name = 'one' wrapper.child = handler return wrapper def tween_factory2(handler, registry): L.append((handler, registry)) def wrapper(request): request.environ['handled'] = ['two'] return handler(request) wrapper.name = 'two' wrapper.child = handler return wrapper tweens.add_implicit('one', tween_factory1) tweens.add_implicit('two', tween_factory2) router = self._makeOne() self.assertEqual(router.handle_request.name, 'two') self.assertEqual(router.handle_request.child.name, 'one') self.assertEqual( router.handle_request.child.child.__name__, 'handle_request' ) context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron() view = DummyView('abc') self._registerView( self.config.derive_view(view), '', IViewClassifier, None, None ) start_response = DummyStartResponse() def make_response(s): return Response(s) router.registry.registerAdapter(make_response, (str,), IResponse) app_iter = router(environ, start_response) self.assertEqual(app_iter, [b'abc']) self.assertEqual(start_response.status, '200 OK') self.assertEqual(environ['handled'], ['two', 'one']) def test_call_traverser_default(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() logger = self._registerLogger() router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue('/' in why.args[0], why) self.assertFalse('debug_notfound' in why.args[0]) self.assertEqual(len(logger.messages), 0) def test_traverser_raises_notfound_class(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context, raise_error=HTTPNotFound) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(HTTPNotFound, router, environ, start_response) def test_traverser_raises_notfound_instance(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory( context, raise_error=HTTPNotFound('foo') ) router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue('foo' in why.args[0], why) def test_traverser_raises_forbidden_class(self): from pyramid.httpexceptions import HTTPForbidden environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context, raise_error=HTTPForbidden) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(HTTPForbidden, router, environ, start_response) def test_traverser_raises_forbidden_instance(self): from pyramid.httpexceptions import HTTPForbidden environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory( context, raise_error=HTTPForbidden('foo') ) router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPForbidden, router, environ, start_response) self.assertTrue('foo' in why.args[0], why) def test_call_no_view_registered_no_isettings(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue('/' in why.args[0], why) self.assertFalse('debug_notfound' in why.args[0]) self.assertEqual(len(logger.messages), 0) def test_call_no_view_registered_debug_notfound_false(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) logger = self._registerLogger() self._registerSettings(debug_notfound=False) router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue('/' in why.args[0], why) self.assertFalse('debug_notfound' in why.args[0]) self.assertEqual(len(logger.messages), 0) def test_call_no_view_registered_debug_notfound_true(self): from pyramid.httpexceptions import HTTPNotFound environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context) self._registerSettings(debug_notfound=True) logger = self._registerLogger() router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue( "debug_notfound of url http://localhost:8080/; " in why.args[0] ) self.assertTrue("view_name: '', subpath: []" in why.args[0]) self.assertTrue('http://localhost:8080' in why.args[0], why) self.assertEqual(len(logger.messages), 1) message = logger.messages[0] self.assertTrue('of url http://localhost:8080' in message) self.assertTrue("path_info: " in message) self.assertTrue('DummyContext' in message) self.assertTrue("view_name: ''" in message) self.assertTrue("subpath: []" in message) def test_call_view_returns_non_iresponse(self): from pyramid.interfaces import IViewClassifier context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron() view = DummyView('abc') self._registerView( self.config.derive_view(view), '', IViewClassifier, None, None ) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(ValueError, router, environ, start_response) def test_call_view_returns_adapted_response(self): from pyramid.response import Response from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IResponse context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron() view = DummyView('abc') self._registerView( self.config.derive_view(view), '', IViewClassifier, None, None ) router = self._makeOne() start_response = DummyStartResponse() def make_response(s): return Response(s) router.registry.registerAdapter(make_response, (str,), IResponse) app_iter = router(environ, start_response) self.assertEqual(app_iter, [b'abc']) self.assertEqual(start_response.status, '200 OK') def test_call_with_request_extensions(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequestExtensions from pyramid.interfaces import IRequest from pyramid.request import Request from pyramid.util import InstancePropertyHelper context = DummyContext() self._registerTraverserFactory(context) class Extensions(object): def __init__(self): self.methods = {} self.descriptors = {} extensions = Extensions() ext_method = lambda r: 'bar' name, fn = InstancePropertyHelper.make_property(ext_method, name='foo') extensions.descriptors[name] = fn request = Request.blank('/') request.request_iface = IRequest request.registry = self.registry def request_factory(environ): return request self.registry.registerUtility(extensions, IRequestExtensions) environ = self._makeEnviron() response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) self._registerView( self.config.derive_view(view), '', IViewClassifier, None, None ) router = self._makeOne() router.request_factory = request_factory start_response = DummyStartResponse() router(environ, start_response) self.assertEqual(view.request.foo, 'bar') def test_call_view_registered_nonspecific_default_path(self): from pyramid.interfaces import IViewClassifier context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron() self._registerView( self.config.derive_view(view), '', IViewClassifier, None, None ) self._registerRootFactory(context) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['Hello world']) self.assertEqual(start_response.headers, ()) self.assertEqual(start_response.status, '200 OK') request = view.request self.assertEqual(request.view_name, '') self.assertEqual(request.subpath, []) self.assertEqual(request.context, context) self.assertEqual(request.root, context) def test_call_view_registered_nonspecific_nondefault_path_and_subpath( self ): from pyramid.interfaces import IViewClassifier context = DummyContext() self._registerTraverserFactory( context, view_name='foo', subpath=['bar'], traversed=['context'] ) self._registerRootFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron() self._registerView(view, 'foo', IViewClassifier, None, None) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['Hello world']) self.assertEqual(start_response.headers, ()) self.assertEqual(start_response.status, '200 OK') request = view.request self.assertEqual(request.view_name, 'foo') self.assertEqual(request.subpath, ['bar']) self.assertEqual(request.context, context) self.assertEqual(request.root, context) def test_call_view_registered_specific_success(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context) self._registerRootFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['Hello world']) self.assertEqual(start_response.headers, ()) self.assertEqual(start_response.status, '200 OK') request = view.request self.assertEqual(request.view_name, '') self.assertEqual(request.subpath, []) self.assertEqual(request.context, context) self.assertEqual(request.root, context) def test_call_view_registered_specific_fail(self): from zope.interface import Interface from zope.interface import directlyProvides from pyramid.httpexceptions import HTTPNotFound from pyramid.interfaces import IViewClassifier class IContext(Interface): pass class INotContext(Interface): pass from pyramid.interfaces import IRequest context = DummyContext() directlyProvides(context, INotContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() view = DummyView(response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(HTTPNotFound, router, environ, start_response) def test_call_view_raises_forbidden(self): from zope.interface import Interface from zope.interface import directlyProvides from pyramid.httpexceptions import HTTPForbidden class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() view = DummyView( response, raise_exception=HTTPForbidden("unauthorized") ) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPForbidden, router, environ, start_response) self.assertEqual(why.args[0], 'unauthorized') def test_call_view_raises_notfound(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier from pyramid.httpexceptions import HTTPNotFound context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() view = DummyView(response, raise_exception=HTTPNotFound("notfound")) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertEqual(why.args[0], 'notfound') def test_call_view_raises_response_cleared(self): from zope.interface import Interface from zope.interface import directlyProvides from pyramid.interfaces import IExceptionViewClassifier class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) def view(context, request): request.response.a = 1 raise KeyError def exc_view(context, request): self.assertFalse(hasattr(request.response, 'a')) request.response.body = b'OK' return request.response environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) self._registerView( exc_view, '', IExceptionViewClassifier, IRequest, KeyError ) router = self._makeOne() start_response = DummyStartResponse() itera = router(environ, start_response) self.assertEqual(itera, [b'OK']) def test_call_request_has_response_callbacks(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse('200 OK') def view(context, request): def callback(request, response): response.called_back = True request.add_response_callback(callback) return response environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() router(environ, start_response) self.assertEqual(response.called_back, True) def test_call_request_has_finished_callbacks_when_view_succeeds(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse('200 OK') def view(context, request): def callback(request): request.environ['called_back'] = True request.add_finished_callback(callback) return response environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() router(environ, start_response) self.assertEqual(environ['called_back'], True) def test_call_request_has_finished_callbacks_when_view_raises(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) def view(context, request): def callback(request): request.environ['called_back'] = True request.add_finished_callback(callback) raise NotImplementedError environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, IContext) router = self._makeOne() start_response = DummyStartResponse() exc_raised(NotImplementedError, router, environ, start_response) self.assertEqual(environ['called_back'], True) def test_call_request_factory_raises(self): # making sure finally doesnt barf when a request cannot be created environ = self._makeEnviron() router = self._makeOne() def dummy_request_factory(environ): raise NotImplementedError router.request_factory = dummy_request_factory start_response = DummyStartResponse() exc_raised(NotImplementedError, router, environ, start_response) def test_call_eventsends(self): from pyramid.interfaces import INewRequest from pyramid.interfaces import INewResponse from pyramid.interfaces import IBeforeTraversal from pyramid.interfaces import IContextFound from pyramid.interfaces import IViewClassifier context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, None, None) request_events = self._registerEventListener(INewRequest) beforetraversal_events = self._registerEventListener(IBeforeTraversal) context_found_events = self._registerEventListener(IContextFound) response_events = self._registerEventListener(INewResponse) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(len(request_events), 1) self.assertEqual(request_events[0].request.environ, environ) self.assertEqual(len(beforetraversal_events), 1) self.assertEqual(beforetraversal_events[0].request.environ, environ) self.assertEqual(len(context_found_events), 1) self.assertEqual(context_found_events[0].request.environ, environ) self.assertEqual(context_found_events[0].request.context, context) self.assertEqual(len(response_events), 1) self.assertEqual(response_events[0].response, response) self.assertEqual(response_events[0].request.context, context) self.assertEqual(result, response.app_iter) def test_call_newrequest_evllist_exc_can_be_caught_by_exceptionview(self): from pyramid.interfaces import INewRequest from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron() def listener(event): raise KeyError self.registry.registerHandler(listener, (INewRequest,)) exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] exception_view = DummyView(exception_response) environ = self._makeEnviron() self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, KeyError ) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, exception_response.app_iter) def test_call_route_matches_and_has_factory(self): from pyramid.interfaces import IViewClassifier logger = self._registerLogger() self._registerSettings(debug_routematch=True) self._registerRouteRequest('foo') root = object() def factory(request): return root route = self._connectRoute('foo', 'archives/:action/:article', factory) route.predicates = [DummyPredicate()] context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') self._registerView(view, '', IViewClassifier, None, None) self._registerRootFactory(context) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['Hello world']) self.assertEqual(start_response.headers, ()) self.assertEqual(start_response.status, '200 OK') request = view.request self.assertEqual(request.view_name, '') self.assertEqual(request.subpath, []) self.assertEqual(request.context, context) self.assertEqual(request.root, root) matchdict = {'action': 'action1', 'article': 'article1'} self.assertEqual(request.matchdict, matchdict) self.assertEqual(request.matched_route.name, 'foo') self.assertEqual(len(logger.messages), 1) self.assertTrue( logger.messages[0].startswith( "route matched for url http://localhost:8080" "/archives/action1/article1; " "route_name: 'foo', " "path_info: " ) ) self.assertTrue("predicates: 'predicate'" in logger.messages[0]) def test_call_route_match_miss_debug_routematch(self): from pyramid.httpexceptions import HTTPNotFound logger = self._registerLogger() self._registerSettings(debug_routematch=True) self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article') context = DummyContext() self._registerTraverserFactory(context) environ = self._makeEnviron(PATH_INFO='/wontmatch') self._registerRootFactory(context) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(HTTPNotFound, router, environ, start_response) self.assertEqual(len(logger.messages), 1) self.assertEqual( logger.messages[0], 'no route matched for url http://localhost:8080/wontmatch', ) def test_call_route_matches_doesnt_overwrite_subscriber_iface(self): from pyramid.interfaces import INewRequest from pyramid.interfaces import IViewClassifier from zope.interface import alsoProvides from zope.interface import Interface self._registerRouteRequest('foo') class IFoo(Interface): pass def listener(event): alsoProvides(event.request, IFoo) self.registry.registerHandler(listener, (INewRequest,)) root = object() def factory(request): return root self._connectRoute('foo', 'archives/:action/:article', factory) context = DummyContext() self._registerTraverserFactory(context) response = DummyResponse() response.app_iter = ['Hello world'] view = DummyView(response) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') self._registerView(view, '', IViewClassifier, None, None) self._registerRootFactory(context) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['Hello world']) self.assertEqual(start_response.headers, ()) self.assertEqual(start_response.status, '200 OK') request = view.request self.assertEqual(request.view_name, '') self.assertEqual(request.subpath, []) self.assertEqual(request.context, context) self.assertEqual(request.root, root) matchdict = {'action': 'action1', 'article': 'article1'} self.assertEqual(request.matchdict, matchdict) self.assertEqual(request.matched_route.name, 'foo') self.assertTrue(IFoo.providedBy(request)) def test_root_factory_raises_notfound(self): from pyramid.interfaces import IRootFactory from pyramid.httpexceptions import HTTPNotFound from zope.interface import Interface from zope.interface import directlyProvides def rootfactory(request): raise HTTPNotFound('from root factory') self.registry.registerUtility(rootfactory, IRootFactory) class IContext(Interface): pass context = DummyContext() directlyProvides(context, IContext) environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPNotFound, router, environ, start_response) self.assertTrue('from root factory' in why.args[0]) def test_root_factory_raises_forbidden(self): from pyramid.interfaces import IRootFactory from pyramid.httpexceptions import HTTPForbidden from zope.interface import Interface from zope.interface import directlyProvides def rootfactory(request): raise HTTPForbidden('from root factory') self.registry.registerUtility(rootfactory, IRootFactory) class IContext(Interface): pass context = DummyContext() directlyProvides(context, IContext) environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() why = exc_raised(HTTPForbidden, router, environ, start_response) self.assertTrue('from root factory' in why.args[0]) def test_root_factory_exception_propagating(self): from pyramid.interfaces import IRootFactory from zope.interface import Interface from zope.interface import directlyProvides def rootfactory(request): raise RuntimeError() self.registry.registerUtility(rootfactory, IRootFactory) class IContext(Interface): pass context = DummyContext() directlyProvides(context, IContext) environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(RuntimeError, router, environ, start_response) def test_traverser_exception_propagating(self): environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context, raise_error=RuntimeError()) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(RuntimeError, router, environ, start_response) def test_call_view_exception_propagating(self): from zope.interface import Interface from zope.interface import directlyProvides class IContext(Interface): pass from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequestFactory from pyramid.interfaces import IExceptionViewClassifier def rfactory(environ): return request self.registry.registerUtility(rfactory, IRequestFactory) from pyramid.request import Request request = Request.blank('/') context = DummyContext() directlyProvides(context, IContext) self._registerTraverserFactory(context, subpath=['']) response = DummyResponse() response.app_iter = ['OK'] error = RuntimeError() view = DummyView(response, raise_exception=error) environ = self._makeEnviron() def exception_view(context, request): self.assertEqual(request.exc_info[0], RuntimeError) return response self._registerView(view, '', IViewClassifier, IRequest, IContext) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ['OK']) # exc_info and exception should still be around on the request after # the excview tween has run (see # https://github.com/Pylons/pyramid/issues/1223) self.assertEqual(request.exception, error) self.assertEqual(request.exc_info[:2], (RuntimeError, error)) def test_call_view_raises_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest response = DummyResponse() exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] view = DummyView(response, raise_exception=RuntimeError) def exception_view(context, request): self.assertEqual(request.exception.__class__, RuntimeError) return exception_response environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, None) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_call_view_raises_super_exception_sub_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class SuperException(Exception): pass class SubException(SuperException): pass response = DummyResponse() exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] view = DummyView(response, raise_exception=SuperException) exception_view = DummyView(exception_response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, None) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, SubException, ) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(SuperException, router, environ, start_response) def test_call_view_raises_sub_exception_super_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class SuperException(Exception): pass class SubException(SuperException): pass response = DummyResponse() exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] view = DummyView(response, raise_exception=SubException) exception_view = DummyView(exception_response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, None) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, SuperException, ) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_call_view_raises_exception_another_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class MyException(Exception): pass class AnotherException(Exception): pass response = DummyResponse() exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] view = DummyView(response, raise_exception=MyException) exception_view = DummyView(exception_response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, None) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, AnotherException, ) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(MyException, router, environ, start_response) def test_root_factory_raises_exception_view(self): from pyramid.interfaces import IRootFactory from pyramid.interfaces import IRequest from pyramid.interfaces import IExceptionViewClassifier def rootfactory(request): raise RuntimeError() self.registry.registerUtility(rootfactory, IRootFactory) exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] exception_view = DummyView(exception_response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() app_iter = router(environ, start_response) self.assertEqual(app_iter, ["Hello, world"]) def test_traverser_raises_exception_view(self): from pyramid.interfaces import IRequest from pyramid.interfaces import IExceptionViewClassifier environ = self._makeEnviron() context = DummyContext() self._registerTraverserFactory(context, raise_error=RuntimeError()) exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] exception_view = DummyView(exception_response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) router = self._makeOne() start_response = DummyStartResponse() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_exception_view_returns_non_iresponse(self): from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier environ = self._makeEnviron() response = DummyResponse() view = DummyView(response, raise_exception=RuntimeError) self._registerView( self.config.derive_view(view), '', IViewClassifier, IRequest, None ) exception_view = DummyView(None) self._registerView( self.config.derive_view(exception_view), '', IExceptionViewClassifier, IRequest, RuntimeError, ) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(ValueError, router, environ, start_response) def test_call_route_raises_route_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=RuntimeError) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, req_iface, RuntimeError, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_call_view_raises_exception_route_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=RuntimeError) self._registerView(view, '', IViewClassifier, IRequest, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, req_iface, RuntimeError, ) environ = self._makeEnviron() start_response = DummyStartResponse() router = self._makeOne() self.assertRaises(RuntimeError, router, environ, start_response) def test_call_route_raises_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=RuntimeError) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_call_route_raises_super_exception_sub_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class SuperException(Exception): pass class SubException(SuperException): pass req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=SuperException) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, SubException, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() self.assertRaises(SuperException, router, environ, start_response) def test_call_route_raises_sub_exception_super_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class SuperException(Exception): pass class SubException(SuperException): pass req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=SubException) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, SuperException, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_call_route_raises_exception_another_exception_view(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class MyException(Exception): pass class AnotherException(Exception): pass req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=MyException) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, AnotherException, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() self.assertRaises(MyException, router, environ, start_response) def test_call_route_raises_exception_view_specializing(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=RuntimeError) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, IRequest, RuntimeError, ) response_spec = DummyResponse() response_spec.app_iter = ["Hello, special world"] exception_view_spec = DummyView(response_spec) self._registerView( exception_view_spec, '', IExceptionViewClassifier, req_iface, RuntimeError, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() result = router(environ, start_response) self.assertEqual(result, ["Hello, special world"]) def test_call_route_raises_exception_view_another_route(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier req_iface = self._registerRouteRequest('foo') another_req_iface = self._registerRouteRequest('bar') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=RuntimeError) self._registerView(view, '', IViewClassifier, req_iface, None) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view = DummyView(response) self._registerView( exception_view, '', IExceptionViewClassifier, another_req_iface, RuntimeError, ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() self.assertRaises(RuntimeError, router, environ, start_response) def test_call_view_raises_exception_view_route(self): from pyramid.interfaces import IRequest from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier req_iface = self._registerRouteRequest('foo') response = DummyResponse() exception_response = DummyResponse() exception_response.app_iter = ["Hello, world"] view = DummyView(response, raise_exception=RuntimeError) exception_view = DummyView(exception_response) environ = self._makeEnviron() self._registerView(view, '', IViewClassifier, IRequest, None) self._registerView( exception_view, '', IExceptionViewClassifier, req_iface, RuntimeError, ) router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(RuntimeError, router, environ, start_response) def test_call_view_raises_predicate_mismatch(self): from pyramid.exceptions import PredicateMismatch from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequest view = DummyView(DummyResponse(), raise_exception=PredicateMismatch) self._registerView(view, '', IViewClassifier, IRequest, None) environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(PredicateMismatch, router, environ, start_response) def test_call_view_predicate_mismatch_doesnt_hide_views(self): from pyramid.exceptions import PredicateMismatch from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequest, IResponse from pyramid.response import Response class BaseContext: pass class DummyContext(BaseContext): pass context = DummyContext() self._registerTraverserFactory(context) view = DummyView(DummyResponse(), raise_exception=PredicateMismatch) self._registerView(view, '', IViewClassifier, IRequest, DummyContext) good_view = DummyView('abc') self._registerView( self.config.derive_view(good_view), '', IViewClassifier, IRequest, BaseContext, ) router = self._makeOne() def make_response(s): return Response(s) router.registry.registerAdapter(make_response, (str,), IResponse) environ = self._makeEnviron() start_response = DummyStartResponse() app_iter = router(environ, start_response) self.assertEqual(app_iter, [b'abc']) def test_call_view_multiple_predicate_mismatches_dont_hide_views(self): from pyramid.exceptions import PredicateMismatch from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequest, IResponse from pyramid.response import Response from zope.interface import Interface, implementer class IBaseContext(Interface): pass class IContext(IBaseContext): pass @implementer(IContext) class DummyContext: pass context = DummyContext() self._registerTraverserFactory(context) view1 = DummyView(DummyResponse(), raise_exception=PredicateMismatch) self._registerView(view1, '', IViewClassifier, IRequest, DummyContext) view2 = DummyView(DummyResponse(), raise_exception=PredicateMismatch) self._registerView(view2, '', IViewClassifier, IRequest, IContext) good_view = DummyView('abc') self._registerView( self.config.derive_view(good_view), '', IViewClassifier, IRequest, IBaseContext, ) router = self._makeOne() def make_response(s): return Response(s) router.registry.registerAdapter(make_response, (str,), IResponse) environ = self._makeEnviron() start_response = DummyStartResponse() app_iter = router(environ, start_response) self.assertEqual(app_iter, [b'abc']) def test_call_view_predicate_mismatch_doesnt_find_unrelated_views(self): from pyramid.exceptions import PredicateMismatch from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IRequest from zope.interface import Interface, implementer class IContext(Interface): pass class IOtherContext(Interface): pass @implementer(IContext) class DummyContext: pass context = DummyContext() self._registerTraverserFactory(context) view = DummyView(DummyResponse(), raise_exception=PredicateMismatch) self._registerView(view, '', IViewClassifier, IRequest, DummyContext) please_dont_call_me_view = DummyView('abc') self._registerView( self.config.derive_view(please_dont_call_me_view), '', IViewClassifier, IRequest, IOtherContext, ) router = self._makeOne() environ = self._makeEnviron() router = self._makeOne() start_response = DummyStartResponse() self.assertRaises(PredicateMismatch, router, environ, start_response) def test_custom_execution_policy(self): from pyramid.interfaces import IExecutionPolicy from pyramid.request import Request from pyramid.response import Response registry = self.config.registry def dummy_policy(environ, router): return Response(status=200, body=b'foo') registry.registerUtility(dummy_policy, IExecutionPolicy) router = self._makeOne() resp = Request.blank('/').get_response(router) self.assertEqual(resp.status_code, 200) self.assertEqual(resp.body, b'foo') def test_execution_policy_handles_exception(self): from pyramid.interfaces import IViewClassifier from pyramid.interfaces import IExceptionViewClassifier from pyramid.interfaces import IRequest class Exception1(Exception): pass class Exception2(Exception): pass req_iface = self._registerRouteRequest('foo') self._connectRoute('foo', 'archives/:action/:article', None) view = DummyView(DummyResponse(), raise_exception=Exception1) self._registerView(view, '', IViewClassifier, req_iface, None) exception_view1 = DummyView( DummyResponse(), raise_exception=Exception2 ) self._registerView( exception_view1, '', IExceptionViewClassifier, IRequest, Exception1 ) response = DummyResponse() response.app_iter = ["Hello, world"] exception_view2 = DummyView(response) self._registerView( exception_view2, '', IExceptionViewClassifier, IRequest, Exception2 ) environ = self._makeEnviron(PATH_INFO='/archives/action1/article1') start_response = DummyStartResponse() router = self._makeOne() result = router(environ, start_response) self.assertEqual(result, ["Hello, world"]) def test_request_context_with_statement(self): from pyramid.threadlocal import get_current_request from pyramid.interfaces import IExecutionPolicy from pyramid.request import Request from pyramid.response import Response registry = self.config.registry result = [] def dummy_policy(environ, router): with router.request_context(environ): result.append(get_current_request()) result.append(get_current_request()) return Response(status=200, body=b'foo') registry.registerUtility(dummy_policy, IExecutionPolicy) router = self._makeOne() resp = Request.blank('/test_path').get_response(router) self.assertEqual(resp.status_code, 200) self.assertEqual(resp.body, b'foo') self.assertEqual(result[0].path_info, '/test_path') self.assertEqual(result[1], None) def test_request_context_manually(self): from pyramid.threadlocal import get_current_request from pyramid.interfaces import IExecutionPolicy from pyramid.request import Request from pyramid.response import Response registry = self.config.registry result = [] def dummy_policy(environ, router): ctx = router.request_context(environ) ctx.begin() result.append(get_current_request()) ctx.end() result.append(get_current_request()) return Response(status=200, body=b'foo') registry.registerUtility(dummy_policy, IExecutionPolicy) router = self._makeOne() resp = Request.blank('/test_path').get_response(router) self.assertEqual(resp.status_code, 200) self.assertEqual(resp.body, b'foo') self.assertEqual(result[0].path_info, '/test_path') self.assertEqual(result[1], None) class DummyPredicate(object): def __call__(self, info, request): return True def text(self): return 'predicate' class DummyContext: pass class DummyView: def __init__(self, response, raise_exception=None): self.response = response self.raise_exception = raise_exception def __call__(self, context, request): self.context = context self.request = request if self.raise_exception is not None: raise self.raise_exception return self.response class DummyRootFactory: def __init__(self, root): self.root = root def __call__(self, environ): return self.root class DummyStartResponse: status = () headers = () def __call__(self, status, headers): self.status = status self.headers = headers @implementer(IResponse) class DummyResponse(object): headerlist = () app_iter = () environ = None def __init__(self, status='200 OK'): self.status = status def __call__(self, environ, start_response): self.environ = environ start_response(self.status, self.headerlist) return self.app_iter class DummyAuthenticationPolicy: pass class DummyLogger: def __init__(self): self.messages = [] def info(self, msg): self.messages.append(msg) warn = info debug = info def exc_raised(exc, func, *arg, **kw): try: func(*arg, **kw) except exc as e: return e else: raise AssertionError('%s not raised' % exc) # pragma: no cover