Chris McDonough
2008-02-25 7dfea7aee9d470b20ee1fede9920ed621ac969b4
Add form and cookie plugins, add post_extractor plugin type.


2 files added
5 files modified
839 ■■■■ changed files
repoze/pam/classifiers.py 2 ●●● patch | view | raw | blame | history
repoze/pam/interfaces.py 98 ●●●● patch | view | raw | blame | history
repoze/pam/middleware.py 299 ●●●● patch | view | raw | blame | history
repoze/pam/plugins/basicauth.py 32 ●●●●● patch | view | raw | blame | history
repoze/pam/plugins/cookie.py 57 ●●●●● patch | view | raw | blame | history
repoze/pam/plugins/form.py 90 ●●●●● patch | view | raw | blame | history
repoze/pam/tests.py 261 ●●●● patch | view | raw | blame | history
repoze/pam/classifiers.py
@@ -53,7 +53,7 @@
    
class DefaultResponseClassifier(object):
    implements(IResponseClassifier)
    def __call__(self, environ, request_classification, headers, exception):
    def __call__(self, environ, request_classification, status, headers):
        """ By default, return the request classification """
        return request_classification
    
repoze/pam/interfaces.py
@@ -2,27 +2,36 @@
class IRequestClassifier(Interface):
    """ On ingress: classify a request.
    This interface is responsible for returning a string representing
    a classification name based on introspection of the WSGI
    environment (environ).
    """
    def __call__(environ):
        """ Return a string representing the classification of this
        request. """
        """ environ -> request classifier string
        This interface is responsible for returning a string
        value representing a request classification.
        o 'environ' is the WSGI environment.
        """
class IResponseClassifier(Interface):
    """ On egress: classify a response.
    This interface is responsible for returning a string representing
    a classification name based on introspection of the ingress
    classification, the WSGI environment (environ), the headers
    returned in the response (headers), or the exception raised by a
    downstream application.
    """
    def __call__(environ, request_classification, headers, exception):
        """ Return a string representing the classification of this
        request. """
    def __call__(environ, request_classification, status, headers):
        """ args -> response classifier string
        This interface is responsible for returning a string representing
        a response classification.
        o 'environ' is the WSGI environment.
        o 'request_classification' is the classification returned during
          ingress by the request classifier.
        o 'status' is the status written into start_response by
          the downstream application.
        o 'headers' is the headers tuple written into start_response
          by the downstream application.
          """
class IExtractorPlugin(Interface):
@@ -48,7 +57,34 @@
        o Only extraction plugins which match one of the current
          request's classifications will be asked to perform extraction.
        """
class IPostExtractorPlugin(Interface):
    """ On ingress: allow the plugin to have a chance to influence the
    environment once credentials are established and return extra
    headers that will be set in the eventual response.

    Each post-extractor matching the request classification is called
    unconditionally after extraction (that is, whether or not any
    extractor plugin found credentials).
    """
    def post_extract(environ, credentials, extractor):
        """ args -> [ (header-name, header-value), ..] | None

        o 'environ' is the WSGI environment.

        o 'credentials' are the credentials that were extracted by
          repoze.pam during the extraction step.

        o 'extractor' is the plugin instance that provided the
          credentials.  If no plugin instance provided credentials to
          repoze.pam, this will be None.

        The return value should be a list of tuples, where each tuple is
        in the form (header-name, header-value), e.g.
        [ ('Set-Cookie', 'cookie_name=foo; Path=/') ], or None if
        no headers should be set.
        """
class IAuthenticatorPlugin(Interface):
    """ On ingress: Map credentials to a user ID.
