import base64
import json
import unittest
from pyramid import testing
from pyramid.compat import pickle


class SharedCookieSessionTests(object):
    """Test cases shared by every cookie session factory tested below.

    This is a mixin: concrete subclasses also derive from
    ``unittest.TestCase`` and must provide

    - ``_makeOne(request, **kw)`` returning a session for ``request``, and
    - ``_serialize(value)`` returning a cookie value the factory under
      test is expected to accept.
    """

    def test_ctor_no_cookie(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        self.assertEqual(dict(session), {})

    def test_instance_conforms(self):
        from zope.interface.verify import verifyObject
        from pyramid.interfaces import ISession
        request = testing.DummyRequest()
        session = self._makeOne(request)
        verifyObject(ISession, session)

    def test_ctor_with_cookie_still_valid(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request)
        self.assertEqual(dict(session), {'state': 1})

    def test_ctor_with_cookie_expired(self):
        # A cookie stamped with time 0 must yield an empty session when the
        # factory's default timeout is in effect.
        request = testing.DummyRequest()
        cookieval = self._serialize((0, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request)
        self.assertEqual(dict(session), {})

    def test_ctor_with_bad_cookie_cannot_deserialize(self):
        request = testing.DummyRequest()
        request.cookies['session'] = 'abc'
        session = self._makeOne(request)
        self.assertEqual(dict(session), {})

    def test_ctor_with_bad_cookie_not_tuple(self):
        # The payload deserializes fine but is a string, not a 3-tuple.
        request = testing.DummyRequest()
        cookieval = self._serialize('abc')
        request.cookies['session'] = cookieval
        session = self._makeOne(request)
        self.assertEqual(dict(session), {})

    def test_timeout(self):
        # Cookie is 5 seconds old, timeout is 1 second.
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, timeout=1)
        self.assertEqual(dict(session), {})

    def test_timeout_never(self):
        import time
        request = testing.DummyRequest()
        LONG_TIME = 31536000
        cookieval = self._serialize((time.time() + LONG_TIME, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, timeout=None)
        self.assertEqual(dict(session), {'state': 1})

    def test_timeout_str(self):
        # Same as test_timeout, but the timeout is passed as a string.
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, timeout='1')
        self.assertEqual(dict(session), {})

    def test_timeout_invalid(self):
        request = testing.DummyRequest()
        self.assertRaises(ValueError, self._makeOne, request,
                          timeout='Invalid value')

    def test_changed(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        self.assertEqual(session.changed(), None)
        self.assertTrue(session._dirty)

    def test_invalidate(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session['a'] = 1
        self.assertEqual(session.invalidate(), None)
        self.assertNotIn('a', session)

    def test_reissue_triggered(self):
        # Cookie is 2 seconds old and no reissue_time is passed; reading
        # from the session is expected to mark it dirty.
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request)
        self.assertEqual(session['state'], 1)
        self.assertTrue(session._dirty)

    def test__set_cookie_on_exception(self):
        request = testing.DummyRequest()
        request.exception = True
        session = self._makeOne(request)
        session._cookie_on_exception = False
        response = DummyResponse()
        self.assertEqual(session._set_cookie(response), False)

    def test__set_cookie_on_exception_no_request_exception(self):
        import webob
        request = testing.DummyRequest()
        request.exception = None
        session = self._makeOne(request)
        session._cookie_on_exception = False
        response = webob.Response()
        self.assertEqual(session._set_cookie(response), True)
        self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')

    def test__set_cookie_cookieval_too_long(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session['abc'] = 'x'*100000
        response = DummyResponse()
        self.assertRaises(ValueError, session._set_cookie, response)

    def test__set_cookie_real_webob_response(self):
        import webob
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session['abc'] = 'x'
        response = webob.Response()
        self.assertEqual(session._set_cookie(response), True)
        self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')

    def test__set_cookie_options(self):
        from pyramid.response import Response
        request = testing.DummyRequest()
        request.exception = None
        session = self._makeOne(request,
                                cookie_name='abc',
                                path='/foo',
                                domain='localhost',
                                secure=True,
                                httponly=True,
                                )
        session['abc'] = 'x'
        response = Response()
        self.assertEqual(session._set_cookie(response), True)
        cookieval = response.headerlist[-1][1]
        # The unpacking doubles as a check that the header carries exactly
        # six ';'-separated parts, in this order.
        val, domain, path, secure, httponly, samesite = [
            x.strip() for x in cookieval.split(';')]
        self.assertTrue(val.startswith('abc='))
        self.assertEqual(domain, 'Domain=localhost')
        self.assertEqual(path, 'Path=/foo')
        self.assertEqual(secure, 'secure')
        self.assertEqual(httponly, 'HttpOnly')
        self.assertEqual(samesite, 'SameSite=Lax')

    def test_flash_default(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session.flash('msg1')
        session.flash('msg2')
        self.assertEqual(session['_f_'], ['msg1', 'msg2'])

    def test_flash_allow_duplicate_false(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session.flash('msg1')
        session.flash('msg1', allow_duplicate=False)
        self.assertEqual(session['_f_'], ['msg1'])

    def test_flash_allow_duplicate_true_and_msg_not_in_storage(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session.flash('msg1', allow_duplicate=True)
        self.assertEqual(session['_f_'], ['msg1'])

    def test_flash_allow_duplicate_false_and_msg_not_in_storage(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session.flash('msg1', allow_duplicate=False)
        self.assertEqual(session['_f_'], ['msg1'])

    def test_flash_mixed(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session.flash('warn1', 'warn')
        session.flash('warn2', 'warn')
        session.flash('err1', 'error')
        session.flash('err2', 'error')
        # Each named queue must keep only its own messages, in order.
        self.assertEqual(session['_f_warn'], ['warn1', 'warn2'])
        self.assertEqual(session['_f_error'], ['err1', 'err2'])

    def test_pop_flash_default_queue(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        queue = ['one', 'two']
        session['_f_'] = queue
        result = session.pop_flash()
        self.assertEqual(result, queue)
        self.assertEqual(session.get('_f_'), None)

    def test_pop_flash_nodefault_queue(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        queue = ['one', 'two']
        session['_f_error'] = queue
        result = session.pop_flash('error')
        self.assertEqual(result, queue)
        self.assertEqual(session.get('_f_error'), None)

    def test_peek_flash_default_queue(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        queue = ['one', 'two']
        session['_f_'] = queue
        result = session.peek_flash()
        self.assertEqual(result, queue)
        self.assertEqual(session.get('_f_'), queue)

    def test_peek_flash_nodefault_queue(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        queue = ['one', 'two']
        session['_f_error'] = queue
        result = session.peek_flash('error')
        self.assertEqual(result, queue)
        self.assertEqual(session.get('_f_error'), queue)

    def test_new_csrf_token(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        token = session.new_csrf_token()
        self.assertEqual(token, session['_csrft_'])

    def test_get_csrf_token(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session['_csrft_'] = 'token'
        token = session.get_csrf_token()
        self.assertEqual(token, 'token')
        self.assertIn('_csrft_', session)

    def test_get_csrf_token_new(self):
        request = testing.DummyRequest()
        session = self._makeOne(request)
        token = session.get_csrf_token()
        self.assertTrue(token)
        self.assertIn('_csrft_', session)

    def test_no_set_cookie_with_exception(self):
        import webob
        request = testing.DummyRequest()
        request.exception = True
        session = self._makeOne(request, set_on_exception=False)
        session['a'] = 1
        callbacks = request.response_callbacks
        self.assertEqual(len(callbacks), 1)
        response = webob.Response()
        result = callbacks[0](request, response)
        self.assertEqual(result, None)
        self.assertNotIn('Set-Cookie', dict(response.headerlist))

    def test_set_cookie_with_exception(self):
        import webob
        request = testing.DummyRequest()
        request.exception = True
        session = self._makeOne(request)
        session['a'] = 1
        callbacks = request.response_callbacks
        self.assertEqual(len(callbacks), 1)
        response = webob.Response()
        result = callbacks[0](request, response)
        self.assertEqual(result, None)
        self.assertIn('Set-Cookie', dict(response.headerlist))

    def test_cookie_is_set(self):
        import webob
        request = testing.DummyRequest()
        session = self._makeOne(request)
        session['a'] = 1
        callbacks = request.response_callbacks
        self.assertEqual(len(callbacks), 1)
        response = webob.Response()
        result = callbacks[0](request, response)
        self.assertEqual(result, None)
        self.assertIn('Set-Cookie', dict(response.headerlist))


class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
    """Run the shared tests against ``BaseCookieSessionFactory``.

    The factory is built with ``DummySerializer``; ``_serialize`` produces
    the same base64-encoded JSON that ``DummySerializer.dumps`` does.
    """

    def _makeOne(self, request, **kw):
        from pyramid.session import BaseCookieSessionFactory
        serializer = DummySerializer()
        return BaseCookieSessionFactory(serializer, **kw)(request)

    def _serialize(self, value):
        return base64.b64encode(json.dumps(value).encode('utf-8'))

    def test_reissue_not_triggered(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time=1)
        self.assertEqual(session['state'], 1)
        self.assertFalse(session._dirty)

    def test_reissue_never(self):
        request = testing.DummyRequest()
        cookieval = self._serialize((0, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time=None, timeout=None)
        self.assertEqual(session['state'], 1)
        self.assertFalse(session._dirty)

    def test_reissue_str_triggered(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time='0')
        self.assertEqual(session['state'], 1)
        self.assertTrue(session._dirty)

    def test_reissue_invalid(self):
        request = testing.DummyRequest()
        self.assertRaises(ValueError, self._makeOne, request,
                          reissue_time='invalid value')

    def test_cookie_max_age_invalid(self):
        request = testing.DummyRequest()
        self.assertRaises(ValueError, self._makeOne, request,
                          max_age='invalid value')


class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
    """Run the shared tests against ``SignedCookieSessionFactory``.

    ``_makeOne`` defaults the ``secret`` keyword to ``'secret'``, which is
    also the key ``_serialize`` signs with.
    """

    def _makeOne(self, request, **kw):
        from pyramid.session import SignedCookieSessionFactory
        kw.setdefault('secret', 'secret')
        return SignedCookieSessionFactory(**kw)(request)

    def _serialize(self, value, salt=b'pyramid.session.', hashalg='sha512'):
        # Builds the cookie by hand: HMAC(salt + b'secret') over the pickled
        # value, signature prepended, urlsafe-base64 with '=' padding removed.
        import base64
        import hashlib
        import hmac
        import pickle
        digestmod = lambda: hashlib.new(hashalg)
        cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
        sig = hmac.new(salt + b'secret', cstruct, digestmod).digest()
        return base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')

    def test_reissue_not_triggered(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time=1)
        self.assertEqual(session['state'], 1)
        self.assertFalse(session._dirty)

    def test_reissue_never(self):
        request = testing.DummyRequest()
        cookieval = self._serialize((0, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time=None, timeout=None)
        self.assertEqual(session['state'], 1)
        self.assertFalse(session._dirty)

    def test_reissue_str_triggered(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, reissue_time='0')
        self.assertEqual(session['state'], 1)
        self.assertTrue(session._dirty)

    def test_reissue_invalid(self):
        request = testing.DummyRequest()
        self.assertRaises(ValueError, self._makeOne, request,
                          reissue_time='invalid value')

    def test_cookie_max_age_invalid(self):
        request = testing.DummyRequest()
        self.assertRaises(ValueError, self._makeOne, request,
                          max_age='invalid value')

    def test_custom_salt(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
        request.cookies['session'] = cookieval
        session = self._makeOne(request, salt=b'f.')
        self.assertEqual(session['state'], 1)

    def test_salt_mismatch(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
        request.cookies['session'] = cookieval
        session = self._makeOne(request, salt=b'g.')
        self.assertEqual(session, {})

    def test_custom_hashalg(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}),
                                    hashalg='sha1')
        request.cookies['session'] = cookieval
        session = self._makeOne(request, hashalg='sha1')
        self.assertEqual(session['state'], 1)

    def test_hashalg_mismatch(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}),
                                    hashalg='sha1')
        request.cookies['session'] = cookieval
        session = self._makeOne(request, hashalg='sha256')
        self.assertEqual(session, {})

    def test_secret_mismatch(self):
        import time
        request = testing.DummyRequest()
        cookieval = self._serialize((time.time(), 0, {'state': 1}))
        request.cookies['session'] = cookieval
        session = self._makeOne(request, secret='evilsecret')
        self.assertEqual(session, {})

    def test_custom_serializer(self):
        import base64
        from hashlib import sha512
        import hmac
        import time
        request = testing.DummyRequest()
        serializer = DummySerializer()
        cstruct = serializer.dumps((time.time(), 0, {'state': 1}))
        sig = hmac.new(b'pyramid.session.secret', cstruct, sha512).digest()
        cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
        request.cookies['session'] = cookieval
        session = self._makeOne(request, serializer=serializer)
        self.assertEqual(session['state'], 1)

    def test_invalid_data_size(self):
        # The payload is one byte shorter than a sha512 digest.
        from hashlib import sha512
        import base64
        request = testing.DummyRequest()
        num_bytes = sha512().digest_size - 1
        cookieval = base64.b64encode(b' ' * num_bytes)
        request.cookies['session'] = cookieval
        session = self._makeOne(request)
        self.assertEqual(session, {})

    def test_very_long_key(self):
        verylongkey = b'a' * 1024
        import webob
        request = testing.DummyRequest()
        session = self._makeOne(request, secret=verylongkey)
        session['a'] = 1
        callbacks = request.response_callbacks
        self.assertEqual(len(callbacks), 1)
        response = webob.Response()

        try:
            result = callbacks[0](request, response)
        except TypeError:  # pragma: no cover
            self.fail('HMAC failed to initialize due to key length.')

        self.assertEqual(result, None)
        self.assertIn('Set-Cookie', dict(response.headerlist))

    def test_bad_pickle(self):
        import base64
        import hashlib
        import hmac

        digestmod = lambda: hashlib.new('sha512')
        # generated from dumping an object that cannot be found anymore, eg:
        # class Foo: pass
        # print(pickle.dumps(Foo()))
        cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
        sig = hmac.new(b'pyramid.session.secret', cstruct, digestmod).digest()
        cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')

        request = testing.DummyRequest()
        request.cookies['session'] = cookieval
        session = self._makeOne(request, secret='secret')
        self.assertEqual(session, {})


class TestUnencryptedCookieSession(SharedCookieSessionTests,
                                   unittest.TestCase):
    """Run the shared tests against ``UnencryptedCookieSessionFactoryConfig``.

    ``_makeOne`` translates the keyword names used by the shared tests into
    the ``cookie_*`` names passed to this factory.
    """

    def setUp(self):
        super(TestUnencryptedCookieSession, self).setUp()
        # NOTE(review): presumably silences zope.deprecation warnings for
        # the duration of each test -- confirm against zope.deprecation.
        from zope.deprecation import __show__
        __show__.off()

    def tearDown(self):
        super(TestUnencryptedCookieSession, self).tearDown()
        from zope.deprecation import __show__
        __show__.on()

    def _makeOne(self, request, **kw):
        from pyramid.session import UnencryptedCookieSessionFactoryConfig
        self._rename_cookie_var(kw, 'path', 'cookie_path')
        self._rename_cookie_var(kw, 'domain', 'cookie_domain')
        self._rename_cookie_var(kw, 'secure', 'cookie_secure')
        self._rename_cookie_var(kw, 'httponly', 'cookie_httponly')
        self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception')
        return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)

    def _rename_cookie_var(self, kw, src, dest):
        # Move kw[src] to kw[dest]; an existing kw[dest] is left untouched
        # (the src key is removed either way).
        if src in kw:
            kw.setdefault(dest, kw.pop(src))

    def _serialize(self, value):
        from pyramid.compat import bytes_
        from pyramid.session import signed_serialize
        return bytes_(signed_serialize(value, 'secret'))

    def test_serialize_option(self):
        from pyramid.response import Response
        secret = 'secret'
        request = testing.DummyRequest()
        session = self._makeOne(request,
                                signed_serialize=dummy_signed_serialize)
        session['key'] = 'value'
        response = Response()
        self.assertEqual(session._set_cookie(response), True)
        cookie = response.headerlist[-1][1]
        expected_cookieval = dummy_signed_serialize(
            (session.accessed, session.created, {'key': 'value'}), secret)
        # Build the expected header on a fresh response and compare it with
        # the header the session produced.
        response = Response()
        response.set_cookie('session', expected_cookieval, samesite='Lax')
        expected_cookie = response.headerlist[-1][1]
        self.assertEqual(cookie, expected_cookie)

    def test_deserialize_option(self):
        import time
        secret = 'secret'
        request = testing.DummyRequest()
        accessed = time.time()
        state = {'key': 'value'}
        cookieval = dummy_signed_serialize((accessed, accessed, state), secret)
        request.cookies['session'] = cookieval
        session = self._makeOne(request,
                                signed_deserialize=dummy_signed_deserialize)
        self.assertEqual(dict(session), state)


def dummy_signed_serialize(data, secret):
    """Return base64(secret) followed by base64(pickle of ``data``).

    Nothing is actually signed; the encoded secret is only a prefix.
    """
    import base64
    from pyramid.compat import pickle, bytes_
    pickled = pickle.dumps(data)
    return base64.b64encode(bytes_(secret)) + base64.b64encode(pickled)


def dummy_signed_deserialize(serialized, secret):
    """Invert ``dummy_signed_serialize``.

    The leading base64(secret) prefix is skipped by length only; it is not
    compared against ``secret``.
    """
    import base64
    from pyramid.compat import pickle, bytes_
    serialized_data = base64.b64decode(
        serialized[len(base64.b64encode(bytes_(secret))):])
    return pickle.loads(serialized_data)


class Test_manage_accessed(unittest.TestCase):
    """Tests for ``pyramid.session.manage_accessed`` wrapping ``dict.get``
    on a ``DummySessionFactory`` instance."""

    def _makeOne(self, wrapped):
        from pyramid.session import manage_accessed
        return manage_accessed(wrapped)

    def test_accessed_set(self):
        request = testing.DummyRequest()
        session = DummySessionFactory(request)
        session.renewed = 0
        wrapper = self._makeOne(session.__class__.get)
        wrapper(session, 'a')
        self.assertNotEqual(session.accessed, None)
        self.assertTrue(session._dirty)

    def test_accessed_without_renew(self):
        import time
        request = testing.DummyRequest()
        session = DummySessionFactory(request)
        session._reissue_time = 5
        session.renewed = time.time()
        wrapper = self._makeOne(session.__class__.get)
        wrapper(session, 'a')
        self.assertNotEqual(session.accessed, None)
        self.assertFalse(session._dirty)

    def test_already_dirty(self):
        request = testing.DummyRequest()
        session = DummySessionFactory(request)
        session.renewed = 0
        session._dirty = True
        session['a'] = 1
        wrapper = self._makeOne(session.__class__.get)
        self.assertEqual(wrapper.__doc__, session.get.__doc__)
        result = wrapper(session, 'a')
        self.assertEqual(result, 1)
        callbacks = request.response_callbacks
        if callbacks is not None:
            self.assertEqual(len(callbacks), 0)


class Test_manage_changed(unittest.TestCase):
    """Tests for ``pyramid.session.manage_changed`` wrapping
    ``dict.__setitem__`` on a ``DummySessionFactory`` instance."""

    def _makeOne(self, wrapped):
        from pyramid.session import manage_changed
        return manage_changed(wrapped)

    def test_it(self):
        request = testing.DummyRequest()
        session = DummySessionFactory(request)
        wrapper = self._makeOne(session.__class__.__setitem__)
        wrapper(session, 'a', 1)
        self.assertNotEqual(session.accessed, None)
        self.assertTrue(session._dirty)


def serialize(data, secret):
    """Reference serializer the ``signed_serialize`` tests compare against.

    Returns the HMAC-SHA1 hex digest of the pickled ``data`` (keyed with
    ``secret``) followed by the base64-encoded pickle, as a native string.
    """
    import hmac
    import base64
    from hashlib import sha1
    from pyramid.compat import bytes_
    from pyramid.compat import native_
    from pyramid.compat import pickle
    pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
    sig = hmac.new(bytes_(secret, 'utf-8'), pickled, sha1).hexdigest()
    return sig + native_(base64.b64encode(pickled))


class Test_signed_serialize(unittest.TestCase):
    """``signed_serialize`` must produce the same output as the local
    ``serialize`` reference implementation."""

    def _callFUT(self, data, secret):
        from pyramid.session import signed_serialize
        return signed_serialize(data, secret)

    def test_it(self):
        expected = serialize('123', 'secret')
        result = self._callFUT('123', 'secret')
        self.assertEqual(result, expected)

    def test_it_with_highorder_secret(self):
        secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
        expected = serialize('123', secret)
        result = self._callFUT('123', secret)
        self.assertEqual(result, expected)

    def test_it_with_latin1_secret(self):
        secret = b'La Pe\xc3\xb1a'
        expected = serialize('123', secret)
        result = self._callFUT('123', secret.decode('latin-1'))
        self.assertEqual(result, expected)


class Test_signed_deserialize(unittest.TestCase):
    """``signed_deserialize`` must round-trip values produced by the local
    ``serialize`` helper and raise ``ValueError`` on tampered input."""

    def _callFUT(self, serialized, secret, hmac=None):
        # Fall back to the stdlib hmac module unless a stand-in is passed.
        if hmac is None:
            import hmac
        from pyramid.session import signed_deserialize
        return signed_deserialize(serialized, secret, hmac=hmac)

    def test_it(self):
        serialized = serialize('123', 'secret')
        result = self._callFUT(serialized, 'secret')
        self.assertEqual(result, '123')

    def test_invalid_bits(self):
        serialized = serialize('123', 'secret')
        self.assertRaises(ValueError, self._callFUT, serialized, 'seekrit')

    def test_invalid_len(self):
        # Stand-in hmac whose hexdigest is only four characters long.
        class hmac(object):
            def new(self, *arg):
                return self

            def hexdigest(self):
                return '1234'

        serialized = serialize('123', 'secret123')
        self.assertRaises(ValueError, self._callFUT, serialized, 'secret',
                          hmac=hmac())

    def test_it_bad_encoding(self):
        serialized = 'bad' + serialize('123', 'secret')
        self.assertRaises(ValueError, self._callFUT, serialized, 'secret')

    def test_it_with_highorder_secret(self):
        secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
        serialized = serialize('123', secret)
        result = self._callFUT(serialized, secret)
        self.assertEqual(result, '123')

    # bwcompat with pyramid <= 1.5b1 where latin1 is the default
    def test_it_with_latin1_secret(self):
        secret = b'La Pe\xc3\xb1a'
        serialized = serialize('123', secret)
        result = self._callFUT(serialized, secret.decode('latin-1'))
        self.assertEqual(result, '123')


class TestPickleSerializer(unittest.TestCase):
    """Tests for ``pyramid.session.PickleSerializer``."""

    def _makeOne(self):
        from pyramid.session import PickleSerializer
        return PickleSerializer()

    def test_loads(self):
        # generated from dumping Dummy() using protocol=2
        cstruct = b'\x80\x02ctests.test_session\nDummy\nq\x00)\x81q\x01.'
        serializer = self._makeOne()
        result = serializer.loads(cstruct)
        self.assertIsInstance(result, Dummy)

    def test_loads_raises_ValueError_on_invalid_data(self):
        cstruct = b'not pickled'
        serializer = self._makeOne()
        self.assertRaises(ValueError, serializer.loads, cstruct)

    def test_loads_raises_ValueError_on_bad_import(self):
        # generated from dumping an object that cannot be found anymore, eg:
        # class Foo: pass
        # print(pickle.dumps(Foo()))
        cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
        serializer = self._makeOne()
        self.assertRaises(ValueError, serializer.loads, cstruct)

    def test_dumps(self):
        obj = Dummy()
        serializer = self._makeOne()
        result = serializer.dumps(obj)
        expected_result = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
        self.assertEqual(result, expected_result)
        self.assertIsInstance(result, bytes)


# Referenced by name (``tests.test_session.Dummy``) from the pickled bytes in
# TestPickleSerializer.test_loads, so it must stay importable under this name.
class Dummy(object):
    pass


class DummySerializer(object):
    """Serializer that stores values as base64-encoded UTF-8 JSON."""

    def dumps(self, value):
        return base64.b64encode(json.dumps(value).encode('utf-8'))

    def loads(self, value):
        try:
            return json.loads(base64.b64decode(value).decode('utf-8'))

        # base64.b64decode raises a TypeError on py2 instead of a ValueError
        # and a ValueError is required for the session to handle it properly
        except TypeError:
            raise ValueError


class DummySessionFactory(dict):
    """Plain ``dict`` carrying the attributes the manage_* tests rely on."""

    _dirty = False
    _cookie_name = 'session'
    _cookie_max_age = None
    _cookie_path = '/'
    _cookie_domain = None
    _cookie_secure = False
    _cookie_httponly = False
    _timeout = 1200
    _reissue_time = 0

    def __init__(self, request):
        self.request = request
        dict.__init__(self, {})

    def changed(self):
        self._dirty = True


class DummyResponse(object):
    """Response stand-in exposing only an empty ``headerlist``."""

    def __init__(self):
        self.headerlist = []