Michael Merickel
2018-10-18 f28dbb0ba8d276fad10a3cd25e4d60b298702d83
commit | author | age
10dd60 1 import base64
4fade6 2 import json
968209 3 import unittest
CM 4 from pyramid import testing
a9cd71 5 from pyramid.compat import pickle
968209 6
CM 7
0c29cf 8 class SharedCookieSessionTests(object):
968209 9     def test_ctor_no_cookie(self):
CM 10         request = testing.DummyRequest()
11         session = self._makeOne(request)
12         self.assertEqual(dict(session), {})
13
6af3eb 14     def test_instance_conforms(self):
CM 15         from zope.interface.verify import verifyObject
16         from pyramid.interfaces import ISession
0c29cf 17
6af3eb 18         request = testing.DummyRequest()
CM 19         session = self._makeOne(request)
20         verifyObject(ISession, session)
21
968209 22     def test_ctor_with_cookie_still_valid(self):
CM 23         import time
0c29cf 24
968209 25         request = testing.DummyRequest()
4fade6 26         cookieval = self._serialize((time.time(), 0, {'state': 1}))
968209 27         request.cookies['session'] = cookieval
CM 28         session = self._makeOne(request)
0c29cf 29         self.assertEqual(dict(session), {'state': 1})
4fade6 30
968209 31     def test_ctor_with_cookie_expired(self):
CM 32         request = testing.DummyRequest()
4fade6 33         cookieval = self._serialize((0, 0, {'state': 1}))
968209 34         request.cookies['session'] = cookieval
CM 35         session = self._makeOne(request)
36         self.assertEqual(dict(session), {})
37
4fade6 38     def test_ctor_with_bad_cookie_cannot_deserialize(self):
fb5292 39         request = testing.DummyRequest()
4fade6 40         request.cookies['session'] = 'abc'
MM 41         session = self._makeOne(request)
42         self.assertEqual(dict(session), {})
43
44     def test_ctor_with_bad_cookie_not_tuple(self):
45         request = testing.DummyRequest()
46         cookieval = self._serialize('abc')
fb5292 47         request.cookies['session'] = cookieval
CM 48         session = self._makeOne(request)
4fade6 49         self.assertEqual(dict(session), {})
MM 50
51     def test_timeout(self):
52         import time
0c29cf 53
4fade6 54         request = testing.DummyRequest()
MM 55         cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
56         request.cookies['session'] = cookieval
57         session = self._makeOne(request, timeout=1)
fb5292 58         self.assertEqual(dict(session), {})
CM 59
7075e1 60     def test_timeout_never(self):
MM 61         import time
0c29cf 62
7075e1 63         request = testing.DummyRequest()
MM 64         LONG_TIME = 31536000
65         cookieval = self._serialize((time.time() + LONG_TIME, 0, {'state': 1}))
66         request.cookies['session'] = cookieval
67         session = self._makeOne(request, timeout=None)
68         self.assertEqual(dict(session), {'state': 1})
69
67ff3b 70     def test_timeout_str(self):
R 71         import time
0c29cf 72
67ff3b 73         request = testing.DummyRequest()
R 74         cookieval = self._serialize((time.time() - 5, 0, {'state': 1}))
75         request.cookies['session'] = cookieval
76         session = self._makeOne(request, timeout='1')
77         self.assertEqual(dict(session), {})
78
79     def test_timeout_invalid(self):
80         request = testing.DummyRequest()
0c29cf 81         self.assertRaises(
MM 82             ValueError, self._makeOne, request, timeout='Invalid value'
83         )
67ff3b 84
968209 85     def test_changed(self):
CM 86         request = testing.DummyRequest()
87         session = self._makeOne(request)
88         self.assertEqual(session.changed(), None)
4fade6 89         self.assertTrue(session._dirty)
968209 90
CM 91     def test_invalidate(self):
92         request = testing.DummyRequest()
93         session = self._makeOne(request)
94         session['a'] = 1
95         self.assertEqual(session.invalidate(), None)
a1d395 96         self.assertFalse('a' in session)
4fade6 97
MM 98     def test_reissue_triggered(self):
99         import time
0c29cf 100
4fade6 101         request = testing.DummyRequest()
MM 102         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
103         request.cookies['session'] = cookieval
104         session = self._makeOne(request)
105         self.assertEqual(session['state'], 1)
106         self.assertTrue(session._dirty)
968209 107
CM 108     def test__set_cookie_on_exception(self):
109         request = testing.DummyRequest()
110         request.exception = True
111         session = self._makeOne(request)
112         session._cookie_on_exception = False
113         response = DummyResponse()
114         self.assertEqual(session._set_cookie(response), False)
d0c2f3 115
CM 116     def test__set_cookie_on_exception_no_request_exception(self):
117         import webob
0c29cf 118
d0c2f3 119         request = testing.DummyRequest()
CM 120         request.exception = None
121         session = self._makeOne(request)
122         session._cookie_on_exception = False
123         response = webob.Response()
124         self.assertEqual(session._set_cookie(response), True)
125         self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
126
968209 127     def test__set_cookie_cookieval_too_long(self):
CM 128         request = testing.DummyRequest()
129         session = self._makeOne(request)
0c29cf 130         session['abc'] = 'x' * 100000
968209 131         response = DummyResponse()
CM 132         self.assertRaises(ValueError, session._set_cookie, response)
133
134     def test__set_cookie_real_webob_response(self):
135         import webob
0c29cf 136
968209 137         request = testing.DummyRequest()
CM 138         session = self._makeOne(request)
139         session['abc'] = 'x'
140         response = webob.Response()
141         self.assertEqual(session._set_cookie(response), True)
142         self.assertEqual(response.headerlist[-1][0], 'Set-Cookie')
143
144     def test__set_cookie_options(self):
d868ff 145         from pyramid.response import Response
0c29cf 146
968209 147         request = testing.DummyRequest()
CM 148         request.exception = None
0c29cf 149         session = self._makeOne(
MM 150             request,
151             cookie_name='abc',
152             path='/foo',
153             domain='localhost',
154             secure=True,
155             httponly=True,
156         )
968209 157         session['abc'] = 'x'
d868ff 158         response = Response()
968209 159         self.assertEqual(session._set_cookie(response), True)
4fade6 160         cookieval = response.headerlist[-1][1]
0c29cf 161         val, domain, path, secure, httponly, samesite = [
MM 162             x.strip() for x in cookieval.split(';')
163         ]
a1d395 164         self.assertTrue(val.startswith('abc='))
968209 165         self.assertEqual(domain, 'Domain=localhost')
CM 166         self.assertEqual(path, 'Path=/foo')
167         self.assertEqual(secure, 'secure')
168         self.assertEqual(httponly, 'HttpOnly')
3d3dee 169         self.assertEqual(samesite, 'SameSite=Lax')
968209 170
4df636 171     def test_flash_default(self):
CM 172         request = testing.DummyRequest()
173         session = self._makeOne(request)
174         session.flash('msg1')
175         session.flash('msg2')
6f6d36 176         self.assertEqual(session['_f_'], ['msg1', 'msg2'])
d0c2f3 177
CM 178     def test_flash_allow_duplicate_false(self):
179         request = testing.DummyRequest()
180         session = self._makeOne(request)
181         session.flash('msg1')
182         session.flash('msg1', allow_duplicate=False)
183         self.assertEqual(session['_f_'], ['msg1'])
184
185     def test_flash_allow_duplicate_true_and_msg_not_in_storage(self):
186         request = testing.DummyRequest()
187         session = self._makeOne(request)
188         session.flash('msg1', allow_duplicate=True)
189         self.assertEqual(session['_f_'], ['msg1'])
190
191     def test_flash_allow_duplicate_false_and_msg_not_in_storage(self):
192         request = testing.DummyRequest()
193         session = self._makeOne(request)
194         session.flash('msg1', allow_duplicate=False)
195         self.assertEqual(session['_f_'], ['msg1'])
196
4df636 197     def test_flash_mixed(self):
CM 198         request = testing.DummyRequest()
199         session = self._makeOne(request)
6f6d36 200         session.flash('warn1', 'warn')
CM 201         session.flash('warn2', 'warn')
202         session.flash('err1', 'error')
203         session.flash('err2', 'error')
204         self.assertEqual(session['_f_warn'], ['warn1', 'warn2'])
4df636 205
6f6d36 206     def test_pop_flash_default_queue(self):
4df636 207         request = testing.DummyRequest()
CM 208         session = self._makeOne(request)
6f6d36 209         queue = ['one', 'two']
CM 210         session['_f_'] = queue
211         result = session.pop_flash()
212         self.assertEqual(result, queue)
213         self.assertEqual(session.get('_f_'), None)
4df636 214
6f6d36 215     def test_pop_flash_nodefault_queue(self):
4df636 216         request = testing.DummyRequest()
CM 217         session = self._makeOne(request)
6f6d36 218         queue = ['one', 'two']
CM 219         session['_f_error'] = queue
220         result = session.pop_flash('error')
221         self.assertEqual(result, queue)
222         self.assertEqual(session.get('_f_error'), None)
4df636 223
6f6d36 224     def test_peek_flash_default_queue(self):
4df636 225         request = testing.DummyRequest()
CM 226         session = self._makeOne(request)
6f6d36 227         queue = ['one', 'two']
CM 228         session['_f_'] = queue
229         result = session.peek_flash()
230         self.assertEqual(result, queue)
231         self.assertEqual(session.get('_f_'), queue)
232
233     def test_peek_flash_nodefault_queue(self):
234         request = testing.DummyRequest()
235         session = self._makeOne(request)
236         queue = ['one', 'two']
237         session['_f_error'] = queue
238         result = session.peek_flash('error')
239         self.assertEqual(result, queue)
240         self.assertEqual(session.get('_f_error'), queue)
4df636 241
319793 242     def test_new_csrf_token(self):
CM 243         request = testing.DummyRequest()
244         session = self._makeOne(request)
245         token = session.new_csrf_token()
246         self.assertEqual(token, session['_csrft_'])
247
36ea5b 248     def test_get_csrf_token(self):
319793 249         request = testing.DummyRequest()
CM 250         session = self._makeOne(request)
251         session['_csrft_'] = 'token'
36ea5b 252         token = session.get_csrf_token()
319793 253         self.assertEqual(token, 'token')
a1d395 254         self.assertTrue('_csrft_' in session)
319793 255
14f863 256     def test_get_csrf_token_new(self):
CM 257         request = testing.DummyRequest()
258         session = self._makeOne(request)
259         token = session.get_csrf_token()
a1d395 260         self.assertTrue(token)
CM 261         self.assertTrue('_csrft_' in session)
14f863 262
4fade6 263     def test_no_set_cookie_with_exception(self):
MM 264         import webob
0c29cf 265
4fade6 266         request = testing.DummyRequest()
MM 267         request.exception = True
268         session = self._makeOne(request, set_on_exception=False)
269         session['a'] = 1
270         callbacks = request.response_callbacks
271         self.assertEqual(len(callbacks), 1)
272         response = webob.Response()
273         result = callbacks[0](request, response)
274         self.assertEqual(result, None)
275         self.assertFalse('Set-Cookie' in dict(response.headerlist))
276
277     def test_set_cookie_with_exception(self):
278         import webob
0c29cf 279
4fade6 280         request = testing.DummyRequest()
MM 281         request.exception = True
282         session = self._makeOne(request)
283         session['a'] = 1
284         callbacks = request.response_callbacks
285         self.assertEqual(len(callbacks), 1)
286         response = webob.Response()
287         result = callbacks[0](request, response)
288         self.assertEqual(result, None)
289         self.assertTrue('Set-Cookie' in dict(response.headerlist))
290
291     def test_cookie_is_set(self):
292         import webob
0c29cf 293
4fade6 294         request = testing.DummyRequest()
MM 295         session = self._makeOne(request)
296         session['a'] = 1
297         callbacks = request.response_callbacks
298         self.assertEqual(len(callbacks), 1)
299         response = webob.Response()
300         result = callbacks[0](request, response)
301         self.assertEqual(result, None)
302         self.assertTrue('Set-Cookie' in dict(response.headerlist))
303
0c29cf 304
4fade6 305 class TestBaseCookieSession(SharedCookieSessionTests, unittest.TestCase):
MM 306     def _makeOne(self, request, **kw):
307         from pyramid.session import BaseCookieSessionFactory
0c29cf 308
8134a7 309         serializer = DummySerializer()
CM 310         return BaseCookieSessionFactory(serializer, **kw)(request)
4fade6 311
MM 312     def _serialize(self, value):
10dd60 313         return base64.b64encode(json.dumps(value).encode('utf-8'))
4fade6 314
MM 315     def test_reissue_not_triggered(self):
316         import time
0c29cf 317
4fade6 318         request = testing.DummyRequest()
MM 319         cookieval = self._serialize((time.time(), 0, {'state': 1}))
320         request.cookies['session'] = cookieval
321         session = self._makeOne(request, reissue_time=1)
322         self.assertEqual(session['state'], 1)
323         self.assertFalse(session._dirty)
324
9549f6 325     def test_reissue_never(self):
MM 326         request = testing.DummyRequest()
327         cookieval = self._serialize((0, 0, {'state': 1}))
328         request.cookies['session'] = cookieval
329         session = self._makeOne(request, reissue_time=None, timeout=None)
330         self.assertEqual(session['state'], 1)
331         self.assertFalse(session._dirty)
332
a43abd 333     def test_reissue_str_triggered(self):
R 334         import time
0c29cf 335
a43abd 336         request = testing.DummyRequest()
R 337         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
338         request.cookies['session'] = cookieval
339         session = self._makeOne(request, reissue_time='0')
340         self.assertEqual(session['state'], 1)
341         self.assertTrue(session._dirty)
342
343     def test_reissue_invalid(self):
344         request = testing.DummyRequest()
0c29cf 345         self.assertRaises(
MM 346             ValueError, self._makeOne, request, reissue_time='invalid value'
347         )
a43abd 348
fa7886 349     def test_cookie_max_age_invalid(self):
R 350         request = testing.DummyRequest()
0c29cf 351         self.assertRaises(
MM 352             ValueError, self._makeOne, request, max_age='invalid value'
353         )
354
fa7886 355
4fade6 356 class TestSignedCookieSession(SharedCookieSessionTests, unittest.TestCase):
MM 357     def _makeOne(self, request, **kw):
358         from pyramid.session import SignedCookieSessionFactory
0c29cf 359
b0b09c 360         kw.setdefault('secret', 'secret')
MM 361         return SignedCookieSessionFactory(**kw)(request)
4fade6 362
554a02 363     def _serialize(self, value, salt=b'pyramid.session.', hashalg='sha512'):
b0b09c 364         import base64
MM 365         import hashlib
366         import hmac
367         import pickle
368
369         digestmod = lambda: hashlib.new(hashalg)
370         cstruct = pickle.dumps(value, pickle.HIGHEST_PROTOCOL)
554a02 371         sig = hmac.new(salt + b'secret', cstruct, digestmod).digest()
8134a7 372         return base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
4fade6 373
MM 374     def test_reissue_not_triggered(self):
375         import time
0c29cf 376
4fade6 377         request = testing.DummyRequest()
MM 378         cookieval = self._serialize((time.time(), 0, {'state': 1}))
379         request.cookies['session'] = cookieval
380         session = self._makeOne(request, reissue_time=1)
381         self.assertEqual(session['state'], 1)
382         self.assertFalse(session._dirty)
383
9549f6 384     def test_reissue_never(self):
MM 385         request = testing.DummyRequest()
386         cookieval = self._serialize((0, 0, {'state': 1}))
387         request.cookies['session'] = cookieval
388         session = self._makeOne(request, reissue_time=None, timeout=None)
389         self.assertEqual(session['state'], 1)
390         self.assertFalse(session._dirty)
391
a43abd 392     def test_reissue_str_triggered(self):
R 393         import time
0c29cf 394
a43abd 395         request = testing.DummyRequest()
R 396         cookieval = self._serialize((time.time() - 2, 0, {'state': 1}))
397         request.cookies['session'] = cookieval
398         session = self._makeOne(request, reissue_time='0')
399         self.assertEqual(session['state'], 1)
400         self.assertTrue(session._dirty)
401
402     def test_reissue_invalid(self):
403         request = testing.DummyRequest()
0c29cf 404         self.assertRaises(
MM 405             ValueError, self._makeOne, request, reissue_time='invalid value'
406         )
a43abd 407
fa7886 408     def test_cookie_max_age_invalid(self):
R 409         request = testing.DummyRequest()
0c29cf 410         self.assertRaises(
MM 411             ValueError, self._makeOne, request, max_age='invalid value'
412         )
fa7886 413
b0b09c 414     def test_custom_salt(self):
4fade6 415         import time
0c29cf 416
4fade6 417         request = testing.DummyRequest()
554a02 418         cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
4fade6 419         request.cookies['session'] = cookieval
554a02 420         session = self._makeOne(request, salt=b'f.')
4fade6 421         self.assertEqual(session['state'], 1)
b0b09c 422
MM 423     def test_salt_mismatch(self):
424         import time
0c29cf 425
b0b09c 426         request = testing.DummyRequest()
554a02 427         cookieval = self._serialize((time.time(), 0, {'state': 1}), salt=b'f.')
b0b09c 428         request.cookies['session'] = cookieval
554a02 429         session = self._makeOne(request, salt=b'g.')
b0b09c 430         self.assertEqual(session, {})
MM 431
432     def test_custom_hashalg(self):
433         import time
0c29cf 434
b0b09c 435         request = testing.DummyRequest()
0c29cf 436         cookieval = self._serialize(
MM 437             (time.time(), 0, {'state': 1}), hashalg='sha1'
438         )
b0b09c 439         request.cookies['session'] = cookieval
MM 440         session = self._makeOne(request, hashalg='sha1')
441         self.assertEqual(session['state'], 1)
442
443     def test_hashalg_mismatch(self):
444         import time
0c29cf 445
b0b09c 446         request = testing.DummyRequest()
0c29cf 447         cookieval = self._serialize(
MM 448             (time.time(), 0, {'state': 1}), hashalg='sha1'
449         )
b0b09c 450         request.cookies['session'] = cookieval
MM 451         session = self._makeOne(request, hashalg='sha256')
452         self.assertEqual(session, {})
453
454     def test_secret_mismatch(self):
455         import time
0c29cf 456
b0b09c 457         request = testing.DummyRequest()
MM 458         cookieval = self._serialize((time.time(), 0, {'state': 1}))
459         request.cookies['session'] = cookieval
460         session = self._makeOne(request, secret='evilsecret')
461         self.assertEqual(session, {})
462
463     def test_custom_serializer(self):
464         import base64
465         from hashlib import sha512
466         import hmac
467         import time
0c29cf 468
b0b09c 469         request = testing.DummyRequest()
8134a7 470         serializer = DummySerializer()
CM 471         cstruct = serializer.dumps((time.time(), 0, {'state': 1}))
554a02 472         sig = hmac.new(b'pyramid.session.secret', cstruct, sha512).digest()
8134a7 473         cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
b0b09c 474         request.cookies['session'] = cookieval
8134a7 475         session = self._makeOne(request, serializer=serializer)
b0b09c 476         self.assertEqual(session['state'], 1)
MM 477
478     def test_invalid_data_size(self):
479         from hashlib import sha512
480         import base64
0c29cf 481
b0b09c 482         request = testing.DummyRequest()
MM 483         num_bytes = sha512().digest_size - 1
484         cookieval = base64.b64encode(b' ' * num_bytes)
485         request.cookies['session'] = cookieval
486         session = self._makeOne(request)
487         self.assertEqual(session, {})
4fade6 488
1c0db5 489     def test_very_long_key(self):
BJR 490         verylongkey = b'a' * 1024
491         import webob
0c29cf 492
1c0db5 493         request = testing.DummyRequest()
BJR 494         session = self._makeOne(request, secret=verylongkey)
495         session['a'] = 1
496         callbacks = request.response_callbacks
497         self.assertEqual(len(callbacks), 1)
498         response = webob.Response()
499
500         try:
501             result = callbacks[0](request, response)
0c29cf 502         except TypeError:  # pragma: no cover
1c0db5 503             self.fail('HMAC failed to initialize due to key length.')
BJR 504
505         self.assertEqual(result, None)
506         self.assertTrue('Set-Cookie' in dict(response.headerlist))
507
a9cd71 508     def test_bad_pickle(self):
MM 509         import base64
510         import hashlib
511         import hmac
512
513         digestmod = lambda: hashlib.new('sha512')
514         # generated from dumping an object that cannot be found anymore, eg:
515         # class Foo: pass
516         # print(pickle.dumps(Foo()))
517         cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
518         sig = hmac.new(b'pyramid.session.secret', cstruct, digestmod).digest()
519         cookieval = base64.urlsafe_b64encode(sig + cstruct).rstrip(b'=')
520
521         request = testing.DummyRequest()
522         request.cookies['session'] = cookieval
523         session = self._makeOne(request, secret='secret')
524         self.assertEqual(session, {})
525
0c29cf 526
MM 527 class TestUnencryptedCookieSession(
528     SharedCookieSessionTests, unittest.TestCase
529 ):
9536f9 530     def setUp(self):
CM 531         super(TestUnencryptedCookieSession, self).setUp()
532         from zope.deprecation import __show__
0c29cf 533
9536f9 534         __show__.off()
CM 535
536     def tearDown(self):
537         super(TestUnencryptedCookieSession, self).tearDown()
538         from zope.deprecation import __show__
0c29cf 539
9536f9 540         __show__.on()
0c29cf 541
4fade6 542     def _makeOne(self, request, **kw):
MM 543         from pyramid.session import UnencryptedCookieSessionFactoryConfig
0c29cf 544
4fade6 545         self._rename_cookie_var(kw, 'path', 'cookie_path')
MM 546         self._rename_cookie_var(kw, 'domain', 'cookie_domain')
547         self._rename_cookie_var(kw, 'secure', 'cookie_secure')
548         self._rename_cookie_var(kw, 'httponly', 'cookie_httponly')
549         self._rename_cookie_var(kw, 'set_on_exception', 'cookie_on_exception')
550         return UnencryptedCookieSessionFactoryConfig('secret', **kw)(request)
551
552     def _rename_cookie_var(self, kw, src, dest):
553         if src in kw:
554             kw.setdefault(dest, kw.pop(src))
555
556     def _serialize(self, value):
8134a7 557         from pyramid.compat import bytes_
4fade6 558         from pyramid.session import signed_serialize
0c29cf 559
8134a7 560         return bytes_(signed_serialize(value, 'secret'))
4fade6 561
cf46a1 562     def test_serialize_option(self):
IW 563         from pyramid.response import Response
0c29cf 564
cf46a1 565         secret = 'secret'
IW 566         request = testing.DummyRequest()
0c29cf 567         session = self._makeOne(
MM 568             request, signed_serialize=dummy_signed_serialize
569         )
cf46a1 570         session['key'] = 'value'
IW 571         response = Response()
572         self.assertEqual(session._set_cookie(response), True)
573         cookie = response.headerlist[-1][1]
574         expected_cookieval = dummy_signed_serialize(
0c29cf 575             (session.accessed, session.created, {'key': 'value'}), secret
MM 576         )
cf46a1 577         response = Response()
6c8ad4 578         response.set_cookie('session', expected_cookieval, samesite='Lax')
cf46a1 579         expected_cookie = response.headerlist[-1][1]
IW 580         self.assertEqual(cookie, expected_cookie)
581
582     def test_deserialize_option(self):
583         import time
0c29cf 584
cf46a1 585         secret = 'secret'
IW 586         request = testing.DummyRequest()
587         accessed = time.time()
588         state = {'key': 'value'}
589         cookieval = dummy_signed_serialize((accessed, accessed, state), secret)
590         request.cookies['session'] = cookieval
0c29cf 591         session = self._makeOne(
MM 592             request, signed_deserialize=dummy_signed_deserialize
593         )
cf46a1 594         self.assertEqual(dict(session), state)
0c29cf 595
cf46a1 596
IW 597 def dummy_signed_serialize(data, secret):
598     import base64
599     from pyramid.compat import pickle, bytes_
0c29cf 600
cf46a1 601     pickled = pickle.dumps(data)
IW 602     return base64.b64encode(bytes_(secret)) + base64.b64encode(pickled)
0c29cf 603
cf46a1 604
IW 605 def dummy_signed_deserialize(serialized, secret):
606     import base64
607     from pyramid.compat import pickle, bytes_
0c29cf 608
cf46a1 609     serialized_data = base64.b64decode(
0c29cf 610         serialized[len(base64.b64encode(bytes_(secret))) :]
MM 611     )
cf46a1 612     return pickle.loads(serialized_data)
0c29cf 613
cf46a1 614
968209 615 class Test_manage_accessed(unittest.TestCase):
CM 616     def _makeOne(self, wrapped):
617         from pyramid.session import manage_accessed
0c29cf 618
968209 619         return manage_accessed(wrapped)
CM 620
621     def test_accessed_set(self):
622         request = testing.DummyRequest()
623         session = DummySessionFactory(request)
4fade6 624         session.renewed = 0
968209 625         wrapper = self._makeOne(session.__class__.get)
CM 626         wrapper(session, 'a')
627         self.assertNotEqual(session.accessed, None)
4fade6 628         self.assertTrue(session._dirty)
MM 629
630     def test_accessed_without_renew(self):
631         import time
0c29cf 632
4fade6 633         request = testing.DummyRequest()
MM 634         session = DummySessionFactory(request)
635         session._reissue_time = 5
636         session.renewed = time.time()
637         wrapper = self._makeOne(session.__class__.get)
638         wrapper(session, 'a')
639         self.assertNotEqual(session.accessed, None)
640         self.assertFalse(session._dirty)
641
968209 642     def test_already_dirty(self):
CM 643         request = testing.DummyRequest()
644         session = DummySessionFactory(request)
4fade6 645         session.renewed = 0
968209 646         session._dirty = True
CM 647         session['a'] = 1
648         wrapper = self._makeOne(session.__class__.get)
649         self.assertEqual(wrapper.__doc__, session.get.__doc__)
650         result = wrapper(session, 'a')
651         self.assertEqual(result, 1)
652         callbacks = request.response_callbacks
0c29cf 653         if callbacks is not None:
MM 654             self.assertEqual(len(callbacks), 0)
655
968209 656
4fade6 657 class Test_manage_changed(unittest.TestCase):
MM 658     def _makeOne(self, wrapped):
659         from pyramid.session import manage_changed
0c29cf 660
4fade6 661         return manage_changed(wrapped)
968209 662
4fade6 663     def test_it(self):
968209 664         request = testing.DummyRequest()
CM 665         session = DummySessionFactory(request)
4fade6 666         wrapper = self._makeOne(session.__class__.__setitem__)
MM 667         wrapper(session, 'a', 1)
668         self.assertNotEqual(session.accessed, None)
669         self.assertTrue(session._dirty)
968209 670
0c29cf 671
968209 672 def serialize(data, secret):
CM 673     import hmac
674     import base64
954999 675     from hashlib import sha1
CM 676     from pyramid.compat import bytes_
677     from pyramid.compat import native_
678     from pyramid.compat import pickle
0c29cf 679
954999 680     pickled = pickle.dumps(data, pickle.HIGHEST_PROTOCOL)
69b613 681     sig = hmac.new(bytes_(secret, 'utf-8'), pickled, sha1).hexdigest()
954999 682     return sig + native_(base64.b64encode(pickled))
968209 683
0c29cf 684
815955 685 class Test_signed_serialize(unittest.TestCase):
968209 686     def _callFUT(self, data, secret):
815955 687         from pyramid.session import signed_serialize
0c29cf 688
815955 689         return signed_serialize(data, secret)
968209 690
CM 691     def test_it(self):
692         expected = serialize('123', 'secret')
693         result = self._callFUT('123', 'secret')
69b613 694         self.assertEqual(result, expected)
MM 695
696     def test_it_with_highorder_secret(self):
cf026e 697         secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
69b613 698         expected = serialize('123', secret)
MM 699         result = self._callFUT('123', secret)
cf026e 700         self.assertEqual(result, expected)
MM 701
702     def test_it_with_latin1_secret(self):
703         secret = b'La Pe\xc3\xb1a'
704         expected = serialize('123', secret)
705         result = self._callFUT('123', secret.decode('latin-1'))
968209 706         self.assertEqual(result, expected)
0c29cf 707
MM 708
815955 709 class Test_signed_deserialize(unittest.TestCase):
968209 710     def _callFUT(self, serialized, secret, hmac=None):
CM 711         if hmac is None:
712             import hmac
815955 713         from pyramid.session import signed_deserialize
0c29cf 714
815955 715         return signed_deserialize(serialized, secret, hmac=hmac)
968209 716
CM 717     def test_it(self):
718         serialized = serialize('123', 'secret')
719         result = self._callFUT(serialized, 'secret')
720         self.assertEqual(result, '123')
721
722     def test_invalid_bits(self):
723         serialized = serialize('123', 'secret')
815955 724         self.assertRaises(ValueError, self._callFUT, serialized, 'seekrit')
968209 725
CM 726     def test_invalid_len(self):
727         class hmac(object):
728             def new(self, *arg):
729                 return self
0c29cf 730
968209 731             def hexdigest(self):
CM 732                 return '1234'
0c29cf 733
968209 734         serialized = serialize('123', 'secret123')
0c29cf 735         self.assertRaises(
MM 736             ValueError, self._callFUT, serialized, 'secret', hmac=hmac()
737         )
738
968209 739     def test_it_bad_encoding(self):
CM 740         serialized = 'bad' + serialize('123', 'secret')
815955 741         self.assertRaises(ValueError, self._callFUT, serialized, 'secret')
4fade6 742
69b613 743     def test_it_with_highorder_secret(self):
cf026e 744         secret = b'\xce\xb1\xce\xb2\xce\xb3\xce\xb4'.decode('utf-8')
69b613 745         serialized = serialize('123', secret)
MM 746         result = self._callFUT(serialized, secret)
747         self.assertEqual(result, '123')
748
cf026e 749     # bwcompat with pyramid <= 1.5b1 where latin1 is the default
MM 750     def test_it_with_latin1_secret(self):
751         secret = b'La Pe\xc3\xb1a'
752         serialized = serialize('123', secret)
753         result = self._callFUT(serialized, secret.decode('latin-1'))
754         self.assertEqual(result, '123')
755
65dee6 756
a9cd71 757 class TestPickleSerializer(unittest.TestCase):
MM 758     def _makeOne(self):
759         from pyramid.session import PickleSerializer
0c29cf 760
a9cd71 761         return PickleSerializer()
MM 762
763     def test_loads(self):
764         # generated from dumping Dummy() using protocol=2
dd3cc8 765         cstruct = b'\x80\x02ctests.test_session\nDummy\nq\x00)\x81q\x01.'
a9cd71 766         serializer = self._makeOne()
MM 767         result = serializer.loads(cstruct)
768         self.assertIsInstance(result, Dummy)
769
770     def test_loads_raises_ValueError_on_invalid_data(self):
771         cstruct = b'not pickled'
772         serializer = self._makeOne()
773         self.assertRaises(ValueError, serializer.loads, cstruct)
774
775     def test_loads_raises_ValueError_on_bad_import(self):
776         # generated from dumping an object that cannot be found anymore, eg:
777         # class Foo: pass
778         # print(pickle.dumps(Foo()))
779         cstruct = b'(i__main__\nFoo\np0\n(dp1\nb.'
780         serializer = self._makeOne()
781         self.assertRaises(ValueError, serializer.loads, cstruct)
782
783     def test_dumps(self):
784         obj = Dummy()
785         serializer = self._makeOne()
786         result = serializer.dumps(obj)
787         expected_result = pickle.dumps(obj, protocol=pickle.HIGHEST_PROTOCOL)
788         self.assertEqual(result, expected_result)
789         self.assertIsInstance(result, bytes)
790
791
792 class Dummy(object):
793     pass
794
795
8134a7 796 class DummySerializer(object):
CM 797     def dumps(self, value):
10dd60 798         return base64.b64encode(json.dumps(value).encode('utf-8'))
4fade6 799
8134a7 800     def loads(self, value):
5b5d5e 801         try:
MM 802             return json.loads(base64.b64decode(value).decode('utf-8'))
803
804         # base64.b64decode raises a TypeError on py2 instead of a ValueError
805         # and a ValueError is required for the session to handle it properly
806         except TypeError:
807             raise ValueError
4fade6 808
0c29cf 809
968209 810 class DummySessionFactory(dict):
CM 811     _dirty = False
812     _cookie_name = 'session'
813     _cookie_max_age = None
814     _cookie_path = '/'
815     _cookie_domain = None
816     _cookie_secure = False
817     _cookie_httponly = False
818     _timeout = 1200
4fade6 819     _reissue_time = 0
MM 820
968209 821     def __init__(self, request):
CM 822         self.request = request
823         dict.__init__(self, {})
824
4fade6 825     def changed(self):
MM 826         self._dirty = True
968209 827
0c29cf 828
968209 829 class DummyResponse(object):
CM 830     def __init__(self):
831         self.headerlist = []