@@ -70,21 +106,29 @@
class IChallengerPlugin(Interface):
    """ On egress: Initiate a challenge to the user to provide credentials.
    """ On egress: Conditionally initiate a challenge to the user to
        provide credentials.
        Only challenge plugins which match one of the current
        response's classifications will be asked to perform a
        challenge.
    """
    def challenge(environ, status, headers):
        """ args -> WSGI application or None
        o 'environ' is the WSGI environment.
        o Only challenge plugins which match one of the current
          request's classifications will be asked to perform a
          challenge.
    """
        o 'status' is the status written into start_response by the
          downstream application.
    def challenge(environ, request_classifier, headers, exception):
        o 'headers' is the headers tuple written into start_response by the
          downstream application.
        """ Examine the values passed in and perform an arbitrary action
        (usually mutating environ or raising an exception) to cause a
        challenge to be raised to the user.
        The return value of this method is ignored.
        Examine the values passed in and return a WSGI application
        (a callable which accepts environ and start_response as its
        two positional arguments, ala PEP 333) which causes a
        challenge to be performed.  Return None to forego performing a
        challenge.
        """
repoze/pam/middleware.py
@@ -1,89 +1,240 @@
import logging
from StringIO import StringIO
import sys
from paste.httpheaders import REMOTE_USER
from repoze.pam.interfaces import IAuthenticatorPlugin
from repoze.pam.interfaces import IExtractorPlugin
from repoze.pam.interfaces import IPostExtractorPlugin
from repoze.pam.interfaces import IChallengerPlugin
class StartResponseWrapper(object):
    """ Defer a downstream application's start_response call.

    'wrap_start_response' is handed to the downstream application in
    place of the server's real start_response.  It only records the
    status and headers (and buffers anything passed to the returned
    write callable), so the caller can inspect them before deciding
    whether to send the response via 'finish_response' or discard it.

    o 'start_response' is the real start_response callable.

    o 'extra_headers' is a sequence of (header-name, header-value)
      tuples appended to the application's own headers when the
      response is finished.
    """
    def __init__(self, start_response, extra_headers):
        self.start_response = start_response
        self.extra_headers = extra_headers
        self.headers = []
        # Defined up front so that reading 'status' before the
        # application has called start_response yields None instead of
        # raising AttributeError.
        self.status = None
        self.buffer = StringIO()

    def wrap_start_response(self, status, headers, exc_info=None):
        """ args -> write callable

        Record 'status' and 'headers' and return a callable which
        buffers whatever is written to it.  'exc_info' is accepted for
        signature compatibility with start_response but is not used.
        """
        self.headers = headers
        self.status = status
        return self.buffer.write

    def finish_response(self):
        """ Call the real start_response with the recorded status and
        the recorded headers plus the extra headers, then pass the
        buffered output to the write callable it returns (if any).
        """
        # Either sequence may be a list or a tuple; normalize both so
        # that the concatenation cannot fail on mixed types.
        headers = list(self.headers) + list(self.extra_headers)
        write = self.start_response(self.status, headers)
        if write:
            # getvalue() returns the whole buffer regardless of the
            # current position, so no seek is needed.
            write(self.buffer.getvalue())
            if hasattr(write, 'close'):
                write.close()
_STARTED = '-- repoze.pam request started --'
_ENDED = '-- repoze.pam request ended --'
class PluggableAuthenticationMiddleware(object):
    def __init__(self, app, registry, request_classifier, response_classifier,
                 add_credentials=True):
    def __init__(self, app,
                 registry,
                 request_classifier,
                 response_classifier,
                 add_credentials=False,
                 log_stream=None,
                 log_level=logging.INFO):
        self.registry = registry
        self.app = app
        self.request_classifier = request_classifier
        self.response_classififer = response_classifier
        self.response_classifier = response_classifier
        self.add_credentials = add_credentials
        self.logger = None
        if log_stream:
            handler = logging.StreamHandler(log_stream)
            fmt = '%(asctime)s %(message)s'
            formatter = logging.Formatter(fmt)
            handler.setFormatter(formatter)
            self.logger = logging.Logger('repoze.pam')
            self.logger.addHandler(handler)
            self.logger.setLevel(log_level)
    def __call__(self, environ, start_response):
        request_classification = self.on_ingress(environ)
        return self.app(environ, start_response)
        # XXX on_egress
        logger = self.logger
        logger and logger.info(_STARTED)
        classification, extra_headers = self.modify_environment(environ)
    def on_ingress(self, environ):
        wrapper = StartResponseWrapper(start_response, extra_headers)
        app_iter = self.app(environ, wrapper.wrap_start_response)
        challenge_app = self.challenge(
            environ,
            classification,
            wrapper.status,
            wrapper.headers
            )
        logger and logger.info('challenge app used: %s' % challenge_app)
        if challenge_app is not None:
            if hasattr(app_iter, 'close'):
                app_iter.close()
            logger and logger.info(_ENDED)
            return challenge_app(environ, start_response)
        else:
            wrapper.finish_response()
            logger and logger.info(_ENDED)
            return app_iter
    def modify_environment(self, environ):
        # happens on ingress
        classification = self.request_classifier(environ)
        credentials = self.extract(environ, classification)
        logger = self.logger
        logger and logger.info('request classification: %s' % classification)
        credentials, extractor = self.extract(environ, classification)
        headers = self.after_extract(environ, credentials, extractor,
                                     classification)
        userid = None
        if credentials:
            userid = self.authenticate(environ, credentials, classification)
            userid, authenticator = self.authenticate(environ,
                                                      credentials,
                                                      classification)
        else:
            logger and logger.info(
                'no authenticator plugin used (no credentials)')
        if self.add_credentials:
            environ['repoze.pam.credentials'] = credentials
        if userid:
        remote_user_not_set = not REMOTE_USER(environ)
        if remote_user_not_set and userid:
            # only set REMOTE_USER if it's not yet set
            logger and logger.info('REMOTE_USER set to %s' % userid)
            environ['REMOTE_USER'] = userid
        else:
            logger and logger.info('REMOTE_USER not set')
        return classification
        return classification, headers
    def extract(self, environ, classification):
        # happens on ingress
        candidates = self.registry.get(IExtractorPlugin, ())
        plugins = self._match_classification(candidates, classification,
                                             'request_classifications')
        logger = self.logger
        logger and self.logger.info(
            'extractor plugins consulted %s' % plugins)
    def on_egress(self, environ, request_classifier, headers, exception):
        self.challenge(environ, request_classifier, headers, exception)
    def extract(self, environ, classifier):
        extractor_candidates = self.registry.get(IExtractorPlugin)
        extractors = self._match_classifier(extractor_candidates, classifier)
        for extractor in extractors:
            creds = extractor.extract(environ)
        for plugin in plugins:
            creds = plugin.extract(environ)
            logger and logger.debug(
                'credentials returned from extractor %s: %s' %
                (plugin, creds)
                )
            if creds:
                # XXX PAS returns all credentials (it fully iterates over all
                # extraction plugins)
                return creds
        return {}
                logger and logger.info(
                    'using credentials returned from extractor %s' % plugin)
                return creds, plugin
        logger and logger.info('no extractor plugins found credentials')
        return {}, None
    def after_extract(self, environ, credentials, extractor, classification):
        candidates = self.registry.get(IPostExtractorPlugin, ())
        plugins = self._match_classification(candidates,
                                             classification,
                                             'request_classifications')
        logger = self.logger
        logger and logger.info(
            'post-extractor plugins consulted %s' % plugins)
        extra_headers = {}
        for plugin in plugins:
            headers = plugin.post_extract(environ, credentials, extractor)
            logger and logger.debug(
                'headers returned from post-extractor %s: %s' %
                (plugin, headers)
                )
            if headers:
                extra_headers[plugin] = headers
        logger and logger.info('extra headers gathered: %s' % extra_headers)
        return flatten(extra_headers.values())
    def authenticate(self, environ, credentials, classification):
        # on ingress
        userid = None
        auth_candidates = self.registry.get(IAuthenticatorPlugin)
        authenticators = self._match_classifier(auth_candidates, classification)
        for authenticator in authenticators:
            userid = authenticator.authenticate(environ, credentials)
        # happens on ingress
        candidates = self.registry.get(IAuthenticatorPlugin, ())
        plugins = self._match_classification(candidates,
                                             classification,
                                             'request_classifications')
        logger = self.logger
        logger and logger.info(
            'authenticator plugins consulted %s' % plugins)
        for plugin in plugins:
            userid = plugin.authenticate(environ, credentials)
            logger and logger.info(
                'userid returned from authenticator %s: %s' %
                (plugin, userid)
                )
            if userid:
                # XXX PAS calls all authenticators (it fully iterates over all
                # authenticator plugins)
                break
        return userid
                logger and logger.info(
                    'using userid returned from authenticator %s' % plugin)
                return userid, plugin
    def challenge(self, environ, request_classification, headers, exception):
        # on egress
        classification = self.response_classifier(environ,
                                                  request_classification,
                                                  headers,
                                                  exception)
        challenger_candidates = self.registry.get(IChallengerPlugin)
        challengers = self._match_classifier(challenger_candidates,
                                              classification)
        for challenger in challengers:
            new_headers, new_status = challengers.challenge(environ)
        logger and logger.info('no authenticator plugin authenticated a userid')
        return None, None
    def _match_classifier(self, plugins, classifier):
    def challenge(self, environ, request_classification, status, headers):
        # happens on egress
        classification = self.response_classifier(
            environ,
            request_classification,
            status,
            headers
            )
        logger = self.logger
        logger and logger.info('response classification: %s' % classification)
        candidates = self.registry.get(IChallengerPlugin, ())
        plugins = self._match_classification(candidates,
                                             classification,
                                             'response_classifications')
        logger and logger.info('challenger plugins consulted: %s' % plugins)
        for plugin in plugins:
            app = plugin.challenge(environ, status, headers)
            logger and logger.debug('app returned from challenger %s: %s' %
                                    (plugin, app)
                                    )
            if app is not None:
                # new WSGI application
                logger and logger.info(
                    'challenger plugin %s returned an app: %s' % (plugin, app))
                return app
        logger and logger.info('no challenge app returned')
        # signifies no challenge
        return None
    def _match_classification(self, plugins, classification, attr):
        result = []
        for plugin in plugins:
            plugin_classifiers = getattr(plugin, 'classifiers', None)
            if not plugin_classifiers: # good for any
            plugin_classifications = getattr(plugin, attr, None)
            if not plugin_classifications: # good for any
                result.append(plugin)
                continue
            if classifier in plugin_classifiers:
            if classification in plugin_classifications:
                result.append(plugin)
                    
        return result
def flatten(L):
    """ Collapse one level of nesting: return a new list holding each
    item of each sequence in 'L', in the order encountered. """
    return [item for seq in L for item in seq]
def make_middleware(app, global_conf, config_file=None):
    if config_file is None:
@@ -91,23 +242,49 @@
    return PluggableAuthenticationMiddleware(app)
def make_test_middleware(app, global_conf):
    # no config file required
    # be able to test without a config file
    from repoze.pam.plugins.basicauth import BasicAuthPlugin
    from repoze.pam.plugins.htpasswd import HTPasswdPlugin
    from repoze.pam.plugins.cookie import InsecureCookiePlugin
    from repoze.pam.plugins.form import FormPlugin
    basicauth = BasicAuthPlugin('repoze.pam')
    basicauth.classifiers = set() # good for any
    any = set() # means good for any classification
    basicauth.request_classifications = any
    basicauth.response_classifications = any
    from StringIO import StringIO
    from repoze.pam.plugins.htpasswd import crypt_check
    io = StringIO('chrism:aajfMKNH1hTm2\n')
    io = StringIO()
    salt = 'aa'
    import crypt
    for name, password in [ ('admin', 'admin') ]:
        io.write('name:%s\n' % crypt.crypt(password, salt))
    htpasswd = HTPasswdPlugin(io, crypt_check)
    htpasswd.classifiers = set() # good for any
    registry = make_registry((basicauth,), (htpasswd,), (basicauth,))
    class DummyClassifier:
        def __call__(self, *arg, **kw):
            return None
    classifier = DummyClassifier()
    middleware = PluggableAuthenticationMiddleware(app, registry,
                                                   classifier, classifier)
    htpasswd.request_classifications = any
    htpasswd.response_classifications = any
    cookie = InsecureCookiePlugin('oatmeal')
    cookie.request_classifications = any
    cookie.response_classifications = any
    form = FormPlugin('__do_login')
    # only do form extract/challenge for browser requests
    form.request_classifications = set(('browser',))
    form.response_classifications = set(('browser',))
    registry = make_registry(
        extractors = (cookie, basicauth, form),
        post_extractors = (cookie, basicauth),
        authenticators = (htpasswd,),
        challengers = (form, basicauth),
        )
    from repoze.pam.classifiers import DefaultRequestClassifier
    from repoze.pam.classifiers import DefaultResponseClassifier
    request_classifier = DefaultRequestClassifier()
    response_classifier = DefaultResponseClassifier()
    middleware = PluggableAuthenticationMiddleware(app,
                                                   registry,
                                                   request_classifier,
                                                   response_classifier,
                                                   log_stream=sys.stdout,
                                                   log_level = logging.DEBUG
                                                   )
    return middleware
def verify(plugins, iface):
@@ -115,10 +292,12 @@
    for plugin in plugins:
        verifyObject(iface, plugin, tentative=True)
    
def make_registry(extractors, authenticators, challengers):
def make_registry(extractors, post_extractors, authenticators, challengers):
    registry = {}
    verify(extractors, IExtractorPlugin)
    registry[IExtractorPlugin] = extractors
    verify(post_extractors, IExtractorPlugin)
    registry[IPostExtractorPlugin] = post_extractors
    verify(authenticators, IAuthenticatorPlugin)
    registry[IAuthenticatorPlugin] = authenticators
    verify(challengers, IChallengerPlugin)
repoze/pam/plugins/basicauth.py
@@ -8,43 +8,51 @@
from repoze.pam.interfaces import IChallengerPlugin
from repoze.pam.interfaces import IExtractorPlugin
from repoze.pam.interfaces import IPostExtractorPlugin
class BasicAuthPlugin(object):
    implements(IChallengerPlugin, IExtractorPlugin)
    implements(IChallengerPlugin, IExtractorPlugin, IPostExtractorPlugin)
    
    def __init__(self, realm):
        self.realm = realm
    # IChallengerPlugin
    def challenge(self, environ, request_classifier, headers, exception):
        head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
        raise HTTPUnauthorized(headers=head)
    def challenge(self, environ, status, headers):
        if status == '401 Unauthorized':
            headers = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm)
            return HTTPUnauthorized(headers=headers)
    # IExtractorPlugin
    def extract(self, environ):
        authorization = AUTHORIZATION(environ)
        try:
            authmeth, auth = authorization.split(' ', 1)
        except ValueError:
            # not enough values to unpack
        except ValueError: # not enough values to unpack
            return {}
        if authmeth.lower() == 'basic':
            try:
                auth = auth.strip().decode('base64')
            except binascii.Error:
                # can't decode
            except binascii.Error: # can't decode
                return {}
            try:
                login, password = auth.split(':', 1)
            except ValueError:
                # not enough values to unpack
            except ValueError: # not enough values to unpack
                return {}
            return {'login':login, 'password':password}
            auth = {'login':login, 'password':password}
            return auth
        return {}
    # IPostExtractorPlugin
    def post_extract(self, environ, credentials, extractor):
        if credentials:
            if not AUTHORIZATION(environ):
                auth = '%(login)s:%(password)s' % credentials
                auth = auth.encode('base64').rstrip()
                header = 'Basic %s' % auth
                environ['HTTP_AUTHORIZATION'] = header
def make_plugin(pam_conf, realm='basic'):
    """ Plugin factory: return a BasicAuthPlugin for 'realm'.

    'pam_conf' is accepted but is not consulted here.
    """
    return BasicAuthPlugin(realm)
repoze/pam/plugins/cookie.py
New file
@@ -0,0 +1,57 @@
import binascii
from paste.request import get_cookies
from zope.interface import implements
from repoze.pam.interfaces import IExtractorPlugin
from repoze.pam.interfaces import IPostExtractorPlugin
class InsecureCookiePlugin(object):
    """ Extractor and post-extractor which keeps credentials in a cookie.

    The cookie value is the base64 encoding of 'login:password'.  That
    is an encoding, not encryption: anyone able to read the cookie can
    recover the password, hence "insecure".

    o 'cookie_name' is the name of the cookie read and written.
    """
    implements(IExtractorPlugin, IPostExtractorPlugin)

    def __init__(self, cookie_name):
        self.cookie_name = cookie_name

    # IExtractorPlugin
    def extract(self, environ):
        """ environ -> {'login':login, 'password':password} | {}

        Decode the credentials from the request cookie named
        'cookie_name'.  Return {} if the cookie is absent, is not
        valid base64, or does not decode to 'login:password'.
        """
        cookies = get_cookies(environ)
        cookie = cookies.get(self.cookie_name)
        if cookie is None:
            return {}
        try:
            auth = cookie.value.decode('base64')
        except binascii.Error: # can't decode
            return {}
        try:
            login, password = auth.split(':', 1)
        except ValueError: # not enough values to unpack
            return {}
        return {'login':login, 'password':password}

    # IPostExtractorPlugin
    def post_extract(self, environ, credentials, extractor):
        """ args -> [ ('Set-Cookie', value) ] | None

        If 'credentials' is non-empty and the request does not already
        carry a matching cookie, put the cookie into
        environ['HTTP_COOKIE'] for downstream applications and return
        a Set-Cookie header for the response.  Otherwise return None.
        'extractor' is not used.
        """
        if not credentials:
            return None
        cookie_value = '%(login)s:%(password)s' % credentials
        # The base64 codec wraps its output with a newline every 76
        # characters; remove all of them (not just the trailing one)
        # so that a long value cannot break the header.
        cookie_value = cookie_value.encode('base64').replace('\n', '')
        cookies = get_cookies(environ)
        existing = cookies.get(self.cookie_name)
        value = getattr(existing, 'value', None)
        if value == cookie_value:
            # the client already sends the right cookie
            return None
        # go ahead and set it in the environment for downstream
        # apps to consume (XXX?)
        cookies[self.cookie_name] = cookie_value
        # A request Cookie header separates its pairs with "; ".  Each
        # morsel is rendered with a leading space (empty header name),
        # so joining on ';' yields 'a=1; b=2'.
        output = cookies.output(header='', sep=';').lstrip()
        environ['HTTP_COOKIE'] = output
        # return a Set-Cookie header
        set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value)
        return [('Set-Cookie', set_cookie)]
def make_plugin(pam_conf, cookie_name='repoze.pam.plugins.cookie'):
    """ Plugin factory: return an InsecureCookiePlugin which uses the
    cookie named 'cookie_name'.

    'pam_conf' is accepted but is not consulted here.
    """
    return InsecureCookiePlugin(cookie_name)
repoze/pam/plugins/form.py
New file
@@ -0,0 +1,90 @@
from paste.httpheaders import CONTENT_LENGTH
from paste.httpheaders import CONTENT_TYPE
from paste.request import parse_dict_querystring
from paste.request import parse_formvars
from zope.interface import implements
from repoze.pam.interfaces import IChallengerPlugin
from repoze.pam.interfaces import IExtractorPlugin
# HTML page returned by auth_form as the login challenge.  The form
# POSTs 'login' and 'password' fields to '?__do_login=true'.  The single
# '%s' placeholder inside <pre> is filled in by auth_form with a dump
# of the WSGI environ.
_DEFAULT_FORM = """
<html>
<head>
  <title>Login Form</title>
