import unittest
|
from pyramid import testing
|
|
from pyramid.compat import PY2, text_, bytes_, native_
|
from pyramid.security import AuthenticationAPIMixin, AuthorizationAPIMixin
|
|
|
class TestRequest(unittest.TestCase):
    """Tests for ``pyramid.request.Request`` run against a scratch registry."""

    def setUp(self):
        # ``self.config.registry`` is where the tests below register their
        # utilities and adapters.
        self.config = testing.setUp()

    def tearDown(self):
        testing.tearDown()

    def _getTargetClass(self):
        # Return the class under test.
        from pyramid.request import Request as target

        return target

    def _makeOne(self, environ=None):
        # Instantiate the class under test; a missing environ becomes a new
        # empty dict, a supplied one is passed through untouched.
        request_cls = self._getTargetClass()
        return request_cls({} if environ is None else environ)

    def _registerResourceURL(self):
        # Register an IResourceURL adapter whose physical and virtual paths
        # are both fixed at '/context/'.
        from pyramid.interfaces import IResourceURL
        from zope.interface import Interface

        class DummyResourceURL(object):
            def __init__(self, context, request):
                self.physical_path = self.virtual_path = '/context/'

        required = (Interface, Interface)
        self.config.registry.registerAdapter(
            DummyResourceURL, required, IResourceURL
        )

    def test_class_conforms_to_IRequest(self):
        from zope.interface.verify import verifyClass
        from pyramid.interfaces import IRequest

        target = self._getTargetClass()
        verifyClass(IRequest, target)

    def test_instance_conforms_to_IRequest(self):
        from zope.interface.verify import verifyObject
        from pyramid.interfaces import IRequest

        request = self._makeOne()
        verifyObject(IRequest, request)

    def test_ResponseClass_is_pyramid_Response(self):
        from pyramid.response import Response

        self.assertEqual(self._getTargetClass().ResponseClass, Response)

    def test_implements_security_apis(self):
        request = self._makeOne()
        self.assertTrue(
            isinstance(
                request, (AuthenticationAPIMixin, AuthorizationAPIMixin)
            )
        )

    def test_charset_defaults_to_utf8(self):
        request = self._makeOne({'PATH_INFO': '/'})
        self.assertEqual(request.charset, 'UTF-8')

    def test_exception_defaults_to_None(self):
        request = self._makeOne({'PATH_INFO': '/'})
        self.assertEqual(request.exception, None)

    def test_matchdict_defaults_to_None(self):
        request = self._makeOne({'PATH_INFO': '/'})
        self.assertEqual(request.matchdict, None)

    def test_matched_route_defaults_to_None(self):
        request = self._makeOne({'PATH_INFO': '/'})
        self.assertEqual(request.matched_route, None)

    def test_params_decoded_from_utf_8_by_default(self):
        request = self._makeOne(
            {'PATH_INFO': '/', 'QUERY_STRING': 'la=La%20Pe%C3%B1a'}
        )
        request.charset = None
        expected = text_(b'La Pe\xf1a')
        self.assertEqual(request.GET['la'], expected)

    def test_tmpl_context(self):
        from pyramid.request import TemplateContext

        request = self._makeOne()
        self.assertEqual(request.tmpl_context.__class__, TemplateContext)

    def test_session_configured(self):
        from pyramid.interfaces import ISessionFactory

        request = self._makeOne()

        def factory(request):
            return 'orangejuice'

        registry = self.config.registry
        registry.registerUtility(factory, ISessionFactory)
        request.registry = registry
        self.assertEqual(request.session, 'orangejuice')
        # The value must also be stored on the instance dict.
        self.assertEqual(request.__dict__['session'], 'orangejuice')

    def test_session_not_configured(self):
        request = self._makeOne()
        request.registry = self.config.registry
        with self.assertRaises(AttributeError):
            getattr(request, 'session')

    def test_setattr_and_getattr_dotnotation(self):
        request = self._makeOne()
        request.foo = 1
        self.assertEqual(request.foo, 1)

    def test_setattr_and_getattr(self):
        environ = {}
        request = self._makeOne(environ)
        setattr(request, 'bar', 1)
        self.assertEqual(getattr(request, 'bar'), 1)
        # make sure we're not using adhoc attrs
        self.assertEqual(environ, {})

    def test_add_response_callback(self):
        req = self._makeOne()
        self.assertEqual(len(req.response_callbacks), 0)

        def callback(request, response):
            """Never invoked; only its identity is compared."""

        # Each registration appends, so the same callable may appear twice.
        expected = []
        for _ in range(2):
            req.add_response_callback(callback)
            expected.append(callback)
            self.assertEqual(list(req.response_callbacks), expected)

    def test__process_response_callbacks(self):
        req = self._makeOne()

        def callback1(request, response):
            request.called1 = True
            response.called1 = True

        def callback2(request, response):
            request.called2 = True
            response.called2 = True

        for registered in (callback1, callback2):
            req.add_response_callback(registered)
        response = DummyResponse()
        req._process_response_callbacks(response)
        for marked in (req, response):
            self.assertEqual(marked.called1, True)
            self.assertEqual(marked.called2, True)
        self.assertEqual(len(req.response_callbacks), 0)

    def test__process_response_callback_adding_response_callback(self):
        """
        A callback registered by another response callback while callbacks
        are being processed should still be called.

        See https://github.com/Pylons/pyramid/pull/1373
        """
        req = self._makeOne()

        def callback1(request, response):
            request.called1 = True
            response.called1 = True
            request.add_response_callback(callback2)

        def callback2(request, response):
            request.called2 = True
            response.called2 = True

        req.add_response_callback(callback1)
        response = DummyResponse()
        req._process_response_callbacks(response)
        for marked in (req, response):
            self.assertEqual(marked.called1, True)
            self.assertEqual(marked.called2, True)
        self.assertEqual(len(req.response_callbacks), 0)

    def test_add_finished_callback(self):
        req = self._makeOne()
        self.assertEqual(len(req.finished_callbacks), 0)

        def callback(request):
            """Never invoked; only its identity is compared."""

        expected = []
        for _ in range(2):
            req.add_finished_callback(callback)
            expected.append(callback)
            self.assertEqual(list(req.finished_callbacks), expected)

    def test__process_finished_callbacks(self):
        req = self._makeOne()

        def callback1(request):
            request.called1 = True

        def callback2(request):
            request.called2 = True

        for registered in (callback1, callback2):
            req.add_finished_callback(registered)
        req._process_finished_callbacks()
        self.assertEqual(req.called1, True)
        self.assertEqual(req.called2, True)
        self.assertEqual(len(req.finished_callbacks), 0)

    def test_resource_url(self):
        self._registerResourceURL()
        request = self._makeOne(
            {
                'PATH_INFO': '/',
                'SERVER_NAME': 'example.com',
                'SERVER_PORT': '80',
                'wsgi.url_scheme': 'http',
            }
        )
        url = request.resource_url(DummyContext())
        self.assertEqual(url, 'http://example.com/context/')

    def test_route_url(self):
        from pyramid.interfaces import IRoutesMapper

        request = self._makeOne(
            {
                'PATH_INFO': '/',
                'SERVER_NAME': 'example.com',
                'SERVER_PORT': '5432',
                'QUERY_STRING': 'la=La%20Pe%C3%B1a',
                'wsgi.url_scheme': 'http',
            }
        )
        route = DummyRoute('/1/2/3')
        self.config.registry.registerUtility(
            DummyRoutesMapper(route=route), IRoutesMapper
        )
        route_kw = dict(a=1, b=2, c=3, _query={'a': 1}, _anchor=text_("foo"))
        url = request.route_url('flub', 'extra1', 'extra2', **route_kw)
        self.assertEqual(
            url, 'http://example.com:5432/1/2/3/extra1/extra2?a=1#foo'
        )

    def test_route_path(self):
        from pyramid.interfaces import IRoutesMapper

        request = self._makeOne(
            {
                'PATH_INFO': '/',
                'SERVER_NAME': 'example.com',
                'SERVER_PORT': '5432',
                'QUERY_STRING': 'la=La%20Pe%C3%B1a',
                'wsgi.url_scheme': 'http',
            }
        )
        route = DummyRoute('/1/2/3')
        self.config.registry.registerUtility(
            DummyRoutesMapper(route=route), IRoutesMapper
        )
        route_kw = dict(a=1, b=2, c=3, _query={'a': 1}, _anchor=text_("foo"))
        path = request.route_path('flub', 'extra1', 'extra2', **route_kw)
        self.assertEqual(path, '/1/2/3/extra1/extra2?a=1#foo')

    def test_static_url(self):
        from pyramid.interfaces import IStaticURLInfo

        request = self._makeOne(
            {
                'PATH_INFO': '/',
                'SERVER_NAME': 'example.com',
                'SERVER_PORT': '5432',
                'QUERY_STRING': '',
                'wsgi.url_scheme': 'http',
            }
        )
        static_info = DummyStaticURLInfo('abc')
        self.config.registry.registerUtility(static_info, IStaticURLInfo)
        asset = 'pyramid.tests:static/foo.css'
        self.assertEqual(request.static_url(asset), 'abc')
        # The stub records the arguments it was called with.
        self.assertEqual(static_info.args, (asset, request, {}))

    def test_is_response_false(self):
        request = self._makeOne()
        request.registry = self.config.registry
        verdict = request.is_response('abc')
        self.assertEqual(verdict, False)

    def test_is_response_true_ob_is_pyramid_response(self):
        from pyramid.response import Response

        candidate = Response('hello')
        request = self._makeOne()
        request.registry = self.config.registry
        verdict = request.is_response(candidate)
        self.assertEqual(verdict, True)

    def test_is_response_false_adapter_is_not_self(self):
        from pyramid.interfaces import IResponse

        request = self._makeOne()
        request.registry = self.config.registry

        class Foo(object):
            pass

        def adapter(ob):
            # Adapts to an unrelated object rather than to ``ob`` itself.
            return object()

        candidate = Foo()
        request.registry.registerAdapter(adapter, (Foo,), IResponse)
        self.assertEqual(request.is_response(candidate), False)

    def test_is_response_adapter_true(self):
        from pyramid.interfaces import IResponse

        request = self._makeOne()
        request.registry = self.config.registry

        class Foo(object):
            pass

        def adapter(ob):
            # Adapts the object to itself.
            return ob

        candidate = Foo()
        request.registry.registerAdapter(adapter, (Foo,), IResponse)
        self.assertEqual(request.is_response(candidate), True)

    def test_json_body_invalid_json(self):
        request = self._makeOne({'REQUEST_METHOD': 'POST'})
        request.body = b'{'
        with self.assertRaises(ValueError):
            getattr(request, 'json_body')

    def test_json_body_valid_json(self):
        request = self._makeOne({'REQUEST_METHOD': 'POST'})
        request.body = b'{"a":1}'
        self.assertEqual(request.json_body, {'a': 1})

    def test_json_body_alternate_charset(self):
        import json

        request = self._makeOne({'REQUEST_METHOD': 'POST'})
        inp = text_(
            b'/\xe6\xb5\x81\xe8\xa1\x8c\xe8\xb6\x8b\xe5\x8a\xbf', 'utf-8'
        )
        serialized = json.dumps({'a': inp})
        # Re-encode the serialized payload as UTF-16; the PY2 path decodes
        # first, the other path encodes it directly.
        body = (
            serialized.decode('utf-8').encode('utf-16')
            if PY2
            else bytes(serialized, 'utf-16')
        )
        request.body = body
        request.content_type = 'application/json; charset=utf-16'
        self.assertEqual(request.json_body, {'a': inp})

    def test_json_body_GET_request(self):
        request = self._makeOne({'REQUEST_METHOD': 'GET'})
        with self.assertRaises(ValueError):
            getattr(request, 'json_body')

    def test_set_property(self):
        request = self._makeOne()
        opts = [2, 1]

        def connect(obj):
            return opts.pop()

        request.set_property(connect, name='db')
        # Each access pops the next value: 1, then 2.
        for expected in (1, 2):
            self.assertEqual(expected, request.db)

    def test_set_property_reify(self):
        request = self._makeOne()
        opts = [2, 1]

        def connect(obj):
            return opts.pop()

        request.set_property(connect, name='db', reify=True)
        # With reify=True both accesses yield the first popped value.
        for _ in range(2):
            self.assertEqual(1, request.db)
