Tres Seaver
2010-09-07 c80261e876fe0937ad28806c4ee3f445dabab6d7
HMmm, no idea why the 'bzr add' didn't pick these up.
2 files added
399 ■■■■■ changed files
repoze/who/plugins/redirector.py 68 ●●●●● patch | view | raw | blame | history
repoze/who/plugins/tests/test_redirector.py 331 ●●●●● patch | view | raw | blame | history
repoze/who/plugins/redirector.py
New file
@@ -0,0 +1,68 @@
import urlparse
import urllib
import cgi
from paste.httpexceptions import HTTPFound
from paste.request import construct_url
from paste.response import header_value
from zope.interface import implements
from repoze.who.interfaces import IChallenger
class RedirectorPlugin(object):
    """ Plugin for issuing challenges as redirects to a configured URL.
    o If the ``reason_param`` option is configured, and the application has
      supplied an ``X-Authorization-Failure-Reason`` header, the plugin
      includes that reason in the query string of the redirected URL.
    """
    implements(IChallenger)
    def __init__(self,
                 login_url,
                 came_from_param='came_from',
                 reason_param='reason',
                 reason_header='X-Authorization-Failure-Reason',
                ):
        self.login_url = login_url
        self.came_from_param = came_from_param
        self.reason_param = reason_param
        self.reason_header = reason_header
        self._login_url_parts = list(urlparse.urlparse(login_url))
    # IChallenger
    def challenge(self, environ, status, app_headers, forget_headers):
        if self.reason_param is not None or self.came_from_param is not None:
            url_parts = self._login_url_parts[:]
            query = url_parts[4]
            query_elements = cgi.parse_qs(query)
            reason = header_value(app_headers, self.reason_header)
            if reason and self.reason_param is not None:
                query_elements[self.reason_param] = reason
            if self.came_from_param is not None:
                query_elements[self.came_from_param] = construct_url(environ)
            url_parts[4] = urllib.urlencode(query_elements, doseq=True)
            login_url = urlparse.urlunparse(url_parts)
        else:
            login_url = self.login_url
        headers = [('Location', login_url)] + forget_headers
        cookies = [(h,v) for (h,v) in app_headers if h.lower() == 'set-cookie']
        headers += cookies
        return HTTPFound(headers=headers)
def make_plugin(login_url,
                came_from_param=None,
                reason_param=None,
                reason_header=None,
               ):
    if reason_header is not None and reason_param is None:
        raise Exception("Can't set 'reason_header' without 'reason_param'.")
    if reason_header is None and reason_param is not None:
        reason_header='X-Authorization-Failure-Reason'
    return RedirectorPlugin(login_url,
                            came_from_param=came_from_param,
                            reason_param=reason_param,
                            reason_header=reason_header,
                           )
