repoze/pam/classifiers.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/interfaces.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/middleware.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/plugins/basicauth.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/plugins/cookie.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/plugins/form.py | ●●●●● patch | view | raw | blame | history | |
repoze/pam/tests.py | ●●●●● patch | view | raw | blame | history |
repoze/pam/classifiers.py
@@ -53,7 +53,7 @@ class DefaultResponseClassifier(object): implements(IResponseClassifier) def __call__(self, environ, request_classification, headers, exception): def __call__(self, environ, request_classification, status, headers): """ By default, return the request classification """ return request_classification repoze/pam/interfaces.py
@@ -2,27 +2,36 @@ class IRequestClassifier(Interface): """ On ingress: classify a request. This interface is responsible for returning a string representing a classification name based on introspection of the WSGI environment (environ). """ def __call__(environ): """ Return a string representing the classification of this request. """ """ environ -> request classifier string This interface is responsible for returning a string value representing a request classification. o 'environ' is the WSGI environment. """ class IResponseClassifier(Interface): """ On egress: classify a response. This interface is responsible for returning a string representing a classification name based on introspection of the ingress classification, the WSGI environment (environ), the headers returned in the response (headers), or the exception raised by a downstream application. """ def __call__(environ, request_classification, headers, exception): """ Return a string representing the classification of this request. """ def __call__(environ, request_classification, status, headers): """ args -> response classifier string This interface is responsible for returning a string representing a response classification. o 'environ' is the WSGI environment. o 'request_classification' is the classification returned during ingress by the request classifier. o 'status' is the status written into start_response by the downstream application. o 'headers' is the headers tuple written into start_response by the downstream application. """ class IExtractorPlugin(Interface): @@ -48,7 +57,34 @@ o Only extraction plugins which match one of the the current request's classifications will be asked to perform extraction. """ class IPostExtractorPlugin(Interface): """ On ingress: allow the plugin to have a chance to influence the environment once credentials are established and return extra headers that will be set in the eventual response. Each post-extractor matching the request classification is called unconditionally after extraction. """ def post_extract(environ, credentials, extractor): """ args -> [ (header-name, header-value), ..] | None o 'environ' is the WSGI environment. o credentials are the credentials that were extracted by repoze.pam during the extraction step. o 'extractor' is the plugin instance that provided the credentials. If no plugin instance provided credentials to repoze.pam, this will be None. The return value should be a list of tuples, where each tuple is in the form (header-name, header-value), e.g. [ ('Set-Cookie', 'cookie_name=foo; Path=/') ] or None if no headers should be set. """ class IAuthenticatorPlugin(Interface): """ On ingress: Map credentials to a user ID. @@ -70,21 +106,29 @@ class IChallengerPlugin(Interface): """ On egress: Initiate a challenge to the user to provide credentials. """ On egress: Conditionally initiate a challenge to the user to provide credentials. Only challenge plugins which match one of the the current response's classifications will be asked to perform a challenge. """ def challenge(environ, status, headers): """ args -> WSGI application or None o 'environ' is the WSGI environment. o Only challenge plugins which match one of the the current request's classifications will be asked to perform a challenge. """ o 'status' is the status written into start_response by the downstream application. def challenge(environ, request_classifier, headers, exception): o 'headers' is the headers tuple written into start_response by the downstream application. """ Examine the values passed in and perform an arbitrary action (usually mutating environ or raising an exception) to cause a challenge to be raised to the user. The return value of this method is ignored. Examine the values passed in and return a WSGI application (a callable which accepts environ and start_response as its two positional arguments, ala PEP 333) which causes a challenge to be performed. Return None to forego performing a challenge. """ repoze/pam/middleware.py
@@ -1,89 +1,240 @@ import logging from StringIO import StringIO import sys from paste.httpheaders import REMOTE_USER from repoze.pam.interfaces import IAuthenticatorPlugin from repoze.pam.interfaces import IExtractorPlugin from repoze.pam.interfaces import IPostExtractorPlugin from repoze.pam.interfaces import IChallengerPlugin class StartResponseWrapper(object): def __init__(self, start_response, extra_headers): self.start_response = start_response self.extra_headers = extra_headers self.headers = [] self.buffer = StringIO() def wrap_start_response(self, status, headers, exc_info=None): self.headers = headers self.status = status return self.buffer.write def finish_response(self): headers = self.headers + self.extra_headers write = self.start_response(self.status, headers) if write: self.buffer.seek(0) write(self.buffer.getvalue()) if hasattr(write, 'close'): write.close() _STARTED = '-- repoze.pam request started --' _ENDED = '-- repoze.pam request ended --' class PluggableAuthenticationMiddleware(object): def __init__(self, app, registry, request_classifier, response_classifier, add_credentials=True): def __init__(self, app, registry, request_classifier, response_classifier, add_credentials=False, log_stream=None, log_level=logging.INFO): self.registry = registry self.app = app self.request_classifier = request_classifier self.response_classififer = response_classifier self.response_classifier = response_classifier self.add_credentials = add_credentials self.logger = None if log_stream: handler = logging.StreamHandler(log_stream) fmt = '%(asctime)s %(message)s' formatter = logging.Formatter(fmt) handler.setFormatter(formatter) self.logger = logging.Logger('repoze.pam') self.logger.addHandler(handler) self.logger.setLevel(log_level) def __call__(self, environ, start_response): request_classification = self.on_ingress(environ) return self.app(environ, start_response) # XXX on_egress logger = self.logger logger and logger.info(_STARTED) classification, extra_headers = self.modify_environment(environ) def on_ingress(self, environ): wrapper = StartResponseWrapper(start_response, extra_headers) app_iter = self.app(environ, wrapper.wrap_start_response) challenge_app = self.challenge( environ, classification, wrapper.status, wrapper.headers ) logger and logger.info('challenge app used: %s' % challenge_app) if challenge_app is not None: if hasattr(app_iter, 'close'): app_iter.close() logger and logger.info(_ENDED) return challenge_app(environ, start_response) else: wrapper.finish_response() logger and logger.info(_ENDED) return app_iter def modify_environment(self, environ): # happens on ingress classification = self.request_classifier(environ) credentials = self.extract(environ, classification) logger = self.logger logger and logger.info('request classification: %s' % classification) credentials, extractor = self.extract(environ, classification) headers = self.after_extract(environ, credentials, extractor, classification) userid = None if credentials: userid = self.authenticate(environ, credentials, classification) userid, authenticator = self.authenticate(environ, credentials, classification) else: logger and logger.info( 'no authenticator plugin used (no credentials)') if self.add_credentials: environ['repoze.pam.credentials'] = credentials if userid: remote_user_not_set = not REMOTE_USER(environ) if remote_user_not_set and userid: # only set REMOTE_USER if it's not yet set logger and logger.info('REMOTE_USER set to %s' % userid) environ['REMOTE_USER'] = userid else: logger and logger.info('REMOTE_USER not set') return classification return classification, headers def extract(self, environ, classification): # happens on ingress candidates = self.registry.get(IExtractorPlugin, ()) plugins = self._match_classification(candidates, classification, 'request_classifications') logger = self.logger logger and self.logger.info( 'extractor plugins consulted %s' % plugins) def on_egress(self, environ, request_classifier, headers, exception): self.challenge(environ, request_classifier, headers, exception) def extract(self, environ, classifier): extractor_candidates = self.registry.get(IExtractorPlugin) extractors = self._match_classifier(extractor_candidates, classifier) for extractor in extractors: creds = extractor.extract(environ) for plugin in plugins: creds = plugin.extract(environ) logger and logger.debug( 'credentials returned from extractor %s: %s' % (plugin, creds) ) if creds: # XXX PAS returns all credentials (it fully iterates over all # extraction plugins) return creds return {} logger and logger.info( 'using credentials returned from extractor %s' % plugin) return creds, plugin logger and logger.info('no extractor plugins found credentials') return {}, None def after_extract(self, environ, credentials, extractor, classification): candidates = self.registry.get(IPostExtractorPlugin, ()) plugins = self._match_classification(candidates, classification, 'request_classifications') logger = self.logger logger and logger.info( 'post-extractor plugins consulted %s' % plugins) extra_headers = {} for plugin in plugins: headers = plugin.post_extract(environ, credentials, extractor) logger and logger.debug( 'headers returned from post-extractor %s: %s' % (plugin, headers) ) if headers: extra_headers[plugin] = headers logger and logger.info('extra headers gathered: %s' % extra_headers) return flatten(extra_headers.values()) def authenticate(self, environ, credentials, classification): # on ingress userid = None auth_candidates = self.registry.get(IAuthenticatorPlugin) authenticators = self._match_classifier(auth_candidates, classification) for authenticator in authenticators: userid = authenticator.authenticate(environ, credentials) # happens on ingress candidates = self.registry.get(IAuthenticatorPlugin, ()) plugins = self._match_classification(candidates, classification, 'request_classifications') logger = self.logger logger and logger.info( 'authenticator plugins consulted %s' % plugins) for plugin in plugins: userid = plugin.authenticate(environ, credentials) logger and logger.info( 'userid returned from authenticator %s: %s' % (plugin, userid) ) if userid: # XXX PAS calls all authenticators (it fully iterates over all # authenticator plugins) break return userid logger and logger.info( 'using userid returned from authenticator %s' % plugin) return userid, plugin def challenge(self, environ, request_classification, headers, exception): # on egress classification = self.response_classifier(environ, request_classification, headers, exception) challenger_candidates = self.registry.get(IChallengerPlugin) challengers = self._match_classifier(challenger_candidates, classification) for challenger in challengers: new_headers, new_status = challengers.challenge(environ) logger and logger.info('no authenticator plugin authenticated a userid') return None, None def _match_classifier(self, plugins, classifier): def challenge(self, environ, request_classification, status, headers): # happens on egress classification = self.response_classifier( environ, request_classification, status, headers ) logger = self.logger logger and logger.info('response classification: %s' % classification) candidates = self.registry.get(IChallengerPlugin, ()) plugins = self._match_classification(candidates, classification, 'response_classifications') logger and logger.info('challenger plugins consulted: %s' % plugins) for plugin in plugins: app = plugin.challenge(environ, status, headers) logger and logger.debug('app returned from challenger %s: %s' % (plugin, app) ) if app is not None: # new WSGI application logger and logger.info( 'challenger plugin %s returned an app: %s' % (plugin, app)) return app logger and logger.info('no challenge app returned') # signifies no challenge return None def _match_classification(self, plugins, classification, attr): result = [] for plugin in plugins: plugin_classifiers = getattr(plugin, 'classifiers', None) if not plugin_classifiers: # good for any plugin_classifications = getattr(plugin, attr, None) if not plugin_classifications: # good for any result.append(plugin) continue if classifier in plugin_classifiers: if classification in plugin_classifications: result.append(plugin) return result def flatten(L): result = [] for seq in L: for item in seq: result.append(item) return result def make_middleware(app, global_conf, config_file=None): if config_file is None: @@ -91,23 +242,49 @@ return PluggableAuthenticationMiddleware(app) def make_test_middleware(app, global_conf): # no config file required # be able to test without a config file from repoze.pam.plugins.basicauth import BasicAuthPlugin from repoze.pam.plugins.htpasswd import HTPasswdPlugin from repoze.pam.plugins.cookie import InsecureCookiePlugin from repoze.pam.plugins.form import FormPlugin basicauth = BasicAuthPlugin('repoze.pam') basicauth.classifiers = set() # good for any any = set() # means good for any classification basicauth.request_classifications = any basicauth.response_classifications = any from StringIO import StringIO from repoze.pam.plugins.htpasswd import crypt_check io = StringIO('chrism:aajfMKNH1hTm2\n') io = StringIO() salt = 'aa' import crypt for name, password in [ ('admin', 'admin') ]: io.write('name:%s\n' % crypt.crypt(password, salt)) htpasswd = HTPasswdPlugin(io, crypt_check) htpasswd.classifiers = set() # good for any registry = make_registry((basicauth,), (htpasswd,), (basicauth,)) class DummyClassifier: def __call__(self, *arg, **kw): return None classifier = DummyClassifier() middleware = PluggableAuthenticationMiddleware(app, registry, classifier, classifier) htpasswd.request_classifications = any htpasswd.response_classifications = any cookie = InsecureCookiePlugin('oatmeal') cookie.request_classifications = any cookie.response_classifications = any form = FormPlugin('__do_login') # only do form extract/challenge for browser requests form.request_classifications = set(('browser',)) form.response_classifications = set(('browser',)) registry = make_registry( extractors = (cookie, basicauth, form), post_extractors = (cookie, basicauth), authenticators = (htpasswd,), challengers = (form, basicauth), ) from repoze.pam.classifiers import DefaultRequestClassifier from repoze.pam.classifiers import DefaultResponseClassifier request_classifier = DefaultRequestClassifier() response_classifier = DefaultResponseClassifier() middleware = PluggableAuthenticationMiddleware(app, registry, request_classifier, response_classifier, log_stream=sys.stdout, log_level = logging.DEBUG ) return middleware def verify(plugins, iface): @@ -115,10 +292,12 @@ for plugin in plugins: verifyObject(iface, plugin, tentative=True) def make_registry(extractors, authenticators, challengers): def make_registry(extractors, post_extractors, authenticators, challengers): registry = {} verify(extractors, IExtractorPlugin) registry[IExtractorPlugin] = extractors verify(post_extractors, IExtractorPlugin) registry[IPostExtractorPlugin] = post_extractors verify(authenticators, IAuthenticatorPlugin) registry[IAuthenticatorPlugin] = authenticators verify(challengers, IChallengerPlugin) repoze/pam/plugins/basicauth.py
@@ -8,43 +8,51 @@ from repoze.pam.interfaces import IChallengerPlugin from repoze.pam.interfaces import IExtractorPlugin from repoze.pam.interfaces import IPostExtractorPlugin class BasicAuthPlugin(object): implements(IChallengerPlugin, IExtractorPlugin) implements(IChallengerPlugin, IExtractorPlugin, IPostExtractorPlugin) def __init__(self, realm): self.realm = realm # IChallengerPlugin def challenge(self, environ, request_classifier, headers, exception): head = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm) raise HTTPUnauthorized(headers=head) def challenge(self, environ, status, headers): if status == '401 Unauthorized': headers = WWW_AUTHENTICATE.tuples('Basic realm="%s"' % self.realm) return HTTPUnauthorized(headers=headers) # IExtractorPlugin def extract(self, environ): authorization = AUTHORIZATION(environ) try: authmeth, auth = authorization.split(' ', 1) except ValueError: # not enough values to unpack except ValueError: # not enough values to unpack return {} if authmeth.lower() == 'basic': try: auth = auth.strip().decode('base64') except binascii.Error: # can't decode except binascii.Error: # can't decode return {} try: login, password = auth.split(':', 1) except ValueError: # not enough values to unpack except ValueError: # not enough values to unpack return {} return {'login':login, 'password':password} auth = {'login':login, 'password':password} return auth return {} # IPostExtractorPlugin def post_extract(self, environ, credentials, extractor): if credentials: if not AUTHORIZATION(environ): auth = '%(login)s:%(password)s' % credentials auth = auth.encode('base64').rstrip() header = 'Basic %s' % auth environ['HTTP_AUTHORIZATION'] = header def make_plugin(pam_conf, realm='basic'): plugin = BasicAuthPlugin(realm) return plugin repoze/pam/plugins/cookie.py
New file @@ -0,0 +1,57 @@ import binascii from paste.request import get_cookies from zope.interface import implements from repoze.pam.interfaces import IExtractorPlugin from repoze.pam.interfaces import IPostExtractorPlugin class InsecureCookiePlugin(object): implements(IExtractorPlugin, IPostExtractorPlugin) def __init__(self, cookie_name): self.cookie_name = cookie_name # IExtractorPlugin def extract(self, environ): cookies = get_cookies(environ) cookie = cookies.get(self.cookie_name) if cookie is None: return {} try: auth = cookie.value.decode('base64') except binascii.Error: # can't decode return {} try: login, password = auth.split(':', 1) return {'login':login, 'password':password} except ValueError: # not enough values to unpack return {} # IPostExtractorPlugin def post_extract(self, environ, credentials, extractor): if credentials: cookie_value = '%(login)s:%(password)s' % credentials cookie_value = cookie_value.encode('base64').rstrip() cookies = get_cookies(environ) existing = cookies.get(self.cookie_name) value = getattr(existing, 'value', None) if value != cookie_value: # go ahead and set it in the environment for downstream # apps to consume (XXX?) cookies[self.cookie_name] = cookie_value output = cookies.output(header='', sep='').lstrip() environ['HTTP_COOKIE'] = output # return a Set-Cookie header set_cookie = '%s=%s; Path=/;' % (self.cookie_name, cookie_value) return [('Set-Cookie', set_cookie)] def make_plugin(pam_conf, cookie_name='repoze.pam.plugins.cookie'): plugin = InsecureCookiePlugin(cookie_name) return plugin repoze/pam/plugins/form.py
New file @@ -0,0 +1,90 @@ from paste.httpheaders import CONTENT_LENGTH from paste.httpheaders import CONTENT_TYPE from paste.request import parse_dict_querystring from paste.request import parse_formvars from zope.interface import implements from repoze.pam.interfaces import IChallengerPlugin from repoze.pam.interfaces import IExtractorPlugin _DEFAULT_FORM = """ <html> <head> <title>Login Form</title> </head> <body> <div> <b>Log In</b> </div> <br/> <form method="POST" action="?__do_login=true"> <table border="0"> <tr> <td>User Name</td> <td><input type="text" name="login"></input></td> </tr> <tr> <td>Password</td> <td><input type="password" name="password"></input></td> </tr> <tr> <td></td> <td><input type="submit" name="submit" value="Log In"/></td> </tr> </table> </form> <pre> %s </pre> </body> </html> """ def auth_form(environ, start_response): import pprint form = _DEFAULT_FORM % pprint.pformat(environ) content_length = CONTENT_LENGTH.tuples(str(len(form))) content_type = CONTENT_TYPE.tuples('text/html') headers = content_length + content_type start_response('200 OK', headers) return [form] class FormPlugin(object): implements(IChallengerPlugin, IExtractorPlugin) def __init__(self, login_form_qs): self.login_form_qs = login_form_qs # IExtractorPlugin def extract(self, environ): query = parse_dict_querystring(environ) # If the extractor finds a special query string on any request, # it will attempt to find the values in the input body. if query.get(self.login_form_qs): form = parse_formvars(environ) from StringIO import StringIO # XXX we need to replace wsgi.input because we've read it # this smells funny environ['wsgi.input'] = StringIO() form.update(query) try: login = form['login'] password = form['password'] except KeyError: return {} return {'login':login, 'password':password} return {} # IChallengerPlugin def challenge(self, environ, status, headers): if status == '401 Unauthorized': return auth_form def make_plugin(pam_conf, login_form_qs='__do_login'): plugin = FormPlugin(login_form_qs) return plugin repoze/pam/tests.py
@@ -44,31 +44,37 @@ def test_extract_success(self): environ = self._makeEnviron() mw = self._makeOne() creds = mw.extract(environ, None) creds, plugin = mw.extract(environ, None) self.assertEqual(creds['login'], 'chris') self.assertEqual(creds['password'], 'password') self.failIf(plugin is None) def test_extract_fail(self): environ = self._makeEnviron() from repoze.pam.interfaces import IExtractorPlugin extractor = DummyNoResultsExtractor() registry = { IExtractorPlugin:[DummyNoResultsExtractor()] IExtractorPlugin:[extractor] } mw = self._makeOne(registry=registry) creds = mw.extract(environ, None) creds, plugin = mw.extract(environ, None) self.assertEqual(creds, {}) self.assertEqual(plugin, None) def test_extract_success_skip_noresults(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IExtractorPlugin extractor1 = DummyNoResultsExtractor() extractor2 = DummyExtractor() registry = { IExtractorPlugin:[DummyNoResultsExtractor(), DummyExtractor()] IExtractorPlugin:[extractor1, extractor2] } mw = self._makeOne(registry=registry) creds = mw.extract(environ, None) creds, plugin = mw.extract(environ, None) self.assertEqual(creds['login'], 'chris') self.assertEqual(creds['password'], 'password') self.assertEqual(plugin, extractor2) def test_extract_success_firstwins(self): environ = self._makeEnviron() @@ -80,46 +86,49 @@ IExtractorPlugin:[extractor1, extractor2] } mw = self._makeOne(registry=registry) creds = mw.extract(environ, None) creds, plugin = mw.extract(environ, None) self.assertEqual(creds['login'], 'fred') self.assertEqual(creds['password'], 'fred') self.assertEqual(plugin, extractor1) def test_extract_find_implicit_classifier(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IExtractorPlugin extractor1 = DummyExtractor({'login':'fred','password':'fred'}) extractor1.classifiers = set(['nomatch']) extractor1.request_classifications = set(['nomatch']) extractor2 = DummyExtractor({'login':'bob','password':'bob'}) registry = { IExtractorPlugin:[extractor1, extractor2] } mw = self._makeOne(registry=registry) creds = mw.extract(environ, None) creds, plugin = mw.extract(environ, None) self.assertEqual(creds['login'], 'bob') self.assertEqual(creds['password'], 'bob') self.assertEqual(plugin, extractor2) def test_extract_find_explicit_classifier(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IExtractorPlugin extractor1 = DummyExtractor({'login':'fred','password':'fred'}) extractor1.classifiers = set(['nomatch']) extractor1.request_classifications = set(['nomatch']) extractor2 = DummyExtractor({'login':'bob','password':'bob'}) extractor2.classifiers = set(['match']) extractor2.request_classifications = set(['match']) registry = { IExtractorPlugin:[extractor1, extractor2] } mw = self._makeOne(registry=registry) creds = mw.extract(environ, 'match') creds, plugin = mw.extract(environ, 'match') self.assertEqual(creds['login'], 'bob') self.assertEqual(creds['password'], 'bob') self.assertEqual(plugin, extractor2) def test_authenticate_success(self): environ = self._makeEnviron() mw = self._makeOne() creds = {'login':'chris', 'password':'password'} userid = mw.authenticate(environ, creds, None) userid, plugin = mw.authenticate(environ, creds, None) self.assertEqual(userid, 'chris') def test_authenticate_fail(self): @@ -131,95 +140,132 @@ IAuthenticatorPlugin:[DummyFailAuthenticator()] } mw = self._makeOne(registry=registry) userid = mw.authenticate(environ, creds, None) userid, plugin = mw.authenticate(environ, creds, None) self.assertEqual(userid, None) self.assertEqual(plugin, None) def test_authenticate_success_skip_fail(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IAuthenticatorPlugin plugin1 = DummyFailAuthenticator() plugin2 = DummyAuthenticator() registry = { IAuthenticatorPlugin:[DummyFailAuthenticator(),DummyAuthenticator()] IAuthenticatorPlugin:[plugin1, plugin2] } mw = self._makeOne(registry=registry) creds = {'login':'chris', 'password':'password'} userid = mw.authenticate(environ, creds, None) userid, plugin = mw.authenticate(environ, creds, None) self.assertEqual(userid, 'chris') self.assertEqual(plugin, plugin2) def test_authenticate_success_firstwins(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IAuthenticatorPlugin plugin1 = DummyAuthenticator('chris_id1') plugin2 = DummyAuthenticator('chris_id2') registry = { IAuthenticatorPlugin:[DummyAuthenticator('chris_id1'), DummyAuthenticator('chris_id2')] IAuthenticatorPlugin:[plugin1, plugin2] } mw = self._makeOne(registry=registry) creds = {'login':'chris', 'password':'password'} userid = mw.authenticate(environ, creds, None) userid, plugin = mw.authenticate(environ, creds, None) self.assertEqual(userid, 'chris_id1') self.assertEqual(plugin, plugin1) def test_authenticate_find_implicit_classifier(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IAuthenticatorPlugin plugin1 = DummyAuthenticator('chris_id1') plugin1.classifiers = set(['nomatch']) plugin1.request_classifications = set(['nomatch']) plugin2 = DummyAuthenticator('chris_id2') registry = { IAuthenticatorPlugin:[plugin1, plugin2] } mw = self._makeOne(registry=registry) creds = {'login':'chris', 'password':'password'} userid = mw.authenticate(environ, creds, None) userid, plugin = mw.authenticate(environ, creds, None) self.assertEqual(userid, 'chris_id2') self.assertEqual(plugin, plugin2) def test_authenticate_find_explicit_classifier(self): environ = self._makeEnviron() mw = self._makeOne() from repoze.pam.interfaces import IAuthenticatorPlugin plugin1 = DummyAuthenticator('chris_id1') plugin1.classifiers = set(['nomatch']) plugin1.request_classifications = set(['nomatch']) plugin2 = DummyAuthenticator('chris_id2') plugin2.classifiers = set(['match']) plugin2.request_classifications = set(['match']) registry = { IAuthenticatorPlugin:[plugin1, plugin2] } mw = self._makeOne(registry=registry) creds = {'login':'chris', 'password':'password'} userid = mw.authenticate(environ, creds, 'match') userid, plugin = mw.authenticate(environ, creds, 'match') self.assertEqual(userid, 'chris_id2') self.assertEqual(plugin, plugin2) def test_on_ingress_success_addcredentials(self): def test_modify_environment_success_addcredentials(self): environ = self._makeEnviron() mw = self._makeOne() classification = mw.on_ingress(environ) classification, headers = mw.modify_environment(environ) self.assertEqual(classification, 'browser') self.assertEqual(environ['REMOTE_USER'], 'chris') self.assertEqual(environ['repoze.pam.credentials'], {'login':'chris','password':'password'}) self.assertEqual(headers, []) def test_on_ingress_success_noaddcredentials(self): def test_modify_environment_noaddcredentials(self): environ = self._makeEnviron() mw = self._makeOne() mw.add_credentials = False classification = mw.on_ingress(environ) classification, headers = mw.modify_environment(environ) self.assertEqual(classification, 'browser') self.assertEqual(environ['REMOTE_USER'], 'chris') self.failIf(environ.has_key('repoze.pam.credentials')) self.assertEqual(headers, []) def test_on_ingress_nocredentials(self): def test_modify_environment_nocredentials(self): environ = self._makeEnviron() from repoze.pam.interfaces import IExtractorPlugin registry = { IExtractorPlugin:[DummyNoResultsExtractor()], } mw = self._makeOne(registry=registry) classification = mw.on_ingress(environ) classification, headers = mw.modify_environment(environ) self.assertEqual(classification, 'browser') self.assertEqual(environ.get('REMOTE_USER'), None) self.assertEqual(environ['repoze.pam.credentials'], {}) self.assertEqual(headers, []) def test_modify_environment_remoteuser_already_set(self): environ = self._makeEnviron({'REMOTE_USER':'admin'}) mw = self._makeOne() classification, headers = mw.modify_environment(environ) self.assertEqual(classification, 'browser') self.assertEqual(environ.get('REMOTE_USER'), 'admin') self.assertEqual(environ['repoze.pam.credentials'], {'login':'chris', 'password':'password'}) self.assertEqual(headers, []) def test_modify_environment_with_postextractor(self): environ = self._makeEnviron({'REMOTE_USER':'admin'}) from repoze.pam.interfaces import IExtractorPlugin from repoze.pam.interfaces import IPostExtractorPlugin registry = { IExtractorPlugin:[DummyExtractor()], IPostExtractorPlugin:[DummyPostExtractor()], } mw = self._makeOne(registry=registry) classification, headers = mw.modify_environment(environ) self.assertEqual(classification, 'browser') self.assertEqual(environ.get('REMOTE_USER'), 'admin') self.assertEqual(environ['repoze.pam.credentials'], {'login':'chris', 'password':'password'}) self.assertEqual(headers, [('foo', 'bar')]) class TestBasicAuthPlugin(Base): def _getTargetClass(self): from repoze.pam.plugins.basicauth import BasicAuthPlugin @@ -237,56 +283,87 @@ verifyClass(IChallengerPlugin, klass) verifyClass(IExtractorPlugin, klass) def test_challenge(self): def test_challenge_non401(self): plugin = self._makeOne('realm') environ = self._makeEnviron() from paste.httpexceptions import HTTPUnauthorized self.assertRaises(HTTPUnauthorized, plugin.challenge, environ, None, None, None) result = plugin.challenge(environ, '200 OK', {}) self.assertEqual(result, None) def test_challenge_401(self): plugin = self._makeOne('realm') environ = self._makeEnviron() result = plugin.challenge(environ, '401 Unauthorized', {}) self.assertNotEqual(result, None) app_iter = result(environ, lambda *arg: None) items = [] for item in app_iter: items.append(item) response = ''.join(items) self.failUnless(response.startswith('401 Unauthorized')) def test_extract_noauthinfo(self): plugin = self._makeOne('realm') environ = self._makeEnviron() result = plugin.extract(environ) self.assertEqual(result, {}) creds = plugin.extract(environ) self.assertEqual(creds, {}) def test_extract_nonbasic(self): plugin = self._makeOne('realm') environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'}) result = plugin.extract(environ) self.assertEqual(result, {}) def test_extract_nonbasic(self): plugin = self._makeOne('realm') environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'}) result = plugin.extract(environ) self.assertEqual(result, {}) creds = plugin.extract(environ) self.assertEqual(creds, {}) def test_extract_basic_badencoding(self): plugin = self._makeOne('realm') environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'}) result = plugin.extract(environ) self.assertEqual(result, {}) creds = plugin.extract(environ) self.assertEqual(creds, {}) def test_extract_basic_badrepr(self): plugin = self._makeOne('realm') value = 'foo'.encode('base64') environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value}) result = plugin.extract(environ) self.assertEqual(result, {}) creds = plugin.extract(environ) self.assertEqual(creds, {}) def test_extract_basic_ok(self): plugin = self._makeOne('realm') value = 'foo:bar'.encode('base64') environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value}) result = plugin.extract(environ) self.assertEqual(result, {'login':'foo', 'password':'bar'}) creds = plugin.extract(environ) self.assertEqual(creds, {'login':'foo', 'password':'bar'}) def test_post_extract_nocreds(self): plugin = self._makeOne('realm') creds = {} environ = self._makeEnviron() result = plugin.post_extract(environ, creds, plugin) self.assertEqual(result, None) self.assertEqual(environ.get('HTTP_AUTHORIZATION'), None) def test_post_extract_creds_withauthorization(self): plugin = self._makeOne('realm') creds = {'login':'foo', 'password':'password'} environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic foo'}) result = plugin.post_extract(environ, creds, plugin) self.assertEqual(result, None) self.assertEqual(environ['HTTP_AUTHORIZATION'], 'Basic foo') def test_post_extract_creds_mutates(self): plugin = self._makeOne('realm') creds = {'login':'foo', 'password':'password'} environ = self._makeEnviron() result = plugin.post_extract(environ, creds, plugin) self.assertEqual(result, None) auth = 'foo:password'.encode('base64').rstrip() auth = 'Basic ' + auth self.assertEqual(environ['HTTP_AUTHORIZATION'], auth) def test_factory(self): from repoze.pam.plugins.basicauth import make_plugin plugin = make_plugin({}, 'realm') self.assertEqual(plugin.realm, 'realm') class TestHTPasswdPlugin(Base): def _getTargetClass(self): from repoze.pam.plugins.htpasswd import HTPasswdPlugin @@ -376,7 +453,84 @@ 'repoze.pam.plugins.htpasswd:crypt_check') self.assertEqual(plugin.filename, 'foo') self.assertEqual(plugin.check, crypt_check) class TestInsecureCookiePlugin(Base): def _getTargetClass(self): from repoze.pam.plugins.cookie import InsecureCookiePlugin return InsecureCookiePlugin def _makeOne(self, *arg, **kw): plugin = self._getTargetClass()(*arg, **kw) return plugin def test_implements(self): from zope.interface.verify import verifyClass from repoze.pam.interfaces import IExtractorPlugin from repoze.pam.interfaces import IPostExtractorPlugin klass = self._getTargetClass() verifyClass(IExtractorPlugin, klass) verifyClass(IPostExtractorPlugin, klass) def test_extract_nocookies(self): plugin = self._makeOne('oatmeal') environ = self._makeEnviron() result = plugin.extract(environ) self.assertEqual(result, {}) def test_extract_badcookies(self): plugin = self._makeOne('oatmeal') environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'}) result = plugin.extract(environ) self.assertEqual(result, {}) def test_extract_badcookies(self): plugin = self._makeOne('oatmeal') environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'}) result = plugin.extract(environ) self.assertEqual(result, {}) def test_extract_success(self): plugin = self._makeOne('oatmeal') auth = 'foo:password'.encode('base64').rstrip() environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth}) result = plugin.extract(environ) self.assertEqual(result, {'login':'foo', 'password':'password'}) def test_post_extract_nocreds(self): plugin = self._makeOne('oatmeal') creds = {} environ = self._makeEnviron() result = plugin.post_extract(environ, creds, plugin) self.assertEqual(result, None) self.assertEqual(environ.get('HTTP_COOKIE'), None) def test_post_extract_creds_same(self): plugin = self._makeOne('oatmeal') creds = {'login':'foo', 'password':'password'} auth = 'foo:password'.encode('base64').rstrip() auth = 'oatmeal=%s;' % auth environ = self._makeEnviron({'HTTP_COOKIE':auth}) result = plugin.post_extract(environ, creds, plugin) self.assertEqual(result, None) self.assertEqual(environ.get('HTTP_COOKIE'), auth) def test_post_extract_creds_different(self): plugin = self._makeOne('oatmeal') creds = {'login':'bar', 'password':'password'} auth = 'foo:password'.encode('base64').rstrip() creds_auth = 'bar:password'.encode('base64').rstrip() environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth}) result = plugin.post_extract(environ, creds, plugin) expected = 'oatmeal=%s; Path=/;' % creds_auth self.assertEqual(result, [('Set-Cookie', expected)]) self.assertEqual(environ['HTTP_COOKIE'], 'oatmeal=%s;' % creds_auth) def test_factory(self): from repoze.pam.plugins.cookie import make_plugin plugin = make_plugin(None, 'foo') self.assertEqual(plugin.cookie_name, 'foo') class TestDefaultRequestClassifier(Base): def _getTargetClass(self): @@ -477,5 +631,10 @@ return None class DummyChallenger: def challenge(self, environ, request_classifier, headers, exception): def challenge(self, environ, status, headers): environ['challenged'] = True class DummyPostExtractor: def post_extract(self, environ, credentials, extractor): return [ ('foo', 'bar') ]