|
|
class Test_route_request_iface(unittest.TestCase):
    """Tests for ``pyramid.request.route_request_iface``."""

    def _callFUT(self, name):
        # Call the function under test with the given route name.
        from pyramid.request import route_request_iface as fut

        return fut(name)

    def test_it(self):
        generated = self._callFUT('routename')
        self.assertEqual(generated.__name__, 'routename_IRequest')
        self.assertTrue(hasattr(generated, 'combined'))
        combined_name = generated.combined.__name__
        self.assertEqual(combined_name, 'routename_combined_IRequest')

    def test_it_routename_with_spaces(self):
        # see https://github.com/Pylons/pyramid/issues/232
        generated = self._callFUT('routename with spaces')
        self.assertEqual(generated.__name__, 'routename with spaces_IRequest')
        self.assertTrue(hasattr(generated, 'combined'))
        combined_name = generated.combined.__name__
        self.assertEqual(
            combined_name, 'routename with spaces_combined_IRequest'
        )
|
|
|
class Test_add_global_response_headers(unittest.TestCase):
    """Tests for ``pyramid.request.add_global_response_headers``."""

    def _callFUT(self, request, headerlist):
        # Call the function under test.
        from pyramid.request import add_global_response_headers as fut

        return fut(request, headerlist)

    def test_it(self):
        response = DummyResponse()
        request = DummyRequest()
        self._callFUT(request, [('c', 1)])
        self.assertEqual(len(request.response_callbacks), 1)
        # Run the registered callback by hand and check what it put on the
        # response.
        registered = request.response_callbacks[0]
        registered(None, response)
        self.assertEqual(response.headerlist, [('c', 1)])
