import fnmatch
|
import argparse
|
import sys
|
import textwrap
|
import re
|
|
from zope.interface import Interface
|
|
from pyramid.paster import bootstrap
|
from pyramid.compat import string_types
|
from pyramid.interfaces import IRouteRequest
|
from pyramid.config import not_
|
|
from pyramid.scripts.common import get_config_loader
|
from pyramid.scripts.common import parse_vars
|
from pyramid.static import static_view
|
from pyramid.view import _find_views
|
|
|
PAD = 3
|
ANY_KEY = '*'
|
UNKNOWN_KEY = '<unknown>'
|
|
|
def main(argv=sys.argv, quiet=False):
|
command = PRoutesCommand(argv, quiet)
|
return command.run()
|
|
|
def _get_pattern(route):
|
pattern = route.pattern
|
|
if not pattern.startswith('/'):
|
pattern = '/%s' % pattern
|
return pattern
|
|
|
def _get_print_format(fmt, max_name, max_pattern, max_view, max_method):
|
print_fmt = ''
|
max_map = {
|
'name': max_name,
|
'pattern': max_pattern,
|
'view': max_view,
|
'method': max_method,
|
}
|
sizes = []
|
|
for index, col in enumerate(fmt):
|
size = max_map[col] + PAD
|
print_fmt += '{{%s: <{%s}}} ' % (col, index)
|
sizes.append(size)
|
|
return print_fmt.format(*sizes)
|
|
|
def _get_request_methods(route_request_methods, view_request_methods):
|
excludes = set()
|
|
if route_request_methods:
|
route_request_methods = set(route_request_methods)
|
|
if view_request_methods:
|
view_request_methods = set(view_request_methods)
|
|
for method in view_request_methods.copy():
|
if method.startswith('!'):
|
view_request_methods.remove(method)
|
excludes.add(method[1:])
|
|
has_route_methods = route_request_methods is not None
|
has_view_methods = len(view_request_methods) > 0
|
has_methods = has_route_methods or has_view_methods
|
|
if has_route_methods is False and has_view_methods is False:
|
request_methods = [ANY_KEY]
|
elif has_route_methods is False and has_view_methods is True:
|
request_methods = view_request_methods
|
elif has_route_methods is True and has_view_methods is False:
|
request_methods = route_request_methods
|
else:
|
request_methods = route_request_methods.intersection(
|
view_request_methods
|
)
|
|
request_methods = set(request_methods).difference(excludes)
|
|
if has_methods and not request_methods:
|
request_methods = '<route mismatch>'
|
elif request_methods:
|
if excludes and request_methods == set([ANY_KEY]):
|
for exclude in excludes:
|
request_methods.add('!%s' % exclude)
|
|
request_methods = ','.join(sorted(request_methods))
|
|
return request_methods
|
|
|
def _get_view_module(view_callable):
|
if view_callable is None:
|
return UNKNOWN_KEY
|
|
if hasattr(view_callable, '__name__'):
|
if hasattr(view_callable, '__original_view__'):
|
original_view = view_callable.__original_view__
|
else:
|
original_view = None
|
|
if isinstance(original_view, static_view):
|
if original_view.package_name is not None:
|
return '%s:%s' % (
|
original_view.package_name,
|
original_view.docroot,
|
)
|
else:
|
return original_view.docroot
|
else:
|
view_name = view_callable.__name__
|
else:
|
# Currently only MultiView hits this,
|
# we could just not run _get_view_module
|
# for them and remove this logic
|
view_name = str(view_callable)
|
|
view_module = '%s.%s' % (view_callable.__module__, view_name)
|
|
# If pyramid wraps something in wsgiapp or wsgiapp2 decorators
|
# that is currently returned as pyramid.router.decorator, lets
|
# hack a nice name in:
|
if view_module == 'pyramid.router.decorator':
|
view_module = '<wsgiapp>'
|
|
return view_module
|
|
|
def get_route_data(route, registry):
|
pattern = _get_pattern(route)
|
|
request_iface = registry.queryUtility(IRouteRequest, name=route.name)
|
|
route_request_methods = None
|
view_request_methods_order = []
|
view_request_methods = {}
|
view_callable = None
|
|
route_intr = registry.introspector.get('routes', route.name)
|
|
if request_iface is None:
|
return [(route.name, _get_pattern(route), UNKNOWN_KEY, ANY_KEY)]
|
|
view_callables = _find_views(registry, request_iface, Interface, '')
|
if view_callables:
|
view_callable = view_callables[0]
|
else:
|
view_callable = None
|
view_module = _get_view_module(view_callable)
|
|
# Introspectables can be turned off, so there could be a chance
|
# that we have no `route_intr` but we do have a route + callable
|
if route_intr is None:
|
view_request_methods[view_module] = []
|
view_request_methods_order.append(view_module)
|
else:
|
if route_intr.get('static', False) is True:
|
return [
|
(route.name, route_intr['external_url'], UNKNOWN_KEY, ANY_KEY)
|
]
|
|
route_request_methods = route_intr['request_methods']
|
view_intr = registry.introspector.related(route_intr)
|
|
if view_intr:
|
for view in view_intr:
|
request_method = view.get('request_methods')
|
|
if request_method is not None:
|
if view.get('attr') is not None:
|
view_callable = getattr(view['callable'], view['attr'])
|
view_module = '%s.%s' % (
|
_get_view_module(view['callable']),
|
view['attr'],
|
)
|
else:
|
view_callable = view['callable']
|
view_module = _get_view_module(view_callable)
|
|
if view_module not in view_request_methods:
|
view_request_methods[view_module] = []
|
view_request_methods_order.append(view_module)
|
|
if isinstance(request_method, string_types):
|
request_method = (request_method,)
|
elif isinstance(request_method, not_):
|
request_method = ('!%s' % request_method.value,)
|
|
view_request_methods[view_module].extend(request_method)
|
else:
|
if view_module not in view_request_methods:
|
view_request_methods[view_module] = []
|
view_request_methods_order.append(view_module)
|
|
else:
|
view_request_methods[view_module] = []
|
view_request_methods_order.append(view_module)
|
|
final_routes = []
|
|
for view_module in view_request_methods_order:
|
methods = view_request_methods[view_module]
|
request_methods = _get_request_methods(route_request_methods, methods)
|
|
final_routes.append(
|
(route.name, pattern, view_module, request_methods)
|
)
|
|
return final_routes
|
|
|
class PRoutesCommand(object):
|
description = """\
|
Print all URL dispatch routes used by a Pyramid application in the
|
order in which they are evaluated. Each route includes the name of the
|
route, the pattern of the route, and the view callable which will be
|
invoked when the route is matched.
|
|
This command accepts one positional argument named 'config_uri'. It
|
specifies the PasteDeploy config file to use for the interactive
|
shell. The format is 'inifile#name'. If the name is left off, 'main'
|
will be assumed. Example: 'proutes myapp.ini'.
|
|
"""
|
bootstrap = staticmethod(bootstrap) # testing
|
get_config_loader = staticmethod(get_config_loader) # testing
|
stdout = sys.stdout
|
parser = argparse.ArgumentParser(
|
description=textwrap.dedent(description),
|
formatter_class=argparse.RawDescriptionHelpFormatter,
|
)
|
parser.add_argument(
|
'-g',
|
'--glob',
|
action='store',
|
dest='glob',
|
default='',
|
help='Display routes matching glob pattern',
|
)
|
|
parser.add_argument(
|
'-f',
|
'--format',
|
action='store',
|
dest='format',
|
default='',
|
help=(
|
'Choose which columns to display, this will '
|
'override the format key in the [proutes] ini '
|
'section'
|
),
|
)
|
|
parser.add_argument(
|
'config_uri',
|
nargs='?',
|
default=None,
|
help='The URI to the configuration file.',
|
)
|
|
parser.add_argument(
|
'config_vars',
|
nargs='*',
|
default=(),
|
help="Variables required by the config file. For example, "
|
"`http_port=%%(http_port)s` would expect `http_port=8080` to be "
|
"passed here.",
|
)
|
|
def __init__(self, argv, quiet=False):
|
self.args = self.parser.parse_args(argv[1:])
|
self.quiet = quiet
|
self.available_formats = ['name', 'pattern', 'view', 'method']
|
self.column_format = self.available_formats
|
|
def validate_formats(self, formats):
|
invalid_formats = []
|
for fmt in formats:
|
if fmt not in self.available_formats:
|
invalid_formats.append(fmt)
|
|
msg = 'You provided invalid formats %s, ' 'Available formats are %s'
|
|
if invalid_formats:
|
msg = msg % (invalid_formats, self.available_formats)
|
self.out(msg)
|
return False
|
|
return True
|
|
def proutes_file_config(self, loader, global_conf=None):
|
settings = loader.get_settings('proutes', global_conf)
|
format = settings.get('format')
|
if format:
|
cols = re.split(r'[,|\s\n]+', format)
|
self.column_format = [x.strip() for x in cols]
|
|
def out(self, msg): # pragma: no cover
|
if not self.quiet:
|
print(msg)
|
|
def _get_mapper(self, registry):
|
from pyramid.config import Configurator
|
|
config = Configurator(registry=registry)
|
return config.get_routes_mapper()
|
|
def run(self, quiet=False):
|
if not self.args.config_uri:
|
self.out('requires a config file argument')
|
return 2
|
|
config_uri = self.args.config_uri
|
config_vars = parse_vars(self.args.config_vars)
|
loader = self.get_config_loader(config_uri)
|
loader.setup_logging(config_vars)
|
self.proutes_file_config(loader, config_vars)
|
|
env = self.bootstrap(config_uri, options=config_vars)
|
registry = env['registry']
|
mapper = self._get_mapper(registry)
|
|
if self.args.format:
|
columns = self.args.format.split(',')
|
self.column_format = [x.strip() for x in columns]
|
|
is_valid = self.validate_formats(self.column_format)
|
|
if is_valid is False:
|
return 2
|
|
if mapper is None:
|
return 0
|
|
max_name = len('Name')
|
max_pattern = len('Pattern')
|
max_view = len('View')
|
max_method = len('Method')
|
|
routes = mapper.get_routes(include_static=True)
|
|
if len(routes) == 0:
|
return 0
|
|
mapped_routes = [
|
{
|
'name': 'Name',
|
'pattern': 'Pattern',
|
'view': 'View',
|
'method': 'Method',
|
},
|
{
|
'name': '----',
|
'pattern': '-------',
|
'view': '----',
|
'method': '------',
|
},
|
]
|
|
for route in routes:
|
route_data = get_route_data(route, registry)
|
|
for name, pattern, view, method in route_data:
|
if self.args.glob:
|
match = fnmatch.fnmatch(
|
name, self.args.glob
|
) or fnmatch.fnmatch(pattern, self.args.glob)
|
if not match:
|
continue
|
|
if len(name) > max_name:
|
max_name = len(name)
|
|
if len(pattern) > max_pattern:
|
max_pattern = len(pattern)
|
|
if len(view) > max_view:
|
max_view = len(view)
|
|
if len(method) > max_method:
|
max_method = len(method)
|
|
mapped_routes.append(
|
{
|
'name': name,
|
'pattern': pattern,
|
'view': view,
|
'method': method,
|
}
|
)
|
|
fmt = _get_print_format(
|
self.column_format, max_name, max_pattern, max_view, max_method
|
)
|
|
for route in mapped_routes:
|
self.out(fmt.format(**route))
|
|
return 0
|
|
|
if __name__ == '__main__': # pragma: no cover
|
sys.exit(main() or 0)
|