Apply Paul Johnston's patch for allowing a timeout and a reissue_time for auth_tkt cookies: http://bugs.repoze.org/issue83
| | |
| | | |
| | | .. module:: repoze.who.plugins.auth_tkt |
| | | |
| | | .. class:: AuthTktCookiePlugin(secret [, cookie_name='auth_tkt' [, secure=False [, include_ip=False]]]) |
| | | .. class:: AuthTktCookiePlugin(secret [, cookie_name='auth_tkt' [, secure=False [, include_ip=False [, timeout=None [, reissue_time=None]]]]]) |
| | | |
| | | An :class:`AuthTktCookiePlugin` is an ``IIdentifier`` plugin which |
| | | remembers its identity state in a client-side cookie. This plugin |
| | |
| | | will be sent across any HTTP or HTTPS connection; if it is True, the |
| | | cookie will be sent only across an HTTPS connection. If |
| | | *include_ip* is True, the ``REMOTE_ADDR`` of the WSGI environment |
| | | will be placed in the cookie. |
| | | will be placed in the cookie. If *timeout* is specified, it is the |
| | | maximum age in seconds allowed for a cookie. If *reissue_time* is |
| | | specified, when we encounter a cookie that is older than the reissue |
| | | time (in seconds), but younger than the timeout, a new cookie will |
| | | be issued. If *timeout* is specified, you must also set |
| | | *reissue_time* to a lower value. |
| | | |
| | | .. note:: |
| | | Using the *include_ip* setting for public-facing applications may |
| | |
| | | from codecs import utf_8_decode |
| | | from codecs import utf_8_encode |
| | | import os |
| | | import time |
| | | |
| | | from paste.request import get_cookies |
| | | from paste.auth import auth_tkt |
| | |
| | | } |
| | | |
| | | def __init__(self, secret, cookie_name='auth_tkt', |
| | | secure=False, include_ip=False): |
| | | secure=False, include_ip=False, |
| | | timeout=None, reissue_time=None): |
| | | self.secret = secret |
| | | self.cookie_name = cookie_name |
| | | self.include_ip = include_ip |
| | | self.secure = secure |
| | | if timeout and ( (not reissue_time) or (reissue_time > timeout) ): |
| | | raise ValueError('When timeout is specified, reissue_time must ' |
| | | 'be set to a lower value') |
| | | self.timeout = timeout |
| | | self.reissue_time = reissue_time |
| | | |
| | | # IIdentifier |
| | | def identify(self, environ): |
| | |
| | | timestamp, userid, tokens, user_data = auth_tkt.parse_ticket( |
| | | self.secret, cookie.value, remote_addr) |
| | | except auth_tkt.BadTicket: |
| | | return None |
| | | |
| | | if self.timeout and ( (timestamp + self.timeout) < time.time() ): |
| | | return None |
| | | |
| | | userid_typename = 'userid_type:' |
| | |
| | | old_data = (userid, tokens, userdata) |
| | | new_data = (who_userid, who_tokens, who_userdata) |
| | | |
| | | if old_data != new_data: |
| | | if old_data != new_data or (self.reissue_time and |
| | | ( (timestamp + self.reissue_time) < time.time() )): |
| | | ticket = auth_tkt.AuthTicket( |
| | | self.secret, |
| | | who_userid, |
| | |
| | | cookie_name='auth_tkt', |
| | | secure=False, |
| | | include_ip=False, |
| | | timeout=None, |
| | | reissue_time=None, |
| | | ): |
| | | if (secret is None and secretfile is None): |
| | | raise ValueError("One of 'secret' or 'secretfile' must not be None.") |
| | |
| | | if not os.path.exists(secretfile): |
| | | raise ValueError("No such 'secretfile': %s" % secretfile) |
| | | secret = open(secretfile).read().strip() |
| | | plugin = AuthTktCookiePlugin(secret, cookie_name, |
| | | _bool(secure), _bool(include_ip)) |
| | | if timeout: |
| | | timeout = int(timeout) |
| | | if reissue_time: |
| | | reissue_time = int(reissue_time) |
| | | plugin = AuthTktCookiePlugin(secret, |
| | | cookie_name, |
| | | _bool(secure), |
| | | _bool(include_ip), |
| | | timeout, |
| | | reissue_time, |
| | | ) |
| | | return plugin |
| | | |
| | |
| | | |
| | | def _makeTicket(self, userid='userid', remote_addr='0.0.0.0', |
| | | tokens = [], userdata='userdata', |
| | | cookie_name='auth_tkt', secure=False): |
| | | cookie_name='auth_tkt', secure=False, |
| | | time=None): |
| | | from paste.auth import auth_tkt |
| | | ticket = auth_tkt.AuthTicket( |
| | | 'secret', |
| | |
| | | remote_addr, |
| | | tokens=tokens, |
| | | user_data=userdata, |
| | | time=time, |
| | | cookie_name=cookie_name, |
| | | secure=secure) |
| | | return ticket.cookie_value() |
| | |
| | | result = plugin.identify(environ) |
| | | self.assertEqual(result, None) |
| | | |
| | | def test_identify_bad_cookie_expired(self): |
| | | import time |
| | | plugin = self._makeOne('secret', timeout=2, reissue_time=1) |
| | | val = self._makeTicket(userid='userid', time=time.time()-3) |
| | | environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val}) |
| | | result = plugin.identify(environ) |
| | | self.assertEqual(result, None) |
| | | |
| | | def test_remember_creds_same(self): |
| | | plugin = self._makeOne('secret') |
| | | val = self._makeTicket(userid='userid') |
| | |
| | | ('Set-Cookie', |
| | | 'auth_tkt="%s"; Path=/' % new_val)) |
| | | |
| | | def test_remember_creds_reissue(self): |
| | | import time |
| | | plugin = self._makeOne('secret', reissue_time=1) |
| | | old_val = self._makeTicket(userid='userid', userdata='', time=time.time()-2) |
| | | environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) |
| | | new_val = self._makeTicket(userid='userid', userdata='') |
| | | result = plugin.remember(environ, {'repoze.who.userid':'userid', |
| | | 'userdata':''}) |
| | | self.assertEqual(type(result[0][1]), str) |
| | | self.assertEqual(len(result), 3) |
| | | self.assertEqual(result[0], |
| | | ('Set-Cookie', |
| | | 'auth_tkt="%s"; Path=/' % new_val)) |
| | | |
| | | def test_forget(self): |
| | | plugin = self._makeOne('secret') |
| | | environ = self._makeEnviron() |
| | |
| | | plugin = make_plugin(secretfile=path) |
| | | self.assertEqual(plugin.secret, 's33kr1t') |
| | | |
| | | def test_factory_with_timeout_and_reissue_time(self): |
| | | from repoze.who.plugins.auth_tkt import make_plugin |
| | | plugin = make_plugin('secret', timeout=5, reissue_time=1) |
| | | self.assertEqual(plugin.timeout, 5) |
| | | self.assertEqual(plugin.reissue_time, 1) |
| | | |
| | | def test_timeout_no_reissue(self): |
| | | self.assertRaises(ValueError, self._makeOne, 'userid', timeout=1) |
| | | |
| | | def test_timeout_lower_than_reissue(self): |
| | | self.assertRaises(ValueError, self._makeOne, 'userid', timeout=1, |
| | | reissue_time=2) |