Merge pull request #3019 from mmerickel/fixes/csrf-decoupling-2854
Decouple CSRF protection from the session machinery (replaced #2854)
3 files added
23 files modified
| | |
| | | can be alleviated by invoking ``config.begin()`` and ``config.end()`` |
| | | appropriately. See https://github.com/Pylons/pyramid/pull/2989 |
| | | |
| | | - A new CSRF implementation, ``pyramid.csrf.SessionCSRFStoragePolicy``, |
| | | has been added which delegates all CSRF generation to the current session, |
| | | following the old API for this. A ``pyramid.csrf.get_csrf_token()`` api is now |
| | | available in template global scope, to make it easy for template developers |
| | | to get the current CSRF token without adding it to Python code. |
| | | See https://github.com/Pylons/pyramid/pull/2854 and |
| | | https://github.com/Pylons/pyramid/pull/3019 |
| | | |
| | | - The ``pyramid.config.Configurator`` can now be used as a context manager |
| | | which will automatically push/pop threadlocals (similar to |
| | | ``config.begin()`` and ``config.end()``). It will also automatically perform |
| | |
| | | |
| | | Documentation Changes |
| | | --------------------- |
| | | |
| | | - Retrieving CSRF token from the session has been deprecated, in favor of |
| | | equivalent methods in :mod:`pyramid.csrf`. |
| | | See https://github.com/Pylons/pyramid/pull/2854 |
| | |
| | | |
| | | - Mikko Ohtamaa, 2016/12/6 |
| | | |
| | | - Jure Cerjak, 2016/12/7 |
| | | |
| | | - Martin Frlin, 2016/12/7 |
| | | |
| | | - Kirill Kuzminykh, 2017/03/01 |
| | |
| | | .. automethod:: set_authentication_policy |
| | | .. automethod:: set_authorization_policy |
| | | .. automethod:: set_default_csrf_options |
| | | .. automethod:: set_csrf_storage_policy |
| | | .. automethod:: set_default_permission |
| | | .. automethod:: add_permission |
| | | |
New file |
| | |
| | | .. _csrf_module: |
| | | |
| | | :mod:`pyramid.csrf` |
| | | ------------------- |
| | | |
| | | .. automodule:: pyramid.csrf |
| | | |
| | | .. autoclass:: LegacySessionCSRFStoragePolicy |
| | | :members: |
| | | |
| | | .. autoclass:: SessionCSRFStoragePolicy |
| | | :members: |
| | | |
| | | .. autoclass:: CookieCSRFStoragePolicy |
| | | :members: |
| | | |
| | | .. autofunction:: get_csrf_token |
| | | |
| | | .. autofunction:: new_csrf_token |
| | | |
| | | .. autofunction:: check_csrf_origin |
| | | |
| | | .. autofunction:: check_csrf_token |
| | |
| | | .. autointerface:: IRoutePregenerator |
| | | :members: |
| | | |
| | | .. autointerface:: ICSRFStoragePolicy |
| | | :members: |
| | | |
| | | .. autointerface:: ISession |
| | | :members: |
| | | |
| | |
| | | |
| | | .. autofunction:: signed_deserialize |
| | | |
| | | .. autofunction:: check_csrf_origin |
| | | |
| | | .. autofunction:: check_csrf_token |
| | | |
| | | .. autofunction:: SignedCookieSessionFactory |
| | | |
| | | .. autofunction:: UnencryptedCookieSessionFactoryConfig |
| | |
| | | :meth:`pyramid.config.Configurator.set_session_factory` for more |
| | | information. |
| | | |
| | | CSRF storage policy |
| | | A utility that implements :class:`pyramid.interfaces.ICSRFStoragePolicy` |
| | | which is responsible for allocating CSRF tokens to a user and verifying |
| | | that a provided token is acceptable. |
| | | |
| | | Mako |
| | | `Mako <http://www.makotemplates.org/>`_ is a template language |
| | | which refines the familiar ideas of componentized layout and inheritance |
| | |
| | | # config is an instance of pyramid.config.Configurator |
| | | |
| | | config.add_view('mypackage.views.blog_entry_add_view', |
| | | name='add_entry.html', |
| | | name='add_entry.html', |
| | | context='mypackage.resources.Blog', |
| | | permission='add') |
| | | |
| | |
| | | """ Return ``True`` if any of the ``principals`` is allowed the |
| | | ``permission`` in the current ``context``, else return ``False`` |
| | | """ |
| | | |
| | | |
| | | def principals_allowed_by_permission(self, context, permission): |
| | | """ Return a set of principal identifiers allowed by the |
| | | ``permission`` in ``context``. This behavior is optional; if you |
| | |
| | | a secret across two different subsystems might drop the security of signing to |
| | | zero. Keys should not be re-used across different contexts where an attacker |
| | | has the possibility of providing a chosen plaintext. |
| | | |
| | | .. index:: |
| | | single: preventing cross-site request forgery attacks |
| | | single: cross-site request forgery attacks, prevention |
| | | |
| | | Preventing Cross-Site Request Forgery Attacks |
| | | --------------------------------------------- |
| | | |
| | | `Cross-site request forgery |
| | | <https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a |
| | | phenomenon whereby a user who is logged in to your website might inadvertently |
| | | load a URL because it is linked from, or embedded in, an attacker's website. |
| | | If the URL is one that may modify or delete data, the consequences can be dire. |
| | | |
| | | You can avoid most of these attacks by issuing a unique token to the browser |
| | | and then requiring that it be present in all potentially unsafe requests. |
| | | :app:`Pyramid` provides facilities to create and check CSRF tokens. |
| | | |
| | | By default :app:`Pyramid` comes with a session-based CSRF implementation |
| | | :class:`pyramid.csrf.SessionCSRFStoragePolicy`. To use it, you must first enable |
| | | a :term:`session factory` as described in |
| | | :ref:`using_the_default_session_factory` or |
| | | :ref:`using_alternate_session_factories`. Alternatively, you can use |
| | | a cookie-based implementation :class:`pyramid.csrf.CookieCSRFStoragePolicy` which gives |
| | | some additional flexibility as it does not require a session for each user. |
| | | You can also define your own implementation of |
| | | :class:`pyramid.interfaces.ICSRFStoragePolicy` and register it with the |
| | | :meth:`pyramid.config.Configurator.set_csrf_storage_policy` directive. |
| | | |
| | | For example: |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator() |
| | | config.set_csrf_storage_policy(MyCustomCSRFPolicy()) |
| | | |
| | | .. index:: |
| | | single: csrf.get_csrf_token |
| | | |
| | | Using the ``csrf.get_csrf_token`` Method |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | To get the current CSRF token, use the |
| | | :func:`pyramid.csrf.get_csrf_token` function. |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.csrf import get_csrf_token |
| | | token = get_csrf_token(request) |
| | | |
| | | The ``get_csrf_token()`` method accepts a single argument: the request. It |
| | | returns a CSRF *token* string. If ``get_csrf_token()`` or ``new_csrf_token()`` |
| | | was invoked previously for this user, then the existing token will be returned. |
| | | If no CSRF token previously existed for this user, then a new token will be |
| | | created, stored by the configured :term:`CSRF storage policy`, and returned. |
| | | The newly created token will be opaque and randomized. |
| | | |
| | | .. _get_csrf_token_in_templates: |
| | | |
| | | Using the ``get_csrf_token`` global in templates |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | Templates have a ``get_csrf_token()`` method inserted into their globals, which |
| | | allows you to get the current token without modifying the view code. This |
| | | method takes no arguments and returns a CSRF token string. You can use the |
| | | returned token as the value of a hidden field in a form that posts to a method |
| | | that requires elevated privileges, or supply it as a request header in AJAX |
| | | requests. |
| | | |
| | | For example, include the CSRF token as a hidden field: |
| | | |
| | | .. code-block:: html |
| | | |
| | | <form method="post" action="/myview"> |
| | | <input type="hidden" name="csrf_token" value="${get_csrf_token()}"> |
| | | <input type="submit" value="Delete Everything"> |
| | | </form> |
| | | |
| | | Or include it as a header in a jQuery AJAX request: |
| | | |
| | | .. code-block:: javascript |
| | | |
| | | var csrfToken = "${get_csrf_token()}"; |
| | | $.ajax({ |
| | | type: "POST", |
| | | url: "/myview", |
| | | headers: { 'X-CSRF-Token': csrfToken } |
| | | }).done(function() { |
| | | alert("Deleted"); |
| | | }); |
| | | |
| | | The handler for the URL that receives the request should then require that the |
| | | correct CSRF token is supplied. |
| | | |
| | | .. index:: |
| | | single: csrf.new_csrf_token |
| | | |
| | | Using the ``csrf.new_csrf_token`` Method |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | To explicitly create a new CSRF token, use the ``csrf.new_csrf_token()`` |
| | | method. It differs from ``csrf.get_csrf_token()`` only in that it always |
| | | discards any existing CSRF token, creates a new one, stores it for the |
| | | user, and returns it. |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.csrf import new_csrf_token |
| | | token = new_csrf_token(request) |
| | | |
| | | .. note:: |
| | | |
| | | It is not possible to force a new CSRF token from a template. If you |
| | | want to regenerate your CSRF token then do it in the view code and return |
| | | the new token as part of the context. |
| | | |
| | | Checking CSRF Tokens Manually |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | In request handling code, you can check the presence and validity of a CSRF |
| | | token with :func:`pyramid.csrf.check_csrf_token`. If the token is valid, it |
| | | will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, |
| | | you can specify ``raises=False`` to have the check return ``False`` instead of |
| | | raising an exception. |
| | | |
| | | By default, it checks for a POST parameter named ``csrf_token`` or a header |
| | | named ``X-CSRF-Token``. |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.csrf import check_csrf_token |
| | | |
| | | def myview(request): |
| | | # Require CSRF Token |
| | | check_csrf_token(request) |
| | | |
| | | # ... |
| | | |
| | | .. _auto_csrf_checking: |
| | | |
| | | Checking CSRF Tokens Automatically |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | .. versionadded:: 1.7 |
| | | |
| | | :app:`Pyramid` supports automatically checking CSRF tokens on requests with an |
| | | unsafe method as defined by RFC2616. Any other request may be checked manually. |
| | | This feature can be turned on globally for an application using the |
| | | :meth:`pyramid.config.Configurator.set_default_csrf_options` directive. |
| | | For example: |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator() |
| | | config.set_default_csrf_options(require_csrf=True) |
| | | |
| | | CSRF checking may be explicitly enabled or disabled on a per-view basis using |
| | | the ``require_csrf`` view option. A value of ``True`` or ``False`` will |
| | | override the default set by ``set_default_csrf_options``. For example: |
| | | |
| | | .. code-block:: python |
| | | |
| | | @view_config(route_name='hello', require_csrf=False) |
| | | def myview(request): |
| | | # ... |
| | | |
| | | When CSRF checking is active, the token and header used to find the |
| | | supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, |
| | | unless otherwise overridden by ``set_default_csrf_options``. The token is |
| | | checked against the value in ``request.POST`` which is the submitted form body. |
| | | If this value is not present, then the header will be checked. |
| | | |
| | | In addition to token based CSRF checks, if the request is using HTTPS then the |
| | | automatic CSRF checking will also check the referrer of the request to ensure |
| | | that it matches one of the trusted origins. By default the only trusted origin |
| | | is the current host, however additional origins may be configured by setting |
| | | ``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they |
| | | are non standard). If a host in the list of domains starts with a ``.`` then |
| | | that will allow all subdomains as well as the domain without the ``.``. |
| | | |
| | | If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or |
| | | :class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This |
| | | exception may be caught and handled by an :term:`exception view` but, by |
| | | default, will result in a ``400 Bad Request`` response being sent to the |
| | | client. |
| | | |
| | | Checking CSRF Tokens with a View Predicate |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | .. deprecated:: 1.7 |
| | | Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead |
| | | to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. |
| | | |
| | | A convenient way to require a valid CSRF token for a particular view is to |
| | | include ``check_csrf=True`` as a view predicate. See |
| | | :meth:`pyramid.config.Configurator.add_view`. |
| | | |
| | | .. code-block:: python |
| | | |
| | | @view_config(request_method='POST', check_csrf=True, ...) |
| | | def myview(request): |
| | | ... |
| | | |
| | | .. note:: |
| | | A mismatch of a CSRF token is treated like any other predicate miss, and the |
| | | predicate system, when it doesn't find a view, raises ``HTTPNotFound`` |
| | | instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different |
| | | from calling :func:`pyramid.csrf.check_csrf_token`. |
| | |
| | | |
| | | This chapter describes how to configure sessions, what session implementations |
| | | :app:`Pyramid` provides out of the box, how to store and retrieve data from |
| | | sessions, and two session-specific features: flash messages, and cross-site |
| | | request forgery attack prevention. |
| | | sessions, and a session-specific feature: flash messages. |
| | | |
| | | .. index:: |
| | | single: session factory (default) |
| | |
| | | ['info message'] |
| | | >>> request.session.peek_flash() |
| | | [] |
| | | |
| | | .. index:: |
| | | single: preventing cross-site request forgery attacks |
| | | single: cross-site request forgery attacks, prevention |
| | | |
| | | Preventing Cross-Site Request Forgery Attacks |
| | | --------------------------------------------- |
| | | |
| | | `Cross-site request forgery |
| | | <https://en.wikipedia.org/wiki/Cross-site_request_forgery>`_ attacks are a |
| | | phenomenon whereby a user who is logged in to your website might inadvertently |
| | | load a URL because it is linked from, or embedded in, an attacker's website. |
| | | If the URL is one that may modify or delete data, the consequences can be dire. |
| | | |
| | | You can avoid most of these attacks by issuing a unique token to the browser |
| | | and then requiring that it be present in all potentially unsafe requests. |
| | | :app:`Pyramid` sessions provide facilities to create and check CSRF tokens. |
| | | |
| | | To use CSRF tokens, you must first enable a :term:`session factory` as |
| | | described in :ref:`using_the_default_session_factory` or |
| | | :ref:`using_alternate_session_factories`. |
| | | |
| | | .. index:: |
| | | single: session.get_csrf_token |
| | | |
| | | Using the ``session.get_csrf_token`` Method |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | To get the current CSRF token from the session, use the |
| | | ``session.get_csrf_token()`` method. |
| | | |
| | | .. code-block:: python |
| | | |
| | | token = request.session.get_csrf_token() |
| | | |
| | | The ``session.get_csrf_token()`` method accepts no arguments. It returns a |
| | | CSRF *token* string. If ``session.get_csrf_token()`` or |
| | | ``session.new_csrf_token()`` was invoked previously for this session, then the |
| | | existing token will be returned. If no CSRF token previously existed for this |
| | | session, then a new token will be set into the session and returned. The newly |
| | | created token will be opaque and randomized. |
| | | |
| | | You can use the returned token as the value of a hidden field in a form that |
| | | posts to a method that requires elevated privileges, or supply it as a request |
| | | header in AJAX requests. |
| | | |
| | | For example, include the CSRF token as a hidden field: |
| | | |
| | | .. code-block:: html |
| | | |
| | | <form method="post" action="/myview"> |
| | | <input type="hidden" name="csrf_token" value="${request.session.get_csrf_token()}"> |
| | | <input type="submit" value="Delete Everything"> |
| | | </form> |
| | | |
| | | Or include it as a header in a jQuery AJAX request: |
| | | |
| | | .. code-block:: javascript |
| | | |
| | | var csrfToken = "${request.session.get_csrf_token()}"; |
| | | $.ajax({ |
| | | type: "POST", |
| | | url: "/myview", |
| | | headers: { 'X-CSRF-Token': csrfToken } |
| | | }).done(function() { |
| | | alert("Deleted"); |
| | | }); |
| | | |
| | | The handler for the URL that receives the request should then require that the |
| | | correct CSRF token is supplied. |
| | | |
| | | .. index:: |
| | | single: session.new_csrf_token |
| | | |
| | | Using the ``session.new_csrf_token`` Method |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | To explicitly create a new CSRF token, use the ``session.new_csrf_token()`` |
| | | method. This differs only from ``session.get_csrf_token()`` inasmuch as it |
| | | clears any existing CSRF token, creates a new CSRF token, sets the token into |
| | | the session, and returns the token. |
| | | |
| | | .. code-block:: python |
| | | |
| | | token = request.session.new_csrf_token() |
| | | |
| | | Checking CSRF Tokens Manually |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | In request handling code, you can check the presence and validity of a CSRF |
| | | token with :func:`pyramid.session.check_csrf_token`. If the token is valid, it |
| | | will return ``True``, otherwise it will raise ``HTTPBadRequest``. Optionally, |
| | | you can specify ``raises=False`` to have the check return ``False`` instead of |
| | | raising an exception. |
| | | |
| | | By default, it checks for a POST parameter named ``csrf_token`` or a header |
| | | named ``X-CSRF-Token``. |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.session import check_csrf_token |
| | | |
| | | def myview(request): |
| | | # Require CSRF Token |
| | | check_csrf_token(request) |
| | | |
| | | # ... |
| | | |
| | | .. _auto_csrf_checking: |
| | | |
| | | Checking CSRF Tokens Automatically |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | .. versionadded:: 1.7 |
| | | |
| | | :app:`Pyramid` supports automatically checking CSRF tokens on requests with an |
| | | unsafe method as defined by RFC2616. Any other request may be checked manually. |
| | | This feature can be turned on globally for an application using the |
| | | :meth:`pyramid.config.Configurator.set_default_csrf_options` directive. |
| | | For example: |
| | | |
| | | .. code-block:: python |
| | | |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator() |
| | | config.set_default_csrf_options(require_csrf=True) |
| | | |
| | | CSRF checking may be explicitly enabled or disabled on a per-view basis using |
| | | the ``require_csrf`` view option. A value of ``True`` or ``False`` will |
| | | override the default set by ``set_default_csrf_options``. For example: |
| | | |
| | | .. code-block:: python |
| | | |
| | | @view_config(route_name='hello', require_csrf=False) |
| | | def myview(request): |
| | | # ... |
| | | |
| | | When CSRF checking is active, the token and header used to find the |
| | | supplied CSRF token will be ``csrf_token`` and ``X-CSRF-Token``, respectively, |
| | | unless otherwise overridden by ``set_default_csrf_options``. The token is |
| | | checked against the value in ``request.POST`` which is the submitted form body. |
| | | If this value is not present, then the header will be checked. |
| | | |
| | | In addition to token based CSRF checks, if the request is using HTTPS then the |
| | | automatic CSRF checking will also check the referrer of the request to ensure |
| | | that it matches one of the trusted origins. By default the only trusted origin |
| | | is the current host, however additional origins may be configured by setting |
| | | ``pyramid.csrf_trusted_origins`` to a list of domain names (and ports if they |
| | | are non standard). If a host in the list of domains starts with a ``.`` then |
| | | that will allow all subdomains as well as the domain without the ``.``. |
| | | |
| | | If CSRF checks fail then a :class:`pyramid.exceptions.BadCSRFToken` or |
| | | :class:`pyramid.exceptions.BadCSRFOrigin` exception will be raised. This |
| | | exception may be caught and handled by an :term:`exception view` but, by |
| | | default, will result in a ``400 Bad Request`` response being sent to the |
| | | client. |
| | | |
| | | Checking CSRF Tokens with a View Predicate |
| | | ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~ |
| | | |
| | | .. deprecated:: 1.7 |
| | | Use the ``require_csrf`` option or read :ref:`auto_csrf_checking` instead |
| | | to have :class:`pyramid.exceptions.BadCSRFToken` exceptions raised. |
| | | |
| | | A convenient way to require a valid CSRF token for a particular view is to |
| | | include ``check_csrf=True`` as a view predicate. See |
| | | :meth:`pyramid.config.Configurator.add_view`. |
| | | |
| | | .. code-block:: python |
| | | |
| | | @view_config(request_method='POST', check_csrf=True, ...) |
| | | def myview(request): |
| | | ... |
| | | |
| | | .. note:: |
| | | A mismatch of a CSRF token is treated like any other predicate miss, and the |
| | | predicate system, when it doesn't find a view, raises ``HTTPNotFound`` |
| | | instead of ``HTTPBadRequest``, so ``check_csrf=True`` behavior is different |
| | | from calling :func:`pyramid.session.check_csrf_token`. |
| | |
| | | provided if the template is rendered as the result of a ``renderer=`` |
| | | argument to the view configuration being used. |
| | | |
| | | ``get_csrf_token()`` |
| | | A convenience function to access the current CSRF token. See |
| | | :ref:`get_csrf_token_in_templates` for more information. |
| | | |
| | | ``renderer_name`` |
| | | The renderer name used to perform the rendering, e.g., |
| | | ``mypackage:templates/foo.pt``. |
| | |
| | | self.add_default_view_derivers() |
| | | self.add_default_route_predicates() |
| | | self.add_default_tweens() |
| | | self.add_default_security() |
| | | |
| | | if exceptionresponse_view is not None: |
| | | exceptionresponse_view = self.maybe_dotted(exceptionresponse_view) |
| | |
| | | from pyramid.interfaces import ( |
| | | IAuthorizationPolicy, |
| | | IAuthenticationPolicy, |
| | | ICSRFStoragePolicy, |
| | | IDefaultCSRFOptions, |
| | | IDefaultPermission, |
| | | PHASE1_CONFIG, |
| | | PHASE2_CONFIG, |
| | | ) |
| | | |
| | | from pyramid.csrf import LegacySessionCSRFStoragePolicy |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.util import action_method |
| | | from pyramid.util import as_sorted_tuple |
| | | |
| | | |
| | | class SecurityConfiguratorMixin(object): |
| | | |
| | | def add_default_security(self): |
| | | """ Register a :class:`pyramid.csrf.LegacySessionCSRFStoragePolicy` |
| | | instance as the default :term:`CSRF storage policy`. """ |
| | | self.set_csrf_storage_policy(LegacySessionCSRFStoragePolicy()) |
| | | |
| | | @action_method |
| | | def set_authentication_policy(self, policy): |
| | | """ Override the :app:`Pyramid` :term:`authentication policy` in the |
| | |
| | | intr['header'] = header |
| | | intr['safe_methods'] = as_sorted_tuple(safe_methods) |
| | | intr['callback'] = callback |
| | | |
| | | self.action(IDefaultCSRFOptions, register, order=PHASE1_CONFIG, |
| | | introspectables=(intr,)) |
| | | |
| | | @action_method |
| | | def set_csrf_storage_policy(self, policy): |
| | | """ |
| | | Set the :term:`CSRF storage policy` used by subsequent view |
| | | registrations. |
| | | |
| | | ``policy`` is an object that implements the |
| | | :class:`pyramid.interfaces.ICSRFStoragePolicy` interface and defines |
| | | how to generate and persist CSRF tokens. It is registered as the |
| | | ``ICSRFStoragePolicy`` utility when the configuration action runs. |
| | | |
| | | """ |
| | | def register(): |
| | | self.registry.registerUtility(policy, ICSRFStoragePolicy) |
| | | intr = self.introspectable('csrf storage policy', |
| | | None, |
| | | policy, |
| | | 'csrf storage policy') |
| | | intr['policy'] = policy |
| | | self.action(ICSRFStoragePolicy, register, introspectables=(intr,)) |
| | | |
| | | |
| | | @implementer(IDefaultCSRFOptions) |
| | | class DefaultCSRFOptions(object): |
| | | def __init__(self, require_csrf, token, header, safe_methods, callback): |
| | |
| | | 'check name'. If the value provided is ``True``, ``csrf_token`` will |
| | | be used as the check name. |
| | | |
| | | If CSRF checking is performed, the checked value will be the value |
| | | of ``request.params[check_name]``. This value will be compared |
| | | against the value of ``request.session.get_csrf_token()``, and the |
| | | check will pass if these two values are the same. If the check |
| | | passes, the associated view will be permitted to execute. If the |
| | | If CSRF checking is performed, the checked value will be the value of |
| | | ``request.params[check_name]``. This value will be compared against |
| | | the value of ``policy.get_csrf_token()`` (where ``policy`` is an |
| | | implementation of :class:`pyramid.interfaces.ICSRFStoragePolicy`), and the |
| | | check will pass if these two values are the same. If the check |
| | | passes, the associated view will be permitted to execute. If the |
| | | check fails, the associated view will not be permitted to execute. |
| | | |
| | | Note that using this feature requires a :term:`session factory` to |
| | | have been configured. |
| | | |
| | | .. versionadded:: 1.4a2 |
| | | |
| | | .. versionchanged:: 1.9 |
| | | This feature requires either a :term:`session factory` to have been |
| | | configured, or a :term:`CSRF storage policy` other than the default |
| | | to be in use. |
| | | |
| | | |
| | | physical_path |
| | | |
| | | If specified, this value should be a string or a tuple representing |
New file |
| | |
| | | import uuid |
| | | |
| | | from webob.cookies import CookieProfile |
| | | from zope.interface import implementer |
| | | |
| | | |
| | | from pyramid.authentication import _SimpleSerializer |
| | | |
| | | from pyramid.compat import ( |
| | | bytes_, |
| | | urlparse, |
| | | text_, |
| | | ) |
| | | from pyramid.exceptions import ( |
| | | BadCSRFOrigin, |
| | | BadCSRFToken, |
| | | ) |
| | | from pyramid.interfaces import ICSRFStoragePolicy |
| | | from pyramid.settings import aslist |
| | | from pyramid.util import ( |
| | | is_same_domain, |
| | | strings_differ |
| | | ) |
| | | |
| | | |
| | | @implementer(ICSRFStoragePolicy) |
| | | class LegacySessionCSRFStoragePolicy(object): |
| | | """ A CSRF storage policy that defers control of CSRF storage to the |
| | | session. |
| | | |
| | | This policy maintains compatibility with legacy ISession implementations |
| | | that know how to manage CSRF tokens themselves via |
| | | ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``. |
| | | |
| | | Note that using this CSRF implementation requires that |
| | | a :term:`session factory` is configured. |
| | | |
| | | .. versionadded:: 1.9 |
| | | |
| | | """ |
| | | def new_csrf_token(self, request): |
| | | """ Sets a new CSRF token into the session and returns it. This |
| | | delegates to ``request.session.new_csrf_token()``. """ |
| | | return request.session.new_csrf_token() |
| | | |
| | | def get_csrf_token(self, request): |
| | | """ Returns the currently active CSRF token from the session, |
| | | generating a new one if needed. This delegates to |
| | | ``request.session.get_csrf_token()``.""" |
| | | return request.session.get_csrf_token() |
| | | |
| | | def check_csrf_token(self, request, supplied_token): |
| | | """ Returns ``True`` if the ``supplied_token`` is valid. |
| | | |
| | | The supplied token is compared, as bytes, against the token returned |
| | | by :meth:`get_csrf_token` using ``pyramid.util.strings_differ``.""" |
| | | expected_token = self.get_csrf_token(request) |
| | | return not strings_differ( |
| | | bytes_(expected_token), bytes_(supplied_token)) |
| | | |
| | | |
| | | @implementer(ICSRFStoragePolicy) |
| | | class SessionCSRFStoragePolicy(object): |
| | | """ A CSRF storage policy that persists the CSRF token in the session. |
| | | |
| | | Note that using this CSRF implementation requires that |
| | | a :term:`session factory` is configured. |
| | | |
| | | ``key`` |
| | | |
| | | The session key where the CSRF token will be stored. |
| | | Default: ``_csrft_``. |
| | | |
| | | .. versionadded:: 1.9 |
| | | |
| | | """ |
| | | # Tokens are the hex form of a random UUID4, coerced to text. |
| | | _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) |
| | | |
| | | def __init__(self, key='_csrft_'): |
| | | self.key = key |
| | | |
| | | def new_csrf_token(self, request): |
| | | """ Sets a new CSRF token into the session and returns it. Any token |
| | | previously stored under ``key`` is overwritten. """ |
| | | token = self._token_factory() |
| | | request.session[self.key] = token |
| | | return token |
| | | |
| | | def get_csrf_token(self, request): |
| | | """ Returns the currently active CSRF token from the session, |
| | | generating a new one if needed. A missing or empty stored value is |
| | | replaced with a freshly generated token.""" |
| | | token = request.session.get(self.key, None) |
| | | if not token: |
| | | token = self.new_csrf_token(request) |
| | | return token |
| | | |
| | | def check_csrf_token(self, request, supplied_token): |
| | | """ Returns ``True`` if the ``supplied_token`` is valid. |
| | | |
| | | The supplied token is compared, as bytes, against the token returned |
| | | by :meth:`get_csrf_token` using ``pyramid.util.strings_differ``.""" |
| | | expected_token = self.get_csrf_token(request) |
| | | return not strings_differ( |
| | | bytes_(expected_token), bytes_(supplied_token)) |
| | | |
| | | |
| | | @implementer(ICSRFStoragePolicy) |
| | | class CookieCSRFStoragePolicy(object): |
| | | """ An alternative CSRF implementation that stores its information in |
| | | unauthenticated cookies, known as the 'Double Submit Cookie' method in the |
| | | `OWASP CSRF guidelines <https://www.owasp.org/index.php/ |
| | | Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet# |
| | | Double_Submit_Cookie>`_. This gives some additional flexibility with |
| | | regards to scaling as the tokens can be generated and verified by a |
| | | front-end server. |
| | | |
| | | ``cookie_name`` (default ``csrf_token``), ``secure``, ``httponly``, |
| | | ``domain``, ``max_age`` and ``path`` (default ``/``) are used to build the |
| | | ``webob.cookies.CookieProfile`` that reads and writes the token cookie. |
| | | |
| | | .. versionadded:: 1.9 |
| | | |
| | | """ |
| | | # Tokens are the hex form of a random UUID4, coerced to text. |
| | | _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex)) |
| | | |
| | | def __init__(self, cookie_name='csrf_token', secure=False, httponly=False, |
| | | domain=None, max_age=None, path='/'): |
| | | serializer = _SimpleSerializer() |
| | | self.cookie_profile = CookieProfile( |
| | | cookie_name=cookie_name, |
| | | secure=secure, |
| | | max_age=max_age, |
| | | httponly=httponly, |
| | | path=path, |
| | | domains=[domain], |
| | | serializer=serializer |
| | | ) |
| | | self.cookie_name = cookie_name |
| | | |
| | | def new_csrf_token(self, request): |
| | | """ Sets a new CSRF token into the request and returns it. |
| | | |
| | | The token is written into ``request.cookies`` and a response callback |
| | | is added to the request that sets the cookie on the response. """ |
| | | token = self._token_factory() |
| | | request.cookies[self.cookie_name] = token |
| | | def set_cookie(request, response): |
| | | self.cookie_profile.set_cookies( |
| | | response, |
| | | token, |
| | | ) |
| | | request.add_response_callback(set_cookie) |
| | | return token |
| | | |
| | | def get_csrf_token(self, request): |
| | | """ Returns the currently active CSRF token by checking the cookies |
| | | sent with the current request. If no token value is found, a new one |
| | | is generated with :meth:`new_csrf_token`.""" |
| | | bound_cookies = self.cookie_profile.bind(request) |
| | | token = bound_cookies.get_value() |
| | | if not token: |
| | | token = self.new_csrf_token(request) |
| | | return token |
| | | |
| | | def check_csrf_token(self, request, supplied_token): |
| | | """ Returns ``True`` if the ``supplied_token`` is valid. |
| | | |
| | | The supplied token is compared, as bytes, against the token returned |
| | | by :meth:`get_csrf_token` using ``pyramid.util.strings_differ``.""" |
| | | expected_token = self.get_csrf_token(request) |
| | | return not strings_differ( |
| | | bytes_(expected_token), bytes_(supplied_token)) |
| | | |
| | | |
| | | def get_csrf_token(request): |
| | | """ Get the currently active CSRF token for the request passed, generating |
| | | a new one using ``new_csrf_token(request)`` if one does not exist. This |
| | | calls the equivalent method in the chosen CSRF protection implementation, |
| | | which is looked up as the ``ICSRFStoragePolicy`` utility in |
| | | ``request.registry``. |
| | | |
| | | .. versionadded:: 1.9 |
| | | |
| | | """ |
| | | registry = request.registry |
| | | csrf = registry.getUtility(ICSRFStoragePolicy) |
| | | return csrf.get_csrf_token(request) |
| | | |
| | | |
def new_csrf_token(request):
    """ Generate a new CSRF token for ``request`` and persist it in an
    implementation defined manner.

    The work is delegated to the ``new_csrf_token`` method of the
    :class:`pyramid.interfaces.ICSRFStoragePolicy` utility registered in
    ``request.registry``.

    .. versionadded:: 1.9

    """
    storage_policy = request.registry.getUtility(ICSRFStoragePolicy)
    return storage_policy.new_csrf_token(request)
