Michael Merickel
2018-10-18 f28dbb0ba8d276fad10a3cd25e4d60b298702d83
commit | author | age
a2c7c7 1 import unittest
MW 2
313c25 3 from pyramid import testing
a2c7c7 4 from pyramid.config import Configurator
313c25 5
JC 6
682a9b 7 class TestLegacySessionCSRFStoragePolicy(unittest.TestCase):
a2c7c7 8     class MockSession(object):
3f14d6 9         def __init__(self, current_token='02821185e4c94269bdc38e6eeae0a2f8'):
MM 10             self.current_token = current_token
11
a2c7c7 12         def new_csrf_token(self):
3f14d6 13             self.current_token = 'e5e9e30a08b34ff9842ff7d2b958c14b'
MM 14             return self.current_token
a2c7c7 15
MW 16         def get_csrf_token(self):
3f14d6 17             return self.current_token
a2c7c7 18
313c25 19     def _makeOne(self):
682a9b 20         from pyramid.csrf import LegacySessionCSRFStoragePolicy
0c29cf 21
682a9b 22         return LegacySessionCSRFStoragePolicy()
313c25 23
JC 24     def test_register_session_csrf_policy(self):
682a9b 25         from pyramid.csrf import LegacySessionCSRFStoragePolicy
fe0d22 26         from pyramid.interfaces import ICSRFStoragePolicy
313c25 27
a2c7c7 28         config = Configurator()
7c0f09 29         config.set_csrf_storage_policy(self._makeOne())
a2c7c7 30         config.commit()
MW 31
fe0d22 32         policy = config.registry.queryUtility(ICSRFStoragePolicy)
313c25 33
682a9b 34         self.assertTrue(isinstance(policy, LegacySessionCSRFStoragePolicy))
313c25 35
JC 36     def test_session_csrf_implementation_delegates_to_session(self):
37         policy = self._makeOne()
38         request = DummyRequest(session=self.MockSession())
39
a2c7c7 40         self.assertEqual(
0c29cf 41             policy.get_csrf_token(request), '02821185e4c94269bdc38e6eeae0a2f8'
a2c7c7 42         )
MW 43         self.assertEqual(
0c29cf 44             policy.new_csrf_token(request), 'e5e9e30a08b34ff9842ff7d2b958c14b'
a2c7c7 45         )
3f14d6 46
MM 47     def test_check_csrf_token(self):
48         request = DummyRequest(session=self.MockSession('foo'))
49
50         policy = self._makeOne()
51         self.assertTrue(policy.check_csrf_token(request, 'foo'))
52         self.assertFalse(policy.check_csrf_token(request, 'bar'))
a2c7c7 53
682a9b 54
MM 55 class TestSessionCSRFStoragePolicy(unittest.TestCase):
56     def _makeOne(self, **kw):
57         from pyramid.csrf import SessionCSRFStoragePolicy
0c29cf 58
682a9b 59         return SessionCSRFStoragePolicy(**kw)
MM 60
61     def test_register_session_csrf_policy(self):
62         from pyramid.csrf import SessionCSRFStoragePolicy
63         from pyramid.interfaces import ICSRFStoragePolicy
64
65         config = Configurator()
66         config.set_csrf_storage_policy(self._makeOne())
67         config.commit()
68
69         policy = config.registry.queryUtility(ICSRFStoragePolicy)
70
71         self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy))
72
73     def test_it_creates_a_new_token(self):
74         request = DummyRequest(session={})
75
313c25 76         policy = self._makeOne()
682a9b 77         policy._token_factory = lambda: 'foo'
MM 78         self.assertEqual(policy.get_csrf_token(request), 'foo')
a2c7c7 79
682a9b 80     def test_get_csrf_token_returns_the_new_token(self):
MM 81         request = DummyRequest(session={'_csrft_': 'foo'})
a2c7c7 82
313c25 83         policy = self._makeOne()
682a9b 84         self.assertEqual(policy.get_csrf_token(request), 'foo')
313c25 85
682a9b 86         token = policy.new_csrf_token(request)
MM 87         self.assertNotEqual(token, 'foo')
88         self.assertEqual(token, policy.get_csrf_token(request))
3f14d6 89
MM 90     def test_check_csrf_token(self):
91         request = DummyRequest(session={})
92
93         policy = self._makeOne()
94         self.assertFalse(policy.check_csrf_token(request, 'foo'))
95
96         request.session = {'_csrft_': 'foo'}
97         self.assertTrue(policy.check_csrf_token(request, 'foo'))
98         self.assertFalse(policy.check_csrf_token(request, 'bar'))
313c25 99
JC 100
7c0f09 101 class TestCookieCSRFStoragePolicy(unittest.TestCase):
682a9b 102     def _makeOne(self, **kw):
7c0f09 103         from pyramid.csrf import CookieCSRFStoragePolicy
0c29cf 104
682a9b 105         return CookieCSRFStoragePolicy(**kw)
313c25 106
JC 107     def test_register_cookie_csrf_policy(self):
7c0f09 108         from pyramid.csrf import CookieCSRFStoragePolicy
fe0d22 109         from pyramid.interfaces import ICSRFStoragePolicy
313c25 110
a2c7c7 111         config = Configurator()
7c0f09 112         config.set_csrf_storage_policy(self._makeOne())
a2c7c7 113         config.commit()
MW 114
fe0d22 115         policy = config.registry.queryUtility(ICSRFStoragePolicy)
a2c7c7 116
7c0f09 117         self.assertTrue(isinstance(policy, CookieCSRFStoragePolicy))
313c25 118
JC 119     def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self):
120         response = MockResponse()
682a9b 121         request = DummyRequest()
313c25 122
JC 123         policy = self._makeOne()
124         token = policy.get_csrf_token(request)
682a9b 125         request.response_callback(request, response)
a2c7c7 126         self.assertEqual(
7c0f09 127             response.headerlist,
0c29cf 128             [
MM 129                 (
130                     'Set-Cookie',
131                     'csrf_token={}; Path=/; SameSite=Lax'.format(token),
132                 )
133             ],
87771a 134         )
CM 135
136     def test_get_cookie_csrf_nondefault_samesite(self):
137         response = MockResponse()
138         request = DummyRequest()
139
140         policy = self._makeOne(samesite=None)
141         token = policy.get_csrf_token(request)
142         request.response_callback(request, response)
143         self.assertEqual(
144             response.headerlist,
0c29cf 145             [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))],
a2c7c7 146         )
MW 147
148     def test_existing_cookie_csrf_does_not_set_cookie(self):
682a9b 149         request = DummyRequest()
a2c7c7 150         request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
MW 151
313c25 152         policy = self._makeOne()
JC 153         token = policy.get_csrf_token(request)
154
0c29cf 155         self.assertEqual(token, 'e6f325fee5974f3da4315a8ccf4513d2')
682a9b 156         self.assertIsNone(request.response_callback)
a2c7c7 157
MW 158     def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self):
682a9b 159         request = DummyRequest()
a2c7c7 160         request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
MW 161
313c25 162         policy = self._makeOne()
JC 163         token = policy.new_csrf_token(request)
682a9b 164
MM 165         response = MockResponse()
166         request.response_callback(request, response)
a2c7c7 167         self.assertEqual(
7c0f09 168             response.headerlist,
0c29cf 169             [
MM 170                 (
171                     'Set-Cookie',
172                     'csrf_token={}; Path=/; SameSite=Lax'.format(token),
173                 )
174             ],
a2c7c7 175         )
MW 176
682a9b 177     def test_get_csrf_token_returns_the_new_token(self):
MM 178         request = DummyRequest()
179         request.cookies = {'csrf_token': 'foo'}
313c25 180
JC 181         policy = self._makeOne()
682a9b 182         self.assertEqual(policy.get_csrf_token(request), 'foo')
313c25 183
682a9b 184         token = policy.new_csrf_token(request)
MM 185         self.assertNotEqual(token, 'foo')
186         self.assertEqual(token, policy.get_csrf_token(request))
313c25 187
3f14d6 188     def test_check_csrf_token(self):
MM 189         request = DummyRequest()
190
191         policy = self._makeOne()
192         self.assertFalse(policy.check_csrf_token(request, 'foo'))
193
194         request.cookies = {'csrf_token': 'foo'}
195         self.assertTrue(policy.check_csrf_token(request, 'foo'))
196         self.assertFalse(policy.check_csrf_token(request, 'bar'))
197
0c29cf 198
3f14d6 199 class Test_get_csrf_token(unittest.TestCase):
MM 200     def setUp(self):
201         self.config = testing.setUp()
202
203     def _callFUT(self, *args, **kwargs):
204         from pyramid.csrf import get_csrf_token
0c29cf 205
3f14d6 206         return get_csrf_token(*args, **kwargs)
MM 207
208     def test_no_override_csrf_utility_registered(self):
209         request = testing.DummyRequest()
210         self._callFUT(request)
211
212     def test_success(self):
213         self.config.set_csrf_storage_policy(DummyCSRF())
214         request = testing.DummyRequest()
215
216         csrf_token = self._callFUT(request)
217
218         self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8')
219
220
221 class Test_new_csrf_token(unittest.TestCase):
222     def setUp(self):
223         self.config = testing.setUp()
224
225     def _callFUT(self, *args, **kwargs):
226         from pyramid.csrf import new_csrf_token
0c29cf 227
3f14d6 228         return new_csrf_token(*args, **kwargs)
MM 229
230     def test_no_override_csrf_utility_registered(self):
231         request = testing.DummyRequest()
232         self._callFUT(request)
233
234     def test_success(self):
235         self.config.set_csrf_storage_policy(DummyCSRF())
236         request = testing.DummyRequest()
237
238         csrf_token = self._callFUT(request)
239
240         self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b')
241
313c25 242
JC 243 class Test_check_csrf_token(unittest.TestCase):
244     def setUp(self):
245         self.config = testing.setUp()
246
4b3603 247         # set up CSRF
313c25 248         self.config.set_default_csrf_options(require_csrf=False)
JC 249
250     def _callFUT(self, *args, **kwargs):
dd3cc8 251         from pyramid.csrf import check_csrf_token
0c29cf 252
313c25 253         return check_csrf_token(*args, **kwargs)
JC 254
255     def test_success_token(self):
256         request = testing.DummyRequest()
257         request.method = "POST"
258         request.POST = {'csrf_token': request.session.get_csrf_token()}
259         self.assertEqual(self._callFUT(request, token='csrf_token'), True)
260
261     def test_success_header(self):
262         request = testing.DummyRequest()
263         request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
264         self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)
265
266     def test_success_default_token(self):
267         request = testing.DummyRequest()
268         request.method = "POST"
269         request.POST = {'csrf_token': request.session.get_csrf_token()}
270         self.assertEqual(self._callFUT(request), True)
271
272     def test_success_default_header(self):
273         request = testing.DummyRequest()
274         request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
275         self.assertEqual(self._callFUT(request), True)
276
277     def test_failure_raises(self):
278         from pyramid.exceptions import BadCSRFToken
0c29cf 279
313c25 280         request = testing.DummyRequest()
0c29cf 281         self.assertRaises(BadCSRFToken, self._callFUT, request, 'csrf_token')
313c25 282
JC 283     def test_failure_no_raises(self):
284         request = testing.DummyRequest()
285         result = self._callFUT(request, 'csrf_token', raises=False)
286         self.assertEqual(result, False)
287
288
f6d63a 289 class Test_check_csrf_token_without_defaults_configured(unittest.TestCase):
MW 290     def setUp(self):
291         self.config = testing.setUp()
292
293     def _callFUT(self, *args, **kwargs):
dd3cc8 294         from pyramid.csrf import check_csrf_token
0c29cf 295
f6d63a 296         return check_csrf_token(*args, **kwargs)
MW 297
298     def test_success_token(self):
299         request = testing.DummyRequest()
300         request.method = "POST"
301         request.POST = {'csrf_token': request.session.get_csrf_token()}
302         self.assertEqual(self._callFUT(request, token='csrf_token'), True)
303
304     def test_failure_raises(self):
305         from pyramid.exceptions import BadCSRFToken
0c29cf 306
f6d63a 307         request = testing.DummyRequest()
0c29cf 308         self.assertRaises(BadCSRFToken, self._callFUT, request, 'csrf_token')
f6d63a 309
MW 310     def test_failure_no_raises(self):
311         request = testing.DummyRequest()
312         result = self._callFUT(request, 'csrf_token', raises=False)
313         self.assertEqual(result, False)
314
315
313c25 316 class Test_check_csrf_origin(unittest.TestCase):
JC 317     def _callFUT(self, *args, **kwargs):
dd3cc8 318         from pyramid.csrf import check_csrf_origin
0c29cf 319
313c25 320         return check_csrf_origin(*args, **kwargs)
JC 321
322     def test_success_with_http(self):
323         request = testing.DummyRequest()
324         request.scheme = "http"
325         self.assertTrue(self._callFUT(request))
326
327     def test_success_with_https_and_referrer(self):
328         request = testing.DummyRequest()
329         request.scheme = "https"
330         request.host = "example.com"
331         request.host_port = "443"
332         request.referrer = "https://example.com/login/"
333         request.registry.settings = {}
334         self.assertTrue(self._callFUT(request))
335
336     def test_success_with_https_and_origin(self):
337         request = testing.DummyRequest()
338         request.scheme = "https"
339         request.host = "example.com"
340         request.host_port = "443"
341         request.headers = {"Origin": "https://example.com/"}
342         request.referrer = "https://not-example.com/"
343         request.registry.settings = {}
344         self.assertTrue(self._callFUT(request))
345
346     def test_success_with_additional_trusted_host(self):
347         request = testing.DummyRequest()
348         request.scheme = "https"
349         request.host = "example.com"
350         request.host_port = "443"
351         request.referrer = "https://not-example.com/login/"
352         request.registry.settings = {
0c29cf 353             "pyramid.csrf_trusted_origins": ["not-example.com"]
313c25 354         }
JC 355         self.assertTrue(self._callFUT(request))
356
357     def test_success_with_nonstandard_port(self):
358         request = testing.DummyRequest()
359         request.scheme = "https"
360         request.host = "example.com:8080"
361         request.host_port = "8080"
362         request.referrer = "https://example.com:8080/login/"
363         request.registry.settings = {}
364         self.assertTrue(self._callFUT(request))
365
366     def test_fails_with_wrong_host(self):
367         from pyramid.exceptions import BadCSRFOrigin
0c29cf 368
313c25 369         request = testing.DummyRequest()
JC 370         request.scheme = "https"
371         request.host = "example.com"
372         request.host_port = "443"
373         request.referrer = "https://not-example.com/login/"
374         request.registry.settings = {}
375         self.assertRaises(BadCSRFOrigin, self._callFUT, request)
376         self.assertFalse(self._callFUT(request, raises=False))
377
378     def test_fails_with_no_origin(self):
379         from pyramid.exceptions import BadCSRFOrigin
0c29cf 380
313c25 381         request = testing.DummyRequest()
JC 382         request.scheme = "https"
383         request.referrer = None
384         self.assertRaises(BadCSRFOrigin, self._callFUT, request)
385         self.assertFalse(self._callFUT(request, raises=False))
386
387     def test_fails_when_http_to_https(self):
388         from pyramid.exceptions import BadCSRFOrigin
0c29cf 389
313c25 390         request = testing.DummyRequest()
JC 391         request.scheme = "https"
392         request.host = "example.com"
393         request.host_port = "443"
394         request.referrer = "http://example.com/evil/"
395         request.registry.settings = {}
396         self.assertRaises(BadCSRFOrigin, self._callFUT, request)
397         self.assertFalse(self._callFUT(request, raises=False))
398
399     def test_fails_with_nonstandard_port(self):
400         from pyramid.exceptions import BadCSRFOrigin
0c29cf 401
313c25 402         request = testing.DummyRequest()
JC 403         request.scheme = "https"
404         request.host = "example.com:8080"
405         request.host_port = "8080"
406         request.referrer = "https://example.com/login/"
407         request.registry.settings = {}
408         self.assertRaises(BadCSRFOrigin, self._callFUT, request)
409         self.assertFalse(self._callFUT(request, raises=False))
410
a2c7c7 411
class DummyRequest(object):
    """Bare-bones request double used by the storage-policy tests.

    Holds a registry, a session, a cookie mapping and the last response
    callback that was registered on it.
    """

    # Class-level defaults; ``response_callback`` stays ``None`` until
    # ``add_response_callback`` is called on an instance.
    registry = session = response_callback = None

    def __init__(self, registry=None, session=None):
        self.registry, self.session = registry, session
        # Each instance gets its own, initially empty, cookie mapping.
        self.cookies = dict()

    def add_response_callback(self, callback):
        """Store ``callback``, replacing any previously stored one."""
        self.response_callback = callback
a2c7c7 424
MW 425
class MockResponse(object):
    """Response double exposing only a mutable ``headerlist``."""

    def __init__(self):
        # Starts empty; tests inspect what ends up appended here.
        self.headerlist = list()
313c25 429
JC 430
class DummyCSRF(object):
    """CSRF storage-policy double that returns fixed tokens.

    The ``request`` argument is accepted but ignored by both methods.
    """

    _NEW_TOKEN = 'e5e9e30a08b34ff9842ff7d2b958c14b'
    _CURRENT_TOKEN = '02821185e4c94269bdc38e6eeae0a2f8'

    def new_csrf_token(self, request):
        return self._NEW_TOKEN

    def get_csrf_token(self, request):
        return self._CURRENT_TOKEN