Michael Merickel
2016-04-16 4a4d4b90d108f545000666080b873363386d3ac9
Merge pull request #2501 from dstufft/check-origin-csrf

In addition to CSRF token, verify the origin too
14 files modified
340 ■■■■■ changed files
CHANGES.txt 12 ●●●●● patch | view | raw | blame | history
docs/api/exceptions.rst 2 ●●●●● patch | view | raw | blame | history
docs/api/session.rst 2 ●●●●● patch | view | raw | blame | history
docs/narr/sessions.rst 8 ●●●●● patch | view | raw | blame | history
docs/narr/viewconfig.rst 3 ●●●●● patch | view | raw | blame | history
pyramid/config/settings.py 4 ●●●● patch | view | raw | blame | history
pyramid/exceptions.py 15 ●●●●● patch | view | raw | blame | history
pyramid/session.py 103 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_config/test_views.py 3 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_session.py 93 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_util.py 21 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_viewderivers.py 51 ●●●●● patch | view | raw | blame | history
pyramid/util.py 17 ●●●●● patch | view | raw | blame | history
pyramid/viewderivers.py 6 ●●●● patch | view | raw | blame | history
CHANGES.txt
@@ -35,6 +35,18 @@
  https://github.com/Pylons/pyramid/pull/2413 and
  https://github.com/Pylons/pyramid/pull/2500
- Added an additional CSRF validation that checks the origin/referrer of a
  request and makes sure it matches the current ``request.domain``. This
  particular check is only active when accessing a site over HTTPS as otherwise
  browsers don't always send the required information. If this additional CSRF
  validation fails a ``BadCSRFOrigin`` exception will be raised and may be
  caught by exception views (the default response is ``400 Bad Request``).
  Additional allowed origins may be configured by setting
  ``pyramid.csrf_trusted_origins`` to a list of domain names (with ports if on
  a non standard port) to allow. Subdomains are not allowed unless the domain
  name has been prefixed with a ``.``. See:
  https://github.com/Pylons/pyramid/pull/2501