| | | |
| | | |
def check_csrf_token(request,
                     token='csrf_token',
                     header='X-CSRF-Token',
                     raises=True):
    """ Verify the CSRF token supplied with ``request`` using the configured
    :class:`pyramid.interfaces.ICSRFStoragePolicy`.

    The supplied token is taken from ``request.headers.get(header)`` or, when
    the header yields nothing, from ``request.POST.get(token)``.  ``header``
    defaults to ``X-CSRF-Token`` and ``token`` defaults to ``csrf_token``;
    passing ``None`` for either one disables that lookup.

    If the storage policy cannot verify the supplied value and ``raises`` is
    ``True``, :exc:`pyramid.exceptions.BadCSRFToken` is raised.  If it cannot
    be verified and ``raises`` is ``False``, ``False`` is returned.  A
    successful check always returns ``True``.

    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.

    .. versionadded:: 1.4a2

    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated
       to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to
       verify the CSRF token.

    """
    # Headers are consulted first: reading them is significantly cheaper
    # than parsing the POST body.
    candidate = "" if header is None else request.headers.get(header, "")

    # Fall back to the request body.  Only request.POST is used: a CSRF token
    # must never travel in the URL, and form encoded data is not supported
    # over anything but request.POST.
    if candidate == "" and token is not None:
        candidate = request.POST.get(token, "")

    storage_policy = request.registry.getUtility(ICSRFStoragePolicy)
    if storage_policy.check_csrf_token(request, text_(candidate)):
        return True
    if raises:
        raise BadCSRFToken('check_csrf_token(): Invalid token')
    return False
