Tres Seaver
2009-05-08 f7efc0b04bfd469b911dd00809767198582d0d09
Split tests out into per-module test modules.
4 files added
1 files modified
3163 ■■■■ changed files
repoze/who/tests/__init__.py 1569 ●●●●● patch | view | raw | blame | history
repoze/who/tests/test_classifiers.py 41 ●●●●● patch | view | raw | blame | history
repoze/who/tests/test_config.py 420 ●●●●● patch | view | raw | blame | history
repoze/who/tests/test_middleware.py 981 ●●●●● patch | view | raw | blame | history
repoze/who/tests/test_restrict.py 152 ●●●●● patch | view | raw | blame | history
repoze/who/tests/__init__.py
@@ -1,1568 +1 @@
import unittest
class Base(unittest.TestCase):
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
class TestMiddleware(Base):
    def _getTargetClass(self):
        from repoze.who.middleware import PluggableAuthenticationMiddleware
        return PluggableAuthenticationMiddleware
    def _makeOne(self,
                 app=None,
                 identifiers=None,
                 authenticators=None,
                 challengers=None,
                 classifier=None,
                 mdproviders=None,
                 challenge_decider=None,
                 log_stream=None,
                 log_level=None,
                 ):
        if app is None:
            app = DummyApp()
        if identifiers is None:
            identifiers = []
        if authenticators is None:
            authenticators = []
        if challengers is None:
            challengers = []
        if classifier is None:
            classifier = DummyRequestClassifier()
        if mdproviders is None:
            mdproviders = []
        if challenge_decider is None:
            challenge_decider = DummyChallengeDecider()
        if log_level is None:
            import logging
            log_level = logging.DEBUG
        mw = self._getTargetClass()(app,
                                    identifiers,
                                    authenticators,
                                    challengers,
                                    mdproviders,
                                    classifier,
                                    challenge_decider,
                                    log_stream,
                                    log_level=logging.DEBUG)
        return mw
    def test_accepts_logger(self):
        import logging
        logger = logging.Logger('something')
        logger.setLevel(logging.INFO)
        mw = self._makeOne(log_stream=logger)
        self.assertEqual(logger, mw.logger)
    def test_identify_success(self):
        environ = self._makeEnviron()
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('i', identifier) ]
        mw = self._makeOne(identifiers=identifiers)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, identifier)
        self.assertEqual(identity['login'], 'chris')
        self.assertEqual(identity['password'], 'password')
    def test_identify_success_empty_identity(self):
        environ = self._makeEnviron()
        identifier = DummyIdentifier({})
        identifiers = [ ('i', identifier) ]
        mw = self._makeOne(identifiers=identifiers)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, identifier)
        self.assertEqual(identity, {})
    def test_identify_fail(self):
        environ = self._makeEnviron()
        plugin = DummyNoResultsIdentifier()
        plugins = [ ('dummy', plugin) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 0)
    def test_identify_success_skip_noresults(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyNoResultsIdentifier()
        credentials = {'login':'chris', 'password':'password'}
        plugin2 = DummyIdentifier(credentials)
        plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, plugin2)
        self.assertEqual(identity['login'], 'chris')
        self.assertEqual(identity['password'], 'password')
    def test_identify_success_multiresults(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 2)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, plugin1)
        self.assertEqual(identity['login'], 'fred')
        self.assertEqual(identity['password'], 'fred')
        new_identifier, identity = results[1]
        self.assertEqual(new_identifier, plugin2)
        self.assertEqual(identity['login'], 'bob')
        self.assertEqual(identity['password'], 'bob')
    def test_identify_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        from repoze.who.interfaces import IIdentifier
        plugin1.classifications = {IIdentifier:['nomatch']}
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugins = [ ('identifier1', plugin1),  ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, 'match')
        self.assertEqual(len(results), 1)
        plugin, creds = results[0]
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, plugin2)
    def test_identify_find_explicit_classifier(self):
        environ = self._makeEnviron()
        from repoze.who.interfaces import IIdentifier
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        plugin1.classifications = {IIdentifier:['nomatch']}
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugin2.classifications = {IIdentifier:['match']}
        plugins= [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, 'match')
        self.assertEqual(len(results), 1)
        plugin, creds = results[0]
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, plugin2)
    def test_authenticate_success(self):
        environ = self._makeEnviron()
        plugin1 = DummyAuthenticator('a')
        plugins = [ ('identifier1', plugin1) ]
        mw = self._makeOne(authenticators=plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'a')
    def test_authenticate_fail(self):
        environ = self._makeEnviron()
        mw = self._makeOne() # no authenticators
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        result = mw.authenticate(environ, None, identities)
        self.assertEqual(len(result), 0)
    def test_authenticate_success_skip_fail(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyFailAuthenticator()
        plugin2 = DummyAuthenticator()
        plugins = [ ('dummy1', plugin1), ('dummy2', plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris')
    def test_authenticate_success_multiresult(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyAuthenticator('chris_id1')
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('dummy1',plugin1), ('dummy2',plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 2)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0,))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id1')
        result = results[1]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyAuthenticator('chris_id1')
        from repoze.who.interfaces import IAuthenticator
        plugin1.classifications = {IAuthenticator:['nomatch']}
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
        mw = self._makeOne(authenticators = plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, 'match', identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_find_explicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.who.interfaces import IAuthenticator
        plugin1 = DummyAuthenticator('chris_id1')
        plugin1.classifications = {IAuthenticator:['nomatch']}
        plugin2 = DummyAuthenticator('chris_id2')
        plugin2.classifications = {IAuthenticator:['match']}
        plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
        mw = self._makeOne(authenticators = plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, 'match', identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0, 0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_user_null_but_not_none(self):
        environ = self._makeEnviron()
        plugin1 = DummyAuthenticator(0)
        plugins = [ ('identifier1', plugin1) ]
        mw = self._makeOne(authenticators=plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 0)
    def test_challenge_noidentifier_noapp(self):
        environ = self._makeEnviron()
        challenger = DummyChallenger()
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        app = mw.challenge(environ, 'match', '401 Unauthorized',
                           [], None, identity)
        self.assertEqual(app, None)
        self.assertEqual(environ['challenged'], app)
    def test_authenticate_success_multiresult_one_preauthenticated(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        preauth = DummyIdentifier({'repoze.who.userid':'preauthenticated'})
        plugin1 = DummyAuthenticator('chris_id1')
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('dummy1',plugin1), ('dummy2',plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}),
                       (preauth, preauth.credentials) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 3)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0,))
        self.assertEqual(authenticator, None)
        self.assertEqual(identifier, preauth)
        self.assertEqual(creds['repoze.who.userid'], 'preauthenticated')
        self.assertEqual(userid, 'preauthenticated')
        result = results[1]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,1))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id1')
        result = results[2]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,1))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_challenge_noidentifier_withapp(self):
        environ = self._makeEnviron()
        app = DummyApp()
        challenger = DummyChallenger(app)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], None, identity)
        self.assertEqual(result, app)
        self.assertEqual(environ['challenged'], app)
    def test_challenge_identifier_noapp(self):
        environ = self._makeEnviron()
        challenger = DummyChallenger()
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                              [], identifier, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['challenged'], None)
        self.assertEqual(identifier.forgotten, identity)
    def test_challenge_identifier_app(self):
        environ = self._makeEnviron()
        app = DummyApp()
        challenger = DummyChallenger(app)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app)
        self.assertEqual(environ['challenged'], app)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_firstwins(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        challenger1 = DummyChallenger(app1)
        challenger2 = DummyChallenger(app2)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                              [], identifier, identity)
        self.assertEqual(result, app1)
        self.assertEqual(environ['challenged'], app1)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_skipnomatch_findimplicit(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        from repoze.who.interfaces import IChallenger
        challenger1 = DummyChallenger(app1)
        challenger1.classifications = {IChallenger:['nomatch']}
        challenger2 = DummyChallenger(app2)
        challenger2.classifications = {IChallenger:None}
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app2)
        self.assertEqual(environ['challenged'], app2)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_skipnomatch_findexplicit(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        from repoze.who.interfaces import IChallenger
        challenger1 = DummyChallenger(app1)
        challenger1.classifications = {IChallenger:['nomatch']}
        challenger2 = DummyChallenger(app2)
        challenger2.classifications = {IChallenger:['match']}
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app2)
        self.assertEqual(environ['challenged'], app2)
        self.assertEqual(identifier.forgotten, identity)
    def test_add_metadata(self):
        environ = self._makeEnviron()
        plugin1 = DummyMDProvider({'foo':'bar'})
        plugin2 = DummyMDProvider({'fuz':'baz'})
        plugins = [ ('meta1', plugin1), ('meta2', plugin2) ]
        mw = self._makeOne(mdproviders=plugins)
        classification = ''
        identity = {}
        results = mw.add_metadata(environ, classification, identity)
        self.assertEqual(identity['foo'], 'bar')
        self.assertEqual(identity['fuz'], 'baz')
    def test_add_metadata_w_classification(self):
        environ = self._makeEnviron()
        plugin1 = DummyMDProvider({'foo':'bar'})
        plugin2 = DummyMDProvider({'fuz':'baz'})
        from repoze.who.interfaces import IMetadataProvider
        plugin2.classifications = {IMetadataProvider:['foo']}
        plugins = [ ('meta1', plugin1), ('meta2', plugin2) ]
        mw = self._makeOne(mdproviders=plugins)
        classification = 'monkey'
        identity = {}
        mw.add_metadata(environ, classification, identity)
        self.assertEqual(identity['foo'], 'bar')
        self.assertEqual(identity.get('fuz'), None)
    def test_call_remoteuser_already_set(self):
        environ = self._makeEnviron({'REMOTE_USER':'admin'})
        mw = self._makeOne()
        result = mw(environ, None)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, [])
    def test_call_200_no_plugins(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        self.assertRaises(RuntimeError, mw, environ, start_response)
    def test_call_200_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_identifiers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        mw = self._makeOne(app=app, challengers=challengers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
    def test_call_401_challenger_and_identifier_no_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'a', 'password':'b'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        self.assertEqual(identifier.forgotten, False)
        self.assertEqual(environ.get('REMOTE_USER'), None)
    def test_call_401_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        # @@ unfuck
##         self.assertEqual(identifier.forgotten, identifier.credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
    def test_call_200_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        # @@ figure out later
##         self.assertEqual(dict(identifier.remembered)['login'], dict(identifier.credentials)['login'])
##         self.assertEqual(dict(identifier.remembered)['password'], dict(identifier.credentials)['password'])
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
    def test_call_200_identity_reset(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        new_identity = {'user_id':'foo', 'password':'bar'}
        app = DummyIdentityResetApp('200 OK', headers, new_identity)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        new_credentials = identifier.credentials.copy()
        new_credentials['login'] = 'fred'
        new_credentials['password'] = 'schooled'
        # @@ unfuck
##         self.assertEqual(identifier.remembered, new_credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], new_credentials)
    def test_call_200_with_metadata(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdprovider = DummyMDProvider({'foo':'bar'})
        mdproviders = [ ('mdprovider', mdprovider) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
    def test_call_ingress_plugin_replaces_application(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        challengers = []
        credentials = {'login':'chris', 'password':'password'}
        from paste.httpexceptions import HTTPFound
        identifier = DummyIdentifier(
            credentials,
            remember_headers=[('a', '1')],
            replace_app = HTTPFound('http://example.com/redirect')
            )
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdproviders = []
        mw = self._makeOne(app=app,
                           challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = ''.join(mw(environ, start_response))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(start_response.status, '302 Found')
        headers = start_response.headers
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0],
                         ('location', 'http://example.com/redirect'))
        self.assertEqual(headers[1],
                         ('content-type', 'text/plain; charset=utf8'))
        self.assertEqual(headers[2],
                         ('a', '1'))
        self.assertEqual(start_response.exc_info, None)
        self.failIf(environ.has_key('repoze.who.application'))
    def test_call_app_doesnt_call_start_response(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyGeneratorApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdprovider = DummyMDProvider({'foo':'bar'})
        mdproviders = [ ('mdprovider', mdprovider) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
    # XXX need more call tests:
    #  - auth_id sorting
class TestMatchClassification(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import match_classification
        return match_classification
    def test_match_classification(self):
        f = self._getFUT()
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        from repoze.who.interfaces import IAuthenticator
        multi1 = DummyMultiPlugin()
        multi2 = DummyMultiPlugin()
        multi1.classifications = {IIdentifier:('foo', 'bar'),
                                  IChallenger:('buz',),
                                  IAuthenticator:None}
        multi2.classifications = {IIdentifier:('foo', 'baz', 'biz')}
        plugins = (multi1, multi2)
        # specific
        self.assertEqual(f(IIdentifier, plugins, 'foo'), [multi1, multi2])
        self.assertEqual(f(IIdentifier, plugins, 'bar'), [multi1])
        self.assertEqual(f(IIdentifier, plugins, 'biz'), [multi2])
        # any for multi2
        self.assertEqual(f(IChallenger, plugins, 'buz'), [multi1, multi2])
        # any for either
        self.assertEqual(f(IAuthenticator, plugins, 'buz'), [multi1, multi2])
class TestStartResponseWrapper(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import StartResponseWrapper
        return StartResponseWrapper
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_ctor(self):
        wrapper = self._makeOne(None)
        self.assertEqual(wrapper.start_response, None)
        self.assertEqual(wrapper.headers, [])
        self.failUnless(wrapper.buffer)
    def test_finish_response(self):
        statuses = []
        headerses = []
        datases = []
        closededs = []
        from StringIO import StringIO
        def write(data):
            datases.append(data)
        def close():
            closededs.append(True)
        write.close = close
        def start_response(status, headers, exc_info=None):
            statuses.append(status)
            headerses.append(headers)
            return write
        wrapper = self._makeOne(start_response)
        wrapper.status = '401 Unauthorized'
        wrapper.headers = [('a', '1')]
        wrapper.buffer = StringIO('written')
        extra_headers = [('b', '2')]
        result = wrapper.finish_response(extra_headers)
        self.assertEqual(result, None)
        self.assertEqual(headerses[0], wrapper.headers + extra_headers)
        self.assertEqual(statuses[0], wrapper.status)
        self.assertEqual(datases[0], 'written')
        self.assertEqual(closededs[0], True)
class TestDefaultRequestClassifier(Base):
    def _getFUT(self):
        from repoze.who.classifiers import default_request_classifier
        return default_request_classifier
    def test_classify_dav_method(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'REQUEST_METHOD':'COPY'})
        result = classifier(environ)
        self.assertEqual(result, 'dav')
    def test_classify_dav_useragent(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'HTTP_USER_AGENT':'WebDrive'})
        result = classifier(environ)
        self.assertEqual(result, 'dav')
    def test_classify_xmlpost(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
                                     'REQUEST_METHOD':'POST'})
        result = classifier(environ)
        self.assertEqual(result, 'xmlpost')
    def test_classify_browser(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
                                     'REQUEST_METHOD':'GET'})
        result = classifier(environ)
        self.assertEqual(result, 'browser')
class TestMakeRegistries(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import make_registries
        return make_registries
    def test_empty(self):
        fn = self._getFUT()
        iface_reg, name_reg = fn([], [], [], [])
        self.assertEqual(iface_reg, {})
        self.assertEqual(name_reg, {})
    def test_brokenimpl(self):
        fn = self._getFUT()
        self.assertRaises(ValueError, fn, [(None, DummyApp())], [], [], [])
    def test_ok(self):
        fn = self._getFUT()
        credentials1 = {'login':'chris', 'password':'password'}
        dummy_id1 = DummyIdentifier(credentials1)
        credentials2 = {'login':'chris', 'password':'password'}
        dummy_id2 = DummyIdentifier(credentials2)
        identifiers = [ ('id1', dummy_id1), ('id2', dummy_id2) ]
        dummy_auth = DummyAuthenticator(None)
        authenticators = [ ('auth', dummy_auth) ]
        dummy_challenger = DummyChallenger(None)
        challengers = [ ('challenger', dummy_challenger) ]
        dummy_mdprovider = DummyMDProvider()
        mdproviders = [ ('mdproviders', dummy_mdprovider) ]
        iface_reg, name_reg = fn(identifiers, authenticators, challengers,
                                 mdproviders)
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IAuthenticator
        from repoze.who.interfaces import IChallenger
        self.assertEqual(iface_reg[IIdentifier], [dummy_id1, dummy_id2])
        self.assertEqual(iface_reg[IAuthenticator], [dummy_auth])
        self.assertEqual(iface_reg[IChallenger], [dummy_challenger])
        self.assertEqual(name_reg['id1'], dummy_id1)
        self.assertEqual(name_reg['id2'], dummy_id2)
        self.assertEqual(name_reg['auth'], dummy_auth)
        self.assertEqual(name_reg['challenger'], dummy_challenger)
class TestIdentityDict(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import Identity
        return Identity
    def _makeOne(self, **kw):
        klass = self._getTargetClass()
        return klass(**kw)
    def test_str(self):
        identity = self._makeOne(foo=1)
        self.failUnless(str(identity).startswith('<repoze.who identity'))
        self.assertEqual(identity['foo'], 1)
    def test_repr(self):
        identity = self._makeOne(foo=1)
        self.failUnless(str(identity).startswith('<repoze.who identity'))
        self.assertEqual(identity['foo'], 1)
class TestWhoConfig(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.config import WhoConfig
        return WhoConfig
    def _makeOne(self, here='/', *args, **kw):
        return self._getTargetClass()(here, *args, **kw)
    def _getDummyPluginClass(self, iface):
        from zope.interface import classImplements
        if not iface.implementedBy(DummyPlugin):
            classImplements(DummyPlugin, iface)
        return DummyPlugin
    def test_defaults_before_parse(self):
        config = self._makeOne()
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_empty_string(self):
        config = self._makeOne()
        config.parse('')
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_empty_file(self):
        from StringIO import StringIO
        config = self._makeOne()
        config.parse(StringIO())
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_plugins(self):
        config = self._makeOne()
        config.parse(PLUGINS_ONLY)
        self.assertEqual(len(config.plugins), 2)
        self.failUnless(isinstance(config.plugins['foo'],
                                   DummyPlugin))
        bar = config.plugins['bar']
        self.failUnless(isinstance(bar, DummyPlugin))
        self.assertEqual(bar.credentials, 'qux')
    def test_parse_general_empty(self):
        config = self._makeOne()
        config.parse('[general]')
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
    def test_parse_general_only(self):
        from repoze.who.interfaces import IRequestClassifier
        from repoze.who.interfaces import IChallengeDecider
        class IDummy(IRequestClassifier, IChallengeDecider):
            pass
        PLUGIN_CLASS = self._getDummyPluginClass(IDummy)
        config = self._makeOne()
        config.parse(GENERAL_ONLY)
        self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS))
        self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS))
        self.assertEqual(config.remote_user_key, 'ANOTHER_REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
    def test_parse_general_with_plugins(self):
        from repoze.who.interfaces import IRequestClassifier
        from repoze.who.interfaces import IChallengeDecider
        class IDummy(IRequestClassifier, IChallengeDecider):
            pass
        PLUGIN_CLASS = self._getDummyPluginClass(IDummy)
        config = self._makeOne()
        config.parse(GENERAL_WITH_PLUGINS)
        self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS))
        self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS))
    def test_parse_identifiers_only(self):
        from repoze.who.interfaces import IIdentifier
        PLUGIN_CLASS = self._getDummyPluginClass(IIdentifier)
        config = self._makeOne()
        config.parse(IDENTIFIERS_ONLY)
        identifiers = config.identifiers
        self.assertEqual(len(identifiers), 2)
        first, second = identifiers
        self.assertEqual(first[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IIdentifier], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_identifiers_with_plugins(self):
        from repoze.who.interfaces import IIdentifier
        PLUGIN_CLASS = self._getDummyPluginClass(IIdentifier)
        config = self._makeOne()
        config.parse(IDENTIFIERS_WITH_PLUGINS)
        identifiers = config.identifiers
        self.assertEqual(len(identifiers), 2)
        first, second = identifiers
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IIdentifier], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_authenticators_only(self):
        from repoze.who.interfaces import IAuthenticator
        PLUGIN_CLASS = self._getDummyPluginClass(IAuthenticator)
        config = self._makeOne()
        config.parse(AUTHENTICATORS_ONLY)
        authenticators = config.authenticators
        self.assertEqual(len(authenticators), 2)
        first, second = authenticators
        self.assertEqual(first[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IAuthenticator], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_authenticators_with_plugins(self):
        from repoze.who.interfaces import IAuthenticator
        PLUGIN_CLASS = self._getDummyPluginClass(IAuthenticator)
        config = self._makeOne()
        config.parse(AUTHENTICATORS_WITH_PLUGINS)
        authenticators = config.authenticators
        self.assertEqual(len(authenticators), 2)
        first, second = authenticators
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IAuthenticator], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_challengers_only(self):
        from repoze.who.interfaces import IChallenger
        PLUGIN_CLASS = self._getDummyPluginClass(IChallenger)
        config = self._makeOne()
        config.parse(CHALLENGERS_ONLY)
        challengers = config.challengers
        self.assertEqual(len(challengers), 2)
        first, second = challengers
        self.assertEqual(first[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IChallenger], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_challengers_with_plugins(self):
        from repoze.who.interfaces import IChallenger
        PLUGIN_CLASS = self._getDummyPluginClass(IChallenger)
        config = self._makeOne()
        config.parse(CHALLENGERS_WITH_PLUGINS)
        challengers = config.challengers
        self.assertEqual(len(challengers), 2)
        first, second = challengers
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IChallenger], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_mdproviders_only(self):
        from repoze.who.interfaces import IMetadataProvider
        PLUGIN_CLASS = self._getDummyPluginClass(IMetadataProvider)
        config = self._makeOne()
        config.parse(MDPROVIDERS_ONLY)
        mdproviders = config.mdproviders
        self.assertEqual(len(mdproviders), 2)
        first, second = mdproviders
        self.assertEqual(first[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_mdproviders_with_plugins(self):
        from repoze.who.interfaces import IMetadataProvider
        PLUGIN_CLASS = self._getDummyPluginClass(IMetadataProvider)
        config = self._makeOne()
        config.parse(MDPROVIDERS_WITH_PLUGINS)
        mdproviders = config.mdproviders
        self.assertEqual(len(mdproviders), 2)
        first, second = mdproviders
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
class DummyPlugin:
    def __init__(self, **kw):
        self.__dict__.update(kw)
PLUGINS_ONLY = """\
[plugin:foo]
use = repoze.who.tests:DummyPlugin
[plugin:bar]
use = repoze.who.tests:DummyPlugin
credentials = qux
"""
GENERAL_ONLY = """\
[general]
request_classifier = repoze.who.tests:DummyPlugin
challenge_decider = repoze.who.tests:DummyPlugin
remote_user_key = ANOTHER_REMOTE_USER
"""
GENERAL_WITH_PLUGINS = """\
[general]
request_classifier = classifier
challenge_decider = decider
[plugin:classifier]
use = repoze.who.tests:DummyPlugin
[plugin:decider]
use = repoze.who.tests:DummyPlugin
"""
IDENTIFIERS_ONLY = """\
[identifiers]
plugins =
    repoze.who.tests:DummyPlugin;klass1
    repoze.who.tests:DummyPlugin
"""
IDENTIFIERS_WITH_PLUGINS = """\
[identifiers]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests:DummyPlugin
[plugin:bar]
use = repoze.who.tests:DummyPlugin
"""
AUTHENTICATORS_ONLY = """\
[authenticators]
plugins =
    repoze.who.tests:DummyPlugin;klass1
    repoze.who.tests:DummyPlugin
"""
AUTHENTICATORS_WITH_PLUGINS = """\
[authenticators]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests:DummyPlugin
[plugin:bar]
use = repoze.who.tests:DummyPlugin
"""
CHALLENGERS_ONLY = """\
[challengers]
plugins =
    repoze.who.tests:DummyPlugin;klass1
    repoze.who.tests:DummyPlugin
"""
CHALLENGERS_WITH_PLUGINS = """\
[challengers]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests:DummyPlugin
[plugin:bar]
use = repoze.who.tests:DummyPlugin
"""
MDPROVIDERS_ONLY = """\
[mdproviders]
plugins =
    repoze.who.tests:DummyPlugin;klass1
    repoze.who.tests:DummyPlugin
"""
MDPROVIDERS_WITH_PLUGINS = """\
[mdproviders]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests:DummyPlugin
[plugin:bar]
use = repoze.who.tests:DummyPlugin
"""
class TestConfigMiddleware(unittest.TestCase):
    tempfile = None
    def setUp(self):
        pass
    def tearDown(self):
        if self.tempfile is not None:
            self.tempfile.close()
    def _getFactory(self):
        from repoze.who.config import make_middleware_with_config
        return make_middleware_with_config
    def _getTempfile(self, text):
        import tempfile
        tf = self.tempfile = tempfile.NamedTemporaryFile()
        tf.write(text)
        tf.flush()
        return tf
    def test_sample_config(self):
        app = DummyApp()
        factory = self._getFactory()
        tempfile = self._getTempfile(SAMPLE_CONFIG)
        global_cohf = {'here': '/'}
        middleware = factory(app, global_cohf, config_file=tempfile.name,
                             log_file='STDOUT', log_level='debug')
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IAuthenticator
        from repoze.who.interfaces import IChallenger
        self.assertEqual(len(middleware.registry[IIdentifier]), 3)
        self.assertEqual(len(middleware.registry[IAuthenticator]), 1)
        self.assertEqual(len(middleware.registry[IChallenger]), 2)
        self.failUnless(middleware.logger, middleware.logger)
        import logging
        self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG)
SAMPLE_CONFIG = """\
[plugin:form]
use = repoze.who.plugins.form:make_plugin
login_form_qs = __do_login
rememberer_name = auth_tkt
[plugin:auth_tkt]
use = repoze.who.plugins.auth_tkt:make_plugin
secret = s33kr1t
cookie_name = oatmeal
secure = False
include_ip = True
[plugin:basicauth]
use = repoze.who.plugins.basicauth:make_plugin
realm = 'sample'
[plugin:htpasswd]
use = repoze.who.plugins.htpasswd:make_plugin
filename = %(here)s/etc/passwd
check_fn = repoze.who.plugins.htpasswd:crypt_check
[general]
request_classifier = repoze.who.classifiers:default_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider
[identifiers]
plugins =
    form;browser
    auth_tkt
    basicauth
[authenticators]
plugins = htpasswd
[challengers]
plugins =
    form;browser
    basicauth
[mdproviders]
plugins =
"""
class AuthenticatedPredicateTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.restrict import authenticated_predicate
        return authenticated_predicate()
    def test___call___no_identity_returns_False(self):
        predicate = self._getFUT()
        environ = {}
        self.failIf(predicate(environ))
    def test___call___w_REMOTE_AUTH_returns_True(self):
        predicate = self._getFUT()
        environ = {'REMOTE_USER': 'fred'}
        self.failUnless(predicate(environ))
    def test___call___w_repoze_who_identity_returns_True(self):
        predicate = self._getFUT()
        environ = {'repoze.who.identity': {'login': 'fred'}}
        self.failUnless(predicate(environ))
class PredicateRestrictionTests(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.restrict import PredicateRestriction
        return PredicateRestriction
    def _makeOne(self, app=None, **kw):
        if app is None:
            app = DummyApp()
        return self._getTargetClass()(app, **kw)
    def test___call___disabled_predicate_false_calls_app_not_predicate(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return False
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True}
        restrict = self._makeOne(predicate=_factory, enabled=False)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 0)
        self.assertEqual(len(_started), 0)
        self.assertEqual(restrict.app.environ, environ)
    def test___call___enabled_predicate_false_returns_401(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return False
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True}
        restrict = self._makeOne(predicate=_factory)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 1)
        self.assertEqual(len(_started), 1, _started)
        self.assertEqual(_started[0][0], '401 Unauthorized')
        self.assertEqual(restrict.app.environ, None)
    def test___call___enabled_predicate_true_calls_app(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return True
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True, 'REMOTE_USER': 'fred'}
        restrict = self._makeOne(predicate=_factory)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 1)
        self.assertEqual(len(_started), 0)
        self.assertEqual(restrict.app.environ, environ)
class MakePredicateRestrictionTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.restrict import make_predicate_restriction
        return make_predicate_restriction
    def test_non_string_predicate_no_args(self):
        fut = self._getFUT()
        app = DummyApp()
        def _predicate(env):
            return True
        def _factory():
            return _predicate
        filter = fut(app, {}, predicate=_factory)
        self.failUnless(filter.app is app)
        self.failUnless(filter.predicate is _predicate)
        self.failUnless(filter.enabled)
    def test_disabled_non_string_predicate_w_args(self):
        fut = self._getFUT()
        app = DummyApp()
        filter = fut(app, {}, predicate=DummyPredicate, enabled=False,
                     foo='Foo')
        self.failUnless(filter.app is app)
        self.failUnless(isinstance(filter.predicate, DummyPredicate))
        self.assertEqual(filter.predicate.foo, 'Foo')
        self.failIf(filter.enabled)
    def test_enabled_string_predicate_w_args(self):
        fut = self._getFUT()
        app = DummyApp()
        filter = fut(app, {}, predicate='repoze.who.tests:DummyPredicate',
                     enabled=True, foo='Foo')
        self.failUnless(filter.app is app)
        self.failUnless(isinstance(filter.predicate, DummyPredicate))
        self.assertEqual(filter.predicate.foo, 'Foo')
        self.failUnless(filter.enabled)
class WrapGeneratorTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import wrap_generator
        return wrap_generator
    def test_it(self):
        L = []
        def gen(L=L):
            L.append('yo!')
            yield 'a'
            yield 'b'
        wrap_generator = self._getFUT()
        newgen = wrap_generator(gen())
        self.assertEqual(L, ['yo!'])
        self.assertEqual(list(newgen), ['a', 'b'])
# XXX need make_middleware tests
class DummyPredicate:
    def __init__(self, **kw):
        self.__dict__.update(kw)
    def __call__(self, env):
        return True
class DummyApp:
    environ = None
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
class DummyWorkingApp:
    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
    def __call__(self, environ, start_response):
        self.environ = environ
        start_response(self.status, self.headers)
        return ['body']
class DummyGeneratorApp:
    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
    def __call__(self, environ, start_response):
        def gen(self=self, start_response=start_response):
            self.environ = environ
            start_response(self.status, self.headers)
            yield 'body'
        return gen()
class DummyIdentityResetApp:
    def __init__(self, status, headers, new_identity):
        self.status = status
        self.headers = headers
        self.new_identity = new_identity
    def __call__(self, environ, start_response):
        self.environ = environ
        environ['repoze.who.identity']['login'] = 'fred'
        environ['repoze.who.identity']['password'] = 'schooled'
        start_response(self.status, self.headers)
        return ['body']
