Merge branch 'sontek-fix_commands'
| | |
| | | class. A similar microoptimization was done to |
| | | ``pyramid.request.Request.is_response``. |
| | | |
| | | - Make it possible to use variable arguments on ``p*`` commands (``pserve``, |
| | | ``pshell``, ``pviews``, etc.) in the form ``a=1 b=2`` so you can fill in |
| | | values in a parameterized ``.ini`` file, e.g. ``pshell etc/development.ini |
| | | http_port=8080``. See https://github.com/Pylons/pyramid/pull/714 |
| | | |
| | | Bug Fixes |
| | | --------- |
| | | |
| | |
| | | |
| | | .. autofunction:: bootstrap |
| | | |
| | | .. autofunction:: get_app(config_uri, name=None) |
| | | .. autofunction:: get_app(config_uri, name=None, options=None) |
| | | |
| | | .. autofunction:: get_appsettings(config_uri, name=None) |
| | | |
| | |
| | | from logging.config import fileConfig |
| | | from pyramid.scripting import prepare |
| | | |
| | | def get_app(config_uri, name=None, loadapp=loadapp): |
| | | def get_app(config_uri, name=None, options=None, loadapp=loadapp): |
| | | """ Return the WSGI application named ``name`` in the PasteDeploy |
| | | config file specified by ``config_uri``. |
| | | |
| | | ``options``, if passed, should be a dictionary used as variable assignments |
| | | like ``{'http_port': 8080}``. This is useful if e.g. ``%(http_port)s`` is |
| | | used in the config file. |
| | | |
| | | If the ``name`` is None, this will attempt to parse the name from |
| | | the ``config_uri`` string expecting the format ``inifile#name``. |
| | |
| | | path, section = _getpathsec(config_uri, name) |
| | | config_name = 'config:%s' % path |
| | | here_dir = os.getcwd() |
| | | app = loadapp(config_name, name=section, relative_to=here_dir) |
| | | if options: |
| | | kw = {'global_conf': options} |
| | | else: |
| | | kw = {} |
| | | |
| | | app = loadapp(config_name, name=section, relative_to=here_dir, **kw) |
| | | |
| | | return app |
| | | |
| | | def get_appsettings(config_uri, name=None, appconfig=appconfig): |
| | |
| | | section = name |
| | | return path, section |
| | | |
| | | def bootstrap(config_uri, request=None): |
| | | def bootstrap(config_uri, request=None, options=None): |
| | | """ Load a WSGI application from the PasteDeploy config file specified |
| | | by ``config_uri``. The environment will be configured as if it is |
| | | currently serving ``request``, leaving a natural environment in place |
| | |
| | | for you if none is provided. You can mutate the request's ``environ`` |
| | | later to set up a specific host/port/scheme/etc. |
| | | |
| | | ``options`` is passed to ``get_app`` for use as variable assignments like |
| | | ``{'http_port': 8080}``. This is useful if e.g. ``%(http_port)s`` is used |
| | | in the config file. |
| | | |
| | | See :ref:`writing_a_script` for more information about how to use this |
| | | function. |
| | | """ |
| | | app = get_app(config_uri) |
| | | app = get_app(config_uri, options=options) |
| | | env = prepare(request) |
| | | env['app'] = app |
| | | return env |
| | |
| | | from pyramid.compat import configparser |
| | | from logging.config import fileConfig |
| | | |
| | | def parse_vars(args): |
| | | """ |
| | | Given variables like ``['a=b', 'c=d']``, turn them into ``{'a': |
| | | 'b', 'c': 'd'}``. |
| | | """ |
| | | result = {} |
| | | for arg in args: |
| | | if '=' not in arg: |
| | | raise ValueError( |
| | | 'Variable assignment %r invalid (no "=")' |
| | | % arg) |
| | | name, value = arg.split('=', 1) |
| | | result[name] = value |
| | | return result |
| | | |
| | | def logging_file_config(config_file, fileConfig=fileConfig, |
| | | configparser=configparser): |
| | | """ |
| | |
| | | from pyramid.compat import url_unquote |
| | | from pyramid.request import Request |
| | | from pyramid.paster import get_app |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PRequestCommand(argv, quiet) |
| | |
| | | name, value = item.split(':', 1) |
| | | headers[name] = value.strip() |
| | | |
| | | app = self.get_app(app_spec, self.options.app_name) |
| | | app = self.get_app(app_spec, self.options.app_name, |
| | | options=parse_vars(self.args[2:])) |
| | | |
| | | request_method = (self.options.method or 'GET').upper() |
| | | |
| | | environ = { |
| | |
| | | import textwrap |
| | | |
| | | from pyramid.paster import bootstrap |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PRoutesCommand(argv, quiet) |
| | |
| | | if not self.args: |
| | | self.out('requires a config file argument') |
| | | return 2 |
| | | |
| | | from pyramid.interfaces import IRouteRequest |
| | | from pyramid.interfaces import IViewClassifier |
| | | from pyramid.interfaces import IView |
| | | from zope.interface import Interface |
| | | config_uri = self.args[0] |
| | | env = self.bootstrap[0](config_uri) |
| | | |
| | | env = self.bootstrap[0](config_uri, options=parse_vars(self.args[1:])) |
| | | registry = env['registry'] |
| | | mapper = self._get_mapper(registry) |
| | | if mapper is not None: |
| | |
| | | import time |
| | | import traceback |
| | | |
| | | from paste.deploy import loadapp, loadserver |
| | | from paste.deploy import loadserver |
| | | from paste.deploy import loadapp |
| | | |
| | | from pyramid.compat import WIN |
| | | |
| | | from pyramid.paster import setup_logging |
| | | |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | MAXFD = 1024 |
| | | |
| | |
| | | if not self.quiet: |
| | | print(msg) |
| | | |
| | | def get_options(self): |
| | | if (len(self.args) > 1 |
| | | and self.args[1] in self.possible_subcommands): |
| | | restvars = self.args[2:] |
| | | else: |
| | | restvars = self.args[1:] |
| | | |
| | | return parse_vars(restvars) |
| | | |
| | | def run(self): # pragma: no cover |
| | | if self.options.stop_daemon: |
| | | return self.stop_daemon() |
| | |
| | | self.out('You must give a config file') |
| | | return 2 |
| | | app_spec = self.args[0] |
| | | |
| | | if (len(self.args) > 1 |
| | | and self.args[1] in self.possible_subcommands): |
| | | cmd = self.args[1] |
| | | restvars = self.args[2:] |
| | | else: |
| | | cmd = None |
| | | restvars = self.args[1:] |
| | | |
| | | if self.options.reload: |
| | | if os.environ.get(self._reloader_environ_key): |
| | |
| | | self.options.daemon = True |
| | | |
| | | app_name = self.options.app_name |
| | | vars = self.parse_vars(restvars) |
| | | |
| | | vars = self.get_options() |
| | | |
| | | if not self._scheme_re.search(app_spec): |
| | | app_spec = 'config:' + app_spec |
| | | server_name = self.options.server_name |
| | |
| | | |
| | | server = self.loadserver(server_spec, name=server_name, |
| | | relative_to=base, global_conf=vars) |
| | | app = self.loadapp(app_spec, name=app_name, |
| | | relative_to=base, global_conf=vars) |
| | | |
| | | app = self.loadapp(app_spec, name=app_name, relative_to=base, |
| | | global_conf=vars) |
| | | |
| | | if self.verbose > 0: |
| | | if hasattr(os, 'getpid'): |
| | |
| | | |
| | | serve() |
| | | |
| | | def loadserver(self, server_spec, name, relative_to, **kw):# pragma:no cover |
| | | return loadserver( |
| | | server_spec, name=name, relative_to=relative_to, **kw) |
| | | |
| | | def loadapp(self, app_spec, name, relative_to, **kw): # pragma: no cover |
| | | return loadapp(app_spec, name=name, relative_to=relative_to, **kw) |
| | | |
| | | def parse_vars(self, args): |
| | | """ |
| | | Given variables like ``['a=b', 'c=d']`` turns it into ``{'a': |
| | | 'b', 'c': 'd'}`` |
| | | """ |
| | | result = {} |
| | | for arg in args: |
| | | if '=' not in arg: |
| | | raise ValueError( |
| | | 'Variable assignment %r invalid (no "=")' |
| | | % arg) |
| | | name, value = arg.split('=', 1) |
| | | result[name] = value |
| | | return result |
| | | def loadserver(self, server_spec, name, relative_to, **kw):# pragma:no cover |
| | | return loadserver( |
| | | server_spec, name=name, relative_to=relative_to, **kw) |
| | | |
| | | def quote_first_command_arg(self, arg): # pragma: no cover |
| | | """ |
| | |
| | | |
| | | from pyramid.paster import setup_logging |
| | | |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PShellCommand(argv, quiet) |
| | | return command.run() |
| | |
| | | self.pshell_file_config(config_file) |
| | | |
| | | # bootstrap the environ |
| | | env = self.bootstrap[0](config_uri) |
| | | env = self.bootstrap[0](config_uri, options=parse_vars(self.args[1:])) |
| | | |
| | | # remove the closer from the env |
| | | closer = env.pop('closer') |
| | |
| | | from pyramid.tweens import MAIN |
| | | from pyramid.tweens import INGRESS |
| | | from pyramid.paster import bootstrap |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PTweensCommand(argv, quiet) |
| | |
| | | self.out('Requires a config file argument') |
| | | return 2 |
| | | config_uri = self.args[0] |
| | | env = self.bootstrap[0](config_uri) |
| | | env = self.bootstrap[0](config_uri, options=parse_vars(self.args[1:])) |
| | | registry = env['registry'] |
| | | tweens = self._get_tweens(registry) |
| | | if tweens is not None: |
| | |
| | | |
| | | from pyramid.interfaces import IMultiView |
| | | from pyramid.paster import bootstrap |
| | | from pyramid.scripts.common import parse_vars |
| | | |
| | | def main(argv=sys.argv, quiet=False): |
| | | command = PViewsCommand(argv, quiet) |
| | |
| | | if len(self.args) < 2: |
| | | self.out('Command requires a config file arg and a url arg') |
| | | return 2 |
| | | config_uri, url = self.args |
| | | config_uri = self.args[0] |
| | | url = self.args[1] |
| | | |
| | | if not url.startswith('/'): |
| | | url = '/%s' % url |
| | | env = self.bootstrap[0](config_uri) |
| | | env = self.bootstrap[0](config_uri, options=parse_vars(self.args[2:])) |
| | | registry = env['registry'] |
| | | view = self._find_view(url, registry) |
| | | self.out('') |
| | |
| | | import unittest |
| | | |
| | | class Test_get_app(unittest.TestCase): |
| | | def _callFUT(self, config_file, section_name, loadapp): |
| | | def _callFUT(self, config_file, section_name, options=None, loadapp=None): |
| | | from pyramid.paster import get_app |
| | | return get_app(config_file, section_name, loadapp) |
| | | return get_app( |
| | | config_file, section_name, options=options, loadapp=loadapp |
| | | ) |
| | | |
| | | def test_it(self): |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini', 'myapp', loadapp) |
| | | result = self._callFUT('/foo/bar/myapp.ini', 'myapp', loadapp=loadapp) |
| | | self.assertEqual(loadapp.config_name, 'config:/foo/bar/myapp.ini') |
| | | self.assertEqual(loadapp.section_name, 'myapp') |
| | | self.assertEqual(loadapp.relative_to, os.getcwd()) |
| | |
| | | def test_it_with_hash(self): |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', None, loadapp) |
| | | result = self._callFUT( |
| | | '/foo/bar/myapp.ini#myapp', None, loadapp=loadapp |
| | | ) |
| | | self.assertEqual(loadapp.config_name, 'config:/foo/bar/myapp.ini') |
| | | self.assertEqual(loadapp.section_name, 'myapp') |
| | | self.assertEqual(loadapp.relative_to, os.getcwd()) |
| | |
| | | def test_it_with_hash_and_name_override(self): |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', 'yourapp', loadapp) |
| | | result = self._callFUT( |
| | | '/foo/bar/myapp.ini#myapp', 'yourapp', loadapp=loadapp |
| | | ) |
| | | self.assertEqual(loadapp.config_name, 'config:/foo/bar/myapp.ini') |
| | | self.assertEqual(loadapp.section_name, 'yourapp') |
| | | self.assertEqual(loadapp.relative_to, os.getcwd()) |
| | | self.assertEqual(result, app) |
| | | |
| | | def test_it_with_options(self): |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | options = {'a':1} |
| | | result = self._callFUT( |
| | | '/foo/bar/myapp.ini#myapp', |
| | | 'yourapp', |
| | | loadapp=loadapp, |
| | | options=options, |
| | | ) |
| | | self.assertEqual(loadapp.config_name, 'config:/foo/bar/myapp.ini') |
| | | self.assertEqual(loadapp.section_name, 'yourapp') |
| | | self.assertEqual(loadapp.relative_to, os.getcwd()) |
| | | self.assertEqual(loadapp.kw, {'global_conf':options}) |
| | | self.assertEqual(result, app) |
| | | |
| | | class Test_get_appsettings(unittest.TestCase): |
| | |
| | | def __init__(self, result): |
| | | self.result = result |
| | | |
| | | def __call__(self, config_name, name=None, relative_to=None): |
| | | def __call__(self, config_name, name=None, relative_to=None, **kw): |
| | | self.config_name = config_name |
| | | self.section_name = name |
| | | self.relative_to = relative_to |
| | | self.kw = kw |
| | | return self.result |
| | | |
| | | class DummyApp: |
| | |
| | | def fileConfig(self, config_file, dict): |
| | | return config_file, dict |
| | | |
| | | class TestParseVars(unittest.TestCase): |
| | | def test_parse_vars_good(self): |
| | | from pyramid.scripts.common import parse_vars |
| | | vars = ['a=1', 'b=2'] |
| | | result = parse_vars(vars) |
| | | self.assertEqual(result, {'a': '1', 'b': '2'}) |
| | | |
| | | def test_parse_vars_bad(self): |
| | | from pyramid.scripts.common import parse_vars |
| | | vars = ['a'] |
| | | self.assertRaises(ValueError, parse_vars, vars) |
| | | |
| | | |
| | | class DummyConfigParser(object): |
| | | def read(self, x): |
| | | pass |
| | |
| | | cmd.out = self.out |
| | | return cmd |
| | | |
| | | def get_app(self, spec, app_name=None): |
| | | def get_app(self, spec, app_name=None, options=None): |
| | | self._spec = spec |
| | | self._app_name = app_name |
| | | self._options = options or {} |
| | | |
| | | def helloworld(environ, start_request): |
| | | self._environ = environ |
| | | self._path_info = environ['PATH_INFO'] |
| | |
| | | cmd.args = ('/foo/bar/myapp.ini#myapp',) |
| | | return cmd |
| | | |
| | | def test_good_args(self): |
| | | cmd = self._getTargetClass()([]) |
| | | cmd.bootstrap = (dummy.DummyBootstrap(),) |
| | | cmd.args = ('/foo/bar/myapp.ini#myapp', 'a=1') |
| | | route = dummy.DummyRoute('a', '/a') |
| | | mapper = dummy.DummyMapper(route) |
| | | cmd._get_mapper = lambda *arg: mapper |
| | | L = [] |
| | | cmd.out = lambda msg: L.append(msg) |
| | | cmd.run() |
| | | self.assertTrue('<unknown>' in ''.join(L)) |
| | | |
| | | def test_bad_args(self): |
| | | cmd = self._getTargetClass()([]) |
| | | cmd.bootstrap = (dummy.DummyBootstrap(),) |
| | | cmd.args = ('/foo/bar/myapp.ini#myapp', 'a') |
| | | route = dummy.DummyRoute('a', '/a') |
| | | mapper = dummy.DummyMapper(route) |
| | | cmd._get_mapper = lambda *arg: mapper |
| | | |
| | | self.assertRaises(ValueError, cmd.run) |
| | | |
| | | def test_no_routes(self): |
| | | command = self._makeOne() |
| | | mapper = dummy.DummyMapper() |
| | |
| | | def out(self, msg): |
| | | self.out_.write(msg) |
| | | |
| | | def _get_server(*args, **kwargs): |
| | | def server(app): |
| | | return '' |
| | | |
| | | return server |
| | | |
| | | def _getTargetClass(self): |
| | | from pyramid.scripts.pserve import PServeCommand |
| | | return PServeCommand |
| | |
| | | msg = 'PID in %s is not valid (deleting)' % fn |
| | | self.assertEqual(self.out_.getvalue(), msg) |
| | | |
| | | def test_parse_vars_good(self): |
| | | vars = ['a=1', 'b=2'] |
| | | inst = self._makeOne('development.ini') |
| | | result = inst.parse_vars(vars) |
| | | def test_get_options_with_command(self): |
| | | inst = self._makeOne() |
| | | inst.args = ['foo', 'stop', 'a=1', 'b=2'] |
| | | result = inst.get_options() |
| | | self.assertEqual(result, {'a': '1', 'b': '2'}) |
| | | |
| | | def test_get_options_no_command(self): |
| | | inst = self._makeOne() |
| | | inst.args = ['foo', 'a=1', 'b=2'] |
| | | result = inst.get_options() |
| | | self.assertEqual(result, {'a': '1', 'b': '2'}) |
| | | |
| | | def test_parse_vars_good(self): |
| | | from pyramid.tests.test_scripts.dummy import DummyApp |
| | | |
| | | inst = self._makeOne('development.ini', 'a=1', 'b=2') |
| | | inst.loadserver = self._get_server |
| | | |
| | | |
| | | app = DummyApp() |
| | | |
| | | def get_app(*args, **kwargs): |
| | | app.global_conf = kwargs.get('global_conf', None) |
| | | |
| | | inst.loadapp = get_app |
| | | inst.run() |
| | | |
| | | self.assertEqual(app.global_conf, {'a': '1', 'b': '2'}) |
| | | |
| | | def test_parse_vars_bad(self): |
| | | vars = ['a'] |
| | | inst = self._makeOne('development.ini') |
| | | self.assertRaises(ValueError, inst.parse_vars, vars) |
| | | inst = self._makeOne('development.ini', 'a') |
| | | inst.loadserver = self._get_server |
| | | self.assertRaises(ValueError, inst.run) |
| | | |
| | | class Test_read_pidfile(unittest.TestCase): |
| | | def _callFUT(self, filename): |