centralize and properly escape query string and anchor arguments
| | |
| | | from pyramid.static import static_view |
| | | from pyramid.threadlocal import get_current_registry |
| | | |
| | | from pyramid.url import parse_url_overrides |
| | | |
| | | from pyramid.view import ( |
| | | render_view_to_response, |
| | | AppendSlashNotFoundViewFactory, |
| | |
| | | kw['subpath'] = subpath |
| | | return request.route_url(route_name, **kw) |
| | | else: |
| | | app_url, scheme, host, port, qs, anchor = \ |
| | | parse_url_overrides(kw) |
| | | parsed = url_parse(url) |
| | | if not parsed.scheme: |
| | | url = urlparse.urlunparse(parsed._replace( |
| | | scheme=request.environ['wsgi.url_scheme'])) |
| | | subpath = url_quote(subpath) |
| | | result = urljoin(url, subpath) |
| | | if '_query' in kw: |
| | | query = kw.pop('_query') |
| | | if isinstance(query, string_types): |
| | | result += '?' + quote_plus(query) |
| | | elif query: |
| | | result += '?' + urlencode(query, doseq=True) |
| | | if '_anchor' in kw: |
| | | anchor = kw.pop('_anchor') |
| | | anchor = quote_plus(anchor) |
| | | result += '#' + anchor |
| | | return result |
| | | return result + qs + anchor |
| | | |
| | | raise ValueError('No static URL definition matching %s' % path) |
| | | |
| | |
| | | return result |
| | | |
| | | # backwards-compatibility API (do not remove) |
def quote_plus(val, safe=''):
    """Escape ``val`` for inclusion in a URL by delegating to ``_quote_plus``.

    ``val`` is normalised to bytes before it is escaped:

    - text is encoded as UTF-8,
    - bytes are passed through unchanged,
    - any other object is converted with ``str()`` and then encoded as UTF-8.

    ``safe`` lists extra characters that must be left unescaped.  It
    defaults to the empty string, so callers that pass only ``val`` get the
    same result they did before the parameter existed.
    """
    # Exact class checks (not isinstance) are kept on purpose so subclasses
    # of the text/bytes types keep going through the ``str()`` branch.
    cls = val.__class__
    if cls is text_type:
        val = val.encode('utf-8')
    elif cls is not binary_type:
        val = str(val).encode('utf-8')
    # ``safe`` must be forwarded; dropping it would silently escape
    # characters the caller asked to keep.
    return _quote_plus(val, safe=safe)
| | | |
| | |
| | | result = inst.generate('package:path/abc def', request, a=1, |
| | | _query='(openlayers)') |
| | | self.assertEqual(result, |
| | | 'http://example.com/abc%20def?%28openlayers%29') |
| | | 'http://example.com/abc%20def?(openlayers)') |
| | | |
| | | def test_generate_url_with_custom_anchor(self): |
| | | inst = self._makeOne() |
| | |
| | | context = DummyContext() |
| | | result = request.resource_url(context, 'a', query='(openlayers)') |
| | | self.assertEqual(result, |
| | | 'http://example.com:5432/context/a?%28openlayers%29') |
| | | 'http://example.com:5432/context/a?(openlayers)') |
| | | |
| | | def test_resource_url_with_query_dict(self): |
| | | request = self._makeOne() |
| | |
| | | self.assertEqual(result, |
| | | 'http://example.com:5432/context/#La+Pe%C3%B1a') |
| | | |
def test_resource_url_anchor_is_urlencoded_safe(self):
    # The anchor mixes characters that must be escaped (space, '#') with
    # ones listed in ANCHOR_SAFE ('/', '?', '&', '+'); only the former may
    # change in the generated URL.
    request = self._makeOne()
    self._registerResourceURL(request.registry)
    context = DummyContext()
    result = request.resource_url(context, anchor=' /#?&+')
    self.assertEqual(result,
                     'http://example.com:5432/context/#+/%23?&+')
