import logging
import sys

from repoze.who.api import APIFactory
from repoze.who.interfaces import IChallenger
from repoze.who._compat import StringIO

_STARTED = '-- repoze.who request started (%s) --'
_ENDED = '-- repoze.who request ended (%s) --'


class PluggableAuthenticationMiddleware(object):
    """WSGI middleware that runs authentication around a downstream app.

    For each request it builds an API object via ``APIFactory``, asks it to
    authenticate, calls the downstream application, and then either lets the
    response through (adding any "remember" headers) or replaces it with a
    challenge application when the challenge decider asks for one.
    """

    def __init__(self, app,
                 identifiers,
                 authenticators,
                 challengers,
                 mdproviders,
                 request_classifier=None,
                 challenge_decider=None,
                 log_stream=None,
                 log_level=logging.INFO,
                 remote_user_key='REMOTE_USER',
                 classifier=None,
                 ):
        """Configure the middleware.

        ``identifiers``, ``authenticators``, ``challengers``,
        ``mdproviders``, the classifier and the challenge decider are passed
        straight through to ``APIFactory``.

        ``request_classifier`` and ``classifier`` are two spellings of the
        same argument: exactly one of them must be supplied.

        ``log_stream`` may be a ``logging.Logger`` (used as-is), any other
        truthy value (handed to a ``logging.StreamHandler``), or a false
        value (no logging).  ``log_level`` is only applied to a logger
        created here from a stream.

        Raises ``ValueError`` if ``challenge_decider`` is missing, or if
        both or neither of ``request_classifier`` / ``classifier`` are
        given.
        """
        if challenge_decider is None:
            raise ValueError('challenge_decider is required')
        if request_classifier is not None and classifier is not None:
            raise ValueError(
                'Only one of request_classifier and classifier is allowed')
        if request_classifier is None:
            if classifier is None:
                raise ValueError(
                    'Either request_classifier or classifier is required')
            request_classifier = classifier
        self.app = app
        logger = self.logger = None
        if isinstance(log_stream, logging.Logger):
            logger = self.logger = log_stream
        elif log_stream:
            handler = logging.StreamHandler(log_stream)
            fmt = '%(asctime)s %(message)s'
            formatter = logging.Formatter(fmt)
            handler.setFormatter(formatter)
            # Instantiated directly (not via logging.getLogger) so that each
            # middleware instance gets its own private logger object.
            logger = self.logger = logging.Logger('repoze.who')
            logger.addHandler(handler)
            logger.setLevel(log_level)
        self.remote_user_key = remote_user_key

        self.api_factory = APIFactory(identifiers,
                                      authenticators,
                                      challengers,
                                      mdproviders,
                                      request_classifier,
                                      challenge_decider,
                                      remote_user_key,
                                      logger,
                                      )

    def __call__(self, environ, start_response):
        """Handle one WSGI request.

        Returns the response iterable of the downstream application, or of
        the challenge application when a challenge is required.

        Raises ``RuntimeError`` when a challenge is required but the API
        returns no challenge application.
        """
        if self.remote_user_key in environ:
            # act as a pass through if REMOTE_USER (or whatever) is
            # already set
            return self.app(environ, start_response)

        api = self.api_factory(environ)

        environ['repoze.who.plugins'] = api.name_registry  # BBB?
        environ['repoze.who.logger'] = self.logger
        environ['repoze.who.application'] = self.app

        logger = self.logger
        path_info = environ.get('PATH_INFO', None)
        logger and logger.info(_STARTED % path_info)
        api.authenticate()  # identity saved in environ

        # allow identifier plugins to replace the downstream
        # application (to do redirection and unauthorized themselves
        # mostly)
        app = environ.pop('repoze.who.application')
        if app is not self.app:
            logger and logger.info(
                'static downstream application replaced with %s' % app)

        wrapper = StartResponseWrapper(start_response)
        app_iter = app(environ, wrapper.wrap_start_response)

        # The challenge decider almost(?) always needs information from the
        # response.  The WSGI spec (PEP 333) states that a WSGI application
        # must call start_response by the iterable's first iteration.  If
        # start_response hasn't been called, we'll wrap it in a way that
        # triggers that call.
        if not wrapper.called:
            app_iter = wrap_generator(app_iter)

        if api.challenge_decider(environ, wrapper.status, wrapper.headers):
            logger and logger.info('challenge required')
            close = getattr(app_iter, 'close', _no_op)

            challenge_app = api.challenge(wrapper.status, wrapper.headers)
            if challenge_app is not None:
                logger and logger.info('executing challenge app')
                if app_iter:
                    list(app_iter)  # unwind the original app iterator
                # PEP 333 requires that we call the original iterator's
                # 'close' method, if it exists, before releasing it.
                close()
                # replace the downstream app with the challenge app
                app_iter = challenge_app(environ, start_response)
            else:
                logger and logger.info('configuration error: no challengers')
                close()
                raise RuntimeError('no challengers found')
        else:
            logger and logger.info('no challenge required')
            remember_headers = api.remember()
            wrapper.finish_response(remember_headers)

        logger and logger.info(_ENDED % path_info)
        return app_iter


def _no_op():
    pass


