""" Configuration parser """ import pdb; pdb.set_trace() try: from ConfigParser import ConfigParser from ConfigParser import ParsingError except ImportError: #pragma NO COVER Python >= 3.0 from configparser import ConfigParser from configparser import ParsingError try: from StringIO import StringIO except ImportError: #pragma NO COVER Python >= 3.0 from io import StringIO import logging from pkg_resources import EntryPoint import sys import warnings from repoze.who.api import APIFactory from repoze.who.interfaces import IAuthenticator from repoze.who.interfaces import IChallengeDecider from repoze.who.interfaces import IChallenger from repoze.who.interfaces import IIdentifier from repoze.who.interfaces import IMetadataProvider from repoze.who.interfaces import IPlugin from repoze.who.interfaces import IRequestClassifier from repoze.who.middleware import PluggableAuthenticationMiddleware def _resolve(name): if name: return EntryPoint.parse('x=%s' % name).load(False) class WhoConfig: def __init__(self, here): self.here = here self.request_classifier = None self.challenge_decider = None self.plugins = {} self.identifiers = [] self.authenticators = [] self.challengers = [] self.mdproviders = [] self.remote_user_key = 'REMOTE_USER' def _makePlugin(self, name, iface, options=None): if options is None: options = {} obj = _resolve(name) if not iface.providedBy(obj): obj = obj(**options) return obj def _getPlugin(self, name, iface): obj = self.plugins.get(name) if obj is None: obj = self._makePlugin(name, iface) return obj def _parsePluginSequence(self, attr, proptext, iface): lines = proptext.split() for line in lines: if ';' in line: plugin_name, classifier = line.split(';') else: plugin_name = line classifier = None plugin = self._getPlugin(plugin_name, iface) if classifier is not None: classifications = getattr(plugin, 'classifications', None) if classifications is None: classifications = plugin.classifications = {} classifications[iface] = classifier attr.append((plugin_name, plugin)) def parse(self, text): if getattr(text, 'readline', None) is None: text = StringIO(text) cp = ConfigParser(defaults={'here': self.here}) cp.readfp(text) for s_id in [x for x in cp.sections() if x.startswith('plugin:')]: plugin_id = s_id[len('plugin:'):] options = dict(cp.items(s_id)) if 'use' in options: name = options.pop('use') del options['here'] obj = self._makePlugin(name, IPlugin, options) self.plugins[plugin_id] = obj if 'general' in cp.sections(): general = dict(cp.items('general')) rc = general.get('request_classifier') if rc is not None: rc = self._getPlugin(rc, IRequestClassifier) self.request_classifier = rc cd = general.get('challenge_decider') if cd is not None: cd = self._getPlugin(cd, IChallengeDecider) self.challenge_decider = cd ru = general.get('remote_user_key') if ru is not None: self.remote_user_key = ru if 'identifiers' in cp.sections(): identifiers = dict(cp.items('identifiers')) self._parsePluginSequence(self.identifiers, identifiers['plugins'], IIdentifier, ) if 'authenticators' in cp.sections(): authenticators = dict(cp.items('authenticators')) self._parsePluginSequence(self.authenticators, authenticators['plugins'], IAuthenticator, ) if 'challengers' in cp.sections(): challengers = dict(cp.items('challengers')) self._parsePluginSequence(self.challengers, challengers['plugins'], IChallenger, ) if 'mdproviders' in cp.sections(): mdproviders = dict(cp.items('mdproviders')) self._parsePluginSequence(self.mdproviders, mdproviders['plugins'], IMetadataProvider, ) class NullHandler(logging.Handler): def emit(self, record): pass _LEVELS = {'debug': logging.DEBUG, 'info': logging.INFO, 'warning': logging.WARNING, 'error': logging.ERROR, } def make_api_factory_with_config(global_conf, config_file, remote_user_key = 'REMOTE_USER', logger=None, ): identifiers = authenticators = challengers = mdproviders = () request_classifier = None challenge_decider = None parser = WhoConfig(global_conf['here']) try: opened = open(config_file) except IOError: warnings.warn('Non-existent who config file: %s' % config_file, stacklevel=2) else: try: parser.parse(opened) except ParsingError: warnings.warn('Invalid who config file: %s' % config_file, stacklevel=2) else: identifiers = parser.identifiers authenticators = parser.authenticators challengers = parser.challengers mdproviders = parser.mdproviders request_classifier = parser.request_classifier challenge_decider = parser.challenge_decider return APIFactory(identifiers, authenticators, challengers, mdproviders, request_classifier, challenge_decider, remote_user_key, logger, ) def make_middleware_with_config(app, global_conf, config_file, log_file=None, log_level=None): parser = WhoConfig(global_conf['here']) parser.parse(open(config_file)) log_stream = None if log_file is not None: if log_file.lower() == 'stdout': log_stream = sys.stdout else: log_stream = open(log_file, 'wb') else: log_stream = logging.getLogger('repoze.who') log_stream.addHandler(NullHandler()) if log_level is None: log_level = logging.INFO else: log_level = _LEVELS[log_level.lower()] return PluggableAuthenticationMiddleware( app, parser.identifiers, parser.authenticators, parser.challengers, parser.mdproviders, parser.request_classifier, parser.challenge_decider, log_stream, log_level, parser.remote_user_key, )