David Tulloh
2016-04-20 7483ece150564b242ba0ac5c091319ee570dd9e1
repoze/who/plugins/auth_tkt.py
@@ -1,35 +1,74 @@
import datetime
from calendar import timegm
from email.utils import formatdate
from codecs import utf_8_decode
from codecs import utf_8_encode
import os
import time
try:
    import hashlib
except ImportError:
    import md5 as hashlib # Will only support md5 algorithm
from wsgiref.handlers import _monthname     # Locale-independent, RFC-2616
from wsgiref.handlers import _weekdayname   # Locale-independent, RFC-2616
try:
    from urllib.parse import urlencode, parse_qsl
except ImportError:
    from urllib import urlencode
    from urlparse import parse_qsl
from paste.request import get_cookies
from paste.auth import auth_tkt
from zope.interface import implements
from zope.interface import implementer
from repoze.who.interfaces import IIdentifier
from repoze.who.interfaces import IAuthenticator
from repoze.who._compat import get_cookies
import repoze.who._auth_tkt as auth_tkt
from repoze.who._compat import STRING_TYPES
from repoze.who._compat import u
_UTCNOW = None  # unit tests can replace
def _utcnow():  #pragma NO COVERAGE
    if _UTCNOW is not None:
        return _UTCNOW
    return datetime.datetime.utcnow()
DEFAULT_DIGEST = hashlib.md5
@implementer(IIdentifier, IAuthenticator)
class AuthTktCookiePlugin(object):
    implements(IIdentifier)
    userid_typename = 'userid_type'
    userid_type_decoders = {'int': int,
                            'unicode': lambda x: utf_8_decode(x)[0],
                           }
    userid_type_decoders = {
        'int':int,
        'unicode':lambda x: utf_8_decode(x)[0],
        }
    userid_type_encoders = {
        int: ('int', str),
        long: ('int', str),
        unicode: ('unicode', lambda x: utf_8_encode(x)[0]),
        }
    userid_type_encoders = {int: ('int', str),
                           }
    try:
        userid_type_encoders[long] = ('int', str)
    except NameError: #pragma NO COVER Python >= 3.0
        pass
    try:
        userid_type_encoders[unicode] = ('unicode',
                                         lambda x: utf_8_encode(x)[0])
    except NameError: #pragma NO COVER Python >= 3.0
        pass
    def __init__(self, secret, cookie_name='auth_tkt',
                 secure=False, include_ip=False):
                 secure=False, include_ip=False,
                 timeout=None, reissue_time=None, userid_checker=None,
                 digest_algo=DEFAULT_DIGEST):
        self.secret = secret
        self.cookie_name = cookie_name
        self.include_ip = include_ip
        self.secure = secure
        if timeout and ( (not reissue_time) or (reissue_time > timeout) ):
            raise ValueError('When timeout is specified, reissue_time must '
                             'be set to a lower value')
        self.timeout = timeout
        self.reissue_time = reissue_time
        self.userid_checker = userid_checker
        self.digest_algo = digest_algo
    # IIdentifier
    def identify(self, environ):
@@ -46,18 +85,19 @@
        
        try:
            timestamp, userid, tokens, user_data = auth_tkt.parse_ticket(
                self.secret, cookie.value, remote_addr)
                self.secret, cookie.value, remote_addr, self.digest_algo)
        except auth_tkt.BadTicket:
            return None
        userid_typename = 'userid_type:'
        user_data_info = user_data.split('|')
        for datum in filter(None, user_data_info):
            if datum.startswith(userid_typename):
                userid_type = datum[len(userid_typename):]
                decoder = self.userid_type_decoders.get(userid_type)
                if decoder:
                    userid = decoder(userid)
        if self.timeout and ( (timestamp + self.timeout) < time.time() ):
            return None
        user_data_dict = dict(parse_qsl(user_data))
        userid_type = user_data_dict.get(self.userid_typename)
        if userid_type:
            decoder = self.userid_type_decoders.get(userid_type)
            if decoder:
                userid = decoder(userid)
            
        environ['REMOTE_USER_TOKENS'] = tokens
        environ['REMOTE_USER_DATA'] = user_data
@@ -65,28 +105,15 @@
        identity = {}
        identity['timestamp'] = timestamp
        identity['repoze.who.userid'] = userid
        identity['repoze.who.plugins.auth_tkt.userid'] = userid
        identity['tokens'] = tokens
        identity['userdata'] = user_data
        identity['userdata'] = user_data_dict
        return identity
    def _get_cookies(self, environ, value):
        cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
        wild_domain = '.' + cur_domain
        cookies = [
            ('Set-Cookie', '%s="%s"; Path=/' % (
            self.cookie_name, value)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % (
            self.cookie_name, value, cur_domain)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s' % (
            self.cookie_name, value, wild_domain))
            ]
        return cookies
    # IIdentifier
    def forget(self, environ, identity):
        # return a set of expires Set-Cookie headers
        return self._get_cookies(environ, '""')
        return self._get_cookies(environ, 'INVALID', 0)
    
    # IIdentifier
    def remember(self, environ, identity):