class DummyRequestClassifier:
    def __call__(self, environ):
        return 'browser'
class DummyIdentifier:
    forgotten = False
    remembered = False
    def __init__(self, credentials=None, remember_headers=None,
                 forget_headers=None, replace_app=None):
        self.credentials = credentials
        self.remember_headers = remember_headers
        self.forget_headers = forget_headers
        self.replace_app = replace_app
    def identify(self, environ):
        if self.replace_app:
            environ['repoze.who.application'] = self.replace_app
        return self.credentials
    def forget(self, environ, identity):
        self.forgotten = identity
        return self.forget_headers
    def remember(self, environ, identity):
        self.remembered = identity
        return self.remember_headers
class DummyNoResultsIdentifier:
    def identify(self, environ):
        return None
    def remember(self, *arg, **kw):
        pass
    def forget(self, *arg, **kw):
        pass
class DummyAuthenticator:
    def __init__(self, userid=None):
        self.userid = userid
    def authenticate(self, environ, credentials):
        if self.userid is None:
            return credentials['login']
        return self.userid
class DummyMultiPlugin:
    pass
class DummyFailAuthenticator:
    def authenticate(self, environ, credentials):
        return None
class DummyChallenger:
    def __init__(self, app=None):
        self.app = app
    def challenge(self, environ, status, app_headers, forget_headers):
        environ['challenged'] = self.app
        return self.app
class DummyMDProvider:
    def __init__(self, metadata=None):
        self._metadata = metadata
    def add_metadata(self, environ, identity):
        return identity.update(self._metadata)
class DummyChallengeDecider:
    def __call__(self, environ, status, headers):
        if status.startswith('401 '):
            return True
class DummyStartResponse:
    def __call__(self, status, headers, exc_info=None):
        self.status = status
        self.headers = headers
        self.exc_info = exc_info
        return []
#package
repoze/who/tests/test_classifiers.py
New file
@@ -0,0 +1,41 @@
import unittest
class TestDefaultRequestClassifier(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.classifiers import default_request_classifier
        return default_request_classifier
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
    def test_classify_dav_method(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'REQUEST_METHOD':'COPY'})
        result = classifier(environ)
        self.assertEqual(result, 'dav')
    def test_classify_dav_useragent(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'HTTP_USER_AGENT':'WebDrive'})
        result = classifier(environ)
        self.assertEqual(result, 'dav')
    def test_classify_xmlpost(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
                                     'REQUEST_METHOD':'POST'})
        result = classifier(environ)
        self.assertEqual(result, 'xmlpost')
    def test_classify_browser(self):
        classifier = self._getFUT()
        environ = self._makeEnviron({'CONTENT_TYPE':'text/xml',
                                     'REQUEST_METHOD':'GET'})
        result = classifier(environ)
        self.assertEqual(result, 'browser')
