Michael Merickel
2017-05-01 80973f86368ee9bc8f7d8c87d32207cec83d9310
Merge pull request #3019 from mmerickel/fixes/csrf-decoupling-2854

Decouple CSRF protection from the session machinery (replaced #2854)
3 files added
23 files modified
1619 ■■■■■ changed files
CHANGES.txt 12 ●●●●● patch | view | raw | blame | history
CONTRIBUTORS.txt 2 ●●●●● patch | view | raw | blame | history
docs/api/config.rst 1 ●●●● patch | view | raw | blame | history
docs/api/csrf.rst 23 ●●●●● patch | view | raw | blame | history
docs/api/interfaces.rst 3 ●●●●● patch | view | raw | blame | history
docs/api/session.rst 4 ●●●● patch | view | raw | blame | history
docs/glossary.rst 5 ●●●●● patch | view | raw | blame | history
docs/narr/security.rst 216 ●●●●● patch | view | raw | blame | history
docs/narr/sessions.rst 183 ●●●●● patch | view | raw | blame | history
docs/narr/templates.rst 4 ●●●● patch | view | raw | blame | history
pyramid/config/__init__.py 1 ●●●● patch | view | raw | blame | history
pyramid/config/security.py 29 ●●●●● patch | view | raw | blame | history
pyramid/config/views.py 20 ●●●●● patch | view | raw | blame | history
pyramid/csrf.py 332 ●●●●● patch | view | raw | blame | history
pyramid/interfaces.py 43 ●●●● patch | view | raw | blame | history
pyramid/predicates.py 2 ●●● patch | view | raw | blame | history
pyramid/renderers.py 5 ●●●● patch | view | raw | blame | history
pyramid/session.py 169 ●●●●● patch | view | raw | blame | history
pyramid/testing.py 1 ●●●● patch | view | raw | blame | history
pyramid/tests/test_config/test_views.py 3 ●●●● patch | view | raw | blame | history
pyramid/tests/test_csrf.py 406 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_renderers.py 8 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_session.py 138 ●●●●● patch | view | raw | blame | history
pyramid/tests/test_util.py 6 ●●●● patch | view | raw | blame | history
pyramid/tests/test_viewderivers.py 1 ●●●● patch | view | raw | blame | history
pyramid/viewderivers.py 2 ●●● patch | view | raw | blame | history
CHANGES.txt
@@ -24,6 +24,14 @@
  can be alleviated by invoking ``config.begin()`` and ``config.end()``
  appropriately. See https://github.com/Pylons/pyramid/pull/2989
- A new CSRF implementation, ``pyramid.csrf.SessionCSRFStoragePolicy``,
  has been added which delegates all CSRF generation to the current session,
  following the old API for this. A ``pyramid.csrf.get_csrf_token()`` api is now
  available in template global scope, to make it easy for template developers
  to get the current CSRF token without adding it to Python code.
  See https://github.com/Pylons/pyramid/pull/2854 and
  https://github.com/Pylons/pyramid/pull/3019
- The ``pyramid.config.Configurator`` can now be used as a context manager
  which will automatically push/pop threadlocals (similar to
  ``config.begin()`` and ``config.end()``). It will also automatically perform
@@ -56,3 +64,7 @@
Documentation Changes
---------------------
- Retrieving CSRF token from the session has been deprecated, in favor of
  equivalent methods in :mod:`pyramid.csrf`.
  See https://github.com/Pylons/pyramid/pull/2854
CONTRIBUTORS.txt
@@ -291,6 +291,8 @@
- Mikko Ohtamaa, 2016/12/6
- Jure Cerjak, 2016/12/7
- Martin Frlin, 2016/12/7
- Kirill Kuzminykh, 2017/03/01
docs/api/config.rst
@@ -37,6 +37,7 @@
     .. automethod:: set_authentication_policy
     .. automethod:: set_authorization_policy
     .. automethod:: set_default_csrf_options
     .. automethod:: set_csrf_storage_policy
     .. automethod:: set_default_permission
     .. automethod:: add_permission
docs/api/csrf.rst
New file
@@ -0,0 +1,23 @@
.. _csrf_module:
:mod:`pyramid.csrf`
-------------------
.. automodule:: pyramid.csrf
  .. autoclass:: LegacySessionCSRFStoragePolicy
     :members:
  .. autoclass:: SessionCSRFStoragePolicy
     :members:
  .. autoclass:: CookieCSRFStoragePolicy
     :members:
  .. autofunction:: get_csrf_token
  .. autofunction:: new_csrf_token
  .. autofunction:: check_csrf_origin
  .. autofunction:: check_csrf_token
docs/api/interfaces.rst
@@ -44,6 +44,9 @@
  .. autointerface:: IRoutePregenerator
     :members:
  .. autointerface:: ICSRFStoragePolicy
     :members:
  .. autointerface:: ISession
     :members:
docs/api/session.rst
@@ -9,10 +9,6 @@
  .. autofunction:: signed_deserialize
  .. autofunction:: check_csrf_origin
  .. autofunction:: check_csrf_token
  .. autofunction:: SignedCookieSessionFactory
  .. autofunction:: UnencryptedCookieSessionFactoryConfig
docs/glossary.rst
@@ -891,6 +891,11 @@
      :meth:`pyramid.config.Configurator.set_session_factory` for more
      information.
   CSRF storage policy
      A utility that implements :class:`pyramid.interfaces.ICSRFStoragePolicy`
      which is responsible for allocating CSRF tokens to a user and verifying
      that a provided token is acceptable.
   Mako
     `Mako <http://www.makotemplates.org/>`_ is a template language
     which refines the familiar ideas of componentized layout and inheritance
docs/narr/security.rst
@@ -146,7 +146,7 @@
   # config is an instance of pyramid.config.Configurator
   config.add_view('mypackage.views.blog_entry_add_view',
                   name='add_entry.html',
                   name='add_entry.html',
                   context='mypackage.resources.Blog',
                   permission='add')
@@ -725,7 +725,7 @@
            """ Return ``True`` if any of the ``principals`` is allowed the
            ``permission`` in the current ``context``, else return ``False``
            """
        def principals_allowed_by_permission(self, context, permission):
            """ Return a set of principal identifiers allowed by the
            ``permission`` in ``context``.  This behavior is optional; if you
@@ -765,3 +765,215 @@
a secret across two different subsystems might drop the security of signing to
zero. Keys should not be re-used across different contexts where an attacker
has the possibility of providing a chosen plaintext.
.. index::
   single: preventing cross-site request forgery attacks
   single: cross-site request forgery attacks, prevention
Preventing Cross-Site Request Forgery Attacks
---------------------------------------------
`Cross-site request forgery
<https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a
phenomenon whereby a user who is logged in to your website might inadvertantly
load a URL because it is linked from, or embedded in, an attacker's website.
If the URL is one that may modify or delete data, the consequences can be dire.
You can avoid most of these attacks by issuing a unique token to the browser
and then requiring that it be present in all potentially unsafe requests.
:app:`Pyramid` provides facilities to create and check CSRF tokens.
By default :app:`Pyramid` comes with a session-based CSRF implementation
:class:`pyramid.csrf.SessionCSRFStoragePolicy`. To use it, you must first enable
a :term:`session factory` as described in
:ref:`using_the_default_session_factory` or
:ref:`using_alternate_session_factories`. Alternatively, you can use
a cookie-based implementation :class:`pyramid.csrf.CookieCSRFStoragePolicy` which gives
some additional flexibility as it does not require a session for each user.
You can also define your own implementation of
:class:`pyramid.interfaces.ICSRFStoragePolicy` and register it with the
:meth:`pyramid.config.Configurator.set_csrf_storage_policy` directive.
For example:
.. code-block:: python
   from pyramid.config import Configurator
   config = Configurator()
   config.set_csrf_storage_policy(MyCustomCSRFPolicy())
.. index::
   single: csrf.get_csrf_token
Using the ``csrf.get_csrf_token`` Method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To get the current CSRF token, use the
:data:`pyramid.csrf.get_csrf_token` method.
.. code-block:: python
   from pyramid.csrf import get_csrf_token
   token = get_csrf_token(request)
The ``get_csrf_token()`` method accepts a single argument: the request. It
returns a CSRF *token* string. If ``get_csrf_token()`` or ``new_csrf_token()``
was invoked previously for this user, then the existing token will be returned.
If no CSRF token previously existed for this user, then a new token will be set
into the session and returned. The newly created token will be opaque and
randomized.
.. _get_csrf_token_in_templates:
Using the ``get_csrf_token`` global in templates
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Templates have a ``get_csrf_token()`` method inserted into their globals, which
allows you to get the current token without modifying the view code. This
method takes no arguments and returns a CSRF token string. You can use the
returned token as the value of a hidden field in a form that posts to a method
that requires elevated privileges, or supply it as a request header in AJAX
requests.
For example, include the CSRF token as a hidden field:
.. code-block:: html
    <form method="post" action="/myview">
      <input type="hidden" name="csrf_token" value="${get_csrf_token()}">
      <input type="submit" value="Delete Everything">
    </form>
Or include it as a header in a jQuery AJAX request:
.. code-block:: javascript
    var csrfToken = "${get_csrf_token()}";
    $.ajax({
      type: "POST",
      url: "/myview",
      headers: { 'X-CSRF-Token': csrfToken }
    }).done(function() {
      alert("Deleted");
    });
The handler for the URL that receives the request should then require that the
correct CSRF token is supplied.
.. index::
   single: csrf.new_csrf_token
Using the ``csrf.new_csrf_token`` Method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To explicitly create a new CSRF token, use the ``csrf.new_csrf_token()``
method.  This differs only from ``csrf.get_csrf_token()`` inasmuch as it
clears any existing CSRF token, creates a new CSRF token, sets the token into
the user, and returns the token.
.. code-block:: python
   from pyramid.csrf import get_csrf_token
   token = new_csrf_token()
.. note::
    It is not possible to force a new CSRF token from a template. If you
    want to regenerate your CSRF token then do it in the view code and return
    the new token as part of the context.
Checking CSRF Tokens Manually
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In request handling code, you can check the presence and validity of a CSRF
token with :func:`pyramid.csrf.check_csrf_token`. If the token is valid, it
will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally,
you can specify ``raises=False`` to have the check return ``False`` instead of
raising an exception.
By default, it checks for a POST parameter named ``csrf_token`` or a header
named ``X-CSRF-Token``.
.. code-block:: python
   from pyramid.csrf import check_csrf_token
   def myview(request):
       # Require CSRF Token
       check_csrf_token(request)
       # ...
.. _auto_csrf_checking:
Checking CSRF Tokens Automatically
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. versionadded:: 1.7
:app:`Pyramid` supports automatically checking CSRF tokens on requests with an
unsafe method as defined by RFC2616. Any other request may be checked manually.
This feature can be turned on globally for an application using the
:meth:`pyramid.config.Configurator.set_default_csrf_options` directive.
For example:
.. code-block:: python
   from pyramid.config import Configurator
   config = Configurator()
   config.set_default_csrf_options(require_csrf=True)
CSRF checking may be explicitly enabled or disabled on a per-view basis using
the ``require_csrf`` view option. A value of ``True`` or ``False`` will
override the default set by ``set_default_csrf_options``. For example:
.. code-block:: python
   @view_config(route_name='hello', require_csrf=False)
   def myview(request):
       # ...
When CSRF checking is active, the token and header used to find the
supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively,
unless otherwise overridden by ``set_default_csrf_options``. The token is
checked against the value in ``request.POST`` which is the submitted form body.
If this value is not present, then the header will be checked.
In addition to token based CSRF checks, if the request is using HTTPS then the
automatic CSRF checking will also check the referrer of the request to ensure
that it matches one of the trusted origins. By default the only trusted origin
is the current host, however additional origins may be configured by setting
``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they
are non standard). If a host in the list of domains starts with a ``.`` then
that will allow all subdomains as well as the domain without the ``.``.
If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or
:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This
exception may be caught and handled by an :term:`exception view` but, by
default, will result in a ``400 Bad Request`` response being sent to the
client.
Checking CSRF Tokens with a View Predicate
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. deprecated:: 1.7
   Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead
   to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised.
A convenient way to require a valid CSRF token for a particular view is to
include ``check_csrf=True`` as a view predicate. See
:meth:`pyramid.config.Configurator.add_view`.
.. code-block:: python
    @view_config(request_method='POST', check_csrf=True, ...)
    def myview(request):
        ...
.. note::
   A mismatch of a CSRF token is treated like any other predicate miss, and the
   predicate system, when it doesn't find a view, raises ``HTTPNotFound``
   instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different
   from calling :func:`pyramid.csrf.check_csrf_token`.
docs/narr/sessions.rst
@@ -12,8 +12,7 @@
This chapter describes how to configure sessions, what session implementations
:app:`Pyramid` provides out of the box, how to store and retrieve data from
sessions, and two session-specific features: flash messages, and cross-site
request forgery attack prevention.
sessions, and a session-specific feature: flash messages.
.. index::
   single: session factory (default)
@@ -316,183 +315,3 @@
   ['info message']
   >>> request.session.peek_flash()
   []
.. index::
   single: preventing cross-site request forgery attacks
   single: cross-site request forgery attacks, prevention
Preventing Cross-Site Request Forgery Attacks
---------------------------------------------
`Cross-site request forgery
<https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a
phenomenon whereby a user who is logged in to your website might inadvertantly
load a URL because it is linked from, or embedded in, an attacker's website.
If the URL is one that may modify or delete data, the consequences can be dire.
You can avoid most of these attacks by issuing a unique token to the browser
and then requiring that it be present in all potentially unsafe requests.
:app:`Pyramid` sessions provide facilities to create and check CSRF tokens.
To use CSRF tokens, you must first enable a :term:`session factory` as
described in :ref:`using_the_default_session_factory` or
:ref:`using_alternate_session_factories`.
.. index::
   single: session.get_csrf_token
Using the ``session.get_csrf_token`` Method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To get the current CSRF token from the session, use the
``session.get_csrf_token()`` method.
.. code-block:: python
   token = request.session.get_csrf_token()
The ``session.get_csrf_token()`` method accepts no arguments.  It returns a
CSRF *token* string. If ``session.get_csrf_token()`` or
``session.new_csrf_token()`` was invoked previously for this session, then the
existing token will be returned.  If no CSRF token previously existed for this
session, then a new token will be set into the session and returned.  The newly
created token will be opaque and randomized.
You can use the returned token as the value of a hidden field in a form that
posts to a method that requires elevated privileges, or supply it as a request
header in AJAX requests.
For example, include the CSRF token as a hidden field:
.. code-block:: html
    <form method="post" action="/myview">
      <input type="hidden" name="csrf_token" value="${request.session.get_csrf_token()}">
      <input type="submit" value="Delete Everything">
    </form>
Or include it as a header in a jQuery AJAX request:
.. code-block:: javascript
    var csrfToken = ${request.session.get_csrf_token()};
    $.ajax({
      type: "POST",
      url: "/myview",
      headers: { 'X-CSRF-Token': csrfToken }
    }).done(function() {
      alert("Deleted");
    });
The handler for the URL that receives the request should then require that the
correct CSRF token is supplied.
.. index::
   single: session.new_csrf_token
Using the ``session.new_csrf_token`` Method
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
To explicitly create a new CSRF token, use the ``session.new_csrf_token()``
method.  This differs only from ``session.get_csrf_token()`` inasmuch as it
clears any existing CSRF token, creates a new CSRF token, sets the token into
the session, and returns the token.
.. code-block:: python
   token = request.session.new_csrf_token()
Checking CSRF Tokens Manually
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
In request handling code, you can check the presence and validity of a CSRF
token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it
will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally,
you can specify ``raises=False`` to have the check return ``False`` instead of
raising an exception.
By default, it checks for a POST parameter named ``csrf_token`` or a header
named ``X-CSRF-Token``.
.. code-block:: python
   from pyramid.session import check_csrf_token
   def myview(request):
       # Require CSRF Token
       check_csrf_token(request)
       # ...
.. _auto_csrf_checking:
Checking CSRF Tokens Automatically
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. versionadded:: 1.7
:app:`Pyramid` supports automatically checking CSRF tokens on requests with an
unsafe method as defined by RFC2616. Any other request may be checked manually.
This feature can be turned on globally for an application using the
:meth:`pyramid.config.Configurator.set_default_csrf_options` directive.
For example:
.. code-block:: python
   from pyramid.config import Configurator
   config = Configurator()
   config.set_default_csrf_options(require_csrf=True)
CSRF checking may be explicitly enabled or disabled on a per-view basis using
the ``require_csrf`` view option. A value of ``True`` or ``False`` will
override the default set by ``set_default_csrf_options``. For example:
.. code-block:: python
   @view_config(route_name='hello', require_csrf=False)
   def myview(request):
       # ...
When CSRF checking is active, the token and header used to find the
supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively,
unless otherwise overridden by ``set_default_csrf_options``. The token is
checked against the value in ``request.POST`` which is the submitted form body.
If this value is not present, then the header will be checked.
In addition to token based CSRF checks, if the request is using HTTPS then the
automatic CSRF checking will also check the referrer of the request to ensure
that it matches one of the trusted origins. By default the only trusted origin
is the current host, however additional origins may be configured by setting
``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they
are non standard). If a host in the list of domains starts with a ``.`` then
that will allow all subdomains as well as the domain without the ``.``.
If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or
:class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This
exception may be caught and handled by an :term:`exception view` but, by
default, will result in a ``400 Bad Request`` response being sent to the
client.
Checking CSRF Tokens with a View Predicate
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
.. deprecated:: 1.7
   Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead
   to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised.
A convenient way to require a valid CSRF token for a particular view is to
include ``check_csrf=True`` as a view predicate. See
:meth:`pyramid.config.Configurator.add_view`.
.. code-block:: python
    @view_config(request_method='POST', check_csrf=True, ...)
    def myview(request):
        ...
.. note::
   A mismatch of a CSRF token is treated like any other predicate miss, and the
   predicate system, when it doesn't find a view, raises ``HTTPNotFound``
   instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different
   from calling :func:`pyramid.session.check_csrf_token`.
docs/narr/templates.rst
@@ -228,6 +228,10 @@
  provided if the template is rendered as the result of a ``renderer=``
  argument to the view configuration being used.
``get_csrf_token()``
  A convenience function to access the current CSRF token. See
  :ref:`get_csrf_token_in_templates` for more information.
``renderer_name``
  The renderer name used to perform the rendering, e.g.,
  ``mypackage:templates/foo.pt``.
pyramid/config/__init__.py
@@ -396,6 +396,7 @@
        self.add_default_view_derivers()
        self.add_default_route_predicates()
        self.add_default_tweens()
        self.add_default_security()
        if exceptionresponse_view is not None:
            exceptionresponse_view = self.maybe_dotted(exceptionresponse_view)
pyramid/config/security.py
@@ -3,17 +3,24 @@
from pyramid.interfaces import (
    IAuthorizationPolicy,
    IAuthenticationPolicy,
    ICSRFStoragePolicy,
    IDefaultCSRFOptions,
    IDefaultPermission,
    PHASE1_CONFIG,
    PHASE2_CONFIG,
    )
from pyramid.csrf import LegacySessionCSRFStoragePolicy
from pyramid.exceptions import ConfigurationError
from pyramid.util import action_method
from pyramid.util import as_sorted_tuple
class SecurityConfiguratorMixin(object):
    def add_default_security(self):
        self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy())
    @action_method
    def set_authentication_policy(self, policy):
        """ Override the :app:`Pyramid` :term:`authentication policy` in the
@@ -223,9 +230,31 @@
        intr['header'] = header
        intr['safe_methods'] = as_sorted_tuple(safe_methods)
        intr['callback'] = callback
        self.action(IDefaultCSRFOptions, register, order=PHASE1_CONFIG,
                    introspectables=(intr,))
    @action_method
    def set_csrf_storage_policy(self, policy):
        """
        Set the :term:`CSRF storage policy` used by subsequent view
        registrations.
        ``policy`` is a class that implements the
        :meth:`pyramid.interfaces.ICSRFStoragePolicy` interface and defines
        how to generate and persist CSRF tokens.
        """
        def register():
            self.registry.registerUtility(policy, ICSRFStoragePolicy)
        intr = self.introspectable('csrf storage policy',
                                   None,
                                   policy,
                                   'csrf storage policy')
        intr['policy'] = policy
        self.action(ICSRFStoragePolicy, register, introspectables=(intr,))
@implementer(IDefaultCSRFOptions)
class DefaultCSRFOptions(object):
    def __init__(self, require_csrf, token, header, safe_methods, callback):
pyramid/config/views.py
@@ -641,18 +641,22 @@
          'check name'.  If the value provided is ``True``, ``csrf_token`` will
          be used as the check name.
          If CSRF checking is performed, the checked value will be the value
          of ``request.params[check_name]``.  This value will be compared
          against the value of ``request.session.get_csrf_token()``, and the
          check will pass if these two values are the same.  If the check
          passes, the associated view will be permitted to execute.  If the
          If CSRF checking is performed, the checked value will be the value of
          ``request.params[check_name]``. This value will be compared against
          the value of ``policy.get_csrf_token()`` (where ``policy`` is an
          implementation of :meth:`pyramid.interfaces.ICSRFStoragePolicy`), and the
          check will pass if these two values are the same. If the check
          passes, the associated view will be permitted to execute. If the
          check fails, the associated view will not be permitted to execute.
          Note that using this feature requires a :term:`session factory` to
          have been configured.
          .. versionadded:: 1.4a2
          .. versionchanged:: 1.9
            This feature requires either a :term:`session factory` to have been
            configured, or a :term:`CSRF storage policy` other than the default
            to be in use.
        physical_path
          If specified, this value should be a string or a tuple representing
pyramid/csrf.py
New file
@@ -0,0 +1,332 @@
import uuid
from webob.cookies import CookieProfile
from zope.interface import implementer
from pyramid.authentication import _SimpleSerializer
from pyramid.compat import (
    bytes_,
    urlparse,
    text_,
)
from pyramid.exceptions import (
    BadCSRFOrigin,
    BadCSRFToken,
)
from pyramid.interfaces import ICSRFStoragePolicy
from pyramid.settings import aslist
from pyramid.util import (
    is_same_domain,
    strings_differ
)
@implementer(ICSRFStoragePolicy)
class LegacySessionCSRFStoragePolicy(object):
    """ A CSRF storage policy that defers control of CSRF storage to the
    session.
    This policy maintains compatibility with legacy ISession implementations
    that know how to manage CSRF tokens themselves via
    ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``.
    Note that using this CSRF implementation requires that
    a :term:`session factory` is configured.
    .. versionadded:: 1.9
    """
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the session and returns it. """
        return request.session.new_csrf_token()
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token from the session,
        generating a new one if needed."""
        return request.session.get_csrf_token()
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token))
@implementer(ICSRFStoragePolicy)
class SessionCSRFStoragePolicy(object):
    """ A CSRF storage policy that persists the CSRF token in the session.
    Note that using this CSRF implementation requires that
    a :term:`session factory` is configured.
    ``key``
        The session key where the CSRF token will be stored.
        Default: `_csrft_`.
    .. versionadded:: 1.9
    """
    _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex))
    def __init__(self, key='_csrft_'):
        self.key = key
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the session and returns it. """
        token = self._token_factory()
        request.session[self.key] = token
        return token
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token from the session,
        generating a new one if needed."""
        token = request.session.get(self.key, None)
        if not token:
            token = self.new_csrf_token(request)
        return token
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token))
@implementer(ICSRFStoragePolicy)
class CookieCSRFStoragePolicy(object):
    """ An alternative CSRF implementation that stores its information in
    unauthenticated cookies, known as the 'Double Submit Cookie' method in the
    `OWASP CSRF guidelines <https://www.owasp.org/index.php/
    Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#
    Double_Submit_Cookie>`_. This gives some additional flexibility with
    regards to scaling as the tokens can be generated and verified by a
    front-end server.
    .. versionadded:: 1.9
    """
    _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex))
    def __init__(self, cookie_name='csrf_token', secure=False, httponly=False,
                 domain=None, max_age=None, path='/'):
        serializer = _SimpleSerializer()
        self.cookie_profile = CookieProfile(
            cookie_name=cookie_name,
            secure=secure,
            max_age=max_age,
            httponly=httponly,
            path=path,
            domains=[domain],
            serializer=serializer
        )
        self.cookie_name = cookie_name
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the request and returns it. """
        token = self._token_factory()
        request.cookies[self.cookie_name] = token
        def set_cookie(request, response):
            self.cookie_profile.set_cookies(
                response,
                token,
            )
        request.add_response_callback(set_cookie)
        return token
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token by checking the cookies
        sent with the current request."""
        bound_cookies = self.cookie_profile.bind(request)
        token = bound_cookies.get_value()
        if not token:
            token = self.new_csrf_token(request)
        return token
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token))
def get_csrf_token(request):
    """ Get the currently active CSRF token for the request passed, generating
    a new one using ``new_csrf_token(request)`` if one does not exist. This
    calls the equivalent method in the chosen CSRF protection implementation.
    .. versionadded :: 1.9
    """
    registry = request.registry
    csrf = registry.getUtility(ICSRFStoragePolicy)
    return csrf.get_csrf_token(request)
