import datetime
|
from calendar import timegm
|
from email.utils import formatdate
|
from codecs import utf_8_decode
|
from codecs import utf_8_encode
|
import os
|
import time
|
try:
|
import hashlib
|
except ImportError:
|
import md5 as hashlib # Will only support md5 algorithm
|
from wsgiref.handlers import _monthname # Locale-independent, RFC-2616
|
from wsgiref.handlers import _weekdayname # Locale-independent, RFC-2616
|
try:
|
from urllib.parse import urlencode, parse_qsl
|
except ImportError:
|
from urllib import urlencode
|
from urlparse import parse_qsl
|
|
from zope.interface import implementer
|
|
from repoze.who.interfaces import IIdentifier
|
from repoze.who.interfaces import IAuthenticator
|
from repoze.who._compat import get_cookies
|
import repoze.who._auth_tkt as auth_tkt
|
from repoze.who._compat import STRING_TYPES
|
from repoze.who._compat import u
|
|
_UTCNOW = None # unit tests can replace
|
def _utcnow(): #pragma NO COVERAGE
|
if _UTCNOW is not None:
|
return _UTCNOW
|
return datetime.datetime.utcnow()
|
|
DEFAULT_DIGEST = hashlib.md5
|
|
@implementer(IIdentifier, IAuthenticator)
|
class AuthTktCookiePlugin(object):
|
|
userid_typename = 'userid_type'
|
userid_type_decoders = {'int': int,
|
'unicode': lambda x: utf_8_decode(x)[0],
|
}
|
|
userid_type_encoders = {int: ('int', str),
|
}
|
try:
|
userid_type_encoders[long] = ('int', str)
|
except NameError: #pragma NO COVER Python >= 3.0
|
pass
|
try:
|
userid_type_encoders[unicode] = ('unicode',
|
lambda x: utf_8_encode(x)[0])
|
except NameError: #pragma NO COVER Python >= 3.0
|
pass
|
|
def __init__(self, secret, cookie_name='auth_tkt',
|
secure=False, include_ip=False,
|
timeout=None, reissue_time=None, userid_checker=None,
|
digest_algo=DEFAULT_DIGEST):
|
self.secret = secret
|
self.cookie_name = cookie_name
|
self.include_ip = include_ip
|
self.secure = secure
|
if timeout and ( (not reissue_time) or (reissue_time > timeout) ):
|
raise ValueError('When timeout is specified, reissue_time must '
|
'be set to a lower value')
|
self.timeout = timeout
|
self.reissue_time = reissue_time
|
self.userid_checker = userid_checker
|
self.digest_algo = digest_algo
|
|
# IIdentifier
|
def identify(self, environ):
|
cookies = get_cookies(environ)
|
cookie = cookies.get(self.cookie_name)
|
|
if cookie is None or not cookie.value:
|
return None
|
|
if self.include_ip:
|
remote_addr = environ['REMOTE_ADDR']
|
else:
|
remote_addr = '0.0.0.0'
|
|
try:
|
timestamp, userid, tokens, user_data = auth_tkt.parse_ticket(
|
self.secret, cookie.value, remote_addr, self.digest_algo)
|
except auth_tkt.BadTicket:
|
return None
|
|
if self.timeout and ( (timestamp + self.timeout) < time.time() ):
|
return None
|
|
user_data_dict = dict(parse_qsl(user_data))
|
userid_type = user_data_dict.get(self.userid_typename)
|
if userid_type:
|
decoder = self.userid_type_decoders.get(userid_type)
|
if decoder:
|
userid = decoder(userid)
|
|
environ['REMOTE_USER_TOKENS'] = tokens
|
environ['REMOTE_USER_DATA'] = user_data
|
environ['AUTH_TYPE'] = 'cookie'
|
|
identity = {}
|
identity['timestamp'] = timestamp
|
identity['repoze.who.plugins.auth_tkt.userid'] = userid
|
identity['tokens'] = tokens
|
identity['userdata'] = user_data_dict
|
return identity
|
|
# IIdentifier
|
def forget(self, environ, identity):
|
# return a set of expires Set-Cookie headers
|
return self._get_cookies(environ, 'INVALID', 0)
|
|
# IIdentifier
|
def remember(self, environ, identity):
|
if self.include_ip:
|
remote_addr = environ['REMOTE_ADDR']
|
else:
|
remote_addr = '0.0.0.0'
|
|
cookies = get_cookies(environ)
|
old_cookie = cookies.get(self.cookie_name)
|
existing = cookies.get(self.cookie_name)
|
old_cookie_value = getattr(existing, 'value', None)
|
max_age = identity.get('max_age', None)
|
|
timestamp, userid, tokens, userdata = None, '', (), ''
|
|
if old_cookie_value:
|
try:
|
timestamp,userid,tokens,userdata = auth_tkt.parse_ticket(
|
self.secret, old_cookie_value, remote_addr,
|
self.digest_algo)
|
except auth_tkt.BadTicket:
|
pass
|
tokens = tuple(tokens)
|
|
who_userid = identity['repoze.who.userid']
|
who_tokens = tuple(identity.get('tokens', ()))
|
who_userdata_dict = identity.get('userdata', {})
|
|
encoding_data = self.userid_type_encoders.get(type(who_userid))
|
if encoding_data:
|
encoding, encoder = encoding_data
|
who_userid = encoder(who_userid)
|
who_userdata_dict[self.userid_typename] = encoding
|
|
who_userdata = urlencode(who_userdata_dict)
|
|
old_data = (userid, tokens, userdata)
|
new_data = (who_userid, who_tokens, who_userdata)
|
|
if old_data != new_data or (self.reissue_time and
|
( (timestamp + self.reissue_time) < time.time() )):
|
ticket = auth_tkt.AuthTicket(
|
self.secret,
|
who_userid,
|
remote_addr,
|
tokens=who_tokens,
|
user_data=who_userdata,
|
cookie_name=self.cookie_name,
|
secure=self.secure,
|
digest_algo=self.digest_algo)
|
new_cookie_value = ticket.cookie_value()
|
|
if old_cookie_value != new_cookie_value:
|
# return a set of Set-Cookie headers
|
return self._get_cookies(environ, new_cookie_value, max_age)
|
|
# IAuthenticator
|
def authenticate(self, environ, identity):
|
userid = identity.get('repoze.who.plugins.auth_tkt.userid')
|
if userid is None:
|
return None
|
if self.userid_checker and not self.userid_checker(userid):
|
return None
|
identity['repoze.who.userid'] = userid
|
return userid
|
|
def _get_cookies(self, environ, value, max_age=None):
|
if max_age is not None:
|
max_age = int(max_age)
|
later = _utcnow() + datetime.timedelta(seconds=max_age)
|
# Wdy, DD-Mon-YY HH:MM:SS GMT
|
expires = "%s, %02d %3s %4d %02d:%02d:%02d GMT" % (
|
_weekdayname[later.weekday()],
|
later.day,
|
_monthname[later.month],
|
later.year,
|
later.hour,
|
later.minute,
|
later.second,
|
)
|
# the Expires header is *required* at least for IE7 (IE7 does
|
# not respect Max-Age)
|
max_age = "; Max-Age=%s; Expires=%s" % (max_age, expires)
|
else:
|
max_age = ''
|
|
secure = ''
|
if self.secure:
|
secure = '; secure; HttpOnly'
|
|
cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
|
cur_domain = cur_domain.split(':')[0] # drop port
|
wild_domain = '.' + cur_domain
|
cookies = [
|
('Set-Cookie', '%s="%s"; Path=/%s%s' % (
|
self.cookie_name, value, max_age, secure)),
|
('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
|
self.cookie_name, value, cur_domain, max_age, secure)),
|
('Set-Cookie', '%s="%s"; Path=/; Domain=%s%s%s' % (
|
self.cookie_name, value, wild_domain, max_age, secure))
|
]
|
return cookies
|
|
def __repr__(self):
|
return '<%s %s>' % (self.__class__.__name__,
|
id(self)) #pragma NO COVERAGE
|
|
def _bool(value):
|
if isinstance(value, STRING_TYPES):
|
return value.lower() in ('yes', 'true', '1')
|
return value
|
|
def make_plugin(secret=None,
|
secretfile=None,
|
cookie_name='auth_tkt',
|
secure=False,
|
include_ip=False,
|
timeout=None,
|
reissue_time=None,
|
userid_checker=None,
|
digest_algo=DEFAULT_DIGEST,
|
):
|
from repoze.who.utils import resolveDotted
|
if (secret is None and secretfile is None):
|
raise ValueError("One of 'secret' or 'secretfile' must not be None.")
|
if (secret is not None and secretfile is not None):
|
raise ValueError("Specify only one of 'secret' or 'secretfile'.")
|
if secretfile:
|
secretfile = os.path.abspath(os.path.expanduser(secretfile))
|
if not os.path.exists(secretfile):
|
raise ValueError("No such 'secretfile': %s" % secretfile)
|
with open(secretfile) as f:
|
secret = f.read().strip()
|
if timeout:
|
timeout = int(timeout)
|
if reissue_time:
|
reissue_time = int(reissue_time)
|
if userid_checker is not None:
|
userid_checker = resolveDotted(userid_checker)
|
if isinstance(digest_algo, str):
|
try:
|
digest_algo = getattr(hashlib, digest_algo)
|
except AttributeError:
|
raise ValueError("No such 'digest_algo': %s" % digest_algo)
|
|
plugin = AuthTktCookiePlugin(secret,
|
cookie_name,
|
_bool(secure),
|
_bool(include_ip),
|
timeout,
|
reissue_time,
|
userid_checker,
|
digest_algo,
|
)
|
return plugin
|