repoze/who/tests/test_config.py
New file
@@ -0,0 +1,420 @@
import unittest
class TestWhoConfig(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.config import WhoConfig
        return WhoConfig
    def _makeOne(self, here='/', *args, **kw):
        return self._getTargetClass()(here, *args, **kw)
    def _getDummyPluginClass(self, iface):
        from zope.interface import classImplements
        if not iface.implementedBy(DummyPlugin):
            classImplements(DummyPlugin, iface)
        return DummyPlugin
    def test_defaults_before_parse(self):
        config = self._makeOne()
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_empty_string(self):
        config = self._makeOne()
        config.parse('')
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_empty_file(self):
        from StringIO import StringIO
        config = self._makeOne()
        config.parse(StringIO())
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
        self.assertEqual(len(config.identifiers), 0)
        self.assertEqual(len(config.authenticators), 0)
        self.assertEqual(len(config.challengers), 0)
        self.assertEqual(len(config.mdproviders), 0)
    def test_parse_plugins(self):
        config = self._makeOne()
        config.parse(PLUGINS_ONLY)
        self.assertEqual(len(config.plugins), 2)
        self.failUnless(isinstance(config.plugins['foo'],
                                   DummyPlugin))
        bar = config.plugins['bar']
        self.failUnless(isinstance(bar, DummyPlugin))
        self.assertEqual(bar.credentials, 'qux')
    def test_parse_general_empty(self):
        config = self._makeOne()
        config.parse('[general]')
        self.assertEqual(config.request_classifier, None)
        self.assertEqual(config.challenge_decider, None)
        self.assertEqual(config.remote_user_key, 'REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
    def test_parse_general_only(self):
        from repoze.who.interfaces import IRequestClassifier
        from repoze.who.interfaces import IChallengeDecider
        class IDummy(IRequestClassifier, IChallengeDecider):
            pass
        PLUGIN_CLASS = self._getDummyPluginClass(IDummy)
        config = self._makeOne()
        config.parse(GENERAL_ONLY)
        self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS))
        self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS))
        self.assertEqual(config.remote_user_key, 'ANOTHER_REMOTE_USER')
        self.assertEqual(len(config.plugins), 0)
    def test_parse_general_with_plugins(self):
        from repoze.who.interfaces import IRequestClassifier
        from repoze.who.interfaces import IChallengeDecider
        class IDummy(IRequestClassifier, IChallengeDecider):
            pass
        PLUGIN_CLASS = self._getDummyPluginClass(IDummy)
        config = self._makeOne()
        config.parse(GENERAL_WITH_PLUGINS)
        self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS))
        self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS))
    def test_parse_identifiers_only(self):
        from repoze.who.interfaces import IIdentifier
        PLUGIN_CLASS = self._getDummyPluginClass(IIdentifier)
        config = self._makeOne()
        config.parse(IDENTIFIERS_ONLY)
        identifiers = config.identifiers
        self.assertEqual(len(identifiers), 2)
        first, second = identifiers
        self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IIdentifier], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_identifiers_with_plugins(self):
        from repoze.who.interfaces import IIdentifier
        PLUGIN_CLASS = self._getDummyPluginClass(IIdentifier)
        config = self._makeOne()
        config.parse(IDENTIFIERS_WITH_PLUGINS)
        identifiers = config.identifiers
        self.assertEqual(len(identifiers), 2)
        first, second = identifiers
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IIdentifier], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_authenticators_only(self):
        from repoze.who.interfaces import IAuthenticator
        PLUGIN_CLASS = self._getDummyPluginClass(IAuthenticator)
        config = self._makeOne()
        config.parse(AUTHENTICATORS_ONLY)
        authenticators = config.authenticators
        self.assertEqual(len(authenticators), 2)
        first, second = authenticators
        self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IAuthenticator], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_authenticators_with_plugins(self):
        from repoze.who.interfaces import IAuthenticator
        PLUGIN_CLASS = self._getDummyPluginClass(IAuthenticator)
        config = self._makeOne()
        config.parse(AUTHENTICATORS_WITH_PLUGINS)
        authenticators = config.authenticators
        self.assertEqual(len(authenticators), 2)
        first, second = authenticators
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IAuthenticator], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_challengers_only(self):
        from repoze.who.interfaces import IChallenger
        PLUGIN_CLASS = self._getDummyPluginClass(IChallenger)
        config = self._makeOne()
        config.parse(CHALLENGERS_ONLY)
        challengers = config.challengers
        self.assertEqual(len(challengers), 2)
        first, second = challengers
        self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IChallenger], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_challengers_with_plugins(self):
        from repoze.who.interfaces import IChallenger
        PLUGIN_CLASS = self._getDummyPluginClass(IChallenger)
        config = self._makeOne()
        config.parse(CHALLENGERS_WITH_PLUGINS)
        challengers = config.challengers
        self.assertEqual(len(challengers), 2)
        first, second = challengers
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IChallenger], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_mdproviders_only(self):
        from repoze.who.interfaces import IMetadataProvider
        PLUGIN_CLASS = self._getDummyPluginClass(IMetadataProvider)
        config = self._makeOne()
        config.parse(MDPROVIDERS_ONLY)
        mdproviders = config.mdproviders
        self.assertEqual(len(mdproviders), 2)
        first, second = mdproviders
        self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1')
        self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
    def test_parse_mdproviders_with_plugins(self):
        from repoze.who.interfaces import IMetadataProvider
        PLUGIN_CLASS = self._getDummyPluginClass(IMetadataProvider)
        config = self._makeOne()
        config.parse(MDPROVIDERS_WITH_PLUGINS)
        mdproviders = config.mdproviders
        self.assertEqual(len(mdproviders), 2)
        first, second = mdproviders
        self.assertEqual(first[0], 'foo')
        self.failUnless(isinstance(first[1], PLUGIN_CLASS))
        self.assertEqual(len(first[1].classifications), 1)
        self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1')
        self.assertEqual(second[0], 'bar')
        self.failUnless(isinstance(second[1], PLUGIN_CLASS))
class DummyPlugin:
    def __init__(self, **kw):
        self.__dict__.update(kw)
PLUGINS_ONLY = """\
[plugin:foo]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:bar]
use = repoze.who.tests.test_config:DummyPlugin
credentials = qux
"""
GENERAL_ONLY = """\
[general]
request_classifier = repoze.who.tests.test_config:DummyPlugin
challenge_decider = repoze.who.tests.test_config:DummyPlugin
remote_user_key = ANOTHER_REMOTE_USER
"""
GENERAL_WITH_PLUGINS = """\
[general]
request_classifier = classifier
challenge_decider = decider
[plugin:classifier]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:decider]
use = repoze.who.tests.test_config:DummyPlugin
"""
IDENTIFIERS_ONLY = """\
[identifiers]
plugins =
    repoze.who.tests.test_config:DummyPlugin;klass1
    repoze.who.tests.test_config:DummyPlugin
"""
IDENTIFIERS_WITH_PLUGINS = """\
[identifiers]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:bar]
use = repoze.who.tests.test_config:DummyPlugin
"""
AUTHENTICATORS_ONLY = """\
[authenticators]
plugins =
    repoze.who.tests.test_config:DummyPlugin;klass1
    repoze.who.tests.test_config:DummyPlugin
"""
AUTHENTICATORS_WITH_PLUGINS = """\
[authenticators]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:bar]
use = repoze.who.tests.test_config:DummyPlugin
"""
CHALLENGERS_ONLY = """\
[challengers]
plugins =
    repoze.who.tests.test_config:DummyPlugin;klass1
    repoze.who.tests.test_config:DummyPlugin
"""
CHALLENGERS_WITH_PLUGINS = """\
[challengers]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:bar]
use = repoze.who.tests.test_config:DummyPlugin
"""
MDPROVIDERS_ONLY = """\
[mdproviders]
plugins =
    repoze.who.tests.test_config:DummyPlugin;klass1
    repoze.who.tests.test_config:DummyPlugin
"""
MDPROVIDERS_WITH_PLUGINS = """\
[mdproviders]
plugins =
    foo;klass1
    bar
[plugin:foo]
use = repoze.who.tests.test_config:DummyPlugin
[plugin:bar]
use = repoze.who.tests.test_config:DummyPlugin
"""
class TestConfigMiddleware(unittest.TestCase):
    tempfile = None
    def setUp(self):
        pass
    def tearDown(self):
        if self.tempfile is not None:
            self.tempfile.close()
    def _getFactory(self):
        from repoze.who.config import make_middleware_with_config
        return make_middleware_with_config
    def _getTempfile(self, text):
        import tempfile
        tf = self.tempfile = tempfile.NamedTemporaryFile()
        tf.write(text)
        tf.flush()
        return tf
    def test_sample_config(self):
        app = DummyApp()
        factory = self._getFactory()
        tempfile = self._getTempfile(SAMPLE_CONFIG)
        global_cohf = {'here': '/'}
        middleware = factory(app, global_cohf, config_file=tempfile.name,
                             log_file='STDOUT', log_level='debug')
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IAuthenticator
        from repoze.who.interfaces import IChallenger
        self.assertEqual(len(middleware.registry[IIdentifier]), 3)
        self.assertEqual(len(middleware.registry[IAuthenticator]), 1)
        self.assertEqual(len(middleware.registry[IChallenger]), 2)
        self.failUnless(middleware.logger, middleware.logger)
        import logging
        self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG)