|
|
|
class Test_call_app_with_subpath_as_path_info(unittest.TestCase):
    """Tests for ``pyramid.request.call_app_with_subpath_as_path_info``."""

    def _callFUT(self, request, app):
        # Call the function under test.
        from pyramid.request import call_app_with_subpath_as_path_info as fut

        return fut(request, app)

    def _assertDispatched(self, request, script_name, path_info):
        # Shared assertions: call the function with the app 'app', then
        # check that the request was copied, that 'app' came back as the
        # response, and that the environ holds the expected SCRIPT_NAME and
        # PATH_INFO.
        response = self._callFUT(request, 'app')
        self.assertTrue(request.copied)
        self.assertEqual(response, 'app')
        self.assertEqual(request.environ['SCRIPT_NAME'], script_name)
        self.assertEqual(request.environ['PATH_INFO'], path_info)

    def test_it_all_request_and_environment_data_missing(self):
        request = DummyRequest({})
        self._assertDispatched(request, '', '/')

    def test_it_with_subpath_and_path_info(self):
        request = DummyRequest({'PATH_INFO': '/hello'})
        request.subpath = ('hello',)
        self._assertDispatched(request, '', '/hello')

    def test_it_with_subpath_and_path_info_path_info_endswith_slash(self):
        request = DummyRequest({'PATH_INFO': '/hello/'})
        request.subpath = ('hello',)
        self._assertDispatched(request, '', '/hello/')

    def test_it_with_subpath_and_path_info_extra_script_name(self):
        environ = {'PATH_INFO': '/hello', 'SCRIPT_NAME': '/script'}
        request = DummyRequest(environ)
        request.subpath = ('hello',)
        self._assertDispatched(request, '/script', '/hello')

    def test_it_with_extra_slashes_in_path_info(self):
        environ = {'PATH_INFO': '//hello/', 'SCRIPT_NAME': '/script'}
        request = DummyRequest(environ)
        request.subpath = ('hello',)
        self._assertDispatched(request, '/script', '/hello/')

    def test_subpath_path_info_and_script_name_have_utf8(self):
        encoded = native_(text_(b'La Pe\xc3\xb1a'))
        decoded = text_(bytes_(encoded), 'utf-8')
        prefixed = '/' + encoded
        request = DummyRequest(
            {'PATH_INFO': prefixed, 'SCRIPT_NAME': prefixed}
        )
        request.subpath = (decoded,)
        self._assertDispatched(request, '/' + encoded, '/' + encoded)
