| | |
| | | import platform |
| | | import sys |
| | | import types |
| | | |
| | | if platform.system() == 'Windows': # pragma: no cover |
| | | WIN = True |
| | | else: # pragma: no cover |
| | | WIN = False |
| | | |
| | | try: # pragma: no cover |
| | | import __pypy__ |
| | | PYPY = True |
| | |
| | | import inspect |
| | | import operator |
| | | import os |
| | | from functools import wraps |
| | | |
| | | from zope.interface import ( |
| | |
| | | urlparse, |
| | | im_func, |
| | | url_quote, |
| | | WIN, |
| | | ) |
| | | |
| | | from pyramid.exceptions import ( |
| | |
| | | some URL is visited; :meth:`pyramid.request.Request.static_url` |
| | | generates a URL to that asset. |
| | | |
| | | The ``name`` argument to ``add_static_view`` is usually a :term:`view |
| | | name`. When this is the case, the |
| | | The ``name`` argument to ``add_static_view`` is usually a simple URL |
| | | prefix (e.g. ``'images'``). When this is the case, the |
| | | :meth:`pyramid.request.Request.static_url` API will generate a URL |
| | | which points to a Pyramid view, which will serve up a set of assets |
| | | that live in the package itself. For example: |
| | |
| | | if info is None: |
| | | info = StaticURLInfo() |
| | | self.registry.registerUtility(info, IStaticURLInfo) |
| | | |
| | | info.add(self, name, spec, **kw) |
| | | |
| | | def isexception(o): |
| | |
| | | for (url, spec, route_name) in self._get_registrations(registry): |
| | | if path.startswith(spec): |
| | | subpath = path[len(spec):] |
| | | if WIN: # pragma: no cover |
| | | subpath = subpath.replace('\\', '/') # windows |
| | | if url is None: |
| | | kw['subpath'] = subpath |
| | | return request.route_url(route_name, **kw) |
| | |
| | | # appending a slash here if the spec doesn't have one is |
| | | # required for proper prefix matching done in ``generate`` |
| | | # (``subpath = path[len(spec):]``). |
| | | if not spec.endswith('/'): |
| | | spec = spec + '/' |
| | | if os.path.isabs(spec): |
| | | sep = os.sep |
| | | else: |
| | | sep = '/' |
| | | if not spec.endswith(sep): |
| | | spec = spec + sep |
| | | |
| | | # we also make sure the name ends with a slash, purely as a |
| | | # convenience: a name that is a url is required to end in a |
| | |
| | | self.lock.release() |
| | | return self.fileobj |
| | | |
| | | def __del__(self): |
| | | def close(self): |
| | | fileobj = self.fileobj |
| | | if fileobj is not None: |
| | | fileobj.close() |
| | | |
| | | def __del__(self): |
| | | self.close() |
| | | |
| | | def write(self, text): |
| | | fileobj = self.open() |
| | | fileobj.write(text) |
| | |
| | | <html>static</html> |
| | | <html>static</html> |
| | |
| | | import pyramid.tests.test_config |
| | | provider = self._makeOne(pyramid.tests.test_config) |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, resource_name)) |
| | | with provider.get_resource_stream(None, resource_name) as result: |
| | | self.assertEqual(result.read().replace(b'\r', b''), expected) |
| | | _assertBody(result.read(), os.path.join(here, resource_name)) |
| | | |
| | | def test_get_resource_string_no_overrides(self): |
| | | import os |
| | |
| | | import pyramid.tests.test_config |
| | | provider = self._makeOne(pyramid.tests.test_config) |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, resource_name)) |
| | | result = provider.get_resource_string(None, resource_name) |
| | | self.assertEqual(result.replace(b'\r', b''), expected) |
| | | _assertBody(result, os.path.join(here, resource_name)) |
| | | |
| | | def test_has_resource_no_overrides(self): |
| | | resource_name = 'test_assets.py' |
| | |
| | | import pyramid.tests.test_config |
| | | provider = self._makeOne(pyramid.tests.test_config) |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, resource_name)) |
| | | with provider.get_resource_stream(None, resource_name) as result: |
| | | self.assertEqual(result.read(), expected) |
| | | _assertBody(result.read(), os.path.join(here, resource_name)) |
| | | |
| | | def test_get_resource_string_override_returns_None(self): |
| | | overrides = DummyOverrides(None) |
| | |
| | | import pyramid.tests.test_config |
| | | provider = self._makeOne(pyramid.tests.test_config) |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, resource_name)) |
| | | result = provider.get_resource_string(None, resource_name) |
| | | self.assertEqual(result, expected) |
| | | _assertBody(result, os.path.join(here, resource_name)) |
| | | |
| | | def test_has_resource_override_returns_None(self): |
| | | overrides = DummyOverrides(None) |
| | |
| | | po = self._makeOne(package) |
| | | po.overrides= overrides |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, 'test_assets.py')) |
| | | with po.get_stream('whatever') as stream: |
| | | self.assertEqual(stream.read().replace(b'\r', b''), |
| | | expected) |
| | | _assertBody(stream.read(), os.path.join(here, 'test_assets.py')) |
| | | |
| | | def test_get_stream_file_doesnt_exist(self): |
| | | overrides = [ DummyOverride(None), DummyOverride( |
| | |
| | | po = self._makeOne(package) |
| | | po.overrides= overrides |
| | | here = os.path.dirname(os.path.abspath(__file__)) |
| | | expected = read_(os.path.join(here, 'test_assets.py')) |
| | | self.assertEqual(po.get_string('whatever').replace(b'\r', b''), |
| | | expected) |
| | | _assertBody(po.get_string('whatever'), |
| | | os.path.join(here, 'test_assets.py')) |
| | | |
| | | def test_get_string_file_doesnt_exist(self): |
| | | overrides = [ DummyOverride(None), DummyOverride( |
| | |
| | | contents = f.read() |
| | | return contents |
| | | |
| | | def _assertBody(body, filename): |
| | | # strip both \n and \r for windows |
| | | body = body.replace(b'\r', b'') |
| | | body = body.replace(b'\n', b'') |
| | | data = read_(filename) |
| | | data = data.replace(b'\r', b'') |
| | | data = data.replace(b'\n', b'') |
| | | assert(body == data) |
| | |
| | | import datetime |
| | | import locale |
| | | import os |
| | | import platform |
| | | import unittest |
| | | |
| | | from pyramid.wsgi import wsgiapp |
| | |
| | | res = self.testapp.get('/static/.hiddenfile', status=200) |
| | | _assertBody(res.body, os.path.join(here, 'fixtures/static/.hiddenfile')) |
| | | |
| | | if defaultlocale is not None: |
| | | # These tests are expected to fail on LANG=C systems |
| | | if defaultlocale is not None and platform.system() == 'Linux': |
| | | # These tests are expected to fail on LANG=C systems due to decode |
| | | # errors and on non-Linux systems due to git highchar handling |
| | | # vagaries |
| | | def test_highchars_in_pathelement(self): |
| | | url = url_quote('/static/héhé/index.html') |
| | | res = self.testapp.get(url, status=200) |
| | |
| | | def test_range_tilend(self): |
| | | self.testapp.extra_environ = {'HTTP_RANGE':'bytes=-5'} |
| | | res = self.testapp.get('/static/index.html', status=206) |
| | | self.assertEqual(res.body, b'tml>\n') |
| | | self.assertEqual(res.body, b'html>') |
| | | |
| | | def test_range_notbytes(self): |
| | | self.testapp.extra_environ = {'HTTP_RANGE':'kHz=-5'} |
| | |
| | | root_factory = 'pyramid.tests.pkgs.staticpermapp:RootFactory' |
| | | def test_allowed(self): |
| | | result = self.testapp.get('/allowed/index.html', status=200) |
| | | self.assertEqual( |
| | | result.body.replace(b'\r', b''), |
| | | read_(os.path.join(here, 'fixtures/static/index.html'))) |
| | | _assertBody(result.body, |
| | | os.path.join(here, 'fixtures/static/index.html')) |
| | | |
| | | def test_denied_via_acl_global_root_factory(self): |
| | | self.testapp.extra_environ = {'REMOTE_USER':'bob'} |
| | |
| | | def test_allowed_via_acl_global_root_factory(self): |
| | | self.testapp.extra_environ = {'REMOTE_USER':'fred'} |
| | | result = self.testapp.get('/protected/index.html', status=200) |
| | | self.assertEqual( |
| | | result.body.replace(b'\r', b''), |
| | | read_(os.path.join(here, 'fixtures/static/index.html'))) |
| | | _assertBody(result.body, |
| | | os.path.join(here, 'fixtures/static/index.html')) |
| | | |
| | | def test_denied_via_acl_local_root_factory(self): |
| | | self.testapp.extra_environ = {'REMOTE_USER':'fred'} |
| | |
| | | def test_allowed_via_acl_local_root_factory(self): |
| | | self.testapp.extra_environ = {'REMOTE_USER':'bob'} |
| | | result = self.testapp.get('/factory_protected/index.html', status=200) |
| | | self.assertEqual( |
| | | result.body.replace(b'\r', b''), |
| | | read_(os.path.join(here, 'fixtures/static/index.html'))) |
| | | _assertBody(result.body, |
| | | os.path.join(here, 'fixtures/static/index.html')) |
| | | |
| | | class TestCCBug(IntegrationBase, unittest.TestCase): |
| | | # "unordered" as reported in IRC by author of |
| | |
| | | if defaultlocale is None: # pragma: no cover |
| | | # If system locale does not have an encoding then default to utf-8 |
| | | filename = filename.encode('utf-8') |
| | | assert(body.replace(b'\r', b'') == read_(filename)) |
| | | # strip both \n and \r for windows |
| | | body = body.replace(b'\r', b'') |
| | | body = body.replace(b'\n', b'') |
| | | data = read_(filename) |
| | | data = data.replace(b'\r', b'') |
| | | data = data.replace(b'\n', b'') |
| | | assert(body == data) |
| | | |
| | |
| | | import os |
| | | import unittest |
| | | |
| | | class Test_get_app(unittest.TestCase): |
| | |
| | | return get_app(config_file, section_name, loadapp) |
| | | |
| | | def test_it(self): |
| | | import os |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini', 'myapp', loadapp) |
| | |
| | | self.assertEqual(result, app) |
| | | |
| | | def test_it_with_hash(self): |
| | | import os |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', None, loadapp) |
| | |
| | | self.assertEqual(result, app) |
| | | |
| | | def test_it_with_hash_and_name_override(self): |
| | | import os |
| | | app = DummyApp() |
| | | loadapp = DummyLoadWSGI(app) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', 'yourapp', loadapp) |
| | |
| | | return get_appsettings(config_file, section_name, appconfig) |
| | | |
| | | def test_it(self): |
| | | import os |
| | | values = {'a':1} |
| | | appconfig = DummyLoadWSGI(values) |
| | | result = self._callFUT('/foo/bar/myapp.ini', 'myapp', appconfig) |
| | |
| | | self.assertEqual(result, values) |
| | | |
| | | def test_it_with_hash(self): |
| | | import os |
| | | values = {'a':1} |
| | | appconfig = DummyLoadWSGI(values) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', None, appconfig) |
| | |
| | | self.assertEqual(result, values) |
| | | |
| | | def test_it_with_hash_and_name_override(self): |
| | | import os |
| | | values = {'a':1} |
| | | appconfig = DummyLoadWSGI(values) |
| | | result = self._callFUT('/foo/bar/myapp.ini#myapp', 'yourapp', appconfig) |
| | |
| | | |
| | | def test_it(self): |
| | | config_file, dict = self._callFUT('/abc') |
| | | self.assertEqual(config_file, '/abc') |
| | | self.assertEqual(dict['__file__'], '/abc') |
| | | self.assertEqual(dict['here'], '/') |
| | | # os.path.abspath is a sop to Windows |
| | | self.assertEqual(config_file, os.path.abspath('/abc')) |
| | | self.assertEqual(dict['__file__'], os.path.abspath('/abc')) |
| | | self.assertEqual(dict['here'], os.path.abspath('/')) |
| | | |
| | | def fileConfig(self, config_file, dict): |
| | | return config_file, dict |
| | |
| | | 1, False, |
| | | template_renderer=dummy_template_renderer) |
| | | result = self.out.getvalue() |
| | | self.assertTrue('Creating %s/mypackage/' % self.dirname in result) |
| | | self.assertTrue('Creating' in result) |
| | | self.assertTrue( |
| | | 'Copying fixture_scaffold/+package+/__init__.py_tmpl to' in result) |
| | | source = pkg_resources.resource_filename( |
| | |
| | | 1, False, |
| | | template_renderer=dummy_template_renderer) |
| | | result = self.out.getvalue() |
| | | self.assertTrue('Creating %s/mypackage/' % self.dirname in result) |
| | | self.assertTrue('Creating' in result) |
| | | self.assertTrue('Copying __init__.py_tmpl to' in result) |
| | | source = pkg_resources.resource_filename( |
| | | 'pyramid.tests.test_scaffolds', |
| | |
| | | import os |
| | | import unittest |
| | | |
| | | class Test_logging_file_config(unittest.TestCase): |
| | |
| | | |
| | | def test_it(self): |
| | | config_file, dict = self._callFUT('/abc') |
| | | self.assertEqual(config_file, '/abc') |
| | | self.assertEqual(dict['__file__'], '/abc') |
| | | self.assertEqual(dict['here'], '/') |
| | | # use of os.path.abspath here is a sop to Windows |
| | | self.assertEqual(config_file, os.path.abspath('/abc')) |
| | | self.assertEqual(dict['__file__'], os.path.abspath('/abc')) |
| | | self.assertEqual(dict['here'], os.path.abspath('/')) |
| | | |
| | | def fileConfig(self, config_file, dict): |
| | | return config_file, dict |
| | |
| | | import unittest |
| | | import os |
| | | import tempfile |
| | | |
| | | class TestPServeCommand(unittest.TestCase): |
| | | def setUp(self): |
| | |
| | | self.out_.getvalue(),'Not a valid PID file in %s' % path) |
| | | |
| | | def test_run_stop_daemon_invalid_pid_in_file(self): |
| | | import tempfile |
| | | tmp = tempfile.NamedTemporaryFile() |
| | | tmp.write(b'9999999') |
| | | tmp.flush() |
| | | tmpname = tmp.name |
| | | inst = self._makeOne('--stop-daemon', '--pid-file=%s' % tmpname) |
| | | fn = tempfile.mktemp() |
| | | with open(fn, 'wb') as tmp: |
| | | tmp.write(b'9999999') |
| | | tmp.close() |
| | | inst = self._makeOne('--stop-daemon', '--pid-file=%s' % fn) |
| | | inst.run() |
| | | try: |
| | | tmp.close() |
| | | except: |
| | | pass |
| | | self.assertEqual(self.out_.getvalue(), |
| | | 'PID in %s is not valid (deleting)' % tmpname) |
| | | 'PID in %s is not valid (deleting)' % fn) |
| | | |
| | | def test_parse_vars_good(self): |
| | | vars = ['a=1', 'b=2'] |
| | |
| | | return LazyWriter(filename, mode) |
| | | |
| | | def test_open(self): |
| | | import tempfile |
| | | filename = tempfile.mktemp() |
| | | try: |
| | | inst = self._makeOne(filename) |
| | | fp = inst.open() |
| | | self.assertEqual(fp.name, filename) |
| | | fp.close() |
| | | finally: |
| | | fp.close() |
| | | os.remove(filename) |
| | | |
| | | def test_write(self): |
| | | import tempfile |
| | | filename = tempfile.mktemp() |
| | | try: |
| | | inst = self._makeOne(filename) |
| | |
| | | with open(filename) as f: |
| | | data = f.read() |
| | | self.assertEqual(data, 'hello') |
| | | inst.close() |
| | | os.remove(filename) |
| | | |
| | | def test_writeline(self): |
| | | import tempfile |
| | | filename = tempfile.mktemp() |
| | | try: |
| | | inst = self._makeOne(filename) |
| | |
| | | with open(filename) as f: |
| | | data = f.read() |
| | | self.assertEqual(data, 'hello') |
| | | inst.close() |
| | | os.remove(filename) |
| | | |
| | | def test_flush(self): |
| | | import tempfile |
| | | filename = tempfile.mktemp() |
| | | try: |
| | | inst = self._makeOne(filename) |
| | | inst.flush() |
| | | fp = inst.fileobj |
| | | self.assertEqual(fp.name, filename) |
| | | fp.close() |
| | | finally: |
| | | fp.close() |
| | | os.remove(filename) |
| | | |
| | | class Test__methodwrapper(unittest.TestCase): |
| | |
| | | import os |
| | | import unittest |
| | | import warnings |
| | | |
| | |
| | | from pyramid.compat import ( |
| | | text_, |
| | | native_, |
| | | WIN, |
| | | ) |
| | | |
| | | class TestURLMethodsMixin(unittest.TestCase): |
| | |
| | | abspath = makeabs('static', 'foo.css') |
| | | result = request.static_url(abspath) |
| | | self.assertEqual(result, 'abc') |
| | | self.assertEqual(info.args, ('/static/foo.css', request, {})) |
| | | self.assertEqual(info.args, (makeabs('static', 'foo.css'), request, {})) |
| | | request = self._makeOne() |
| | | |
| | | def test_static_url_found_rel(self): |
| | |
| | | abspath = makeabs('static', 'foo.css') |
| | | result = request.static_path(abspath) |
| | | self.assertEqual(result, 'abc') |
| | | self.assertEqual(info.args, ('/static/foo.css', request, |
| | | self.assertEqual(info.args, (makeabs('static', 'foo.css'), request, |
| | | {'_app_url':'/foo'}) |
| | | ) |
| | | |
| | |
| | | return self.result |
| | | |
| | | def makeabs(*elements): |
| | | import os |
| | | return os.path.sep + os.path.sep.join(elements) |
| | | if WIN: # pragma: no cover |
| | | return r'c:\\' + os.path.sep.join(elements) |
| | | else: |
| | | return os.path.sep + os.path.sep.join(elements) |