| | |
| | | from pyramid import testing |
| | | from pyramid.compat import pickle |
| | | |
| | | class SharedCookieSessionTests(object): |
| | | |
| | | class SharedCookieSessionTests(object): |
| | | def test_ctor_no_cookie(self): |
| | | request = testing.DummyRequest() |
| | | session = self._makeOne(request) |
| | |
def test_instance_conforms(self):
    # Imports stay local to the test, matching the rest of this module.
    from zope.interface.verify import verifyObject
    from pyramid.interfaces import ISession

    # Build a session through the subclass-provided ``_makeOne`` and check
    # that the resulting object satisfies the ``ISession`` interface.
    request = testing.DummyRequest()
    session = self._makeOne(request)
    verifyObject(ISession, session)
| | | |
def test_ctor_with_cookie_still_valid(self):
    import time

    request = testing.DummyRequest()
    # The payload's first slot is stamped with the current time; the state
    # dict travels in the third slot.
    cookieval = self._serialize((time.time(), 0, {'state': 1}))
    request.cookies['session'] = cookieval
    session = self._makeOne(request)
    # The state carried by the cookie must be exposed by the session.
    self.assertEqual(dict(session), {'state': 1})
| | | |
| | | def test_ctor_with_cookie_expired(self): |
| | | request = testing.DummyRequest() |
| | |
| | | |
| | | def test_timeout(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time() - 5, 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
| | | def test_timeout_never(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | LONG_TIME = 31536000 |
| | | cookieval = self._serialize((time.time() + LONG_TIME, 0, {'state': 1})) |
| | |
| | | |
| | | def test_timeout_str(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time() - 5, 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
def test_timeout_invalid(self):
    # Calling ``_makeOne`` with a non-numeric ``timeout`` is expected to
    # raise ValueError.
    request = testing.DummyRequest()
    self.assertRaises(
        ValueError, self._makeOne, request, timeout='Invalid value'
    )
| | | |
| | | def test_changed(self): |
| | | request = testing.DummyRequest() |
| | |
| | | |
| | | def test_reissue_triggered(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time() - 2, 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
| | | def test__set_cookie_on_exception_no_request_exception(self): |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | request.exception = None |
| | | session = self._makeOne(request) |
| | |
def test__set_cookie_cookieval_too_long(self):
    request = testing.DummyRequest()
    session = self._makeOne(request)
    # Store a 100000-character value so the serialized cookie is very large.
    session['abc'] = 'x' * 100000
    response = DummyResponse()
    # ``_set_cookie`` is expected to refuse the oversized value.
    self.assertRaises(ValueError, session._set_cookie, response)
| | | |
| | | def test__set_cookie_real_webob_response(self): |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | session = self._makeOne(request) |
| | | session['abc'] = 'x' |
| | |
| | | |
| | | def test__set_cookie_options(self): |
| | | from pyramid.response import Response |
| | | |
| | | request = testing.DummyRequest() |
| | | request.exception = None |
| | | session = self._makeOne(request, |
| | | cookie_name='abc', |
| | | path='/foo', |
| | | domain='localhost', |
| | | secure=True, |
| | | httponly=True, |
| | | ) |
| | | session = self._makeOne( |
| | | request, |
| | | cookie_name='abc', |
| | | path='/foo', |
| | | domain='localhost', |
| | | secure=True, |
| | | httponly=True, |
| | | ) |
| | | session['abc'] = 'x' |
| | | response = Response() |
| | | self.assertEqual(session._set_cookie(response), True) |
| | | cookieval = response.headerlist[-1][1] |
| | | val, domain, path, secure, httponly, samesite = [x.strip() for x in |
| | | cookieval.split(';')] |
| | | val, domain, path, secure, httponly, samesite = [ |
| | | x.strip() for x in cookieval.split(';') |
| | | ] |
| | | self.assertTrue(val.startswith('abc=')) |
| | | self.assertEqual(domain, 'Domain=localhost') |
| | | self.assertEqual(path, 'Path=/foo') |
| | |
| | | |
| | | def test_no_set_cookie_with_exception(self): |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | request.exception = True |
| | | session = self._makeOne(request, set_on_exception=False) |
| | |
| | | |
| | | def test_set_cookie_with_exception(self): |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | request.exception = True |
| | | session = self._makeOne(request) |
| | |
| | | |
| | | def test_cookie_is_set(self): |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | session = self._makeOne(request) |
| | | session['a'] = 1 |
| | |
| | | self.assertEqual(result, None) |
| | | self.assertTrue('Set-Cookie' in dict(response.headerlist)) |
| | | |
| | | |
| | | class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase): |
def _makeOne(self, request, **kw):
    """Return a session for ``request`` from ``BaseCookieSessionFactory``.

    The factory is configured with a ``DummySerializer`` instance; any
    keyword arguments are forwarded to the factory unchanged.
    """
    from pyramid.session import BaseCookieSessionFactory

    serializer = DummySerializer()
    factory = BaseCookieSessionFactory(serializer, **kw)
    return factory(request)
| | | |
| | |
| | | |
| | | def test_reissue_not_triggered(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time(), 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
| | | def test_reissue_str_triggered(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time() - 2, 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
def test_reissue_invalid(self):
    # Calling ``_makeOne`` with a non-numeric ``reissue_time`` is expected
    # to raise ValueError.
    request = testing.DummyRequest()
    self.assertRaises(
        ValueError, self._makeOne, request, reissue_time='invalid value'
    )
| | | |
def test_cookie_max_age_invalid(self):
    # Calling ``_makeOne`` with a non-numeric ``max_age`` is expected to
    # raise ValueError.
    request = testing.DummyRequest()
    self.assertRaises(
        ValueError, self._makeOne, request, max_age='invalid value'
    )
| | | |
| | | |
| | | class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase): |
def _makeOne(self, request, **kw):
    """Return a session for ``request`` from ``SignedCookieSessionFactory``.

    ``secret`` defaults to ``'secret'`` when the caller does not supply one;
    all keyword arguments are forwarded to the factory.
    """
    from pyramid.session import SignedCookieSessionFactory

    kw.setdefault('secret', 'secret')
    factory = SignedCookieSessionFactory(**kw)
    return factory(request)
| | | |
| | |
| | | |
| | | def test_reissue_not_triggered(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time(), 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
| | | def test_reissue_str_triggered(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time() - 2, 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
def test_reissue_invalid(self):
    # Calling ``_makeOne`` with a non-numeric ``reissue_time`` is expected
    # to raise ValueError.
    request = testing.DummyRequest()
    self.assertRaises(
        ValueError, self._makeOne, request, reissue_time='invalid value'
    )
| | | |
def test_cookie_max_age_invalid(self):
    # Calling ``_makeOne`` with a non-numeric ``max_age`` is expected to
    # raise ValueError.
    request = testing.DummyRequest()
    self.assertRaises(
        ValueError, self._makeOne, request, max_age='invalid value'
    )
| | | |
| | | def test_custom_salt(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.') |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
| | | def test_salt_mismatch(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.') |
| | | request.cookies['session'] = cookieval |
| | |
| | | |
def test_custom_hashalg(self):
    import time

    request = testing.DummyRequest()
    # Serialize the cookie and build the session with the same
    # ``hashalg`` so the stored state can be read back.
    cookieval = self._serialize(
        (time.time(), 0, {'state': 1}), hashalg='sha1'
    )
    request.cookies['session'] = cookieval
    session = self._makeOne(request, hashalg='sha1')
    self.assertEqual(session['state'], 1)
| | | |
def test_hashalg_mismatch(self):
    import time

    request = testing.DummyRequest()
    # The cookie is serialized with ``hashalg='sha1'`` ...
    cookieval = self._serialize(
        (time.time(), 0, {'state': 1}), hashalg='sha1'
    )
    request.cookies['session'] = cookieval
    # ... but the session is built with ``hashalg='sha256'``, so the
    # cookie contents are expected to be discarded.
    session = self._makeOne(request, hashalg='sha256')
    self.assertEqual(session, {})
| | | |
| | | def test_secret_mismatch(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | cookieval = self._serialize((time.time(), 0, {'state': 1})) |
| | | request.cookies['session'] = cookieval |
| | |
| | | from hashlib import sha512 |
| | | import hmac |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | serializer = DummySerializer() |
| | | cstruct = serializer.dumps((time.time(), 0, {'state': 1})) |
| | |
| | | def test_invalid_data_size(self): |
| | | from hashlib import sha512 |
| | | import base64 |
| | | |
| | | request = testing.DummyRequest() |
| | | num_bytes = sha512().digest_size - 1 |
| | | cookieval = base64.b64encode(b' ' * num_bytes) |
| | |
| | | def test_very_long_key(self): |
| | | verylongkey = b'a' * 1024 |
| | | import webob |
| | | |
| | | request = testing.DummyRequest() |
| | | session = self._makeOne(request, secret=verylongkey) |
| | | session['a'] = 1 |
| | |
| | | |
| | | try: |
| | | result = callbacks[0](request, response) |
| | | except TypeError: # pragma: no cover |
| | | except TypeError: # pragma: no cover |
| | | self.fail('HMAC failed to initialize due to key length.') |
| | | |
| | | self.assertEqual(result, None) |
| | |
| | | session = self._makeOne(request, secret='secret') |
| | | self.assertEqual(session, {}) |
| | | |
| | | class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase): |
| | | |
| | | class TestUnencryptedCookieSession( |
| | | SharedCookieSessionTests, unittest.TestCase |
| | | ): |
def setUp(self):
    super(TestUnencryptedCookieSession, self).setUp()
    from zope.deprecation import __show__

    # Switch zope.deprecation's ``__show__`` flag off for the duration of
    # each test; ``tearDown`` switches it back on.
    __show__.off()
| | | |
def tearDown(self):
    super(TestUnencryptedCookieSession, self).tearDown()
    from zope.deprecation import __show__

    # Re-enable zope.deprecation's ``__show__`` flag that ``setUp``
    # switched off.
    __show__.on()
| | | |
| | | |
| | | def _makeOne(self, request, **kw): |
| | | from pyramid.session import UnencryptedCookieSessionFactoryConfig |
| | | |
| | | self._rename_cookie_var(kw, 'path', 'cookie_path') |
| | | self._rename_cookie_var(kw, 'domain', 'cookie_domain') |
| | | self._rename_cookie_var(kw, 'secure', 'cookie_secure') |
| | |
def _serialize(self, value):
    """Return ``value`` run through ``signed_serialize`` and ``bytes_``.

    The secret passed to ``signed_serialize`` is the fixed string
    ``'secret'``.
    """
    from pyramid.compat import bytes_
    from pyramid.session import signed_serialize

    signed = signed_serialize(value, 'secret')
    return bytes_(signed)
| | | |
| | | def test_serialize_option(self): |
| | | from pyramid.response import Response |
| | | |
| | | secret = 'secret' |
| | | request = testing.DummyRequest() |
| | | session = self._makeOne(request, |
| | | signed_serialize=dummy_signed_serialize) |
| | | session = self._makeOne( |
| | | request, signed_serialize=dummy_signed_serialize |
| | | ) |
| | | session['key'] = 'value' |
| | | response = Response() |
| | | self.assertEqual(session._set_cookie(response), True) |
| | | cookie = response.headerlist[-1][1] |
| | | expected_cookieval = dummy_signed_serialize( |
| | | (session.accessed, session.created, {'key': 'value'}), secret) |
| | | (session.accessed, session.created, {'key': 'value'}), secret |
| | | ) |
| | | response = Response() |
| | | response.set_cookie('session', expected_cookieval, samesite='Lax') |
| | | expected_cookie = response.headerlist[-1][1] |
| | |
| | | |
def test_deserialize_option(self):
    import time

    secret = 'secret'
    request = testing.DummyRequest()
    accessed = time.time()
    state = {'key': 'value'}
    # Build the cookie with the dummy serializer; the same timestamp fills
    # the first two slots of the payload tuple.
    cookieval = dummy_signed_serialize((accessed, accessed, state), secret)
    request.cookies['session'] = cookieval
    # A custom ``signed_deserialize`` callable must be used to read the
    # cookie back into the session.
    session = self._makeOne(
        request, signed_deserialize=dummy_signed_deserialize
    )
    self.assertEqual(dict(session), state)
| | | |
| | | |
def dummy_signed_serialize(data, secret):
    """Return base64 of ``secret`` followed by base64 of pickled ``data``.

    No signature is computed: the encoded secret is simply prepended to the
    encoded pickle.
    """
    import base64
    from pyramid.compat import pickle, bytes_

    pickled = pickle.dumps(data)
    encoded_secret = base64.b64encode(bytes_(secret))
    return encoded_secret + base64.b64encode(pickled)
| | | |
| | | |
def dummy_signed_deserialize(serialized, secret):
    """Return the object pickled in ``serialized`` after the secret prefix.

    The prefix is skipped by length only (the length of the base64-encoded
    ``secret``); its content is never compared with ``secret``.
    """
    import base64
    from pyramid.compat import pickle, bytes_

    prefix_len = len(base64.b64encode(bytes_(secret)))
    serialized_data = base64.b64decode(serialized[prefix_len:])
    # NOTE: ``pickle.loads`` must never be fed untrusted input; this helper
    # only reads cookies the tests themselves construct.
    return pickle.loads(serialized_data)
| | | |
| | | |
| | | class Test_manage_accessed(unittest.TestCase): |
def _makeOne(self, wrapped):
    """Return ``wrapped`` passed through ``pyramid.session.manage_accessed``."""
    from pyramid.session import manage_accessed

    return manage_accessed(wrapped)
| | | |
| | | def test_accessed_set(self): |
| | |
| | | |
| | | def test_accessed_without_renew(self): |
| | | import time |
| | | |
| | | request = testing.DummyRequest() |
| | | session = DummySessionFactory(request) |
| | | session._reissue_time = 5 |
| | |
| | | result = wrapper(session, 'a') |
| | | self.assertEqual(result, 1) |
| | | callbacks = request.response_callbacks |
| | | if callbacks is not None: self.assertEqual(len(callbacks), 0) |
| | | if callbacks is not None: |
| | | self.assertEqual(len(callbacks), 0) |
| | | |
| | | |
| | | class Test_manage_changed(unittest.TestCase): |
def _makeOne(self, wrapped):
    """Return ``wrapped`` passed through ``pyramid.session.manage_changed``."""
    from pyramid.session import manage_changed

    return manage_changed(wrapped)
| | | |
| | | def test_it(self): |
| | |
| | | self.assertNotEqual(session.accessed, None) |
| | | self.assertTrue(session._dirty) |
| | | |
| | | |
| | | def serialize(data, secret): |
| | | import hmac |
| | | import base64 |
| | |
| | | from pyramid.compat import bytes_ |
| | | from pyramid.compat import native_ |
| | | from pyramid.compat import pickle |
| | | |
| | | pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL) |
| | | sig = hmac.new(bytes_(secret, 'utf-8'), pickled, sha1).hexdigest() |
| | | return sig + native_(base64.b64encode(pickled)) |
| | | |
| | | |
| | | class Test_signed_serialize(unittest.TestCase): |
def _callFUT(self, data, secret):
    """Call ``pyramid.session.signed_serialize`` and return its result."""
    from pyramid.session import signed_serialize

    return signed_serialize(data, secret)
| | | |
| | | def test_it(self): |
| | |
| | | expected = serialize('123', secret) |
| | | result = self._callFUT('123', secret.decode('latin-1')) |
| | | self.assertEqual(result, expected) |
| | | |
| | | |
| | | |
| | | class Test_signed_deserialize(unittest.TestCase): |
def _callFUT(self, serialized, secret, hmac=None):
    """Call ``pyramid.session.signed_deserialize`` and return its result.

    ``hmac`` lets a test pass a stand-in for the ``hmac`` module; when it
    is ``None`` the real module is imported and used instead.
    """
    if hmac is None:
        # Rebinds the local parameter name to the stdlib module.
        import hmac
    from pyramid.session import signed_deserialize

    return signed_deserialize(serialized, secret, hmac=hmac)
| | | |
| | | def test_it(self): |
| | |
| | | class hmac(object): |
| | | def new(self, *arg): |
| | | return self |
| | | |
| | | def hexdigest(self): |
| | | return '1234' |
| | | |
| | | serialized = serialize('123', 'secret123') |
| | | self.assertRaises(ValueError, self._callFUT, serialized, 'secret', |
| | | hmac=hmac()) |
| | | |
| | | self.assertRaises( |
| | | ValueError, self._callFUT, serialized, 'secret', hmac=hmac() |
| | | ) |
| | | |
| | | def test_it_bad_encoding(self): |
| | | serialized = 'bad' + serialize('123', 'secret') |
| | | self.assertRaises(ValueError, self._callFUT, serialized, 'secret') |
| | |
| | | class TestPickleSerializer(unittest.TestCase): |
def _makeOne(self):
    """Return a new ``pyramid.session.PickleSerializer`` instance."""
    from pyramid.session import PickleSerializer

    return PickleSerializer()
| | | |
| | | def test_loads(self): |
| | |
| | | except TypeError: |
| | | raise ValueError |
| | | |
| | | |
| | | class DummySessionFactory(dict): |
| | | _dirty = False |
| | | _cookie_name = 'session' |
| | |
| | | def changed(self): |
| | | self._dirty = True |
| | | |
| | | |
class DummyResponse(object):
    """Minimal response stand-in exposing only a ``headerlist`` attribute."""

    def __init__(self):
        # Each instance gets its own, initially empty, list.
        self.headerlist = []