Tres Seaver
2012-03-19 ff80e0cafbed76db5dc9293d7376743a5fba8760
Back to 100% coverage under 2.x.

One remaining failure under 3.2.
2 files modified
327 ■■■■■ changed files
repoze/who/_auth_tkt.py 217 ●●●●● patch | view | raw | blame | history
repoze/who/tests/test__auth_tkt.py 110 ●●●●● patch | view | raw | blame | history
repoze/who/_auth_tkt.py
@@ -40,9 +40,7 @@
import time as time_mod
from repoze.who._compat import encodestring
from repoze.who._compat import get_cookies
from repoze.who._compat import SimpleCookie
from repoze.who._compat import STRING_TYPES
from repoze.who._compat import url_quote
from repoze.who._compat import url_unquote
@@ -183,7 +181,7 @@
    return digest
if type(chr(1)) == type(b''):
if type(chr(1)) == type(b''): #pragma NO COVER Python < 3.0
    def ints2bytes(ints):
        return b''.join(map(chr, ints))
else: #pragma NO COVER Python >= 3.0
@@ -207,215 +205,4 @@
    return s
class AuthTKTMiddleware(object):
    """
    Middleware that checks for signed cookies that match what
    `mod_auth_tkt <http://www.openfusion.com.au/labs/mod_auth_tkt/>`_
    looks for (if you have mod_auth_tkt installed, you don't need this
    middleware, since Apache will set the environmental variables for
    you).
    Arguments:
    ``secret``:
        A secret that should be shared by any instances of this application.
        If this app is served from more than one machine, they should all
        have the same secret.
    ``cookie_name``:
        The name of the cookie to read and write from.  Default ``auth_tkt``.
    ``secure``:
        If the cookie should be set as 'secure' (only sent over SSL) and if
        the login must be over SSL. (Defaults to False)
    ``httponly``:
        If the cookie should be marked as HttpOnly, which means that it's
        not accessible to JavaScript. (Defaults to False)
    ``include_ip``:
        If the cookie should include the user's IP address.  If so, then
        if they change IPs their cookie will be invalid.
    ``logout_path``:
        The path under this middleware that should signify a logout.  The
        page will be shown as usual, but the user will also be logged out
        when they visit this page.
    If used with mod_auth_tkt, then these settings (except logout_path) should
    match the analogous Apache configuration settings.
    This also adds two functions to the request:
    ``environ['repoze.who._auth_tkt.set_user'](userid, tokens='',
                                               user_data='')``
        This sets a cookie that logs the user in.  ``tokens`` is a
        string (comma-separated groups) or a list of strings.
        ``user_data`` is a string for your own use.
    ``environ['repoze.who._auth_tkt.logout_user']()``
        Logs out the user.
    """
    def __init__(self, app, secret, cookie_name='auth_tkt', secure=False,
                 include_ip=True, logout_path=None, httponly=False,
                 no_domain_cookie=True, current_domain_cookie=True,
                 wildcard_cookie=True):
        self.app = app
        self.secret = secret
        self.cookie_name = cookie_name
        self.secure = secure
        self.httponly = httponly
        self.include_ip = include_ip
        self.logout_path = logout_path
        self.no_domain_cookie = no_domain_cookie
        self.current_domain_cookie = current_domain_cookie
        self.wildcard_cookie = wildcard_cookie
    def __call__(self, environ, start_response):
        #cookies = request.get_cookies(environ)
        cookies = get_cookies(environ)
        if self.cookie_name in cookies:
            cookie_value = cookies[self.cookie_name].value
        else:
            cookie_value = ''
        if cookie_value:
            if self.include_ip:
                remote_addr = environ['REMOTE_ADDR']
            else:
                # mod_auth_tkt uses this dummy value when IP is not
                # checked:
                remote_addr = '0.0.0.0'
            # @@: This should handle bad signatures better:
            # Also, timeouts should cause cookie refresh
            try:
                timestamp, userid, tokens, user_data = parse_ticket(
                    self.secret, cookie_value, remote_addr)
                tokens = ','.join(tokens)
                environ['REMOTE_USER'] = userid
                if environ.get('REMOTE_USER_TOKENS'):
                    # We want to add tokens/roles to what's there:
                    tokens = environ['REMOTE_USER_TOKENS'] + ',' + tokens
                environ['REMOTE_USER_TOKENS'] = tokens
                environ['REMOTE_USER_DATA'] = user_data
                environ['AUTH_TYPE'] = 'cookie'
            except BadTicket:
                # bad credentials, just ignore without logging the user
                # in or anything
                pass
        set_cookies = []
        def set_user(userid, tokens='', user_data=''):
            set_cookies.extend(self.set_user_cookie(
                environ, userid, tokens, user_data))
        def logout_user():
            set_cookies.extend(self.logout_user_cookie(environ))
        environ['repoze.who._auth_tkt.set_user'] = set_user
        environ['repoze.who._auth_tkt.logout_user'] = logout_user
        if self.logout_path and environ.get('PATH_INFO') == self.logout_path:
            logout_user()
        def cookie_setting_start_response(status, headers, exc_info=None):
            headers.extend(set_cookies)
            return start_response(status, headers, exc_info)
        return self.app(environ, cookie_setting_start_response)
    def set_user_cookie(self, environ, userid, tokens, user_data):
        if not isinstance(tokens, STRING_TYPES):
            tokens = ','.join(tokens)
        if self.include_ip:
            remote_addr = environ['REMOTE_ADDR']
        else:
            remote_addr = '0.0.0.0'
        ticket = AuthTicket(
            self.secret,
            userid,
            remote_addr,
            tokens=tokens,
            user_data=user_data,
            cookie_name=self.cookie_name,
            secure=self.secure)
        # @@: Should we set REMOTE_USER etc in the current
        # environment right now as well?
        cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
        wild_domain = '.' + cur_domain
        cookie_options = ""
        if self.secure:
            cookie_options += "; secure"
        if self.httponly:
            cookie_options += "; HttpOnly"
        cookies = []
        if self.no_domain_cookie:
            cookies.append(('Set-Cookie', '%s=%s; Path=/%s' % (
                self.cookie_name, ticket.cookie_value(), cookie_options)))
        if self.current_domain_cookie:
            cookies.append(('Set-Cookie', '%s=%s; Path=/; Domain=%s%s' % (
                self.cookie_name, ticket.cookie_value(), cur_domain,
                cookie_options)))
        if self.wildcard_cookie:
            cookies.append(('Set-Cookie', '%s=%s; Path=/; Domain=%s%s' % (
                self.cookie_name, ticket.cookie_value(), wild_domain,
                cookie_options)))
        return cookies
    def logout_user_cookie(self, environ):
        cur_domain = environ.get('HTTP_HOST', environ.get('SERVER_NAME'))
        wild_domain = '.' + cur_domain
        expires = 'Sat, 01-Jan-2000 12:00:00 GMT'
        cookies = [
            ('Set-Cookie', '%s=""; Expires="%s"; Path=/' %
             (self.cookie_name, expires)),
            ('Set-Cookie', '%s=""; Expires="%s"; Path=/; Domain=%s' %
             (self.cookie_name, expires, cur_domain)),
            ('Set-Cookie', '%s=""; Expires="%s"; Path=/; Domain=%s' %
             (self.cookie_name, expires, wild_domain)),
            ]
        return cookies