| | | |
| | | |
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the ``Origin`` of the request to see if it is a cross site request or
    not.

    If the value supplied by the ``Origin`` or ``Referer`` header isn't one of
    the trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is
    ``False``, this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.

    Additional trusted origins may be added by passing a list of domain (and
    ports if nonstandard like ``['example.com', 'dev.example.com:8080']``) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting. The sequence passed in is never
    modified: the request's own host is added to a private copy.

    Note that this function will do nothing if ``request.scheme`` is not
    ``https``.

    .. versionadded:: 1.7

    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf`

    """
    def _fail(reason):
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False

    if request.scheme == "https":
        # Suppose user visits http://example.com/
        # An active network attacker (man-in-the-middle, MITM) sends a
        # POST form that targets https://example.com/detonate-bomb/ and
        # submits it via JavaScript.
        #
        # The attacker will need to provide a CSRF cookie and token, but
        # that's no problem for a MITM when we cannot make any assumptions
        # about what kind of session storage is being used. So the MITM can
        # circumvent the CSRF protection. This is true for any HTTP connection,
        # but anyone using HTTPS expects better! For this reason, for
        # https://example.com/ we need additional protection that treats
        # http://example.com/ as completely untrusted. Under HTTPS,
        # Barth et al. found that the Referer header is missing for
        # same-domain requests in only about 0.2% of cases or less, so
        # we can use strict Referer checking.

        # Determine the origin of this request, preferring the Origin header
        # and falling back to the Referer.
        origin = request.headers.get("Origin")
        if origin is None:
            origin = request.referrer

        # Fail if we were not able to locate an origin at all
        if not origin:
            return _fail("Origin checking failed - no Origin or Referer.")

        # Parse our origin so we can extract the required information from
        # it.
        originp = urlparse.urlparse(origin)

        # Ensure that our Referer is also secure.
        if originp.scheme != "https":
            return _fail(
                "Referer checking failed - Referer is insecure while host is "
                "secure."
            )

        # Determine which origins we trust, which by default will include the
        # current origin.
        if trusted_origins is None:
            trusted_origins = aslist(
                request.registry.settings.get(
                    "pyramid.csrf_trusted_origins", [])
            )

        # Work on a private copy so that appending the current host below
        # never leaks into (and keeps growing) a list owned by the caller.
        allowed_hosts = list(trusted_origins)

        if request.host_port not in set(["80", "443"]):
            allowed_hosts.append("{0.domain}:{0.host_port}".format(request))
        else:
            allowed_hosts.append(request.domain)

        # Actually check to see if the request's origin matches any of our
        # trusted origins.
        if not any(is_same_domain(originp.netloc, host)
                   for host in allowed_hosts):
            reason = (
                "Referer checking failed - {0} does not match any trusted "
                "origins."
            )
            return _fail(reason.format(origin))

    return True
| | |
| | | usually accessed via ``request.session``. |
| | | |
| | | Keys and values of a session must be pickleable. |
| | | |
| | | .. versionchanged:: 1.9 |
| | | |
| | | Sessions are no longer required to implement ``get_csrf_token`` and |
| | | ``new_csrf_token``. CSRF token support was moved to the pluggable |
| | | :class:`pyramid.interfaces.ICSRFStoragePolicy` configuration hook. |
| | | |
| | | """ |
| | | |
| | | # attributes |
| | |
| | | :meth:`pyramid.interfaces.ISession.flash` |
| | | """ |
| | | |
| | | def new_csrf_token(): |
| | | """ Create and set into the session a new, random cross-site request |
| | | forgery protection token. Return the token. It will be a string.""" |
| | | |
| | | def get_csrf_token(): |
| | | """ Return a random cross-site request forgery protection token. It |
| | | will be a string. If a token was previously added to the session via |
| | | ``new_csrf_token``, that token will be returned. If no CSRF token |
| | | was previously set into the session, ``new_csrf_token`` will be |
class ICSRFStoragePolicy(Interface):
    """ A pluggable policy that generates CSRF tokens and verifies the ones
    supplied by clients."""

    def new_csrf_token(request):
        """ Create a new, random cross-site request forgery protection token
        and return it.  The token is an ascii-compatible unicode string.

        """

    def get_csrf_token(request):
        """ Return the cross-site request forgery protection token for this
        user as an ascii-compatible unicode string.

        A token previously set via ``new_csrf_token`` is returned as is.  If
        no token has been set yet, ``new_csrf_token`` is called to create and
        set one, and that new token is returned.

        """

    def check_csrf_token(request, token):
        """ Decide whether the supplied ``token`` is valid.

        Most implementations should simply compare ``token`` with the
        current value of ``get_csrf_token``, but any mechanism necessary may
        be used to verify the token here.

        Returns ``True`` if the ``token`` is valid, otherwise ``False``.

        """
| | | |
| | | |
| | | class IIntrospector(Interface): |
| | | def get(category_name, discriminator, default=None): |
| | | """ Get the IIntrospectable related to the category_name and the |
| | |
| | | |
| | | from pyramid.compat import is_nonstr_iter |
| | | |
| | | from pyramid.session import check_csrf_token |
| | | from pyramid.csrf import check_csrf_token |
| | | from pyramid.traversal import ( |
| | | find_interface, |
| | | traversal_path, |
| | |
| | | from functools import partial |
| | | import json |
| | | import os |
| | | import re |
| | |
| | | text_type, |
| | | ) |
| | | |
| | | from pyramid.csrf import get_csrf_token |
| | | from pyramid.decorator import reify |
| | | |
| | | from pyramid.events import BeforeRender |
| | |
| | | 'context':context, |
| | | 'request':request, |
| | | 'req':request, |
| | | 'get_csrf_token':partial(get_csrf_token, request), |
| | | } |
| | | return self.render_to_response(response, system, request=request) |
| | | |
| | |
| | | 'context':getattr(request, 'context', None), |
| | | 'request':request, |
| | | 'req':request, |
| | | 'get_csrf_token':partial(get_csrf_token, request), |
| | | } |
| | | |
| | | system_values = BeforeRender(system_values, value) |
| | | |
| | | registry = self.registry |
| | | registry.notify(system_values) |
| | | |
| | | result = renderer(value, system_values) |
| | | return result |
| | | |
| | |
| | | text_, |
| | | bytes_, |
| | | native_, |
| | | urlparse, |
| | | ) |
| | | from pyramid.csrf import ( |
| | | check_csrf_origin, |
| | | check_csrf_token, |
| | | ) |
| | | |
| | | from pyramid.exceptions import ( |
| | | BadCSRFOrigin, |
| | | BadCSRFToken, |
| | | ) |
| | | from pyramid.interfaces import ISession |
| | | from pyramid.settings import aslist |
| | | from pyramid.util import ( |
| | | is_same_domain, |
| | | strings_differ, |
| | | ) |
| | | from pyramid.util import strings_differ |
| | | |
| | | |
| | | def manage_accessed(wrapped): |
| | | """ Decorator which causes a cookie to be renewed when an accessor |
| | |
| | | |
| | | return pickle.loads(pickled) |
| | | |
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the Origin of the request to see if it is a cross site request or
    not.

    If the value supplied by the Origin or Referer header isn't one of the
    trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception but if ``raises`` is
    ``False`` this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.

    Additional trusted origins may be added by passing a list of domain (and
    ports if nonstandard like ``['example.com', 'dev.example.com:8080']``) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting. The sequence passed in is never
    modified: the request's own host is added to a private copy.

    Note that this function will do nothing if ``request.scheme`` is not
    ``https``.

    .. versionadded:: 1.7
    """
    def _fail(reason):
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False

    if request.scheme == "https":
        # Suppose user visits http://example.com/
        # An active network attacker (man-in-the-middle, MITM) sends a
        # POST form that targets https://example.com/detonate-bomb/ and
        # submits it via JavaScript.
        #
        # The attacker will need to provide a CSRF cookie and token, but
        # that's no problem for a MITM when we cannot make any assumptions
        # about what kind of session storage is being used. So the MITM can
        # circumvent the CSRF protection. This is true for any HTTP connection,
        # but anyone using HTTPS expects better! For this reason, for
        # https://example.com/ we need additional protection that treats
        # http://example.com/ as completely untrusted. Under HTTPS,
        # Barth et al. found that the Referer header is missing for
        # same-domain requests in only about 0.2% of cases or less, so
        # we can use strict Referer checking.

        # Determine the origin of this request, preferring the Origin header
        # and falling back to the Referer.
        origin = request.headers.get("Origin")
        if origin is None:
            origin = request.referrer

        # Fail if we were not able to locate an origin at all
        if not origin:
            return _fail("Origin checking failed - no Origin or Referer.")

        # Parse our origin so we can extract the required information from
        # it.
        originp = urlparse.urlparse(origin)

        # Ensure that our Referer is also secure.
        if originp.scheme != "https":
            return _fail(
                "Referer checking failed - Referer is insecure while host is "
                "secure."
            )

        # Determine which origins we trust, which by default will include the
        # current origin.
        if trusted_origins is None:
            trusted_origins = aslist(
                request.registry.settings.get(
                    "pyramid.csrf_trusted_origins", [])
            )

        # Work on a private copy so that appending the current host below
        # never leaks into (and keeps growing) a list owned by the caller.
        allowed_hosts = list(trusted_origins)

        if request.host_port not in set(["80", "443"]):
            allowed_hosts.append("{0.domain}:{0.host_port}".format(request))
        else:
            allowed_hosts.append(request.domain)

        # Actually check to see if the request's origin matches any of our
        # trusted origins.
        if not any(is_same_domain(originp.netloc, host)
                   for host in allowed_hosts):
            reason = (
                "Referer checking failed - {0} does not match any trusted "
                "origins."
            )
            return _fail(reason.format(origin))

    return True
| | | |
| | | |
def check_csrf_token(request,
                     token='csrf_token',
                     header='X-CSRF-Token',
                     raises=True):
    """ Compare the CSRF token held by the request's session with the one
    supplied by the client.

    The supplied token is taken from ``request.POST.get(token)`` or, when the
    body yields nothing, from ``request.headers.get(header)``.  ``token``
    defaults to ``csrf_token`` and ``header`` defaults to ``X-CSRF-Token``;
    passing ``None`` for either one disables that lookup.

    If the supplied value doesn't match the value returned by
    ``request.session.get_csrf_token()`` and ``raises`` is ``True``,
    :exc:`pyramid.exceptions.BadCSRFToken` is raised.  If the values differ
    and ``raises`` is ``False``, ``False`` is returned.  A successful check
    always returns ``True``.

    Note that using this function requires that a :term:`session factory` is
    configured.

    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.

    .. versionadded:: 1.4a2

    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.
    """
    # The body is consulted first.  Only request.POST is used: a CSRF token
    # must never travel in the URL, and form encoded data is not supported
    # over anything but request.POST.
    candidate = "" if token is None else request.POST.get(token, "")

    # Nothing in the body: see whether a header carries a value for us.
    if candidate == "" and header is not None:
        candidate = request.headers.get(header, "")

    session_token = request.session.get_csrf_token()
    if not strings_differ(bytes_(session_token), bytes_(candidate)):
        return True
    if raises:
        raise BadCSRFToken('check_csrf_token(): Invalid token')
    return False
| | | |
| | | class PickleSerializer(object): |
| | | """ A serializer that uses the pickle protocol to dump Python |
| | |
| | | reissue_time=reissue_time, |
| | | set_on_exception=set_on_exception, |
| | | ) |
| | | |
| | | check_csrf_origin = check_csrf_origin # api |
| | | deprecated('check_csrf_origin', |
| | | 'pyramid.session.check_csrf_origin is deprecated as of Pyramid ' |
| | | '1.9. Use pyramid.csrf.check_csrf_origin instead.') |
| | | |
| | | check_csrf_token = check_csrf_token # api |
| | | deprecated('check_csrf_token', |
| | | 'pyramid.session.check_csrf_token is deprecated as of Pyramid ' |
| | | '1.9. Use pyramid.csrf.check_csrf_token instead.') |
| | |
| | | config.add_default_view_derivers() |
| | | config.add_default_route_predicates() |
| | | config.add_default_tweens() |
| | | config.add_default_security() |
| | | config.commit() |
| | | global have_zca |
| | | try: |
| | |
def _makeOne(self, *arg, **kw):
    """ Build a ``Configurator`` from the given arguments with default CSRF
    checking switched off (``require_csrf=False``) and return it."""
    from pyramid.config import Configurator
    configurator = Configurator(*arg, **kw)
    configurator.set_default_csrf_options(require_csrf=False)
    return configurator
| | | |
| | | def _getViewCallable(self, config, ctx_iface=None, exc_iface=None, |
| | |
| | | view = lambda r: 'OK' |
| | | config.set_default_csrf_options(require_csrf=True) |
| | | config.add_view(view, context=Exception, renderer=null_renderer) |
| | | view_intr = introspector.introspectables[1] |
| | | view_intr = introspector.introspectables[-1] |
| | | self.assertTrue(view_intr.type_name, 'view') |
| | | self.assertEqual(view_intr['callable'], view) |
| | | derived_view = view_intr['derived_callable'] |
New file |
| | |
| | | import unittest |
| | | |
| | | from pyramid import testing |
| | | from pyramid.config import Configurator |
| | | |
| | | |
class TestLegacySessionCSRFStoragePolicy(unittest.TestCase):
    """ Tests for ``pyramid.csrf.LegacySessionCSRFStoragePolicy``, exercised
    against a session double that implements the CSRF token methods."""

    class MockSession(object):
        """ Session stand-in that hands out fixed CSRF tokens. """

        def __init__(self, current_token='02821185e4c94269bdc38e6eeae0a2f8'):
            self.current_token = current_token

        def new_csrf_token(self):
            self.current_token = 'e5e9e30a08b34ff9842ff7d2b958c14b'
            return self.current_token

        def get_csrf_token(self):
            return self.current_token

    def _makeOne(self):
        from pyramid.csrf import LegacySessionCSRFStoragePolicy
        return LegacySessionCSRFStoragePolicy()

    def test_register_session_csrf_policy(self):
        from pyramid.csrf import LegacySessionCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy

        configurator = Configurator()
        configurator.set_csrf_storage_policy(self._makeOne())
        configurator.commit()

        registered = configurator.registry.queryUtility(ICSRFStoragePolicy)
        self.assertIsInstance(registered, LegacySessionCSRFStoragePolicy)

    def test_session_csrf_implementation_delegates_to_session(self):
        policy = self._makeOne()
        request = DummyRequest(session=self.MockSession())

        current = policy.get_csrf_token(request)
        self.assertEqual(current, '02821185e4c94269bdc38e6eeae0a2f8')

        regenerated = policy.new_csrf_token(request)
        self.assertEqual(regenerated, 'e5e9e30a08b34ff9842ff7d2b958c14b')

    def test_check_csrf_token(self):
        request = DummyRequest(session=self.MockSession('foo'))
        policy = self._makeOne()

        matches = policy.check_csrf_token(request, 'foo')
        self.assertTrue(matches)
        mismatches = policy.check_csrf_token(request, 'bar')
        self.assertFalse(mismatches)
| | | |
| | | |
class TestSessionCSRFStoragePolicy(unittest.TestCase):
    """ Tests for ``pyramid.csrf.SessionCSRFStoragePolicy``, using plain
    dictionaries as the session."""

    def _makeOne(self, **kw):
        from pyramid.csrf import SessionCSRFStoragePolicy
        return SessionCSRFStoragePolicy(**kw)

    def test_register_session_csrf_policy(self):
        from pyramid.csrf import SessionCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy

        configurator = Configurator()
        configurator.set_csrf_storage_policy(self._makeOne())
        configurator.commit()

        registered = configurator.registry.queryUtility(ICSRFStoragePolicy)
        self.assertIsInstance(registered, SessionCSRFStoragePolicy)

    def test_it_creates_a_new_token(self):
        request = DummyRequest(session={})

        policy = self._makeOne()
        # Pin the generated value so the assertion is deterministic.
        policy._token_factory = lambda: 'foo'
        created = policy.get_csrf_token(request)
        self.assertEqual(created, 'foo')

    def test_get_csrf_token_returns_the_new_token(self):
        request = DummyRequest(session={'_csrft_': 'foo'})

        policy = self._makeOne()
        stored = policy.get_csrf_token(request)
        self.assertEqual(stored, 'foo')

        replacement = policy.new_csrf_token(request)
        self.assertNotEqual(replacement, 'foo')
        self.assertEqual(replacement, policy.get_csrf_token(request))

    def test_check_csrf_token(self):
        request = DummyRequest(session={})
        policy = self._makeOne()

        # An empty session has no matching token to compare against.
        self.assertFalse(policy.check_csrf_token(request, 'foo'))

        request.session = {'_csrft_': 'foo'}
        matches = policy.check_csrf_token(request, 'foo')
        self.assertTrue(matches)
        mismatches = policy.check_csrf_token(request, 'bar')
        self.assertFalse(mismatches)
| | | |
| | | |
class TestCookieCSRFStoragePolicy(unittest.TestCase):
    """ Tests for ``pyramid.csrf.CookieCSRFStoragePolicy``: reading the token
    from request cookies and setting it on the response."""

    def _makeOne(self, **kw):
        from pyramid.csrf import CookieCSRFStoragePolicy
        return CookieCSRFStoragePolicy(**kw)

    def test_register_cookie_csrf_policy(self):
        from pyramid.csrf import CookieCSRFStoragePolicy
        from pyramid.interfaces import ICSRFStoragePolicy

        configurator = Configurator()
        configurator.set_csrf_storage_policy(self._makeOne())
        configurator.commit()

        registered = configurator.registry.queryUtility(ICSRFStoragePolicy)
        self.assertIsInstance(registered, CookieCSRFStoragePolicy)

    def test_get_cookie_csrf_with_no_existing_cookie_sets_cookies(self):
        response = MockResponse()
        request = DummyRequest()

        policy = self._makeOne()
        token = policy.get_csrf_token(request)
        request.response_callback(request, response)

        expected_headers = [
            ('Set-Cookie', 'csrf_token={}; Path=/'.format(token)),
        ]
        self.assertEqual(response.headerlist, expected_headers)

    def test_existing_cookie_csrf_does_not_set_cookie(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}

        policy = self._makeOne()
        token = policy.get_csrf_token(request)

        self.assertEqual(token, 'e6f325fee5974f3da4315a8ccf4513d2')
        # No callback registered means no cookie will be (re)sent.
        self.assertIsNone(request.response_callback)

    def test_new_cookie_csrf_with_existing_cookie_sets_cookies(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'e6f325fee5974f3da4315a8ccf4513d2'}

        policy = self._makeOne()
        token = policy.new_csrf_token(request)

        response = MockResponse()
        request.response_callback(request, response)

        expected_headers = [
            ('Set-Cookie', 'csrf_token={}; Path=/'.format(token)),
        ]
        self.assertEqual(response.headerlist, expected_headers)

    def test_get_csrf_token_returns_the_new_token(self):
        request = DummyRequest()
        request.cookies = {'csrf_token': 'foo'}

        policy = self._makeOne()
        stored = policy.get_csrf_token(request)
        self.assertEqual(stored, 'foo')

        replacement = policy.new_csrf_token(request)
        self.assertNotEqual(replacement, 'foo')
        self.assertEqual(replacement, policy.get_csrf_token(request))

    def test_check_csrf_token(self):
        request = DummyRequest()
        policy = self._makeOne()

        # Without a cookie there is no matching token to compare against.
        self.assertFalse(policy.check_csrf_token(request, 'foo'))

        request.cookies = {'csrf_token': 'foo'}
        matches = policy.check_csrf_token(request, 'foo')
        self.assertTrue(matches)
        mismatches = policy.check_csrf_token(request, 'bar')
        self.assertFalse(mismatches)
| | | |
class Test_get_csrf_token(unittest.TestCase):
    """ Tests for the module-level ``pyramid.csrf.get_csrf_token`` helper. """

    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        # Undo testing.setUp() so configuration state does not leak into
        # other test cases.
        testing.tearDown()

    def _callFUT(self, *args, **kwargs):
        from pyramid.csrf import get_csrf_token
        return get_csrf_token(*args, **kwargs)

    def test_no_override_csrf_utility_registered(self):
        # Must simply not fail when no storage policy was set explicitly.
        request = testing.DummyRequest()
        self._callFUT(request)

    def test_success(self):
        self.config.set_csrf_storage_policy(DummyCSRF())
        request = testing.DummyRequest()

        csrf_token = self._callFUT(request)

        self.assertEqual(csrf_token, '02821185e4c94269bdc38e6eeae0a2f8')
| | | |
| | | |
class Test_new_csrf_token(unittest.TestCase):
    """ Tests for the module-level ``pyramid.csrf.new_csrf_token`` helper. """

    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        # Undo testing.setUp() so configuration state does not leak into
        # other test cases.
        testing.tearDown()

    def _callFUT(self, *args, **kwargs):
        from pyramid.csrf import new_csrf_token
        return new_csrf_token(*args, **kwargs)

    def test_no_override_csrf_utility_registered(self):
        # Must simply not fail when no storage policy was set explicitly.
        request = testing.DummyRequest()
        self._callFUT(request)

    def test_success(self):
        self.config.set_csrf_storage_policy(DummyCSRF())
        request = testing.DummyRequest()

        csrf_token = self._callFUT(request)

        self.assertEqual(csrf_token, 'e5e9e30a08b34ff9842ff7d2b958c14b')
| | | |
| | | |
class Test_check_csrf_token(unittest.TestCase):
    """ Tests for ``pyramid.csrf.check_csrf_token`` with default CSRF options
    configured (``require_csrf=False``)."""

    def setUp(self):
        self.config = testing.setUp()

        # set up CSRF
        self.config.set_default_csrf_options(require_csrf=False)

    def tearDown(self):
        # Undo testing.setUp() so configuration state does not leak into
        # other test cases.
        testing.tearDown()

    def _callFUT(self, *args, **kwargs):
        from ..csrf import check_csrf_token
        return check_csrf_token(*args, **kwargs)

    def test_success_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)

    def test_success_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True)

    def test_success_default_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request), True)

    def test_success_default_header(self):
        request = testing.DummyRequest()
        request.headers['X-CSRF-Token'] = request.session.get_csrf_token()
        self.assertEqual(self._callFUT(request), True)

    def test_failure_raises(self):
        from pyramid.exceptions import BadCSRFToken
        request = testing.DummyRequest()
        self.assertRaises(BadCSRFToken, self._callFUT, request,
                          'csrf_token')

    def test_failure_no_raises(self):
        request = testing.DummyRequest()
        result = self._callFUT(request, 'csrf_token', raises=False)
        self.assertEqual(result, False)
| | | |
| | | |
class Test_check_csrf_token_without_defaults_configured(unittest.TestCase):
    """ Tests for ``pyramid.csrf.check_csrf_token`` when
    ``set_default_csrf_options`` has not been called."""

    def setUp(self):
        self.config = testing.setUp()

    def tearDown(self):
        # Undo testing.setUp() so configuration state does not leak into
        # other test cases.
        testing.tearDown()

    def _callFUT(self, *args, **kwargs):
        from ..csrf import check_csrf_token
        return check_csrf_token(*args, **kwargs)

    def test_success_token(self):
        request = testing.DummyRequest()
        request.method = "POST"
        request.POST = {'csrf_token': request.session.get_csrf_token()}
        self.assertEqual(self._callFUT(request, token='csrf_token'), True)

    def test_failure_raises(self):
        from pyramid.exceptions import BadCSRFToken
        request = testing.DummyRequest()
        self.assertRaises(BadCSRFToken, self._callFUT, request,
                          'csrf_token')

    def test_failure_no_raises(self):
        request = testing.DummyRequest()
        result = self._callFUT(request, 'csrf_token', raises=False)
        self.assertEqual(result, False)
| | | |
| | | |
| | | class Test_check_csrf_origin(unittest.TestCase): |
| | | def _callFUT(self, *args, **kwargs): |
| | | from ..csrf import check_csrf_origin |
| | | return check_csrf_origin(*args, **kwargs) |
| | | |
| | | def test_success_with_http(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "http" |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_referrer(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_origin(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.headers = {"Origin": "https://example.com/"} |
| | | request.referrer = "https://not-example.com/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_additional_trusted_host(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = { |
| | | "pyramid.csrf_trusted_origins": ["not-example.com"], |
| | | } |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_nonstandard_port(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = "8080" |
| | | request.referrer = "https://example.com:8080/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_fails_with_wrong_host(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_no_origin(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.referrer = None |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_when_http_to_https(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "http://example.com/evil/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_nonstandard_port(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = "8080" |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | |
| | | class DummyRequest(object): |
| | | registry = None |
| | | session = None |
| | | response_callback = None |
| | | |
| | | def __init__(self, registry=None, session=None): |
| | | self.registry = registry |
| | | self.session = session |
| | | self.cookies = {} |
| | | |
| | | def add_response_callback(self, callback): |
| | | self.response_callback = callback |
| | | |
| | | |
| | | class MockResponse(object): |
| | | def __init__(self): |
| | | self.headerlist = [] |
| | | |
| | | |
| | | class DummyCSRF(object): |
| | | def new_csrf_token(self, request): |
| | | return 'e5e9e30a08b34ff9842ff7d2b958c14b' |
| | | |
| | | def get_csrf_token(self, request): |
| | | return '02821185e4c94269bdc38e6eeae0a2f8' |
| | |
| | | self.assertEqual(helper.get_renderer(), factory.respond) |
| | | |
| | | def test_render_view(self): |
| | | import pyramid.csrf |
| | | self._registerRendererFactory() |
| | | self._registerResponseFactory() |
| | | request = Dummy() |
| | |
| | | request = testing.DummyRequest() |
| | | response = 'response' |
| | | response = helper.render_view(request, response, view, context) |
| | | get_csrf = response.app_iter[1].pop('get_csrf_token') |
| | | self.assertEqual(get_csrf.args, (request, )) |
| | | self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) |
| | | self.assertEqual(response.app_iter[0], 'response') |
| | | self.assertEqual(response.app_iter[1], |
| | | {'renderer_info': helper, |
| | |
| | | self.assertEqual(reg.event.__class__.__name__, 'BeforeRender') |
| | | |
| | | def test_render_system_values_is_None(self): |
| | | import pyramid.csrf |
| | | self._registerRendererFactory() |
| | | request = Dummy() |
| | | context = Dummy() |
| | | request.context = context |
| | | helper = self._makeOne('loo.foo') |
| | | result = helper.render('values', None, request=request) |
| | | get_csrf = result[1].pop('get_csrf_token') |
| | | self.assertEqual(get_csrf.args, (request, )) |
| | | self.assertEqual(get_csrf.func, pyramid.csrf.get_csrf_token) |
| | | system = {'request':request, |
| | | 'context':context, |
| | | 'renderer_name':'loo.foo', |
| | |
| | | result = self._callFUT(serialized, secret.decode('latin-1')) |
| | | self.assertEqual(result, '123') |
| | | |
| | | class Test_check_csrf_token(unittest.TestCase): |
| | | def _callFUT(self, *args, **kwargs): |
| | | from ..session import check_csrf_token |
| | | return check_csrf_token(*args, **kwargs) |
| | | |
| | | def test_success_token(self): |
| | | request = testing.DummyRequest() |
| | | request.method = "POST" |
| | | request.POST = {'csrf_token': request.session.get_csrf_token()} |
| | | self.assertEqual(self._callFUT(request, token='csrf_token'), True) |
| | | |
| | | def test_success_header(self): |
| | | request = testing.DummyRequest() |
| | | request.headers['X-CSRF-Token'] = request.session.get_csrf_token() |
| | | self.assertEqual(self._callFUT(request, header='X-CSRF-Token'), True) |
| | | |
| | | def test_success_default_token(self): |
| | | request = testing.DummyRequest() |
| | | request.method = "POST" |
| | | request.POST = {'csrf_token': request.session.get_csrf_token()} |
| | | self.assertEqual(self._callFUT(request), True) |
| | | |
| | | def test_success_default_header(self): |
| | | request = testing.DummyRequest() |
| | | request.headers['X-CSRF-Token'] = request.session.get_csrf_token() |
| | | self.assertEqual(self._callFUT(request), True) |
| | | |
| | | def test_failure_raises(self): |
| | | from pyramid.exceptions import BadCSRFToken |
| | | request = testing.DummyRequest() |
| | | self.assertRaises(BadCSRFToken, self._callFUT, request, |
| | | 'csrf_token') |
| | | |
| | | def test_failure_no_raises(self): |
| | | request = testing.DummyRequest() |
| | | result = self._callFUT(request, 'csrf_token', raises=False) |
| | | self.assertEqual(result, False) |
| | | |
| | | def test_token_differing_types(self): |
| | | from pyramid.compat import text_ |
| | | request = testing.DummyRequest() |
| | | request.method = "POST" |
| | | request.session['_csrft_'] = text_('foo') |
| | | request.POST = {'csrf_token': b'foo'} |
| | | self.assertEqual(self._callFUT(request, token='csrf_token'), True) |
| | | |
| | | |
| | | class Test_check_csrf_origin(unittest.TestCase): |
| | | |
| | | def _callFUT(self, *args, **kwargs): |
| | | from ..session import check_csrf_origin |
| | | return check_csrf_origin(*args, **kwargs) |
| | | |
| | | def test_success_with_http(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "http" |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_referrer(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_https_and_origin(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.headers = {"Origin": "https://example.com/"} |
| | | request.referrer = "https://not-example.com/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_additional_trusted_host(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = { |
| | | "pyramid.csrf_trusted_origins": ["not-example.com"], |
| | | } |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_success_with_nonstandard_port(self): |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = "8080" |
| | | request.referrer = "https://example.com:8080/login/" |
| | | request.registry.settings = {} |
| | | self.assertTrue(self._callFUT(request)) |
| | | |
| | | def test_fails_with_wrong_host(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "https://not-example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_no_origin(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.referrer = None |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_when_http_to_https(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com" |
| | | request.host_port = "443" |
| | | request.referrer = "http://example.com/evil/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | def test_fails_with_nonstandard_port(self): |
| | | from pyramid.exceptions import BadCSRFOrigin |
| | | request = testing.DummyRequest() |
| | | request.scheme = "https" |
| | | request.host = "example.com:8080" |
| | | request.host_port = "8080" |
| | | request.referrer = "https://example.com/login/" |
| | | request.registry.settings = {} |
| | | self.assertRaises(BadCSRFOrigin, self._callFUT, request) |
| | | self.assertFalse(self._callFUT(request, raises=False)) |
| | | |
| | | |
| | | class DummySerializer(object): |
| | | def dumps(self, value): |
| | |
| | | from pyramid.util import strings_differ |
| | | return strings_differ(*args, **kw) |
| | | |
| | | def test_it(self): |
| | | def test_it_bytes(self): |
| | | self.assertFalse(self._callFUT(b'foo', b'foo')) |
| | | self.assertTrue(self._callFUT(b'123', b'345')) |
| | | self.assertTrue(self._callFUT(b'1234', b'123')) |
| | | self.assertTrue(self._callFUT(b'123', b'1234')) |
| | | |
| | | def test_it_native_str(self): |
| | | self.assertFalse(self._callFUT('123', '123')) |
| | | self.assertTrue(self._callFUT('123', '1234')) |
| | | |
| | | def test_it_with_internal_comparator(self): |
| | | result = self._callFUT(b'foo', b'foo', compare_digest=None) |
| | | self.assertFalse(result) |
| | |
| | | |
| | | def setUp(self): |
| | | self.config = testing.setUp() |
| | | self.config.set_default_csrf_options(require_csrf=False) |
| | | |
| | | def tearDown(self): |
| | | self.config = None |
| | |
| | | ) |
| | | |
| | | from pyramid.security import NO_PERMISSION_REQUIRED |
| | | from pyramid.session import ( |
| | | from pyramid.csrf import ( |
| | | check_csrf_origin, |
| | | check_csrf_token, |
| | | ) |