Merge pull request #3397 from mmerickel/refactor-actions
refactor configurator actions and predicates into mixins
1 files deleted
2 files added
16 files modified
1 files renamed
| | |
| | | import inspect |
| | | import itertools |
| | | import logging |
| | | import operator |
| | | import os |
| | | import sys |
| | | import threading |
| | | import venusian |
| | | |
| | |
| | | from pyramid.interfaces import ( |
| | | IDebugLogger, |
| | | IExceptionResponse, |
| | | IPredicateList, |
| | | PHASE0_CONFIG, |
| | | PHASE1_CONFIG, |
| | | PHASE2_CONFIG, |
| | |
| | | |
| | | from pyramid.authorization import ACLAuthorizationPolicy |
| | | |
| | | from pyramid.compat import text_, reraise, string_types |
| | | from pyramid.compat import text_, string_types |
| | | |
| | | from pyramid.events import ApplicationCreated |
| | | |
| | | from pyramid.exceptions import ( |
| | | ConfigurationConflictError, |
| | | ConfigurationError, |
| | | ConfigurationExecutionError, |
| | | ) |
| | | from pyramid.exceptions import ConfigurationError |
| | | |
| | | from pyramid.httpexceptions import default_exceptionresponse_view |
| | | |
| | | from pyramid.path import caller_package, package_of |
| | | |
| | | from pyramid.registry import Introspectable, Introspector, Registry, undefer |
| | | from pyramid.registry import Introspectable, Introspector, Registry |
| | | |
| | | from pyramid.router import Router |
| | | |
| | |
| | | |
| | | from pyramid.util import WeakOrderedSet, object_description |
| | | |
| | | from pyramid.config.util import ActionInfo, PredicateList, action_method, not_ |
| | | from pyramid.config.actions import action_method, ActionState |
| | | from pyramid.config.predicates import not_ |
| | | |
| | | from pyramid.config.actions import ActionConfiguratorMixin |
| | | from pyramid.config.adapters import AdaptersConfiguratorMixin |
| | | from pyramid.config.assets import AssetsConfiguratorMixin |
| | | from pyramid.config.factories import FactoriesConfiguratorMixin |
| | | from pyramid.config.i18n import I18NConfiguratorMixin |
| | | from pyramid.config.predicates import PredicateConfiguratorMixin |
| | | from pyramid.config.rendering import RenderingConfiguratorMixin |
| | | from pyramid.config.routes import RoutesConfiguratorMixin |
| | | from pyramid.config.security import SecurityConfiguratorMixin |
| | |
| | | PHASE2_CONFIG = PHASE2_CONFIG # api |
| | | PHASE3_CONFIG = PHASE3_CONFIG # api |
| | | |
| | | ActionState = ActionState # bw-compat for pyramid_zcml |
| | | |
| | | |
| | | class Configurator( |
| | | ActionConfiguratorMixin, |
| | | PredicateConfiguratorMixin, |
| | | TestingConfiguratorMixin, |
| | | TweensConfiguratorMixin, |
| | | SecurityConfiguratorMixin, |
| | |
| | | _get_introspector, _set_introspector, _del_introspector |
| | | ) |
| | | |
| | | def get_predlist(self, name): |
| | | predlist = self.registry.queryUtility(IPredicateList, name=name) |
| | | if predlist is None: |
| | | predlist = PredicateList() |
| | | self.registry.registerUtility(predlist, IPredicateList, name=name) |
| | | return predlist |
| | | |
| | | def _add_predicate( |
| | | self, type, name, factory, weighs_more_than=None, weighs_less_than=None |
| | | ): |
| | | factory = self.maybe_dotted(factory) |
| | | discriminator = ('%s option' % type, name) |
| | | intr = self.introspectable( |
| | | '%s predicates' % type, |
| | | discriminator, |
| | | '%s predicate named %s' % (type, name), |
| | | '%s predicate' % type, |
| | | ) |
| | | intr['name'] = name |
| | | intr['factory'] = factory |
| | | intr['weighs_more_than'] = weighs_more_than |
| | | intr['weighs_less_than'] = weighs_less_than |
| | | |
| | | def register(): |
| | | predlist = self.get_predlist(type) |
| | | predlist.add( |
| | | name, |
| | | factory, |
| | | weighs_more_than=weighs_more_than, |
| | | weighs_less_than=weighs_less_than, |
| | | ) |
| | | |
| | | self.action( |
| | | discriminator, |
| | | register, |
| | | introspectables=(intr,), |
| | | order=PHASE1_CONFIG, |
| | | ) # must be registered early |
| | | |
| | | @property |
| | | def action_info(self): |
| | | info = self.info # usually a ZCML action (ParserInfo) if self.info |
| | | if not info: |
| | | # Try to provide more accurate info for conflict reports |
| | | if self._ainfo: |
| | | info = self._ainfo[0] |
| | | else: |
| | | info = ActionInfo(None, 0, '', '') |
| | | return info |
| | | |
| | | def action( |
| | | self, |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | order=0, |
| | | introspectables=(), |
| | | **extra |
| | | ): |
| | | """ Register an action which will be executed when |
| | | :meth:`pyramid.config.Configurator.commit` is called (or executed |
| | | immediately if ``autocommit`` is ``True``). |
| | | |
| | | .. warning:: This method is typically only used by :app:`Pyramid` |
| | | framework extension authors, not by :app:`Pyramid` application |
| | | developers. |
| | | |
| | | The ``discriminator`` uniquely identifies the action. It must be |
| | | given, but it can be ``None``, to indicate that the action never |
| | | conflicts. It must be a hashable value. |
| | | |
| | | The ``callable`` is a callable object which performs the task |
| | | associated with the action when the action is executed. It is |
| | | optional. |
| | | |
| | | ``args`` and ``kw`` are tuple and dict objects respectively, which |
| | | are passed to ``callable`` when this action is executed. Both are |
| | | optional. |
| | | |
| | | ``order`` is a grouping mechanism; an action with a lower order will |
| | | be executed before an action with a higher order (has no effect when |
| | | autocommit is ``True``). |
| | | |
| | | ``introspectables`` is a sequence of :term:`introspectable` objects |
| | | (or the empty sequence if no introspectable objects are associated |
| | | with this action). If this configurator's ``introspection`` |
| | | attribute is ``False``, these introspectables will be ignored. |
| | | |
| | | ``extra`` provides a facility for inserting extra keys and values |
| | | into an action dictionary. |
| | | """ |
| | | # catch nonhashable discriminators here; most unit tests use |
| | | # autocommit=False, which won't catch unhashable discriminators |
| | | assert hash(discriminator) |
| | | |
| | | if kw is None: |
| | | kw = {} |
| | | |
| | | autocommit = self.autocommit |
| | | action_info = self.action_info |
| | | |
| | | if not self.introspection: |
| | | # if we're not introspecting, ignore any introspectables passed |
| | | # to us |
| | | introspectables = () |
| | | |
| | | if autocommit: |
| | | # callables can depend on the side effects of resolving a |
| | | # deferred discriminator |
| | | self.begin() |
| | | try: |
| | | undefer(discriminator) |
| | | if callable is not None: |
| | | callable(*args, **kw) |
| | | for introspectable in introspectables: |
| | | introspectable.register(self.introspector, action_info) |
| | | finally: |
| | | self.end() |
| | | |
| | | else: |
| | | action = extra |
| | | action.update( |
| | | dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | order=order, |
| | | info=action_info, |
| | | includepath=self.includepath, |
| | | introspectables=introspectables, |
| | | ) |
| | | ) |
| | | self.action_state.action(**action) |
| | | |
| | | def _get_action_state(self): |
| | | registry = self.registry |
| | | try: |
| | | state = registry.action_state |
| | | except AttributeError: |
| | | state = ActionState() |
| | | registry.action_state = state |
| | | return state |
| | | |
| | | def _set_action_state(self, state): |
| | | self.registry.action_state = state |
| | | |
| | | action_state = property(_get_action_state, _set_action_state) |
| | | |
| | | _ctx = action_state # bw compat |
| | | |
| | | def commit(self): |
| | | """ |
| | | Commit any pending configuration actions. If a configuration |
| | | conflict is detected in the pending configuration actions, this method |
| | | will raise a :exc:`ConfigurationConflictError`; within the traceback |
| | | of this error will be information about the source of the conflict, |
| | | usually including file names and line numbers of the cause of the |
| | | configuration conflicts. |
| | | |
| | | .. warning:: |
| | | You should think very carefully before manually invoking |
| | | ``commit()``. Especially not as part of any reusable configuration |
| | | methods. Normally it should only be done by an application author at |
| | | the end of configuration in order to override certain aspects of an |
| | | addon. |
| | | |
| | | """ |
| | | self.begin() |
| | | try: |
| | | self.action_state.execute_actions(introspector=self.introspector) |
| | | finally: |
| | | self.end() |
| | | self.action_state = ActionState() # old actions have been processed |
| | | |
| | | def include(self, callable, route_prefix=None): |
| | | """Include a configuration callable, to support imperative |
| | | application extensibility. |
| | |
| | | self.end() |
| | | |
| | | return app |
| | | |
| | | |
| | | # this class is licensed under the ZPL (stolen from Zope) |
| | | class ActionState(object): |
| | | def __init__(self): |
| | | # NB "actions" is an API, dep'd upon by pyramid_zcml's load_zcml func |
| | | self.actions = [] |
| | | self._seen_files = set() |
| | | |
| | | def processSpec(self, spec): |
| | | """Check whether a callable needs to be processed. The ``spec`` |
| | | refers to a unique identifier for the callable. |
| | | |
| | | Return True if processing is needed and False otherwise. If |
| | | the callable needs to be processed, it will be marked as |
| | | processed, assuming that the caller will procces the callable if |
| | | it needs to be processed. |
| | | """ |
| | | if spec in self._seen_files: |
| | | return False |
| | | self._seen_files.add(spec) |
| | | return True |
| | | |
| | | def action( |
| | | self, |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | order=0, |
| | | includepath=(), |
| | | info=None, |
| | | introspectables=(), |
| | | **extra |
| | | ): |
| | | """Add an action with the given discriminator, callable and arguments |
| | | """ |
| | | if kw is None: |
| | | kw = {} |
| | | action = extra |
| | | action.update( |
| | | dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | includepath=includepath, |
| | | info=info, |
| | | order=order, |
| | | introspectables=introspectables, |
| | | ) |
| | | ) |
| | | self.actions.append(action) |
| | | |
| | | def execute_actions(self, clear=True, introspector=None): |
| | | """Execute the configuration actions |
| | | |
| | | This calls the action callables after resolving conflicts |
| | | |
| | | For example: |
| | | |
| | | >>> output = [] |
| | | >>> def f(*a, **k): |
| | | ... output.append(('f', a, k)) |
| | | >>> context = ActionState() |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... (1, f, (11,), {}, ('x', )), |
| | | ... (2, f, (2,)), |
| | | ... ] |
| | | >>> context.execute_actions() |
| | | >>> output |
| | | [('f', (1,), {}), ('f', (2,), {})] |
| | | |
| | | If the action raises an error, we convert it to a |
| | | ConfigurationExecutionError. |
| | | |
| | | >>> output = [] |
| | | >>> def bad(): |
| | | ... bad.xxx |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... (1, f, (11,), {}, ('x', )), |
| | | ... (2, f, (2,)), |
| | | ... (3, bad, (), {}, (), 'oops') |
| | | ... ] |
| | | >>> try: |
| | | ... v = context.execute_actions() |
| | | ... except ConfigurationExecutionError, v: |
| | | ... pass |
| | | >>> print(v) |
| | | exceptions.AttributeError: 'function' object has no attribute 'xxx' |
| | | in: |
| | | oops |
| | | |
| | | Note that actions executed before the error still have an effect: |
| | | |
| | | >>> output |
| | | [('f', (1,), {}), ('f', (2,), {})] |
| | | |
| | | The execution is re-entrant such that actions may be added by other |
| | | actions with the one caveat that the order of any added actions must |
| | | be equal to or larger than the current action. |
| | | |
| | | >>> output = [] |
| | | >>> def f(*a, **k): |
| | | ... output.append(('f', a, k)) |
| | | ... context.actions.append((3, g, (8,), {})) |
| | | >>> def g(*a, **k): |
| | | ... output.append(('g', a, k)) |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... ] |
| | | >>> context.execute_actions() |
| | | >>> output |
| | | [('f', (1,), {}), ('g', (8,), {})] |
| | | |
| | | """ |
| | | try: |
| | | all_actions = [] |
| | | executed_actions = [] |
| | | action_iter = iter([]) |
| | | conflict_state = ConflictResolverState() |
| | | |
| | | while True: |
| | | # We clear the actions list prior to execution so if there |
| | | # are some new actions then we add them to the mix and resolve |
| | | # conflicts again. This orders the new actions as well as |
| | | # ensures that the previously executed actions have no new |
| | | # conflicts. |
| | | if self.actions: |
| | | all_actions.extend(self.actions) |
| | | action_iter = resolveConflicts( |
| | | self.actions, state=conflict_state |
| | | ) |
| | | self.actions = [] |
| | | |
| | | action = next(action_iter, None) |
| | | if action is None: |
| | | # we are done! |
| | | break |
| | | |
| | | callable = action['callable'] |
| | | args = action['args'] |
| | | kw = action['kw'] |
| | | info = action['info'] |
| | | # we use "get" below in case an action was added via a ZCML |
| | | # directive that did not know about introspectables |
| | | introspectables = action.get('introspectables', ()) |
| | | |
| | | try: |
| | | if callable is not None: |
| | | callable(*args, **kw) |
| | | except Exception: |
| | | t, v, tb = sys.exc_info() |
| | | try: |
| | | reraise( |
| | | ConfigurationExecutionError, |
| | | ConfigurationExecutionError(t, v, info), |
| | | tb, |
| | | ) |
| | | finally: |
| | | del t, v, tb |
| | | |
| | | if introspector is not None: |
| | | for introspectable in introspectables: |
| | | introspectable.register(introspector, info) |
| | | |
| | | executed_actions.append(action) |
| | | |
| | | self.actions = all_actions |
| | | return executed_actions |
| | | |
| | | finally: |
| | | if clear: |
| | | self.actions = [] |
| | | |
| | | |
| | | class ConflictResolverState(object): |
| | | def __init__(self): |
| | | # keep a set of resolved discriminators to test against to ensure |
| | | # that a new action does not conflict with something already executed |
| | | self.resolved_ainfos = {} |
| | | |
| | | # actions left over from a previous iteration |
| | | self.remaining_actions = [] |
| | | |
| | | # after executing an action we memoize its order to avoid any new |
| | | # actions sending us backward |
| | | self.min_order = None |
| | | |
| | | # unique tracks the index of the action so we need it to increase |
| | | # monotonically across invocations to resolveConflicts |
| | | self.start = 0 |
| | | |
| | | |
| | | # this function is licensed under the ZPL (stolen from Zope) |
| | | def resolveConflicts(actions, state=None): |
| | | """Resolve conflicting actions |
| | | |
| | | Given an actions list, identify and try to resolve conflicting actions. |
| | | Actions conflict if they have the same non-None discriminator. |
| | | |
| | | Conflicting actions can be resolved if the include path of one of |
| | | the actions is a prefix of the includepaths of the other |
| | | conflicting actions and is unequal to the include paths in the |
| | | other conflicting actions. |
| | | |
| | | Actions are resolved on a per-order basis because some discriminators |
| | | cannot be computed until earlier actions have executed. An action in an |
| | | earlier order may execute successfully only to find out later that it was |
| | | overridden by another action with a smaller include path. This will result |
| | | in a conflict as there is no way to revert the original action. |
| | | |
| | | ``state`` may be an instance of ``ConflictResolverState`` that |
| | | can be used to resume execution and resolve the new actions against the |
| | | list of executed actions from a previous call. |
| | | |
| | | """ |
| | | if state is None: |
| | | state = ConflictResolverState() |
| | | |
| | | # pick up where we left off last time, but track the new actions as well |
| | | state.remaining_actions.extend(normalize_actions(actions)) |
| | | actions = state.remaining_actions |
| | | |
| | | def orderandpos(v): |
| | | n, v = v |
| | | return (v['order'] or 0, n) |
| | | |
| | | def orderonly(v): |
| | | n, v = v |
| | | return v['order'] or 0 |
| | | |
| | | sactions = sorted(enumerate(actions, start=state.start), key=orderandpos) |
| | | for order, actiongroup in itertools.groupby(sactions, orderonly): |
| | | # "order" is an integer grouping. Actions in a lower order will be |
| | | # executed before actions in a higher order. All of the actions in |
| | | # one grouping will be executed (its callable, if any will be called) |
| | | # before any of the actions in the next. |
| | | output = [] |
| | | unique = {} |
| | | |
| | | # error out if we went backward in order |
| | | if state.min_order is not None and order < state.min_order: |
| | | r = [ |
| | | 'Actions were added to order={0} after execution had moved ' |
| | | 'on to order={1}. Conflicting actions: '.format( |
| | | order, state.min_order |
| | | ) |
| | | ] |
| | | for i, action in actiongroup: |
| | | for line in str(action['info']).rstrip().split('\n'): |
| | | r.append(" " + line) |
| | | raise ConfigurationError('\n'.join(r)) |
| | | |
| | | for i, action in actiongroup: |
| | | # Within an order, actions are executed sequentially based on |
| | | # original action ordering ("i"). |
| | | |
| | | # "ainfo" is a tuple of (i, action) where "i" is an integer |
| | | # expressing the relative position of this action in the action |
| | | # list being resolved, and "action" is an action dictionary. The |
| | | # purpose of an ainfo is to associate an "i" with a particular |
| | | # action; "i" exists for sorting after conflict resolution. |
| | | ainfo = (i, action) |
| | | |
| | | # wait to defer discriminators until we are on their order because |
| | | # the discriminator may depend on state from a previous order |
| | | discriminator = undefer(action['discriminator']) |
| | | action['discriminator'] = discriminator |
| | | |
| | | if discriminator is None: |
| | | # The discriminator is None, so this action can never conflict. |
| | | # We can add it directly to the result. |
| | | output.append(ainfo) |
| | | continue |
| | | |
| | | L = unique.setdefault(discriminator, []) |
| | | L.append(ainfo) |
| | | |
| | | # Check for conflicts |
| | | conflicts = {} |
| | | for discriminator, ainfos in unique.items(): |
| | | # We use (includepath, i) as a sort key because we need to |
| | | # sort the actions by the paths so that the shortest path with a |
| | | # given prefix comes first. The "first" action is the one with the |
| | | # shortest include path. We break sorting ties using "i". |
| | | def bypath(ainfo): |
| | | path, i = ainfo[1]['includepath'], ainfo[0] |
| | | return path, order, i |
| | | |
| | | ainfos.sort(key=bypath) |
| | | ainfo, rest = ainfos[0], ainfos[1:] |
| | | _, action = ainfo |
| | | |
| | | # ensure this new action does not conflict with a previously |
| | | # resolved action from an earlier order / invocation |
| | | prev_ainfo = state.resolved_ainfos.get(discriminator) |
| | | if prev_ainfo is not None: |
| | | _, paction = prev_ainfo |
| | | basepath, baseinfo = paction['includepath'], paction['info'] |
| | | includepath = action['includepath'] |
| | | # if the new action conflicts with the resolved action then |
| | | # note the conflict, otherwise drop the action as it's |
| | | # effectively overriden by the previous action |
| | | if ( |
| | | includepath[: len(basepath)] != basepath |
| | | or includepath == basepath |
| | | ): |
| | | L = conflicts.setdefault(discriminator, [baseinfo]) |
| | | L.append(action['info']) |
| | | |
| | | else: |
| | | output.append(ainfo) |
| | | |
| | | basepath, baseinfo = action['includepath'], action['info'] |
| | | for _, action in rest: |
| | | includepath = action['includepath'] |
| | | # Test whether path is a prefix of opath |
| | | if ( |
| | | includepath[: len(basepath)] != basepath |
| | | or includepath == basepath # not a prefix |
| | | ): |
| | | L = conflicts.setdefault(discriminator, [baseinfo]) |
| | | L.append(action['info']) |
| | | |
| | | if conflicts: |
| | | raise ConfigurationConflictError(conflicts) |
| | | |
| | | # sort resolved actions by "i" and yield them one by one |
| | | for i, action in sorted(output, key=operator.itemgetter(0)): |
| | | # do not memoize the order until we resolve an action inside it |
| | | state.min_order = action['order'] |
| | | state.start = i + 1 |
| | | state.remaining_actions.remove(action) |
| | | state.resolved_ainfos[action['discriminator']] = (i, action) |
| | | yield action |
| | | |
| | | |
| | | def normalize_actions(actions): |
| | | """Convert old-style tuple actions to new-style dicts.""" |
| | | result = [] |
| | | for v in actions: |
| | | if not isinstance(v, dict): |
| | | v = expand_action_tuple(*v) |
| | | result.append(v) |
| | | return result |
| | | |
| | | |
| | | def expand_action_tuple( |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | includepath=(), |
| | | info=None, |
| | | order=0, |
| | | introspectables=(), |
| | | ): |
| | | if kw is None: |
| | | kw = {} |
| | | return dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | includepath=includepath, |
| | | info=info, |
| | | order=order, |
| | | introspectables=introspectables, |
| | | ) |
| | | |
| | | |
| | | global_registries = WeakOrderedSet() |
New file |
| | |
| | | import functools |
| | | import itertools |
| | | import operator |
| | | import sys |
| | | import traceback |
| | | from zope.interface import implementer |
| | | |
| | | from pyramid.compat import reraise |
| | | from pyramid.exceptions import ( |
| | | ConfigurationConflictError, |
| | | ConfigurationError, |
| | | ConfigurationExecutionError, |
| | | ) |
| | | from pyramid.interfaces import IActionInfo |
| | | from pyramid.registry import undefer |
| | | from pyramid.util import is_nonstr_iter |
| | | |
| | | |
| | | class ActionConfiguratorMixin(object): |
| | | @property |
| | | def action_info(self): |
| | | info = self.info # usually a ZCML action (ParserInfo) if self.info |
| | | if not info: |
| | | # Try to provide more accurate info for conflict reports |
| | | if self._ainfo: |
| | | info = self._ainfo[0] |
| | | else: |
| | | info = ActionInfo(None, 0, '', '') |
| | | return info |
| | | |
| | | def action( |
| | | self, |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | order=0, |
| | | introspectables=(), |
| | | **extra |
| | | ): |
| | | """ Register an action which will be executed when |
| | | :meth:`pyramid.config.Configurator.commit` is called (or executed |
| | | immediately if ``autocommit`` is ``True``). |
| | | |
| | | .. warning:: This method is typically only used by :app:`Pyramid` |
| | | framework extension authors, not by :app:`Pyramid` application |
| | | developers. |
| | | |
| | | The ``discriminator`` uniquely identifies the action. It must be |
| | | given, but it can be ``None``, to indicate that the action never |
| | | conflicts. It must be a hashable value. |
| | | |
| | | The ``callable`` is a callable object which performs the task |
| | | associated with the action when the action is executed. It is |
| | | optional. |
| | | |
| | | ``args`` and ``kw`` are tuple and dict objects respectively, which |
| | | are passed to ``callable`` when this action is executed. Both are |
| | | optional. |
| | | |
| | | ``order`` is a grouping mechanism; an action with a lower order will |
| | | be executed before an action with a higher order (has no effect when |
| | | autocommit is ``True``). |
| | | |
| | | ``introspectables`` is a sequence of :term:`introspectable` objects |
| | | (or the empty sequence if no introspectable objects are associated |
| | | with this action). If this configurator's ``introspection`` |
| | | attribute is ``False``, these introspectables will be ignored. |
| | | |
| | | ``extra`` provides a facility for inserting extra keys and values |
| | | into an action dictionary. |
| | | """ |
| | | # catch nonhashable discriminators here; most unit tests use |
| | | # autocommit=False, which won't catch unhashable discriminators |
| | | assert hash(discriminator) |
| | | |
| | | if kw is None: |
| | | kw = {} |
| | | |
| | | autocommit = self.autocommit |
| | | action_info = self.action_info |
| | | |
| | | if not self.introspection: |
| | | # if we're not introspecting, ignore any introspectables passed |
| | | # to us |
| | | introspectables = () |
| | | |
| | | if autocommit: |
| | | # callables can depend on the side effects of resolving a |
| | | # deferred discriminator |
| | | self.begin() |
| | | try: |
| | | undefer(discriminator) |
| | | if callable is not None: |
| | | callable(*args, **kw) |
| | | for introspectable in introspectables: |
| | | introspectable.register(self.introspector, action_info) |
| | | finally: |
| | | self.end() |
| | | |
| | | else: |
| | | action = extra |
| | | action.update( |
| | | dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | order=order, |
| | | info=action_info, |
| | | includepath=self.includepath, |
| | | introspectables=introspectables, |
| | | ) |
| | | ) |
| | | self.action_state.action(**action) |
| | | |
| | | def _get_action_state(self): |
| | | registry = self.registry |
| | | try: |
| | | state = registry.action_state |
| | | except AttributeError: |
| | | state = ActionState() |
| | | registry.action_state = state |
| | | return state |
| | | |
| | | def _set_action_state(self, state): |
| | | self.registry.action_state = state |
| | | |
| | | action_state = property(_get_action_state, _set_action_state) |
| | | |
| | | _ctx = action_state # bw compat |
| | | |
| | | def commit(self): |
| | | """ |
| | | Commit any pending configuration actions. If a configuration |
| | | conflict is detected in the pending configuration actions, this method |
| | | will raise a :exc:`ConfigurationConflictError`; within the traceback |
| | | of this error will be information about the source of the conflict, |
| | | usually including file names and line numbers of the cause of the |
| | | configuration conflicts. |
| | | |
| | | .. warning:: |
| | | You should think very carefully before manually invoking |
| | | ``commit()``. Especially not as part of any reusable configuration |
| | | methods. Normally it should only be done by an application author at |
| | | the end of configuration in order to override certain aspects of an |
| | | addon. |
| | | |
| | | """ |
| | | self.begin() |
| | | try: |
| | | self.action_state.execute_actions(introspector=self.introspector) |
| | | finally: |
| | | self.end() |
| | | self.action_state = ActionState() # old actions have been processed |
| | | |
| | | |
| | | # this class is licensed under the ZPL (stolen from Zope) |
| | | class ActionState(object): |
| | | def __init__(self): |
| | | # NB "actions" is an API, dep'd upon by pyramid_zcml's load_zcml func |
| | | self.actions = [] |
| | | self._seen_files = set() |
| | | |
| | | def processSpec(self, spec): |
| | | """Check whether a callable needs to be processed. The ``spec`` |
| | | refers to a unique identifier for the callable. |
| | | |
| | | Return True if processing is needed and False otherwise. If |
| | | the callable needs to be processed, it will be marked as |
| | | processed, assuming that the caller will procces the callable if |
| | | it needs to be processed. |
| | | """ |
| | | if spec in self._seen_files: |
| | | return False |
| | | self._seen_files.add(spec) |
| | | return True |
| | | |
| | | def action( |
| | | self, |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | order=0, |
| | | includepath=(), |
| | | info=None, |
| | | introspectables=(), |
| | | **extra |
| | | ): |
| | | """Add an action with the given discriminator, callable and arguments |
| | | """ |
| | | if kw is None: |
| | | kw = {} |
| | | action = extra |
| | | action.update( |
| | | dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | includepath=includepath, |
| | | info=info, |
| | | order=order, |
| | | introspectables=introspectables, |
| | | ) |
| | | ) |
| | | self.actions.append(action) |
| | | |
| | | def execute_actions(self, clear=True, introspector=None): |
| | | """Execute the configuration actions |
| | | |
| | | This calls the action callables after resolving conflicts |
| | | |
| | | For example: |
| | | |
| | | >>> output = [] |
| | | >>> def f(*a, **k): |
| | | ... output.append(('f', a, k)) |
| | | >>> context = ActionState() |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... (1, f, (11,), {}, ('x', )), |
| | | ... (2, f, (2,)), |
| | | ... ] |
| | | >>> context.execute_actions() |
| | | >>> output |
| | | [('f', (1,), {}), ('f', (2,), {})] |
| | | |
| | | If the action raises an error, we convert it to a |
| | | ConfigurationExecutionError. |
| | | |
| | | >>> output = [] |
| | | >>> def bad(): |
| | | ... bad.xxx |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... (1, f, (11,), {}, ('x', )), |
| | | ... (2, f, (2,)), |
| | | ... (3, bad, (), {}, (), 'oops') |
| | | ... ] |
| | | >>> try: |
| | | ... v = context.execute_actions() |
| | | ... except ConfigurationExecutionError, v: |
| | | ... pass |
| | | >>> print(v) |
| | | exceptions.AttributeError: 'function' object has no attribute 'xxx' |
| | | in: |
| | | oops |
| | | |
| | | Note that actions executed before the error still have an effect: |
| | | |
| | | >>> output |
| | | [('f', (1,), {}), ('f', (2,), {})] |
| | | |
| | | The execution is re-entrant such that actions may be added by other |
| | | actions with the one caveat that the order of any added actions must |
| | | be equal to or larger than the current action. |
| | | |
| | | >>> output = [] |
| | | >>> def f(*a, **k): |
| | | ... output.append(('f', a, k)) |
| | | ... context.actions.append((3, g, (8,), {})) |
| | | >>> def g(*a, **k): |
| | | ... output.append(('g', a, k)) |
| | | >>> context.actions = [ |
| | | ... (1, f, (1,)), |
| | | ... ] |
| | | >>> context.execute_actions() |
| | | >>> output |
| | | [('f', (1,), {}), ('g', (8,), {})] |
| | | |
| | | """ |
| | | try: |
| | | all_actions = [] |
| | | executed_actions = [] |
| | | action_iter = iter([]) |
| | | conflict_state = ConflictResolverState() |
| | | |
| | | while True: |
| | | # We clear the actions list prior to execution so if there |
| | | # are some new actions then we add them to the mix and resolve |
| | | # conflicts again. This orders the new actions as well as |
| | | # ensures that the previously executed actions have no new |
| | | # conflicts. |
| | | if self.actions: |
| | | all_actions.extend(self.actions) |
| | | action_iter = resolveConflicts( |
| | | self.actions, state=conflict_state |
| | | ) |
| | | self.actions = [] |
| | | |
| | | action = next(action_iter, None) |
| | | if action is None: |
| | | # we are done! |
| | | break |
| | | |
| | | callable = action['callable'] |
| | | args = action['args'] |
| | | kw = action['kw'] |
| | | info = action['info'] |
| | | # we use "get" below in case an action was added via a ZCML |
| | | # directive that did not know about introspectables |
| | | introspectables = action.get('introspectables', ()) |
| | | |
| | | try: |
| | | if callable is not None: |
| | | callable(*args, **kw) |
| | | except Exception: |
| | | t, v, tb = sys.exc_info() |
| | | try: |
| | | reraise( |
| | | ConfigurationExecutionError, |
| | | ConfigurationExecutionError(t, v, info), |
| | | tb, |
| | | ) |
| | | finally: |
| | | del t, v, tb |
| | | |
| | | if introspector is not None: |
| | | for introspectable in introspectables: |
| | | introspectable.register(introspector, info) |
| | | |
| | | executed_actions.append(action) |
| | | |
| | | self.actions = all_actions |
| | | return executed_actions |
| | | |
| | | finally: |
| | | if clear: |
| | | self.actions = [] |
| | | |
| | | |
| | | class ConflictResolverState(object): |
| | | def __init__(self): |
| | | # keep a set of resolved discriminators to test against to ensure |
| | | # that a new action does not conflict with something already executed |
| | | self.resolved_ainfos = {} |
| | | |
| | | # actions left over from a previous iteration |
| | | self.remaining_actions = [] |
| | | |
| | | # after executing an action we memoize its order to avoid any new |
| | | # actions sending us backward |
| | | self.min_order = None |
| | | |
| | | # unique tracks the index of the action so we need it to increase |
| | | # monotonically across invocations to resolveConflicts |
| | | self.start = 0 |
| | | |
| | | |
| | | # this function is licensed under the ZPL (stolen from Zope) |
| | | def resolveConflicts(actions, state=None): |
| | | """Resolve conflicting actions |
| | | |
| | | Given an actions list, identify and try to resolve conflicting actions. |
| | | Actions conflict if they have the same non-None discriminator. |
| | | |
| | | Conflicting actions can be resolved if the include path of one of |
| | | the actions is a prefix of the includepaths of the other |
| | | conflicting actions and is unequal to the include paths in the |
| | | other conflicting actions. |
| | | |
| | | Actions are resolved on a per-order basis because some discriminators |
| | | cannot be computed until earlier actions have executed. An action in an |
| | | earlier order may execute successfully only to find out later that it was |
| | | overridden by another action with a smaller include path. This will result |
| | | in a conflict as there is no way to revert the original action. |
| | | |
| | | ``state`` may be an instance of ``ConflictResolverState`` that |
| | | can be used to resume execution and resolve the new actions against the |
| | | list of executed actions from a previous call. |
| | | |
| | | """ |
| | | if state is None: |
| | | state = ConflictResolverState() |
| | | |
| | | # pick up where we left off last time, but track the new actions as well |
| | | state.remaining_actions.extend(normalize_actions(actions)) |
| | | actions = state.remaining_actions |
| | | |
| | | def orderandpos(v): |
| | | n, v = v |
| | | return (v['order'] or 0, n) |
| | | |
| | | def orderonly(v): |
| | | n, v = v |
| | | return v['order'] or 0 |
| | | |
| | | sactions = sorted(enumerate(actions, start=state.start), key=orderandpos) |
| | | for order, actiongroup in itertools.groupby(sactions, orderonly): |
| | | # "order" is an integer grouping. Actions in a lower order will be |
| | | # executed before actions in a higher order. All of the actions in |
| | | # one grouping will be executed (its callable, if any will be called) |
| | | # before any of the actions in the next. |
| | | output = [] |
| | | unique = {} |
| | | |
| | | # error out if we went backward in order |
| | | if state.min_order is not None and order < state.min_order: |
| | | r = [ |
| | | 'Actions were added to order={0} after execution had moved ' |
| | | 'on to order={1}. Conflicting actions: '.format( |
| | | order, state.min_order |
| | | ) |
| | | ] |
| | | for i, action in actiongroup: |
| | | for line in str(action['info']).rstrip().split('\n'): |
| | | r.append(" " + line) |
| | | raise ConfigurationError('\n'.join(r)) |
| | | |
| | | for i, action in actiongroup: |
| | | # Within an order, actions are executed sequentially based on |
| | | # original action ordering ("i"). |
| | | |
| | | # "ainfo" is a tuple of (i, action) where "i" is an integer |
| | | # expressing the relative position of this action in the action |
| | | # list being resolved, and "action" is an action dictionary. The |
| | | # purpose of an ainfo is to associate an "i" with a particular |
| | | # action; "i" exists for sorting after conflict resolution. |
| | | ainfo = (i, action) |
| | | |
| | | # wait to defer discriminators until we are on their order because |
| | | # the discriminator may depend on state from a previous order |
| | | discriminator = undefer(action['discriminator']) |
| | | action['discriminator'] = discriminator |
| | | |
| | | if discriminator is None: |
| | | # The discriminator is None, so this action can never conflict. |
| | | # We can add it directly to the result. |
| | | output.append(ainfo) |
| | | continue |
| | | |
| | | L = unique.setdefault(discriminator, []) |
| | | L.append(ainfo) |
| | | |
| | | # Check for conflicts |
| | | conflicts = {} |
| | | for discriminator, ainfos in unique.items(): |
| | | # We use (includepath, i) as a sort key because we need to |
| | | # sort the actions by the paths so that the shortest path with a |
| | | # given prefix comes first. The "first" action is the one with the |
| | | # shortest include path. We break sorting ties using "i". |
| | | def bypath(ainfo): |
| | | path, i = ainfo[1]['includepath'], ainfo[0] |
| | | return path, order, i |
| | | |
| | | ainfos.sort(key=bypath) |
| | | ainfo, rest = ainfos[0], ainfos[1:] |
| | | _, action = ainfo |
| | | |
| | | # ensure this new action does not conflict with a previously |
| | | # resolved action from an earlier order / invocation |
| | | prev_ainfo = state.resolved_ainfos.get(discriminator) |
| | | if prev_ainfo is not None: |
| | | _, paction = prev_ainfo |
| | | basepath, baseinfo = paction['includepath'], paction['info'] |
| | | includepath = action['includepath'] |
| | | # if the new action conflicts with the resolved action then |
| | | # note the conflict, otherwise drop the action as it's |
| | | # effectively overriden by the previous action |
| | | if ( |
| | | includepath[: len(basepath)] != basepath |
| | | or includepath == basepath |
| | | ): |
| | | L = conflicts.setdefault(discriminator, [baseinfo]) |
| | | L.append(action['info']) |
| | | |
| | | else: |
| | | output.append(ainfo) |
| | | |
| | | basepath, baseinfo = action['includepath'], action['info'] |
| | | for _, action in rest: |
| | | includepath = action['includepath'] |
| | | # Test whether path is a prefix of opath |
| | | if ( |
| | | includepath[: len(basepath)] != basepath |
| | | or includepath == basepath # not a prefix |
| | | ): |
| | | L = conflicts.setdefault(discriminator, [baseinfo]) |
| | | L.append(action['info']) |
| | | |
| | | if conflicts: |
| | | raise ConfigurationConflictError(conflicts) |
| | | |
| | | # sort resolved actions by "i" and yield them one by one |
| | | for i, action in sorted(output, key=operator.itemgetter(0)): |
| | | # do not memoize the order until we resolve an action inside it |
| | | state.min_order = action['order'] |
| | | state.start = i + 1 |
| | | state.remaining_actions.remove(action) |
| | | state.resolved_ainfos[action['discriminator']] = (i, action) |
| | | yield action |
| | | |
| | | |
| | | def normalize_actions(actions): |
| | | """Convert old-style tuple actions to new-style dicts.""" |
| | | result = [] |
| | | for v in actions: |
| | | if not isinstance(v, dict): |
| | | v = expand_action_tuple(*v) |
| | | result.append(v) |
| | | return result |
| | | |
| | | |
| | | def expand_action_tuple( |
| | | discriminator, |
| | | callable=None, |
| | | args=(), |
| | | kw=None, |
| | | includepath=(), |
| | | info=None, |
| | | order=0, |
| | | introspectables=(), |
| | | ): |
| | | if kw is None: |
| | | kw = {} |
| | | return dict( |
| | | discriminator=discriminator, |
| | | callable=callable, |
| | | args=args, |
| | | kw=kw, |
| | | includepath=includepath, |
| | | info=info, |
| | | order=order, |
| | | introspectables=introspectables, |
| | | ) |
| | | |
| | | |
| | | @implementer(IActionInfo) |
| | | class ActionInfo(object): |
| | | def __init__(self, file, line, function, src): |
| | | self.file = file |
| | | self.line = line |
| | | self.function = function |
| | | self.src = src |
| | | |
| | | def __str__(self): |
| | | srclines = self.src.split('\n') |
| | | src = '\n'.join(' %s' % x for x in srclines) |
| | | return 'Line %s of file %s:\n%s' % (self.line, self.file, src) |
| | | |
| | | |
| | | def action_method(wrapped): |
| | | """ Wrapper to provide the right conflict info report data when a method |
| | | that calls Configurator.action calls another that does the same. Not a |
| | | documented API but used by some external systems.""" |
| | | |
| | | def wrapper(self, *arg, **kw): |
| | | if self._ainfo is None: |
| | | self._ainfo = [] |
| | | info = kw.pop('_info', None) |
| | | # backframes for outer decorators to actionmethods |
| | | backframes = kw.pop('_backframes', 0) + 2 |
| | | if is_nonstr_iter(info) and len(info) == 4: |
| | | # _info permitted as extract_stack tuple |
| | | info = ActionInfo(*info) |
| | | if info is None: |
| | | try: |
| | | f = traceback.extract_stack(limit=4) |
| | | |
| | | # Work around a Python 3.5 issue whereby it would insert an |
| | | # extra stack frame. This should no longer be necessary in |
| | | # Python 3.5.1 |
| | | last_frame = ActionInfo(*f[-1]) |
| | | if last_frame.function == 'extract_stack': # pragma: no cover |
| | | f.pop() |
| | | info = ActionInfo(*f[-backframes]) |
| | | except Exception: # pragma: no cover |
| | | info = ActionInfo(None, 0, '', '') |
| | | self._ainfo.append(info) |
| | | try: |
| | | result = wrapped(self, *arg, **kw) |
| | | finally: |
| | | self._ainfo.pop() |
| | | return result |
| | | |
| | | if hasattr(wrapped, '__name__'): |
| | | functools.update_wrapper(wrapper, wrapped) |
| | | wrapper.__docobj__ = wrapped |
| | | return wrapper |
| | |
| | | |
| | | from pyramid.util import takes_one_arg |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class AdaptersConfiguratorMixin(object): |
| | |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.threadlocal import get_current_registry |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class OverrideProvider(pkg_resources.DefaultProvider): |
| | |
| | | |
| | | from pyramid.util import get_callable_name, InstancePropertyHelper |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class FactoriesConfiguratorMixin(object): |
| | |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.path import AssetResolver |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class I18NConfiguratorMixin(object): |
| | |
| | | import zope.deprecation |
| | | from hashlib import md5 |
| | | from webob.acceptparse import Accept |
| | | |
| | | zope.deprecation.moved('pyramid.predicates', 'Pyramid 1.10') |
| | | from pyramid.compat import bytes_, is_nonstr_iter |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.interfaces import IPredicateList, PHASE1_CONFIG |
| | | from pyramid.predicates import Notted |
| | | from pyramid.registry import predvalseq |
| | | from pyramid.util import TopologicalSorter |
| | | |
| | | |
| | | MAX_ORDER = 1 << 30 |
| | | DEFAULT_PHASH = md5().hexdigest() |
| | | |
| | | |
| | | class PredicateConfiguratorMixin(object): |
| | | def get_predlist(self, name): |
| | | predlist = self.registry.queryUtility(IPredicateList, name=name) |
| | | if predlist is None: |
| | | predlist = PredicateList() |
| | | self.registry.registerUtility(predlist, IPredicateList, name=name) |
| | | return predlist |
| | | |
| | | def _add_predicate( |
| | | self, type, name, factory, weighs_more_than=None, weighs_less_than=None |
| | | ): |
| | | factory = self.maybe_dotted(factory) |
| | | discriminator = ('%s option' % type, name) |
| | | intr = self.introspectable( |
| | | '%s predicates' % type, |
| | | discriminator, |
| | | '%s predicate named %s' % (type, name), |
| | | '%s predicate' % type, |
| | | ) |
| | | intr['name'] = name |
| | | intr['factory'] = factory |
| | | intr['weighs_more_than'] = weighs_more_than |
| | | intr['weighs_less_than'] = weighs_less_than |
| | | |
| | | def register(): |
| | | predlist = self.get_predlist(type) |
| | | predlist.add( |
| | | name, |
| | | factory, |
| | | weighs_more_than=weighs_more_than, |
| | | weighs_less_than=weighs_less_than, |
| | | ) |
| | | |
| | | self.action( |
| | | discriminator, |
| | | register, |
| | | introspectables=(intr,), |
| | | order=PHASE1_CONFIG, |
| | | ) # must be registered early |
| | | |
| | | |
| | | class not_(object): |
| | | """ |
| | | |
| | | You can invert the meaning of any predicate value by wrapping it in a call |
| | | to :class:`pyramid.config.not_`. |
| | | |
| | | .. code-block:: python |
| | | :linenos: |
| | | |
| | | from pyramid.config import not_ |
| | | |
| | | config.add_view( |
| | | 'mypackage.views.my_view', |
| | | route_name='ok', |
| | | request_method=not_('POST') |
| | | ) |
| | | |
| | | The above example will ensure that the view is called if the request method |
| | | is *not* ``POST``, at least if no other view is more specific. |
| | | |
| | | This technique of wrapping a predicate value in ``not_`` can be used |
| | | anywhere predicate values are accepted: |
| | | |
| | | - :meth:`pyramid.config.Configurator.add_view` |
| | | |
| | | - :meth:`pyramid.config.Configurator.add_route` |
| | | |
| | | - :meth:`pyramid.config.Configurator.add_subscriber` |
| | | |
| | | - :meth:`pyramid.view.view_config` |
| | | |
| | | - :meth:`pyramid.events.subscriber` |
| | | |
| | | .. versionadded:: 1.5 |
| | | """ |
| | | |
| | | def __init__(self, value): |
| | | self.value = value |
| | | |
| | | |
| | | # under = after |
| | | # over = before |
| | | |
| | | |
| | | class PredicateList(object): |
| | | def __init__(self): |
| | | self.sorter = TopologicalSorter() |
| | | self.last_added = None |
| | | |
| | | def add(self, name, factory, weighs_more_than=None, weighs_less_than=None): |
| | | # Predicates should be added to a predicate list in (presumed) |
| | | # computation expense order. |
| | | # if weighs_more_than is None and weighs_less_than is None: |
| | | # weighs_more_than = self.last_added or FIRST |
| | | # weighs_less_than = LAST |
| | | self.last_added = name |
| | | self.sorter.add( |
| | | name, factory, after=weighs_more_than, before=weighs_less_than |
| | | ) |
| | | |
| | | def names(self): |
| | | # Return the list of valid predicate names. |
| | | return self.sorter.names |
| | | |
| | | def make(self, config, **kw): |
| | | # Given a configurator and a list of keywords, a predicate list is |
| | | # computed. Elsewhere in the code, we evaluate predicates using a |
| | | # generator expression. All predicates associated with a view or |
| | | # route must evaluate true for the view or route to "match" during a |
| | | # request. The fastest predicate should be evaluated first, then the |
| | | # next fastest, and so on, as if one returns false, the remainder of |
| | | # the predicates won't need to be evaluated. |
| | | # |
| | | # While we compute predicates, we also compute a predicate hash (aka |
| | | # phash) that can be used by a caller to identify identical predicate |
| | | # lists. |
| | | ordered = self.sorter.sorted() |
| | | phash = md5() |
| | | weights = [] |
| | | preds = [] |
| | | for n, (name, predicate_factory) in enumerate(ordered): |
| | | vals = kw.pop(name, None) |
| | | if vals is None: # XXX should this be a sentinel other than None? |
| | | continue |
| | | if not isinstance(vals, predvalseq): |
| | | vals = (vals,) |
| | | for val in vals: |
| | | realval = val |
| | | notted = False |
| | | if isinstance(val, not_): |
| | | realval = val.value |
| | | notted = True |
| | | pred = predicate_factory(realval, config) |
| | | if notted: |
| | | pred = Notted(pred) |
| | | hashes = pred.phash() |
| | | if not is_nonstr_iter(hashes): |
| | | hashes = [hashes] |
| | | for h in hashes: |
| | | phash.update(bytes_(h)) |
| | | weights.append(1 << n + 1) |
| | | preds.append(pred) |
| | | if kw: |
| | | from difflib import get_close_matches |
| | | |
| | | closest = [] |
| | | names = [name for name, _ in ordered] |
| | | for name in kw: |
| | | closest.extend(get_close_matches(name, names, 3)) |
| | | |
| | | raise ConfigurationError( |
| | | 'Unknown predicate values: %r (did you mean %s)' |
| | | % (kw, ','.join(closest)) |
| | | ) |
| | | # A "order" is computed for the predicate list. An order is |
| | | # a scoring. |
| | | # |
| | | # Each predicate is associated with a weight value. The weight of a |
| | | # predicate symbolizes the relative potential "importance" of the |
| | | # predicate to all other predicates. A larger weight indicates |
| | | # greater importance. |
| | | # |
| | | # All weights for a given predicate list are bitwise ORed together |
| | | # to create a "score"; this score is then subtracted from |
| | | # MAX_ORDER and divided by an integer representing the number of |
| | | # predicates+1 to determine the order. |
| | | # |
| | | # For views, the order represents the ordering in which a "multiview" |
| | | # ( a collection of views that share the same context/request/name |
| | | # triad but differ in other ways via predicates) will attempt to call |
| | | # its set of views. Views with lower orders will be tried first. |
| | | # The intent is to a) ensure that views with more predicates are |
| | | # always evaluated before views with fewer predicates and b) to |
| | | # ensure a stable call ordering of views that share the same number |
| | | # of predicates. Views which do not have any predicates get an order |
| | | # of MAX_ORDER, meaning that they will be tried very last. |
| | | score = 0 |
| | | for bit in weights: |
| | | score = score | bit |
| | | order = (MAX_ORDER - score) / (len(preds) + 1) |
| | | return order, preds, phash.hexdigest() |
| | | |
| | | |
| | | def normalize_accept_offer(offer, allow_range=False): |
| | | if allow_range and '*' in offer: |
| | | return offer.lower() |
| | | return str(Accept.parse_offer(offer)) |
| | | |
| | | |
| | | def sort_accept_offers(offers, order=None): |
| | | """ |
| | | Sort a list of offers by preference. |
| | | |
| | | For a given ``type/subtype`` category of offers, this algorithm will |
| | | always sort offers with params higher than the bare offer. |
| | | |
| | | :param offers: A list of offers to be sorted. |
| | | :param order: A weighted list of offers where items closer to the start of |
| | | the list will be a preferred over items closer to the end. |
| | | :return: A list of offers sorted first by specificity (higher to lower) |
| | | then by ``order``. |
| | | |
| | | """ |
| | | if order is None: |
| | | order = [] |
| | | |
| | | max_weight = len(offers) |
| | | |
| | | def find_order_index(value, default=None): |
| | | return next((i for i, x in enumerate(order) if x == value), default) |
| | | |
| | | def offer_sort_key(value): |
| | | """ |
| | | (type_weight, params_weight) |
| | | |
| | | type_weight: |
| | | - index of specific ``type/subtype`` in order list |
| | | - ``max_weight * 2`` if no match is found |
| | | |
| | | params_weight: |
| | | - index of specific ``type/subtype;params`` in order list |
| | | - ``max_weight`` if not found |
| | | - ``max_weight + 1`` if no params at all |
| | | |
| | | """ |
| | | parsed = Accept.parse_offer(value) |
| | | |
| | | type_w = find_order_index( |
| | | parsed.type + '/' + parsed.subtype, max_weight |
| | | ) |
| | | |
| | | if parsed.params: |
| | | param_w = find_order_index(value, max_weight) |
| | | |
| | | else: |
| | | param_w = max_weight + 1 |
| | | |
| | | return (type_w, param_w) |
| | | |
| | | return sorted(offers, key=offer_sort_key) |
| | |
| | | from pyramid.interfaces import IRendererFactory, PHASE1_CONFIG |
| | | |
| | | from pyramid import renderers |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | DEFAULT_RENDERERS = ( |
| | | ('json', renderers.json_renderer_factory), |
| | |
| | | ) |
| | | |
| | | from pyramid.exceptions import ConfigurationError |
| | | import pyramid.predicates |
| | | from pyramid.request import route_request_iface |
| | | from pyramid.urldispatch import RoutesMapper |
| | | |
| | | from pyramid.util import as_sorted_tuple, is_nonstr_iter |
| | | |
| | | import pyramid.predicates |
| | | |
| | | from pyramid.config.util import ( |
| | | action_method, |
| | | normalize_accept_offer, |
| | | predvalseq, |
| | | ) |
| | | from pyramid.config.actions import action_method |
| | | from pyramid.config.predicates import normalize_accept_offer, predvalseq |
| | | |
| | | |
| | | class RoutesConfiguratorMixin(object): |
| | |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.util import as_sorted_tuple |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class SecurityConfiguratorMixin(object): |
| | |
| | | |
| | | from pyramid.traversal import decode_path_info, split_path_info |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class TestingConfiguratorMixin(object): |
| | |
| | | |
| | | from pyramid.util import is_string_or_iterable, TopologicalSorter |
| | | |
| | | from pyramid.config.util import action_method |
| | | from pyramid.config.actions import action_method |
| | | |
| | | |
| | | class TweensConfiguratorMixin(object): |
| | |
| | | wraps_view, |
| | | ) |
| | | |
| | | from pyramid.config.util import ( |
| | | action_method, |
| | | from pyramid.config.actions import action_method |
| | | from pyramid.config.predicates import ( |
| | | DEFAULT_PHASH, |
| | | MAX_ORDER, |
| | | normalize_accept_offer, |
New file |
| | |
| | | import unittest |
| | | |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | from pyramid.exceptions import ConfigurationExecutionError |
| | | |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | |
| | | class ActionConfiguratorMixinTests(unittest.TestCase): |
| | | def _makeOne(self, *arg, **kw): |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator(*arg, **kw) |
| | | return config |
| | | |
| | | def _getViewCallable( |
| | | self, |
| | | config, |
| | | ctx_iface=None, |
| | | request_iface=None, |
| | | name='', |
| | | exception_view=False, |
| | | ): |
| | | from zope.interface import Interface |
| | | from pyramid.interfaces import IView |
| | | from pyramid.interfaces import IViewClassifier |
| | | from pyramid.interfaces import IExceptionViewClassifier |
| | | |
| | | if exception_view: # pragma: no cover |
| | | classifier = IExceptionViewClassifier |
| | | else: |
| | | classifier = IViewClassifier |
| | | if ctx_iface is None: |
| | | ctx_iface = Interface |
| | | if request_iface is None: |
| | | request_iface = IRequest |
| | | return config.registry.adapters.lookup( |
| | | (classifier, request_iface, ctx_iface), |
| | | IView, |
| | | name=name, |
| | | default=None, |
| | | ) |
| | | |
| | | def test_action_branching_kw_is_None(self): |
| | | config = self._makeOne(autocommit=True) |
| | | self.assertEqual(config.action('discrim'), None) |
| | | |
| | | def test_action_branching_kw_is_not_None(self): |
| | | config = self._makeOne(autocommit=True) |
| | | self.assertEqual(config.action('discrim', kw={'a': 1}), None) |
| | | |
| | | def test_action_autocommit_with_introspectables(self): |
| | | from pyramid.config.actions import ActionInfo |
| | | |
| | | config = self._makeOne(autocommit=True) |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(len(intr.registered), 1) |
| | | self.assertEqual(intr.registered[0][0], config.introspector) |
| | | self.assertEqual(intr.registered[0][1].__class__, ActionInfo) |
| | | |
| | | def test_action_autocommit_with_introspectables_introspection_off(self): |
| | | config = self._makeOne(autocommit=True) |
| | | config.introspection = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(len(intr.registered), 0) |
| | | |
| | | def test_action_branching_nonautocommit_with_config_info(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = 'abc' |
| | | state = DummyActionState() |
| | | state.autocommit = False |
| | | config.action_state = state |
| | | config.action('discrim', kw={'a': 1}) |
| | | self.assertEqual( |
| | | state.actions, |
| | | [ |
| | | ( |
| | | (), |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': 'discrim', |
| | | 'includepath': (), |
| | | 'info': 'abc', |
| | | 'introspectables': (), |
| | | 'kw': {'a': 1}, |
| | | 'order': 0, |
| | | }, |
| | | ) |
| | | ], |
| | | ) |
| | | |
| | | def test_action_branching_nonautocommit_without_config_info(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = ['z'] |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | config.action('discrim', kw={'a': 1}) |
| | | self.assertEqual( |
| | | state.actions, |
| | | [ |
| | | ( |
| | | (), |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': 'discrim', |
| | | 'includepath': (), |
| | | 'info': 'z', |
| | | 'introspectables': (), |
| | | 'kw': {'a': 1}, |
| | | 'order': 0, |
| | | }, |
| | | ) |
| | | ], |
| | | ) |
| | | |
| | | def test_action_branching_nonautocommit_with_introspectables(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = [] |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(state.actions[0][1]['introspectables'], (intr,)) |
| | | |
| | | def test_action_nonautocommit_with_introspectables_introspection_off(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = [] |
| | | config.introspection = False |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(state.actions[0][1]['introspectables'], ()) |
| | | |
| | | def test_commit_conflict_simple(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.add_view(view1) |
| | | config.add_view(view2) |
| | | self.assertRaises(ConfigurationConflictError, config.commit) |
| | | |
| | | def test_commit_conflict_resolved_with_include(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme(config): |
| | | config.add_view(view2) |
| | | |
| | | config.add_view(view1) |
| | | config.include(includeme) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view1') |
| | | |
| | | def test_commit_conflict_with_two_includes(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme1(config): |
| | | config.add_view(view1) |
| | | |
| | | def includeme2(config): |
| | | config.add_view(view2) |
| | | |
| | | config.include(includeme1) |
| | | config.include(includeme2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'includeme1') |
| | | self.assertEqual(c2, 'includeme2') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | def test_commit_conflict_resolved_with_two_includes_and_local(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view3(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme1(config): |
| | | config.add_view(view1) |
| | | |
| | | def includeme2(config): |
| | | config.add_view(view2) |
| | | |
| | | config.include(includeme1) |
| | | config.include(includeme2) |
| | | config.add_view(view3) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view3') |
| | | |
| | | def test_autocommit_no_conflicts(self): |
| | | from pyramid.renderers import null_renderer |
| | | |
| | | config = self._makeOne(autocommit=True) |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view3(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.add_view(view1, renderer=null_renderer) |
| | | config.add_view(view2, renderer=null_renderer) |
| | | config.add_view(view3, renderer=null_renderer) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view3') |
| | | |
| | | def test_conflict_set_notfound_view(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.set_notfound_view(view1) |
| | | config.set_notfound_view(view2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'test_conflict_set_notfound_view') |
| | | self.assertEqual(c2, 'test_conflict_set_notfound_view') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | def test_conflict_set_forbidden_view(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.set_forbidden_view(view1) |
| | | config.set_forbidden_view(view2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'test_conflict_set_forbidden_view') |
| | | self.assertEqual(c2, 'test_conflict_set_forbidden_view') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | |
| | | class TestActionState(unittest.TestCase): |
| | | def _makeOne(self): |
| | | from pyramid.config.actions import ActionState |
| | | |
| | | return ActionState() |
| | | |
| | | def test_it(self): |
| | | c = self._makeOne() |
| | | self.assertEqual(c.actions, []) |
| | | |
| | | def test_action_simple(self): |
| | | from . import dummyfactory as f |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(1, f, (1,), {'x': 1}) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {'x': 1}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | c.action(None) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {'x': 1}, |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_includepath(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(None, includepath=('abc',)) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': ('abc',), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_info(self): |
| | | c = self._makeOne() |
| | | c.action(None, info='abc') |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': 'abc', |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_includepath_and_info(self): |
| | | c = self._makeOne() |
| | | c.action(None, includepath=('spec',), info='bleh') |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': ('spec',), |
| | | 'info': 'bleh', |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_order(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(None, order=99999) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 99999, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_introspectables(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | intr = DummyIntrospectable() |
| | | c.action(None, introspectables=(intr,)) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_processSpec(self): |
| | | c = self._makeOne() |
| | | self.assertTrue(c.processSpec('spec')) |
| | | self.assertFalse(c.processSpec('spec')) |
| | | |
| | | def test_execute_actions_tuples(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | (1, f, (1,)), |
| | | (1, f, (11,), {}, ('x',)), |
| | | (2, f, (2,)), |
| | | (None, None), |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [((1,), {}), ((2,), {})]) |
| | | |
| | | def test_execute_actions_dicts(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (11,), |
| | | 'kw': {}, |
| | | 'includepath': ('x',), |
| | | 'order': 0, |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': 2, |
| | | 'callable': f, |
| | | 'args': (2,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': None, |
| | | 'callable': None, |
| | | 'args': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [((1,), {}), ((2,), {})]) |
| | | |
| | | def test_execute_actions_with_introspectables(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | intr = DummyIntrospectable() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | } |
| | | ] |
| | | introspector = object() |
| | | c.execute_actions(introspector=introspector) |
| | | self.assertEqual(output, [((1,), {})]) |
| | | self.assertEqual(intr.registered, [(introspector, None)]) |
| | | |
| | | def test_execute_actions_with_introspectable_no_callable(self): |
| | | c = self._makeOne() |
| | | intr = DummyIntrospectable() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': None, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | } |
| | | ] |
| | | introspector = object() |
| | | c.execute_actions(introspector=introspector) |
| | | self.assertEqual(intr.registered, [(introspector, None)]) |
| | | |
| | | def test_execute_actions_error(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | |
| | | def bad(): |
| | | raise NotImplementedError |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | (1, f, (1,)), |
| | | (1, f, (11,), {}, ('x',)), |
| | | (2, f, (2,)), |
| | | (3, bad, (), {}, (), 'oops'), |
| | | ] |
| | | self.assertRaises(ConfigurationExecutionError, c.execute_actions) |
| | | self.assertEqual(output, [('f', (1,), {}), ('f', (2,), {})]) |
| | | |
| | | def test_reentrant_action(self): |
| | | output = [] |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | c.actions.append((3, g, (8,), {})) |
| | | |
| | | def g(*a, **k): |
| | | output.append(('g', a, k)) |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [('f', (1,), {}), ('g', (8,), {})]) |
| | | |
| | | def test_reentrant_action_with_deferred_discriminator(self): |
| | | # see https://github.com/Pylons/pyramid/issues/2697 |
| | | from pyramid.registry import Deferred |
| | | |
| | | output = [] |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | c.actions.append((4, g, (4,), {}, (), None, 2)) |
| | | |
| | | def g(*a, **k): |
| | | output.append(('g', a, k)) |
| | | |
| | | def h(*a, **k): |
| | | output.append(('h', a, k)) |
| | | |
| | | def discrim(): |
| | | self.assertEqual(output, [('f', (1,), {}), ('g', (2,), {})]) |
| | | return 3 |
| | | |
| | | d = Deferred(discrim) |
| | | c.actions = [ |
| | | (d, h, (3,), {}, (), None, 1), # order 1 |
| | | (1, f, (1,)), # order 0 |
| | | (2, g, (2,)), # order 0 |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual( |
| | | output, |
| | | [ |
| | | ('f', (1,), {}), |
| | | ('g', (2,), {}), |
| | | ('h', (3,), {}), |
| | | ('g', (4,), {}), |
| | | ], |
| | | ) |
| | | |
| | | def test_reentrant_action_error(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((3, g, (8,), {}, (), None, -1)) |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | self.assertRaises(ConfigurationError, c.execute_actions) |
| | | |
| | | def test_reentrant_action_without_clear(self): |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((3, g, (8,))) |
| | | |
| | | def g(*a, **k): |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | c.execute_actions(clear=False) |
| | | self.assertEqual(c.actions, [(1, f, (1,)), (3, g, (8,))]) |
| | | |
| | | def test_executing_conflicting_action_across_orders(self): |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | pass |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,), {}, (), None, -1), (1, g, (2,))] |
| | | self.assertRaises(ConfigurationConflictError, c.execute_actions) |
| | | |
| | | def test_executing_conflicting_action_across_reentrant_orders(self): |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((1, g, (8,))) |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,), {}, (), None, -1)] |
| | | self.assertRaises(ConfigurationConflictError, c.execute_actions) |
| | | |
| | | |
| | | class Test_reentrant_action_functional(unittest.TestCase): |
| | | def _makeConfigurator(self, *arg, **kw): |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator(*arg, **kw) |
| | | return config |
| | | |
| | | def test_functional(self): |
| | | def add_auto_route(config, name, view): |
| | | def register(): |
| | | config.add_view(route_name=name, view=view) |
| | | config.add_route(name, '/' + name) |
| | | |
| | | config.action(('auto route', name), register, order=-30) |
| | | |
| | | config = self._makeConfigurator() |
| | | config.add_directive('add_auto_route', add_auto_route) |
| | | |
| | | def my_view(request): # pragma: no cover |
| | | return request.response |
| | | |
| | | config.add_auto_route('foo', my_view) |
| | | config.commit() |
| | | from pyramid.interfaces import IRoutesMapper |
| | | |
| | | mapper = config.registry.getUtility(IRoutesMapper) |
| | | routes = mapper.get_routes() |
| | | route = routes[0] |
| | | self.assertEqual(len(routes), 1) |
| | | self.assertEqual(route.name, 'foo') |
| | | self.assertEqual(route.path, '/foo') |
| | | |
| | | def test_deferred_discriminator(self): |
| | | # see https://github.com/Pylons/pyramid/issues/2697 |
| | | from pyramid.config import PHASE0_CONFIG |
| | | |
| | | config = self._makeConfigurator() |
| | | |
| | | def deriver(view, info): |
| | | return view |
| | | |
| | | deriver.options = ('foo',) |
| | | config.add_view_deriver(deriver, 'foo_view') |
| | | # add_view uses a deferred discriminator and will fail if executed |
| | | # prior to add_view_deriver executing its action |
| | | config.add_view(lambda r: r.response, name='', foo=1) |
| | | |
| | | def dummy_action(): |
| | | # trigger a re-entrant action |
| | | config.action(None, lambda: None) |
| | | |
| | | config.action(None, dummy_action, order=PHASE0_CONFIG) |
| | | config.commit() |
| | | |
| | | |
| | | class Test_resolveConflicts(unittest.TestCase): |
| | | def _callFUT(self, actions): |
| | | from pyramid.config.actions import resolveConflicts |
| | | |
| | | return resolveConflicts(actions) |
| | | |
| | | def test_it_success_tuples(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (1,), {}, (), 'first'), |
| | | (1, f, (2,), {}, ('x',), 'second'), |
| | | (1, f, (3,), {}, ('y',), 'third'), |
| | | (4, f, (4,), {}, ('y',), 'should be last', 99999), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (5,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'should be last', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_it_success_dicts(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (1,), {}, (), 'first'), |
| | | (1, f, (2,), {}, ('x',), 'second'), |
| | | (1, f, (3,), {}, ('y',), 'third'), |
| | | (4, f, (4,), {}, ('y',), 'should be last', 99999), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (5,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'should be last', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_it_conflict(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (2,), {}, ('x',), 'eek'), # will conflict |
| | | (1, f, (3,), {}, ('y',), 'ack'), # will conflict |
| | | (4, f, (4,), {}, ('y',)), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | self.assertRaises(ConfigurationConflictError, list, result) |
| | | |
| | | def test_it_with_actions_grouped_by_order(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), # X |
| | | (1, f, (1,), {}, (), 'third', 10), # X |
| | | (1, f, (2,), {}, ('x',), 'fourth', 10), |
| | | (1, f, (3,), {}, ('y',), 'fifth', 10), |
| | | (2, f, (1,), {}, (), 'sixth', 10), # X |
| | | (3, f, (1,), {}, (), 'seventh', 10), # X |
| | | (5, f, (4,), {}, ('y',), 'eighth', 99999), # X |
| | | (4, f, (3,), {}, (), 'first', 5), # X |
| | | (4, f, (5,), {}, ('y',), 'second', 5), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual(len(result), 6) |
| | | # resolved actions should be grouped by (order, i) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': (), |
| | | 'order': 5, |
| | | }, |
| | | { |
| | | 'info': 'third', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'sixth', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 2, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'seventh', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'eighth', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 5, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_override_success_across_orders(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (1, f, (2,), {}, ('x',), 'eek', 0), |
| | | (1, f, (3,), {}, ('x', 'y'), 'ack', 10), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': 'eek', |
| | | 'args': (2,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': ('x',), |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_conflicts_across_orders(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (1, f, (2,), {}, ('x', 'y'), 'eek', 0), |
| | | (1, f, (3,), {}, ('x'), 'ack', 10), |
| | | ] |
| | | ) |
| | | self.assertRaises(ConfigurationConflictError, list, result) |
| | | |
| | | |
| | | class TestActionInfo(unittest.TestCase): |
| | | def _getTargetClass(self): |
| | | from pyramid.config.actions import ActionInfo |
| | | |
| | | return ActionInfo |
| | | |
| | | def _makeOne(self, filename, lineno, function, linerepr): |
| | | return self._getTargetClass()(filename, lineno, function, linerepr) |
| | | |
| | | def test_class_conforms(self): |
| | | from zope.interface.verify import verifyClass |
| | | from pyramid.interfaces import IActionInfo |
| | | |
| | | verifyClass(IActionInfo, self._getTargetClass()) |
| | | |
| | | def test_instance_conforms(self): |
| | | from zope.interface.verify import verifyObject |
| | | from pyramid.interfaces import IActionInfo |
| | | |
| | | verifyObject(IActionInfo, self._makeOne('f', 0, 'f', 'f')) |
| | | |
| | | def test_ctor(self): |
| | | inst = self._makeOne('filename', 10, 'function', 'src') |
| | | self.assertEqual(inst.file, 'filename') |
| | | self.assertEqual(inst.line, 10) |
| | | self.assertEqual(inst.function, 'function') |
| | | self.assertEqual(inst.src, 'src') |
| | | |
| | | def test___str__(self): |
| | | inst = self._makeOne('filename', 0, 'function', ' linerepr ') |
| | | self.assertEqual( |
| | | str(inst), "Line 0 of file filename:\n linerepr " |
| | | ) |
| | | |
| | | |
| | | def _conflictFunctions(e): |
| | | conflicts = e._conflicts.values() |
| | | for conflict in conflicts: |
| | | for confinst in conflict: |
| | | yield confinst.function |
| | | |
| | | |
| | | class DummyActionState(object): |
| | | autocommit = False |
| | | info = '' |
| | | |
| | | def __init__(self): |
| | | self.actions = [] |
| | | |
| | | def action(self, *arg, **kw): |
| | | self.actions.append((arg, kw)) |
| | | |
| | | |
| | | class DummyIntrospectable(object): |
| | | def __init__(self): |
| | | self.registered = [] |
| | | |
| | | def register(self, introspector, action_info): |
| | | self.registered.append((introspector, action_info)) |
| | |
| | | import os |
| | | import unittest |
| | | from zope.interface import Interface |
| | | from zope.interface import implementer |
| | | |
| | | from pyramid.compat import im_func |
| | | from pyramid.testing import skip_on |
| | |
| | | from . import dummy_include |
| | | from . import dummy_extend |
| | | from . import dummy_extend2 |
| | | from . import IDummy |
| | | from . import DummyContext |
| | | |
| | | from pyramid.exceptions import ConfigurationExecutionError |
| | |
| | | name='', |
| | | exception_view=False, |
| | | ): |
| | | from zope.interface import Interface |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.interfaces import IView |
| | | from pyramid.interfaces import IViewClassifier |
| | | from pyramid.interfaces import IExceptionViewClassifier |
| | |
| | | classifier = IExceptionViewClassifier |
| | | else: |
| | | classifier = IViewClassifier |
| | | if ctx_iface is None: |
| | | ctx_iface = Interface |
| | | if request_iface is None: |
| | | request_iface = IRequest |
| | | return config.registry.adapters.lookup( |
| | | (classifier, request_iface, ctx_iface), |
| | | IView, |
| | |
| | | def test_ctor_httpexception_view_default(self): |
| | | from pyramid.interfaces import IExceptionResponse |
| | | from pyramid.httpexceptions import default_exceptionresponse_view |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | config = self._makeOne() |
| | | view = self._getViewCallable( |
| | |
| | | |
| | | def test_ctor_exceptionresponse_view_None(self): |
| | | from pyramid.interfaces import IExceptionResponse |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | config = self._makeOne(exceptionresponse_view=None) |
| | | view = self._getViewCallable( |
| | |
| | | |
| | | def test_ctor_exceptionresponse_view_custom(self): |
| | | from pyramid.interfaces import IExceptionResponse |
| | | from pyramid.interfaces import IRequest |
| | | |
| | | def exceptionresponse_view(context, request): # pragma: no cover |
| | | pass |
| | |
| | | def test_setup_registry_explicit_notfound_trumps_iexceptionresponse(self): |
| | | from pyramid.renderers import null_renderer |
| | | from zope.interface import implementedBy |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.httpexceptions import HTTPNotFound |
| | | from pyramid.registry import Registry |
| | | |
| | |
| | | config.include(include) |
| | | self.assertTrue(stack[0] is config.registry) |
| | | |
| | | def test_action_branching_kw_is_None(self): |
| | | config = self._makeOne(autocommit=True) |
| | | self.assertEqual(config.action('discrim'), None) |
| | | |
| | | def test_action_branching_kw_is_not_None(self): |
| | | config = self._makeOne(autocommit=True) |
| | | self.assertEqual(config.action('discrim', kw={'a': 1}), None) |
| | | |
| | | def test_action_autocommit_with_introspectables(self): |
| | | from pyramid.config.util import ActionInfo |
| | | |
| | | config = self._makeOne(autocommit=True) |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(len(intr.registered), 1) |
| | | self.assertEqual(intr.registered[0][0], config.introspector) |
| | | self.assertEqual(intr.registered[0][1].__class__, ActionInfo) |
| | | |
| | | def test_action_autocommit_with_introspectables_introspection_off(self): |
| | | config = self._makeOne(autocommit=True) |
| | | config.introspection = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(len(intr.registered), 0) |
| | | |
| | | def test_action_branching_nonautocommit_with_config_info(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = 'abc' |
| | | state = DummyActionState() |
| | | state.autocommit = False |
| | | config.action_state = state |
| | | config.action('discrim', kw={'a': 1}) |
| | | self.assertEqual( |
| | | state.actions, |
| | | [ |
| | | ( |
| | | (), |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': 'discrim', |
| | | 'includepath': (), |
| | | 'info': 'abc', |
| | | 'introspectables': (), |
| | | 'kw': {'a': 1}, |
| | | 'order': 0, |
| | | }, |
| | | ) |
| | | ], |
| | | ) |
| | | |
| | | def test_action_branching_nonautocommit_without_config_info(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = ['z'] |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | config.action('discrim', kw={'a': 1}) |
| | | self.assertEqual( |
| | | state.actions, |
| | | [ |
| | | ( |
| | | (), |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': 'discrim', |
| | | 'includepath': (), |
| | | 'info': 'z', |
| | | 'introspectables': (), |
| | | 'kw': {'a': 1}, |
| | | 'order': 0, |
| | | }, |
| | | ) |
| | | ], |
| | | ) |
| | | |
| | | def test_action_branching_nonautocommit_with_introspectables(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = [] |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(state.actions[0][1]['introspectables'], (intr,)) |
| | | |
| | | def test_action_nonautocommit_with_introspectables_introspection_off(self): |
| | | config = self._makeOne(autocommit=False) |
| | | config.info = '' |
| | | config._ainfo = [] |
| | | config.introspection = False |
| | | state = DummyActionState() |
| | | config.action_state = state |
| | | state.autocommit = False |
| | | intr = DummyIntrospectable() |
| | | config.action('discrim', introspectables=(intr,)) |
| | | self.assertEqual(state.actions[0][1]['introspectables'], ()) |
| | | |
| | | def test_scan_integration(self): |
| | | from zope.interface import alsoProvides |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.view import render_view_to_response |
| | | import tests.test_config.pkgs.scannable as package |
| | | |
| | |
| | | |
| | | def test_scan_integration_with_ignore(self): |
| | | from zope.interface import alsoProvides |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.view import render_view_to_response |
| | | import tests.test_config.pkgs.scannable as package |
| | | |
| | |
| | | |
| | | def test_scan_integration_dottedname_package(self): |
| | | from zope.interface import alsoProvides |
| | | from pyramid.interfaces import IRequest |
| | | from pyramid.view import render_view_to_response |
| | | |
| | | config = self._makeOne(autocommit=True) |
| | |
| | | self.assertNotEqual(sm, '123') |
| | | finally: |
| | | getSiteManager.reset() |
| | | |
| | | def test_commit_conflict_simple(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.add_view(view1) |
| | | config.add_view(view2) |
| | | self.assertRaises(ConfigurationConflictError, config.commit) |
| | | |
| | | def test_commit_conflict_resolved_with_include(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme(config): |
| | | config.add_view(view2) |
| | | |
| | | config.add_view(view1) |
| | | config.include(includeme) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view1') |
| | | |
| | | def test_commit_conflict_with_two_includes(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme1(config): |
| | | config.add_view(view1) |
| | | |
| | | def includeme2(config): |
| | | config.add_view(view2) |
| | | |
| | | config.include(includeme1) |
| | | config.include(includeme2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'includeme1') |
| | | self.assertEqual(c2, 'includeme2') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | def test_commit_conflict_resolved_with_two_includes_and_local(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view3(request): # pragma: no cover |
| | | pass |
| | | |
| | | def includeme1(config): |
| | | config.add_view(view1) |
| | | |
| | | def includeme2(config): |
| | | config.add_view(view2) |
| | | |
| | | config.include(includeme1) |
| | | config.include(includeme2) |
| | | config.add_view(view3) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view3') |
| | | |
| | | def test_autocommit_no_conflicts(self): |
| | | from pyramid.renderers import null_renderer |
| | | |
| | | config = self._makeOne(autocommit=True) |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view3(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.add_view(view1, renderer=null_renderer) |
| | | config.add_view(view2, renderer=null_renderer) |
| | | config.add_view(view3, renderer=null_renderer) |
| | | config.commit() |
| | | registeredview = self._getViewCallable(config) |
| | | self.assertEqual(registeredview.__name__, 'view3') |
| | | |
| | | def test_conflict_set_notfound_view(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.set_notfound_view(view1) |
| | | config.set_notfound_view(view2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'test_conflict_set_notfound_view') |
| | | self.assertEqual(c2, 'test_conflict_set_notfound_view') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | def test_conflict_set_forbidden_view(self): |
| | | config = self._makeOne() |
| | | |
| | | def view1(request): # pragma: no cover |
| | | pass |
| | | |
| | | def view2(request): # pragma: no cover |
| | | pass |
| | | |
| | | config.set_forbidden_view(view1) |
| | | config.set_forbidden_view(view2) |
| | | try: |
| | | config.commit() |
| | | except ConfigurationConflictError as why: |
| | | c1, c2 = _conflictFunctions(why) |
| | | self.assertEqual(c1, 'test_conflict_set_forbidden_view') |
| | | self.assertEqual(c2, 'test_conflict_set_forbidden_view') |
| | | else: # pragma: no cover |
| | | raise AssertionError |
| | | |
| | | def test___getattr__missing_when_directives_exist(self): |
| | | config = self._makeOne() |
| | |
| | | ) |
| | | |
| | | |
| | | class TestActionState(unittest.TestCase): |
| | | def _makeOne(self): |
| | | from pyramid.config import ActionState |
| | | |
| | | return ActionState() |
| | | |
| | | def test_it(self): |
| | | c = self._makeOne() |
| | | self.assertEqual(c.actions, []) |
| | | |
| | | def test_action_simple(self): |
| | | from . import dummyfactory as f |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(1, f, (1,), {'x': 1}) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {'x': 1}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | c.action(None) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {'x': 1}, |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_includepath(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(None, includepath=('abc',)) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': ('abc',), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_info(self): |
| | | c = self._makeOne() |
| | | c.action(None, info='abc') |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': 'abc', |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_includepath_and_info(self): |
| | | c = self._makeOne() |
| | | c.action(None, includepath=('spec',), info='bleh') |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': ('spec',), |
| | | 'info': 'bleh', |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_order(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | c.action(None, order=99999) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'order': 99999, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_action_with_introspectables(self): |
| | | c = self._makeOne() |
| | | c.actions = [] |
| | | intr = DummyIntrospectable() |
| | | c.action(None, introspectables=(intr,)) |
| | | self.assertEqual( |
| | | c.actions, |
| | | [ |
| | | { |
| | | 'args': (), |
| | | 'callable': None, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_processSpec(self): |
| | | c = self._makeOne() |
| | | self.assertTrue(c.processSpec('spec')) |
| | | self.assertFalse(c.processSpec('spec')) |
| | | |
| | | def test_execute_actions_tuples(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | (1, f, (1,)), |
| | | (1, f, (11,), {}, ('x',)), |
| | | (2, f, (2,)), |
| | | (None, None), |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [((1,), {}), ((2,), {})]) |
| | | |
| | | def test_execute_actions_dicts(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (11,), |
| | | 'kw': {}, |
| | | 'includepath': ('x',), |
| | | 'order': 0, |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': 2, |
| | | 'callable': f, |
| | | 'args': (2,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | { |
| | | 'discriminator': None, |
| | | 'callable': None, |
| | | 'args': (), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (), |
| | | }, |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [((1,), {}), ((2,), {})]) |
| | | |
| | | def test_execute_actions_with_introspectables(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append((a, k)) |
| | | |
| | | c = self._makeOne() |
| | | intr = DummyIntrospectable() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': f, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | } |
| | | ] |
| | | introspector = object() |
| | | c.execute_actions(introspector=introspector) |
| | | self.assertEqual(output, [((1,), {})]) |
| | | self.assertEqual(intr.registered, [(introspector, None)]) |
| | | |
| | | def test_execute_actions_with_introspectable_no_callable(self): |
| | | c = self._makeOne() |
| | | intr = DummyIntrospectable() |
| | | c.actions = [ |
| | | { |
| | | 'discriminator': 1, |
| | | 'callable': None, |
| | | 'args': (1,), |
| | | 'kw': {}, |
| | | 'order': 0, |
| | | 'includepath': (), |
| | | 'info': None, |
| | | 'introspectables': (intr,), |
| | | } |
| | | ] |
| | | introspector = object() |
| | | c.execute_actions(introspector=introspector) |
| | | self.assertEqual(intr.registered, [(introspector, None)]) |
| | | |
| | | def test_execute_actions_error(self): |
| | | output = [] |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | |
| | | def bad(): |
| | | raise NotImplementedError |
| | | |
| | | c = self._makeOne() |
| | | c.actions = [ |
| | | (1, f, (1,)), |
| | | (1, f, (11,), {}, ('x',)), |
| | | (2, f, (2,)), |
| | | (3, bad, (), {}, (), 'oops'), |
| | | ] |
| | | self.assertRaises(ConfigurationExecutionError, c.execute_actions) |
| | | self.assertEqual(output, [('f', (1,), {}), ('f', (2,), {})]) |
| | | |
| | | def test_reentrant_action(self): |
| | | output = [] |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | c.actions.append((3, g, (8,), {})) |
| | | |
| | | def g(*a, **k): |
| | | output.append(('g', a, k)) |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | c.execute_actions() |
| | | self.assertEqual(output, [('f', (1,), {}), ('g', (8,), {})]) |
| | | |
| | | def test_reentrant_action_with_deferred_discriminator(self): |
| | | # see https://github.com/Pylons/pyramid/issues/2697 |
| | | from pyramid.registry import Deferred |
| | | |
| | | output = [] |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | output.append(('f', a, k)) |
| | | c.actions.append((4, g, (4,), {}, (), None, 2)) |
| | | |
| | | def g(*a, **k): |
| | | output.append(('g', a, k)) |
| | | |
| | | def h(*a, **k): |
| | | output.append(('h', a, k)) |
| | | |
| | | def discrim(): |
| | | self.assertEqual(output, [('f', (1,), {}), ('g', (2,), {})]) |
| | | return 3 |
| | | |
| | | d = Deferred(discrim) |
| | | c.actions = [ |
| | | (d, h, (3,), {}, (), None, 1), # order 1 |
| | | (1, f, (1,)), # order 0 |
| | | (2, g, (2,)), # order 0 |
| | | ] |
| | | c.execute_actions() |
| | | self.assertEqual( |
| | | output, |
| | | [ |
| | | ('f', (1,), {}), |
| | | ('g', (2,), {}), |
| | | ('h', (3,), {}), |
| | | ('g', (4,), {}), |
| | | ], |
| | | ) |
| | | |
| | | def test_reentrant_action_error(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((3, g, (8,), {}, (), None, -1)) |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | self.assertRaises(ConfigurationError, c.execute_actions) |
| | | |
| | | def test_reentrant_action_without_clear(self): |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((3, g, (8,))) |
| | | |
| | | def g(*a, **k): |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,))] |
| | | c.execute_actions(clear=False) |
| | | self.assertEqual(c.actions, [(1, f, (1,)), (3, g, (8,))]) |
| | | |
| | | def test_executing_conflicting_action_across_orders(self): |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | pass |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,), {}, (), None, -1), (1, g, (2,))] |
| | | self.assertRaises(ConfigurationConflictError, c.execute_actions) |
| | | |
| | | def test_executing_conflicting_action_across_reentrant_orders(self): |
| | | from pyramid.exceptions import ConfigurationConflictError |
| | | |
| | | c = self._makeOne() |
| | | |
| | | def f(*a, **k): |
| | | c.actions.append((1, g, (8,))) |
| | | |
| | | def g(*a, **k): # pragma: no cover |
| | | pass |
| | | |
| | | c.actions = [(1, f, (1,), {}, (), None, -1)] |
| | | self.assertRaises(ConfigurationConflictError, c.execute_actions) |
| | | |
| | | |
| | | class Test_reentrant_action_functional(unittest.TestCase): |
| | | def _makeConfigurator(self, *arg, **kw): |
| | | from pyramid.config import Configurator |
| | | |
| | | config = Configurator(*arg, **kw) |
| | | return config |
| | | |
| | | def test_functional(self): |
| | | def add_auto_route(config, name, view): |
| | | def register(): |
| | | config.add_view(route_name=name, view=view) |
| | | config.add_route(name, '/' + name) |
| | | |
| | | config.action(('auto route', name), register, order=-30) |
| | | |
| | | config = self._makeConfigurator() |
| | | config.add_directive('add_auto_route', add_auto_route) |
| | | |
| | | def my_view(request): # pragma: no cover |
| | | return request.response |
| | | |
| | | config.add_auto_route('foo', my_view) |
| | | config.commit() |
| | | from pyramid.interfaces import IRoutesMapper |
| | | |
| | | mapper = config.registry.getUtility(IRoutesMapper) |
| | | routes = mapper.get_routes() |
| | | route = routes[0] |
| | | self.assertEqual(len(routes), 1) |
| | | self.assertEqual(route.name, 'foo') |
| | | self.assertEqual(route.path, '/foo') |
| | | |
| | | def test_deferred_discriminator(self): |
| | | # see https://github.com/Pylons/pyramid/issues/2697 |
| | | from pyramid.config import PHASE0_CONFIG |
| | | |
| | | config = self._makeConfigurator() |
| | | |
| | | def deriver(view, info): |
| | | return view |
| | | |
| | | deriver.options = ('foo',) |
| | | config.add_view_deriver(deriver, 'foo_view') |
| | | # add_view uses a deferred discriminator and will fail if executed |
| | | # prior to add_view_deriver executing its action |
| | | config.add_view(lambda r: r.response, name='', foo=1) |
| | | |
| | | def dummy_action(): |
| | | # trigger a re-entrant action |
| | | config.action(None, lambda: None) |
| | | |
| | | config.action(None, dummy_action, order=PHASE0_CONFIG) |
| | | config.commit() |
| | | |
| | | |
| | | class Test_resolveConflicts(unittest.TestCase): |
| | | def _callFUT(self, actions): |
| | | from pyramid.config import resolveConflicts |
| | | |
| | | return resolveConflicts(actions) |
| | | |
| | | def test_it_success_tuples(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (1,), {}, (), 'first'), |
| | | (1, f, (2,), {}, ('x',), 'second'), |
| | | (1, f, (3,), {}, ('y',), 'third'), |
| | | (4, f, (4,), {}, ('y',), 'should be last', 99999), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (5,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'should be last', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_it_success_dicts(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (1,), {}, (), 'first'), |
| | | (1, f, (2,), {}, ('x',), 'second'), |
| | | (1, f, (3,), {}, ('y',), 'third'), |
| | | (4, f, (4,), {}, ('y',), 'should be last', 99999), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': None, |
| | | 'args': (5,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': ('y',), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'should be last', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_it_conflict(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), |
| | | (1, f, (2,), {}, ('x',), 'eek'), # will conflict |
| | | (1, f, (3,), {}, ('y',), 'ack'), # will conflict |
| | | (4, f, (4,), {}, ('y',)), |
| | | (3, f, (3,), {}, ('y',)), |
| | | (None, f, (5,), {}, ('y',)), |
| | | ] |
| | | ) |
| | | self.assertRaises(ConfigurationConflictError, list, result) |
| | | |
| | | def test_it_with_actions_grouped_by_order(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (None, f), # X |
| | | (1, f, (1,), {}, (), 'third', 10), # X |
| | | (1, f, (2,), {}, ('x',), 'fourth', 10), |
| | | (1, f, (3,), {}, ('y',), 'fifth', 10), |
| | | (2, f, (1,), {}, (), 'sixth', 10), # X |
| | | (3, f, (1,), {}, (), 'seventh', 10), # X |
| | | (5, f, (4,), {}, ('y',), 'eighth', 99999), # X |
| | | (4, f, (3,), {}, (), 'first', 5), # X |
| | | (4, f, (5,), {}, ('y',), 'second', 5), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual(len(result), 6) |
| | | # resolved actions should be grouped by (order, i) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': None, |
| | | 'args': (), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': None, |
| | | 'includepath': (), |
| | | 'order': 0, |
| | | }, |
| | | { |
| | | 'info': 'first', |
| | | 'args': (3,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 4, |
| | | 'includepath': (), |
| | | 'order': 5, |
| | | }, |
| | | { |
| | | 'info': 'third', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'sixth', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 2, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'seventh', |
| | | 'args': (1,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 3, |
| | | 'includepath': (), |
| | | 'order': 10, |
| | | }, |
| | | { |
| | | 'info': 'eighth', |
| | | 'args': (4,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 5, |
| | | 'includepath': ('y',), |
| | | 'order': 99999, |
| | | }, |
| | | ], |
| | | ) |
| | | |
| | | def test_override_success_across_orders(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (1, f, (2,), {}, ('x',), 'eek', 0), |
| | | (1, f, (3,), {}, ('x', 'y'), 'ack', 10), |
| | | ] |
| | | ) |
| | | result = list(result) |
| | | self.assertEqual( |
| | | result, |
| | | [ |
| | | { |
| | | 'info': 'eek', |
| | | 'args': (2,), |
| | | 'callable': f, |
| | | 'introspectables': (), |
| | | 'kw': {}, |
| | | 'discriminator': 1, |
| | | 'includepath': ('x',), |
| | | 'order': 0, |
| | | } |
| | | ], |
| | | ) |
| | | |
| | | def test_conflicts_across_orders(self): |
| | | from . import dummyfactory as f |
| | | |
| | | result = self._callFUT( |
| | | [ |
| | | (1, f, (2,), {}, ('x', 'y'), 'eek', 0), |
| | | (1, f, (3,), {}, ('x'), 'ack', 10), |
| | | ] |
| | | ) |
| | | self.assertRaises(ConfigurationConflictError, list, result) |
| | | |
| | | |
| | | class TestGlobalRegistriesIntegration(unittest.TestCase): |
| | | def setUp(self): |
| | | from pyramid.config import global_registries |
| | |
| | | self.popped = True |
| | | |
| | | |
| | | @implementer(IDummy) |
| | | class DummyEvent: |
| | | pass |
| | | |
| | | |
| | | class DummyRegistry(object): |
| | | def __init__(self, adaptation=None, util=None): |
| | | self.utilities = [] |
| | |
| | | |
| | | def queryUtility(self, *arg, **kw): |
| | | return self.util |
| | | |
| | | |
| | | class IOther(Interface): |
| | | pass |
| | | |
| | | |
| | | def _conflictFunctions(e): |
| | | conflicts = e._conflicts.values() |
| | | for conflict in conflicts: |
| | | for confinst in conflict: |
| | | yield confinst.function |
| | | |
| | | |
| | | class DummyActionState(object): |
| | | autocommit = False |
| | | info = '' |
| | | |
| | | def __init__(self): |
| | | self.actions = [] |
| | | |
| | | def action(self, *arg, **kw): |
| | | self.actions.append((arg, kw)) |
| | | |
| | | |
| | | class DummyIntrospectable(object): |
| | | def __init__(self): |
| | | self.registered = [] |
| | | |
| | | def register(self, introspector, action_info): |
| | | self.registered.append((introspector, action_info)) |
| | | |
| | | |
| | | class DummyPredicate(object): |
File was renamed from tests/test_config/test_util.py |
| | |
| | | from pyramid.compat import text_ |
| | | |
| | | |
| | | class TestActionInfo(unittest.TestCase): |
| | | def _getTargetClass(self): |
| | | from pyramid.config.util import ActionInfo |
| | | |
| | | return ActionInfo |
| | | |
| | | def _makeOne(self, filename, lineno, function, linerepr): |
| | | return self._getTargetClass()(filename, lineno, function, linerepr) |
| | | |
| | | def test_class_conforms(self): |
| | | from zope.interface.verify import verifyClass |
| | | from pyramid.interfaces import IActionInfo |
| | | |
| | | verifyClass(IActionInfo, self._getTargetClass()) |
| | | |
| | | def test_instance_conforms(self): |
| | | from zope.interface.verify import verifyObject |
| | | from pyramid.interfaces import IActionInfo |
| | | |
| | | verifyObject(IActionInfo, self._makeOne('f', 0, 'f', 'f')) |
| | | |
| | | def test_ctor(self): |
| | | inst = self._makeOne('filename', 10, 'function', 'src') |
| | | self.assertEqual(inst.file, 'filename') |
| | | self.assertEqual(inst.line, 10) |
| | | self.assertEqual(inst.function, 'function') |
| | | self.assertEqual(inst.src, 'src') |
| | | |
| | | def test___str__(self): |
| | | inst = self._makeOne('filename', 0, 'function', ' linerepr ') |
| | | self.assertEqual( |
| | | str(inst), "Line 0 of file filename:\n linerepr " |
| | | ) |
| | | |
| | | |
| | | class TestPredicateList(unittest.TestCase): |
| | | def _makeOne(self): |
| | | from pyramid.config.util import PredicateList |
| | | from pyramid.config.predicates import PredicateList |
| | | from pyramid import predicates |
| | | |
| | | inst = PredicateList() |
| | |
| | | self.assertTrue(order1 < order2) |
| | | |
| | | def test_ordering_number_of_predicates(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | order1, _, _ = self._callFUT( |
| | | xhr='xhr', |
| | |
| | | self.assertTrue(order12 > order10) |
| | | |
| | | def test_ordering_importance_of_predicates(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | order1, _, _ = self._callFUT(xhr='xhr') |
| | | order2, _, _ = self._callFUT(request_method='request_method') |
| | |
| | | self.assertTrue(order9 > order10) |
| | | |
| | | def test_ordering_importance_and_number(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | order1, _, _ = self._callFUT( |
| | | xhr='xhr', request_method='request_method' |
| | |
| | | self.assertTrue(order1 > order2) |
| | | |
| | | def test_different_custom_predicates_with_same_hash(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | class PredicateWithHash(object): |
| | | def __hash__(self): |
| | |
| | | ) |
| | | |
| | | def test_custom_predicates_can_affect_traversal(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | def custom(info, request): |
| | | m = info['match'] |
| | |
| | | ) |
| | | |
| | | def test_predicate_text_is_correct(self): |
| | | from pyramid.config.util import predvalseq |
| | | from pyramid.config.predicates import predvalseq |
| | | |
| | | _, predicates, _ = self._callFUT( |
| | | xhr='xhr', |
| | |
| | | self.assertEqual(predicates[2](None, request), True) |
| | | |
| | | |
| | | class TestDeprecatedPredicates(unittest.TestCase): |
| | | def test_it(self): |
| | | import warnings |
| | | |
| | | with warnings.catch_warnings(record=True) as w: |
| | | warnings.filterwarnings('always') |
| | | from pyramid.config.predicates import XHRPredicate # noqa: F401 |
| | | |
| | | self.assertEqual(len(w), 1) |
| | | |
| | | |
| | | class Test_sort_accept_offers(unittest.TestCase): |
| | | def _callFUT(self, offers, order=None): |
| | | from pyramid.config.util import sort_accept_offers |
| | | from pyramid.config.predicates import sort_accept_offers |
| | | |
| | | return sort_accept_offers(offers, order) |
| | | |
| | |
| | | |
| | | def test_testing_add_subscriber_dottedname(self): |
| | | config = self._makeOne(autocommit=True) |
| | | L = config.testing_add_subscriber('tests.test_config.test_init.IDummy') |
| | | L = config.testing_add_subscriber('tests.test_config.IDummy') |
| | | event = DummyEvent() |
| | | config.registry.notify(event) |
| | | self.assertEqual(len(L), 1) |
| | |
| | | self.assertEqual(wrapper(None, request), 'OK') |
| | | |
| | | def test_add_view_default_phash_overrides_default_phash(self): |
| | | from pyramid.config.util import DEFAULT_PHASH |
| | | from pyramid.config.predicates import DEFAULT_PHASH |
| | | from pyramid.renderers import null_renderer |
| | | from zope.interface import Interface |
| | | from pyramid.interfaces import IRequest |
| | |
| | | self.assertEqual(wrapper(None, request), 'OK') |
| | | |
| | | def test_add_view_exc_default_phash_overrides_default_phash(self): |
| | | from pyramid.config.util import DEFAULT_PHASH |
| | | from pyramid.config.predicates import DEFAULT_PHASH |
| | | from pyramid.renderers import null_renderer |
| | | from zope.interface import implementedBy |
| | | from pyramid.interfaces import IRequest |
| | |
| | | self.assertEqual(result(None, None), response) |
| | | |
| | | def test_attr_wrapped_view_branching_default_phash(self): |
| | | from pyramid.config.util import DEFAULT_PHASH |
| | | from pyramid.config.predicates import DEFAULT_PHASH |
| | | |
| | | def view(context, request): # pragma: no cover |
| | | pass |