def asbool(obj):
    # Lifted from paste.deploy.converters
    if isinstance(obj, STRING_TYPES):
        obj = obj.strip().lower()
        if obj in ['true', 'yes', 'on', 'y', 't', '1']:
            return True
        elif obj in ['false', 'no', 'off', 'n', 'f', '0']:
            return False
        else:
            raise ValueError(
                "String is not true/false: %r" % obj)
    return bool(obj)
def make_auth_tkt_middleware(
    app,
    global_conf,
    secret=None,
    cookie_name='auth_tkt',
    secure=False,
    include_ip=True,
    logout_path=None,
    ):
    """
    Creates the `AuthTKTMiddleware
    <class-repoze.who._auth_tkt.AuthTKTMiddleware.html>`_.
    ``secret`` is required, but can be set globally or locally.
    """
    secure = asbool(secure)
    include_ip = asbool(include_ip)
    if secret is None:
        secret = global_conf.get('secret')
    if not secret:
        raise ValueError(
            "You must provide a 'secret' (in global or local configuration)")
    return AuthTKTMiddleware(
        app, secret, cookie_name, secure, include_ip, logout_path or None)
# Original Paste AuthTktMiddleware stripped:  we don't have a use for it.
repoze/who/tests/test__auth_tkt.py
@@ -89,6 +89,116 @@
        self.assertEqual(cookie['oatmeal']['secure'], 'true')
 
