Michael Merickel
2018-10-18 41f103af2745c336a3bcdc715e70ef3cb5d1e545
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
import uuid
 
from webob.cookies import CookieProfile
from zope.interface import implementer
 
 
from pyramid.compat import bytes_, urlparse, text_
from pyramid.exceptions import BadCSRFOrigin, BadCSRFToken
from pyramid.interfaces import ICSRFStoragePolicy
from pyramid.settings import aslist
from pyramid.util import SimpleSerializer, is_same_domain, strings_differ
 
 
@implementer(ICSRFStoragePolicy)
class LegacySessionCSRFStoragePolicy(object):
    """ A CSRF storage policy that defers control of CSRF storage to the
    session.
 
    This policy maintains compatibility with legacy ISession implementations
    that know how to manage CSRF tokens themselves via
    ``ISession.new_csrf_token`` and ``ISession.get_csrf_token``.
 
    Note that using this CSRF implementation requires that
    a :term:`session factory` is configured.
 
    .. versionadded:: 1.9
 
    """
 
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the session and returns it. """
        return request.session.new_csrf_token()
 
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token from the session,
        generating a new one if needed."""
        return request.session.get_csrf_token()
 
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token)
        )
 
 
@implementer(ICSRFStoragePolicy)
class SessionCSRFStoragePolicy(object):
    """ A CSRF storage policy that persists the CSRF token in the session.
 
    Note that using this CSRF implementation requires that
    a :term:`session factory` is configured.
 
    ``key``
 
        The session key where the CSRF token will be stored.
        Default: `_csrft_`.
 
    .. versionadded:: 1.9
 
    """
 
    _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex))
 
    def __init__(self, key='_csrft_'):
        self.key = key
 
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the session and returns it. """
        token = self._token_factory()
        request.session[self.key] = token
        return token
 
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token from the session,
        generating a new one if needed."""
        token = request.session.get(self.key, None)
        if not token:
            token = self.new_csrf_token(request)
        return token
 
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token)
        )
 
 
@implementer(ICSRFStoragePolicy)
class CookieCSRFStoragePolicy(object):
    """ An alternative CSRF implementation that stores its information in
    unauthenticated cookies, known as the 'Double Submit Cookie' method in the
    `OWASP CSRF guidelines <https://www.owasp.org/index.php/
    Cross-Site_Request_Forgery_(CSRF)_Prevention_Cheat_Sheet#
    Double_Submit_Cookie>`_. This gives some additional flexibility with
    regards to scaling as the tokens can be generated and verified by a
    front-end server.
 
    .. versionadded:: 1.9
 
    .. versionchanged: 1.10
 
       Added the ``samesite`` option and made the default ``'Lax'``.
 
    """
 
    _token_factory = staticmethod(lambda: text_(uuid.uuid4().hex))
 
    def __init__(
        self,
        cookie_name='csrf_token',
        secure=False,
        httponly=False,
        domain=None,
        max_age=None,
        path='/',
        samesite='Lax',
    ):
        serializer = SimpleSerializer()
        self.cookie_profile = CookieProfile(
            cookie_name=cookie_name,
            secure=secure,
            max_age=max_age,
            httponly=httponly,
            path=path,
            domains=[domain],
            serializer=serializer,
            samesite=samesite,
        )
        self.cookie_name = cookie_name
 
    def new_csrf_token(self, request):
        """ Sets a new CSRF token into the request and returns it. """
        token = self._token_factory()
        request.cookies[self.cookie_name] = token
 
        def set_cookie(request, response):
            self.cookie_profile.set_cookies(response, token)
 
        request.add_response_callback(set_cookie)
        return token
 
    def get_csrf_token(self, request):
        """ Returns the currently active CSRF token by checking the cookies
        sent with the current request."""
        bound_cookies = self.cookie_profile.bind(request)
        token = bound_cookies.get_value()
        if not token:
            token = self.new_csrf_token(request)
        return token
 
    def check_csrf_token(self, request, supplied_token):
        """ Returns ``True`` if the ``supplied_token`` is valid."""
        expected_token = self.get_csrf_token(request)
        return not strings_differ(
            bytes_(expected_token), bytes_(supplied_token)
        )
 
 
def get_csrf_token(request):
    """ Get the currently active CSRF token for the request passed, generating
    a new one using ``new_csrf_token(request)`` if one does not exist. This
    calls the equivalent method in the chosen CSRF protection implementation.
 
    .. versionadded :: 1.9
 
    """
    registry = request.registry
    csrf = registry.getUtility(ICSRFStoragePolicy)
    return csrf.get_csrf_token(request)
 
 
def new_csrf_token(request):
    """ Generate a new CSRF token for the request passed and persist it in an
    implementation defined manner. This calls the equivalent method in the
    chosen CSRF protection implementation.
 
    .. versionadded :: 1.9
 
    """
    registry = request.registry
    csrf = registry.getUtility(ICSRFStoragePolicy)
    return csrf.new_csrf_token(request)
 
 
