""" Configuration parser """ import logging from pkg_resources import EntryPoint import sys import warnings from repoze.who.api import APIFactory from repoze.who.interfaces import IAuthenticator from repoze.who.interfaces import IChallengeDecider from repoze.who.interfaces import IChallenger from repoze.who.interfaces import IIdentifier from repoze.who.interfaces import IMetadataProvider from repoze.who.interfaces import IPlugin from repoze.who.interfaces import IRequestClassifier from repoze.who.middleware import PluggableAuthenticationMiddleware from repoze.who._compat import StringIO from repoze.who._compat import ConfigParser from repoze.who._compat import ParsingError def _resolve(name): if name: return EntryPoint.parse('x=%s' % name).load(False) class WhoConfig: def __init__(self, here): self.here = here self.request_classifier = None self.challenge_decider = None self.plugins = {} self.identifiers = [] self.authenticators = [] self.challengers = [] self.mdproviders = [] self.remote_user_key = 'REMOTE_USER' def _makePlugin(self, name, iface, options=None): if options is None: options = {} obj = _resolve(name) if not iface.providedBy(obj): obj = obj(**options) return obj def _getPlugin(self, name, iface): obj = self.plugins.get(name) if obj is None: obj = self._makePlugin(name, iface) return obj def _parsePluginSequence(self, attr, proptext, iface): lines = proptext.split() for line in lines: if ';' in line: plugin_name, classifier = line.split(';') else: plugin_name = line classifier = None plugin = self._getPlugin(plugin_name, iface) if classifier is not None: classifications = getattr(plugin, 'classifications', None) if classifications is None: classifications = plugin.classifications = {} classifications[iface] = classifier attr.append((plugin_name, plugin)) def parse(self, text): if getattr(text, 'readline', None) is None: text = StringIO(text) cp = ConfigParser(defaults={'here': self.here}) try: cp.read_file(text) except AttributeError: #pragma NO COVER Python < 3.0 cp.readfp(text) for s_id in [x for x in cp.sections() if x.startswith('plugin:')]: plugin_id = s_id[len('plugin:'):] options = dict(cp.items(s_id)) if 'use' in options: name = options.pop('use') del options['here'] obj = self._makePlugin(name, IPlugin, options) self.plugins[plugin_id] = obj if 'general' in cp.sections(): general = dict(cp.items('general')) rc = general.get('request_classifier') if rc is not None: rc = self._getPlugin(rc, IRequestClassifier) self.request_classifier = rc cd = general.get('challenge_decider') if cd is not None: cd = self._getPlugin(cd, IChallengeDecider) self.challenge_decider = cd ru = general.get('remote_user_key') if ru is not None: self.remote_user_key = ru if 'identifiers' in cp.sections(): identifiers = dict(cp.items('identifiers')) self._parsePluginSequence(self.identifiers, identifiers['plugins'], IIdentifier, ) if 'authenticators' in cp.sections(): authenticators = dict(cp.items('authenticators')) self._parsePluginSequence(self.authenticators, authenticators['plugins'], IAuthenticator, ) if 'challengers' in cp.sections(): challengers = dict(cp.items('challengers')) self._parsePluginSequence(self.challengers, challengers['plugins'], IChallenger, ) if 'mdproviders' in cp.sections(): mdproviders = dict(cp.items('mdproviders')) self._parsePluginSequence(self.mdproviders, mdproviders['plugins'], IMetadataProvider, ) class NullHandler(logging.Handler): def emit(self, record): pass _LEVELS = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, } def make_api_factory_with_config(global_conf, config_file, remote_user_key = 'REMOTE_USER', logger=None, ): identifiers = authenticators = challengers = mdproviders = () request_classifier = None challenge_decider = None parser = WhoConfig(global_conf['here']) try: opened = open(config_file) except IOError: warnings.warn('Non-existent who config file: %s' % config_file, stacklevel=2) else: try: try: parser.parse(opened) except ParsingError: warnings.warn('Invalid who config file: %s' % config_file, stacklevel=2) else: identifiers = parser.identifiers authenticators = parser.authenticators challengers = parser.challengers mdproviders = parser.mdproviders request_classifier = parser.request_classifier challenge_decider = parser.challenge_decider finally: opened.close() return APIFactory(identifiers, authenticators, challengers, mdproviders, request_classifier, challenge_decider, remote_user_key, logger, ) def make_middleware_with_config(app, global_conf, config_file, log_file=None, log_level=None): parser = WhoConfig(global_conf['here']) with open(config_file) as f: parser.parse(f) log_stream = None if log_level is None: log_level = logging.INFO elif not isinstance(log_level, int): log_level = _LEVELS[log_level.lower()] if log_file is not None: if log_file.lower() == 'stdout': log_stream = sys.stdout else: log_stream = open(log_file, 'wb') else: log_stream = logging.getLogger('repoze.who') log_stream.addHandler(NullHandler()) log_stream.setLevel(log_level or 0) return PluggableAuthenticationMiddleware( app, parser.identifiers, parser.authenticators, parser.challengers, parser.mdproviders, parser.request_classifier, parser.challenge_decider, log_stream, log_level, parser.remote_user_key, )