Chris McDonough
2008-02-26 740830059b18411163fd26a0f4baeb0dff0c8129
Add __call__ middleware tests; fix start_response callables to accept exc_info in various places.

2 files modified
139 ■■■■■ changed files
repoze/pam/middleware.py 5 ●●●● patch | view | raw | blame | history
repoze/pam/tests.py 134 ●●●●● patch | view | raw | blame | history
repoze/pam/middleware.py
@@ -208,19 +208,22 @@
class StartResponseWrapper(object):
    def __init__(self, start_response):
        self.start_response = start_response
        self.status = None
        self.headers = []
        self.exc_info = None
        self.buffer = StringIO()
    def wrap_start_response(self, status, headers, exc_info=None):
        self.headers = headers
        self.status = status
        self.exc_info = exc_info
        return self.buffer.write
    def finish_response(self, extra_headers):
        if not extra_headers:
            extra_headers = []
        headers = self.headers + extra_headers
        write = self.start_response(self.status, headers)
        write = self.start_response(self.status, headers, self.exc_info)
        if write:
            self.buffer.seek(0)
            value = self.buffer.getvalue()
repoze/pam/tests.py
@@ -353,7 +353,119 @@
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, [])
    # XXX need more call tests
    def test_call_200_no_plugins(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        self.assertRaises(RuntimeError, mw, environ, start_response)
    def test_call_200_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        identifier = DummyIdentifier()
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_identifiers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        mw = self._makeOne(app=app, challengers=challengers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
    def test_call_401_challenger_and_identifier_no_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        identifier = DummyIdentifier()
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        self.assertEqual(identifier.forgotten, False)
        self.assertEqual(environ.get('REMOTE_USER'), None)
    def test_call_401_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        identifier = DummyIdentifier()
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        self.assertEqual(identifier.forgotten, True)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
    def test_call_200_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        identifier = DummyIdentifier()
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        self.assertEqual(identifier.remembered, True)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
    # XXX need more call tests:
    #  - auth_id sorting
class TestStartResponseWrapper(unittest.TestCase):
    def _getTargetClass(self):
@@ -382,7 +494,7 @@
            closededs.append(True)
        write.close = close
            
        def start_response(status, headers):
        def start_response(status, headers, exc_info=None):
            statuses.append(status)
            headerses.append(headers)
            return write
@@ -825,6 +937,16 @@
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
class DummyWorkingApp:
    """WSGI application stub that replies with a preset status and headers.

    Each call records the environ it was given, invokes start_response with
    the configured status and headers, and returns the body ['body'].
    """

    def __init__(self, status, headers):
        self.status, self.headers = status, headers

    def __call__(self, environ, start_response):
        self.environ = environ
        status, headers = self.status, self.headers
        start_response(status, headers)
        return ['body']
    
class DummyRequestClassifier:
    def __call__(self, environ):
@@ -873,6 +995,7 @@
class DummyChallenger:
    """Challenger stub that hands back a preset challenge app.

    challenge() also stores that app under environ['challenged'] so tests
    can tell that the challenger was consulted.
    """

    def __init__(self, app=None):
        self.app = app

    def challenge(self, environ, status, app_headers, forget_headers):
        challenge_app = self.app
        environ['challenged'] = challenge_app
        return challenge_app
@@ -881,6 +1004,13 @@
    def __call__(self, environ, status, headers):
        if status.startswith('401 '):
            return True
class DummyStartResponse:
    """start_response stub that records the arguments of its latest call.

    The status, headers and optional exc_info are kept as attributes for
    tests to inspect; each call returns an empty list.
    """

    def __call__(self, status, headers, exc_info=None):
        self.status, self.headers, self.exc_info = status, headers, exc_info
        return []
        
def encode_multipart_formdata(fields):
    BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'