import functools
from hashlib import md5
import traceback

from webob.acceptparse import Accept

from zope.interface import implementer

from pyramid.compat import (
    bytes_,
    is_nonstr_iter
    )
from pyramid.interfaces import IActionInfo

from pyramid.exceptions import ConfigurationError
from pyramid.predicates import Notted
from pyramid.registry import predvalseq
from pyramid.util import (
    TopologicalSorter,
    takes_one_arg,
)

TopologicalSorter = TopologicalSorter  # support bw-compat imports
takes_one_arg = takes_one_arg  # support bw-compat imports


@implementer(IActionInfo)
class ActionInfo(object):
    """ Record of where a configuration action originated.

    Holds the four values of a ``traceback.extract_stack`` entry: the file
    name, the line number, the function name and the source text."""

    def __init__(self, file, line, function, src):
        self.file = file
        self.line = line
        self.function = function
        self.src = src

    def __str__(self):
        # ``src`` may be None: Python 2's traceback module reports None as
        # the text of a frame whose source is unavailable, and ``_info``
        # tuples are passed straight through to this class.  Treat that as
        # an empty source snippet rather than raising AttributeError.
        srclines = (self.src or '').split('\n')
        src = '\n'.join(' %s' % x for x in srclines)
        return 'Line %s of file %s:\n%s' % (self.line, self.file, src)


def action_method(wrapped):
    """ Wrapper to provide the right conflict info report data when a method
    that calls Configurator.action calls another that does the same.  Not a
    documented API but used by some external systems.

    The wrapper pushes an :class:`ActionInfo` onto ``self._ainfo`` for the
    duration of the wrapped call and pops it afterwards, even if the call
    raises.  Two private keyword arguments are consumed and not forwarded:

    ``_info``
        An explicit action info; a non-string iterable of length 4 is
        converted to an :class:`ActionInfo`.

    ``_backframes``
        Extra number of stack frames to skip when the info is derived from
        the call stack (for outer decorators around action methods).
    """
    def wrapper(self, *arg, **kw):
        if self._ainfo is None:
            self._ainfo = []
        info = kw.pop('_info', None)
        # backframes for outer decorators to actionmethods
        backframes = kw.pop('_backframes', 0) + 2
        if is_nonstr_iter(info) and len(info) == 4:
            # _info permitted as extract_stack tuple
            info = ActionInfo(*info)
        if info is None:
            try:
                f = traceback.extract_stack(limit=4)

                # Work around a Python 3.5 issue whereby it would insert an
                # extra stack frame. This should no longer be necessary in
                # Python 3.5.1
                last_frame = ActionInfo(*f[-1])
                if last_frame.function == 'extract_stack':  # pragma: no cover
                    f.pop()
                info = ActionInfo(*f[-backframes])
            except Exception:  # pragma: no cover
                # Best effort only: fall back to an empty info record when
                # the stack cannot be inspected.
                info = ActionInfo(None, 0, '', '')
        self._ainfo.append(info)
        try:
            result = wrapped(self, *arg, **kw)
        finally:
            self._ainfo.pop()
        return result

    if hasattr(wrapped, '__name__'):
        functools.update_wrapper(wrapper, wrapped)
    wrapper.__docobj__ = wrapped
    return wrapper


MAX_ORDER = 1 << 30
DEFAULT_PHASH = md5().hexdigest()


class not_(object):
    """

    You can invert the meaning of any predicate value by wrapping it in a call
    to :class:`pyramid.config.not_`.

    .. code-block:: python
       :linenos:

       from pyramid.config import not_

       config.add_view(
           'mypackage.views.my_view',
           route_name='ok',
           request_method=not_('POST')
           )

    The above example will ensure that the view is called if the request
    method is *not* ``POST``, at least if no other view is more specific.

    This technique of wrapping a predicate value in ``not_`` can be used
    anywhere predicate values are accepted:

    - :meth:`pyramid.config.Configurator.add_view`

    - :meth:`pyramid.config.Configurator.add_route`

    - :meth:`pyramid.config.Configurator.add_subscriber`

    - :meth:`pyramid.view.view_config`

    - :meth:`pyramid.events.subscriber`

    .. versionadded:: 1.5
    """
    def __init__(self, value):
        self.value = value


# under = after
# over = before