| | | |
| | | def test_resource_url_no_IResourceURL_registered(self): |
| | | # falls back to ResourceURL |
| | |
| | | request.registry.registerUtility(mapper, IRoutesMapper) |
| | | result = request.route_url('flub', _query='(openlayers)') |
| | | self.assertEqual(result, |
| | | 'http://example.com:5432/1/2/3?%28openlayers%29') |
| | | 'http://example.com:5432/1/2/3?(openlayers)') |
| | | |
| | | def test_route_url_with_empty_query(self): |
| | | from pyramid.interfaces import IRoutesMapper |
| | |
| | | ) |
| | | |
# Characters left unescaped when quoting each URL component.
PATH_SAFE = '/:@&+$,' # from webob
# "/" and "?" plus the RFC 3986 pchar extras (":", "@", sub-delims) -- the
# characters the RFC allows literally in a query component.
QUERY_SAFE = '/?:@!$&\'()*+,;=' # RFC 3986
# RFC 3986 defines the fragment with the same grammar as the query.
ANCHOR_SAFE = QUERY_SAFE
| | | |
def parse_url_overrides(kw):
    """Parse special arguments passed when generating urls.

    The supplied dictionary is mutated: the ``_app_url``, ``_scheme``,
    ``_host``, ``_port``, ``_query`` and ``_anchor`` keys are popped when
    present; every other key is left untouched.

    Returns a 6-tuple of the format ``(app_url, scheme, host, port,
    qs, anchor)``:

    - ``app_url``, ``scheme``, ``host`` and ``port`` are the popped values,
      or ``None`` when the corresponding key was absent.
    - ``qs`` is ``''`` or a string starting with ``?``.  A string
      ``_query`` is escaped with ``QUERY_SAFE`` characters preserved; any
      other non-empty value is passed to ``urlencode(..., doseq=True)``.
    - ``anchor`` is ``''`` or a string starting with ``#``, escaped with
      ``ANCHOR_SAFE`` characters preserved.

    Empty values (``''``, ``None``, an empty mapping or sequence) for
    ``_query`` or ``_anchor`` are treated as "not supplied", so no dangling
    ``?`` or ``#`` is appended to the generated URL.
    """
    app_url = kw.pop('_app_url', None)
    scheme = kw.pop('_scheme', None)
    host = kw.pop('_host', None)
    port = kw.pop('_port', None)
    query = kw.pop('_query', None)
    anchor = kw.pop('_anchor', None)

    qs = ''
    # Truthiness is tested first so an empty string behaves like an empty
    # dict: neither one contributes a bare "?" to the URL.
    if query:
        if isinstance(query, string_types):
            qs = '?' + quote_plus(query, safe=QUERY_SAFE)
        else:
            qs = '?' + urlencode(query, doseq=True)

    fragment = ''
    # Likewise an empty/None anchor must not produce "#" or "#None".
    if anchor:
        fragment = '#' + quote_plus(anchor, safe=ANCHOR_SAFE)

    return app_url, scheme, host, port, qs, fragment
| | | |
| | | class URLMethodsMixin(object): |
| | | """ Request methods mixin for BaseRequest having to do with URL |
| | |
| | | if route.pregenerator is not None: |
| | | elements, kw = route.pregenerator(self, elements, kw) |
| | | |
| | | anchor = '' |
| | | qs = '' |
| | | app_url = None |
| | | host = None |
| | | scheme = None |
| | | port = None |
| | | |
| | | if '_query' in kw: |
| | | query = kw.pop('_query') |
| | | if isinstance(query, string_types): |
| | | qs = '?' + quote_plus(query) |
| | | elif query: |
| | | qs = '?' + urlencode(query, doseq=True) |
| | | |
| | | if '_anchor' in kw: |
| | | anchor = kw.pop('_anchor') |
| | | anchor = quote_plus(anchor) |
| | | anchor = '#' + anchor |
| | | |
| | | if '_app_url' in kw: |
| | | app_url = kw.pop('_app_url') |
| | | |
| | | if '_host' in kw: |
| | | host = kw.pop('_host') |
| | | |
| | | if '_scheme' in kw: |
| | | scheme = kw.pop('_scheme') |
| | | |
| | | if '_port' in kw: |
| | | port = kw.pop('_port') |
| | | app_url, scheme, host, port, qs, anchor = parse_url_overrides(kw) |
| | | |
| | | if app_url is None: |
| | | if (scheme is not None or host is not None or port is not None): |
| | |
| | | if 'query' in kw: |
| | | query = kw['query'] |
| | | if isinstance(query, string_types): |
| | | qs = '?' + quote_plus(query) |
| | | qs = '?' + quote_plus(query, safe=QUERY_SAFE) |
| | | elif query: |
| | | qs = '?' + urlencode(query, doseq=True) |
| | | |
| | | if 'anchor' in kw: |
| | | anchor = kw['anchor'] |
| | | anchor = quote_plus(anchor) |
| | | anchor = quote_plus(anchor, safe=ANCHOR_SAFE) |
| | | anchor = '#' + anchor |
| | | |
| | | if elements: |