|
|
|
class Test_apply_request_extensions(unittest.TestCase):
    """Tests for ``pyramid.request.apply_request_extensions``."""

    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        testing.tearDown()

    def _callFUT(self, request, extensions=None):
        # Call the function under test.
        from pyramid.request import apply_request_extensions as fut

        return fut(request, extensions=extensions)

    def test_it_with_registry(self):
        from pyramid.interfaces import IRequestExtensions

        registered = Dummy()
        registered.methods = {'foo': lambda x, y: y}
        registered.descriptors = {'bar': property(lambda x: 'bar')}
        registry = self.config.registry
        registry.registerUtility(registered, IRequestExtensions)
        request = DummyRequest()
        request.registry = registry
        self._callFUT(request)
        self.assertEqual(request.bar, 'bar')
        self.assertEqual(request.foo('abc'), 'abc')

    def test_it_override_extensions(self):
        from pyramid.interfaces import IRequestExtensions

        # Registered on the registry, but expected to be bypassed because an
        # explicit ``extensions`` argument is passed below.
        ignored = Dummy()
        ignored.methods = {'x': lambda x, y, z: 'asdf'}
        ignored.descriptors = {'bar': property(lambda x: 'asdf')}
        registry = self.config.registry
        registry.registerUtility(ignored, IRequestExtensions)
        request = DummyRequest()
        request.registry = registry

        explicit = Dummy()
        explicit.methods = {'foo': lambda x, y: y}
        explicit.descriptors = {'bar': property(lambda x: 'bar')}
        self._callFUT(request, extensions=explicit)
        with self.assertRaises(AttributeError):
            request.x
        self.assertEqual(request.bar, 'bar')
        self.assertEqual(request.foo('abc'), 'abc')