SAMPLE_CONFIG = """\
[plugin:form]
use = repoze.who.plugins.form:make_plugin
login_form_qs = __do_login
rememberer_name = auth_tkt
[plugin:auth_tkt]
use = repoze.who.plugins.auth_tkt:make_plugin
secret = s33kr1t
cookie_name = oatmeal
secure = False
include_ip = True
[plugin:basicauth]
use = repoze.who.plugins.basicauth:make_plugin
realm = 'sample'
[plugin:htpasswd]
use = repoze.who.plugins.htpasswd:make_plugin
filename = %(here)s/etc/passwd
check_fn = repoze.who.plugins.htpasswd:crypt_check
[general]
request_classifier = repoze.who.classifiers:default_request_classifier
challenge_decider = repoze.who.classifiers:default_challenge_decider
[identifiers]
plugins =
    form;browser
    auth_tkt
    basicauth
[authenticators]
plugins = htpasswd
[challengers]
plugins =
    form;browser
    basicauth
[mdproviders]
plugins =
"""
# XXX need make_middleware tests
class DummyApp:
    environ = None
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
repoze/who/tests/test_middleware.py
New file
@@ -0,0 +1,981 @@
import unittest
class Base(unittest.TestCase):
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
class TestMiddleware(Base):
    def _getTargetClass(self):
        from repoze.who.middleware import PluggableAuthenticationMiddleware
        return PluggableAuthenticationMiddleware
    def _makeOne(self,
                 app=None,
                 identifiers=None,
                 authenticators=None,
                 challengers=None,
                 classifier=None,
                 mdproviders=None,
                 challenge_decider=None,
                 log_stream=None,
                 log_level=None,
                 ):
        if app is None:
            app = DummyApp()
        if identifiers is None:
            identifiers = []
        if authenticators is None:
            authenticators = []
        if challengers is None:
            challengers = []
        if classifier is None:
            classifier = DummyRequestClassifier()
        if mdproviders is None:
            mdproviders = []
        if challenge_decider is None:
            challenge_decider = DummyChallengeDecider()
        if log_level is None:
            import logging
            log_level = logging.DEBUG
        mw = self._getTargetClass()(app,
                                    identifiers,
                                    authenticators,
                                    challengers,
                                    mdproviders,
                                    classifier,
                                    challenge_decider,
                                    log_stream,
                                    log_level=logging.DEBUG)
        return mw
    def test_accepts_logger(self):
        import logging
        logger = logging.Logger('something')
        logger.setLevel(logging.INFO)
        mw = self._makeOne(log_stream=logger)
        self.assertEqual(logger, mw.logger)
    def test_identify_success(self):
        environ = self._makeEnviron()
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('i', identifier) ]
        mw = self._makeOne(identifiers=identifiers)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, identifier)
        self.assertEqual(identity['login'], 'chris')
        self.assertEqual(identity['password'], 'password')
    def test_identify_success_empty_identity(self):
        environ = self._makeEnviron()
        identifier = DummyIdentifier({})
        identifiers = [ ('i', identifier) ]
        mw = self._makeOne(identifiers=identifiers)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, identifier)
        self.assertEqual(identity, {})
    def test_identify_fail(self):
        environ = self._makeEnviron()
        plugin = DummyNoResultsIdentifier()
        plugins = [ ('dummy', plugin) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 0)
    def test_identify_success_skip_noresults(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyNoResultsIdentifier()
        credentials = {'login':'chris', 'password':'password'}
        plugin2 = DummyIdentifier(credentials)
        plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 1)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, plugin2)
        self.assertEqual(identity['login'], 'chris')
        self.assertEqual(identity['password'], 'password')
    def test_identify_success_multiresults(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugins = [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, None)
        self.assertEqual(len(results), 2)
        new_identifier, identity = results[0]
        self.assertEqual(new_identifier, plugin1)
        self.assertEqual(identity['login'], 'fred')
        self.assertEqual(identity['password'], 'fred')
        new_identifier, identity = results[1]
        self.assertEqual(new_identifier, plugin2)
        self.assertEqual(identity['login'], 'bob')
        self.assertEqual(identity['password'], 'bob')
    def test_identify_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        from repoze.who.interfaces import IIdentifier
        plugin1.classifications = {IIdentifier:['nomatch']}
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugins = [ ('identifier1', plugin1),  ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, 'match')
        self.assertEqual(len(results), 1)
        plugin, creds = results[0]
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, plugin2)
    def test_identify_find_explicit_classifier(self):
        environ = self._makeEnviron()
        from repoze.who.interfaces import IIdentifier
        plugin1 = DummyIdentifier({'login':'fred','password':'fred'})
        plugin1.classifications = {IIdentifier:['nomatch']}
        plugin2 = DummyIdentifier({'login':'bob','password':'bob'})
        plugin2.classifications = {IIdentifier:['match']}
        plugins= [ ('identifier1', plugin1), ('identifier2', plugin2) ]
        mw = self._makeOne(identifiers=plugins)
        results = mw.identify(environ, 'match')
        self.assertEqual(len(results), 1)
        plugin, creds = results[0]
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, plugin2)
    def test_authenticate_success(self):
        environ = self._makeEnviron()
        plugin1 = DummyAuthenticator('a')
        plugins = [ ('identifier1', plugin1) ]
        mw = self._makeOne(authenticators=plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'a')
    def test_authenticate_fail(self):
        environ = self._makeEnviron()
        mw = self._makeOne() # no authenticators
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        result = mw.authenticate(environ, None, identities)
        self.assertEqual(len(result), 0)
    def test_authenticate_success_skip_fail(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyFailAuthenticator()
        plugin2 = DummyAuthenticator()
        plugins = [ ('dummy1', plugin1), ('dummy2', plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris')
    def test_authenticate_success_multiresult(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyAuthenticator('chris_id1')
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('dummy1',plugin1), ('dummy2',plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 2)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0,))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id1')
        result = results[1]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        plugin1 = DummyAuthenticator('chris_id1')
        from repoze.who.interfaces import IAuthenticator
        plugin1.classifications = {IAuthenticator:['nomatch']}
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
        mw = self._makeOne(authenticators = plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, 'match', identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_find_explicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.who.interfaces import IAuthenticator
        plugin1 = DummyAuthenticator('chris_id1')
        plugin1.classifications = {IAuthenticator:['nomatch']}
        plugin2 = DummyAuthenticator('chris_id2')
        plugin2.classifications = {IAuthenticator:['match']}
        plugins = [ ('auth1', plugin1), ('auth2', plugin2) ]
        mw = self._makeOne(authenticators = plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, 'match', identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0, 0))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_authenticate_user_null_but_not_none(self):
        environ = self._makeEnviron()
        plugin1 = DummyAuthenticator(0)
        plugins = [ ('identifier1', plugin1) ]
        mw = self._makeOne(authenticators=plugins)
        identities = [ (None, {'login':'chris', 'password':'password'}) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 1)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 0)
    def test_challenge_noidentifier_noapp(self):
        environ = self._makeEnviron()
        challenger = DummyChallenger()
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        app = mw.challenge(environ, 'match', '401 Unauthorized',
                           [], None, identity)
        self.assertEqual(app, None)
        self.assertEqual(environ['challenged'], app)
    def test_authenticate_success_multiresult_one_preauthenticated(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        preauth = DummyIdentifier({'repoze.who.userid':'preauthenticated'})
        plugin1 = DummyAuthenticator('chris_id1')
        plugin2 = DummyAuthenticator('chris_id2')
        plugins = [ ('dummy1',plugin1), ('dummy2',plugin2) ]
        mw = self._makeOne(authenticators=plugins)
        creds = {'login':'chris', 'password':'password'}
        identities = [ (None, {'login':'chris', 'password':'password'}),
                       (preauth, preauth.credentials) ]
        results = mw.authenticate(environ, None, identities)
        self.assertEqual(len(results), 3)
        result = results[0]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,0,))
        self.assertEqual(authenticator, None)
        self.assertEqual(identifier, preauth)
        self.assertEqual(creds['repoze.who.userid'], 'preauthenticated')
        self.assertEqual(userid, 'preauthenticated')
        result = results[1]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (0,1))
        self.assertEqual(authenticator, plugin1)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id1')
        result = results[2]
        rank, authenticator, identifier, creds, userid = result
        self.assertEqual(rank, (1,1))
        self.assertEqual(authenticator, plugin2)
        self.assertEqual(identifier, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_challenge_noidentifier_withapp(self):
        environ = self._makeEnviron()
        app = DummyApp()
        challenger = DummyChallenger(app)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], None, identity)
        self.assertEqual(result, app)
        self.assertEqual(environ['challenged'], app)
    def test_challenge_identifier_noapp(self):
        environ = self._makeEnviron()
        challenger = DummyChallenger()
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                              [], identifier, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['challenged'], None)
        self.assertEqual(identifier.forgotten, identity)
    def test_challenge_identifier_app(self):
        environ = self._makeEnviron()
        app = DummyApp()
        challenger = DummyChallenger(app)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge', challenger) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app)
        self.assertEqual(environ['challenged'], app)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_firstwins(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        challenger1 = DummyChallenger(app1)
        challenger2 = DummyChallenger(app2)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                              [], identifier, identity)
        self.assertEqual(result, app1)
        self.assertEqual(environ['challenged'], app1)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_skipnomatch_findimplicit(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        from repoze.who.interfaces import IChallenger
        challenger1 = DummyChallenger(app1)
        challenger1.classifications = {IChallenger:['nomatch']}
        challenger2 = DummyChallenger(app2)
        challenger2.classifications = {IChallenger:None}
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app2)
        self.assertEqual(environ['challenged'], app2)
        self.assertEqual(identifier.forgotten, identity)
    def test_multi_challenge_skipnomatch_findexplicit(self):
        environ = self._makeEnviron()
        app1 = DummyApp()
        app2 = DummyApp()
        from repoze.who.interfaces import IChallenger
        challenger1 = DummyChallenger(app1)
        challenger1.classifications = {IChallenger:['nomatch']}
        challenger2 = DummyChallenger(app2)
        challenger2.classifications = {IChallenger:['match']}
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        plugins = [ ('challenge1', challenger1), ('challenge2', challenger2) ]
        mw = self._makeOne(challengers = plugins)
        identity = {'login':'chris', 'password':'password'}
        result = mw.challenge(environ, 'match', '401 Unauthorized',
                               [], identifier, identity)
        self.assertEqual(result, app2)
        self.assertEqual(environ['challenged'], app2)
        self.assertEqual(identifier.forgotten, identity)
    def test_add_metadata(self):
        environ = self._makeEnviron()
        plugin1 = DummyMDProvider({'foo':'bar'})
        plugin2 = DummyMDProvider({'fuz':'baz'})
        plugins = [ ('meta1', plugin1), ('meta2', plugin2) ]
        mw = self._makeOne(mdproviders=plugins)
        classification = ''
        identity = {}
        results = mw.add_metadata(environ, classification, identity)
        self.assertEqual(identity['foo'], 'bar')
        self.assertEqual(identity['fuz'], 'baz')
    def test_add_metadata_w_classification(self):
        environ = self._makeEnviron()
        plugin1 = DummyMDProvider({'foo':'bar'})
        plugin2 = DummyMDProvider({'fuz':'baz'})
        from repoze.who.interfaces import IMetadataProvider
        plugin2.classifications = {IMetadataProvider:['foo']}
        plugins = [ ('meta1', plugin1), ('meta2', plugin2) ]
        mw = self._makeOne(mdproviders=plugins)
        classification = 'monkey'
        identity = {}
        mw.add_metadata(environ, classification, identity)
        self.assertEqual(identity['foo'], 'bar')
        self.assertEqual(identity.get('fuz'), None)
    def test_call_remoteuser_already_set(self):
        environ = self._makeEnviron({'REMOTE_USER':'admin'})
        mw = self._makeOne()
        result = mw(environ, None)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, [])
    def test_call_200_no_plugins(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        self.assertRaises(RuntimeError, mw, environ, start_response)
    def test_call_200_no_challengers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)
    def test_call_401_no_identifiers(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        mw = self._makeOne(app=app, challengers=challengers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
    def test_call_401_challenger_and_identifier_no_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'a', 'password':'b'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        self.assertEqual(identifier.forgotten, False)
        self.assertEqual(environ.get('REMOTE_USER'), None)
    def test_call_401_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ['challenged'], challenge_app)
        self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
        # @@ unfuck
##         self.assertEqual(identifier.forgotten, identifier.credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
    def test_call_200_challenger_and_identifier_and_authenticator(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        # @@ figure out later
##         self.assertEqual(dict(identifier.remembered)['login'], dict(identifier.credentials)['login'])
##         self.assertEqual(dict(identifier.remembered)['password'], dict(identifier.credentials)['password'])
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
    def test_call_200_identity_reset(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        new_identity = {'user_id':'foo', 'password':'bar'}
        app = DummyIdentityResetApp('200 OK', headers, new_identity)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        new_credentials = identifier.credentials.copy()
        new_credentials['login'] = 'fred'
        new_credentials['password'] = 'schooled'
        # @@ unfuck
##         self.assertEqual(identifier.remembered, new_credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], new_credentials)
    def test_call_200_with_metadata(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdprovider = DummyMDProvider({'foo':'bar'})
        mdproviders = [ ('mdprovider', mdprovider) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
    def test_call_ingress_plugin_replaces_application(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        challengers = []
        credentials = {'login':'chris', 'password':'password'}
        from paste.httpexceptions import HTTPFound
        identifier = DummyIdentifier(
            credentials,
            remember_headers=[('a', '1')],
            replace_app = HTTPFound('http://example.com/redirect')
            )
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdproviders = []
        mw = self._makeOne(app=app,
                           challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = ''.join(mw(environ, start_response))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(start_response.status, '302 Found')
        headers = start_response.headers
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0],
                         ('location', 'http://example.com/redirect'))
        self.assertEqual(headers[1],
                         ('content-type', 'text/plain; charset=utf8'))
        self.assertEqual(headers[2],
                         ('a', '1'))
        self.assertEqual(start_response.exc_info, None)
        self.failIf(environ.has_key('repoze.who.application'))
    def test_call_app_doesnt_call_start_response(self):
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyGeneratorApp('200 OK', headers)
        from paste.httpexceptions import HTTPUnauthorized
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [ ('challenge', challenge) ]
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [ ('identifier', identifier) ]
        authenticator = DummyAuthenticator()
        authenticators = [ ('authenticator', authenticator) ]
        mdprovider = DummyMDProvider({'foo':'bar'})
        mdproviders = [ ('mdprovider', mdprovider) ]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
    # XXX need more call tests:
    #  - auth_id sorting
class TestMatchClassification(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import match_classification
        return match_classification
    def test_match_classification(self):
        f = self._getFUT()
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        from repoze.who.interfaces import IAuthenticator
        multi1 = DummyMultiPlugin()
        multi2 = DummyMultiPlugin()
        multi1.classifications = {IIdentifier:('foo', 'bar'),
                                  IChallenger:('buz',),
                                  IAuthenticator:None}
        multi2.classifications = {IIdentifier:('foo', 'baz', 'biz')}
        plugins = (multi1, multi2)
        # specific
        self.assertEqual(f(IIdentifier, plugins, 'foo'), [multi1, multi2])
        self.assertEqual(f(IIdentifier, plugins, 'bar'), [multi1])
        self.assertEqual(f(IIdentifier, plugins, 'biz'), [multi2])
        # any for multi2
        self.assertEqual(f(IChallenger, plugins, 'buz'), [multi1, multi2])
        # any for either
        self.assertEqual(f(IAuthenticator, plugins, 'buz'), [multi1, multi2])
class TestStartResponseWrapper(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import StartResponseWrapper
        return StartResponseWrapper
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_ctor(self):
        wrapper = self._makeOne(None)
        self.assertEqual(wrapper.start_response, None)
        self.assertEqual(wrapper.headers, [])
        self.failUnless(wrapper.buffer)
    def test_finish_response(self):
        statuses = []
        headerses = []
        datases = []
        closededs = []
        from StringIO import StringIO
        def write(data):
            datases.append(data)
        def close():
            closededs.append(True)
        write.close = close
        def start_response(status, headers, exc_info=None):
            statuses.append(status)
            headerses.append(headers)
            return write
        wrapper = self._makeOne(start_response)
        wrapper.status = '401 Unauthorized'
        wrapper.headers = [('a', '1')]
        wrapper.buffer = StringIO('written')
        extra_headers = [('b', '2')]
        result = wrapper.finish_response(extra_headers)
        self.assertEqual(result, None)
        self.assertEqual(headerses[0], wrapper.headers + extra_headers)
        self.assertEqual(statuses[0], wrapper.status)
        self.assertEqual(datases[0], 'written')
        self.assertEqual(closededs[0], True)
class TestMakeRegistries(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import make_registries
        return make_registries
    def test_empty(self):
        fn = self._getFUT()
        iface_reg, name_reg = fn([], [], [], [])
        self.assertEqual(iface_reg, {})
        self.assertEqual(name_reg, {})
    def test_brokenimpl(self):
        fn = self._getFUT()
        self.assertRaises(ValueError, fn, [(None, DummyApp())], [], [], [])
    def test_ok(self):
        fn = self._getFUT()
        credentials1 = {'login':'chris', 'password':'password'}
        dummy_id1 = DummyIdentifier(credentials1)
        credentials2 = {'login':'chris', 'password':'password'}
        dummy_id2 = DummyIdentifier(credentials2)
        identifiers = [ ('id1', dummy_id1), ('id2', dummy_id2) ]
        dummy_auth = DummyAuthenticator(None)
        authenticators = [ ('auth', dummy_auth) ]
        dummy_challenger = DummyChallenger(None)
        challengers = [ ('challenger', dummy_challenger) ]
        dummy_mdprovider = DummyMDProvider()
        mdproviders = [ ('mdproviders', dummy_mdprovider) ]
        iface_reg, name_reg = fn(identifiers, authenticators, challengers,
                                 mdproviders)
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IAuthenticator
        from repoze.who.interfaces import IChallenger
        self.assertEqual(iface_reg[IIdentifier], [dummy_id1, dummy_id2])
        self.assertEqual(iface_reg[IAuthenticator], [dummy_auth])
        self.assertEqual(iface_reg[IChallenger], [dummy_challenger])
        self.assertEqual(name_reg['id1'], dummy_id1)
        self.assertEqual(name_reg['id2'], dummy_id2)
        self.assertEqual(name_reg['auth'], dummy_auth)
        self.assertEqual(name_reg['challenger'], dummy_challenger)
class TestIdentityDict(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import Identity
        return Identity
    def _makeOne(self, **kw):
        klass = self._getTargetClass()
        return klass(**kw)
    def test_str(self):
        identity = self._makeOne(foo=1)
        self.failUnless(str(identity).startswith('<repoze.who identity'))
        self.assertEqual(identity['foo'], 1)
    def test_repr(self):
        identity = self._makeOne(foo=1)
        self.failUnless(str(identity).startswith('<repoze.who identity'))
        self.assertEqual(identity['foo'], 1)
class WrapGeneratorTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import wrap_generator
        return wrap_generator
    def test_it(self):
        L = []
        def gen(L=L):
            L.append('yo!')
            yield 'a'
            yield 'b'
        wrap_generator = self._getFUT()
        newgen = wrap_generator(gen())
        self.assertEqual(L, ['yo!'])
        self.assertEqual(list(newgen), ['a', 'b'])
class DummyApp:
    environ = None
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
class DummyWorkingApp:
    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
    def __call__(self, environ, start_response):
        self.environ = environ
        start_response(self.status, self.headers)
        return ['body']
class DummyGeneratorApp:
    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
    def __call__(self, environ, start_response):
        def gen(self=self, start_response=start_response):
            self.environ = environ
            start_response(self.status, self.headers)
            yield 'body'
        return gen()
class DummyIdentityResetApp:
    def __init__(self, status, headers, new_identity):
        self.status = status
        self.headers = headers
        self.new_identity = new_identity
    def __call__(self, environ, start_response):
        self.environ = environ
        environ['repoze.who.identity']['login'] = 'fred'
        environ['repoze.who.identity']['password'] = 'schooled'
        start_response(self.status, self.headers)
        return ['body']
class DummyChallenger:
    def __init__(self, app=None):
        self.app = app
    def challenge(self, environ, status, app_headers, forget_headers):
        environ['challenged'] = self.app
        return self.app
class DummyIdentifier:
    forgotten = False
    remembered = False
    def __init__(self, credentials=None, remember_headers=None,
                 forget_headers=None, replace_app=None):
        self.credentials = credentials
        self.remember_headers = remember_headers
        self.forget_headers = forget_headers
        self.replace_app = replace_app
    def identify(self, environ):
        if self.replace_app:
            environ['repoze.who.application'] = self.replace_app
        return self.credentials
    def forget(self, environ, identity):
        self.forgotten = identity
        return self.forget_headers
    def remember(self, environ, identity):
        self.remembered = identity
        return self.remember_headers
class DummyAuthenticator:
    def __init__(self, userid=None):
        self.userid = userid
    def authenticate(self, environ, credentials):
        if self.userid is None:
            return credentials['login']
        return self.userid
class DummyFailAuthenticator:
    def authenticate(self, environ, credentials):
        return None
class DummyRequestClassifier:
    def __call__(self, environ):
        return 'browser'
class DummyChallengeDecider:
    def __call__(self, environ, status, headers):
        if status.startswith('401 '):
            return True
class DummyNoResultsIdentifier:
    def identify(self, environ):
        return None
    def remember(self, *arg, **kw):
        pass
    def forget(self, *arg, **kw):
        pass
class DummyStartResponse:
    def __call__(self, status, headers, exc_info=None):
        self.status = status
        self.headers = headers
        self.exc_info = exc_info
        return []
class DummyMDProvider:
    def __init__(self, metadata=None):
        self._metadata = metadata
    def add_metadata(self, environ, identity):
        return identity.update(self._metadata)
class DummyMultiPlugin:
    pass
repoze/who/tests/test_restrict.py
New file
@@ -0,0 +1,152 @@
import unittest
class AuthenticatedPredicateTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.restrict import authenticated_predicate
        return authenticated_predicate()
    def test___call___no_identity_returns_False(self):
        predicate = self._getFUT()
        environ = {}
        self.failIf(predicate(environ))
    def test___call___w_REMOTE_AUTH_returns_True(self):
        predicate = self._getFUT()
        environ = {'REMOTE_USER': 'fred'}
        self.failUnless(predicate(environ))
    def test___call___w_repoze_who_identity_returns_True(self):
        predicate = self._getFUT()
        environ = {'repoze.who.identity': {'login': 'fred'}}
        self.failUnless(predicate(environ))
class PredicateRestrictionTests(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.restrict import PredicateRestriction
        return PredicateRestriction
    def _makeOne(self, app=None, **kw):
        if app is None:
            app = DummyApp()
        return self._getTargetClass()(app, **kw)
    def test___call___disabled_predicate_false_calls_app_not_predicate(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return False
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True}
        restrict = self._makeOne(predicate=_factory, enabled=False)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 0)
        self.assertEqual(len(_started), 0)
        self.assertEqual(restrict.app.environ, environ)
    def test___call___enabled_predicate_false_returns_401(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return False
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True}
        restrict = self._makeOne(predicate=_factory)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 1)
        self.assertEqual(len(_started), 1, _started)
        self.assertEqual(_started[0][0], '401 Unauthorized')
        self.assertEqual(restrict.app.environ, None)
    def test___call___enabled_predicate_true_calls_app(self):
        _tested = []
        def _factory():
            def _predicate(env):
                _tested.append(env)
                return True
            return _predicate
        _started = []
        def _start_response(status, headers):
            _started.append((status, headers))
        environ = {'testing': True, 'REMOTE_USER': 'fred'}
        restrict = self._makeOne(predicate=_factory)
        restrict(environ, _start_response)
        self.assertEqual(len(_tested), 1)
        self.assertEqual(len(_started), 0)
        self.assertEqual(restrict.app.environ, environ)
class MakePredicateRestrictionTests(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.restrict import make_predicate_restriction
        return make_predicate_restriction
    def test_non_string_predicate_no_args(self):
        fut = self._getFUT()
        app = DummyApp()
        def _predicate(env):
            return True
        def _factory():
            return _predicate
        filter = fut(app, {}, predicate=_factory)
        self.failUnless(filter.app is app)
        self.failUnless(filter.predicate is _predicate)
        self.failUnless(filter.enabled)
    def test_disabled_non_string_predicate_w_args(self):
        fut = self._getFUT()
        app = DummyApp()
        filter = fut(app, {}, predicate=DummyPredicate, enabled=False,
                     foo='Foo')
        self.failUnless(filter.app is app)
        self.failUnless(isinstance(filter.predicate, DummyPredicate))
        self.assertEqual(filter.predicate.foo, 'Foo')
        self.failIf(filter.enabled)
    def test_enabled_string_predicate_w_args(self):
        fut = self._getFUT()
        app = DummyApp()
        filter = fut(app, {},
                     predicate='repoze.who.tests.test_restrict:DummyPredicate',
                     enabled=True, foo='Foo')
        self.failUnless(filter.app is app)
        self.failUnless(isinstance(filter.predicate, DummyPredicate))
        self.assertEqual(filter.predicate.foo, 'Foo')
        self.failUnless(filter.enabled)
class DummyApp:
    environ = None
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
class DummyPredicate:
    def __init__(self, **kw):
        self.__dict__.update(kw)
    def __call__(self, env):
        return True