| | |
| | | import datetime |
| | | from calendar import timegm |
| | | from email.utils import formatdate |
| | | from codecs import utf_8_decode |
| | | from codecs import utf_8_encode |
| | | import hashlib |
| | | import os |
| | | import time |
| | | from wsgiref.handlers import _monthname # Locale-independent, RFC-2616 |
| | | from wsgiref.handlers import _weekdayname # Locale-independent, RFC-2616 |
| | | try: |
| | | from urllib.parse import urlencode, parse_qsl |
| | | except ImportError: |
| | | from urllib import urlencode |
| | | from urlparse import parse_qsl |
| | | |
| | | from zope.interface import implementer |
| | | |
| | |
| | | from repoze.who._compat import get_cookies |
| | | import repoze.who._auth_tkt as auth_tkt |
| | | from repoze.who._compat import STRING_TYPES |
| | | from repoze.who._compat import u |
| | | |
| | | _UTCNOW = None # unit tests can replace |
| | | def _utcnow(): #pragma NO COVERAGE |
| | |
| | | return _UTCNOW |
| | | return datetime.datetime.utcnow() |
| | | |
| | | DEFAULT_DIGEST = hashlib.md5 |
| | | |
| | | @implementer(IIdentifier, IAuthenticator) |
| | | class AuthTktCookiePlugin(object): |
| | | |
| | | userid_typename = 'userid_type' |
| | | userid_type_decoders = {'int': int, |
| | | 'unicode': lambda x: utf_8_decode(x)[0], |
| | | } |
| | |
| | | |
| | | def __init__(self, secret, cookie_name='auth_tkt', |
| | | secure=False, include_ip=False, |
| | | timeout=None, reissue_time=None, userid_checker=None): |
| | | timeout=None, reissue_time=None, userid_checker=None, |
| | | digest_algo=DEFAULT_DIGEST): |
| | | self.secret = secret |
| | | self.cookie_name = cookie_name |
| | | self.include_ip = include_ip |
| | |
| | | self.timeout = timeout |
| | | self.reissue_time = reissue_time |
| | | self.userid_checker = userid_checker |
| | | self.digest_algo = digest_algo |
| | | |
| | | # IIdentifier |
| | | def identify(self, environ): |
| | |
| | | |
| | | try: |
| | | timestamp, userid, tokens, user_data = auth_tkt.parse_ticket( |
| | | self.secret, cookie.value, remote_addr) |
| | | self.secret, cookie.value, remote_addr, self.digest_algo) |
| | | except auth_tkt.BadTicket: |
| | | return None |
| | | |
| | | if self.timeout and ( (timestamp + self.timeout) < time.time() ): |
| | | return None |
| | | |
| | | userid_typename = 'userid_type:' |
| | | user_data_info = user_data.split('|') |
| | | for datum in filter(None, user_data_info): |
| | | if datum.startswith(userid_typename): |
| | | userid_type = datum[len(userid_typename):] |
| | | decoder = self.userid_type_decoders.get(userid_type) |
| | | if decoder: |
| | | userid = decoder(userid) |
| | | user_data_dict = dict(parse_qsl(user_data)) |
| | | userid_type = user_data_dict.get(self.userid_typename) |
| | | if userid_type: |
| | | decoder = self.userid_type_decoders.get(userid_type) |
| | | if decoder: |
| | | userid = decoder(userid) |
| | | |
| | | environ['REMOTE_USER_TOKENS'] = tokens |
| | | environ['REMOTE_USER_DATA'] = user_data |
| | |
| | | identity['timestamp'] = timestamp |
| | | identity['repoze.who.plugins.auth_tkt.userid'] = userid |
| | | identity['tokens'] = tokens |
| | | identity['userdata'] = user_data |
| | | identity['userdata'] = user_data_dict |
| | | return identity |
| | | |
| | | # IIdentifier |
| | |
| | | if old_cookie_value: |
| | | try: |
| | | timestamp,userid,tokens,userdata = auth_tkt.parse_ticket( |
| | | self.secret, old_cookie_value, remote_addr) |
| | | self.secret, old_cookie_value, remote_addr, |
| | | self.digest_algo) |
| | | except auth_tkt.BadTicket: |
| | | pass |
| | | tokens = tuple(tokens) |
| | | |
| | | who_userid = identity['repoze.who.userid'] |
| | | who_tokens = tuple(identity.get('tokens', ())) |
| | | who_userdata = identity.get('userdata', '') |
| | | who_userdata_dict = identity.get('userdata', {}) |
| | | |
| | | encoding_data = self.userid_type_encoders.get(type(who_userid)) |
| | | if encoding_data: |
| | | encoding, encoder = encoding_data |
| | | who_userid = encoder(who_userid) |
| | | # XXX we are discarding the userdata passed in the identity? |
| | | who_userdata = 'userid_type:%s' % encoding |
| | | |
| | | who_userdata_dict[self.userid_typename] = encoding |
| | | |
| | | who_userdata = urlencode(who_userdata_dict) |
| | | |
| | | old_data = (userid, tokens, userdata) |
| | | new_data = (who_userid, who_tokens, who_userdata) |
| | | |
| | |
| | | tokens=who_tokens, |
| | | user_data=who_userdata, |
| | | cookie_name=self.cookie_name, |
| | | secure=self.secure) |
| | | secure=self.secure, |
| | | digest_algo=self.digest_algo) |
| | | new_cookie_value = ticket.cookie_value() |
| | | |
| | | if old_cookie_value != new_cookie_value: |
| | |
| | | timeout=None, |
| | | reissue_time=None, |
| | | userid_checker=None, |
| | | digest_algo=DEFAULT_DIGEST, |
| | | ): |
| | | from repoze.who.utils import resolveDotted |
| | | if (secret is None and secretfile is None): |
| | |
| | | reissue_time = int(reissue_time) |
| | | if userid_checker is not None: |
| | | userid_checker = resolveDotted(userid_checker) |
| | | if isinstance(digest_algo, str): |
| | | try: |
| | | digest_algo = getattr(hashlib, digest_algo) |
| | | except AttributeError: |
| | | raise ValueError("No such 'digest_algo': %s" % digest_algo) |
| | | |
| | | plugin = AuthTktCookiePlugin(secret, |
| | | cookie_name, |
| | | _bool(secure), |
| | |
| | | timeout, |
| | | reissue_time, |
| | | userid_checker, |
| | | digest_algo, |
| | | ) |
| | | return plugin |
| | | |