|
|
|
class Dummy(object):
    """Empty attribute bag; tests assign whatever attributes they need."""
|
|
|
class Test_subclassing_Request(unittest.TestCase):
    """Checks of zope.interface declarations on ``Request`` subclasses."""

    def test_subclass(self):
        from pyramid.interfaces import IRequest
        from pyramid.request import Request

        class RequestSub(Request):
            pass

        for attr in ('__provides__', '__implemented__', '__providedBy__'):
            self.assertTrue(hasattr(Request, attr))
        self.assertFalse(hasattr(RequestSub, '__provides__'))
        for attr in ('__providedBy__', '__implemented__'):
            self.assertTrue(hasattr(RequestSub, attr))

        self.assertTrue(IRequest.implementedBy(RequestSub))
        # The call to implementedBy will add __provides__ to the class
        self.assertTrue(hasattr(RequestSub, '__provides__'))

    def test_subclass_with_implementer(self):
        from pyramid.interfaces import IRequest
        from pyramid.request import Request
        from pyramid.util import InstancePropertyHelper
        from zope.interface import implementer

        @implementer(IRequest)
        class RequestSub(Request):
            pass

        for attr in ('__provides__', '__implemented__', '__providedBy__'):
            self.assertTrue(hasattr(Request, attr))
        for attr in ('__provides__', '__providedBy__', '__implemented__'):
            self.assertTrue(hasattr(RequestSub, attr))

        # Apply an instance property before querying the interfaces.
        instance = RequestSub({})
        InstancePropertyHelper().apply_properties(instance, {'b': 'b'})

        self.assertTrue(IRequest.providedBy(instance))
        self.assertTrue(IRequest.implementedBy(RequestSub))

    def test_subclass_mutate_before_providedBy(self):
        from pyramid.interfaces import IRequest
        from pyramid.request import Request
        from pyramid.util import InstancePropertyHelper

        class RequestSub(Request):
            pass

        # Apply an instance property before the first providedBy() call.
        instance = RequestSub({})
        InstancePropertyHelper().apply_properties(instance, {'b': 'b'})

        self.assertTrue(IRequest.providedBy(instance))
        self.assertTrue(IRequest.implementedBy(RequestSub))
|
|
|
class DummyRequest(object):
    """Minimal request stand-in used by the function-level tests.

    Attributes:
        environ: the WSGI-style dict passed to the constructor, or a new
            empty dict when none is given.
        response_callbacks: list of callbacks registered through
            ``add_response_callback``, in registration order.
        copied: set to ``True`` once ``copy()`` has been called; absent
            before that.
    """

    def __init__(self, environ=None):
        if environ is None:
            environ = {}
        self.environ = environ
        # Start with an empty list so callbacks accumulate rather than each
        # registration replacing the previous one.
        self.response_callbacks = []

    def add_response_callback(self, callback):
        """Append ``callback`` to ``response_callbacks``."""
        self.response_callbacks.append(callback)

    def get_response(self, app):
        """Return ``app`` unchanged so tests can see what was dispatched."""
        return app

    def copy(self):
        """Record that a copy was requested and return this same object."""
        self.copied = True
        return self
|
|
|
class DummyResponse:
    """Response stand-in exposing only a mutable ``headerlist``."""

    def __init__(self):
        # A new list per instance.
        self.headerlist = list()
|
|
|
class DummyContext:
    """Empty placeholder passed to ``resource_url`` as the resource."""
|
|
|
class DummyRoutesMapper:
    """Routes-mapper stub that returns one preconfigured route.

    The constructor accepts ``raise_exc`` but does not store it; the
    class-level ``raise_exc`` therefore stays ``None`` on every instance.
    """

    raise_exc = None

    def __init__(self, route=None, raise_exc=False):
        self.route = route

    def get_route(self, route_name):
        # The requested name is ignored; the same route is always returned.
        configured = self.route
        return configured
|
|
|
class DummyRoute:
    """Route stub whose ``generate`` returns a fixed result."""

    pregenerator = None

    def __init__(self, result='/1/2/3'):
        self.result = result

    def generate(self, kw):
        # Keep the keyword dict on the instance, then return the fixed
        # result regardless of its contents.
        self.kw = kw
        generated = self.result
        return generated
|
|
|
class DummyStaticURLInfo:
    """Static-URL-info stub that records its arguments and returns a fixed result."""

    def __init__(self, result):
        self.result = result

    def generate(self, path, request, **kw):
        # Store the call as a (path, request, keyword-dict) tuple in
        # ``self.args``.
        self.args = (path, request, kw)
        return self.result
|