repoze/who/plugins/tests/test_redirector.py
New file
@@ -0,0 +1,331 @@
import unittest
class TestRedirectorPlugin(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.plugins.redirector import RedirectorPlugin
        return RedirectorPlugin
    def _makeOne(self,
                 login_url='http://example.com/login.html',
                 came_from_param='came_from',
                 reason_param='reason',
                 reason_header='X-Authorization-Failure-Reason',
                ):
        return self._getTargetClass()(login_url,
                                      came_from_param=came_from_param,
                                      reason_param=reason_param,
                                      reason_header=reason_header)
    def _makeEnviron(self, login=None, password=None, came_from=None,
                         path_info='/', identifier=None, max_age=None):
        from StringIO import StringIO
        fields = []
        if login:
            fields.append(('login', login))
        if password:
            fields.append(('password', password))
        if came_from:
            fields.append(('came_from', came_from))
        if max_age:
            fields.append(('max_age', max_age))
        if identifier is None:
            credentials = {'login':'chris', 'password':'password'}
            identifier = DummyIdentifier(credentials)
        content_type, body = encode_multipart_formdata(fields)
        environ = {'wsgi.version': (1,0),
                   'wsgi.input': StringIO(body),
                   'wsgi.url_scheme':'http',
                   'SERVER_NAME': 'www.example.com',
                   'SERVER_PORT': '80',
                   'CONTENT_TYPE': content_type,
                   'CONTENT_LENGTH': len(body),
                   'REQUEST_METHOD': 'POST',
                   'repoze.who.plugins': {'cookie':identifier},
                   'QUERY_STRING': 'default=1',
                   'PATH_INFO': path_info,
                  }
        return environ
    def test_class_conforms_to_IChallenger(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IChallenger
        verifyClass(IChallenger, self._getTargetClass())
    def test_instance_conforms_to_IChallenger(self):
        from zope.interface.verify import verifyObject
        from repoze.who.interfaces import IChallenger
        verifyObject(IChallenger, self._makeOne())
    def test_challenge(self):
        import urlparse
        import cgi
        plugin = self._makeOne()
        environ = self._makeEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [('app', '1')],
                               [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
        headers = sr.headers
        self.assertEqual(len(headers), 3)
        self.assertEqual(sr.headers[1][0], 'forget')
        self.assertEqual(sr.headers[1][1], '1')
        self.assertEqual(sr.headers[2][0], 'content-type')
        self.assertEqual(sr.headers[2][1], 'text/plain; charset=utf8')
        self.assertEqual(sr.status, '302 Found')
    def test_challenge_with_reason_header(self):
        import urlparse
        import cgi
        plugin = self._makeOne()
        environ = self._makeEnviron()
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        came_from_key, came_from_value = parts_qsl[0]
        reason_key, reason_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
        self.assertEqual(reason_key, 'reason')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_custom_reason_header(self):
        import urlparse
        import cgi
        plugin = self._makeOne(reason_header='X-Custom-Auth-Failure')
        environ = self._makeEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
    def test_challenge_w_reason_no_reason_param_no_came_from_param(self):
        import urlparse
        import cgi
        plugin = self._makeOne(reason_param=None, came_from_param=None)
        environ = self._makeEnviron()
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 0)
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
    def test_challenge_w_reason_no_reason_param_w_came_from_param(self):
        import urlparse
        import cgi
        plugin = self._makeOne(reason_param=None)
        environ = self._makeEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
    def test_challenge_with_reason_and_custom_reason_param(self):
        import urlparse
        import cgi
        plugin = self._makeOne(reason_param='auth_failure')
        environ = self._makeEnviron()
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        reason_key, reason_value = parts_qsl[0]
        came_from_key, came_from_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
        self.assertEqual(reason_key, 'auth_failure')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_setcookie_from_app(self):
        plugin = self._makeOne()
        environ = self._makeEnviron()
        app = plugin.challenge(
            environ,
            '401 Unauthorized',
            [('app', '1'), ('set-cookie','a'), ('set-cookie','b')],
            [])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(sr.headers[1][0], 'set-cookie')
        self.assertEqual(sr.headers[1][1], 'a')
        self.assertEqual(sr.headers[2][0], 'set-cookie')
        self.assertEqual(sr.headers[2][1], 'b')
class Test_make_redirecting_plugin(unittest.TestCase):
    def _callFUT(self, *args, **kw):
        from repoze.who.plugins.redirector import make_plugin
        return make_plugin(*args, **kw)
    def test_no_login_url_raises(self):
        self.assertRaises(Exception, self._callFUT, None)
    def test_defaults(self):
        plugin = self._callFUT('/go_there')
        self.assertEqual(plugin.login_url, '/go_there')
        self.assertEqual(plugin.came_from_param, None)
        self.assertEqual(plugin.reason_param, None)
        self.assertEqual(plugin.reason_header, None)
    def test_explicit_came_from_param(self):
        plugin = self._callFUT('/go_there', came_from_param='whence')
        self.assertEqual(plugin.login_url, '/go_there')
        self.assertEqual(plugin.came_from_param, 'whence')
        self.assertEqual(plugin.reason_param, None)
        self.assertEqual(plugin.reason_header, None)
    def test_explicit_reason_param(self):
        plugin = self._callFUT('/go_there', reason_param='why')
        self.assertEqual(plugin.login_url, '/go_there')
        self.assertEqual(plugin.came_from_param, None)
        self.assertEqual(plugin.reason_param, 'why')
        self.assertEqual(plugin.reason_header, 'X-Authorization-Failure-Reason')
    def test_explicit_reason_header_param_no_reason_param_raises(self):
        self.assertRaises(Exception, self._callFUT, '/go_there',
                                                    reason_header='X-Reason')
    def test_explicit_reason_header_param(self):
        plugin = self._callFUT('/go_there', reason_param='why',
                                            reason_header='X-Reason')
        self.assertEqual(plugin.login_url, '/go_there')
        self.assertEqual(plugin.came_from_param, None)
        self.assertEqual(plugin.reason_param, 'why')
        self.assertEqual(plugin.reason_header, 'X-Reason')
class DummyIdentifier:
    forgotten = False
    remembered = False
    def __init__(self, credentials=None, remember_headers=None,
                 forget_headers=None, replace_app=None):
        self.credentials = credentials
        self.remember_headers = remember_headers
        self.forget_headers = forget_headers
        self.replace_app = replace_app
    def identify(self, environ):
        if self.replace_app:
            environ['repoze.who.application'] = self.replace_app
        return self.credentials
    def forget(self, environ, identity):
        self.forgotten = identity
        return self.forget_headers
    def remember(self, environ, identity):
        self.remembered = identity
        return self.remember_headers
class DummyStartResponse:
    def __call__(self, status, headers, exc_info=None):
        self.status = status
        self.headers = headers
        self.exc_info = exc_info
        return []
def encode_multipart_formdata(fields):
    BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'
    CRLF = '\r\n'
    L = []
    for (key, value) in fields:
        L.append('--' + BOUNDARY)
        L.append('Content-Disposition: form-data; name="%s"' % key)
        L.append('')
        L.append(value)
    L.append('--' + BOUNDARY + '--')
    L.append('')
    body = CRLF.join(L)
    content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
    return content_type, body