def new_csrf_token(request):
    """ Generate a new CSRF token for the request passed and persist it in an
    implementation defined manner. This calls the equivalent method in the
    chosen CSRF protection implementation.
    .. versionadded :: 1.9
    """
    registry = request.registry
    csrf = registry.getUtility(ICSRFStoragePolicy)
    return csrf.new_csrf_token(request)
def check_csrf_token(request,
                     token='csrf_token',
                     header='X-CSRF-Token',
                     raises=True):
    """ Check the CSRF token returned by the
    :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the
    value in ``request.POST.get(token)`` (if a POST request) or
    ``request.headers.get(header)``. If a ``token`` keyword is not supplied to
    this function, the string ``csrf_token`` will be used to look up the token
    in ``request.POST``. If a ``header`` keyword is not supplied to this
    function, the string ``X-CSRF-Token`` will be used to look up the token in
    ``request.headers``.
    If the value supplied by post or by header cannot be verified by the
    :class:`pyramid.interfaces.ICSRFStoragePolicy`, and ``raises`` is
    ``True``, this function will raise an
    :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ
    and ``raises`` is ``False``, this function will return ``False``.  If the
    CSRF check is successful, this function will return ``True``
    unconditionally.
    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.
    .. versionadded:: 1.4a2
    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.
    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated
       to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to
       verify the CSRF token.
    """
    supplied_token = ""
    # We first check the headers for a csrf token, as that is significantly
    # cheaper than checking the POST body
    if header is not None:
        supplied_token = request.headers.get(header, "")
    # If this is a POST/PUT/etc request, then we'll check the body to see if it
    # has a token. We explicitly use request.POST here because CSRF tokens
    # should never appear in an URL as doing so is a security issue. We also
    # explicitly check for request.POST here as we do not support sending form
    # encoded data over anything but a request.POST.
    if supplied_token == "" and token is not None:
        supplied_token = request.POST.get(token, "")
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    if not policy.check_csrf_token(request, text_(supplied_token)):
        if raises:
            raise BadCSRFToken('check_csrf_token(): Invalid token')
        return False
    return True
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the ``Origin`` of the request to see if it is a cross site request or
    not.
    If the value supplied by the ``Origin`` or ``Referer`` header isn't one of the
    trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is
    ``False``, this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.
    Additional trusted origins may be added by passing a list of domain (and
    ports if nonstandard like ``['example.com', 'dev.example.com:8080']``) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting.
    Note that this function will do nothing if ``request.scheme`` is not
    ``https``.
    .. versionadded:: 1.7
    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf`
    """
    def _fail(reason):
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False
    if request.scheme == "https":
        # Suppose user visits http://example.com/
        # An active network attacker (man-in-the-middle, MITM) sends a
        # POST form that targets https://example.com/detonate-bomb/ and
        # submits it via JavaScript.
        #
        # The attacker will need to provide a CSRF cookie and token, but
        # that's no problem for a MITM when we cannot make any assumptions
        # about what kind of session storage is being used. So the MITM can
        # circumvent the CSRF protection. This is true for any HTTP connection,
        # but anyone using HTTPS expects better! For this reason, for
        # https://example.com/ we need additional protection that treats
        # http://example.com/ as completely untrusted. Under HTTPS,
        # Barth et al. found that the Referer header is missing for
        # same-domain requests in only about 0.2% of cases or less, so
        # we can use strict Referer checking.
        # Determine the origin of this request
        origin = request.headers.get("Origin")
        if origin is None:
            origin = request.referrer
        # Fail if we were not able to locate an origin at all
        if not origin:
            return _fail("Origin checking failed - no Origin or Referer.")
        # Parse our origin so we we can extract the required information from
        # it.
        originp = urlparse.urlparse(origin)
        # Ensure that our Referer is also secure.
        if originp.scheme != "https":
            return _fail(
                "Referer checking failed - Referer is insecure while host is "
                "secure."
            )
        # Determine which origins we trust, which by default will include the
        # current origin.
        if trusted_origins is None:
            trusted_origins = aslist(
                request.registry.settings.get(
                    "pyramid.csrf_trusted_origins", [])
            )
        if request.host_port not in set(["80", "443"]):
            trusted_origins.append("{0.domain}:{0.host_port}".format(request))
        else:
            trusted_origins.append(request.domain)
        # Actually check to see if the request's origin matches any of our
        # trusted origins.
        if not any(is_same_domain(originp.netloc, host)
                   for host in trusted_origins):
            reason = (
                "Referer checking failed - {0} does not match any trusted "
                "origins."
            )
            return _fail(reason.format(origin))
    return True
pyramid/interfaces.py
@@ -927,6 +927,13 @@
    usually accessed via ``request.session``.
    Keys and values of a session must be pickleable.
    .. versionchanged:: 1.9
       Sessions are no longer required to implement ``get_csrf_token`` and
       ``new_csrf_token``. CSRF token support was moved to the pluggable
       :class:`pyramid.interfaces.ICSRFStoragePolicy` configuration hook.
    """
    # attributes
@@ -981,19 +988,39 @@
        :meth:`pyramid.interfaces.ISession.flash`
        """
    def new_csrf_token():
        """ Create and set into the session a new, random cross-site request
        forgery protection token.  Return the token.  It will be a string."""
    def get_csrf_token():
        """ Return a random cross-site request forgery protection token.  It
        will be a string.  If a token was previously added to the session via
        ``new_csrf_token``, that token will be returned.  If no CSRF token
        was previously set into the session, ``new_csrf_token`` will be
class ICSRFStoragePolicy(Interface):
    """ An object that offers the ability to verify CSRF tokens and generate
    new ones."""
    def new_csrf_token(request):
        """ Create and return a new, random cross-site request forgery
        protection token. The token will be an ascii-compatible unicode
        string.
        """
    def get_csrf_token(request):
        """ Return a cross-site request forgery protection token.  It
        will be an ascii-compatible unicode string.  If a token was previously
        set for this user via ``new_csrf_token``, that token will be returned.
        If no CSRF token was previously set, ``new_csrf_token`` will be
        called, which will create and set a token, and this token will be
        returned.
        """
    def check_csrf_token(request, token):
        """ Determine if the supplied ``token`` is valid. Most implementations
        should simply compare the ``token`` to the current value of
        ``get_csrf_token`` but it is possible to verify the token using
        any mechanism necessary using this method.
        Returns ``True`` if the ``token`` is valid, otherwise ``False``.
        """
class IIntrospector(Interface):
    def get(category_name, discriminator, default=None):
        """ Get the IIntrospectable related to the category_name and the
pyramid/predicates.py
@@ -4,7 +4,7 @@
from pyramid.compat import is_nonstr_iter
from pyramid.session import check_csrf_token
from pyramid.csrf import check_csrf_token
from pyramid.traversal import (
    find_interface,
    traversal_path,
pyramid/renderers.py
@@ -1,3 +1,4 @@
from functools import partial
import json
import os
import re
@@ -19,6 +20,7 @@
    text_type,
    )
from pyramid.csrf import get_csrf_token
from pyramid.decorator import reify
from pyramid.events import BeforeRender
@@ -428,6 +430,7 @@
                  'context':context,
                  'request':request,
                  'req':request,
                  'get_csrf_token':partial(get_csrf_token, request),
                  }
        return self.render_to_response(response, system, request=request)
@@ -441,13 +444,13 @@
                'context':getattr(request, 'context', None),
                'request':request,
                'req':request,
                'get_csrf_token':partial(get_csrf_token, request),
                }
        system_values = BeforeRender(system_values, value)
        registry = self.registry
        registry.notify(system_values)
        result = renderer(value, system_values)
        return result
pyramid/session.py
@@ -16,19 +16,15 @@
    text_,
    bytes_,
    native_,
    urlparse,
    )
from pyramid.csrf import (
    check_csrf_origin,
    check_csrf_token,
)
from pyramid.exceptions import (
    BadCSRFOrigin,
    BadCSRFToken,
)
from pyramid.interfaces import ISession
from pyramid.settings import aslist
from pyramid.util import (
    is_same_domain,
    strings_differ,
)
from pyramid.util import strings_differ
def manage_accessed(wrapped):
    """ Decorator which causes a cookie to be renewed when an accessor
@@ -109,149 +105,6 @@
    return pickle.loads(pickled)
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the Origin of the request to see if it is a cross site request or
    not.
    If the value supplied by the Origin or Referer header isn't one of the
    trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is
    ``False`` this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.
    Additional trusted origins may be added by passing a list of domain (and
    ports if nonstandard like `['example.com', 'dev.example.com:8080']`) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting.
    Note that this function will do nothing if request.scheme is not https.
    .. versionadded:: 1.7
    """
    def _fail(reason):
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False
    if request.scheme == "https":
        # Suppose user visits http://example.com/
        # An active network attacker (man-in-the-middle, MITM) sends a
        # POST form that targets https://example.com/detonate-bomb/ and
        # submits it via JavaScript.
        #
        # The attacker will need to provide a CSRF cookie and token, but
        # that's no problem for a MITM when we cannot make any assumptions
        # about what kind of session storage is being used. So the MITM can
        # circumvent the CSRF protection. This is true for any HTTP connection,
        # but anyone using HTTPS expects better! For this reason, for
        # https://example.com/ we need additional protection that treats
        # http://example.com/ as completely untrusted. Under HTTPS,
        # Barth et al. found that the Referer header is missing for
        # same-domain requests in only about 0.2% of cases or less, so
        # we can use strict Referer checking.
        # Determine the origin of this request
        origin = request.headers.get("Origin")
        if origin is None:
            origin = request.referrer
        # Fail if we were not able to locate an origin at all
        if not origin:
            return _fail("Origin checking failed - no Origin or Referer.")
        # Parse our origin so we we can extract the required information from
        # it.
        originp = urlparse.urlparse(origin)
        # Ensure that our Referer is also secure.
        if originp.scheme != "https":
            return _fail(
                "Referer checking failed - Referer is insecure while host is "
                "secure."
            )
        # Determine which origins we trust, which by default will include the
        # current origin.
        if trusted_origins is None:
            trusted_origins = aslist(
                request.registry.settings.get(
                    "pyramid.csrf_trusted_origins", [])
            )
        if request.host_port not in set(["80", "443"]):
            trusted_origins.append("{0.domain}:{0.host_port}".format(request))
        else:
            trusted_origins.append(request.domain)
        # Actually check to see if the request's origin matches any of our
        # trusted origins.
        if not any(is_same_domain(originp.netloc, host)
                   for host in trusted_origins):
            reason = (
                "Referer checking failed - {0} does not match any trusted "
                "origins."
            )
            return _fail(reason.format(origin))
    return True
def check_csrf_token(request,
                     token='csrf_token',
                     header='X-CSRF-Token',
                     raises=True):
    """ Check the CSRF token in the request's session against the value in
    ``request.POST.get(token)`` (if a POST request) or
    ``request.headers.get(header)``. If a ``token`` keyword is not supplied to
    this function, the string ``csrf_token`` will be used to look up the token
    in ``request.POST``. If a ``header`` keyword is not supplied to this
    function, the string ``X-CSRF-Token`` will be used to look up the token in
    ``request.headers``.
    If the value supplied by post or by header doesn't match the value
    supplied by ``request.session.get_csrf_token()``, and ``raises`` is
    ``True``, this function will raise an
    :exc:`pyramid.exceptions.BadCSRFToken` exception.
    If the values differ and ``raises`` is ``False``, this function will
    return ``False``.  If the CSRF check is successful, this function will
    return ``True`` unconditionally.
    Note that using this function requires that a :term:`session factory` is
    configured.
    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.
    .. versionadded:: 1.4a2
    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.
    """
    supplied_token = ""
    # If this is a POST/PUT/etc request, then we'll check the body to see if it
    # has a token. We explicitly use request.POST here because CSRF tokens
    # should never appear in an URL as doing so is a security issue. We also
    # explicitly check for request.POST here as we do not support sending form
    # encoded data over anything but a request.POST.
    if token is not None:
        supplied_token = request.POST.get(token, "")
    # If we were unable to locate a CSRF token in a request body, then we'll
    # check to see if there are any headers that have a value for us.
    if supplied_token == "" and header is not None:
        supplied_token = request.headers.get(header, "")
    expected_token = request.session.get_csrf_token()
    if strings_differ(bytes_(expected_token), bytes_(supplied_token)):
        if raises:
            raise BadCSRFToken('check_csrf_token(): Invalid token')
        return False
    return True
class PickleSerializer(object):
    """ A serializer that uses the pickle protocol to dump Python
@@ -759,3 +612,13 @@
        reissue_time=reissue_time,
        set_on_exception=set_on_exception,
    )
check_csrf_origin = check_csrf_origin  # api
deprecated('check_csrf_origin',
           'pyramid.session.check_csrf_origin is deprecated as of Pyramid '
           '1.9. Use pyramid.csrf.check_csrf_origin instead.')
check_csrf_token = check_csrf_token  # api
deprecated('check_csrf_token',
           'pyramid.session.check_csrf_token is deprecated as of Pyramid '
           '1.9. Use pyramid.csrf.check_csrf_token instead.')
pyramid/testing.py
@@ -479,6 +479,7 @@
        config.add_default_view_derivers()
        config.add_default_route_predicates()
        config.add_default_tweens()
        config.add_default_security()
    config.commit()
    global have_zca
    try:
pyramid/tests/test_config/test_views.py
@@ -18,6 +18,7 @@
    def _makeOne(self, *arg, **kw):
        from pyramid.config import Configurator
        config = Configurator(*arg, **kw)
        config.set_default_csrf_options(require_csrf=False)
        return config
    def _getViewCallable(self, config, ctx_iface=None, exc_iface=None,
@@ -2373,7 +2374,7 @@
        view = lambda r: 'OK'
        config.set_default_csrf_options(require_csrf=True)
        config.add_view(view, context=Exception, renderer=null_renderer)
        view_intr = introspector.introspectables[1]
        view_intr = introspector.introspectables[-1]
        self.assertTrue(view_intr.type_name, 'view')
        self.assertEqual(view_intr['callable'], view)
        derived_view = view_intr['derived_callable']
pyramid/tests/test_csrf.py
New file
@@ -0,0 +1,406 @@
import unittest
from pyramid import testing
from pyramid.config import Configurator
class TestLegacySessionCSRFStoragePolicy(unittest.TestCase):
    class MockSession(object):
        def __init__(self, current_token='02821185e4c94269bdc38e6eeae0a2f8'):
            self.current_token = current_token
        def new_csrf_token(self):
            self.current_token = 'e5e9e30a08b34ff9842ff7d2b958c14b'
            return self.current_token
        def get_csrf_token(self):
            return self.current_token
    def _makeOne(self):
        from pyramid.csrf import LegacySessionCSRFStoragePolicy
        return LegacySessionCSRFStoragePolicy()
    def test_register_session_csrf_policy(self):
        from pyramid.csrf import LegacySessionCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy
        config = Configurator()
        config.set_csrf_storage_policy(self._makeOne())
        config.commit()
        policy = config.registry.queryUtility(ICSRFStoragePolicy)
        self.assertTrue(isinstance(policy, LegacySessionCSRFStoragePolicy))
    def test_session_csrf_implementation_delegates_to_session(self):
        policy = self._makeOne()
        request = DummyRequest(session=self.MockSession())
        self.assertEqual(
            policy.get_csrf_token(request),
            '02821185e4c94269bdc38e6eeae0a2f8'
        )
        self.assertEqual(
            policy.new_csrf_token(request),
            'e5e9e30a08b34ff9842ff7d2b958c14b'
        )
    def test_check_csrf_token(self):
        request = DummyRequest(session=self.MockSession('foo'))
        policy = self._makeOne()
        self.assertTrue(policy.check_csrf_token(request, 'foo'))
        self.assertFalse(policy.check_csrf_token(request, 'bar'))
class TestSessionCSRFStoragePolicy(unittest.TestCase):
    def _makeOne(self, **kw):
        from pyramid.csrf import SessionCSRFStoragePolicy
        return SessionCSRFStoragePolicy(**kw)
    def test_register_session_csrf_policy(self):
        from pyramid.csrf import SessionCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy
        config = Configurator()
        config.set_csrf_storage_policy(self._makeOne())
        config.commit()
        policy = config.registry.queryUtility(ICSRFStoragePolicy)
        self.assertTrue(isinstance(policy, SessionCSRFStoragePolicy))
    def test_it_creates_a_new_token(self):
        request = DummyRequest(session={})
        policy = self._makeOne()
        policy._token_factory = lambda: 'foo'
        self.assertEqual(policy.get_csrf_token(request), 'foo')
    def test_get_csrf_token_returns_the_new_token(self):
        request = DummyRequest(session={'_csrft_': 'foo'})
        policy = self._makeOne()
        self.assertEqual(policy.get_csrf_token(request), 'foo')
        token = policy.new_csrf_token(request)
        self.assertNotEqual(token, 'foo')
        self.assertEqual(token, policy.get_csrf_token(request))
    def test_check_csrf_token(self):
        request = DummyRequest(session={})
        policy = self._makeOne()
        self.assertFalse(policy.check_csrf_token(request, 'foo'))
        request.session = {'_csrft_': 'foo'}
        self.assertTrue(policy.check_csrf_token(request, 'foo'))
        self.assertFalse(policy.check_csrf_token(request, 'bar'))
class TestCookieCSRFStoragePolicy(unittest.TestCase):
    def _makeOne(self, **kw):
        from pyramid.csrf import CookieCSRFStoragePolicy
        return CookieCSRFStoragePolicy(**kw)
    def test_register_cookie_csrf_policy(self):
        from pyramid.csrf import CookieCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy
        config = Configurator()
        config.set_csrf_storage_policy(self._makeOne())
        config.commit()
        policy = config.registry.queryUtility(ICSRFStoragePolicy)
        self.assertTrue(isinstance(policy, CookieCSRFStoragePolicy))
    def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self):
        response = MockResponse()
        request = DummyRequest()
        policy = self._makeOne()
        token = policy.get_csrf_token(request)
        request.response_callback(request, response)
        self.assertEqual(
            response.headerlist,
            [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))]
        )
    def test_existing_cookie_csrf_does_not_set_cookie(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
        policy = self._makeOne()
        token = policy.get_csrf_token(request)
        self.assertEqual(
            token,
            'e6f325fee5974f3da4315a8ccf4513d2'
        )
        self.assertIsNone(request.response_callback)
    def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}
        policy = self._makeOne()
        token = policy.new_csrf_token(request)
        response = MockResponse()
        request.response_callback(request, response)
        self.assertEqual(
            response.headerlist,
            [('Set-Cookie', 'csrf_token={}; Path=/'.format(token))]
        )
    def test_get_csrf_token_returns_the_new_token(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'foo'}
        policy = self._makeOne()
        self.assertEqual(policy.get_csrf_token(request), 'foo')
        token = policy.new_csrf_token(request)
        self.assertNotEqual(token, 'foo')
        self.assertEqual(token, policy.get_csrf_token(request))
    def test_check_csrf_token(self):
        request = DummyRequest()
        policy = self._makeOne()
        self.assertFalse(policy.check_csrf_token(request, 'foo'))
        request.cookies = {'csrf_token': 'foo'}
        self.assertTrue(policy.check_csrf_token(request, 'foo'))
        self.assertFalse(policy.check_csrf_token(request, 'bar'))
class Test_get_csrf_token(unittest.TestCase):
    def setUp(self):
        self.config = testing.setUp()
    def _callFUT(self, *args, **kwargs):
        from pyramid.csrf import get_csrf_token
        return get_csrf_token(*args, **kwargs)
    def test_no_override_csrf_utility_registered(self):
        request = testing.DummyRequest()
        self._callFUT(request)
    def test_success(self):
        self.config.set_csrf_storage_policy(DummyCSRF())
        request = testing.DummyRequest()
        csrf_token = self._callFUT(request)
        self.assertEquals(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8')
class Test_new_csrf_token(unittest.TestCase):
    def setUp(self):
        self.config = testing.setUp()
    def _callFUT(self, *args, **kwargs):
        from pyramid.csrf import new_csrf_token
        return new_csrf_token(*args, **kwargs)
    def test_no_override_csrf_utility_registered(self):
        request = testing.DummyRequest()
        self._callFUT(request)
    def test_success(self):
        self.config.set_csrf_storage_policy(DummyCSRF())
        request = testing.DummyRequest()
        csrf_token = self._callFUT(request)
        self.assertEquals(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b')
class Test_check_csrf_token(unittest.TestCase):
    def setUp(self):
        self.config = testing.setUp()
        # set up CSRF
        self.config.set_default_csrf_options(require_csrf=False)
    def _callFUT(self, *args, **kwargs):
        from ..csrf import check_csrf_token
        return check_csrf_token(*args, **kwargs)
    def test_success_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)
    def test_success_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)
    def test_success_default_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request), True)
    def test_success_default_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request), True)
    def test_failure_raises(self):
        from pyramid.exceptions import BadCSRFToken
        request = testing.DummyRequest()
        self.assertRaises(BadCSRFToken, self._callFUT, request,
                          'csrf_token')
    def test_failure_no_raises(self):
        request = testing.DummyRequest()
        result = self._callFUT(request, 'csrf_token', raises=False)
        self.assertEqual(result, False)
class Test_check_csrf_token_without_defaults_configured(unittest.TestCase):
    def setUp(self):
        self.config = testing.setUp()
    def _callFUT(self, *args, **kwargs):
        from ..csrf import check_csrf_token
        return check_csrf_token(*args, **kwargs)
    def test_success_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)
    def test_failure_raises(self):
        from pyramid.exceptions import BadCSRFToken
        request = testing.DummyRequest()
        self.assertRaises(BadCSRFToken, self._callFUT, request,
                          'csrf_token')
    def test_failure_no_raises(self):
        request = testing.DummyRequest()
        result = self._callFUT(request, 'csrf_token', raises=False)
        self.assertEqual(result, False)
class Test_check_csrf_origin(unittest.TestCase):
    def _callFUT(self, *args, **kwargs):
        from ..csrf import check_csrf_origin
        return check_csrf_origin(*args, **kwargs)
    def test_success_with_http(self):
        request = testing.DummyRequest()
        request.scheme = "http"
        self.assertTrue(self._callFUT(request))
    def test_success_with_https_and_referrer(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://example.com/login/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_success_with_https_and_origin(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.headers = {"Origin": "https://example.com/"}
        request.referrer = "https://not-example.com/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_success_with_additional_trusted_host(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://not-example.com/login/"
        request.registry.settings = {
            "pyramid.csrf_trusted_origins": ["not-example.com"],
        }
        self.assertTrue(self._callFUT(request))
    def test_success_with_nonstandard_port(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com:8080"
        request.host_port = "8080"
        request.referrer = "https://example.com:8080/login/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_fails_with_wrong_host(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://not-example.com/login/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_with_no_origin(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.referrer = None
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_when_http_to_https(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "http://example.com/evil/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_with_nonstandard_port(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com:8080"
        request.host_port = "8080"
        request.referrer = "https://example.com/login/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
class DummyRequest(object):
    registry = None
    session = None
    response_callback = None
    def __init__(self, registry=None, session=None):
        self.registry = registry
        self.session = session
        self.cookies = {}
    def add_response_callback(self, callback):
        self.response_callback = callback
class MockResponse(object):
    def __init__(self):
        self.headerlist = []
class DummyCSRF(object):
    def new_csrf_token(self, request):
        return 'e5e9e30a08b34ff9842ff7d2b958c14b'
    def get_csrf_token(self, request):
        return '02821185e4c94269bdc38e6eeae0a2f8'
pyramid/tests/test_renderers.py
@@ -203,6 +203,7 @@
        self.assertEqual(helper.get_renderer(), factory.respond)
    def test_render_view(self):
        import pyramid.csrf
        self._registerRendererFactory()
        self._registerResponseFactory()
        request = Dummy()
@@ -212,6 +213,9 @@
        request = testing.DummyRequest()
        response = 'response'
        response = helper.render_view(request, response, view, context)
        get_csrf = response.app_iter[1].pop('get_csrf_token')
        self.assertEqual(get_csrf.args, (request, ))
        self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token)
        self.assertEqual(response.app_iter[0], 'response')
        self.assertEqual(response.app_iter[1],
                          {'renderer_info': helper,
@@ -242,12 +246,16 @@
        self.assertEqual(reg.event.__class__.__name__, 'BeforeRender')
    def test_render_system_values_is_None(self):
        import pyramid.csrf
        self._registerRendererFactory()
        request = Dummy()
        context = Dummy()
        request.context = context
        helper = self._makeOne('loo.foo')
        result = helper.render('values', None, request=request)
        get_csrf = result[1].pop('get_csrf_token')
        self.assertEqual(get_csrf.args, (request, ))
        self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token)
        system = {'request':request,
                  'context':context,
                  'renderer_name':'loo.foo',
pyramid/tests/test_session.py
@@ -659,144 +659,6 @@
        result = self._callFUT(serialized, secret.decode('latin-1'))
        self.assertEqual(result, '123')
class Test_check_csrf_token(unittest.TestCase):
    def _callFUT(self, *args, **kwargs):
        from ..session import check_csrf_token
        return check_csrf_token(*args, **kwargs)
    def test_success_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)
    def test_success_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)
    def test_success_default_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request), True)
    def test_success_default_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request), True)
    def test_failure_raises(self):
        from pyramid.exceptions import BadCSRFToken
        request = testing.DummyRequest()
        self.assertRaises(BadCSRFToken, self._callFUT, request,
                          'csrf_token')
    def test_failure_no_raises(self):
        request = testing.DummyRequest()
        result = self._callFUT(request, 'csrf_token', raises=False)
        self.assertEqual(result, False)
    def test_token_differing_types(self):
        from pyramid.compat import text_
        request = testing.DummyRequest()
        request.method = "POST"
        request.session['_csrft_'] = text_('foo')
        request.POST = {'csrf_token': b'foo'}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)
class Test_check_csrf_origin(unittest.TestCase):
    def _callFUT(self, *args, **kwargs):
        from ..session import check_csrf_origin
        return check_csrf_origin(*args, **kwargs)
    def test_success_with_http(self):
        request = testing.DummyRequest()
        request.scheme = "http"
        self.assertTrue(self._callFUT(request))
    def test_success_with_https_and_referrer(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://example.com/login/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_success_with_https_and_origin(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.headers = {"Origin": "https://example.com/"}
        request.referrer = "https://not-example.com/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_success_with_additional_trusted_host(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://not-example.com/login/"
        request.registry.settings = {
            "pyramid.csrf_trusted_origins": ["not-example.com"],
        }
        self.assertTrue(self._callFUT(request))
    def test_success_with_nonstandard_port(self):
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com:8080"
        request.host_port = "8080"
        request.referrer = "https://example.com:8080/login/"
        request.registry.settings = {}
        self.assertTrue(self._callFUT(request))
    def test_fails_with_wrong_host(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "https://not-example.com/login/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_with_no_origin(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.referrer = None
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_when_http_to_https(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com"
        request.host_port = "443"
        request.referrer = "http://example.com/evil/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
    def test_fails_with_nonstandard_port(self):
        from pyramid.exceptions import BadCSRFOrigin
        request = testing.DummyRequest()
        request.scheme = "https"
        request.host = "example.com:8080"
        request.host_port = "8080"
        request.referrer = "https://example.com/login/"
        request.registry.settings = {}
        self.assertRaises(BadCSRFOrigin, self._callFUT, request)
        self.assertFalse(self._callFUT(request, raises=False))
class DummySerializer(object):
    def dumps(self, value):
pyramid/tests/test_util.py
@@ -369,12 +369,16 @@
        from pyramid.util import strings_differ
        return strings_differ(*args, **kw)
    def test_it(self):
    def test_it_bytes(self):
        self.assertFalse(self._callFUT(b'foo', b'foo'))
        self.assertTrue(self._callFUT(b'123', b'345'))
        self.assertTrue(self._callFUT(b'1234', b'123'))
        self.assertTrue(self._callFUT(b'123', b'1234'))
    def test_it_native_str(self):
        self.assertFalse(self._callFUT('123', '123'))
        self.assertTrue(self._callFUT('123', '1234'))
    def test_it_with_internal_comparator(self):
        result = self._callFUT(b'foo', b'foo', compare_digest=None)
        self.assertFalse(result)
pyramid/tests/test_viewderivers.py
@@ -12,6 +12,7 @@
    def setUp(self):
        self.config = testing.setUp()
        self.config.set_default_csrf_options(require_csrf=False)
    def tearDown(self):
        self.config = None
pyramid/viewderivers.py
@@ -6,7 +6,7 @@
    )
from pyramid.security import NO_PERMISSION_REQUIRED
from pyramid.session import (
from pyramid.csrf import (
    check_csrf_origin,
    check_csrf_token,
)