Merge pull request #2501 from dstufft/check-origin-csrf
In addition to CSRF token, verify the origin too
| | |
| | | https://github.com/Pylons/pyramid/pull/2413 and |
| | | https://github.com/Pylons/pyramid/pull/2500 |
| | | |
| | | - Added an additional CSRF validation that checks the origin/referrer of a |
| | | request and makes sure it matches the current ``request.domain``. This |
| | | particular check is only active when accessing a site over HTTPS as otherwise |
| | | browsers don't always send the required information. If this additional CSRF |
| | | validation fails a ``BadCSRFOrigin`` exception will be raised and may be |
| | | caught by exception views (the default response is ``400 Bad Request``). |
| | | Additional allowed origins may be configured by setting |
| | | ``pyramid.csrf_trusted_origins`` to a list of domain names (with ports if on |
| | | a non standard port) to allow. Subdomains are not allowed unless the domain |
| | | name has been prefixed with a ``.``. See: |
| | | https://github.com/Pylons/pyramid/pull/2501 |
| | | |
| | | - Pyramid HTTPExceptions will now take into account the best match for the |
| | | clients Accept header, and depending on what is requested will return |
| | | text/html, application/json or text/plain. The default for */* is still |
| | |
| | | |
| | | .. automodule:: pyramid.exceptions |
| | | |
| | | .. autoexception:: BadCSRFOrigin |
| | | |
| | | .. autoexception:: BadCSRFToken |
| | | |
| | | .. autoexception:: PredicateMismatch |
| | |
| | | |
| | | .. autofunction:: signed_deserialize |
| | | |
| | | .. autofunction:: check_csrf_origin |
| | | |
| | | .. autofunction:: check_csrf_token |
| | | |
| | | .. autofunction:: SignedCookieSessionFactory |
| | |
| | | There is currently no way to define an alternate name for this header without |
| | | performing CSRF checking manually. |
| | | |
| | | In addition to token based CSRF checks, the automatic CSRF checking will also |
| | | check the referrer of the request to ensure that it matches one of the trusted |
| | | origins. By default the only trusted origin is the current host, however |
| | | additional origins may be configured by setting |
| | | ``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they |
| | | are non standard). If a host in the list of domains starts with a ``.`` then |
| | | that will allow all subdomains as well as the domain without the ``.``. |
| | | |
| | | If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` exception |
| | | will be raised. This exception may be caught and handled by an |
| | | :term:`exception view` but, by default, will result in a ``400 Bad Request`` |
| | |
| | | If this option is set to ``False`` then CSRF checks will be disabled |
| | | regardless of the ``pyramid.require_default_csrf`` setting. |
| | | |
| | | In addition, if this option is set to ``True`` or a string then CSRF origin |
| | | checking will be enabled. |
| | | |
| | | See :ref:`auto_csrf_checking` for more information. |
| | | |
| | | .. versionadded:: 1.7 |
| | |
| | | config_prevent_cachebust)) |
| | | require_default_csrf = self.get('pyramid.require_default_csrf') |
| | | eff_require_default_csrf = require_default_csrf |
| | | csrf_trusted_origins = self.get("pyramid.csrf_trusted_origins", []) |
| | | eff_csrf_trusted_origins = csrf_trusted_origins |
| | | |
| | | update = { |
| | | 'debug_authorization': eff_debug_all or eff_debug_auth, |
| | |
| | | 'prevent_http_cache':eff_prevent_http_cache, |
| | | 'prevent_cachebust':eff_prevent_cachebust, |
| | | 'require_default_csrf':eff_require_default_csrf, |
| | | 'csrf_trusted_origins':eff_csrf_trusted_origins, |
| | | |
| | | 'pyramid.debug_authorization': eff_debug_all or eff_debug_auth, |
| | | 'pyramid.debug_notfound': eff_debug_all or eff_debug_notfound, |
| | |
| | | 'pyramid.prevent_http_cache':eff_prevent_http_cache, |
| | | 'pyramid.prevent_cachebust':eff_prevent_cachebust, |
| | | 'pyramid.require_default_csrf':eff_require_default_csrf, |
| | | 'pyramid.csrf_trusted_origins':eff_csrf_trusted_origins, |
| | | } |
| | | |
| | | self.update(update) |
| | |
| | | |
| | | CR = '\n' |
| | | |
| | | |
| | | class BadCSRFOrigin(HTTPBadRequest): |
| | | """ |
| | | This exception indicates the request has failed cross-site request forgery |
| | | origin validation. |
| | | """ |
| | | title = "Bad CSRF Origin" |
| | | explanation = ( |
| | | "Access is denied. This server can not verify that the origin or " |
| | | "referrer of your request matches the current site. Either your " |
| | | "browser supplied the wrong Origin or Referrer or it did not supply " |
| | | "one at all." |
| | | ) |
| | | |
| | | |
| | | class BadCSRFToken(HTTPBadRequest): |
| | | """ |
| | | This exception indicates the request has failed cross-site request |
| | |
| | | text_, |
| | | bytes_, |
| | | native_, |
| | | urlparse, |
| | | ) |
| | | |
| | | from pyramid.exceptions import BadCSRFToken |
| | | from pyramid.exceptions import ( |
| | | BadCSRFOrigin, |
| | | BadCSRFToken, |
| | | ) |
| | | from pyramid.interfaces import ISession |
| | | from pyramid.util import strings_differ |
| | | from pyramid.settings import aslist |
| | | from pyramid.util import ( |
| | | is_same_domain, |
| | | strings_differ, |
| | | ) |
| | | |
| | | def manage_accessed(wrapped): |
| | | """ Decorator which causes a cookie to be renewed when an accessor |
| | |
| | | |
| | | return pickle.loads(pickled) |
| | | |
| | | |
| | | def check_csrf_origin(request, trusted_origins=None, raises=True): |
| | | """ |
| | | Check the Origin of the request to see if it is a cross site request or |
| | | not. |
| | | |
| | | If the value supplied by the Origin or Referer header isn't one of the |
| | | trusted origins and ``raises`` is ``True``, this function will raise a |
| | | :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is |
| | | ``False`` this function will return ``False`` instead. If the CSRF origin |
| | | checks are successful this function will return ``True`` unconditionally. |
| | | |
| | | Additional trusted origins may be added by passing a list of domain (and |
| | | ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in |
| | | with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None`` |
| | | (the default) this list of additional domains will be pulled from the |
| | | ``pyramid.csrf_trusted_origins`` setting. |
| | | |
| | | Note that this function will do nothing if request.scheme is not https. |
| | | |
| | | .. versionadded:: 1.7 |
| | | """ |
| | | def _fail(reason): |
| | | if raises: |
| | | raise BadCSRFOrigin(reason) |
| | | else: |
| | | return False |
| | | |
| | | if request.scheme == "https": |
| | | # Suppose user visits http://example.com/ |
| | | # An active network attacker (man-in-the-middle, MITM) sends a |
| | | # POST form that targets https://example.com/detonate-bomb/ and |
| | | # submits it via JavaScript. |
| | | # |
| | | # The attacker will need to provide a CSRF cookie and token, but |
| | | # that's no problem for a MITM when we cannot make any assumptions |
| | | # about what kind of session storage is being used. So the MITM can |
| | | # circumvent the CSRF protection. This is true for any HTTP connection, |
| | | # but anyone using HTTPS expects better! For this reason, for |
| | | # https://example.com/ we need additional protection that treats |
| | | # http://example.com/ as completely untrusted. Under HTTPS, |
| | | # Barth et al. found that the Referer header is missing for |
| | | # same-domain requests in only about 0.2% of cases or less, so |
| | | # we can use strict Referer checking. |
| | | |
| | | # Determine the origin of this request |
| | | origin = request.headers.get("Origin") |
| | | if origin is None: |
| | | origin = request.referrer |
| | | |
| | | # Fail if we were not able to locate an origin at all |
| | | if not origin: |
| | | return _fail("Origin checking failed - no Origin or Referer.") |
| | | |
| | | # Parse our origin so we we can extract the required information from |
| | | # it. |
| | | originp = urlparse.urlparse(origin) |
| | | |
| | | # Ensure that our Referer is also secure. |
| | | if originp.scheme != "https": |
| | | return _fail( |
| | | "Referer checking failed - Referer is insecure while host is " |
| | | "secure." |
| | | ) |
| | | |
| | | # Determine which origins we trust, which by default will include the |
| | | # current origin. |
| | | if trusted_origins is None: |
| | | trusted_origins = aslist( |
| | | request.registry.settings.get( |
| | | "pyramid.csrf_trusted_origins", []) |
| | | ) |
| | | |
| | | if request.host_port not in {80, 443}: |
| | | trusted_origins.append("{0.domain}:{0.host_port}".format(request)) |
| | | else: |
| | | trusted_origins.append(request.domain) |
| | | |
| | | # Actually check to see if the request's origin matches any of our |
| | | # trusted origins. |
| | | if not any(is_same_domain(originp.netloc, host) |
| | | for host in trusted_origins): |
| | | reason = ( |
| | | "Referer checking failed - {} does not match any trusted " |
| | | "origins." |
| | | ) |
| | | return _fail(reason.format(origin)) |
| | | |
| | | return True |
| | | |
| | | |
| | | def check_csrf_token(request, |
| | | token='csrf_token', |
| | | header='X-CSRF-Token', |
| | |
| | | config.add_view(view, require_csrf='st', renderer=null_renderer) |
| | | view = self._getViewCallable(config) |
| | | request = self._makeRequest(config) |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.POST = {'st': 'foo'} |
| | | request.headers = {} |
| | |
| | | config.add_view(view, require_csrf=True, renderer=null_renderer) |
| | | view = self._getViewCallable(config) |
| | | request = self._makeRequest(config) |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.POST = {} |
| | | request.headers = {'X-CSRF-Token': 'foo'} |
| | |
| | | config.add_view(view, require_csrf=True, renderer=null_renderer) |
| | | view = self._getViewCallable(config) |
| | | request = self._makeRequest(config) |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.POST = {} |
| | | request.headers = {} |
| | |
| | | request.POST = {'csrf_token': b'foo'} |
| | | self.assertEqual(self._callFUT(request, token='csrf_token'), True) |
| | | |
| | | |
| | | class Test_check_csrf_origin(unittest.TestCase): |
| | | |
| | | def _callFUT(self, *args, **kwargs): |
| | | from ..session import check_csrf_origin |
| | | return check_csrf_origin(*args, **kwargs) |
| | | |
| | | def test_success_with_http(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "http" |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_referrer(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = 443 |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_origin(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = 443 |
| | | request.headers = {"Origin": "https://example.com/"} |
| | | request.referrer = "https://not-example.com/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_additional_trusted_host(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = 443 |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = { |
| | | "pyramid.csrf_trusted_origins": ["not-example.com"], |
| | | } |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_nonstandard_port(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = 8080 |
| | | request.referrer = "https://example.com:8080/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_fails_with_wrong_host(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = 443 |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_no_origin(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.referrer = None |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_when_http_to_https(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = 443 |
| | | request.referrer = "http://example.com/evil/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_nonstandard_port(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = 8080 |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | |
| | | class DummySerializer(object): |
| | | def dumps(self, value): |
| | | return base64.b64encode(json.dumps(value).encode('utf-8')) |
| | |
| | | |
| | | class Dummy(object): |
| | | pass |
| | | |
| | | |
| | | class Test_is_same_domain(unittest.TestCase): |
| | | def _callFUT(self, *args, **kw): |
| | | from pyramid.util import is_same_domain |
| | | return is_same_domain(*args, **kw) |
| | | |
| | | def test_it(self): |
| | | self.assertTrue(self._callFUT("example.com", "example.com")) |
| | | self.assertFalse(self._callFUT("evil.com", "example.com")) |
| | | self.assertFalse(self._callFUT("evil.example.com", "example.com")) |
| | | self.assertFalse(self._callFUT("example.com", "")) |
| | | |
| | | def test_with_wildcard(self): |
| | | self.assertTrue(self._callFUT("example.com", ".example.com")) |
| | | self.assertTrue(self._callFUT("good.example.com", ".example.com")) |
| | | |
| | | def test_with_port(self): |
| | | self.assertTrue(self._callFUT("example.com:8080", "example.com:8080")) |
| | | self.assertFalse(self._callFUT("example.com:8080", "example.com")) |
| | | self.assertFalse(self._callFUT("example.com", "example.com:8080")) |
| | |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.POST = {} |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'foo'} |
| | | view = self.config._derive_view(inner_view, require_csrf='DUMMY') |
| | | result = view(None, request) |
| | | self.assertTrue(result is response) |
| | | |
| | | def test_csrf_view_https_domain(self): |
| | | response = DummyResponse() |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "https" |
| | | request.domain = "example.com" |
| | | request.host_port = 443 |
| | | request.referrer = "https://example.com/login/" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'foo'} |
| | |
| | | from pyramid.exceptions import BadCSRFToken |
| | | def inner_view(request): pass |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'bar'} |
| | |
| | | from pyramid.exceptions import BadCSRFToken |
| | | def inner_view(request): pass |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.POST = {} |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | |
| | | from pyramid.exceptions import BadCSRFToken |
| | | def inner_view(request): pass |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'PUT' |
| | | request.POST = {} |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | |
| | | view = self.config._derive_view(inner_view, require_csrf='DUMMY') |
| | | self.assertRaises(BadCSRFToken, lambda: view(None, request)) |
| | | |
| | | def test_csrf_view_fails_on_bad_referrer(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | def inner_view(request): pass |
| | | request = self._makeRequest() |
| | | request.method = "POST" |
| | | request.scheme = "https" |
| | | request.host_port = 443 |
| | | request.domain = "example.com" |
| | | request.referrer = "https://not-example.com/evil/" |
| | | request.registry.settings = {} |
| | | view = self.config._derive_view(inner_view, require_csrf='DUMMY') |
| | | self.assertRaises(BadCSRFOrigin, lambda: view(None, request)) |
| | | |
| | | def test_csrf_view_fails_on_bad_origin(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | def inner_view(request): pass |
| | | request = self._makeRequest() |
| | | request.method = "POST" |
| | | request.scheme = "https" |
| | | request.host_port = 443 |
| | | request.domain = "example.com" |
| | | request.headers = {"Origin": "https://not-example.com/evil/"} |
| | | request.registry.settings = {} |
| | | view = self.config._derive_view(inner_view, require_csrf='DUMMY') |
| | | self.assertRaises(BadCSRFOrigin, lambda: view(None, request)) |
| | | |
| | | def test_csrf_view_uses_config_setting_truthy(self): |
| | | response = DummyResponse() |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'csrf_token': 'foo'} |
| | |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'foo'} |
| | |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'foo'} |
| | |
| | | def inner_view(request): |
| | | return response |
| | | request = self._makeRequest() |
| | | request.scheme = "http" |
| | | request.method = 'POST' |
| | | request.session = DummySession({'csrf_token': 'foo'}) |
| | | request.POST = {'DUMMY': 'foo'} |
| | |
| | | obj_vals[name] = saved_val |
| | | elif name in obj_vals: |
| | | del obj_vals[name] |
| | | |
| | | |
| | | def is_same_domain(host, pattern): |
| | | """ |
| | | Return ``True`` if the host is either an exact match or a match |
| | | to the wildcard pattern. |
| | | Any pattern beginning with a period matches a domain and all of its |
| | | subdomains. (e.g. ``.example.com`` matches ``example.com`` and |
| | | ``foo.example.com``). Anything else is an exact string match. |
| | | """ |
| | | if not pattern: |
| | | return False |
| | | |
| | | pattern = pattern.lower() |
| | | return (pattern[0] == "." and |
| | | (host.endswith(pattern) or host == pattern[1:]) or |
| | | pattern == host) |
| | |
| | | ) |
| | | |
| | | from pyramid.security import NO_PERMISSION_REQUIRED |
| | | from pyramid.session import check_csrf_token |
| | | from pyramid.session import ( |
| | | check_csrf_origin, |
| | | check_csrf_token, |
| | | ) |
| | | from pyramid.response import Response |
| | | |
| | | from pyramid.interfaces import ( |
| | |
| | | # Assume that anything not defined as 'safe' by RFC2616 needs |
| | | # protection |
| | | if request.method not in {"GET", "HEAD", "OPTIONS", "TRACE"}: |
| | | check_csrf_origin(request, raises=True) |
| | | check_csrf_token(request, val, raises=True) |
| | | return view(context, request) |
| | | wrapped_view = csrf_view |