Michael Merickel
2018-10-15 2b024920847481592b1a13d4006d2a9fa8881d72
commit | author | age
10dd60 1 import base64
4fade6 2 import json
968209 3 import unittest
CM 4 from pyramid import testing
a9cd71 5 from pyramid.compat import pickle
968209 6
4fade6 7 class SharedCookieSessionTests(object):
968209 8
CM 9     def test_ctor_no_cookie(self):
10         request = testing.DummyRequest()
11         session = self._makeOne(request)
12         self.assertEqual(dict(session), {})
13
6af3eb 14     def test_instance_conforms(self):
CM 15         from zope.interface.verify import verifyObject
16         from pyramid.interfaces import ISession
17         request = testing.DummyRequest()
18         session = self._makeOne(request)
19         verifyObject(ISession, session)
20
968209 21     def test_ctor_with_cookie_still_valid(self):
CM 22         import time
23         request = testing.DummyRequest()
4fade6 24         cookieval = self._serialize((time.time(), 0, {'state': 1}))
968209 25         request.cookies['session'] = cookieval
CM 26         session = self._makeOne(request)
27         self.assertEqual(dict(session), {'state':1})
4fade6 28
968209 29     def test_ctor_with_cookie_expired(self):
CM 30         request = testing.DummyRequest()
4fade6 31         cookieval = self._serialize((0, 0, {'state': 1}))
968209 32         request.cookies['session'] = cookieval
CM 33         session = self._makeOne(request)
34         self.assertEqual(dict(session), {})
35
4fade6 36     def test_ctor_with_bad_cookie_cannot_deserialize(self):
fb5292 37         request = testing.DummyRequest()
4fade6 38         request.cookies['session'] = 'abc'
MM 39         session = self._makeOne(request)
40         self.assertEqual(dict(session), {})
41
42     def test_ctor_with_bad_cookie_not_tuple(self):
43         request = testing.DummyRequest()
44         cookieval = self._serialize('abc')
fb5292 45         request.cookies['session'] = cookieval
CM 46         session = self._makeOne(request)
4fade6 47         self.assertEqual(dict(session), {})
MM 48
49     def test_timeout(self):
50         import time
51         request = testing.DummyRequest()
52         cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
53         request.cookies['session'] = cookieval
54         session = self._makeOne(request, timeout=1)
fb5292 55         self.assertEqual(dict(session), {})
CM 56
7075e1 57     def test_timeout_never(self):
MM 58         import time
59         request = testing.DummyRequest()
60         LONG_TIME = 31536000
61         cookieval = self._serialize((time.time() + LONG_TIME, 0, {'state': 1}))
62         request.cookies['session'] = cookieval
63         session = self._makeOne(request, timeout=None)
64         self.assertEqual(dict(session), {'state': 1})
65
67ff3b 66     def test_timeout_str(self):
R 67         import time
68         request = testing.DummyRequest()
69         cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
70         request.cookies['session'] = cookieval
71         session = self._makeOne(request, timeout='1')
72         self.assertEqual(dict(session), {})
73
74     def test_timeout_invalid(self):
75         request = testing.DummyRequest()
7ca2b2 76         self.assertRaises(ValueError, self._makeOne, request, timeout='Invalid value')
67ff3b 77
968209 78     def test_changed(self):
CM 79         request = testing.DummyRequest()
80         session = self._makeOne(request)
81         self.assertEqual(session.changed(), None)
4fade6 82         self.assertTrue(session._dirty)
968209 83
CM 84     def test_invalidate(self):
85         request = testing.DummyRequest()
86         session = self._makeOne(request)
87         session['a'] = 1
88         self.assertEqual(session.invalidate(), None)
a1d395 89         self.assertFalse('a' in session)
4fade6 90
MM 91     def test_reissue_triggered(self):
92         import time
93         request = testing.DummyRequest()
94         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
95         request.cookies['session'] = cookieval
96         session = self._makeOne(request)
97         self.assertEqual(session['state'], 1)
98         self.assertTrue(session._dirty)
968209 99
CM 100     def test__set_cookie_on_exception(self):
101         request = testing.DummyRequest()
102         request.exception = True
103         session = self._makeOne(request)
104         session._cookie_on_exception = False
105         response = DummyResponse()
106         self.assertEqual(session._set_cookie(response), False)
d0c2f3 107
CM 108     def test__set_cookie_on_exception_no_request_exception(self):
109         import webob
110         request = testing.DummyRequest()
111         request.exception = None
112         session = self._makeOne(request)
113         session._cookie_on_exception = False
114         response = webob.Response()
115         self.assertEqual(session._set_cookie(response), True)
116         self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
117
968209 118     def test__set_cookie_cookieval_too_long(self):
CM 119         request = testing.DummyRequest()
120         session = self._makeOne(request)
121         session['abc'] = 'x'*100000
122         response = DummyResponse()
123         self.assertRaises(ValueError, session._set_cookie, response)
124
125     def test__set_cookie_real_webob_response(self):
126         import webob
127         request = testing.DummyRequest()
128         session = self._makeOne(request)
129         session['abc'] = 'x'
130         response = webob.Response()
131         self.assertEqual(session._set_cookie(response), True)
132         self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
133
134     def test__set_cookie_options(self):
d868ff 135         from pyramid.response import Response
968209 136         request = testing.DummyRequest()
CM 137         request.exception = None
138         session = self._makeOne(request,
4fade6 139                                 cookie_name='abc',
MM 140                                 path='/foo',
141                                 domain='localhost',
142                                 secure=True,
143                                 httponly=True,
968209 144                                 )
CM 145         session['abc'] = 'x'
d868ff 146         response = Response()
968209 147         self.assertEqual(session._set_cookie(response), True)
4fade6 148         cookieval = response.headerlist[-1][1]
3d3dee 149         val, domain, path, secure, httponly, samesite = [x.strip() for x in
AG 150                                                          cookieval.split(';')]
a1d395 151         self.assertTrue(val.startswith('abc='))
968209 152         self.assertEqual(domain, 'Domain=localhost')
CM 153         self.assertEqual(path, 'Path=/foo')
154         self.assertEqual(secure, 'secure')
155         self.assertEqual(httponly, 'HttpOnly')
3d3dee 156         self.assertEqual(samesite, 'SameSite=Lax')
968209 157
4df636 158     def test_flash_default(self):
CM 159         request = testing.DummyRequest()
160         session = self._makeOne(request)
161         session.flash('msg1')
162         session.flash('msg2')
6f6d36 163         self.assertEqual(session['_f_'], ['msg1', 'msg2'])
d0c2f3 164
CM 165     def test_flash_allow_duplicate_false(self):
166         request = testing.DummyRequest()
167         session = self._makeOne(request)
168         session.flash('msg1')
169         session.flash('msg1', allow_duplicate=False)
170         self.assertEqual(session['_f_'], ['msg1'])
171
172     def test_flash_allow_duplicate_true_and_msg_not_in_storage(self):
173         request = testing.DummyRequest()
174         session = self._makeOne(request)
175         session.flash('msg1', allow_duplicate=True)
176         self.assertEqual(session['_f_'], ['msg1'])
177
178     def test_flash_allow_duplicate_false_and_msg_not_in_storage(self):
179         request = testing.DummyRequest()
180         session = self._makeOne(request)
181         session.flash('msg1', allow_duplicate=False)
182         self.assertEqual(session['_f_'], ['msg1'])
183
4df636 184     def test_flash_mixed(self):
CM 185         request = testing.DummyRequest()
186         session = self._makeOne(request)
6f6d36 187         session.flash('warn1', 'warn')
CM 188         session.flash('warn2', 'warn')
189         session.flash('err1', 'error')
190         session.flash('err2', 'error')
191         self.assertEqual(session['_f_warn'], ['warn1', 'warn2'])
4df636 192
6f6d36 193     def test_pop_flash_default_queue(self):
4df636 194         request = testing.DummyRequest()
CM 195         session = self._makeOne(request)
6f6d36 196         queue = ['one', 'two']
CM 197         session['_f_'] = queue
198         result = session.pop_flash()
199         self.assertEqual(result, queue)
200         self.assertEqual(session.get('_f_'), None)
4df636 201
6f6d36 202     def test_pop_flash_nodefault_queue(self):
4df636 203         request = testing.DummyRequest()
CM 204         session = self._makeOne(request)
6f6d36 205         queue = ['one', 'two']
CM 206         session['_f_error'] = queue
207         result = session.pop_flash('error')
208         self.assertEqual(result, queue)
209         self.assertEqual(session.get('_f_error'), None)
4df636 210
6f6d36 211     def test_peek_flash_default_queue(self):
4df636 212         request = testing.DummyRequest()
CM 213         session = self._makeOne(request)
6f6d36 214         queue = ['one', 'two']
CM 215         session['_f_'] = queue
216         result = session.peek_flash()
217         self.assertEqual(result, queue)
218         self.assertEqual(session.get('_f_'), queue)
219
220     def test_peek_flash_nodefault_queue(self):
221         request = testing.DummyRequest()
222         session = self._makeOne(request)
223         queue = ['one', 'two']
224         session['_f_error'] = queue
225         result = session.peek_flash('error')
226         self.assertEqual(result, queue)
227         self.assertEqual(session.get('_f_error'), queue)
4df636 228
319793 229     def test_new_csrf_token(self):
CM 230         request = testing.DummyRequest()
231         session = self._makeOne(request)
232         token = session.new_csrf_token()
233         self.assertEqual(token, session['_csrft_'])
234
36ea5b 235     def test_get_csrf_token(self):
319793 236         request = testing.DummyRequest()
CM 237         session = self._makeOne(request)
238         session['_csrft_'] = 'token'
36ea5b 239         token = session.get_csrf_token()
319793 240         self.assertEqual(token, 'token')
a1d395 241         self.assertTrue('_csrft_' in session)
319793 242
14f863 243     def test_get_csrf_token_new(self):
CM 244         request = testing.DummyRequest()
245         session = self._makeOne(request)
246         token = session.get_csrf_token()
a1d395 247         self.assertTrue(token)
CM 248         self.assertTrue('_csrft_' in session)
14f863 249
4fade6 250     def test_no_set_cookie_with_exception(self):
MM 251         import webob
252         request = testing.DummyRequest()
253         request.exception = True
254         session = self._makeOne(request, set_on_exception=False)
255         session['a'] = 1
256         callbacks = request.response_callbacks
257         self.assertEqual(len(callbacks), 1)
258         response = webob.Response()
259         result = callbacks[0](request, response)
260         self.assertEqual(result, None)
261         self.assertFalse('Set-Cookie' in dict(response.headerlist))
262
263     def test_set_cookie_with_exception(self):
264         import webob
265         request = testing.DummyRequest()
266         request.exception = True
267         session = self._makeOne(request)
268         session['a'] = 1
269         callbacks = request.response_callbacks
270         self.assertEqual(len(callbacks), 1)
271         response = webob.Response()
272         result = callbacks[0](request, response)
273         self.assertEqual(result, None)
274         self.assertTrue('Set-Cookie' in dict(response.headerlist))
275
276     def test_cookie_is_set(self):
277         import webob
278         request = testing.DummyRequest()
279         session = self._makeOne(request)
280         session['a'] = 1
281         callbacks = request.response_callbacks
282         self.assertEqual(len(callbacks), 1)
283         response = webob.Response()
284         result = callbacks[0](request, response)
285         self.assertEqual(result, None)
286         self.assertTrue('Set-Cookie' in dict(response.headerlist))
287
288 class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
289     def _makeOne(self, request, **kw):
290         from pyramid.session import BaseCookieSessionFactory
8134a7 291         serializer = DummySerializer()
CM 292         return BaseCookieSessionFactory(serializer, **kw)(request)
4fade6 293
MM 294     def _serialize(self, value):
10dd60 295         return base64.b64encode(json.dumps(value).encode('utf-8'))
4fade6 296
MM 297     def test_reissue_not_triggered(self):
298         import time
299         request = testing.DummyRequest()
300         cookieval = self._serialize((time.time(), 0, {'state': 1}))
301         request.cookies['session'] = cookieval
302         session = self._makeOne(request, reissue_time=1)
303         self.assertEqual(session['state'], 1)
304         self.assertFalse(session._dirty)
305
9549f6 306     def test_reissue_never(self):
MM 307         request = testing.DummyRequest()
308         cookieval = self._serialize((0, 0, {'state': 1}))
309         request.cookies['session'] = cookieval
310         session = self._makeOne(request, reissue_time=None, timeout=None)
311         self.assertEqual(session['state'], 1)
312         self.assertFalse(session._dirty)
313
a43abd 314     def test_reissue_str_triggered(self):
R 315         import time
316         request = testing.DummyRequest()
317         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
318         request.cookies['session'] = cookieval
319         session = self._makeOne(request, reissue_time='0')
320         self.assertEqual(session['state'], 1)
321         self.assertTrue(session._dirty)
322
323     def test_reissue_invalid(self):
324         request = testing.DummyRequest()
325         self.assertRaises(ValueError, self._makeOne, request, reissue_time='invalid value')
326
fa7886 327     def test_cookie_max_age_invalid(self):
R 328         request = testing.DummyRequest()
329         self.assertRaises(ValueError, self._makeOne, request, max_age='invalid value')
330
4fade6 331 class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
MM 332     def _makeOne(self, request, **kw):
333         from pyramid.session import SignedCookieSessionFactory
b0b09c 334         kw.setdefault('secret', 'secret')
MM 335         return SignedCookieSessionFactory(**kw)(request)
4fade6 336
554a02 337     def _serialize(self, value, salt=b'pyramid.session.', hashalg='sha512'):
b0b09c 338         import base64
MM 339         import hashlib
340         import hmac
341         import pickle
342
343         digestmod = lambda: hashlib.new(hashalg)
344         cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
554a02 345         sig = hmac.new(salt + b'secret', cstruct, digestmod).digest()
8134a7 346         return base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
4fade6 347
MM 348     def test_reissue_not_triggered(self):
349         import time
350         request = testing.DummyRequest()
351         cookieval = self._serialize((time.time(), 0, {'state': 1}))
352         request.cookies['session'] = cookieval
353         session = self._makeOne(request, reissue_time=1)
354         self.assertEqual(session['state'], 1)
355         self.assertFalse(session._dirty)
356
9549f6 357     def test_reissue_never(self):
MM 358         request = testing.DummyRequest()
359         cookieval = self._serialize((0, 0, {'state': 1}))
360         request.cookies['session'] = cookieval
361         session = self._makeOne(request, reissue_time=None, timeout=None)
362         self.assertEqual(session['state'], 1)
363         self.assertFalse(session._dirty)
364
a43abd 365     def test_reissue_str_triggered(self):
R 366         import time
367         request = testing.DummyRequest()
368         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
369         request.cookies['session'] = cookieval
370         session = self._makeOne(request, reissue_time='0')
371         self.assertEqual(session['state'], 1)
372         self.assertTrue(session._dirty)
373
374     def test_reissue_invalid(self):
375         request = testing.DummyRequest()
376         self.assertRaises(ValueError, self._makeOne, request, reissue_time='invalid value')
377
fa7886 378     def test_cookie_max_age_invalid(self):
R 379         request = testing.DummyRequest()
380         self.assertRaises(ValueError, self._makeOne, request, max_age='invalid value')
381
b0b09c 382     def test_custom_salt(self):
4fade6 383         import time
MM 384         request = testing.DummyRequest()
554a02 385         cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
4fade6 386         request.cookies['session'] = cookieval
554a02 387         session = self._makeOne(request, salt=b'f.')
4fade6 388         self.assertEqual(session['state'], 1)
b0b09c 389
MM 390     def test_salt_mismatch(self):
391         import time
392         request = testing.DummyRequest()
554a02 393         cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
b0b09c 394         request.cookies['session'] = cookieval
554a02 395         session = self._makeOne(request, salt=b'g.')
b0b09c 396         self.assertEqual(session, {})
MM 397
398     def test_custom_hashalg(self):
399         import time
400         request = testing.DummyRequest()
401         cookieval = self._serialize((time.time(), 0, {'state': 1}),
402                                     hashalg='sha1')
403         request.cookies['session'] = cookieval
404         session = self._makeOne(request, hashalg='sha1')
405         self.assertEqual(session['state'], 1)
406
407     def test_hashalg_mismatch(self):
408         import time
409         request = testing.DummyRequest()
410         cookieval = self._serialize((time.time(), 0, {'state': 1}),
411                                     hashalg='sha1')
412         request.cookies['session'] = cookieval
413         session = self._makeOne(request, hashalg='sha256')
414         self.assertEqual(session, {})
415
416     def test_secret_mismatch(self):
417         import time
418         request = testing.DummyRequest()
419         cookieval = self._serialize((time.time(), 0, {'state': 1}))
420         request.cookies['session'] = cookieval
421         session = self._makeOne(request, secret='evilsecret')
422         self.assertEqual(session, {})
423
424     def test_custom_serializer(self):
425         import base64
426         from hashlib import sha512
427         import hmac
428         import time
429         request = testing.DummyRequest()
8134a7 430         serializer = DummySerializer()
CM 431         cstruct = serializer.dumps((time.time(), 0, {'state': 1}))
554a02 432         sig = hmac.new(b'pyramid.session.secret', cstruct, sha512).digest()
8134a7 433         cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
b0b09c 434         request.cookies['session'] = cookieval
8134a7 435         session = self._makeOne(request, serializer=serializer)
b0b09c 436         self.assertEqual(session['state'], 1)
MM 437
438     def test_invalid_data_size(self):
439         from hashlib import sha512
440         import base64
441         request = testing.DummyRequest()
442         num_bytes = sha512().digest_size - 1
443         cookieval = base64.b64encode(b' ' * num_bytes)
444         request.cookies['session'] = cookieval
445         session = self._makeOne(request)
446         self.assertEqual(session, {})
4fade6 447
1c0db5 448     def test_very_long_key(self):
BJR 449         verylongkey = b'a' * 1024
450         import webob
451         request = testing.DummyRequest()
452         session = self._makeOne(request, secret=verylongkey)
453         session['a'] = 1
454         callbacks = request.response_callbacks
455         self.assertEqual(len(callbacks), 1)
456         response = webob.Response()
457
458         try:
459             result = callbacks[0](request, response)
8134a7 460         except TypeError: # pragma: no cover
1c0db5 461             self.fail('HMAC failed to initialize due to key length.')
BJR 462
463         self.assertEqual(result, None)
464         self.assertTrue('Set-Cookie' in dict(response.headerlist))
465
a9cd71 466     def test_bad_pickle(self):
MM 467         import base64
468         import hashlib
469         import hmac
470
471         digestmod = lambda: hashlib.new('sha512')
472         # generated from dumping an object that cannot be found anymore, eg:
473         # class Foo: pass
474         # print(pickle.dumps(Foo()))
475         cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
476         sig = hmac.new(b'pyramid.session.secret', cstruct, digestmod).digest()
477         cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
478
479         request = testing.DummyRequest()
480         request.cookies['session'] = cookieval
481         session = self._makeOne(request, secret='secret')
482         self.assertEqual(session, {})
483
4fade6 484 class TestUnencryptedCookieSession(SharedCookieSessionTests, unittest.TestCase):
9536f9 485     def setUp(self):
CM 486         super(TestUnencryptedCookieSession, self).setUp()
487         from zope.deprecation import __show__
488         __show__.off()
489
490     def tearDown(self):
491         super(TestUnencryptedCookieSession, self).tearDown()
492         from zope.deprecation import __show__
493         __show__.on()
494         
4fade6 495     def _makeOne(self, request, **kw):
MM 496         from pyramid.session import UnencryptedCookieSessionFactoryConfig
497         self._rename_cookie_var(kw, 'path', 'cookie_path')
498         self._rename_cookie_var(kw, 'domain', 'cookie_domain')
499         self._rename_cookie_var(kw, 'secure', 'cookie_secure')
500         self._rename_cookie_var(kw, 'httponly', 'cookie_httponly')
501         self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception')
502         return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)
503
504     def _rename_cookie_var(self, kw, src, dest):
505         if src in kw:
506             kw.setdefault(dest, kw.pop(src))
507
508     def _serialize(self, value):
8134a7 509         from pyramid.compat import bytes_
4fade6 510         from pyramid.session import signed_serialize
8134a7 511         return bytes_(signed_serialize(value, 'secret'))
4fade6 512
cf46a1 513     def test_serialize_option(self):
IW 514         from pyramid.response import Response
515         secret = 'secret'
516         request = testing.DummyRequest()
517         session = self._makeOne(request,
518             signed_serialize=dummy_signed_serialize)
519         session['key'] = 'value'
520         response = Response()
521         self.assertEqual(session._set_cookie(response), True)
522         cookie = response.headerlist[-1][1]
523         expected_cookieval = dummy_signed_serialize(
524             (session.accessed, session.created, {'key': 'value'}), secret)
525         response = Response()
6c8ad4 526         response.set_cookie('session', expected_cookieval, samesite='Lax')
cf46a1 527         expected_cookie = response.headerlist[-1][1]
IW 528         self.assertEqual(cookie, expected_cookie)
529
530     def test_deserialize_option(self):
531         import time
532         secret = 'secret'
533         request = testing.DummyRequest()
534         accessed = time.time()
535         state = {'key': 'value'}
536         cookieval = dummy_signed_serialize((accessed, accessed, state), secret)
537         request.cookies['session'] = cookieval
538         session = self._makeOne(request,
539             signed_deserialize=dummy_signed_deserialize)
540         self.assertEqual(dict(session), state)
541
542 def dummy_signed_serialize(data, secret):
543     import base64
544     from pyramid.compat import pickle, bytes_
545     pickled = pickle.dumps(data)
546     return base64.b64encode(bytes_(secret)) + base64.b64encode(pickled)
547
548 def dummy_signed_deserialize(serialized, secret):
549     import base64
550     from pyramid.compat import pickle, bytes_
551     serialized_data = base64.b64decode(
552         serialized[len(base64.b64encode(bytes_(secret))):])
553     return pickle.loads(serialized_data)
554
968209 555 class Test_manage_accessed(unittest.TestCase):
CM 556     def _makeOne(self, wrapped):
557         from pyramid.session import manage_accessed
558         return manage_accessed(wrapped)
559
560     def test_accessed_set(self):
561         request = testing.DummyRequest()
562         session = DummySessionFactory(request)
4fade6 563         session.renewed = 0
968209 564         wrapper = self._makeOne(session.__class__.get)
CM 565         wrapper(session, 'a')
566         self.assertNotEqual(session.accessed, None)
4fade6 567         self.assertTrue(session._dirty)
MM 568
569     def test_accessed_without_renew(self):
570         import time
571         request = testing.DummyRequest()
572         session = DummySessionFactory(request)
573         session._reissue_time = 5
574         session.renewed = time.time()
575         wrapper = self._makeOne(session.__class__.get)
576         wrapper(session, 'a')
577         self.assertNotEqual(session.accessed, None)
578         self.assertFalse(session._dirty)
579
968209 580     def test_already_dirty(self):
CM 581         request = testing.DummyRequest()
582         session = DummySessionFactory(request)
4fade6 583         session.renewed = 0
968209 584         session._dirty = True
CM 585         session['a'] = 1
586         wrapper = self._makeOne(session.__class__.get)
587         self.assertEqual(wrapper.__doc__, session.get.__doc__)
588         result = wrapper(session, 'a')
589         self.assertEqual(result, 1)
590         callbacks = request.response_callbacks
39a03e 591         if callbacks is not None: self.assertEqual(len(callbacks), 0)
968209 592
4fade6 593 class Test_manage_changed(unittest.TestCase):
MM 594     def _makeOne(self, wrapped):
595         from pyramid.session import manage_changed
596         return manage_changed(wrapped)
968209 597
4fade6 598     def test_it(self):
968209 599         request = testing.DummyRequest()
CM 600         session = DummySessionFactory(request)
4fade6 601         wrapper = self._makeOne(session.__class__.__setitem__)
MM 602         wrapper(session, 'a', 1)
603         self.assertNotEqual(session.accessed, None)
604         self.assertTrue(session._dirty)
968209 605
CM 606 def serialize(data, secret):
607     import hmac
608     import base64
954999 609     from hashlib import sha1
CM 610     from pyramid.compat import bytes_
611     from pyramid.compat import native_
612     from pyramid.compat import pickle
613     pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
69b613 614     sig = hmac.new(bytes_(secret, 'utf-8'), pickled, sha1).hexdigest()
954999 615     return sig + native_(base64.b64encode(pickled))
968209 616
815955 617 class Test_signed_serialize(unittest.TestCase):
968209 618     def _callFUT(self, data, secret):
815955 619         from pyramid.session import signed_serialize
CM 620         return signed_serialize(data, secret)
968209 621
CM 622     def test_it(self):
623         expected = serialize('123', 'secret')
624         result = self._callFUT('123', 'secret')
69b613 625         self.assertEqual(result, expected)
MM 626
627     def test_it_with_highorder_secret(self):
cf026e 628         secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
69b613 629         expected = serialize('123', secret)
MM 630         result = self._callFUT('123', secret)
cf026e 631         self.assertEqual(result, expected)
MM 632
633     def test_it_with_latin1_secret(self):
634         secret = b'La Pe\xc3\xb1a'
635         expected = serialize('123', secret)
636         result = self._callFUT('123', secret.decode('latin-1'))
968209 637         self.assertEqual(result, expected)
CM 638         
815955 639 class Test_signed_deserialize(unittest.TestCase):
968209 640     def _callFUT(self, serialized, secret, hmac=None):
CM 641         if hmac is None:
642             import hmac
815955 643         from pyramid.session import signed_deserialize
CM 644         return signed_deserialize(serialized, secret, hmac=hmac)
968209 645
CM 646     def test_it(self):
647         serialized = serialize('123', 'secret')
648         result = self._callFUT(serialized, 'secret')
649         self.assertEqual(result, '123')
650
651     def test_invalid_bits(self):
652         serialized = serialize('123', 'secret')
815955 653         self.assertRaises(ValueError, self._callFUT, serialized, 'seekrit')
968209 654
CM 655     def test_invalid_len(self):
656         class hmac(object):
657             def new(self, *arg):
658                 return self
659             def hexdigest(self):
660                 return '1234'
661         serialized = serialize('123', 'secret123')
815955 662         self.assertRaises(ValueError, self._callFUT, serialized, 'secret',
CM 663                           hmac=hmac())
968209 664         
CM 665     def test_it_bad_encoding(self):
666         serialized = 'bad' + serialize('123', 'secret')
815955 667         self.assertRaises(ValueError, self._callFUT, serialized, 'secret')
4fade6 668
69b613 669     def test_it_with_highorder_secret(self):
cf026e 670         secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
69b613 671         serialized = serialize('123', secret)
MM 672         result = self._callFUT(serialized, secret)
673         self.assertEqual(result, '123')
674
cf026e 675     # bwcompat with pyramid <= 1.5b1 where latin1 is the default
MM 676     def test_it_with_latin1_secret(self):
677         secret = b'La Pe\xc3\xb1a'
678         serialized = serialize('123', secret)
679         result = self._callFUT(serialized, secret.decode('latin-1'))
680         self.assertEqual(result, '123')
681
65dee6 682
a9cd71 683 class TestPickleSerializer(unittest.TestCase):
MM 684     def _makeOne(self):
685         from pyramid.session import PickleSerializer
686         return PickleSerializer()
687
688     def test_loads(self):
689         # generated from dumping Dummy() using protocol=2
690         cstruct = b'\x80\x02cpyramid.tests.test_session\nDummy\nq\x00)\x81q\x01.'
691         serializer = self._makeOne()
692         result = serializer.loads(cstruct)
693         self.assertIsInstance(result, Dummy)
694
695     def test_loads_raises_ValueError_on_invalid_data(self):
696         cstruct = b'not pickled'
697         serializer = self._makeOne()
698         self.assertRaises(ValueError, serializer.loads, cstruct)
699
700     def test_loads_raises_ValueError_on_bad_import(self):
701         # generated from dumping an object that cannot be found anymore, eg:
702         # class Foo: pass
703         # print(pickle.dumps(Foo()))
704         cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
705         serializer = self._makeOne()
706         self.assertRaises(ValueError, serializer.loads, cstruct)
707
708     def test_dumps(self):
709         obj = Dummy()
710         serializer = self._makeOne()
711         result = serializer.dumps(obj)
712         expected_result = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
713         self.assertEqual(result, expected_result)
714         self.assertIsInstance(result, bytes)
715
716
717 class Dummy(object):
718     pass
719
720
8134a7 721 class DummySerializer(object):
CM 722     def dumps(self, value):
10dd60 723         return base64.b64encode(json.dumps(value).encode('utf-8'))
4fade6 724
8134a7 725     def loads(self, value):
5b5d5e 726         try:
MM 727             return json.loads(base64.b64decode(value).decode('utf-8'))
728
729         # base64.b64decode raises a TypeError on py2 instead of a ValueError
730         # and a ValueError is required for the session to handle it properly
731         except TypeError:
732             raise ValueError
4fade6 733
968209 734 class DummySessionFactory(dict):
CM 735     _dirty = False
736     _cookie_name = 'session'
737     _cookie_max_age = None
738     _cookie_path = '/'
739     _cookie_domain = None
740     _cookie_secure = False
741     _cookie_httponly = False
742     _timeout = 1200
4fade6 743     _reissue_time = 0
MM 744
968209 745     def __init__(self, request):
CM 746         self.request = request
747         dict.__init__(self, {})
748
4fade6 749     def changed(self):
MM 750         self._dirty = True
968209 751
CM 752 class DummyResponse(object):
753     def __init__(self):
754         self.headerlist = []