- Pyramid HTTPExceptions will now take into account the best match for the
  client's Accept header, and depending on what is requested will return
  text/html, application/json or text/plain. The default for */* is still
docs/api/exceptions.rst
@@ -5,6 +5,8 @@
.. automodule:: pyramid.exceptions
  .. autoexception:: BadCSRFOrigin
  .. autoexception:: BadCSRFToken
  .. autoexception:: PredicateMismatch
docs/api/session.rst
@@ -9,6 +9,8 @@
  .. autofunction:: signed_deserialize
  .. autofunction:: check_csrf_origin
  .. autofunction:: check_csrf_token
  .. autofunction:: SignedCookieSessionFactory
docs/narr/sessions.rst
@@ -437,6 +437,14 @@
There is currently no way to define an alternate name for this header without
performing CSRF checking manually.
In addition to token based CSRF checks, if the request is using HTTPS then the
automatic CSRF checking will also check the origin or referrer of the request
to ensure that it matches one of the trusted origins. By default the only
trusted origin is the current host, however
additional origins may be configured by setting
``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they
are non standard). If a host in the list of domains starts with a ``.`` then
that will allow all subdomains as well as the domain without the ``.``.
If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or
:class:`pyramid.exceptions.BadCSRFOrigin` exception
will be raised. This exception may be caught and handled by an
:term:`exception view` but, by default, will result in a ``400 Bad Request``
docs/narr/viewconfig.rst
@@ -215,6 +215,9 @@
  If this option is set to ``False`` then CSRF checks will be disabled
  regardless of the ``pyramid.require_default_csrf`` setting.
  In addition, if this option is set to ``True`` or a string then CSRF origin
  checking will be enabled.
  See :ref:`auto_csrf_checking` for more information.
  .. versionadded:: 1.7
pyramid/config/settings.py
@@ -124,6 +124,8 @@
                                             config_prevent_cachebust))
        require_default_csrf = self.get('pyramid.require_default_csrf')
        eff_require_default_csrf = require_default_csrf
        csrf_trusted_origins = self.get("pyramid.csrf_trusted_origins", [])
        eff_csrf_trusted_origins = csrf_trusted_origins
        update = {
            'debug_authorization': eff_debug_all or eff_debug_auth,
@@ -137,6 +139,7 @@
            'prevent_http_cache':eff_prevent_http_cache,
            'prevent_cachebust':eff_prevent_cachebust,
            'require_default_csrf':eff_require_default_csrf,
            'csrf_trusted_origins':eff_csrf_trusted_origins,
            'pyramid.debug_authorization': eff_debug_all or eff_debug_auth,
            'pyramid.debug_notfound': eff_debug_all or eff_debug_notfound,
@@ -149,6 +152,7 @@
            'pyramid.prevent_http_cache':eff_prevent_http_cache,
            'pyramid.prevent_cachebust':eff_prevent_cachebust,
            'pyramid.require_default_csrf':eff_require_default_csrf,
            'pyramid.csrf_trusted_origins':eff_csrf_trusted_origins,
        }
        self.update(update)
pyramid/exceptions.py
@@ -9,6 +9,21 @@
CR = '\n'
class BadCSRFOrigin(HTTPBadRequest):
    """
    Raised when a request does not pass cross-site request forgery origin
    validation, i.e. its origin or referrer could not be matched against the
    current site.
    """
    title = "Bad CSRF Origin"

    # One fragment per sentence; the concatenated text is what is shown to
    # the client as the explanation of the error.
    explanation = (
        "Access is denied. "
        "This server can not verify that the origin or referrer of your "
        "request matches the current site. "
        "Either your browser supplied the wrong Origin or Referrer or it "
        "did not supply one at all."
    )
class BadCSRFToken(HTTPBadRequest):
    """
    This exception indicates the request has failed cross-site request
pyramid/session.py
@@ -16,11 +16,19 @@
    text_,
    bytes_,
    native_,
    urlparse,
    )
from pyramid.exceptions import BadCSRFToken
from pyramid.exceptions import (
    BadCSRFOrigin,
    BadCSRFToken,
)
from pyramid.interfaces import ISession
from pyramid.util import strings_differ
from pyramid.settings import aslist
from pyramid.util import (
    is_same_domain,
    strings_differ,
)
def manage_accessed(wrapped):
    """ Decorator which causes a cookie to be renewed when an accessor
@@ -101,6 +109,97 @@
    return pickle.loads(pickled)
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the Origin of the request to see if it is a cross site request or
    not.

    If the value supplied by the Origin or Referer header isn't one of the
    trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is
    ``False`` this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.

    Additional trusted origins may be added by passing a list of domain (and
    ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting. The list passed in is never
    modified; the current host is added to a private copy.

    Note that this function will do nothing if request.scheme is not https.

    .. versionadded:: 1.7
    """
    def _fail(reason):
        # Single place deciding between "raise" and "return False".
        if raises:
            raise BadCSRFOrigin(reason)
        return False

    # The origin check is only performed for requests made over HTTPS.
    if request.scheme != "https":
        return True

    # Suppose user visits http://example.com/
    # An active network attacker (man-in-the-middle, MITM) sends a
    # POST form that targets https://example.com/detonate-bomb/ and
    # submits it via JavaScript.
    #
    # The attacker will need to provide a CSRF cookie and token, but
    # that's no problem for a MITM when we cannot make any assumptions
    # about what kind of session storage is being used. So the MITM can
    # circumvent the CSRF protection. This is true for any HTTP connection,
    # but anyone using HTTPS expects better! For this reason, for
    # https://example.com/ we need additional protection that treats
    # http://example.com/ as completely untrusted. Under HTTPS,
    # Barth et al. found that the Referer header is missing for
    # same-domain requests in only about 0.2% of cases or less, so
    # we can use strict Referer checking.

    # Determine the origin of this request; an explicit Origin header takes
    # precedence over the Referer.
    origin = request.headers.get("Origin")
    if origin is None:
        origin = request.referrer

    # Fail if we were not able to locate an origin at all
    if not origin:
        return _fail("Origin checking failed - no Origin or Referer.")

    # Parse our origin so we can extract the required information from it.
    originp = urlparse.urlparse(origin)

    # Ensure that our Referer is also secure.
    if originp.scheme != "https":
        return _fail(
            "Referer checking failed - Referer is insecure while host is "
            "secure."
        )

    # Determine which origins we trust, which by default will include the
    # current origin.
    if trusted_origins is None:
        trusted_origins = aslist(
            request.registry.settings.get(
                "pyramid.csrf_trusted_origins", [])
        )

    # Work on a copy so that a list supplied by the caller does not grow by
    # one entry on every call.
    allowed_origins = list(trusted_origins)

    # ``request.host_port`` may be a string (WebOb reports the port as text)
    # or an int (as set by tests), so normalise it before comparing against
    # the default ports; otherwise a default port would be treated as
    # non-standard and appended to the trusted host name.
    if str(request.host_port) not in ("80", "443"):
        allowed_origins.append("{0.domain}:{0.host_port}".format(request))
    else:
        allowed_origins.append(request.domain)

    # Actually check to see if the request's origin matches any of our
    # trusted origins.
    if not any(is_same_domain(originp.netloc, host)
               for host in allowed_origins):
        reason = (
            "Referer checking failed - {} does not match any trusted "
            "origins."
        )
        return _fail(reason.format(origin))

    return True
def check_csrf_token(request,
                     token='csrf_token',
                     header='X-CSRF-Token',
pyramid/tests/test_config/test_views.py
@@ -1595,6 +1595,7 @@
        config.add_view(view, require_csrf='st', renderer=null_renderer)
        view = self._getViewCallable(config)
        request = self._makeRequest(config)
        request.scheme = "http"
        request.method = 'POST'
        request.POST = {'st': 'foo'}
        request.headers = {}
@@ -1609,6 +1610,7 @@
        config.add_view(view, require_csrf=True, renderer=null_renderer)
        view = self._getViewCallable(config)
        request = self._makeRequest(config)
        request.scheme = "http"
        request.method = 'POST'
        request.POST = {}
        request.headers = {'X-CSRF-Token': 'foo'}
@@ -1623,6 +1625,7 @@
        config.add_view(view, require_csrf=True, renderer=null_renderer)
        view = self._getViewCallable(config)
        request = self._makeRequest(config)
        request.scheme = "http"
        request.method = 'POST'
        request.POST = {}
        request.headers = {}
pyramid/tests/test_session.py
@@ -705,6 +705,99 @@
        request.POST = {'csrf_token': b'foo'}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)
class Test_check_csrf_origin(unittest.TestCase):
    # Unit tests for ``check_csrf_origin`` from the session module.

    def _callFUT(self, *args, **kwargs):
        from ..session import check_csrf_origin
        return check_csrf_origin(*args, **kwargs)

    def _makeRequest(self, scheme, **attrs):
        # Build a dummy request with the given scheme and any extra
        # attributes the individual test wants to set on it.
        request = testing.DummyRequest()
        request.scheme = scheme
        for name, value in attrs.items():
            setattr(request, name, value)
        return request

    def _assertRejected(self, request):
        # A rejected request raises by default and yields a false value
        # when ``raises=False`` is passed.
        from pyramid.exceptions import BadCSRFOrigin
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))

    def test_success_with_http(self):
        request = self._makeRequest("http")
        self.assertTrue(self._callFUT(request))

    def test_success_with_https_and_referrer(self):
        request = self._makeRequest(
            "https",
            host="example.com",
            host_port=443,
            referrer="https://example.com/login/",
        )
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))

    def test_success_with_https_and_origin(self):
        request = self._makeRequest(
            "https",
            host="example.com",
            host_port=443,
            headers={"Origin": "https://example.com/"},
            referrer="https://not-example.com/",
        )
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))

    def test_success_with_additional_trusted_host(self):
        request = self._makeRequest(
            "https",
            host="example.com",
            host_port=443,
            referrer="https://not-example.com/login/",
        )
        request.registry.settings = {
            "pyramid.csrf_trusted_origins": ["not-example.com"],
        }
        self.assertTrue(self._callFUT(request))

    def test_success_with_nonstandard_port(self):
        request = self._makeRequest(
            "https",
            host="example.com:8080",
            host_port=8080,
            referrer="https://example.com:8080/login/",
        )
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))

    def test_fails_with_wrong_host(self):
        request = self._makeRequest(
            "https",
            host="example.com",
            host_port=443,
            referrer="https://not-example.com/login/",
        )
        request.registry.settings = {}
        self._assertRejected(request)

    def test_fails_with_no_origin(self):
        request = self._makeRequest("https", referrer=None)
        self._assertRejected(request)

    def test_fails_when_http_to_https(self):
        request = self._makeRequest(
            "https",
            host="example.com",
            host_port=443,
            referrer="http://example.com/evil/",
        )
        request.registry.settings = {}
        self._assertRejected(request)

    def test_fails_with_nonstandard_port(self):
        request = self._makeRequest(
            "https",
            host="example.com:8080",
            host_port=8080,
            referrer="https://example.com/login/",
        )
        request.registry.settings = {}
        self._assertRejected(request)
class DummySerializer(object):
    def dumps(self, value):
        return base64.b64encode(json.dumps(value).encode('utf-8'))
pyramid/tests/test_util.py
@@ -856,3 +856,24 @@
class Dummy(object):
    pass
class Test_is_same_domain(unittest.TestCase):
    # Unit tests for ``pyramid.util.is_same_domain``; each case is a
    # ``(host, pattern)`` pair.

    def _callFUT(self, *args, **kw):
        from pyramid.util import is_same_domain
        return is_same_domain(*args, **kw)

    def _assertAllMatch(self, cases):
        for host, pattern in cases:
            self.assertTrue(self._callFUT(host, pattern))

    def _assertNoneMatch(self, cases):
        for host, pattern in cases:
            self.assertFalse(self._callFUT(host, pattern))

    def test_it(self):
        self._assertAllMatch([
            ("example.com", "example.com"),
        ])
        self._assertNoneMatch([
            ("evil.com", "example.com"),
            ("evil.example.com", "example.com"),
            ("example.com", ""),
        ])

    def test_with_wildcard(self):
        self._assertAllMatch([
            ("example.com", ".example.com"),
            ("good.example.com", ".example.com"),
        ])

    def test_with_port(self):
        self._assertAllMatch([
            ("example.com:8080", "example.com:8080"),
        ])
        self._assertNoneMatch([
            ("example.com:8080", "example.com"),
            ("example.com", "example.com:8080"),
        ])
pyramid/tests/test_viewderivers.py
@@ -1119,6 +1119,7 @@
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.POST = {}
        request.session = DummySession({'csrf_token': 'foo'})
@@ -1132,6 +1133,23 @@
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'foo'}
        view = self.config._derive_view(inner_view, require_csrf='DUMMY')
        result = view(None, request)
        self.assertTrue(result is response)
    def test_csrf_view_https_domain(self):
        response = DummyResponse()
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "https"
        request.domain = "example.com"
        request.host_port = 443
        request.referrer = "https://example.com/login/"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'foo'}
@@ -1153,6 +1171,7 @@
        from pyramid.exceptions import BadCSRFToken
        def inner_view(request): pass
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'bar'}
@@ -1163,6 +1182,7 @@
        from pyramid.exceptions import BadCSRFToken
        def inner_view(request): pass
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.POST = {}
        request.session = DummySession({'csrf_token': 'foo'})
@@ -1174,6 +1194,7 @@
        from pyramid.exceptions import BadCSRFToken
        def inner_view(request): pass
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'PUT'
        request.POST = {}
        request.session = DummySession({'csrf_token': 'foo'})
@@ -1181,11 +1202,38 @@
        view = self.config._derive_view(inner_view, require_csrf='DUMMY')
        self.assertRaises(BadCSRFToken, lambda: view(None, request))
    def test_csrf_view_fails_on_bad_referrer(self):
        from pyramid.exceptions import BadCSRFOrigin
        def inner_view(request): pass
        request = self._makeRequest()
        request.method = "POST"
        request.scheme = "https"
        request.host_port = 443
        request.domain = "example.com"
        request.referrer = "https://not-example.com/evil/"
        request.registry.settings = {}
        view = self.config._derive_view(inner_view, require_csrf='DUMMY')
        self.assertRaises(BadCSRFOrigin, lambda: view(None, request))
    def test_csrf_view_fails_on_bad_origin(self):
        from pyramid.exceptions import BadCSRFOrigin
        def inner_view(request): pass
        request = self._makeRequest()
        request.method = "POST"
        request.scheme = "https"
        request.host_port = 443
        request.domain = "example.com"
        request.headers = {"Origin": "https://not-example.com/evil/"}
        request.registry.settings = {}
        view = self.config._derive_view(inner_view, require_csrf='DUMMY')
        self.assertRaises(BadCSRFOrigin, lambda: view(None, request))
    def test_csrf_view_uses_config_setting_truthy(self):
        response = DummyResponse()
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'csrf_token': 'foo'}
@@ -1199,6 +1247,7 @@
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'foo'}
@@ -1225,6 +1274,7 @@
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'foo'}
@@ -1238,6 +1288,7 @@
        def inner_view(request):
            return response
        request = self._makeRequest()
        request.scheme = "http"
        request.method = 'POST'
        request.session = DummySession({'csrf_token': 'foo'})
        request.POST = {'DUMMY': 'foo'}
pyramid/util.py
@@ -614,3 +614,20 @@
                obj_vals[name] = saved_val
            elif name in obj_vals:
                del obj_vals[name]
def is_same_domain(host, pattern):
    """
    Return ``True`` if the host is either an exact match or a match
    to the wildcard pattern.

    Any pattern beginning with a period matches a domain and all of its
    subdomains. (e.g. ``.example.com`` matches ``example.com`` and
    ``foo.example.com``). Anything else is an exact string match.

    The comparison is case-insensitive: both ``host`` and ``pattern`` are
    lowercased before matching. An empty ``pattern`` never matches.
    """
    if not pattern:
        return False

    # Host names are case-insensitive, so normalise both sides; previously
    # only the pattern was lowercased, which made a mixed-case host fail to
    # match its own lowercase pattern.
    host = host.lower()
    pattern = pattern.lower()

    if pattern == host:
        return True

    if pattern[0] == ".":
        # ``.example.com`` matches the bare domain and any host that ends
        # with the dotted suffix.
        return host.endswith(pattern) or host == pattern[1:]

    return False
pyramid/viewderivers.py
@@ -6,7 +6,10 @@
    )
from pyramid.security import NO_PERMISSION_REQUIRED
from pyramid.session import check_csrf_token
from pyramid.session import (
    check_csrf_origin,
    check_csrf_token,
)
from pyramid.response import Response
from pyramid.interfaces import (
@@ -491,6 +494,7 @@
            # Assume that anything not defined as 'safe' by RFC2616 needs
            # protection
            if request.method not in {"GET", "HEAD", "OPTIONS", "TRACE"}:
                check_csrf_origin(request, raises=True)
                check_csrf_token(request, val, raises=True)
            return view(context, request)
        wrapped_view = csrf_view