From 0c29cf2df41600d3906d521c72991c7686018b71 Mon Sep 17 00:00:00 2001 From: Michael Merickel <michael@merickel.org> Date: Mon, 15 Oct 2018 16:24:07 +0200 Subject: [PATCH] format source using black --- src/pyramid/authentication.py | 344 +++++++++++++++++++++++++++++++++----------------------- 1 files changed, 203 insertions(+), 141 deletions(-) diff --git a/src/pyramid/authentication.py b/src/pyramid/authentication.py index a9604e3..f4c2b51 100644 --- a/src/pyramid/authentication.py +++ b/src/pyramid/authentication.py @@ -21,17 +21,11 @@ bytes_, ascii_native_, native_, - ) +) -from pyramid.interfaces import ( - IAuthenticationPolicy, - IDebugLogger, - ) +from pyramid.interfaces import IAuthenticationPolicy, IDebugLogger -from pyramid.security import ( - Authenticated, - Everyone, - ) +from pyramid.security import Authenticated, Everyone from pyramid.util import strings_differ from pyramid.util import SimpleSerializer @@ -74,36 +68,41 @@ debug and self._log( 'call to unauthenticated_userid returned None; returning None', 'authenticated_userid', - request) + request, + ) return None if self._clean_principal(userid) is None: debug and self._log( - ('use of userid %r is disallowed by any built-in Pyramid ' - 'security policy, returning None' % userid), + ( + 'use of userid %r is disallowed by any built-in Pyramid ' + 'security policy, returning None' % userid + ), 'authenticated_userid', - request) + request, + ) return None if self.callback is None: debug and self._log( 'there was no groupfinder callback; returning %r' % (userid,), 'authenticated_userid', - request) + request, + ) return userid callback_ok = self.callback(userid, request) - if callback_ok is not None: # is not None! + if callback_ok is not None: # is not None! debug and self._log( - 'groupfinder callback returned %r; returning %r' % ( - callback_ok, userid), + 'groupfinder callback returned %r; returning %r' + % (callback_ok, userid), 'authenticated_userid', - request - ) + request, + ) return userid debug and self._log( 'groupfinder callback returned None; returning None', 'authenticated_userid', - request - ) + request, + ) def effective_principals(self, request): """ A list of effective principals derived from request. @@ -134,42 +133,45 @@ if userid is None: debug and self._log( - 'unauthenticated_userid returned %r; returning %r' % ( - userid, effective_principals), + 'unauthenticated_userid returned %r; returning %r' + % (userid, effective_principals), 'effective_principals', - request - ) + request, + ) return effective_principals if self._clean_principal(userid) is None: debug and self._log( - ('unauthenticated_userid returned disallowed %r; returning %r ' - 'as if it was None' % (userid, effective_principals)), + ( + 'unauthenticated_userid returned disallowed %r; returning %r ' + 'as if it was None' % (userid, effective_principals) + ), 'effective_principals', - request - ) + request, + ) return effective_principals if self.callback is None: debug and self._log( 'groupfinder callback is None, so groups is []', 'effective_principals', - request) + request, + ) groups = [] else: groups = self.callback(userid, request) debug and self._log( 'groupfinder callback returned %r as groups' % (groups,), 'effective_principals', - request) + request, + ) - if groups is None: # is None! + if groups is None: # is None! debug and self._log( - 'returning effective principals: %r' % ( - effective_principals,), + 'returning effective principals: %r' % (effective_principals,), 'effective_principals', - request - ) + request, + ) return effective_principals effective_principals.append(Authenticated) @@ -177,10 +179,9 @@ effective_principals.extend(groups) debug and self._log( - 'returning effective principals: %r' % ( - effective_principals,), + 'returning effective principals: %r' % (effective_principals,), 'effective_principals', - request + request, ) return effective_principals @@ -241,7 +242,8 @@ self.debug and self._log( 'repoze.who identity is None, returning None', 'authenticated_userid', - request) + request, + ) return None userid = identity['repoze.who.userid'] @@ -250,21 +252,25 @@ self.debug and self._log( 'repoze.who.userid is None, returning None' % userid, 'authenticated_userid', - request) + request, + ) return None if self._clean_principal(userid) is None: self.debug and self._log( - ('use of userid %r is disallowed by any built-in Pyramid ' - 'security policy, returning None' % userid), + ( + 'use of userid %r is disallowed by any built-in Pyramid ' + 'security policy, returning None' % userid + ), 'authenticated_userid', - request) + request, + ) return None if self.callback is None: return userid - if self.callback(identity, request) is not None: # is not None! + if self.callback(identity, request) is not None: # is not None! return userid def unauthenticated_userid(self, request): @@ -292,11 +298,13 @@ if identity is None: self.debug and self._log( - ('repoze.who identity was None; returning %r' % - effective_principals), + ( + 'repoze.who identity was None; returning %r' + % effective_principals + ), 'effective_principals', - request - ) + request, + ) return effective_principals if self.callback is None: @@ -304,33 +312,39 @@ else: groups = self.callback(identity, request) - if groups is None: # is None! + if groups is None: # is None! self.debug and self._log( - ('security policy groups callback returned None; returning %r' % - effective_principals), + ( + 'security policy groups callback returned None; returning %r' + % effective_principals + ), 'effective_principals', - request - ) + request, + ) return effective_principals userid = identity['repoze.who.userid'] if userid is None: self.debug and self._log( - ('repoze.who.userid was None; returning %r' % - effective_principals), + ( + 'repoze.who.userid was None; returning %r' + % effective_principals + ), 'effective_principals', - request - ) + request, + ) return effective_principals if self._clean_principal(userid) is None: self.debug and self._log( - ('unauthenticated_userid returned disallowed %r; returning %r ' - 'as if it was None' % (userid, effective_principals)), + ( + 'unauthenticated_userid returned disallowed %r; returning %r ' + 'as if it was None' % (userid, effective_principals) + ), 'effective_principals', - request - ) + request, + ) return effective_principals effective_principals.append(Authenticated) @@ -366,6 +380,7 @@ return [] identity = self._get_identity(request) return identifier.forget(request.environ, identity) + @implementer(IAuthenticationPolicy) class RemoteUserAuthenticationPolicy(CallbackAuthenticationPolicy): @@ -418,6 +433,7 @@ forgetting the user. This will be application-specific and can be done somewhere else or in a subclass.""" return [] + @implementer(IAuthenticationPolicy) class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy): @@ -586,24 +602,25 @@ """ - def __init__(self, - secret, - callback=None, - cookie_name='auth_tkt', - secure=False, - include_ip=False, - timeout=None, - reissue_time=None, - max_age=None, - path="/", - http_only=False, - wild_domain=True, - debug=False, - hashalg='sha512', - parent_domain=False, - domain=None, - samesite='Lax', - ): + def __init__( + self, + secret, + callback=None, + cookie_name='auth_tkt', + secure=False, + include_ip=False, + timeout=None, + reissue_time=None, + max_age=None, + path="/", + http_only=False, + wild_domain=True, + debug=False, + hashalg='sha512', + parent_domain=False, + domain=None, + samesite='Lax', + ): self.cookie = AuthTktCookieHelper( secret, cookie_name=cookie_name, @@ -619,7 +636,7 @@ parent_domain=parent_domain, domain=domain, samesite=samesite, - ) + ) self.callback = callback self.debug = debug @@ -643,11 +660,14 @@ """ A list of headers which will delete appropriate cookies.""" return self.cookie.forget(request) + def b64encode(v): return base64.b64encode(bytes_(v)).strip().replace(b'\n', b'') + def b64decode(v): return base64.b64decode(bytes_(v)) + # this class licensed under the MIT license (stolen from Paste) class AuthTicket(object): @@ -670,9 +690,18 @@ """ - def __init__(self, secret, userid, ip, tokens=(), user_data='', - time=None, cookie_name='auth_tkt', secure=False, - hashalg='md5'): + def __init__( + self, + secret, + userid, + ip, + tokens=(), + user_data='', + time=None, + cookie_name='auth_tkt', + secure=False, + hashalg='md5', + ): self.secret = secret self.userid = userid self.ip = ip @@ -688,16 +717,26 @@ def digest(self): return calculate_digest( - self.ip, self.time, self.secret, self.userid, self.tokens, - self.user_data, self.hashalg) + self.ip, + self.time, + self.secret, + self.userid, + self.tokens, + self.user_data, + self.hashalg, + ) def cookie_value(self): - v = '%s%08x%s!' % (self.digest(), int(self.time), - url_quote(self.userid)) + v = '%s%08x%s!' % ( + self.digest(), + int(self.time), + url_quote(self.userid), + ) if self.tokens: v += self.tokens + '!' v += self.user_data return v + # this class licensed under the MIT license (stolen from Paste) class BadTicket(Exception): @@ -706,9 +745,11 @@ determine what the expected digest should have been, expected is set. This should not be shown by default, but can be useful for debugging. """ + def __init__(self, msg, expected=None): self.expected = expected Exception.__init__(self, msg) + # this function licensed under the MIT license (stolen from Paste) def parse_ticket(secret, ticket, ip, hashalg='md5'): @@ -722,37 +763,41 @@ digest_size = hashlib.new(hashalg).digest_size * 2 digest = ticket[:digest_size] try: - timestamp = int(ticket[digest_size:digest_size + 8], 16) + timestamp = int(ticket[digest_size : digest_size + 8], 16) except ValueError as e: raise BadTicket('Timestamp is not a hex integer: %s' % e) try: - userid, data = ticket[digest_size + 8:].split('!', 1) + userid, data = ticket[digest_size + 8 :].split('!', 1) except ValueError: raise BadTicket('userid is not followed by !') userid = url_unquote(userid) if '!' in data: tokens, user_data = data.split('!', 1) - else: # pragma: no cover (never generated) + else: # pragma: no cover (never generated) # @@: Is this the right order? tokens = '' user_data = data - expected = calculate_digest(ip, timestamp, secret, - userid, tokens, user_data, hashalg) + expected = calculate_digest( + ip, timestamp, secret, userid, tokens, user_data, hashalg + ) # Avoid timing attacks (see # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf) if strings_differ(expected, digest): - raise BadTicket('Digest signature is not correct', - expected=(expected, digest)) + raise BadTicket( + 'Digest signature is not correct', expected=(expected, digest) + ) tokens = tokens.split(',') return (timestamp, userid, tokens, user_data) + # this function licensed under the MIT license (stolen from Paste) -def calculate_digest(ip, timestamp, secret, userid, tokens, user_data, - hashalg='md5'): +def calculate_digest( + ip, timestamp, secret, userid, tokens, user_data, hashalg='md5' +): secret = bytes_(secret, 'utf-8') userid = bytes_(userid, 'utf-8') tokens = bytes_(tokens, 'utf-8') @@ -767,23 +812,28 @@ # encode_ip_timestamp not required, left in for backwards compatibility ip_timestamp = encode_ip_timestamp(ip, timestamp) - hash_obj.update(ip_timestamp + secret + userid + b'\0' + - tokens + b'\0' + user_data) + hash_obj.update( + ip_timestamp + secret + userid + b'\0' + tokens + b'\0' + user_data + ) digest = hash_obj.hexdigest() hash_obj2 = hashlib.new(hashalg) hash_obj2.update(bytes_(digest) + secret) return hash_obj2.hexdigest() + # this function licensed under the MIT license (stolen from Paste) def encode_ip_timestamp(ip, timestamp): ip_chars = ''.join(map(chr, map(int, ip.split('.')))) t = int(timestamp) - ts = ((t & 0xff000000) >> 24, - (t & 0xff0000) >> 16, - (t & 0xff00) >> 8, - t & 0xff) + ts = ( + (t & 0xFF000000) >> 24, + (t & 0xFF0000) >> 16, + (t & 0xFF00) >> 8, + t & 0xFF, + ) ts_chars = ''.join(map(chr, ts)) return bytes_(ip_chars + ts_chars) + class AuthTktCookieHelper(object): """ @@ -792,41 +842,43 @@ :class:`pyramid.authentication.AuthTktAuthenticationPolicy` for the meanings of the constructor arguments. """ - parse_ticket = staticmethod(parse_ticket) # for tests - AuthTicket = AuthTicket # for tests - BadTicket = BadTicket # for tests - now = None # for tests + + parse_ticket = staticmethod(parse_ticket) # for tests + AuthTicket = AuthTicket # for tests + BadTicket = BadTicket # for tests + now = None # for tests userid_type_decoders = { - 'int':int, - 'unicode':lambda x: utf_8_decode(x)[0], # bw compat for old cookies + 'int': int, + 'unicode': lambda x: utf_8_decode(x)[0], # bw compat for old cookies 'b64unicode': lambda x: utf_8_decode(b64decode(x))[0], 'b64str': lambda x: b64decode(x), - } + } userid_type_encoders = { int: ('int', str), long: ('int', str), text_type: ('b64unicode', lambda x: b64encode(utf_8_encode(x)[0])), binary_type: ('b64str', lambda x: b64encode(x)), - } + } - def __init__(self, - secret, - cookie_name='auth_tkt', - secure=False, - include_ip=False, - timeout=None, - reissue_time=None, - max_age=None, - http_only=False, - path="/", - wild_domain=True, - hashalg='md5', - parent_domain=False, - domain=None, - samesite='Lax', - ): + def __init__( + self, + secret, + cookie_name='auth_tkt', + secure=False, + include_ip=False, + timeout=None, + reissue_time=None, + max_age=None, + http_only=False, + path="/", + wild_domain=True, + hashalg='md5', + parent_domain=False, + domain=None, + samesite='Lax', + ): serializer = SimpleSerializer() @@ -845,7 +897,9 @@ self.secure = secure self.include_ip = include_ip self.timeout = timeout if timeout is None else int(timeout) - self.reissue_time = reissue_time if reissue_time is None else int(reissue_time) + self.reissue_time = ( + reissue_time if reissue_time is None else int(reissue_time) + ) self.max_age = max_age if max_age is None else int(max_age) self.wild_domain = wild_domain self.parent_domain = parent_domain @@ -893,16 +947,17 @@ try: timestamp, userid, tokens, user_data = self.parse_ticket( - self.secret, cookie, remote_addr, self.hashalg) + self.secret, cookie, remote_addr, self.hashalg + ) except self.BadTicket: return None - now = self.now # service tests + now = self.now # service tests if now is None: now = time_mod.time() - if self.timeout and ( (timestamp + self.timeout) < now ): + if self.timeout and ((timestamp + self.timeout) < now): # the auth_tkt data has expired return None @@ -910,7 +965,7 @@ user_data_info = user_data.split('|') for datum in filter(None, user_data_info): if datum.startswith(userid_typename): - userid_type = datum[len(userid_typename):] + userid_type = datum[len(userid_typename) :] decoder = self.userid_type_decoders.get(userid_type) if decoder: userid = decoder(userid) @@ -918,15 +973,18 @@ reissue = self.reissue_time is not None if reissue and not hasattr(request, '_authtkt_reissued'): - if ( (now - timestamp) > self.reissue_time ): + if (now - timestamp) > self.reissue_time: # See https://github.com/Pylons/pyramid/issues#issue/108 tokens = list(filter(None, tokens)) - headers = self.remember(request, userid, max_age=self.max_age, - tokens=tokens) + headers = self.remember( + request, userid, max_age=self.max_age, tokens=tokens + ) + def reissue_authtkt(request, response): if not hasattr(request, '_authtkt_reissue_revoked'): for k, v in headers: response.headerlist.append((k, v)) + request.add_response_callback(reissue_authtkt) request._authtkt_reissued = True @@ -987,7 +1045,8 @@ "AuthTktAuthenticationPolicy. Explicitly converting to string " "and storing as base64. Subsequent requests will receive a " "string as the userid, it will not be decoded back to the type " - "provided.".format(type(userid)), RuntimeWarning + "provided.".format(type(userid)), + RuntimeWarning, ) encoding, encoder = self.userid_type_encoders.get(text_type) userid = str(userid) @@ -1018,11 +1077,12 @@ user_data=user_data, cookie_name=self.cookie_name, secure=self.secure, - hashalg=self.hashalg - ) + hashalg=self.hashalg, + ) cookie_value = ticket.cookie_value() return self._get_cookies(request, cookie_value, max_age) + @implementer(IAuthenticationPolicy) class SessionAuthenticationPolicy(CallbackAuthenticationPolicy): @@ -1123,6 +1183,7 @@ return response return HTTPForbidden() """ + def __init__(self, check, realm='Realm', debug=False): self.check = check self.realm = realm @@ -1158,7 +1219,8 @@ HTTPBasicCredentials = namedtuple( - 'HTTPBasicCredentials', ['username', 'password']) + 'HTTPBasicCredentials', ['username', 'password'] +) def extract_http_basic_credentials(request): @@ -1183,7 +1245,7 @@ try: authbytes = b64decode(auth.strip()) - except (TypeError, binascii.Error): # can't decode + except (TypeError, binascii.Error): # can't decode return None # try utf-8 first, then latin-1; see discussion in @@ -1195,7 +1257,7 @@ try: username, password = auth.split(':', 1) - except ValueError: # not enough values to unpack + except ValueError: # not enough values to unpack return None return HTTPBasicCredentials(username, password) -- Gitblit v1.9.3