</head>
<body>
  <div>
     <b>Log In</b>
  </div>
  <br/>
  <form method="POST" action="?__do_login=true">
    <table border="0">
    <tr>
      <td>User Name</td>
      <td><input type="text" name="login"></input></td>
    </tr>
    <tr>
      <td>Password</td>
      <td><input type="password" name="password"></input></td>
    </tr>
    <tr>
      <td></td>
      <td><input type="submit" name="submit" value="Log In"/></td>
    </tr>
    </table>
  </form>
  <pre>
  %s
  </pre>
</body>
</html>
"""
def auth_form(environ, start_response):
    """ WSGI application which renders the default login form.

    Responds '200 OK' with the _DEFAULT_FORM page as text/html and
    returns the page as a one-element list.  A pretty-printed dump of
    'environ' is embedded in the page.
    """
    import pprint
    try:
        from html import escape
    except ImportError: # Python 2
        from cgi import escape
    # environ carries client-controlled values (query string, request
    # headers); escape the dump so that it cannot inject markup or
    # script into the page.
    # NOTE(review): the dump also shows server-side environ values to
    # whoever is challenged -- presumably a debugging aid; confirm
    # before using this form outside of testing.
    dump = escape(pprint.pformat(environ))
    form = _DEFAULT_FORM % dump
    content_length = CONTENT_LENGTH.tuples(str(len(form)))
    content_type = CONTENT_TYPE.tuples('text/html')
    headers = content_length + content_type
    start_response('200 OK', headers)
    return [form]
class FormPlugin(object):
    """ Extractor and challenger based on an HTML login form.

    o 'login_form_qs' is the name of the query string variable whose
      presence (with a non-empty value) marks a request as a login
      form submission.
    """
    implements(IChallengerPlugin, IExtractorPlugin)

    def __init__(self, login_form_qs):
        self.login_form_qs = login_form_qs

    # IExtractorPlugin
    def extract(self, environ):
        """ environ -> {'login':login, 'password':password} | {}

        Credentials are only looked for when the special query string
        variable is present; they are taken from the form variables
        overlaid with the query string variables.  Return {} if the
        request is not a login submission or a field is missing.
        """
        query = parse_dict_querystring(environ)
        if not query.get(self.login_form_qs):
            # not a login form submission
            return {}
        fields = parse_formvars(environ)
        from StringIO import StringIO
        # XXX parsing consumed the request body, so wsgi.input is
        # swapped for an empty stream; this smells funny
        environ['wsgi.input'] = StringIO()
        fields.update(query)
        try:
            return {'login':fields['login'], 'password':fields['password']}
        except KeyError:
            return {}

    # IChallengerPlugin
    def challenge(self, environ, status, headers):
        """ args -> WSGI application or None

        Return the login form application when the downstream status
        is exactly '401 Unauthorized'; otherwise return None.
        """
        if status != '401 Unauthorized':
            return None
        return auth_form
def make_plugin(pam_conf, login_form_qs='__do_login'):
    """ Plugin factory: return a FormPlugin triggered by the query
    string variable 'login_form_qs'.

    'pam_conf' is accepted but is not consulted here.
    """
    return FormPlugin(login_form_qs)
repoze/pam/tests.py
@@ -44,31 +44,37 @@
    def test_extract_success(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        creds = mw.extract(environ, None)
        creds, plugin = mw.extract(environ, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.failIf(plugin is None)
    def test_extract_fail(self):
        environ = self._makeEnviron()
        from repoze.pam.interfaces import IExtractorPlugin
        extractor = DummyNoResultsExtractor()
        registry = {
            IExtractorPlugin:[DummyNoResultsExtractor()]
            IExtractorPlugin:[extractor]
            }
        mw = self._makeOne(registry=registry)
        creds = mw.extract(environ, None)
        creds, plugin = mw.extract(environ, None)
        self.assertEqual(creds, {})
        self.assertEqual(plugin, None)
    def test_extract_success_skip_noresults(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IExtractorPlugin
        extractor1 = DummyNoResultsExtractor()
        extractor2 = DummyExtractor()
        registry = {
            IExtractorPlugin:[DummyNoResultsExtractor(), DummyExtractor()]
            IExtractorPlugin:[extractor1, extractor2]
            }
        mw = self._makeOne(registry=registry)
        creds = mw.extract(environ, None)
        creds, plugin = mw.extract(environ, None)
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(plugin, extractor2)
    def test_extract_success_firstwins(self):
        environ = self._makeEnviron()
@@ -80,46 +86,49 @@
            IExtractorPlugin:[extractor1, extractor2]
            }
        mw = self._makeOne(registry=registry)
        creds = mw.extract(environ, None)
        creds, plugin = mw.extract(environ, None)
        self.assertEqual(creds['login'], 'fred')
        self.assertEqual(creds['password'], 'fred')
        self.assertEqual(plugin, extractor1)
    def test_extract_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IExtractorPlugin
        extractor1 = DummyExtractor({'login':'fred','password':'fred'})
        extractor1.classifiers = set(['nomatch'])
        extractor1.request_classifications = set(['nomatch'])
        extractor2 = DummyExtractor({'login':'bob','password':'bob'})
        registry = {
            IExtractorPlugin:[extractor1, extractor2]
            }
        mw = self._makeOne(registry=registry)
        creds = mw.extract(environ, None)
        creds, plugin = mw.extract(environ, None)
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, extractor2)
    def test_extract_find_explicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IExtractorPlugin
        extractor1 = DummyExtractor({'login':'fred','password':'fred'})
        extractor1.classifiers = set(['nomatch'])
        extractor1.request_classifications = set(['nomatch'])
        extractor2 = DummyExtractor({'login':'bob','password':'bob'})
        extractor2.classifiers = set(['match'])
        extractor2.request_classifications = set(['match'])
        registry = {
            IExtractorPlugin:[extractor1, extractor2]
            }
        mw = self._makeOne(registry=registry)
        creds = mw.extract(environ, 'match')
        creds, plugin = mw.extract(environ, 'match')
        self.assertEqual(creds['login'], 'bob')
        self.assertEqual(creds['password'], 'bob')
        self.assertEqual(plugin, extractor2)
    def test_authenticate_success(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        creds = {'login':'chris', 'password':'password'}
        userid = mw.authenticate(environ, creds, None)
        userid, plugin = mw.authenticate(environ, creds, None)
        self.assertEqual(userid, 'chris')
    def test_authenticate_fail(self):
@@ -131,95 +140,132 @@
            IAuthenticatorPlugin:[DummyFailAuthenticator()]
            }
        mw = self._makeOne(registry=registry)
        userid = mw.authenticate(environ, creds, None)
        userid, plugin = mw.authenticate(environ, creds, None)
        self.assertEqual(userid, None)
        self.assertEqual(plugin, None)
    def test_authenticate_success_skip_fail(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IAuthenticatorPlugin
        plugin1 = DummyFailAuthenticator()
        plugin2 = DummyAuthenticator()
        registry = {
            IAuthenticatorPlugin:[DummyFailAuthenticator(),DummyAuthenticator()]
            IAuthenticatorPlugin:[plugin1, plugin2]
            }
        mw = self._makeOne(registry=registry)
        creds = {'login':'chris', 'password':'password'}
        userid = mw.authenticate(environ, creds, None)
        userid, plugin = mw.authenticate(environ, creds, None)
        self.assertEqual(userid, 'chris')
        self.assertEqual(plugin, plugin2)
    def test_authenticate_success_firstwins(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IAuthenticatorPlugin
        plugin1 = DummyAuthenticator('chris_id1')
        plugin2 = DummyAuthenticator('chris_id2')
        registry = {
            IAuthenticatorPlugin:[DummyAuthenticator('chris_id1'),
                                  DummyAuthenticator('chris_id2')]
            IAuthenticatorPlugin:[plugin1, plugin2]
            }
        mw = self._makeOne(registry=registry)
        creds = {'login':'chris', 'password':'password'}
        userid = mw.authenticate(environ, creds, None)
        userid, plugin = mw.authenticate(environ, creds, None)
        self.assertEqual(userid, 'chris_id1')
        self.assertEqual(plugin, plugin1)
    def test_authenticate_find_implicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IAuthenticatorPlugin
        plugin1 = DummyAuthenticator('chris_id1')
        plugin1.classifiers = set(['nomatch'])
        plugin1.request_classifications = set(['nomatch'])
        plugin2 = DummyAuthenticator('chris_id2')
        registry = {
            IAuthenticatorPlugin:[plugin1, plugin2]
            }
        mw = self._makeOne(registry=registry)
        creds = {'login':'chris', 'password':'password'}
        userid = mw.authenticate(environ, creds, None)
        userid, plugin = mw.authenticate(environ, creds, None)
        self.assertEqual(userid, 'chris_id2')
        self.assertEqual(plugin, plugin2)
    def test_authenticate_find_explicit_classifier(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        from repoze.pam.interfaces import IAuthenticatorPlugin
        plugin1 = DummyAuthenticator('chris_id1')
        plugin1.classifiers = set(['nomatch'])
        plugin1.request_classifications = set(['nomatch'])
        plugin2 = DummyAuthenticator('chris_id2')
        plugin2.classifiers = set(['match'])
        plugin2.request_classifications = set(['match'])
        registry = {
            IAuthenticatorPlugin:[plugin1, plugin2]
            }
        mw = self._makeOne(registry=registry)
        creds = {'login':'chris', 'password':'password'}
        userid = mw.authenticate(environ, creds, 'match')
        userid, plugin = mw.authenticate(environ, creds, 'match')
        self.assertEqual(userid, 'chris_id2')
        self.assertEqual(plugin, plugin2)
    def test_on_ingress_success_addcredentials(self):
    def test_modify_environment_success_addcredentials(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        classification = mw.on_ingress(environ)
        classification, headers = mw.modify_environment(environ)
        self.assertEqual(classification, 'browser')
        self.assertEqual(environ['REMOTE_USER'], 'chris')
        self.assertEqual(environ['repoze.pam.credentials'],
                         {'login':'chris','password':'password'})
        self.assertEqual(headers, [])
        
    def test_on_ingress_success_noaddcredentials(self):
    def test_modify_environment_noaddcredentials(self):
        environ = self._makeEnviron()
        mw = self._makeOne()
        mw.add_credentials = False
        classification = mw.on_ingress(environ)
        classification, headers = mw.modify_environment(environ)
        self.assertEqual(classification, 'browser')
        self.assertEqual(environ['REMOTE_USER'], 'chris')
        self.failIf(environ.has_key('repoze.pam.credentials'))
        self.assertEqual(headers, [])
    def test_on_ingress_nocredentials(self):
    def test_modify_environment_nocredentials(self):
        environ = self._makeEnviron()
        from repoze.pam.interfaces import IExtractorPlugin
        registry = {
            IExtractorPlugin:[DummyNoResultsExtractor()],
            }
        mw = self._makeOne(registry=registry)
        classification = mw.on_ingress(environ)
        classification, headers = mw.modify_environment(environ)
        self.assertEqual(classification, 'browser')
        self.assertEqual(environ.get('REMOTE_USER'), None)
        self.assertEqual(environ['repoze.pam.credentials'], {})
        self.assertEqual(headers, [])
    def test_modify_environment_remoteuser_already_set(self):
        environ = self._makeEnviron({'REMOTE_USER':'admin'})
        mw = self._makeOne()
        classification, headers = mw.modify_environment(environ)
        self.assertEqual(classification, 'browser')
        self.assertEqual(environ.get('REMOTE_USER'), 'admin')
        self.assertEqual(environ['repoze.pam.credentials'],
                         {'login':'chris', 'password':'password'})
        self.assertEqual(headers, [])
    def test_modify_environment_with_postextractor(self):
        environ = self._makeEnviron({'REMOTE_USER':'admin'})
        from repoze.pam.interfaces import IExtractorPlugin
        from repoze.pam.interfaces import IPostExtractorPlugin
        registry = {
            IExtractorPlugin:[DummyExtractor()],
            IPostExtractorPlugin:[DummyPostExtractor()],
            }
        mw = self._makeOne(registry=registry)
        classification, headers = mw.modify_environment(environ)
        self.assertEqual(classification, 'browser')
        self.assertEqual(environ.get('REMOTE_USER'), 'admin')
        self.assertEqual(environ['repoze.pam.credentials'],
                         {'login':'chris', 'password':'password'})
        self.assertEqual(headers, [('foo', 'bar')])
class TestBasicAuthPlugin(Base):
    def _getTargetClass(self):
        from repoze.pam.plugins.basicauth import BasicAuthPlugin
@@ -237,56 +283,87 @@
        verifyClass(IChallengerPlugin, klass)
        verifyClass(IExtractorPlugin, klass)
    def test_challenge(self):
    def test_challenge_non401(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        from paste.httpexceptions import HTTPUnauthorized
        self.assertRaises(HTTPUnauthorized, plugin.challenge, environ,
                          None, None, None)
        result = plugin.challenge(environ, '200 OK', {})
        self.assertEqual(result, None)
    def test_challenge_401(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        result = plugin.challenge(environ, '401 Unauthorized', {})
        self.assertNotEqual(result, None)
        app_iter = result(environ, lambda *arg: None)
        items = []
        for item in app_iter:
            items.append(item)
        response = ''.join(items)
        self.failUnless(response.startswith('401 Unauthorized'))
        
    def test_extract_noauthinfo(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        result = plugin.extract(environ)
        self.assertEqual(result, {})
        creds = plugin.extract(environ)
        self.assertEqual(creds, {})
    def test_extract_nonbasic(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
        result = plugin.extract(environ)
        self.assertEqual(result, {})
    def test_extract_nonbasic(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
        result = plugin.extract(environ)
        self.assertEqual(result, {})
        creds = plugin.extract(environ)
        self.assertEqual(creds, {})
    def test_extract_basic_badencoding(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
        result = plugin.extract(environ)
        self.assertEqual(result, {})
        creds = plugin.extract(environ)
        self.assertEqual(creds, {})
    def test_extract_basic_badrepr(self):
        plugin = self._makeOne('realm')
        value = 'foo'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        result = plugin.extract(environ)
        self.assertEqual(result, {})
        creds = plugin.extract(environ)
        self.assertEqual(creds, {})
    def test_extract_basic_ok(self):
        plugin = self._makeOne('realm')
        value = 'foo:bar'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        result = plugin.extract(environ)
        self.assertEqual(result, {'login':'foo', 'password':'bar'})
        creds = plugin.extract(environ)
        self.assertEqual(creds, {'login':'foo', 'password':'bar'})
    def test_post_extract_nocreds(self):
        plugin = self._makeOne('realm')
        creds = {}
        environ = self._makeEnviron()
        result = plugin.post_extract(environ, creds, plugin)
        self.assertEqual(result, None)
        self.assertEqual(environ.get('HTTP_AUTHORIZATION'), None)
    def test_post_extract_creds_withauthorization(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic foo'})
        result = plugin.post_extract(environ, creds, plugin)
        self.assertEqual(result, None)
        self.assertEqual(environ['HTTP_AUTHORIZATION'], 'Basic foo')
    def test_post_extract_creds_mutates(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        result = plugin.post_extract(environ, creds, plugin)
        self.assertEqual(result, None)
        auth = 'foo:password'.encode('base64').rstrip()
        auth = 'Basic ' + auth
        self.assertEqual(environ['HTTP_AUTHORIZATION'], auth)
    def test_factory(self):
        from repoze.pam.plugins.basicauth import make_plugin
        plugin = make_plugin({}, 'realm')
        self.assertEqual(plugin.realm, 'realm')
class TestHTPasswdPlugin(Base):
    def _getTargetClass(self):
        from repoze.pam.plugins.htpasswd import HTPasswdPlugin
@@ -376,7 +453,84 @@
                             'repoze.pam.plugins.htpasswd:crypt_check')
        self.assertEqual(plugin.filename, 'foo')
        self.assertEqual(plugin.check, crypt_check)
class TestInsecureCookiePlugin(Base):
    # Tests for repoze.pam.plugins.cookie.InsecureCookiePlugin: credential
    # extraction from the HTTP_COOKIE header and the post_extract hook.

    def _getTargetClass(self):
        from repoze.pam.plugins.cookie import InsecureCookiePlugin
        return InsecureCookiePlugin

    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin

    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.pam.interfaces import IExtractorPlugin
        from repoze.pam.interfaces import IPostExtractorPlugin
        klass = self._getTargetClass()
        verifyClass(IExtractorPlugin, klass)
        verifyClass(IPostExtractorPlugin, klass)

    def test_extract_nocookies(self):
        # No HTTP_COOKIE override in the environ: expect empty credentials.
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron()
        result = plugin.extract(environ)
        self.assertEqual(result, {})

    def test_extract_badcookies(self):
        # The cookie value 'a' is not an encoded login:password pair:
        # expect empty credentials.
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
        result = plugin.extract(environ)
        self.assertEqual(result, {})

    def test_extract_success(self):
        # The cookie carries base64 'foo:password': expect it to be split
        # into login and password.
        plugin = self._makeOne('oatmeal')
        auth = 'foo:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.extract(environ)
        self.assertEqual(result, {'login':'foo', 'password':'password'})

    def test_post_extract_nocreds(self):
        # Empty credentials: expect no headers and no cookie written.
        plugin = self._makeOne('oatmeal')
        creds = {}
        environ = self._makeEnviron()
        result = plugin.post_extract(environ, creds, plugin)
        self.assertEqual(result, None)
        self.assertEqual(environ.get('HTTP_COOKIE'), None)

    def test_post_extract_creds_same(self):
        # The cookie already encodes the same credentials: expect no
        # headers and an unchanged HTTP_COOKIE.
        plugin = self._makeOne('oatmeal')
        creds = {'login':'foo', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        auth = 'oatmeal=%s;' % auth
        environ = self._makeEnviron({'HTTP_COOKIE':auth})
        result = plugin.post_extract(environ, creds, plugin)
        self.assertEqual(result, None)
        self.assertEqual(environ.get('HTTP_COOKIE'), auth)

    def test_post_extract_creds_different(self):
        # The cookie encodes other credentials: expect a Set-Cookie header
        # for the new ones and HTTP_COOKIE rewritten to match.
        plugin = self._makeOne('oatmeal')
        creds = {'login':'bar', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        creds_auth = 'bar:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.post_extract(environ, creds, plugin)
        expected = 'oatmeal=%s; Path=/;' % creds_auth
        self.assertEqual(result, [('Set-Cookie', expected)])
        self.assertEqual(environ['HTTP_COOKIE'], 'oatmeal=%s;' % creds_auth)

    def test_factory(self):
        from repoze.pam.plugins.cookie import make_plugin
        plugin = make_plugin(None, 'foo')
        self.assertEqual(plugin.cookie_name, 'foo')
class TestDefaultRequestClassifier(Base):
    def _getTargetClass(self):
@@ -477,5 +631,10 @@
        return None
class DummyChallenger:
    # Test double for a challenger plugin: it only records in the environ
    # that it was invoked.
    def challenge(self, environ, status, headers):
        # 'status' and 'headers' are accepted but not used.
        environ['challenged'] = True
class DummyPostExtractor:
    # Test double for a post-extractor plugin: it ignores its arguments and
    # always returns the same one-item list.
    def post_extract(self, environ, credentials, extractor):
        fixed_headers = [('foo', 'bar')]
        return fixed_headers