Merge branch 'master' of github.com:Pylons/pyramid
1 files deleted
39 files modified
| | |
| | | Features |
| | | -------- |
| | | |
| | | - ``pyramid.authentication.AuthTktAuthenticationPolicy`` has been updated to |
| | | support newer hashing algorithms such as ``sha512``. Existing applications |
| | | should consider updating if possible. |
| | | |
| | | - Added an ``effective_principals`` route and view predicate. |
| | | |
| | | - Do not allow the userid returned from the ``authenticated_userid`` or the |
| | | userid that is one of the list of principals returned by |
| | | ``effective_principals`` to be either of the strings ``system.Everyone`` or |
| | | ``system.Authenticated`` when any of the built-in authorization policies that |
| | | live in ``pyramid.authentication`` are in use. These two strings are |
| | | reserved for internal usage by Pyramid and they will not be accepted as valid |
| | | userids. |
| | | |
| | | - Slightly better debug logging from |
| | | ``pyramid.authentication.RepozeWho1AuthenticationPolicy``. |
| | | |
| | | - ``pyramid.security.view_execution_permitted`` used to return `True` if no |
| | | view could be found. It now raises a ``TypeError`` exception in that case, as |
| | | it doesn't make sense to assert that a nonexistent view is |
| | | execution-permitted. See https://github.com/Pylons/pyramid/issues/299. |
| | | |
| | | - Get rid of shady monkeypatching of ``pyramid.request.Request`` and |
| | | ``pyramid.response.Response`` done within the ``__init__.py`` of Pyramid. |
| | | Webob no longer relies on this being done. Instead, the ResponseClass |
| | | attribute of the Pyramid Request class is assigned to the Pyramid response |
| | | class; that's enough to satisfy WebOb and behave as it did before with the |
| | | monkeypatching. |
| | | |
| | | - Allow a ``_depth`` argument to ``pyramid.view.view_config``, which will |
| | | permit limited composition reuse of the decorator by other software that |
| | | wants to provide custom decorators that are much like view_config. |
| | | |
| | | - Allow an iterable of decorators to be passed to |
| | | ``pyramid.config.Configurator.add_view``. This allows views to be wrapped |
| | | by more than one decorator without requiring combining the decorators |
| | | yourself. |
| | | |
| | | Bug Fixes |
| | | --------- |
| | | |
| | | - In the past if a renderer returned ``None``, the body of the resulting |
| | | response would be set explicitly to the empty string. Instead, now, the body |
| | | is left unchanged, which allows the renderer to set a body itself by using |
| | | e.g. ``request.response.body = b'foo'``. The body set by the renderer will |
| | | be unmolested on the way out. See |
| | | https://github.com/Pylons/pyramid/issues/709 |
| | | |
| | | - In uncommon cases, the ``pyramid_excview_tween_factory`` might have |
| | | inadvertently raised a ``KeyError`` looking for ``request_iface`` as an |
| | | attribute of the request. It no longer fails in this case. See |
| | | https://github.com/Pylons/pyramid/issues/700 |
| | | |
| | | Deprecations |
| | | ------------ |
| | | |
| | | - ``pyramid.authentication.AuthTktAuthenticationPolicy`` will emit a warning |
| | | if an application is using the policy without explicitly setting the |
| | | ``hashalg``. This is because the default is "md5" which is considered |
| | | insecure. If you really want "md5" then you must specify it explicitly to |
| | | get rid of the warning. |
| | | |
| | | Internals |
| | | --------- |
| | | |
| | | - Move ``TopologicalSorter`` from ``pyramid.config.util`` to ``pyramid.util``, |
| | | move ``CyclicDependencyError`` from ``pyramid.config.util`` to |
| | | ``pyramid.exceptions``, rename ``Singleton`` to ``Sentinel`` and move from |
| | | ``pyramid.config.util`` to ``pyramid.util``; this is in an effort to |
| | | move that stuff that may be an API one day out of ``pyramid.config.util``, |
| | | because that package should never be imported from non-Pyramid code. |
| | | TopologicalSorter is still not an API, but may become one. |
| | | |
| | | 1.4a3 (2012-10-26) |
| | | ================== |
| | | |
| | |
| | | Nice-to-Have |
| | | ------------ |
| | | |
| | | - config.set_registry_attr (with conflict detection). |
| | | |
| | | - _fix_registry should dictify the registry being fixed. |
| | | |
| | | - Provide the presumed renderer name to the called view as an attribute of |
| | | the request. |
| | | |
| | | - Have action methods return their discriminators. |
| | | |
| | | - Add docs about upgrading between Pyramid versions (e.g. how to see |
| | | deprecation warnings). |
| | | |
| | | - Fix renderers chapter to better document system values passed to template |
| | | renderers. |
| | |
| | | |
| | | - 1.6: Remove IContextURL and TraversalContextURL. |
| | | |
| | | - 1.7: Change ``pyramid.authentication.AuthTktAuthenticationPolicy`` default |
| | | ``hashalg`` to ``sha512``. |
| | | |
| | | Probably Bad Ideas |
| | | ------------------ |
| | | |
| | |
| | | with config.partial(introspection=False) as c: |
| | | c.add_view(..) |
| | | |
| | | - _fix_registry should dictify the registry being fixed. |
| | | |
| | | - config.set_registry_attr (with conflict detection)... bad idea because it |
| | | won't take effect until after a commit and folks will be confused by that. |
| | | |
| | |
| | | .. automodule:: pyramid.authentication |
| | | |
| | | .. autoclass:: AuthTktAuthenticationPolicy |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | .. autoclass:: RemoteUserAuthenticationPolicy |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | .. autoclass:: SessionAuthenticationPolicy |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | .. autoclass:: BasicAuthAuthenticationPolicy |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | .. autoclass:: RepozeWho1AuthenticationPolicy |
| | | :members: |
| | | :inherited-members: |
| | | |
| | | Helper Classes |
| | | ~~~~~~~~~~~~~~ |
| | |
| | | Once this renderer is registered via |
| | | :meth:`~pyramid.config.Configurator.add_renderer` as above, you can use |
| | | ``jsonp`` as the ``renderer=`` parameter to ``@view_config`` or |
| | | :meth:`pyramid.config.Configurator.add_view``: |
| | | :meth:`pyramid.config.Configurator.add_view`: |
| | | |
| | | .. code-block:: python |
| | | |
| | |
| | | from pyramid.config import Configurator |
| | | from pyramid.authentication import AuthTktAuthenticationPolicy |
| | | from pyramid.authorization import ACLAuthorizationPolicy |
| | | authentication_policy = AuthTktAuthenticationPolicy('seekrit') |
| | | authorization_policy = ACLAuthorizationPolicy() |
| | | authn_policy = AuthTktAuthenticationPolicy('seekrit', hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config = Configurator() |
| | | config.set_authentication_policy(authentication_policy) |
| | | config.set_authorization_policy(authorization_policy) |
| | | config.set_authentication_policy(authn_policy) |
| | | config.set_authorization_policy(authz_policy) |
| | | |
| | | .. note:: the ``authentication_policy`` and ``authorization_policy`` |
| | | arguments may also be passed to their respective methods mentioned above |
| | |
| | | this implementation is, by default, *unencrypted*. You should not use it |
| | | when you keep sensitive information in the session object, as the |
| | | information can be easily read by both users of your application and third |
| | | parties who have access to your users' network traffic. Use a different |
| | | session factory implementation (preferably one which keeps session data on |
| | | the server) for anything but the most basic of applications where "session |
| | | security doesn't matter". |
| | | parties who have access to your users' network traffic. And if you use this |
| | | sessioning implementation, and you inadvertently create a cross-site |
| | | scripting vulnerability in your application, because the session data is |
| | | stored unencrypted in a cookie, it will also be easier for evildoers to |
| | | obtain the current user's cross-site scripting token. In short, use a |
| | | different session factory implementation (preferably one which keeps session |
| | | data on the server) for anything but the most basic of applications where |
| | | "session security doesn't matter", and you are sure your application has no |
| | | cross-site scripting vulnerabilities. |
| | | |
| | | .. index:: |
| | | single: session object |
| | |
| | | |
| | | (Only the highlighted lines need to be added.) |
| | | |
| | | We are enabling an ``AuthTktAuthenticationPolicy``, it is based in an auth |
| | | ticket that may be included in the request, and an ``ACLAuthorizationPolicy`` |
| | | that uses an ACL to determine the allow or deny outcome for a view. |
| | | We are enabling an ``AuthTktAuthenticationPolicy``, it is based in an |
| | | auth ticket that may be included in the request, and an |
| | | ``ACLAuthorizationPolicy`` that uses an ACL to determine the allow or deny |
| | | outcome for a view. |
| | | |
| | | Note that the |
| | | :class:`pyramid.authentication.AuthTktAuthenticationPolicy` constructor |
| | | accepts two arguments: ``secret`` and ``callback``. ``secret`` is a string |
| | | representing an encryption key used by the "authentication ticket" machinery |
| | | represented by this policy: it is required. The ``callback`` is the |
| | | Note that the :class:`pyramid.authentication.AuthTktAuthenticationPolicy` |
| | | constructor accepts two arguments: ``secret`` and ``callback``. ``secret`` is |
| | | a string representing an encryption key used by the "authentication ticket" |
| | | machinery represented by this policy: it is required. The ``callback`` is the |
| | | ``groupfinder()`` function that we created before. |
| | | |
| | | Add permission declarations |
| | |
| | | def main(global_config, **settings): |
| | | """ This function returns a WSGI application. |
| | | """ |
| | | authn_policy = AuthTktAuthenticationPolicy(secret='sosecret', |
| | | callback=groupfinder) |
| | | authn_policy = AuthTktAuthenticationPolicy( |
| | | 'sosecret', callback=groupfinder, hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config = Configurator(root_factory=root_factory, settings=settings) |
| | | config.set_authentication_policy(authn_policy) |
| | |
| | | def main(global_config, **settings): |
| | | """ This function returns a WSGI application. |
| | | """ |
| | | authn_policy = AuthTktAuthenticationPolicy(secret='sosecret', |
| | | callback=groupfinder) |
| | | authn_policy = AuthTktAuthenticationPolicy( |
| | | 'sosecret', callback=groupfinder, hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config = Configurator(root_factory=root_factory, settings=settings) |
| | | config.set_authentication_policy(authn_policy) |
| | |
| | | |
| | | (Only the highlighted lines need to be added.) |
| | | |
| | | We are enabling an ``AuthTktAuthenticationPolicy``, it is based in an auth |
| | | ticket that may be included in the request, and an ``ACLAuthorizationPolicy`` |
| | | that uses an ACL to determine the allow or deny outcome for a view. |
| | | We are enabling an ``AuthTktAuthenticationPolicy``, it is based in an |
| | | auth ticket that may be included in the request, and an |
| | | ``ACLAuthorizationPolicy`` that uses an ACL to determine the allow or deny |
| | | outcome for a view. |
| | | |
| | | Note that the |
| | | :class:`pyramid.authentication.AuthTktAuthenticationPolicy` constructor |
| | | accepts two arguments: ``secret`` and ``callback``. ``secret`` is a string |
| | | representing an encryption key used by the "authentication ticket" machinery |
| | | represented by this policy: it is required. The ``callback`` is the |
| | | Note that the :class:`pyramid.authentication.AuthTktAuthenticationPolicy` |
| | | constructor accepts two arguments: ``secret`` and ``callback``. ``secret`` is |
| | | a string representing an encryption key used by the "authentication ticket" |
| | | machinery represented by this policy: it is required. The ``callback`` is the |
| | | ``groupfinder()`` function that we created before. |
| | | |
| | | Add permission declarations |
| | |
| | | DBSession.configure(bind=engine) |
| | | Base.metadata.bind = engine |
| | | authn_policy = AuthTktAuthenticationPolicy( |
| | | 'sosecret', callback=groupfinder) |
| | | 'sosecret', callback=groupfinder, hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config = Configurator(settings=settings, |
| | | root_factory='tutorial.models.RootFactory') |
| | |
| | | DBSession.configure(bind=engine) |
| | | Base.metadata.bind = engine |
| | | authn_policy = AuthTktAuthenticationPolicy( |
| | | 'sosecret', callback=groupfinder) |
| | | 'sosecret', callback=groupfinder, hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config = Configurator(settings=settings, |
| | | root_factory='tutorial.models.RootFactory') |
| | |
| | | from pyramid.request import Request |
| | | from pyramid.response import Response |
| | | Response.RequestClass = Request |
| | | Request.ResponseClass = Response |
| | | del Request, Response |
| | | # package |
| | |
| | | import binascii |
| | | from codecs import utf_8_decode |
| | | from codecs import utf_8_encode |
| | | from hashlib import md5 |
| | | import hashlib |
| | | import base64 |
| | | import datetime |
| | | import re |
| | | import time as time_mod |
| | | import warnings |
| | | |
| | | from zope.interface import implementer |
| | | |
| | |
| | | methodname = classname + '.' + methodname |
| | | logger.debug(methodname + ': ' + msg) |
| | | |
| | | def _clean_principal(self, princid): |
| | | if princid in (Authenticated, Everyone): |
| | | princid = None |
| | | return princid |
| | | |
| | | def authenticated_userid(self, request): |
| | | """ Return the authenticated userid or ``None``. |
| | | |
| | | If no callback is registered, this will be the same as |
| | | ``unauthenticated_userid``. |
| | | |
| | | If a ``callback`` is registered, this will return the userid if |
| | | and only if the callback returns a value that is not ``None``. |
| | | |
| | | """ |
| | | debug = self.debug |
| | | userid = self.unauthenticated_userid(request) |
| | | if userid is None: |
| | |
| | | 'authenticated_userid', |
| | | request) |
| | | return None |
| | | if self._clean_principal(userid) is None: |
| | | debug and self._log( |
| | | ('use of userid %r is disallowed by any built-in Pyramid ' |
| | | 'security policy, returning None' % userid), |
| | | 'authenticated_userid' , |
| | | request) |
| | | return None |
| | | |
| | | if self.callback is None: |
| | | debug and self._log( |
| | | 'there was no groupfinder callback; returning %r' % (userid,), |
| | |
| | | ) |
| | | |
| | | def effective_principals(self, request): |
| | | """ A list of effective principals derived from request. |
| | | |
| | | This will return a list of principals including, at least, |
| | | :data:`pyramid.security.Everyone`. If there is no authenticated |
| | | userid, or the ``callback`` returns ``None``, this will be the |
| | | only principal: |
| | | |
| | | .. code-block:: python |
| | | |
| | | return [Everyone] |
| | | |
| | | If the ``callback`` does not return ``None`` and an authenticated |
| | | userid is found, then the principals will include |
| | | :data:`pyramid.security.Authenticated`, the ``authenticated_userid`` |
| | | and the list of principals returned by the ``callback``: |
| | | |
| | | .. code-block:: python |
| | | |
| | | extra_principals = callback(userid, request) |
| | | return [Everyone, Authenticated, userid] + extra_principals |
| | | |
| | | """ |
| | | debug = self.debug |
| | | effective_principals = [Everyone] |
| | | userid = self.unauthenticated_userid(request) |
| | | |
| | | if userid is None: |
| | | debug and self._log( |
| | | 'unauthenticated_userid returned %r; returning %r' % ( |
| | |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | if self._clean_principal(userid) is None: |
| | | debug and self._log( |
| | | ('unauthenticated_userid returned disallowed %r; returning %r ' |
| | | 'as if it was None' % (userid, effective_principals)), |
| | | 'effective_principals', |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | if self.callback is None: |
| | | debug and self._log( |
| | | 'groupfinder callback is None, so groups is []', |
| | |
| | | 'groupfinder callback returned %r as groups' % (groups,), |
| | | 'effective_principals', |
| | | request) |
| | | |
| | | if groups is None: # is None! |
| | | debug and self._log( |
| | | 'returning effective principals: %r' % ( |
| | |
| | | return identifier |
| | | |
| | | def authenticated_userid(self, request): |
| | | """ Return the authenticated userid or ``None``. |
| | | |
| | | If no callback is registered, this will be the same as |
| | | ``unauthenticated_userid``. |
| | | |
| | | If a ``callback`` is registered, this will return the userid if |
| | | and only if the callback returns a value that is not ``None``. |
| | | |
| | | """ |
| | | identity = self._get_identity(request) |
| | | |
| | | if identity is None: |
| | | self.debug and self._log( |
| | | 'repoze.who identity is None, returning None', |
| | | 'authenticated_userid', |
| | | request) |
| | | return None |
| | | |
| | | userid = identity['repoze.who.userid'] |
| | | |
| | | if userid is None: |
| | | self.debug and self._log( |
| | | 'repoze.who.userid is None, returning None' % userid, |
| | | 'authenticated_userid', |
| | | request) |
| | | return None |
| | | |
| | | if self._clean_principal(userid) is None: |
| | | self.debug and self._log( |
| | | ('use of userid %r is disallowed by any built-in Pyramid ' |
| | | 'security policy, returning None' % userid), |
| | | 'authenticated_userid', |
| | | request) |
| | | return None |
| | | |
| | | if self.callback is None: |
| | | return identity['repoze.who.userid'] |
| | | return userid |
| | | |
| | | if self.callback(identity, request) is not None: # is not None! |
| | | return identity['repoze.who.userid'] |
| | | return userid |
| | | |
| | | def unauthenticated_userid(self, request): |
| | | """ Return the ``repoze.who.userid`` key from the detected identity.""" |
| | | identity = self._get_identity(request) |
| | | if identity is None: |
| | | return None |
| | | return identity['repoze.who.userid'] |
| | | |
| | | def effective_principals(self, request): |
| | | """ A list of effective principals derived from the identity. |
| | | |
| | | This will return a list of principals including, at least, |
| | | :data:`pyramid.security.Everyone`. If there is no identity, or |
| | | the ``callback`` returns ``None``, this will be the only principal. |
| | | |
| | | If the ``callback`` does not return ``None`` and an identity is |
| | | found, then the principals will include |
| | | :data:`pyramid.security.Authenticated`, the ``authenticated_userid`` |
| | | and the list of principals returned by the ``callback``. |
| | | |
| | | """ |
| | | effective_principals = [Everyone] |
| | | identity = self._get_identity(request) |
| | | |
| | | if identity is None: |
| | | self.debug and self._log( |
| | | ('repoze.who identity was None; returning %r' % |
| | | effective_principals), |
| | | 'effective_principals', |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | if self.callback is None: |
| | | groups = [] |
| | | else: |
| | | groups = self.callback(identity, request) |
| | | |
| | | if groups is None: # is None! |
| | | self.debug and self._log( |
| | | ('security policy groups callback returned None; returning %r' % |
| | | effective_principals), |
| | | 'effective_principals', |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | userid = identity['repoze.who.userid'] |
| | | |
| | | if userid is None: |
| | | self.debug and self._log( |
| | | ('repoze.who.userid was None; returning %r' % |
| | | effective_principals), |
| | | 'effective_principals', |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | if self._clean_principal(userid) is None: |
| | | self.debug and self._log( |
| | | ('unauthenticated_userid returned disallowed %r; returning %r ' |
| | | 'as if it was None' % (userid, effective_principals)), |
| | | 'effective_principals', |
| | | request |
| | | ) |
| | | return effective_principals |
| | | |
| | | effective_principals.append(Authenticated) |
| | | effective_principals.append(userid) |
| | | effective_principals.extend(groups) |
| | | |
| | | return effective_principals |
| | | |
| | | def remember(self, request, principal, **kw): |
| | | """ Store the ``principal`` as ``repoze.who.userid``.""" |
| | | identifier = self._get_identifier(request) |
| | | if identifier is None: |
| | | return [] |
| | |
| | | return identifier.remember(environ, identity) |
| | | |
| | | def forget(self, request): |
| | | """ Forget the current authenticated user. |
| | | |
| | | Return headers that, if included in a response, will delete the |
| | | cookie responsible for tracking the current user. |
| | | |
| | | """ |
| | | identifier = self._get_identifier(request) |
| | | if identifier is None: |
| | | return [] |
| | |
| | | self.debug = debug |
| | | |
| | | def unauthenticated_userid(self, request): |
| | | """ The ``REMOTE_USER`` value found within the ``environ``.""" |
| | | return request.environ.get(self.environ_key) |
| | | |
| | | def remember(self, request, principal, **kw): |
| | | """ A no-op. The ``REMOTE_USER`` does not provide a protocol for |
| | | remembering the user. This will be application-specific and can |
| | | be done somewhere else or in a subclass.""" |
| | | return [] |
| | | |
| | | def forget(self, request): |
| | | """ A no-op. The ``REMOTE_USER`` does not provide a protocol for |
| | | forgetting the user. This will be application-specific and can |
| | | be done somewhere else or in a subclass.""" |
| | | return [] |
| | | |
| | | _marker = object() |
| | | |
| | | @implementer(IAuthenticationPolicy) |
| | | class AuthTktAuthenticationPolicy(CallbackAuthenticationPolicy): |
| | | """ A :app:`Pyramid` :term:`authentication policy` which |
| | | """A :app:`Pyramid` :term:`authentication policy` which |
| | | obtains data from a Pyramid "auth ticket" cookie. |
| | | |
| | | .. warning:: |
| | | |
| | | The default hash algorithm used in this policy is MD5 and has known |
| | | hash collision vulnerabilities. The risk of an exploit is low. |
| | | However, for improved authentication security, use |
| | | ``hashalg='sha512'``. |
| | | |
| | | Constructor Arguments |
| | | |
| | |
| | | wildcard domain. |
| | | Optional. |
| | | |
| | | ``hashalg`` |
| | | |
| | | Default: ``md5`` (the literal string). |
| | | |
| | | Any hash algorithm supported by Python's ``hashlib.new()`` function |
| | | can be used as the ``hashalg``. |
| | | |
| | | Cookies generated by different instances of AuthTktAuthenticationPolicy |
| | | using different ``hashalg`` options are not compatible. Switching the |
| | | ``hashalg`` will imply that all existing users with a valid cookie will |
| | | be required to re-login. |
| | | |
| | | A warning is emitted at startup if an explicit ``hashalg`` is not |
| | | passed. This is for backwards compatibility reasons. |
| | | |
| | | This option is available as of :app:`Pyramid` 1.4. |
| | | |
| | | Optional. |
| | | |
| | | .. note:: |
| | | |
| | | ``md5`` is the default for backwards compatibility reasons. However, |
| | | if you don't specify ``md5`` as the hashalg explicitly, a warning is |
| | | issued at application startup time. An explicit value of ``sha512`` |
| | | is recommended for improved security, and ``sha512`` will become the |
| | | default in a future Pyramid version. |
| | | |
| | | ``debug`` |
| | | |
| | | Default: ``False``. If ``debug`` is ``True``, log messages to the |
| | |
| | | Objects of this class implement the interface described by |
| | | :class:`pyramid.interfaces.IAuthenticationPolicy`. |
| | | """ |
| | | |
| | | def __init__(self, |
| | | secret, |
| | | callback=None, |
| | |
| | | http_only=False, |
| | | wild_domain=True, |
| | | debug=False, |
| | | hashalg=_marker |
| | | ): |
| | | if hashalg is _marker: |
| | | hashalg = 'md5' |
| | | warnings.warn( |
| | | 'The MD5 hash function used by default by the ' |
| | | 'AuthTktAuthenticationPolicy is known to be ' |
| | | 'susceptible to collision attacks. It is the current default ' |
| | | 'for backwards compatibility reasons, but we recommend that ' |
| | | 'you use the SHA512 algorithm instead for improved security. ' |
| | | 'Pass ``hashalg=\'sha512\'`` to the ' |
| | | 'AuthTktAuthenticationPolicy constructor to do so.\n\nNote ' |
| | | 'that a change to the hash algorithms will invalidate existing ' |
| | | 'auth tkt cookies set by your application. If backwards ' |
| | | 'compatibility of existing auth tkt cookies is of greater ' |
| | | 'concern than the risk posed by the potential for a hash ' |
| | | 'collision, you\'ll want to continue using MD5 explicitly. ' |
| | | 'To do so, pass ``hashalg=\'md5\'`` in your application to ' |
| | | 'the AuthTktAuthenticationPolicy constructor. When you do so ' |
| | | 'this warning will not be emitted again. The default ' |
| | | 'algorithm used in this policy will change in the future, so ' |
| | | 'setting an explicit hashalg will futureproof your ' |
| | | 'application.', |
| | | DeprecationWarning, |
| | | stacklevel=2 |
| | | ) |
| | | self.cookie = AuthTktCookieHelper( |
| | | secret, |
| | | cookie_name=cookie_name, |
| | |
| | | http_only=http_only, |
| | | path=path, |
| | | wild_domain=wild_domain, |
| | | hashalg=hashalg, |
| | | ) |
| | | self.callback = callback |
| | | self.debug = debug |
| | | |
| | | def unauthenticated_userid(self, request): |
| | | """ The userid key within the auth_tkt cookie.""" |
| | | result = self.cookie.identify(request) |
| | | if result: |
| | | return result['userid'] |
| | | |
| | | def remember(self, request, principal, **kw): |
| | | """ Accepts the following kw args: ``max_age=<int-seconds>, |
| | | ``tokens=<sequence-of-ascii-strings>``""" |
| | | ``tokens=<sequence-of-ascii-strings>``. |
| | | |
| | | Return a list of headers which will set appropriate cookies on |
| | | the response. |
| | | |
| | | """ |
| | | return self.cookie.remember(request, principal, **kw) |
| | | |
| | | def forget(self, request): |
| | | """ A list of headers which will delete appropriate cookies.""" |
| | | return self.cookie.forget(request) |
| | | |
| | | def b64encode(v): |
| | |
| | | """ |
| | | |
| | | def __init__(self, secret, userid, ip, tokens=(), user_data='', |
| | | time=None, cookie_name='auth_tkt', secure=False): |
| | | time=None, cookie_name='auth_tkt', secure=False, |
| | | hashalg='md5'): |
| | | self.secret = secret |
| | | self.userid = userid |
| | | self.ip = ip |
| | |
| | | self.time = time |
| | | self.cookie_name = cookie_name |
| | | self.secure = secure |
| | | self.hashalg = hashalg |
| | | |
| | | def digest(self): |
| | | return calculate_digest( |
| | | self.ip, self.time, self.secret, self.userid, self.tokens, |
| | | self.user_data) |
| | | self.user_data, self.hashalg) |
| | | |
| | | def cookie_value(self): |
| | | v = '%s%08x%s!' % (self.digest(), int(self.time), |
| | |
| | | Exception.__init__(self, msg) |
| | | |
| | | # this function licensed under the MIT license (stolen from Paste) |
| | | def parse_ticket(secret, ticket, ip): |
| | | def parse_ticket(secret, ticket, ip, hashalg='md5'): |
| | | """ |
| | | Parse the ticket, returning (timestamp, userid, tokens, user_data). |
| | | |
| | |
| | | with an explanation. |
| | | """ |
| | | ticket = ticket.strip('"') |
| | | digest = ticket[:32] |
| | | digest_size = hashlib.new(hashalg).digest_size * 2 |
| | | digest = ticket[:digest_size] |
| | | try: |
| | | timestamp = int(ticket[32:40], 16) |
| | | timestamp = int(ticket[digest_size:digest_size + 8], 16) |
| | | except ValueError as e: |
| | | raise BadTicket('Timestamp is not a hex integer: %s' % e) |
| | | try: |
| | | userid, data = ticket[40:].split('!', 1) |
| | | userid, data = ticket[digest_size + 8:].split('!', 1) |
| | | except ValueError: |
| | | raise BadTicket('userid is not followed by !') |
| | | userid = url_unquote(userid) |
| | |
| | | user_data = data |
| | | |
| | | expected = calculate_digest(ip, timestamp, secret, |
| | | userid, tokens, user_data) |
| | | userid, tokens, user_data, hashalg) |
| | | |
| | | # Avoid timing attacks (see |
| | | # http://seb.dbzteam.org/crypto/python-oauth-timing-hmac.pdf) |
| | |
| | | return (timestamp, userid, tokens, user_data) |
| | | |
| | | # this function licensed under the MIT license (stolen from Paste) |
| | | def calculate_digest(ip, timestamp, secret, userid, tokens, user_data): |
| | | def calculate_digest(ip, timestamp, secret, userid, tokens, user_data, |
| | | hashalg='md5'): |
| | | secret = bytes_(secret, 'utf-8') |
| | | userid = bytes_(userid, 'utf-8') |
| | | tokens = bytes_(tokens, 'utf-8') |
| | | user_data = bytes_(user_data, 'utf-8') |
| | | digest0 = md5( |
| | | hash_obj = hashlib.new(hashalg) |
| | | hash_obj.update( |
| | | encode_ip_timestamp(ip, timestamp) + secret + userid + b'\0' |
| | | + tokens + b'\0' + user_data).hexdigest() |
| | | digest = md5(bytes_(digest0) + secret).hexdigest() |
| | | return digest |
| | | + tokens + b'\0' + user_data) |
| | | digest = hash_obj.hexdigest() |
| | | hash_obj2 = hashlib.new(hashalg) |
| | | hash_obj2.update(bytes_(digest) + secret) |
| | | return hash_obj2.hexdigest() |
| | | |
| | | # this function licensed under the MIT license (stolen from Paste) |
| | | def encode_ip_timestamp(ip, timestamp): |
| | |
| | | |
| | | def __init__(self, secret, cookie_name='auth_tkt', secure=False, |
| | | include_ip=False, timeout=None, reissue_time=None, |
| | | max_age=None, http_only=False, path="/", wild_domain=True): |
| | | max_age=None, http_only=False, path="/", wild_domain=True, |
| | | hashalg='md5'): |
| | | self.secret = secret |
| | | self.cookie_name = cookie_name |
| | | self.include_ip = include_ip |
| | |
| | | self.http_only = http_only |
| | | self.path = path |
| | | self.wild_domain = wild_domain |
| | | self.hashalg = hashalg |
| | | |
| | | static_flags = [] |
| | | if self.secure: |
| | |
| | | |
| | | try: |
| | | timestamp, userid, tokens, user_data = self.parse_ticket( |
| | | self.secret, cookie, remote_addr) |
| | | self.secret, cookie, remote_addr, self.hashalg) |
| | | except self.BadTicket: |
| | | return None |
| | | |
| | |
| | | tokens=tokens, |
| | | user_data=user_data, |
| | | cookie_name=self.cookie_name, |
| | | secure=self.secure) |
| | | secure=self.secure, |
| | | hashalg=self.hashalg |
| | | ) |
| | | |
| | | cookie_value = ticket.cookie_value() |
| | | return self._get_cookies(environ, cookie_value, max_age) |
| | |
| | | self.debug = debug |
| | | |
| | | def unauthenticated_userid(self, request): |
| | | """ The userid parsed from the ``Authorization`` request header.""" |
| | | credentials = self._get_credentials(request) |
| | | if credentials: |
| | | return credentials[0] |
| | | |
| | | def remember(self, request, principal, **kw): |
| | | """ A no-op. Basic authentication does not provide a protocol for |
| | | remembering the user. Credentials are sent on every request. |
| | | |
| | | """ |
| | | return [] |
| | | |
| | | def forget(self, request): |
| | | """ Returns challenge headers. This should be attached to a response |
| | | to indicate that credentials are required.""" |
| | | return [('WWW-Authenticate', 'Basic realm="%s"' % self.realm)] |
| | | |
| | | def callback(self, username, request): |
| | |
| | | ) |
| | | |
| | | from pyramid.exceptions import ConfigurationError |
| | | |
| | | from pyramid.registry import predvalseq |
| | | from pyramid.util import TopologicalSorter |
| | | |
| | | from hashlib import md5 |
| | | |
| | |
| | | |
| | | # under = after |
| | | # over = before |
| | | |
| | | class Singleton(object): |
| | | def __init__(self, repr): |
| | | self.repr = repr |
| | | |
| | | def __repr__(self): |
| | | return self.repr |
| | | |
| | | FIRST = Singleton('FIRST') |
| | | LAST = Singleton('LAST') |
| | | |
| | | class TopologicalSorter(object): |
| | | def __init__( |
| | | self, |
| | | default_before=LAST, |
| | | default_after=None, |
| | | first=FIRST, |
| | | last=LAST, |
| | | ): |
| | | self.names = [] |
| | | self.req_before = set() |
| | | self.req_after = set() |
| | | self.name2before = {} |
| | | self.name2after = {} |
| | | self.name2val = {} |
| | | self.order = [] |
| | | self.default_before = default_before |
| | | self.default_after = default_after |
| | | self.first = first |
| | | self.last = last |
| | | |
| | | def remove(self, name): |
| | | self.names.remove(name) |
| | | del self.name2val[name] |
| | | after = self.name2after.pop(name, []) |
| | | if after: |
| | | self.req_after.remove(name) |
| | | for u in after: |
| | | self.order.remove((u, name)) |
| | | before = self.name2before.pop(name, []) |
| | | if before: |
| | | self.req_before.remove(name) |
| | | for u in before: |
| | | self.order.remove((name, u)) |
| | | |
| | | def add(self, name, val, after=None, before=None): |
| | | if name in self.names: |
| | | self.remove(name) |
| | | self.names.append(name) |
| | | self.name2val[name] = val |
| | | if after is None and before is None: |
| | | before = self.default_before |
| | | after = self.default_after |
| | | if after is not None: |
| | | if not is_nonstr_iter(after): |
| | | after = (after,) |
| | | self.name2after[name] = after |
| | | self.order += [(u, name) for u in after] |
| | | self.req_after.add(name) |
| | | if before is not None: |
| | | if not is_nonstr_iter(before): |
| | | before = (before,) |
| | | self.name2before[name] = before |
| | | self.order += [(name, o) for o in before] |
| | | self.req_before.add(name) |
| | | |
| | | def sorted(self): |
| | | order = [(self.first, self.last)] |
| | | roots = [] |
| | | graph = {} |
| | | names = [self.first, self.last] |
| | | names.extend(self.names) |
| | | |
| | | for a, b in self.order: |
| | | order.append((a, b)) |
| | | |
| | | def add_node(node): |
| | | if not node in graph: |
| | | roots.append(node) |
| | | graph[node] = [0] # 0 = number of arcs coming into this node |
| | | |
| | | def add_arc(fromnode, tonode): |
| | | graph[fromnode].append(tonode) |
| | | graph[tonode][0] += 1 |
| | | if tonode in roots: |
| | | roots.remove(tonode) |
| | | |
| | | for name in names: |
| | | add_node(name) |
| | | |
| | | has_before, has_after = set(), set() |
| | | for a, b in order: |
| | | if a in names and b in names: # deal with missing dependencies |
| | | add_arc(a, b) |
| | | has_before.add(a) |
| | | has_after.add(b) |
| | | |
| | | if not self.req_before.issubset(has_before): |
| | | raise ConfigurationError( |
| | | 'Unsatisfied before dependencies: %s' |
| | | % (', '.join(sorted(self.req_before - has_before))) |
| | | ) |
| | | if not self.req_after.issubset(has_after): |
| | | raise ConfigurationError( |
| | | 'Unsatisfied after dependencies: %s' |
| | | % (', '.join(sorted(self.req_after - has_after))) |
| | | ) |
| | | |
| | | sorted_names = [] |
| | | |
| | | while roots: |
| | | root = roots.pop(0) |
| | | sorted_names.append(root) |
| | | children = graph[root][1:] |
| | | for child in children: |
| | | arcs = graph[child][0] |
| | | arcs -= 1 |
| | | graph[child][0] = arcs |
| | | if arcs == 0: |
| | | roots.insert(0, child) |
| | | del graph[root] |
| | | |
| | | if graph: |
| | | # loop in input |
| | | cycledeps = {} |
| | | for k, v in graph.items(): |
| | | cycledeps[k] = v[1:] |
| | | raise CyclicDependencyError(cycledeps) |
| | | |
| | | result = [] |
| | | |
| | | for name in sorted_names: |
| | | if name in self.names: |
| | | result.append((name, self.name2val[name])) |
| | | |
| | | return result |
| | | |
| | | class CyclicDependencyError(Exception): |
| | | def __init__(self, cycles): |
| | | self.cycles = cycles |
| | | |
| | | def __str__(self): |
| | | L = [] |
| | | cycles = self.cycles |
| | | for cycle in cycles: |
| | | dependent = cycle |
| | | dependees = cycles[cycle] |
| | | L.append('%r sorts before %r' % (dependent, dependees)) |
| | | msg = 'Implicit ordering cycle:' + '; '.join(L) |
| | | return msg |
| | | |
| | | class PredicateList(object): |
| | | |
| | |
| | | url_quote, |
| | | WIN, |
| | | is_bound_method, |
| | | is_nonstr_iter |
| | | ) |
| | | |
| | | from pyramid.exceptions import ( |
| | |
| | | |
| | | decorator |
| | | |
| | | A :term:`dotted Python name` to function (or the function itself) |
| | | which will be used to decorate the registered :term:`view |
| | | callable`. The decorator function will be called with the view |
| | | callable as a single argument. The view callable it is passed will |
| | | accept ``(context, request)``. The decorator must return a |
| | | A :term:`dotted Python name` to function (or the function itself, |
| | | or an iterable of the aforementioned) which will be used to |
| | | decorate the registered :term:`view callable`. The decorator |
| | | function(s) will be called with the view callable as a single |
| | | argument. The view callable it is passed will accept |
| | | ``(context, request)``. The decorator(s) must return a |
| | | replacement view callable which also accepts ``(context, |
| | | request)``. |
| | | |
| | | If decorator is an iterable, the callables will be combined and |
| | | used in the order provided as a decorator. |
| | | For example:: |
| | | |
| | | @view_config(..., |
| | | decorator=(decorator2, |
| | | decorator1)) |
| | | def myview(request): |
| | | .... |
| | | |
| | | Is similar to doing:: |
| | | |
| | | @view_config(...) |
| | | @decorator2 |
| | | @decorator1 |
| | | def myview(request): |
| | | ... |
| | | |
| | | Except with the existing benefits of ``decorator=`` (having a common |
| | | decorator syntax for all view calling conventions and not having to |
| | | think about preserving function attributes such as ``__name__`` and |
| | | ``__module__`` within decorator logic). |
| | | |
| | | Passing an iterable is only supported as of :app:`Pyramid` 1.4a4. |
| | | |
| | | mapper |
| | | |
| | |
| | | for_ = self.maybe_dotted(for_) |
| | | containment = self.maybe_dotted(containment) |
| | | mapper = self.maybe_dotted(mapper) |
| | | decorator = self.maybe_dotted(decorator) |
| | | |
| | | def combine(*decorators): |
| | | def decorated(view_callable): |
| | | # reversed() is allows a more natural ordering in the api |
| | | for decorator in reversed(decorators): |
| | | view_callable = decorator(view_callable) |
| | | return view_callable |
| | | return decorated |
| | | |
| | | if is_nonstr_iter(decorator): |
| | | decorator = combine(*map(self.maybe_dotted, decorator)) |
| | | else: |
| | | decorator = self.maybe_dotted(decorator) |
| | | |
| | | if not view: |
| | | if renderer: |
| | |
| | | |
| | | def __str__(self): |
| | | return "%s: %s\n in:\n %s" % (self.etype, self.evalue, self.info) |
| | | |
| | | class CyclicDependencyError(Exception): |
| | | """ The exception raised when the Pyramid topological sorter detects a |
| | | cyclic dependency.""" |
| | | def __init__(self, cycles): |
| | | self.cycles = cycles |
| | | |
| | | def __str__(self): |
| | | L = [] |
| | | cycles = self.cycles |
| | | for cycle in cycles: |
| | | dependent = cycle |
| | | dependees = cycles[cycle] |
| | | L.append('%r sorts before %r' % (dependent, dependees)) |
| | | msg = 'Implicit ordering cycle:' + '; '.join(L) |
| | | return msg |
| | | |
| | | |
| | |
| | | |
| | | response = response_factory() |
| | | |
| | | if result is None: |
| | | result = '' |
| | | |
| | | if isinstance(result, text_type): |
| | | response.text = result |
| | | else: |
| | | response.body = result |
| | | if result is not None: |
| | | if isinstance(result, text_type): |
| | | response.text = result |
| | | else: |
| | | response.body = result |
| | | |
| | | if request is not None: |
| | | # deprecated mechanism to set up request.response_* attrs, see |
| | |
| | | matchdict = None |
| | | matched_route = None |
| | | |
| | | ResponseClass = Response |
| | | |
| | | @reify |
| | | def tmpl_context(self): |
| | | # docs-deprecated template context for Pylons-like apps; do not |
| | |
| | | def makedirs(dir, verbosity, pad): |
| | | parent = os.path.dirname(os.path.abspath(dir)) |
| | | if not os.path.exists(parent): |
| | | makedirs(parent, verbosity, pad) |
| | | makedirs(parent, verbosity, pad) # pragma: no cover |
| | | os.mkdir(dir) |
| | | |
| | | def substitute_filename(fn, vars): |
| | |
| | | IAuthenticationPolicy, |
| | | IAuthorizationPolicy, |
| | | ISecuredView, |
| | | IView, |
| | | IViewClassifier, |
| | | ) |
| | | |
| | |
| | | view using the effective authentication/authorization policies and |
| | | the ``request``. Return a boolean result. If no |
| | | :term:`authorization policy` is in effect, or if the view is not |
| | | protected by a permission, return ``True``.""" |
| | | protected by a permission, return ``True``. If no view can view found, |
| | | an exception will be raised. |
| | | |
| | | .. versionchanged:: 1.4a4 |
| | | An exception is raised if no view is found. |
| | | |
| | | """ |
| | | try: |
| | | reg = request.registry |
| | | except AttributeError: |
| | |
| | | provides = [IViewClassifier] + map_(providedBy, (request, context)) |
| | | view = reg.adapters.lookup(provides, ISecuredView, name=name) |
| | | if view is None: |
| | | view = reg.adapters.lookup(provides, IView, name=name) |
| | | if view is None: |
| | | raise TypeError('No registered view satisfies the constraints. ' |
| | | 'It would not make sense to claim that this view ' |
| | | '"is" or "is not" permitted.') |
| | | return Allowed( |
| | | 'Allowed: view name %r in context %r (no permission defined)' % |
| | | (name, context)) |
| | |
| | | config.add_view(protectedview, name='protected', permission='view') |
| | | config.add_view(routeview, route_name='aroute') |
| | | config.add_route('aroute', '/route') |
| | | config.set_authentication_policy(AuthTktAuthenticationPolicy('seekri1t')) |
| | | config.set_authentication_policy(AuthTktAuthenticationPolicy( |
| | | 'seekri1t', hashalg='sha512')) |
| | | config.set_authorization_policy(ACLAuthorizationPolicy()) |
| | | config.include('pyramid.tests.pkgs.conflictapp.included') |
| | |
| | | def includeme(config): |
| | | from pyramid.authorization import ACLAuthorizationPolicy |
| | | from pyramid.authentication import AuthTktAuthenticationPolicy |
| | | authn_policy = AuthTktAuthenticationPolicy('seekt1t') |
| | | authn_policy = AuthTktAuthenticationPolicy('seekt1t', hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config.scan('pyramid.tests.pkgs.defpermbugapp') |
| | | config._set_authentication_policy(authn_policy) |
| | |
| | | def includeme(config): |
| | | from pyramid.authentication import AuthTktAuthenticationPolicy |
| | | from pyramid.authorization import ACLAuthorizationPolicy |
| | | authn_policy = AuthTktAuthenticationPolicy('seekr1t') |
| | | authn_policy = AuthTktAuthenticationPolicy('seekr1t', hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config._set_authentication_policy(authn_policy) |
| | | config._set_authorization_policy(authz_policy) |
| | |
| | | return Response('OK bar') |
| | | |
| | | def includeme(config): |
| | | authn_policy = AuthTktAuthenticationPolicy('seekri1') |
| | | authn_policy = AuthTktAuthenticationPolicy('seekri1', hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config.set_authentication_policy(authn_policy) |
| | | config.set_authorization_policy(authz_policy) |
| | |
| | | def includeme(config): |
| | | from pyramid.authentication import AuthTktAuthenticationPolicy |
| | | from pyramid.authorization import ACLAuthorizationPolicy |
| | | authn_policy = AuthTktAuthenticationPolicy('seekt1t') |
| | | authn_policy = AuthTktAuthenticationPolicy('seekt1t', hashalg='sha512') |
| | | authz_policy = ACLAuthorizationPolicy() |
| | | config.set_authentication_policy(authn_policy) |
| | | config.set_authorization_policy(authz_policy) |
| | |
| | | import unittest |
| | | import warnings |
| | | from pyramid import testing |
| | | from pyramid.compat import ( |
| | | text_, |
| | |
| | | "authenticated_userid: groupfinder callback returned []; " |
| | | "returning 'fred'") |
| | | |
| | | def test_authenticated_userid_fails_cleaning_as_Authenticated(self): |
| | | request = DummyRequest(registry=self.config.registry) |
| | | policy = self._makeOne(userid='system.Authenticated') |
| | | self.assertEqual(policy.authenticated_userid(request), None) |
| | | self.assertEqual(len(self.messages), 1) |
| | | self.assertEqual( |
| | | self.messages[0], |
| | | "pyramid.tests.test_authentication.MyAuthenticationPolicy." |
| | | "authenticated_userid: use of userid 'system.Authenticated' is " |
| | | "disallowed by any built-in Pyramid security policy, returning " |
| | | "None") |
| | | |
| | | def test_authenticated_userid_fails_cleaning_as_Everyone(self): |
| | | request = DummyRequest(registry=self.config.registry) |
| | | policy = self._makeOne(userid='system.Everyone') |
| | | self.assertEqual(policy.authenticated_userid(request), None) |
| | | self.assertEqual(len(self.messages), 1) |
| | | self.assertEqual( |
| | | self.messages[0], |
| | | "pyramid.tests.test_authentication.MyAuthenticationPolicy." |
| | | "authenticated_userid: use of userid 'system.Everyone' is " |
| | | "disallowed by any built-in Pyramid security policy, returning " |
| | | "None") |
| | | |
| | | def test_effective_principals_no_unauthenticated_userid(self): |
| | | request = DummyRequest(registry=self.config.registry) |
| | | policy = self._makeOne() |
| | |
| | | "effective_principals: returning effective principals: " |
| | | "['system.Everyone', 'system.Authenticated', 'fred']") |
| | | |
| | | def test_effective_principals_with_unclean_principal_Authenticated(self): |
| | | request = DummyRequest(registry=self.config.registry) |
| | | policy = self._makeOne(userid='system.Authenticated') |
| | | self.assertEqual( |
| | | policy.effective_principals(request), |
| | | ['system.Everyone']) |
| | | self.assertEqual(len(self.messages), 1) |
| | | self.assertEqual( |
| | | self.messages[0], |
| | | "pyramid.tests.test_authentication.MyAuthenticationPolicy." |
| | | "effective_principals: unauthenticated_userid returned disallowed " |
| | | "'system.Authenticated'; returning ['system.Everyone'] as if it " |
| | | "was None") |
| | | |
| | | def test_effective_principals_with_unclean_principal_Everyone(self): |
| | | request = DummyRequest(registry=self.config.registry) |
| | | policy = self._makeOne(userid='system.Everyone') |
| | | self.assertEqual( |
| | | policy.effective_principals(request), |
| | | ['system.Everyone']) |
| | | self.assertEqual(len(self.messages), 1) |
| | | self.assertEqual( |
| | | self.messages[0], |
| | | "pyramid.tests.test_authentication.MyAuthenticationPolicy." |
| | | "effective_principals: unauthenticated_userid returned disallowed " |
| | | "'system.Everyone'; returning ['system.Everyone'] as if it " |
| | | "was None") |
| | | |
| | | class TestRepozeWho1AuthenticationPolicy(unittest.TestCase): |
| | | def _getTargetClass(self): |
| | | from pyramid.authentication import RepozeWho1AuthenticationPolicy |
| | |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.authenticated_userid(request), 'fred') |
| | | |
| | | def test_authenticated_userid_repoze_who_userid_is_None(self): |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':None}}) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.authenticated_userid(request), None) |
| | | |
| | | def test_authenticated_userid_with_callback_returns_None(self): |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':'fred'}}) |
| | |
| | | return ['agroup'] |
| | | policy = self._makeOne(callback=callback) |
| | | self.assertEqual(policy.authenticated_userid(request), 'fred') |
| | | |
| | | def test_authenticated_userid_unclean_principal_Authenticated(self): |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':'system.Authenticated'}} |
| | | ) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.authenticated_userid(request), None) |
| | | |
| | | def test_authenticated_userid_unclean_principal_Everyone(self): |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':'system.Everyone'}} |
| | | ) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.authenticated_userid(request), None) |
| | | |
| | | def test_effective_principals_None(self): |
| | | from pyramid.security import Everyone |
| | |
| | | def callback(identity, request): |
| | | return None |
| | | policy = self._makeOne(callback=callback) |
| | | self.assertEqual(policy.effective_principals(request), [Everyone]) |
| | | |
| | | def test_effective_principals_repoze_who_userid_is_None(self): |
| | | from pyramid.security import Everyone |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':None}} |
| | | ) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.effective_principals(request), [Everyone]) |
| | | |
| | | def test_effective_principals_repoze_who_userid_is_unclean_Everyone(self): |
| | | from pyramid.security import Everyone |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':'system.Everyone'}} |
| | | ) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.effective_principals(request), [Everyone]) |
| | | |
| | | def test_effective_principals_repoze_who_userid_is_unclean_Authenticated( |
| | | self): |
| | | from pyramid.security import Everyone |
| | | request = DummyRequest( |
| | | {'repoze.who.identity':{'repoze.who.userid':'system.Authenticated'}} |
| | | ) |
| | | policy = self._makeOne() |
| | | self.assertEqual(policy.effective_principals(request), [Everyone]) |
| | | |
| | | def test_remember_no_plugins(self): |
| | |
| | | result = policy.forget(request) |
| | | self.assertEqual(result, []) |
| | | |
| | | class TestAutkTktAuthenticationPolicy(unittest.TestCase): |
| | | class TestAuthTktAuthenticationPolicy(unittest.TestCase): |
| | | def _getTargetClass(self): |
| | | from pyramid.authentication import AuthTktAuthenticationPolicy |
| | | return AuthTktAuthenticationPolicy |
| | |
| | | inst.cookie = DummyCookieHelper(cookieidentity) |
| | | return inst |
| | | |
| | | def setUp(self): |
| | | self.warnings = warnings.catch_warnings() |
| | | self.warnings.__enter__() |
| | | warnings.simplefilter('ignore', DeprecationWarning) |
| | | |
| | | def tearDown(self): |
| | | self.warnings.__exit__(None, None, None) |
| | | |
| | | def test_allargs(self): |
| | | # pass all known args |
| | | inst = self._getTargetClass()( |
| | | 'secret', callback=None, cookie_name=None, secure=False, |
| | | include_ip=False, timeout=None, reissue_time=None, |
| | | hashalg='sha512', |
| | | ) |
| | | self.assertEqual(inst.callback, None) |
| | | |
| | | def test_class_implements_IAuthenticationPolicy(self): |
| | | from zope.interface.verify import verifyClass |
| | | from pyramid.interfaces import IAuthenticationPolicy |
| | | verifyClass(IAuthenticationPolicy, self._getTargetClass()) |
| | | |
| | | def test_instance_implements_IAuthenticationPolicy(self): |
| | | from zope.interface.verify import verifyObject |
| | | from pyramid.interfaces import IAuthenticationPolicy |
| | | verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) |
| | | def test_hashalg_override(self): |
| | | # important to ensure hashalg is passed to cookie helper |
| | | inst = self._getTargetClass()('secret', hashalg='sha512') |
| | | self.assertEqual(inst.cookie.hashalg, 'sha512') |
| | | |
| | | def test_unauthenticated_userid_returns_None(self): |
| | | request = DummyRequest({}) |
| | |
| | | policy = self._makeOne(None, None) |
| | | result = policy.forget(request) |
| | | self.assertEqual(result, []) |
| | | |
| | | def test_class_implements_IAuthenticationPolicy(self): |
| | | from zope.interface.verify import verifyClass |
| | | from pyramid.interfaces import IAuthenticationPolicy |
| | | verifyClass(IAuthenticationPolicy, self._getTargetClass()) |
| | | |
| | | def test_instance_implements_IAuthenticationPolicy(self): |
| | | from zope.interface.verify import verifyObject |
| | | from pyramid.interfaces import IAuthenticationPolicy |
| | | verifyObject(IAuthenticationPolicy, self._makeOne(None, None)) |
| | | |
| | | class TestAuthTktCookieHelper(unittest.TestCase): |
| | | def _getTargetClass(self): |
| | |
| | | result = ticket.digest() |
| | | self.assertEqual(result, '126fd6224912187ee9ffa80e0b81420c') |
| | | |
| | | def test_digest_sha512(self): |
| | | ticket = self._makeOne('secret', 'userid', '0.0.0.0', |
| | | time=10, hashalg='sha512') |
| | | result = ticket.digest() |
| | | self.assertEqual(result, '74770b2e0d5b1a54c2a466ec567a40f7d7823576aa49'\ |
| | | '3c65fc3445e9b44097f4a80410319ef8cb256a2e60b9'\ |
| | | 'c2002e48a9e33a3e8ee4379352c04ef96d2cb278') |
| | | |
| | | def test_cookie_value(self): |
| | | ticket = self._makeOne('secret', 'userid', '0.0.0.0', time=10, |
| | | tokens=('a', 'b')) |
| | |
| | | self.assertTrue(isinstance(exc, Exception)) |
| | | |
| | | class Test_parse_ticket(unittest.TestCase): |
| | | def _callFUT(self, secret, ticket, ip): |
| | | def _callFUT(self, secret, ticket, ip, hashalg='md5'): |
| | | from pyramid.authentication import parse_ticket |
| | | return parse_ticket(secret, ticket, ip) |
| | | return parse_ticket(secret, ticket, ip, hashalg) |
| | | |
| | | def _assertRaisesBadTicket(self, secret, ticket, ip): |
| | | def _assertRaisesBadTicket(self, secret, ticket, ip, hashalg='md5'): |
| | | from pyramid.authentication import BadTicket |
| | | self.assertRaises(BadTicket,self._callFUT, secret, ticket, ip) |
| | | self.assertRaises(BadTicket,self._callFUT, secret, ticket, ip, hashalg) |
| | | |
| | | def test_bad_timestamp(self): |
| | | ticket = 'x' * 64 |
| | |
| | | def test_correct_with_user_data(self): |
| | | ticket = '66f9cc3e423dc57c91df696cf3d1f0d80000000auserid!a,b!' |
| | | result = self._callFUT('secret', ticket, '0.0.0.0') |
| | | self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) |
| | | |
| | | def test_correct_with_user_data_sha512(self): |
| | | ticket = '7d947cdef99bad55f8e3382a8bd089bb9dd0547f7925b7d189adc1160cab'\ |
| | | '0ec0e6888faa41eba641a18522b26f19109f3ffafb769767ba8a26d02aae'\ |
| | | 'ae56599a0000000auserid!a,b!' |
| | | result = self._callFUT('secret', ticket, '0.0.0.0', 'sha512') |
| | | self.assertEqual(result, (10, 'userid', ['a', 'b'], '')) |
| | | |
| | | class TestSessionAuthenticationPolicy(unittest.TestCase): |
| | |
| | | |
| | | class DummyAuthTktModule(object): |
| | | def __init__(self, timestamp=0, userid='userid', tokens=(), user_data='', |
| | | parse_raise=False): |
| | | parse_raise=False, hashalg="md5"): |
| | | self.timestamp = timestamp |
| | | self.userid = userid |
| | | self.tokens = tokens |
| | | self.user_data = user_data |
| | | self.parse_raise = parse_raise |
| | | def parse_ticket(secret, value, remote_addr): |
| | | self.hashalg = hashalg |
| | | def parse_ticket(secret, value, remote_addr, hashalg): |
| | | self.secret = secret |
| | | self.value = value |
| | | self.remote_addr = remote_addr |
| | |
| | | self.assertRaises(ConfigurationError, tweens.implicit) |
| | | |
| | | def test_implicit_ordering_conflict_direct(self): |
| | | from pyramid.config.util import CyclicDependencyError |
| | | from pyramid.exceptions import CyclicDependencyError |
| | | tweens = self._makeOne() |
| | | add = tweens.add_implicit |
| | | add('browserid', 'browserid_factory') |
| | |
| | | self.assertRaises(CyclicDependencyError, tweens.implicit) |
| | | |
| | | def test_implicit_ordering_conflict_indirect(self): |
| | | from pyramid.config.util import CyclicDependencyError |
| | | from pyramid.exceptions import CyclicDependencyError |
| | | tweens = self._makeOne() |
| | | add = tweens.add_implicit |
| | | add('browserid', 'browserid_factory') |
| | |
| | | self.assertEqual(str(inst), |
| | | "Line 0 of file filename:\n linerepr ") |
| | | |
| | | class TestTopologicalSorter(unittest.TestCase): |
| | | def _makeOne(self, *arg, **kw): |
| | | from pyramid.config.util import TopologicalSorter |
| | | return TopologicalSorter(*arg, **kw) |
| | | |
| | | def test_remove(self): |
| | | inst = self._makeOne() |
| | | inst.names.append('name') |
| | | inst.name2val['name'] = 1 |
| | | inst.req_after.add('name') |
| | | inst.req_before.add('name') |
| | | inst.name2after['name'] = ('bob',) |
| | | inst.name2before['name'] = ('fred',) |
| | | inst.order.append(('bob', 'name')) |
| | | inst.order.append(('name', 'fred')) |
| | | inst.remove('name') |
| | | self.assertFalse(inst.names) |
| | | self.assertFalse(inst.req_before) |
| | | self.assertFalse(inst.req_after) |
| | | self.assertFalse(inst.name2before) |
| | | self.assertFalse(inst.name2after) |
| | | self.assertFalse(inst.name2val) |
| | | self.assertFalse(inst.order) |
| | | |
| | | def test_add(self): |
| | | from pyramid.config.util import LAST |
| | | sorter = self._makeOne() |
| | | sorter.add('name', 'factory') |
| | | self.assertEqual(sorter.names, ['name']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory'}) |
| | | self.assertEqual(sorter.order, [('name', LAST)]) |
| | | sorter.add('name2', 'factory2') |
| | | self.assertEqual(sorter.names, ['name', 'name2']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory', 'name2':'factory2'}) |
| | | self.assertEqual(sorter.order, |
| | | [('name', LAST), ('name2', LAST)]) |
| | | sorter.add('name3', 'factory3', before='name2') |
| | | self.assertEqual(sorter.names, |
| | | ['name', 'name2', 'name3']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory', 'name2':'factory2', |
| | | 'name3':'factory3'}) |
| | | self.assertEqual(sorter.order, |
| | | [('name', LAST), ('name2', LAST), |
| | | ('name3', 'name2')]) |
| | | |
| | | def test_sorted_ordering_1(self): |
| | | sorter = self._makeOne() |
| | | sorter.add('name1', 'factory1') |
| | | sorter.add('name2', 'factory2') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('name1', 'factory1'), |
| | | ('name2', 'factory2'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_2(self): |
| | | from pyramid.config.util import FIRST |
| | | sorter = self._makeOne() |
| | | sorter.add('name1', 'factory1') |
| | | sorter.add('name2', 'factory2', after=FIRST) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('name2', 'factory2'), |
| | | ('name1', 'factory1'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_3(self): |
| | | from pyramid.config.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('dbt', 'dbt_factory') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview') |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_4(self): |
| | | from pyramid.config.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_5(self): |
| | | from pyramid.config.util import LAST, FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory') |
| | | add('auth', 'auth_factory', after=FIRST) |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory', after=FIRST) |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview', before=LAST) |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_missing_before_partial(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_after_partial(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='txnmgr') |
| | | add('retry', 'retry_factory', before='dbt', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_before_and_after_partials(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='foo', after='txnmgr') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_before_partial_with_fallback(self): |
| | | from pyramid.config.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before=('txnmgr', LAST), |
| | | after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_missing_after_partial_with_fallback(self): |
| | | from pyramid.config.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | add('auth', 'auth_factory', after=('txnmgr','browserid')) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_partial_fallbacks(self): |
| | | from pyramid.config.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=('wontbethere', LAST)) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('wont2', 'exceptionview')) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_multiple_matching_fallbacks(self): |
| | | from pyramid.config.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('retry', 'exceptionview')) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_missing_fallbacks(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.config.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('txnmgr', 'auth')) |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_conflict_direct(self): |
| | | from pyramid.config.util import CyclicDependencyError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('browserid', 'browserid_factory') |
| | | add('auth', 'auth_factory', before='browserid', after='browserid') |
| | | self.assertRaises(CyclicDependencyError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_conflict_indirect(self): |
| | | from pyramid.config.util import CyclicDependencyError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('browserid', 'browserid_factory') |
| | | add('auth', 'auth_factory', before='browserid') |
| | | add('dbt', 'dbt_factory', after='browserid', before='auth') |
| | | self.assertRaises(CyclicDependencyError, sorter.sorted) |
| | | |
| | | class TestSingleton(unittest.TestCase): |
| | | def test_repr(self): |
| | | from pyramid.config.util import Singleton |
| | | r = repr(Singleton('ABC')) |
| | | self.assertEqual(r, 'ABC') |
| | | |
| | | class TestCyclicDependencyError(unittest.TestCase): |
| | | def _makeOne(self, cycles): |
| | | from pyramid.config.util import CyclicDependencyError |
| | | return CyclicDependencyError(cycles) |
| | | |
| | | def test___str__(self): |
| | | exc = self._makeOne({'a':['c', 'd'], 'c':['a']}) |
| | | result = str(exc) |
| | | self.assertTrue("'a' sorts before ['c', 'd']" in result) |
| | | self.assertTrue("'c' sorts before ['a']" in result) |
| | | |
| | | class DummyCustomPredicate(object): |
| | | def __init__(self): |
| | | self.__text__ = 'custom predicate' |
| | |
| | | result = wrapper(None, None) |
| | | self.assertEqual(result, 'OK') |
| | | |
| | | def test_add_view_with_decorator_tuple(self): |
| | | from pyramid.renderers import null_renderer |
| | | def view(request): |
| | | """ ABC """ |
| | | return 'OK' |
| | | def view_wrapper1(fn): |
| | | def inner(context, request): |
| | | return 'wrapped1' + fn(context, request) |
| | | return inner |
| | | def view_wrapper2(fn): |
| | | def inner(context, request): |
| | | return 'wrapped2' + fn(context, request) |
| | | return inner |
| | | config = self._makeOne(autocommit=True) |
| | | config.add_view(view=view, decorator=(view_wrapper2, view_wrapper1), |
| | | renderer=null_renderer) |
| | | wrapper = self._getViewCallable(config) |
| | | self.assertFalse(wrapper is view) |
| | | self.assertEqual(wrapper.__doc__, view.__doc__) |
| | | result = wrapper(None, None) |
| | | self.assertEqual(result, 'wrapped2wrapped1OK') |
| | | |
| | | def test_add_view_with_http_cache(self): |
| | | import datetime |
| | | from pyramid.response import Response |
| | |
| | | exc = self._makeOne('etype', 'evalue', 'info') |
| | | self.assertEqual(str(exc), 'etype: evalue\n in:\n info') |
| | | |
| | | class TestCyclicDependencyError(unittest.TestCase): |
| | | def _makeOne(self, cycles): |
| | | from pyramid.exceptions import CyclicDependencyError |
| | | return CyclicDependencyError(cycles) |
| | | |
| | | def test___str__(self): |
| | | exc = self._makeOne({'a':['c', 'd'], 'c':['a']}) |
| | | result = str(exc) |
| | | self.assertTrue("'a' sorts before ['c', 'd']" in result) |
| | | self.assertTrue("'c' sorts before ['a']" in result) |
| | | |
| | | |
| | |
| | | response = helper._make_response(la.encode('utf-8'), request) |
| | | self.assertEqual(response.body, la.encode('utf-8')) |
| | | |
| | | def test__make_response_result_is_None(self): |
| | | def test__make_response_result_is_None_no_body(self): |
| | | from pyramid.response import Response |
| | | request = testing.DummyRequest() |
| | | request.response = Response() |
| | | helper = self._makeOne('loo.foo') |
| | | response = helper._make_response(None, request) |
| | | self.assertEqual(response.body, b'') |
| | | |
| | | def test__make_response_result_is_None_existing_body_not_molested(self): |
| | | from pyramid.response import Response |
| | | request = testing.DummyRequest() |
| | | response = Response() |
| | | response.body = b'abc' |
| | | request.response = response |
| | | helper = self._makeOne('loo.foo') |
| | | response = helper._make_response(None, request) |
| | | self.assertEqual(response.body, b'abc') |
| | | |
| | | def test__make_response_with_content_type(self): |
| | | from pyramid.response import Response |
| | |
| | | from zope.interface.verify import verifyClass |
| | | from pyramid.interfaces import IRequest |
| | | verifyClass(IRequest, self._getTargetClass()) |
| | | klass = self._getTargetClass() |
| | | |
| | | def test_instance_conforms_to_IRequest(self): |
| | | from zope.interface.verify import verifyObject |
| | | from pyramid.interfaces import IRequest |
| | | verifyObject(IRequest, self._makeOne()) |
| | | |
| | | def test_ResponseClass_is_pyramid_Response(self): |
| | | from pyramid.response import Response |
| | | cls = self._getTargetClass() |
| | | self.assertEqual(cls.ResponseClass, Response) |
| | | |
| | | def test_charset_defaults_to_utf8(self): |
| | | r = self._makeOne({'PATH_INFO':'/'}) |
| | | self.assertEqual(r.charset, 'UTF-8') |
| | |
| | | return checker |
| | | |
| | | def test_no_permission(self): |
| | | from zope.interface import Interface |
| | | from pyramid.threadlocal import get_current_registry |
| | | from pyramid.interfaces import ISettings |
| | | from pyramid.interfaces import IView |
| | | from pyramid.interfaces import IViewClassifier |
| | | settings = dict(debug_authorization=True) |
| | | reg = get_current_registry() |
| | | reg.registerUtility(settings, ISettings) |
| | | context = DummyContext() |
| | | request = DummyRequest({}) |
| | | class DummyView(object): |
| | | pass |
| | | view = DummyView() |
| | | reg.registerAdapter(view, (IViewClassifier, Interface, Interface), |
| | | IView, '') |
| | | result = self._callFUT(context, request, '') |
| | | msg = result.msg |
| | | self.assertTrue("Allowed: view name '' in context" in msg) |
| | | self.assertTrue('(no permission defined)' in msg) |
| | | self.assertEqual(result, True) |
| | | |
| | | def test_no_view_registered(self): |
| | | from pyramid.threadlocal import get_current_registry |
| | | from pyramid.interfaces import ISettings |
| | | settings = dict(debug_authorization=True) |
| | |
| | | reg.registerUtility(settings, ISettings) |
| | | context = DummyContext() |
| | | request = DummyRequest({}) |
| | | result = self._callFUT(context, request, '') |
| | | msg = result.msg |
| | | self.assertTrue("Allowed: view name '' in context" in msg) |
| | | self.assertTrue('(no permission defined)' in msg) |
| | | self.assertEqual(result, True) |
| | | self.assertRaises(TypeError, self._callFUT, context, request, '') |
| | | |
| | | def test_with_permission(self): |
| | | from zope.interface import Interface |
| | |
| | | self._callFUT(inst), |
| | | str(inst)[:100] + ' ... ]') |
| | | |
| | | class TestTopologicalSorter(unittest.TestCase): |
| | | def _makeOne(self, *arg, **kw): |
| | | from pyramid.util import TopologicalSorter |
| | | return TopologicalSorter(*arg, **kw) |
| | | |
| | | def test_remove(self): |
| | | inst = self._makeOne() |
| | | inst.names.append('name') |
| | | inst.name2val['name'] = 1 |
| | | inst.req_after.add('name') |
| | | inst.req_before.add('name') |
| | | inst.name2after['name'] = ('bob',) |
| | | inst.name2before['name'] = ('fred',) |
| | | inst.order.append(('bob', 'name')) |
| | | inst.order.append(('name', 'fred')) |
| | | inst.remove('name') |
| | | self.assertFalse(inst.names) |
| | | self.assertFalse(inst.req_before) |
| | | self.assertFalse(inst.req_after) |
| | | self.assertFalse(inst.name2before) |
| | | self.assertFalse(inst.name2after) |
| | | self.assertFalse(inst.name2val) |
| | | self.assertFalse(inst.order) |
| | | |
| | | def test_add(self): |
| | | from pyramid.util import LAST |
| | | sorter = self._makeOne() |
| | | sorter.add('name', 'factory') |
| | | self.assertEqual(sorter.names, ['name']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory'}) |
| | | self.assertEqual(sorter.order, [('name', LAST)]) |
| | | sorter.add('name2', 'factory2') |
| | | self.assertEqual(sorter.names, ['name', 'name2']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory', 'name2':'factory2'}) |
| | | self.assertEqual(sorter.order, |
| | | [('name', LAST), ('name2', LAST)]) |
| | | sorter.add('name3', 'factory3', before='name2') |
| | | self.assertEqual(sorter.names, |
| | | ['name', 'name2', 'name3']) |
| | | self.assertEqual(sorter.name2val, |
| | | {'name':'factory', 'name2':'factory2', |
| | | 'name3':'factory3'}) |
| | | self.assertEqual(sorter.order, |
| | | [('name', LAST), ('name2', LAST), |
| | | ('name3', 'name2')]) |
| | | |
| | | def test_sorted_ordering_1(self): |
| | | sorter = self._makeOne() |
| | | sorter.add('name1', 'factory1') |
| | | sorter.add('name2', 'factory2') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('name1', 'factory1'), |
| | | ('name2', 'factory2'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_2(self): |
| | | from pyramid.util import FIRST |
| | | sorter = self._makeOne() |
| | | sorter.add('name1', 'factory1') |
| | | sorter.add('name2', 'factory2', after=FIRST) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('name2', 'factory2'), |
| | | ('name1', 'factory1'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_3(self): |
| | | from pyramid.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('dbt', 'dbt_factory') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview') |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_4(self): |
| | | from pyramid.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_5(self): |
| | | from pyramid.util import LAST, FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory') |
| | | add('auth', 'auth_factory', after=FIRST) |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory', after=FIRST) |
| | | add('txnmgr', 'txnmgr_factory', after='exceptionview', before=LAST) |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('txnmgr', 'txnmgr_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_missing_before_partial(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='txnmgr', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_after_partial(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='txnmgr') |
| | | add('retry', 'retry_factory', before='dbt', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_before_and_after_partials(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('dbt', 'dbt_factory') |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before='foo', after='txnmgr') |
| | | add('browserid', 'browserid_factory') |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_missing_before_partial_with_fallback(self): |
| | | from pyramid.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('auth', 'auth_factory', after='browserid') |
| | | add('retry', 'retry_factory', before=('txnmgr', LAST), |
| | | after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_missing_after_partial_with_fallback(self): |
| | | from pyramid.util import FIRST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', after=FIRST) |
| | | add('auth', 'auth_factory', after=('txnmgr','browserid')) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory') |
| | | add('dbt', 'dbt_factory') |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ('browserid', 'browserid_factory'), |
| | | ('auth', 'auth_factory'), |
| | | ('dbt', 'dbt_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_partial_fallbacks(self): |
| | | from pyramid.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=('wontbethere', LAST)) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('wont2', 'exceptionview')) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_multiple_matching_fallbacks(self): |
| | | from pyramid.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('retry', 'exceptionview')) |
| | | self.assertEqual(sorter.sorted(), |
| | | [ |
| | | ('browserid', 'browserid_factory'), |
| | | ('exceptionview', 'excview_factory'), |
| | | ('retry', 'retry_factory'), |
| | | ]) |
| | | |
| | | def test_sorted_ordering_with_missing_fallbacks(self): |
| | | from pyramid.exceptions import ConfigurationError |
| | | from pyramid.util import LAST |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('exceptionview', 'excview_factory', before=LAST) |
| | | add('retry', 'retry_factory', after='exceptionview') |
| | | add('browserid', 'browserid_factory', before=('txnmgr', 'auth')) |
| | | self.assertRaises(ConfigurationError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_conflict_direct(self): |
| | | from pyramid.exceptions import CyclicDependencyError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('browserid', 'browserid_factory') |
| | | add('auth', 'auth_factory', before='browserid', after='browserid') |
| | | self.assertRaises(CyclicDependencyError, sorter.sorted) |
| | | |
| | | def test_sorted_ordering_conflict_indirect(self): |
| | | from pyramid.exceptions import CyclicDependencyError |
| | | sorter = self._makeOne() |
| | | add = sorter.add |
| | | add('browserid', 'browserid_factory') |
| | | add('auth', 'auth_factory', before='browserid') |
| | | add('dbt', 'dbt_factory', after='browserid', before='auth') |
| | | self.assertRaises(CyclicDependencyError, sorter.sorted) |
| | | |
| | | class TestSentinel(unittest.TestCase): |
| | | def test_repr(self): |
| | | from pyramid.util import Sentinel |
| | | r = repr(Sentinel('ABC')) |
| | | self.assertEqual(r, 'ABC') |
| | | |
| | | def dummyfunc(): pass |
| | | |
| | | class Dummy(object): |
| | |
| | | def test_create_with_other_predicates(self): |
| | | decorator = self._makeOne(foo=1) |
| | | self.assertEqual(decorator.foo, 1) |
| | | |
| | | def test_create_decorator_tuple(self): |
| | | decorator = self._makeOne(decorator=('decorator1', 'decorator2')) |
| | | self.assertEqual(decorator.decorator, ('decorator1', 'decorator2')) |
| | | |
| | | def test_call_function(self): |
| | | decorator = self._makeOne() |
| | |
| | | renderer = settings[0]['renderer'] |
| | | self.assertTrue(renderer is renderer_helper) |
| | | self.assertEqual(config.pkg, pyramid.tests) |
| | | |
| | | def test_call_withdepth(self): |
| | | decorator = self._makeOne(_depth=2) |
| | | venusian = DummyVenusian() |
| | | decorator.venusian = venusian |
| | | def foo(): pass |
| | | decorator(foo) |
| | | self.assertEqual(venusian.depth, 2) |
| | | |
| | | class Test_append_slash_notfound_view(BaseTest, unittest.TestCase): |
| | | def _callFUT(self, context, request): |
| | |
| | | self.info = info |
| | | self.attachments = [] |
| | | |
| | | def attach(self, wrapped, callback, category=None): |
| | | def attach(self, wrapped, callback, category=None, depth=1): |
| | | self.attachments.append((wrapped, callback, category)) |
| | | self.depth = depth |
| | | return self.info |
| | | |
| | | class DummyRegistry(object): |
| | |
| | | |
| | | from pyramid.interfaces import ( |
| | | IExceptionViewClassifier, |
| | | IRequest, |
| | | IView, |
| | | ) |
| | | |
| | |
| | | # sane (e.g. caching headers) |
| | | if 'response' in attrs: |
| | | del attrs['response'] |
| | | request_iface = attrs['request_iface'] |
| | | # we use .get instead of .__getitem__ below due to |
| | | # https://github.com/Pylons/pyramid/issues/700 |
| | | request_iface = attrs.get('request_iface', IRequest) |
| | | provides = providedBy(exc) |
| | | for_ = (IExceptionViewClassifier, request_iface.combined, provides) |
| | | view_callable = adapters.lookup(for_, IView, default=None) |
| | |
| | | import inspect |
| | | import weakref |
| | | |
| | | from pyramid.exceptions import ( |
| | | ConfigurationError, |
| | | CyclicDependencyError, |
| | | ) |
| | | |
| | | from pyramid.compat import ( |
| | | iteritems_, |
| | | is_nonstr_iter, |
| | | integer_types, |
| | | string_types, |
| | | text_, |
| | |
| | | r = r[:100] + ' ... %s' % closer |
| | | return r |
| | | |
| | | class Sentinel(object): |
| | | def __init__(self, repr): |
| | | self.repr = repr |
| | | |
| | | def __repr__(self): |
| | | return self.repr |
| | | |
| | | FIRST = Sentinel('FIRST') |
| | | LAST = Sentinel('LAST') |
| | | |
| | | class TopologicalSorter(object): |
| | | """ A utility class which can be used to perform topological sorts against |
| | | tuple-like data.""" |
| | | def __init__( |
| | | self, |
| | | default_before=LAST, |
| | | default_after=None, |
| | | first=FIRST, |
| | | last=LAST, |
| | | ): |
| | | self.names = [] |
| | | self.req_before = set() |
| | | self.req_after = set() |
| | | self.name2before = {} |
| | | self.name2after = {} |
| | | self.name2val = {} |
| | | self.order = [] |
| | | self.default_before = default_before |
| | | self.default_after = default_after |
| | | self.first = first |
| | | self.last = last |
| | | |
| | | def remove(self, name): |
| | | """ Remove a node from the sort input """ |
| | | self.names.remove(name) |
| | | del self.name2val[name] |
| | | after = self.name2after.pop(name, []) |
| | | if after: |
| | | self.req_after.remove(name) |
| | | for u in after: |
| | | self.order.remove((u, name)) |
| | | before = self.name2before.pop(name, []) |
| | | if before: |
| | | self.req_before.remove(name) |
| | | for u in before: |
| | | self.order.remove((name, u)) |
| | | |
| | | def add(self, name, val, after=None, before=None): |
| | | """ Add a node to the sort input. The ``name`` should be a string or |
| | | any other hashable object, the ``val`` should be the sortable (doesn't |
| | | need to be hashable). ``after`` and ``before`` represents the name of |
| | | one of the other sortables (or a sequence of such named) or one of the |
| | | special sentinel values :attr:`pyramid.util.FIRST`` or |
| | | :attr:`pyramid.util.LAST` representing the first or last positions |
| | | respectively. ``FIRST`` and ``LAST`` can also be part of a sequence |
| | | passed as ``before`` or ``after``. A sortable should not be added |
| | | after LAST or before FIRST. An example:: |
| | | |
| | | sorter = TopologicalSorter() |
| | | sorter.add('a', {'a':1}, before=LAST, after='b') |
| | | sorter.add('b', {'b':2}, before=LAST, after='c') |
| | | sorter.add('c', {'c':3}) |
| | | |
| | | sorter.sorted() # will be {'c':3}, {'b':2}, {'a':1} |
| | | |
| | | """ |
| | | if name in self.names: |
| | | self.remove(name) |
| | | self.names.append(name) |
| | | self.name2val[name] = val |
| | | if after is None and before is None: |
| | | before = self.default_before |
| | | after = self.default_after |
| | | if after is not None: |
| | | if not is_nonstr_iter(after): |
| | | after = (after,) |
| | | self.name2after[name] = after |
| | | self.order += [(u, name) for u in after] |
| | | self.req_after.add(name) |
| | | if before is not None: |
| | | if not is_nonstr_iter(before): |
| | | before = (before,) |
| | | self.name2before[name] = before |
| | | self.order += [(name, o) for o in before] |
| | | self.req_before.add(name) |
| | | |
| | | |
| | | def sorted(self): |
| | | """ Returns the sort input values in topologically sorted order""" |
| | | order = [(self.first, self.last)] |
| | | roots = [] |
| | | graph = {} |
| | | names = [self.first, self.last] |
| | | names.extend(self.names) |
| | | |
| | | for a, b in self.order: |
| | | order.append((a, b)) |
| | | |
| | | def add_node(node): |
| | | if not node in graph: |
| | | roots.append(node) |
| | | graph[node] = [0] # 0 = number of arcs coming into this node |
| | | |
| | | def add_arc(fromnode, tonode): |
| | | graph[fromnode].append(tonode) |
| | | graph[tonode][0] += 1 |
| | | if tonode in roots: |
| | | roots.remove(tonode) |
| | | |
| | | for name in names: |
| | | add_node(name) |
| | | |
| | | has_before, has_after = set(), set() |
| | | for a, b in order: |
| | | if a in names and b in names: # deal with missing dependencies |
| | | add_arc(a, b) |
| | | has_before.add(a) |
| | | has_after.add(b) |
| | | |
| | | if not self.req_before.issubset(has_before): |
| | | raise ConfigurationError( |
| | | 'Unsatisfied before dependencies: %s' |
| | | % (', '.join(sorted(self.req_before - has_before))) |
| | | ) |
| | | if not self.req_after.issubset(has_after): |
| | | raise ConfigurationError( |
| | | 'Unsatisfied after dependencies: %s' |
| | | % (', '.join(sorted(self.req_after - has_after))) |
| | | ) |
| | | |
| | | sorted_names = [] |
| | | |
| | | while roots: |
| | | root = roots.pop(0) |
| | | sorted_names.append(root) |
| | | children = graph[root][1:] |
| | | for child in children: |
| | | arcs = graph[child][0] |
| | | arcs -= 1 |
| | | graph[child][0] = arcs |
| | | if arcs == 0: |
| | | roots.insert(0, child) |
| | | del graph[root] |
| | | |
| | | if graph: |
| | | # loop in input |
| | | cycledeps = {} |
| | | for k, v in graph.items(): |
| | | cycledeps[k] = v[1:] |
| | | raise CyclicDependencyError(cycledeps) |
| | | |
| | | result = [] |
| | | |
| | | for name in sorted_names: |
| | | if name in self.names: |
| | | result.append((name, self.name2val[name])) |
| | | |
| | | return result |
| | | |
| | |
| | | :meth:`pyramid.config.Configurator.add_view`. If any argument is left |
| | | out, its default will be the equivalent ``add_view`` default. |
| | | |
| | | An additional keyword argument named ``_depth`` is provided for people who |
| | | wish to reuse this class from another decorator. It will be passed in to |
| | | the :term:`venusian` ``attach`` function as the depth of the callstack when |
| | | Venusian checks if the decorator is being used in a class or module |
| | | context. It's not often used, but it can be useful in this circumstance. |
| | | See the ``attach`` function in Venusian for more information. |
| | | |
| | | See :ref:`mapping_views_using_a_decorator_section` for details about |
| | | using :class:`view_config`. |
| | | |
| | |
| | | |
| | | def __call__(self, wrapped): |
| | | settings = self.__dict__.copy() |
| | | depth = settings.pop('_depth', 1) |
| | | |
| | | def callback(context, name, ob): |
| | | config = context.config.with_package(info.module) |
| | | config.add_view(view=ob, **settings) |
| | | |
| | | info = self.venusian.attach(wrapped, callback, category='pyramid') |
| | | info = self.venusian.attach(wrapped, callback, category='pyramid', |
| | | depth=depth) |
| | | |
| | | if info.scope == 'class': |
| | | # if the decorator was attached to a method in a class, or |