class BadTicketTests(unittest.TestCase):
    def _getTargetClass(self):
        from .._auth_tkt import BadTicket
        return BadTicket
    def _makeOne(self, *args, **kw):
        return self._getTargetClass()(*args, **kw)
    def test_wo_expected(self):
        exc = self._makeOne('message')
        self.assertEqual(exc.args, ('message',))
        self.assertEqual(exc.expected, None)
    def test_w_expected(self):
        exc = self._makeOne('message', 'foo')
        self.assertEqual(exc.args, ('message',))
        self.assertEqual(exc.expected, 'foo')
class Test_parse_ticket(unittest.TestCase):
    def _callFUT(self, secret='SEEKRIT', ticket=None, ip='1.2.3.4'):
        from .._auth_tkt import parse_ticket
        return parse_ticket(secret, ticket, ip)
    def test_bad_timestamp(self):
        from .._auth_tkt import BadTicket
        TICKET = '12345678901234567890123456789012XXXXXXXXuserid!'
        try:
            self._callFUT(ticket=TICKET)
        except BadTicket as e:
            self.failUnless(e.args[0].startswith(
                            'Timestamp is not a hex integer:'))
        else:
            self.fail('Did not raise')
    def test_no_bang_after_userid(self):
        from .._auth_tkt import BadTicket
        TICKET = '1234567890123456789012345678901201020304userid'
        try:
            self._callFUT(ticket=TICKET)
        except BadTicket as e:
            self.assertEqual(e.args[0], 'userid is not followed by !')
        else:
            self.fail('Did not raise')
    def test_wo_tokens_or_data_bad_digest(self):
        from .._auth_tkt import BadTicket
        TICKET = '1234567890123456789012345678901201020304userid!'
        try:
            self._callFUT(ticket=TICKET)
        except BadTicket as e:
            self.assertEqual(e.args[0], 'Digest signature is not correct')
        else:
            self.fail('Did not raise')
    def test_wo_tokens_or_data_ok_digest(self):
        from .._auth_tkt import calculate_digest
        digest = calculate_digest('1.2.3.4', _WHEN, 'SEEKRIT', 'USERID', '', '')
        TICKET = '%s%08xUSERID!' % (digest, _WHEN)
        timestamp, userid, tokens, user_data = self._callFUT(ticket=TICKET)
        self.assertEqual(timestamp, _WHEN)
        self.assertEqual(userid, 'USERID')
        self.assertEqual(tokens, [''])
        self.assertEqual(user_data, '')
    def test_w_tokens_and_data_ok_digest(self):
        from .._auth_tkt import calculate_digest
        digest = calculate_digest('1.2.3.4', _WHEN, 'SEEKRIT', 'USERID',
                                  'a,b', 'DATA')
        TICKET = '%s%08xUSERID!a,b!DATA' % (digest, _WHEN)
        timestamp, userid, tokens, user_data = self._callFUT(ticket=TICKET)
        self.assertEqual(timestamp, _WHEN)
        self.assertEqual(userid, 'USERID')
        self.assertEqual(tokens, ['a', 'b'])
        self.assertEqual(user_data, 'DATA')
class Test_helpers(unittest.TestCase):
    # calculate_digest is not very testable, and fully exercised throug callers.
    def test_ints_to_bytes(self):
        from struct import pack
        from .._auth_tkt import ints2bytes
        self.assertEqual(ints2bytes([1, 2, 3, 4]), pack('>BBBB', 1, 2, 3, 4))
    def test_encode_ip_timestamp(self):
        from struct import pack
        from .._auth_tkt import encode_ip_timestamp
        self.assertEqual(encode_ip_timestamp('1.2.3.4', _WHEN),
                         pack('>BBBBL', 1, 2, 3, 4, _WHEN))
    def test_maybe_encode_bytes(self):
        from .._auth_tkt import maybe_encode
        foo = b'foo'
        self.failUnless(maybe_encode(foo) is foo)
    def test_maybe_encode_native_string(self):
        from .._auth_tkt import maybe_encode
        foo = 'foo'
        self.assertEqual(maybe_encode(foo), b'foo')
    def test_maybe_encode_unicode(self):
        from .._auth_tkt import maybe_encode
        from .._compat import u
        foo = u('foo')
        self.assertEqual(maybe_encode(foo), b'foo')
_WHEN = 1234567
class _Timemod(object):