def check_csrf_token(
    request, token='csrf_token', header='X-CSRF-Token', raises=True
):
    """ Check the CSRF token returned by the
    :class:`pyramid.interfaces.ICSRFStoragePolicy` implementation against the
    value in ``request.POST.get(token)`` (if a POST request) or
    ``request.headers.get(header)``. If a ``token`` keyword is not supplied to
    this function, the string ``csrf_token`` will be used to look up the token
    in ``request.POST``. If a ``header`` keyword is not supplied to this
    function, the string ``X-CSRF-Token`` will be used to look up the token in
    ``request.headers``.
 
    If the value supplied by post or by header cannot be verified by the
    :class:`pyramid.interfaces.ICSRFStoragePolicy`, and ``raises`` is
    ``True``, this function will raise an
    :exc:`pyramid.exceptions.BadCSRFToken` exception. If the values differ
    and ``raises`` is ``False``, this function will return ``False``.  If the
    CSRF check is successful, this function will return ``True``
    unconditionally.
 
    See :ref:`auto_csrf_checking` for information about how to secure your
    application automatically against CSRF attacks.
 
    .. versionadded:: 1.4a2
 
    .. versionchanged:: 1.7a1
       A CSRF token passed in the query string of the request is no longer
       considered valid. It must be passed in either the request body or
       a header.
 
    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf` and updated
       to use the configured :class:`pyramid.interfaces.ICSRFStoragePolicy` to
       verify the CSRF token.
 
    """
    supplied_token = ""
    # We first check the headers for a csrf token, as that is significantly
    # cheaper than checking the POST body
    if header is not None:
        supplied_token = request.headers.get(header, "")
 
    # If this is a POST/PUT/etc request, then we'll check the body to see if it
    # has a token. We explicitly use request.POST here because CSRF tokens
    # should never appear in an URL as doing so is a security issue. We also
    # explicitly check for request.POST here as we do not support sending form
    # encoded data over anything but a request.POST.
    if supplied_token == "" and token is not None:
        supplied_token = request.POST.get(token, "")
 
    policy = request.registry.getUtility(ICSRFStoragePolicy)
    if not policy.check_csrf_token(request, text_(supplied_token)):
        if raises:
            raise BadCSRFToken('check_csrf_token(): Invalid token')
        return False
    return True
 
 
def check_csrf_origin(request, trusted_origins=None, raises=True):
    """
    Check the ``Origin`` of the request to see if it is a cross site request or
    not.
 
    If the value supplied by the ``Origin`` or ``Referer`` header isn't one of
    the trusted origins and ``raises`` is ``True``, this function will raise a
    :exc:`pyramid.exceptions.BadCSRFOrigin` exception, but if ``raises`` is
    ``False``, this function will return ``False`` instead. If the CSRF origin
    checks are successful this function will return ``True`` unconditionally.
 
    Additional trusted origins may be added by passing a list of domain (and
    ports if non-standard like ``['example.com', 'dev.example.com:8080']``) in
    with the ``trusted_origins`` parameter. If ``trusted_origins`` is ``None``
    (the default) this list of additional domains will be pulled from the
    ``pyramid.csrf_trusted_origins`` setting.
 
    Note that this function will do nothing if ``request.scheme`` is not
    ``https``.
 
    .. versionadded:: 1.7
 
    .. versionchanged:: 1.9
       Moved from :mod:`pyramid.session` to :mod:`pyramid.csrf`
 
    """
 
    def _fail(reason):
        if raises:
            raise BadCSRFOrigin(reason)
        else:
            return False
 
    if request.scheme == "https":
        # Suppose user visits http://example.com/
        # An active network attacker (man-in-the-middle, MITM) sends a
        # POST form that targets https://example.com/detonate-bomb/ and
        # submits it via JavaScript.
        #
        # The attacker will need to provide a CSRF cookie and token, but
        # that's no problem for a MITM when we cannot make any assumptions
        # about what kind of session storage is being used. So the MITM can
        # circumvent the CSRF protection. This is true for any HTTP connection,
        # but anyone using HTTPS expects better! For this reason, for
        # https://example.com/ we need additional protection that treats
        # http://example.com/ as completely untrusted. Under HTTPS,
        # Barth et al. found that the Referer header is missing for
        # same-domain requests in only about 0.2% of cases or less, so
        # we can use strict Referer checking.
 
        # Determine the origin of this request
        origin = request.headers.get("Origin")
        if origin is None:
            origin = request.referrer
 
        # Fail if we were not able to locate an origin at all
        if not origin:
            return _fail("Origin checking failed - no Origin or Referer.")
 
        # Parse our origin so we we can extract the required information from
        # it.
        originp = urlparse.urlparse(origin)
 
        # Ensure that our Referer is also secure.
        if originp.scheme != "https":
            return _fail(
                "Referer checking failed - Referer is insecure while host is "
                "secure."
            )
 
        # Determine which origins we trust, which by default will include the
        # current origin.
        if trusted_origins is None:
            trusted_origins = aslist(
                request.registry.settings.get(
                    "pyramid.csrf_trusted_origins", []
                )
            )
 
        if request.host_port not in set(["80", "443"]):
            trusted_origins.append("{0.domain}:{0.host_port}".format(request))
        else:
            trusted_origins.append(request.domain)
 
        # Actually check to see if the request's origin matches any of our
        # trusted origins.
        if not any(
            is_same_domain(originp.netloc, host) for host in trusted_origins
        ):
            reason = (
                "Referer checking failed - {0} does not match any trusted "
                "origins."
            )
            return _fail(reason.format(origin))
 
    return True