import base64
import argparse
import sys
import textwrap

from pyramid.compat import url_unquote
from pyramid.request import Request
from pyramid.scripts.common import get_config_loader
from pyramid.scripts.common import parse_vars


def main(argv=sys.argv, quiet=False):
    """Console entry point: build a ``PRequestCommand`` and run it.

    :param argv: full argument vector; ``argv[0]`` (the program name) is
        skipped when parsing.
    :param quiet: when true, nothing is printed by the command.
    :returns: the exit status produced by ``PRequestCommand.run``.
    """
    command = PRequestCommand(argv, quiet)
    return command.run()


class PRequestCommand(object):
    """Implementation of the ``prequest`` command.

    Builds a WSGI environ from the command-line arguments, dispatches one
    request to the WSGI application loaded from the given configuration
    file, and prints the response.
    """

    # Shown verbatim (after dedent) by ``--help``; the parser below uses
    # RawDescriptionHelpFormatter, so line breaks here are significant.
    description = """\
    Submit a HTTP request to a web application.

    This command makes an artificial request to a web application that uses a
    PasteDeploy (.ini) configuration file for the server and application.

    Use "prequest config.ini /path" to request "/path".

    Use "prequest --method=POST config.ini /path < data" to do a POST with
    the given request body.

    Use "prequest --method=PUT config.ini /path < data" to do a
    PUT with the given request body.

    Use "prequest --method=PATCH config.ini /path < data" to do a
    PATCH with the given request body.

    Use "prequest --method=OPTIONS config.ini /path" to do an
    OPTIONS request.

    Use "prequest --method=PROPFIND config.ini /path" to do a
    PROPFIND request.

    If the path is relative (doesn't begin with "/") it is interpreted as
    relative to "/".  The path passed to this script should be URL-quoted.
    The path can be succeeded with a query string (e.g. '/path?a=1&b=2').

    The variable "environ['paste.command_request']" will be set to "True" in
    the request's WSGI environment, so your application can distinguish these
    calls from normal requests.
    """

    parser = argparse.ArgumentParser(
        description=textwrap.dedent(description),
        formatter_class=argparse.RawDescriptionHelpFormatter,
    )
    parser.add_argument(
        '-n', '--app-name',
        dest='app_name',
        metavar='NAME',
        help=(
            "Load the named application from the config file (default 'main')"
        ),
    )
    parser.add_argument(
        '--header',
        dest='headers',
        metavar='NAME:VALUE',
        action='append',
        help=(
            "Header to add to request (you can use this option multiple times)"
        ),
    )
    parser.add_argument(
        '-d', '--display-headers',
        dest='display_headers',
        action='store_true',
        help='Display status and headers before the response body'
    )
    parser.add_argument(
        '-m', '--method',
        dest='method',
        choices=['GET', 'HEAD', 'POST', 'PUT', 'PATCH', 'DELETE',
                 'PROPFIND', 'OPTIONS'],
        # Keep this list in sync with ``choices`` above.
        help='Request method type (GET, HEAD, POST, PUT, PATCH, DELETE, '
             'PROPFIND, OPTIONS)',
    )
    parser.add_argument(
        '-l', '--login',
        dest='login',
        help='HTTP basic auth username:password pair',
    )

    # The positionals are optional at the argparse level so that run() can
    # report a missing config_uri/path_info itself and return status 2.
    parser.add_argument(
        'config_uri',
        nargs='?',
        default=None,
        help='The URI to the configuration file.',
    )
    parser.add_argument(
        'path_info',
        nargs='?',
        default=None,
        help='The path of the request.',
    )
    parser.add_argument(
        'config_vars',
        nargs='*',
        default=(),
        help="Variables required by the config file. For example, "
             "`http_port=%%(http_port)s` would expect `http_port=8080` to be "
             "passed here.",
    )

    # Class-level indirections so callers (e.g. tests) can substitute the
    # config loader factory and the request-body stream.
    _get_config_loader = staticmethod(get_config_loader)
    stdin = sys.stdin

    def __init__(self, argv, quiet=False):
        """Parse ``argv[1:]`` with the class-level parser.

        :param argv: full argument vector including the program name.
        :param quiet: when true, ``out`` prints nothing.
        """
        self.quiet = quiet
        self.args = self.parser.parse_args(argv[1:])

    def out(self, msg):  # pragma: no cover
        """Print ``msg`` unless the command was created with ``quiet``."""
        if not self.quiet:
            print(msg)

    def run(self):
        """Issue the request described by the parsed arguments.

        :returns: ``2`` when ``config_uri``/``path_info`` is missing or a
            ``--header`` value has no ``:``; ``0`` once the response has
            been printed.
        """
        if not self.args.config_uri or not self.args.path_info:
            self.out('You must provide at least two arguments')
            return 2
        config_uri = self.args.config_uri
        config_vars = parse_vars(self.args.config_vars)
        path = self.args.path_info

        loader = self._get_config_loader(config_uri)
        loader.setup_logging(config_vars)

        app = loader.get_wsgi_app(self.args.app_name, config_vars)

        # Relative paths are interpreted relative to "/".
        if not path.startswith('/'):
            path = '/' + path

        # Split off the query string before unquoting, so only the path
        # portion is URL-decoded.
        try:
            path, qs = path.split('?', 1)
        except ValueError:
            qs = ''

        path = url_unquote(path)

        headers = {}
        if self.args.login:
            enc = base64.b64encode(self.args.login.encode('ascii'))
            headers['Authorization'] = 'Basic ' + enc.decode('ascii')

        if self.args.headers:
            for item in self.args.headers:
                if ':' not in item:
                    self.out(
                        "Bad --header=%s option, value must be in the form "
                        "'name:value'" % item)
                    return 2
                # Split on the first colon only; the value may contain more.
                name, value = item.split(':', 1)
                headers[name] = value.strip()

        request_method = (self.args.method or 'GET').upper()

        environ = {
            'REQUEST_METHOD': request_method,
            'SCRIPT_NAME': '',  # may be empty if app is at the root
            'PATH_INFO': path,
            'SERVER_NAME': 'localhost',  # always mandatory
            'SERVER_PORT': '80',  # always mandatory
            'SERVER_PROTOCOL': 'HTTP/1.0',
            'CONTENT_TYPE': 'text/plain',
            'REMOTE_ADDR': '127.0.0.1',
            'wsgi.run_once': True,
            'wsgi.multithread': False,
            'wsgi.multiprocess': False,
            'wsgi.errors': sys.stderr,
            'wsgi.url_scheme': 'http',
            'wsgi.version': (1, 0),
            'QUERY_STRING': qs,
            'HTTP_ACCEPT': 'text/plain;q=1.0, */*;q=0.1',
            'paste.command_request': True,
        }

        if request_method in ('POST', 'PUT', 'PATCH'):
            # WSGI requires wsgi.input to be a byte stream. A text-mode
            # stdin (the Python 3 default) exposes its binary stream as
            # ``.buffer``; streams without that attribute are used as-is.
            environ['wsgi.input'] = getattr(self.stdin, 'buffer', self.stdin)
            # NOTE(review): '-1' presumably marks the body length as
            # unknown for the request machinery -- confirm against WebOb.
            environ['CONTENT_LENGTH'] = '-1'

        # Translate header names into their WSGI environ keys; these are
        # applied last, so they override the defaults set above.
        for name, value in headers.items():
            if name.lower() == 'content-type':
                name = 'CONTENT_TYPE'
            else:
                name = 'HTTP_' + name.upper().replace('-', '_')
            environ[name] = value

        request = Request.blank(path, environ=environ)
        response = request.get_response(app)
        if self.args.display_headers:
            self.out(response.status)
            for name, value in response.headerlist:
                self.out('%s: %s' % (name, value))
        # Print decoded text when the response declares a charset,
        # otherwise the raw body.
        if response.charset:
            self.out(response.ubody)
        else:
            self.out(response.body)
        return 0


if __name__ == '__main__':  # pragma: no cover
    sys.exit(main() or 0)