class PredicateList(object):
    """ An ordered registry of predicate factories.

    Factories are kept in a ``TopologicalSorter``; :meth:`make` turns
    keyword predicate values into predicate objects plus an ordering score
    and a hash."""

    def __init__(self):
        self.sorter = TopologicalSorter()
        self.last_added = None

    def add(self, name, factory, weighs_more_than=None, weighs_less_than=None):
        """ Register ``factory`` under ``name``.

        ``weighs_more_than`` and ``weighs_less_than`` are handed to the
        sorter as its ``after`` and ``before`` constraints respectively."""
        # Predicates should be added to a predicate list in (presumed)
        # computation expense order.
        ## if weighs_more_than is None and weighs_less_than is None:
        ##     weighs_more_than = self.last_added or FIRST
        ##     weighs_less_than = LAST
        self.last_added = name
        self.sorter.add(
            name,
            factory,
            after=weighs_more_than,
            before=weighs_less_than,
            )

    def names(self):
        # Return the list of valid predicate names.
        return self.sorter.names

    def make(self, config, **kw):
        """ Build the predicates described by the keyword arguments.

        Each keyword names a registered predicate; its value (or each item
        of a ``predvalseq`` value) is passed to the predicate factory
        together with ``config``.  Values wrapped in :class:`not_` produce a
        ``Notted`` predicate.  A value of ``None`` means "not supplied".

        Returns a tuple ``(order, preds, phash)`` where ``phash`` is a hex
        digest string.

        Raises ``ConfigurationError`` if any keyword does not match a
        registered predicate name.
        """
        # Given a configurator and a list of keywords, a predicate list is
        # computed.  Elsewhere in the code, we evaluate predicates using a
        # generator expression.  All predicates associated with a view or
        # route must evaluate true for the view or route to "match" during a
        # request.  The fastest predicate should be evaluated first, then the
        # next fastest, and so on, as if one returns false, the remainder of
        # the predicates won't need to be evaluated.
        #
        # While we compute predicates, we also compute a predicate hash (aka
        # phash) that can be used by a caller to identify identical predicate
        # lists.
        ordered = self.sorter.sorted()
        phash = md5()
        weights = []
        preds = []
        for n, (name, predicate_factory) in enumerate(ordered):
            vals = kw.pop(name, None)
            if vals is None:  # XXX should this be a sentinel other than None?
                continue
            if not isinstance(vals, predvalseq):
                vals = (vals,)
            for val in vals:
                realval = val
                notted = False
                if isinstance(val, not_):
                    realval = val.value
                    notted = True
                pred = predicate_factory(realval, config)
                if notted:
                    pred = Notted(pred)
                hashes = pred.phash()
                if not is_nonstr_iter(hashes):
                    hashes = [hashes]
                for h in hashes:
                    phash.update(bytes_(h))
                # The weight depends only on the predicate's position in the
                # sorted list: bit (n + 1).
                weights.append(1 << (n + 1))
                preds.append(pred)
        if kw:
            # Anything left over did not match a registered predicate name.
            from difflib import get_close_matches
            closest = []
            names = [name for name, _ in ordered]
            for name in kw:
                closest.extend(get_close_matches(name, names, 3))

            raise ConfigurationError(
                'Unknown predicate values: %r (did you mean %s)'
                % (kw, ','.join(closest))
                )
        # An "order" is computed for the predicate list.  An order is
        # a scoring.
        #
        # Each predicate is associated with a weight value.  The weight of a
        # predicate symbolizes the relative potential "importance" of the
        # predicate to all other predicates.  A larger weight indicates
        # greater importance.
        #
        # All weights for a given predicate list are bitwise ORed together
        # to create a "score"; this score is then subtracted from
        # MAX_ORDER and divided by an integer representing the number of
        # predicates+1 to determine the order.
        #
        # For views, the order represents the ordering in which a "multiview"
        # ( a collection of views that share the same context/request/name
        # triad but differ in other ways via predicates) will attempt to call
        # its set of views.  Views with lower orders will be tried first.
        # The intent is to a) ensure that views with more predicates are
        # always evaluated before views with fewer predicates and b) to
        # ensure a stable call ordering of views that share the same number
        # of predicates.  Views which do not have any predicates get an order
        # of MAX_ORDER, meaning that they will be tried very last.
        score = 0
        for bit in weights:
            score = score | bit
        # NOTE(review): ``/`` is true division on Python 3, so ``order`` is a
        # float there.  Floor division would map distinct scores onto the
        # same order (e.g. scores 26 and 28 with three predicates), so the
        # operator is left as is.
        order = (MAX_ORDER - score) / (len(preds) + 1)
        return order, preds, phash.hexdigest()


def normalize_accept_offer(offer, allow_range=False):
    """ Return a normalized string form of a media type ``offer``.

    When ``allow_range`` is true and ``offer`` contains ``*`` the offer is
    only lowercased; otherwise it is passed through ``Accept.parse_offer``
    and converted back to a string."""
    if allow_range and '*' in offer:
        return offer.lower()
    return str(Accept.parse_offer(offer))


def sort_accept_offers(offers, order=None):
    """
    Sort a list of offers by preference.

    For a given ``type/subtype`` category of offers, this algorithm will
    always sort offers with params higher than the bare offer.

    :param offers: A list of offers to be sorted.
    :param order: A weighted list of offers where items closer to the start of
                  the list will be preferred over items closer to the end.
    :return: A list of offers sorted first by specificity (higher to lower)
             then by ``order``.

    """
    if order is None:
        order = []

    # Weight given to anything not found in ``order``.  It must be at least
    # as large as every index into ``order``; using ``len(offers)`` alone is
    # too small when ``order`` is longer than ``offers``, which would let an
    # unlisted offer sort ahead of a listed one and a bare offer sort ahead
    # of its parameterised variant.
    max_weight = max(len(offers), len(order))

    def find_order_index(value, default=None):
        # Index of the first entry of ``order`` equal to ``value``.
        return next((i for i, x in enumerate(order) if x == value), default)

    def offer_sort_key(value):
        """
        (type_weight, params_weight)

        type_weight:
            - index of specific ``type/subtype`` in order list
            - ``max_weight`` if no match is found

        params_weight:
            - index of specific ``type/subtype;params`` in order list
            - ``max_weight`` if not found
            - ``max_weight + 1`` if no params at all

        """
        parsed = Accept.parse_offer(value)

        type_w = find_order_index(
            parsed.type + '/' + parsed.subtype,
            max_weight,
        )

        if parsed.params:
            param_w = find_order_index(value, max_weight)

        else:
            param_w = max_weight + 1

        return (type_w, param_w)

    return sorted(offers, key=offer_sort_key)