import base64
|
import json
|
import unittest
|
from pyramid import testing
|
from pyramid.compat import pickle
|
|
|
class SharedCookieSessionTests(object):
|
def test_ctor_no_cookie(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
self.assertEqual(dict(session), {})
|
|
def test_instance_conforms(self):
|
from zope.interface.verify import verifyObject
|
from pyramid.interfaces import ISession
|
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
verifyObject(ISession, session)
|
|
def test_ctor_with_cookie_still_valid(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request)
|
self.assertEqual(dict(session), {'state': 1})
|
|
def test_ctor_with_cookie_expired(self):
|
request = testing.DummyRequest()
|
cookieval = self._serialize((0, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request)
|
self.assertEqual(dict(session), {})
|
|
def test_ctor_with_bad_cookie_cannot_deserialize(self):
|
request = testing.DummyRequest()
|
request.cookies['session'] = 'abc'
|
session = self._makeOne(request)
|
self.assertEqual(dict(session), {})
|
|
def test_ctor_with_bad_cookie_not_tuple(self):
|
request = testing.DummyRequest()
|
cookieval = self._serialize('abc')
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request)
|
self.assertEqual(dict(session), {})
|
|
def test_timeout(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, timeout=1)
|
self.assertEqual(dict(session), {})
|
|
def test_timeout_never(self):
|
import time
|
|
request = testing.DummyRequest()
|
LONG_TIME = 31536000
|
cookieval = self._serialize((time.time() + LONG_TIME, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, timeout=None)
|
self.assertEqual(dict(session), {'state': 1})
|
|
def test_timeout_str(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, timeout='1')
|
self.assertEqual(dict(session), {})
|
|
def test_timeout_invalid(self):
|
request = testing.DummyRequest()
|
self.assertRaises(
|
ValueError, self._makeOne, request, timeout='Invalid value'
|
)
|
|
def test_changed(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
self.assertEqual(session.changed(), None)
|
self.assertTrue(session._dirty)
|
|
def test_invalidate(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session['a'] = 1
|
self.assertEqual(session.invalidate(), None)
|
self.assertFalse('a' in session)
|
|
def test_reissue_triggered(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request)
|
self.assertEqual(session['state'], 1)
|
self.assertTrue(session._dirty)
|
|
def test__set_cookie_on_exception(self):
|
request = testing.DummyRequest()
|
request.exception = True
|
session = self._makeOne(request)
|
session._cookie_on_exception = False
|
response = DummyResponse()
|
self.assertEqual(session._set_cookie(response), False)
|
|
def test__set_cookie_on_exception_no_request_exception(self):
|
import webob
|
|
request = testing.DummyRequest()
|
request.exception = None
|
session = self._makeOne(request)
|
session._cookie_on_exception = False
|
response = webob.Response()
|
self.assertEqual(session._set_cookie(response), True)
|
self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
|
|
def test__set_cookie_cookieval_too_long(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session['abc'] = 'x' * 100000
|
response = DummyResponse()
|
self.assertRaises(ValueError, session._set_cookie, response)
|
|
def test__set_cookie_real_webob_response(self):
|
import webob
|
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session['abc'] = 'x'
|
response = webob.Response()
|
self.assertEqual(session._set_cookie(response), True)
|
self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
|
|
def test__set_cookie_options(self):
|
from pyramid.response import Response
|
|
request = testing.DummyRequest()
|
request.exception = None
|
session = self._makeOne(
|
request,
|
cookie_name='abc',
|
path='/foo',
|
domain='localhost',
|
secure=True,
|
httponly=True,
|
)
|
session['abc'] = 'x'
|
response = Response()
|
self.assertEqual(session._set_cookie(response), True)
|
cookieval = response.headerlist[-1][1]
|
val, domain, path, secure, httponly, samesite = [
|
x.strip() for x in cookieval.split(';')
|
]
|
self.assertTrue(val.startswith('abc='))
|
self.assertEqual(domain, 'Domain=localhost')
|
self.assertEqual(path, 'Path=/foo')
|
self.assertEqual(secure, 'secure')
|
self.assertEqual(httponly, 'HttpOnly')
|
self.assertEqual(samesite, 'SameSite=Lax')
|
|
def test_flash_default(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session.flash('msg1')
|
session.flash('msg2')
|
self.assertEqual(session['_f_'], ['msg1', 'msg2'])
|
|
def test_flash_allow_duplicate_false(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session.flash('msg1')
|
session.flash('msg1', allow_duplicate=False)
|
self.assertEqual(session['_f_'], ['msg1'])
|
|
def test_flash_allow_duplicate_true_and_msg_not_in_storage(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session.flash('msg1', allow_duplicate=True)
|
self.assertEqual(session['_f_'], ['msg1'])
|
|
def test_flash_allow_duplicate_false_and_msg_not_in_storage(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session.flash('msg1', allow_duplicate=False)
|
self.assertEqual(session['_f_'], ['msg1'])
|
|
def test_flash_mixed(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session.flash('warn1', 'warn')
|
session.flash('warn2', 'warn')
|
session.flash('err1', 'error')
|
session.flash('err2', 'error')
|
self.assertEqual(session['_f_warn'], ['warn1', 'warn2'])
|
|
def test_pop_flash_default_queue(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
queue = ['one', 'two']
|
session['_f_'] = queue
|
result = session.pop_flash()
|
self.assertEqual(result, queue)
|
self.assertEqual(session.get('_f_'), None)
|
|
def test_pop_flash_nodefault_queue(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
queue = ['one', 'two']
|
session['_f_error'] = queue
|
result = session.pop_flash('error')
|
self.assertEqual(result, queue)
|
self.assertEqual(session.get('_f_error'), None)
|
|
def test_peek_flash_default_queue(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
queue = ['one', 'two']
|
session['_f_'] = queue
|
result = session.peek_flash()
|
self.assertEqual(result, queue)
|
self.assertEqual(session.get('_f_'), queue)
|
|
def test_peek_flash_nodefault_queue(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
queue = ['one', 'two']
|
session['_f_error'] = queue
|
result = session.peek_flash('error')
|
self.assertEqual(result, queue)
|
self.assertEqual(session.get('_f_error'), queue)
|
|
def test_new_csrf_token(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
token = session.new_csrf_token()
|
self.assertEqual(token, session['_csrft_'])
|
|
def test_get_csrf_token(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session['_csrft_'] = 'token'
|
token = session.get_csrf_token()
|
self.assertEqual(token, 'token')
|
self.assertTrue('_csrft_' in session)
|
|
def test_get_csrf_token_new(self):
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
token = session.get_csrf_token()
|
self.assertTrue(token)
|
self.assertTrue('_csrft_' in session)
|
|
def test_no_set_cookie_with_exception(self):
|
import webob
|
|
request = testing.DummyRequest()
|
request.exception = True
|
session = self._makeOne(request, set_on_exception=False)
|
session['a'] = 1
|
callbacks = request.response_callbacks
|
self.assertEqual(len(callbacks), 1)
|
response = webob.Response()
|
result = callbacks[0](request, response)
|
self.assertEqual(result, None)
|
self.assertFalse('Set-Cookie' in dict(response.headerlist))
|
|
def test_set_cookie_with_exception(self):
|
import webob
|
|
request = testing.DummyRequest()
|
request.exception = True
|
session = self._makeOne(request)
|
session['a'] = 1
|
callbacks = request.response_callbacks
|
self.assertEqual(len(callbacks), 1)
|
response = webob.Response()
|
result = callbacks[0](request, response)
|
self.assertEqual(result, None)
|
self.assertTrue('Set-Cookie' in dict(response.headerlist))
|
|
def test_cookie_is_set(self):
|
import webob
|
|
request = testing.DummyRequest()
|
session = self._makeOne(request)
|
session['a'] = 1
|
callbacks = request.response_callbacks
|
self.assertEqual(len(callbacks), 1)
|
response = webob.Response()
|
result = callbacks[0](request, response)
|
self.assertEqual(result, None)
|
self.assertTrue('Set-Cookie' in dict(response.headerlist))
|
|
|
class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
|
def _makeOne(self, request, **kw):
|
from pyramid.session import BaseCookieSessionFactory
|
|
serializer = DummySerializer()
|
return BaseCookieSessionFactory(serializer, **kw)(request)
|
|
def _serialize(self, value):
|
return base64.b64encode(json.dumps(value).encode('utf-8'))
|
|
def test_reissue_not_triggered(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time=1)
|
self.assertEqual(session['state'], 1)
|
self.assertFalse(session._dirty)
|
|
def test_reissue_never(self):
|
request = testing.DummyRequest()
|
cookieval = self._serialize((0, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time=None, timeout=None)
|
self.assertEqual(session['state'], 1)
|
self.assertFalse(session._dirty)
|
|
def test_reissue_str_triggered(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time='0')
|
self.assertEqual(session['state'], 1)
|
self.assertTrue(session._dirty)
|
|
def test_reissue_invalid(self):
|
request = testing.DummyRequest()
|
self.assertRaises(
|
ValueError, self._makeOne, request, reissue_time='invalid value'
|
)
|
|
def test_cookie_max_age_invalid(self):
|
request = testing.DummyRequest()
|
self.assertRaises(
|
ValueError, self._makeOne, request, max_age='invalid value'
|
)
|
|
|
class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
|
def _makeOne(self, request, **kw):
|
from pyramid.session import SignedCookieSessionFactory
|
|
kw.setdefault('secret', 'secret')
|
return SignedCookieSessionFactory(**kw)(request)
|
|
def _serialize(self, value, salt=b'pyramid.session.', hashalg='sha512'):
|
import base64
|
import hashlib
|
import hmac
|
import pickle
|
|
digestmod = lambda: hashlib.new(hashalg)
|
cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
|
sig = hmac.new(salt + b'secret', cstruct, digestmod).digest()
|
return base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
|
|
def test_reissue_not_triggered(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time=1)
|
self.assertEqual(session['state'], 1)
|
self.assertFalse(session._dirty)
|
|
def test_reissue_never(self):
|
request = testing.DummyRequest()
|
cookieval = self._serialize((0, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time=None, timeout=None)
|
self.assertEqual(session['state'], 1)
|
self.assertFalse(session._dirty)
|
|
def test_reissue_str_triggered(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, reissue_time='0')
|
self.assertEqual(session['state'], 1)
|
self.assertTrue(session._dirty)
|
|
def test_reissue_invalid(self):
|
request = testing.DummyRequest()
|
self.assertRaises(
|
ValueError, self._makeOne, request, reissue_time='invalid value'
|
)
|
|
def test_cookie_max_age_invalid(self):
|
request = testing.DummyRequest()
|
self.assertRaises(
|
ValueError, self._makeOne, request, max_age='invalid value'
|
)
|
|
def test_custom_salt(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, salt=b'f.')
|
self.assertEqual(session['state'], 1)
|
|
def test_salt_mismatch(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, salt=b'g.')
|
self.assertEqual(session, {})
|
|
def test_custom_hashalg(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize(
|
(time.time(), 0, {'state': 1}), hashalg='sha1'
|
)
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, hashalg='sha1')
|
self.assertEqual(session['state'], 1)
|
|
def test_hashalg_mismatch(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize(
|
(time.time(), 0, {'state': 1}), hashalg='sha1'
|
)
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, hashalg='sha256')
|
self.assertEqual(session, {})
|
|
def test_secret_mismatch(self):
|
import time
|
|
request = testing.DummyRequest()
|
cookieval = self._serialize((time.time(), 0, {'state': 1}))
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, secret='evilsecret')
|
self.assertEqual(session, {})
|
|
def test_custom_serializer(self):
|
import base64
|
from hashlib import sha512
|
import hmac
|
import time
|
|
request = testing.DummyRequest()
|
serializer = DummySerializer()
|
cstruct = serializer.dumps((time.time(), 0, {'state': 1}))
|
sig = hmac.new(b'pyramid.session.secret', cstruct, sha512).digest()
|
cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, serializer=serializer)
|
self.assertEqual(session['state'], 1)
|
|
def test_invalid_data_size(self):
|
from hashlib import sha512
|
import base64
|
|
request = testing.DummyRequest()
|
num_bytes = sha512().digest_size - 1
|
cookieval = base64.b64encode(b' ' * num_bytes)
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request)
|
self.assertEqual(session, {})
|
|
def test_very_long_key(self):
|
verylongkey = b'a' * 1024
|
import webob
|
|
request = testing.DummyRequest()
|
session = self._makeOne(request, secret=verylongkey)
|
session['a'] = 1
|
callbacks = request.response_callbacks
|
self.assertEqual(len(callbacks), 1)
|
response = webob.Response()
|
|
try:
|
result = callbacks[0](request, response)
|
except TypeError: # pragma: no cover
|
self.fail('HMAC failed to initialize due to key length.')
|
|
self.assertEqual(result, None)
|
self.assertTrue('Set-Cookie' in dict(response.headerlist))
|
|
def test_bad_pickle(self):
|
import base64
|
import hashlib
|
import hmac
|
|
digestmod = lambda: hashlib.new('sha512')
|
# generated from dumping an object that cannot be found anymore, eg:
|
# class Foo: pass
|
# print(pickle.dumps(Foo()))
|
cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
|
sig = hmac.new(b'pyramid.session.secret', cstruct, digestmod).digest()
|
cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
|
|
request = testing.DummyRequest()
|
request.cookies['session'] = cookieval
|
session = self._makeOne(request, secret='secret')
|
self.assertEqual(session, {})
|
|
|
class TestUnencryptedCookieSession(
|
SharedCookieSessionTests, unittest.TestCase
|
):
|
def setUp(self):
|
super(TestUnencryptedCookieSession, self).setUp()
|
from zope.deprecation import __show__
|
|
__show__.off()
|
|
def tearDown(self):
|
super(TestUnencryptedCookieSession, self).tearDown()
|
from zope.deprecation import __show__
|
|
__show__.on()
|
|
def _makeOne(self, request, **kw):
|
from pyramid.session import UnencryptedCookieSessionFactoryConfig
|
|
self._rename_cookie_var(kw, 'path', 'cookie_path')
|
self._rename_cookie_var(kw, 'domain', 'cookie_domain')
|
self._rename_cookie_var(kw, 'secure', 'cookie_secure')
|
self._rename_cookie_var(kw, 'httponly', 'cookie_httponly')
|
self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception')
|
return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)
|
|
def _rename_cookie_var(self, kw, src, dest):
|
if src in kw:
|
kw.setdefault(dest, kw.pop(src))
|
|
def _serialize(self, value):
|
from pyramid.compat import bytes_
|
from pyramid.session import signed_serialize
|
|
return bytes_(signed_serialize(value, 'secret'))
|
|
def test_serialize_option(self):
|
from pyramid.response import Response
|
|
secret = 'secret'
|
request = testing.DummyRequest()
|
session = self._makeOne(
|
request, signed_serialize=dummy_signed_serialize
|
)
|
session['key'] = 'value'
|
response = Response()
|
self.assertEqual(session._set_cookie(response), True)
|
cookie = response.headerlist[-1][1]
|
expected_cookieval = dummy_signed_serialize(
|
(session.accessed, session.created, {'key': 'value'}), secret
|
)
|
response = Response()
|
response.set_cookie('session', expected_cookieval, samesite='Lax')
|
expected_cookie = response.headerlist[-1][1]
|
self.assertEqual(cookie, expected_cookie)
|
|
def test_deserialize_option(self):
|
import time
|
|
secret = 'secret'
|
request = testing.DummyRequest()
|
accessed = time.time()
|
state = {'key': 'value'}
|
cookieval = dummy_signed_serialize((accessed, accessed, state), secret)
|
request.cookies['session'] = cookieval
|
session = self._makeOne(
|
request, signed_deserialize=dummy_signed_deserialize
|
)
|
self.assertEqual(dict(session), state)
|
|
|
def dummy_signed_serialize(data, secret):
|
import base64
|
from pyramid.compat import pickle, bytes_
|
|
pickled = pickle.dumps(data)
|
return base64.b64encode(bytes_(secret)) + base64.b64encode(pickled)
|
|
|
def dummy_signed_deserialize(serialized, secret):
|
import base64
|
from pyramid.compat import pickle, bytes_
|
|
serialized_data = base64.b64decode(
|
serialized[len(base64.b64encode(bytes_(secret))) :]
|
)
|
return pickle.loads(serialized_data)
|
|
|
class Test_manage_accessed(unittest.TestCase):
|
def _makeOne(self, wrapped):
|
from pyramid.session import manage_accessed
|
|
return manage_accessed(wrapped)
|
|
def test_accessed_set(self):
|
request = testing.DummyRequest()
|
session = DummySessionFactory(request)
|
session.renewed = 0
|
wrapper = self._makeOne(session.__class__.get)
|
wrapper(session, 'a')
|
self.assertNotEqual(session.accessed, None)
|
self.assertTrue(session._dirty)
|
|
def test_accessed_without_renew(self):
|
import time
|
|
request = testing.DummyRequest()
|
session = DummySessionFactory(request)
|
session._reissue_time = 5
|
session.renewed = time.time()
|
wrapper = self._makeOne(session.__class__.get)
|
wrapper(session, 'a')
|
self.assertNotEqual(session.accessed, None)
|
self.assertFalse(session._dirty)
|
|
def test_already_dirty(self):
|
request = testing.DummyRequest()
|
session = DummySessionFactory(request)
|
session.renewed = 0
|
session._dirty = True
|
session['a'] = 1
|
wrapper = self._makeOne(session.__class__.get)
|
self.assertEqual(wrapper.__doc__, session.get.__doc__)
|
result = wrapper(session, 'a')
|
self.assertEqual(result, 1)
|
callbacks = request.response_callbacks
|
if callbacks is not None:
|
self.assertEqual(len(callbacks), 0)
|
|
|
class Test_manage_changed(unittest.TestCase):
|
def _makeOne(self, wrapped):
|
from pyramid.session import manage_changed
|
|
return manage_changed(wrapped)
|
|
def test_it(self):
|
request = testing.DummyRequest()
|
session = DummySessionFactory(request)
|
wrapper = self._makeOne(session.__class__.__setitem__)
|
wrapper(session, 'a', 1)
|
self.assertNotEqual(session.accessed, None)
|
self.assertTrue(session._dirty)
|
|
|
def serialize(data, secret):
|
import hmac
|
import base64
|
from hashlib import sha1
|
from pyramid.compat import bytes_
|
from pyramid.compat import native_
|
from pyramid.compat import pickle
|
|
pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
|
sig = hmac.new(bytes_(secret, 'utf-8'), pickled, sha1).hexdigest()
|
return sig + native_(base64.b64encode(pickled))
|
|
|
class Test_signed_serialize(unittest.TestCase):
|
def _callFUT(self, data, secret):
|
from pyramid.session import signed_serialize
|
|
return signed_serialize(data, secret)
|
|
def test_it(self):
|
expected = serialize('123', 'secret')
|
result = self._callFUT('123', 'secret')
|
self.assertEqual(result, expected)
|
|
def test_it_with_highorder_secret(self):
|
secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
|
expected = serialize('123', secret)
|
result = self._callFUT('123', secret)
|
self.assertEqual(result, expected)
|
|
def test_it_with_latin1_secret(self):
|
secret = b'La Pe\xc3\xb1a'
|
expected = serialize('123', secret)
|
result = self._callFUT('123', secret.decode('latin-1'))
|
self.assertEqual(result, expected)
|
|
|
class Test_signed_deserialize(unittest.TestCase):
|
def _callFUT(self, serialized, secret, hmac=None):
|
if hmac is None:
|
import hmac
|
from pyramid.session import signed_deserialize
|
|
return signed_deserialize(serialized, secret, hmac=hmac)
|
|
def test_it(self):
|
serialized = serialize('123', 'secret')
|
result = self._callFUT(serialized, 'secret')
|
self.assertEqual(result, '123')
|
|
def test_invalid_bits(self):
|
serialized = serialize('123', 'secret')
|
self.assertRaises(ValueError, self._callFUT, serialized, 'seekrit')
|
|
def test_invalid_len(self):
|
class hmac(object):
|
def new(self, *arg):
|
return self
|
|
def hexdigest(self):
|
return '1234'
|
|
serialized = serialize('123', 'secret123')
|
self.assertRaises(
|
ValueError, self._callFUT, serialized, 'secret', hmac=hmac()
|
)
|
|
def test_it_bad_encoding(self):
|
serialized = 'bad' + serialize('123', 'secret')
|
self.assertRaises(ValueError, self._callFUT, serialized, 'secret')
|
|
def test_it_with_highorder_secret(self):
|
secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
|
serialized = serialize('123', secret)
|
result = self._callFUT(serialized, secret)
|
self.assertEqual(result, '123')
|
|
# bwcompat with pyramid <= 1.5b1 where latin1 is the default
|
def test_it_with_latin1_secret(self):
|
secret = b'La Pe\xc3\xb1a'
|
serialized = serialize('123', secret)
|
result = self._callFUT(serialized, secret.decode('latin-1'))
|
self.assertEqual(result, '123')
|
|
|
class TestPickleSerializer(unittest.TestCase):
|
def _makeOne(self):
|
from pyramid.session import PickleSerializer
|
|
return PickleSerializer()
|
|
def test_loads(self):
|
# generated from dumping Dummy() using protocol=2
|
cstruct = b'\x80\x02ctests.test_session\nDummy\nq\x00)\x81q\x01.'
|
serializer = self._makeOne()
|
result = serializer.loads(cstruct)
|
self.assertIsInstance(result, Dummy)
|
|
def test_loads_raises_ValueError_on_invalid_data(self):
|
cstruct = b'not pickled'
|
serializer = self._makeOne()
|
self.assertRaises(ValueError, serializer.loads, cstruct)
|
|
def test_loads_raises_ValueError_on_bad_import(self):
|
# generated from dumping an object that cannot be found anymore, eg:
|
# class Foo: pass
|
# print(pickle.dumps(Foo()))
|
cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
|
serializer = self._makeOne()
|
self.assertRaises(ValueError, serializer.loads, cstruct)
|
|
def test_dumps(self):
|
obj = Dummy()
|
serializer = self._makeOne()
|
result = serializer.dumps(obj)
|
expected_result = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
|
self.assertEqual(result, expected_result)
|
self.assertIsInstance(result, bytes)
|
|
|
class Dummy(object):
|
pass
|
|
|
class DummySerializer(object):
|
def dumps(self, value):
|
return base64.b64encode(json.dumps(value).encode('utf-8'))
|
|
def loads(self, value):
|
try:
|
return json.loads(base64.b64decode(value).decode('utf-8'))
|
|
# base64.b64decode raises a TypeError on py2 instead of a ValueError
|
# and a ValueError is required for the session to handle it properly
|
except TypeError:
|
raise ValueError
|
|
|
class DummySessionFactory(dict):
|
_dirty = False
|
_cookie_name = 'session'
|
_cookie_max_age = None
|
_cookie_path = '/'
|
_cookie_domain = None
|
_cookie_secure = False
|
_cookie_httponly = False
|
_timeout = 1200
|
_reissue_time = 0
|
|
def __init__(self, request):
|
self.request = request
|
dict.__init__(self, {})
|
|
def changed(self):
|
self._dirty = True
|
|
|
class DummyResponse(object):
|
def __init__(self):
|
self.headerlist = []
|