def wrap_generator(result):
    """\
    Return a generator that yields the same items as ``result``.

    Its only difference is that it pulls the first iteration off eagerly and
    caches it to trigger any immediate side effects (in a WSGI world, this
    ensures start_response is called).

    ``result`` is iterated through a single shared iterator, so re-iterable
    objects (such as lists) do not have their first item yielded twice.
    """
    # PEP 333 requires that we call the original iterator's
    # 'close' method, if it exists, before releasing it.
    close = getattr(result, 'close', _no_op)

    # Obtain one iterator up front so that the eager first pull and the lazy
    # remainder share state.  Looping over ``result`` itself a second time
    # would restart any re-iterable object and duplicate the first chunk.
    chunks = iter(result)

    # Pull the first iteration only.  This has to happen outside of the
    # generator function: a generator body does not run until it is first
    # advanced, and we need the side effects now.
    marker = object()
    first = next(chunks, marker)

    # Wrapper yields the first iteration, then passes result's iterations
    # directly up.
    def wrapper():
        try:
            if first is not marker:
                yield first
                for chunk in chunks:
                    yield chunk
        finally:
            # Also runs when iteration raises or the consumer closes the
            # generator early, so the original 'close' is not skipped.
            close()
    return wrapper()


class StartResponseWrapper(object):
    """Capture a WSGI ``start_response`` call so it can be replayed later.

    ``wrap_start_response`` records the status, headers and exc_info and
    hands back a buffer's ``write``; ``finish_response`` forwards the
    recorded values (plus extra headers) to the real ``start_response``.
    """

    def __init__(self, start_response):
        self.start_response = start_response
        self.status = None
        self.headers = []
        self.exc_info = None
        self.buffer = StringIO()
        # A WSGI app may delay calling start_response until the first iteration
        # of its generator.  We track this so we know whether or not we need to
        # trigger an iteration before examining the response.
        self.called = False

    def wrap_start_response(self, status, headers, exc_info=None):
        """Record the response start and return the buffer's ``write``."""
        self.headers = headers
        self.status = status
        self.exc_info = exc_info
        # The response has been initiated, so we have a valid code.
        self.called = True
        return self.buffer.write

    def finish_response(self, extra_headers):
        """Call the real ``start_response`` with the recorded values.

        ``extra_headers`` (a false value means none) are appended to the
        recorded headers.  If the real ``start_response`` returns a truthy
        ``write`` callable, any data buffered so far is passed to it, and
        its ``close`` attribute is called when present.
        """
        if not extra_headers:
            extra_headers = []
        headers = self.headers + extra_headers
        write = self.start_response(self.status, headers, self.exc_info)
        if write:
            self.buffer.seek(0)
            value = self.buffer.getvalue()
            if value:
                write(value)
            if hasattr(write, 'close'):
                write.close()


def make_test_middleware(app, global_conf):
    """ Functionally equivalent to

    [plugin:redirector]
    use = repoze.who.plugins.redirector.RedirectorPlugin
    login_url = /login.html

    [plugin:auth_tkt]
    use = repoze.who.plugins.auth_tkt:AuthTktCookiePlugin
    secret = SEEKRIT
    cookie_name = oatmeal

    [plugin:basicauth]
    use = repoze.who.plugins.basicauth.BasicAuthPlugin
    realm = repoze.who

    [plugin:htpasswd]
    use = repoze.who.plugins.htpasswd.HTPasswdPlugin
    filename = <...>
    check_fn = repoze.who.plugins.htpasswd:crypt_check

    [general]
    request_classifier = repoze.who.classifiers:default_request_classifier
    challenge_decider = repoze.who.classifiers:default_challenge_decider

    [identifiers]
    plugins = authtkt basicauth

    [authenticators]
    plugins = authtkt htpasswd

    [challengers]
    plugins = redirector:browser basicauth
    """
    # be able to test without a config file
    from repoze.who.plugins.basicauth import BasicAuthPlugin
    from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
    from repoze.who.plugins.redirector import RedirectorPlugin
    from repoze.who.plugins.htpasswd import HTPasswdPlugin

    # In-memory stand-in for an htpasswd file with cleartext passwords.
    htpasswd_file = StringIO()
    for name, password in [('admin', 'admin'), ('chris', 'chris')]:
        htpasswd_file.write('%s:%s\n' % (name, password))
    htpasswd_file.seek(0)

    def cleartext_check(password, hashed):
        return password == hashed  #pragma NO COVERAGE

    htpasswd = HTPasswdPlugin(htpasswd_file, cleartext_check)
    basicauth = BasicAuthPlugin('repoze.who')
    auth_tkt = AuthTktCookiePlugin('secret', 'auth_tkt')
    redirector = RedirectorPlugin('/login.html')
    redirector.classifications = {IChallenger: ['browser']}  # only for browser
    identifiers = [('auth_tkt', auth_tkt),
                   ('basicauth', basicauth),
                   ]
    authenticators = [('htpasswd', htpasswd)]
    challengers = [('redirector', redirector),
                   ('basicauth', basicauth)]
    mdproviders = []
    from repoze.who.classifiers import default_request_classifier
    from repoze.who.classifiers import default_challenge_decider
    log_stream = None
    import os
    # Logging to stdout is opt-in through the WHO_LOG environment variable.
    if os.environ.get('WHO_LOG'):
        log_stream = sys.stdout
    middleware = PluggableAuthenticationMiddleware(
        app,
        identifiers,
        authenticators,
        challengers,
        mdproviders,
        default_request_classifier,
        default_challenge_decider,
        log_stream=log_stream,
        log_level=logging.DEBUG,
        )
    return middleware