@@ -99,34 +126,36 @@
        old_cookie = cookies.get(self.cookie_name)
        existing = cookies.get(self.cookie_name)
        old_cookie_value = getattr(existing, 'value', None)
        max_age = identity.get('max_age', None)
        timestamp, userid, tokens, userdata = None, '', '', ''
        timestamp, userid, tokens, userdata = None, '', (), ''
        if old_cookie_value:
            try:
                timestamp,userid,tokens,userdata = auth_tkt.parse_ticket(
                    self.secret, old_cookie_value, remote_addr)
                    self.secret, old_cookie_value, remote_addr,
                    self.digest_algo)
            except auth_tkt.BadTicket:
                pass
        tokens = tuple(tokens)
        who_userid = identity['repoze.who.userid']
        who_tokens = identity.get('tokens', '')
        who_userdata = identity.get('userdata', '')
        who_tokens = tuple(identity.get('tokens', ()))
        who_userdata_dict = identity.get('userdata', {})
        encoding_data = self.userid_type_encoders.get(type(who_userid))
        if encoding_data:
            encoding, encoder = encoding_data
            who_userid = encoder(who_userid)
            who_userdata = 'userid_type:%s' % encoding
        if not isinstance(tokens, basestring):
            tokens = ','.join(tokens)
        if not isinstance(who_tokens, basestring):
            who_tokens = ','.join(who_tokens)
            who_userdata_dict[self.userid_typename] = encoding
        who_userdata = urlencode(who_userdata_dict)
        old_data = (userid, tokens, userdata)
        new_data = (who_userid, who_tokens, who_userdata)
        if old_data != new_data:
        if old_data != new_data or (self.reissue_time and
                ( (timestamp + self.reissue_time) < time.time() )):
            ticket = auth_tkt.AuthTicket(
                self.secret,
                who_userid,
@@ -134,20 +163,67 @@
                tokens=who_tokens,
                user_data=who_userdata,
                cookie_name=self.cookie_name,
                secure=self.secure)
                secure=self.secure,
                digest_algo=self.digest_algo)
            new_cookie_value = ticket.cookie_value()
            
            cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
            wild_domain = '.' + cur_domain
            if old_cookie_value != new_cookie_value:
                # return a set of Set-Cookie headers
                return self._get_cookies(environ, new_cookie_value)
                return self._get_cookies(environ, new_cookie_value, max_age)
    # IAuthenticator
    def authenticate(self, environ, identity):
        userid = identity.get('repoze.who.plugins.auth_tkt.userid')
        if userid is None:
            return None
        if self.userid_checker and not self.userid_checker(userid):
            return None
        identity['repoze.who.userid'] = userid
        return userid
    def _get_cookies(self, environ, value, max_age=None):
        if max_age is not None:
            max_age = int(max_age)
            later = _utcnow() + datetime.timedelta(seconds=max_age)
            # Wdy, DD-Mon-YY HH:MM:SS GMT
            expires = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
                _weekdayname[later.weekday()],
                later.day,
                _monthname[later.month],
                later.year,
                later.hour,
                later.minute,
                later.second,
            )
            # the Expires header is *required* at least for IE7 (IE7 does
            # not respect Max-Age)
            max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
        else:
            max_age = ''
        secure = ''
        if self.secure:
            secure = '; secure; HttpOnly'
        cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
        cur_domain = cur_domain.split(':')[0] # drop port
        wild_domain = '.' + cur_domain
        cookies = [
            ('Set-Cookie', '%s="%s"; Path=/%s%s' % (
            self.cookie_name, value, max_age, secure)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
            self.cookie_name, value, cur_domain, max_age, secure)),
            ('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
            self.cookie_name, value, wild_domain, max_age, secure))
            ]
        return cookies
    def __repr__(self):
        return '<%s %s>' % (self.__class__.__name__, id(self))
        return '<%s %s>' % (self.__class__.__name__,
                            id(self)) #pragma NO COVERAGE
def _bool(value):
    if isinstance(value, basestring):
    if isinstance(value, STRING_TYPES):
        return value.lower() in ('yes', 'true', '1')
    return value
@@ -156,7 +232,12 @@
                cookie_name='auth_tkt',
                secure=False,
                include_ip=False,
                timeout=None,
                reissue_time=None,
                userid_checker=None,
                digest_algo=DEFAULT_DIGEST,
               ):
    from repoze.who.utils import resolveDotted
    if (secret is None and secretfile is None):
        raise ValueError("One of 'secret' or 'secretfile' must not be None.")
    if (secret is not None and secretfile is not None):
@@ -165,8 +246,28 @@
        secretfile = os.path.abspath(os.path.expanduser(secretfile))
        if not os.path.exists(secretfile):
            raise ValueError("No such 'secretfile': %s" % secretfile)
        secret = open(secretfile).read().strip()
    plugin = AuthTktCookiePlugin(secret, cookie_name,
                                 _bool(secure), _bool(include_ip))
        with open(secretfile) as f:
            secret = f.read().strip()
    if timeout:
        timeout = int(timeout)
    if reissue_time:
        reissue_time = int(reissue_time)
    if userid_checker is not None:
        userid_checker = resolveDotted(userid_checker)
    if isinstance(digest_algo, str):
        try:
            digest_algo = getattr(hashlib, digest_algo)
        except AttributeError:
            raise ValueError("No such 'digest_algo': %s" % digest_algo)
    plugin = AuthTktCookiePlugin(secret,
                                 cookie_name,
                                 _bool(secure),
                                 _bool(include_ip),
                                 timeout,
                                 reissue_time,
                                 userid_checker,
                                 digest_algo,
                                 )
    return plugin