Merged with master, fixed imports for py3
1 file added
19 files modified
New file |
| | |
| | | # Wire up travis |
| | | language: python |
| | | |
| | | env: |
| | | - TOXENV=py26 |
| | | - TOXENV=py27 |
| | | - TOXENV=py32 |
| | | - TOXENV=py33 |
| | | - TOXENV=py34 |
| | | - TOXENV=pypy |
| | | - TOXENV=pypy3 |
| | | - TOXENV=cover |
| | | |
| | | install: |
| | | - travis_retry pip install tox |
| | | |
| | | script: |
| | | - travis_retry tox |
| | | |
| | | notifications: |
| | | email: |
| | | - repoze-checkins@lists.repoze.org |
| | |
| | | Unreleased |
| | | ---------- |
| | | |
| | | - Add support for Python 3.4, PyPy3. |
| | | |
| | | - middleware: avoid UnboundLocalError when wrapped generator yields no |
| | | items. See: http://bugs.repoze.org/issue184 |
| | | |
| | |
| | | ``repoze.who`` -- WSGI Authentication Middleware / API |
| | | ====================================================== |
| | | ``repoze.who`` |
| | | ============== |
| | | |
| | | .. image:: https://travis-ci.org/repoze/repoze.who.png?branch=master |
| | | :target: https://travis-ci.org/repoze/repoze.who |
| | | |
| | | .. image:: https://readthedocs.org/projects/repozewho/badge/?version=latest |
| | | :target: http://repozewho.readthedocs.org/en/latest/ |
| | | :alt: Documentation Status |
| | | |
| | | Overview |
| | | -------- |
| | |
| | | for authorization (ensuring whether a user can or cannot perform the |
| | | operation implied by the request). This is considered to be the |
| | | domain of the WSGI application. |
| | | |
| | | See the ``docs`` subdirectory of this package (also available at least |
| | | provisionally at http://static.repoze.org/whodocs) for more |
| | | information. |
| | |
| | | from urllib.parse import parse_qsl |
| | | |
| | | try: |
| | | from ConfigParser import SafeConfigParser |
| | | import ConfigParser |
| | | except ImportError: #pragma NO COVER Python >= 3.0 |
| | | from configparser import SafeConfigParser |
| | | from configparser import ConfigParser |
| | | from configparser import ParsingError |
| | | else: #pragma NO COVER Python < 3.0 |
| | | from ConfigParser import SafeConfigParser as ConfigParser |
| | | from ConfigParser import ParsingError |
| | | |
| | | try: |
| | |
| | | from repoze.who.interfaces import IRequestClassifier |
| | | from repoze.who.middleware import PluggableAuthenticationMiddleware |
| | | from repoze.who._compat import StringIO |
| | | from repoze.who._compat import SafeConfigParser |
| | | from repoze.who._compat import ConfigParser |
| | | from repoze.who._compat import ParsingError |
| | | |
| | | def _resolve(name): |
| | |
| | | def parse(self, text): |
| | | if getattr(text, 'readline', None) is None: |
| | | text = StringIO(text) |
| | | cp = SafeConfigParser(defaults={'here': self.here}) |
| | | cp = ConfigParser(defaults={'here': self.here}) |
| | | try: |
| | | cp.read_file(text) |
| | | except AttributeError: #pragma NO COVER Python < 3.0 |
| | |
| | | import time |
| | | from wsgiref.handlers import _monthname # Locale-independent, RFC-2616 |
| | | from wsgiref.handlers import _weekdayname # Locale-independent, RFC-2616 |
| | | from urllib import urlencode |
| | | from urlparse import parse_qsl |
| | | try: |
| | | from urllib.parse import urlencode, parse_qsl |
| | | except ImportError: |
| | | from urllib import urlencode |
| | | from urlparse import parse_qsl |
| | | |
| | | from zope.interface import implementer |
| | | |
| | |
| | | if self._now_testing is not None: |
| | | self._setNowTesting(self._now_testing) |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin |
| | | return AuthTktCookiePlugin |
| | |
| | | return plugin |
| | | |
| | | def _makeTicket(self, userid='userid', remote_addr='0.0.0.0', |
| | | tokens = [], userdata='', |
| | | tokens = [], userdata='userdata', |
| | | cookie_name='auth_tkt', secure=False, |
| | | time=None): |
| | | #from paste.auth import auth_tkt |
| | |
| | | environ = self._makeEnviron() |
| | | result = plugin.identify(environ) |
| | | self.assertEqual(result, None) |
| | | |
| | | |
| | | def test_identify_good_cookie_include_ip(self): |
| | | plugin = self._makeOne('secret', include_ip=True) |
| | | val = self._makeTicket(remote_addr='1.1.1.1', userdata='foo=123') |
| | |
| | | environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=bogus'}) |
| | | result = plugin.identify(environ) |
| | | self.assertEqual(result, None) |
| | | |
| | | |
| | | def test_identify_bad_cookie_expired(self): |
| | | import time |
| | | plugin = self._makeOne('secret', timeout=2, reissue_time=1) |
| | |
| | | old_val = self._makeTicket(userid='userid') |
| | | environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val}) |
| | | userid = b'\xc2\xa9'.decode('utf-8') |
| | | if type(b'') == type(''): |
| | | userdata = 'userid_type=unicode' |
| | | else: # pragma: no cover Py3k |
| | | userdata = '' |
| | | new_val = self._makeTicket(userid=userid.encode('utf-8'), |
| | | userdata='userid_type=unicode') |
| | | userdata=userdata) |
| | | result = plugin.remember(environ, {'repoze.who.userid':userid, |
| | | 'userdata':{}}) |
| | | self.assertEqual(type(result[0][1]), str) |
| | |
| | | name, value = result.pop(0) |
| | | |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.endswith('; Expires=Sun, 08 Nov 2009 16:23:42 GMT')) |
| | | |
| | | def test_remember_max_age(self): |
| | |
| | | |
| | | name, value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.startswith('auth_tkt="%s"; Path=/; Max-Age=500' % tkt), |
| | | value) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.endswith('; Expires=Sun, 08 Nov 2009 16:23:42 GMT')) |
| | | |
| | | name, value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.startswith( |
| | | 'auth_tkt="%s"; Path=/; Domain=example.com; Max-Age=500' |
| | | % tkt), value) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.endswith('; Expires=Sun, 08 Nov 2009 16:23:42 GMT')) |
| | | |
| | | name, value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.startswith( |
| | | 'auth_tkt="%s"; Path=/; Domain=.example.com; Max-Age=500' % tkt), |
| | | value) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.endswith('; Expires=Sun, 08 Nov 2009 16:23:42 GMT')) |
| | | |
| | | def test_forget(self): |
| | |
| | | def test_authenticate_non_auth_tkt_credentials(self): |
| | | plugin = self._makeOne() |
| | | self.assertEqual(plugin.authenticate(environ={}, identity={}), None) |
| | | |
| | | |
| | | def test_authenticate_without_checker(self): |
| | | plugin = self._makeOne() |
| | | identity = {'repoze.who.plugins.auth_tkt.userid': 'phred'} |
| | | self.assertEqual(plugin.authenticate({}, identity), 'phred') |
| | | |
| | | |
| | | def test_authenticate_with_checker_and_non_existing_account(self): |
| | | plugin = self._makeOne('secret', userid_checker=dummy_userid_checker) |
| | | identity = {'repoze.who.plugins.auth_tkt.userid': 'phred'} |
| | | self.assertEqual(plugin.authenticate({}, identity), None) |
| | | |
| | | |
| | | def test_authenticate_with_checker_and_existing_account(self): |
| | | plugin = self._makeOne('secret', userid_checker=dummy_userid_checker) |
| | | identity = {'repoze.who.plugins.auth_tkt.userid': 'existing'} |
| | |
| | | 'max_age': u('500')}) |
| | | name, value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless(isinstance(value, str)) |
| | | self.failUnless( |
| | | self.assertTrue(isinstance(value, str)) |
| | | self.assertTrue( |
| | | value.startswith('auth_tkt="%s"; Path=/; Max-Age=500' % tkt), |
| | | (value, tkt)) |
| | | self.failUnless('; Expires=' in value) |
| | | |
| | | self.assertTrue('; Expires=' in value) |
| | | |
| | | name,value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.startswith( |
| | | 'auth_tkt="%s"; Path=/; Domain=example.com; Max-Age=500' |
| | | % tkt), value) |
| | | self.failUnless('; Expires=' in value) |
| | | self.assertTrue('; Expires=' in value) |
| | | |
| | | name,value = result.pop(0) |
| | | self.assertEqual('Set-Cookie', name) |
| | | self.failUnless( |
| | | self.assertTrue( |
| | | value.startswith( |
| | | 'auth_tkt="%s"; Path=/; Domain=.example.com; Max-Age=500' % tkt), |
| | | value) |
| | | self.failUnless('; Expires=' in value) |
| | | self.assertTrue('; Expires=' in value) |
| | | |
| | | |
| | | def dummy_userid_checker(userid): |
| | |
| | | import unittest |
| | | |
| | | |
| | | class TestBasicAuthPlugin(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | |
| | | def _makeOne(self, *arg, **kw): |
| | | plugin = self._getTargetClass()(*arg, **kw) |
| | | return plugin |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | def _makeEnviron(self, kw=None): |
| | | from wsgiref.util import setup_testing_defaults |
| | |
| | | for item in app_iter: |
| | | items.append(item) |
| | | response = b''.join(items).decode('utf-8') |
| | | self.failUnless(response.startswith('401 Unauthorized')) |
| | | self.assertTrue(response.startswith('401 Unauthorized')) |
| | | |
| | | def test_identify_noauthinfo(self): |
| | | plugin = self._makeOne('realm') |
| | |
| | | plugin = self._getTargetClass()(*arg, **kw) |
| | | return plugin |
| | | |
| | | def _makeEnviron(self, kw=None): |
| | | def _makeEnviron(self): |
| | | environ = {} |
| | | environ['wsgi.version'] = (1,0) |
| | | if kw is not None: |
| | | environ.update(kw) |
| | | return environ |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | def test_implements(self): |
| | | from zope.interface.verify import verifyClass |
| | |
| | | import os |
| | | here = os.path.abspath(os.path.dirname(__file__)) |
| | | htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd.nonesuch') |
| | | def check(password, hashed): |
| | | def check(password, hashed): # pragma: no cover |
| | | return True |
| | | plugin = self._makeOne(htpasswd, check) |
| | | environ = self._makeEnviron() |
| | |
| | | result = plugin.authenticate(environ, creds) |
| | | self.assertEqual(result, None) |
| | | self.assertEqual(len(logger.warnings), 1) |
| | | self.failUnless('could not open htpasswd' in logger.warnings[0]) |
| | | self.assertTrue('could not open htpasswd' in logger.warnings[0]) |
| | | |
| | | def test_crypt_check(self): |
| | | import sys |
| | | # win32 does not have a crypt library, don't |
| | | # fail here |
| | | if "win32" == sys.platform: |
| | | if "win32" == sys.platform: # pragma: no cover |
| | | return |
| | | |
| | | from crypt import crypt |
| | |
| | | |
| | | def test_plain_check(self): |
| | | from repoze.who.plugins.htpasswd import plain_check |
| | | self.failUnless(plain_check('password', 'password')) |
| | | self.failIf(plain_check('notpassword', 'password')) |
| | | self.assertTrue(plain_check('password', 'password')) |
| | | self.assertFalse(plain_check('notpassword', 'password')) |
| | | |
| | | def test_factory_no_filename_raises(self): |
| | | from repoze.who.plugins.htpasswd import make_plugin |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | |
| | | class TestRedirectorPlugin(_Base): |
| | | class TestRedirectorPlugin(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.plugins.redirector import RedirectorPlugin |
| | |
| | | reason_param=reason_param, |
| | | reason_header=reason_header) |
| | | |
| | | def _makeEnviron(self, login=None, password=None, came_from=None, |
| | | path_info='/', identifier=None, max_age=None): |
| | | def _makeEnviron(self, path_info='/', identifier=None): |
| | | from repoze.who._compat import StringIO |
| | | fields = [] |
| | | if login: |
| | | fields.append(('login', login)) |
| | | if password: |
| | | fields.append(('password', password)) |
| | | if came_from: |
| | | fields.append(('came_from', came_from)) |
| | | if max_age: |
| | | fields.append(('max_age', max_age)) |
| | | if identifier is None: |
| | | credentials = {'login':'chris', 'password':'password'} |
| | | identifier = DummyIdentifier(credentials) |
| | | content_type, body = encode_multipart_formdata(fields) |
| | | content_type, body = encode_multipart_formdata() |
| | | environ = {'wsgi.version': (1,0), |
| | | 'wsgi.input': StringIO(body), |
| | | 'wsgi.url_scheme':'http', |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[0][0], 'forget') |
| | | self.assertEqual(sr.headers[0][1], '1') |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | | url = sr.headers[1][1] |
| | | parts = urlparse(url) |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | | url = sr.headers[1][1] |
| | | parts = urlparse(url) |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[0][0], "forget") |
| | | self.assertEqual(sr.headers[0][1], "1") |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | | url = sr.headers[1][1] |
| | | parts = urlparse(url) |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | | url = sr.headers[1][1] |
| | | parts = urlparse(url) |
| | |
| | | [('forget', '1')]) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[1][0], 'Location') |
| | | url = sr.headers[1][1] |
| | | parts = urlparse(url) |
| | |
| | | []) |
| | | sr = DummyStartResponse() |
| | | result = b''.join(app(environ, sr)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(sr.headers[0][0], 'set-cookie') |
| | | self.assertEqual(sr.headers[0][1], 'a') |
| | | self.assertEqual(sr.headers[1][0], 'set-cookie') |
| | | self.assertEqual(sr.headers[1][1], 'b') |
| | | |
| | | class Test_make_redirecting_plugin(_Base): |
| | | class Test_make_redirecting_plugin(unittest.TestCase): |
| | | |
| | | def _callFUT(self, *args, **kw): |
| | | from repoze.who.plugins.redirector import make_plugin |
| | |
| | | self.assertEqual(plugin.reason_param, 'why') |
| | | self.assertEqual(plugin.reason_header, 'X-Reason') |
| | | |
| | | class DummyIdentifier: |
| | | class DummyIdentifier(object): |
| | | forgotten = False |
| | | remembered = False |
| | | |
| | |
| | | self.forget_headers = forget_headers |
| | | self.replace_app = replace_app |
| | | |
| | | def identify(self, environ): |
| | | if self.replace_app: |
| | | environ['repoze.who.application'] = self.replace_app |
| | | return self.credentials |
| | | |
| | | def forget(self, environ, identity): |
| | | self.forgotten = identity |
| | | return self.forget_headers |
| | | |
| | | def remember(self, environ, identity): |
| | | self.remembered = identity |
| | | return self.remember_headers |
| | | |
| | | class DummyStartResponse: |
| | | def __call__(self, status, headers, exc_info=None): |
| | | self.status = status |
| | |
| | | self.exc_info = exc_info |
| | | return [] |
| | | |
| | | def encode_multipart_formdata(fields): |
| | | def encode_multipart_formdata(): |
| | | BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$' |
| | | CRLF = '\r\n' |
| | | L = [] |
| | | for (key, value) in fields: |
| | | L.append('--' + BOUNDARY) |
| | | L.append('Content-Disposition: form-data; name="%s"' % key) |
| | | L.append('') |
| | | L.append(value) |
| | | L.append('--' + BOUNDARY + '--') |
| | | L.append('') |
| | | body = CRLF.join(L) |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class TestSQLAuthenticatorPlugin(_Base): |
| | | class TestSQLAuthenticatorPlugin(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.plugins.sql import SQLAuthenticatorPlugin |
| | |
| | | plugin = self._getTargetClass()(*arg, **kw) |
| | | return plugin |
| | | |
| | | def _makeEnviron(self, kw=None): |
| | | def _makeEnviron(self): |
| | | environ = {} |
| | | environ['wsgi.version'] = (1,0) |
| | | if kw is not None: |
| | | environ.update(kw) |
| | | return environ |
| | | |
| | | def test_implements(self): |
| | |
| | | self.assertEqual(dummy_factory.query, None) |
| | | self.assertEqual(dummy_factory.closed, False) |
| | | |
| | | class TestDefaultPasswordCompare(_Base): |
| | | class TestDefaultPasswordCompare(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.plugins.sql import default_password_compare |
| | |
| | | def _get_sha_hex_digest(self, clear='password'): |
| | | try: |
| | | from hashlib import sha1 |
| | | except ImportError: |
| | | except ImportError: # pragma: no cover Py3k |
| | | from sha import new as sha1 |
| | | if not isinstance(clear, type(b'')): |
| | | if not isinstance(clear, type(b'')): # pragma: no cover Py3k |
| | | clear = clear.encode('utf-8') |
| | | return sha1(clear).hexdigest() |
| | | |
| | |
| | | result = compare('notpassword', stored) |
| | | self.assertEqual(result, False) |
| | | |
| | | class TestSQLMetadataProviderPlugin(_Base): |
| | | class TestSQLMetadataProviderPlugin(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.plugins.sql import SQLMetadataProviderPlugin |
| | |
| | | self.assertEqual(dummy_factory.closed, True) |
| | | self.assertEqual(identity['md'], [ [1,2,3] ]) |
| | | self.assertEqual(dummy_factory.query, 'select foo from bar') |
| | | self.failIf('__userid' in identity) |
| | | self.assertFalse('__userid' in identity) |
| | | |
| | | class TestMakeSQLAuthenticatorPlugin(_Base): |
| | | class TestMakeSQLAuthenticatorPlugin(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.plugins.sql import make_authenticator_plugin |
| | |
| | | self.assertEqual(plugin.conn_factory, DummyConnFactory) |
| | | self.assertEqual(plugin.compare_fn, make_dummy_connfactory) |
| | | |
| | | class TestMakeSQLMetadataProviderPlugin(_Base): |
| | | class TestMakeSQLMetadataProviderPlugin(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.plugins.sql import make_metadata_plugin |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class AuthTicketTests(_Base): |
| | | class AuthTicketTests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from .._auth_tkt import AuthTicket |
| | |
| | | self.assertEqual(cookie['oatmeal']['secure'], 'true') |
| | | |
| | | |
| | | class BadTicketTests(_Base): |
| | | class BadTicketTests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from .._auth_tkt import BadTicket |
| | |
| | | self.assertEqual(exc.expected, 'foo') |
| | | |
| | | |
| | | class Test_parse_ticket(_Base): |
| | | class Test_parse_ticket(unittest.TestCase): |
| | | |
| | | def _callFUT(self, secret='SEEKRIT', ticket=None, ip='1.2.3.4'): |
| | | from .._auth_tkt import parse_ticket |
| | |
| | | try: |
| | | self._callFUT(ticket=TICKET) |
| | | except BadTicket as e: |
| | | self.failUnless(e.args[0].startswith( |
| | | self.assertTrue(e.args[0].startswith( |
| | | 'Timestamp is not a hex integer:')) |
| | | else: |
| | | else: # pragma: no cover |
| | | self.fail('Did not raise') |
| | | |
| | | def test_no_bang_after_userid(self): |
| | |
| | | self._callFUT(ticket=TICKET) |
| | | except BadTicket as e: |
| | | self.assertEqual(e.args[0], 'userid is not followed by !') |
| | | else: |
| | | else: # pragma: no cover |
| | | self.fail('Did not raise') |
| | | |
| | | def test_wo_tokens_or_data_bad_digest(self): |
| | |
| | | self._callFUT(ticket=TICKET) |
| | | except BadTicket as e: |
| | | self.assertEqual(e.args[0], 'Digest signature is not correct') |
| | | else: |
| | | else: # pragma: no cover |
| | | self.fail('Did not raise') |
| | | |
| | | def test_wo_tokens_or_data_ok_digest(self): |
| | |
| | | self.assertEqual(user_data, 'DATA') |
| | | |
| | | |
| | | class Test_helpers(_Base): |
| | | class Test_helpers(unittest.TestCase): |
| | | |
| | | # calculate_digest is not very testable, and fully exercised through callers. |
| | | |
| | |
| | | def test_maybe_encode_bytes(self): |
| | | from .._auth_tkt import maybe_encode |
| | | foo = b'foo' |
| | | self.failUnless(maybe_encode(foo) is foo) |
| | | self.assertTrue(maybe_encode(foo) is foo) |
| | | |
| | | def test_maybe_encode_native_string(self): |
| | | from .._auth_tkt import maybe_encode |
| | |
| | | import unittest |
| | | |
| | | |
| | | class CompatTests(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | def test_REQUEST_METHOD_miss(self): |
| | | # PEP 3333 says REQUEST_METHOD is mandatory |
| | |
| | | from .._compat import SimpleCookie |
| | | environ = {'HTTP_COOKIE': 'qux=spam'} |
| | | cookies = get_cookies(environ) |
| | | self.failUnless(isinstance(cookies, SimpleCookie)) |
| | | self.assertTrue(isinstance(cookies, SimpleCookie)) |
| | | self.assertEqual(len(cookies), 1) |
| | | self.assertEqual(cookies['qux'].value, 'spam') |
| | | self.assertEqual(environ['paste.cookies'], (cookies, 'qux=spam')) |
| | |
| | | 'paste.cookies': (object(), 'foo=bar'), |
| | | } |
| | | cookies = get_cookies(environ) |
| | | self.failUnless(isinstance(cookies, SimpleCookie)) |
| | | self.assertTrue(isinstance(cookies, SimpleCookie)) |
| | | self.assertEqual(len(cookies), 1) |
| | | self.assertEqual(cookies['qux'].value, 'spam') |
| | | self.assertEqual(environ['paste.cookies'], (cookies, 'qux=spam')) |
| | |
| | | 'paste.cookies': (existing, 'qux=spam'), |
| | | } |
| | | cookies = get_cookies(environ) |
| | | self.failUnless(cookies is existing) |
| | | self.assertTrue(cookies is existing) |
| | | |
| | | def test_construct_url(self): |
| | | from .._compat import construct_url |
| | |
| | | def test_must_decode_non_string(self): |
| | | from .._compat import must_decode |
| | | foo = object() |
| | | self.failUnless(must_decode(foo) is foo) |
| | | self.assertTrue(must_decode(foo) is foo) |
| | | |
| | | def test_must_decode_unicode(self): |
| | | from .._compat import must_decode |
| | | from .._compat import u |
| | | foo = u('foo') |
| | | self.failUnless(must_decode(foo) is foo) |
| | | self.assertTrue(must_decode(foo) is foo) |
| | | |
| | | def test_must_decode_utf8(self): |
| | | from .._compat import must_decode |
| | |
| | | def test_must_encode_non_string(self): |
| | | from .._compat import must_encode |
| | | foo = object() |
| | | self.failUnless(must_encode(foo) is foo) |
| | | self.assertTrue(must_encode(foo) is foo) |
| | | |
| | | def test_must_encode_unicode(self): |
| | | from .._compat import must_encode |
| | |
| | | def test_must_encode_utf8(self): |
| | | from .._compat import must_encode |
| | | foo = b'b\xc3\xa2tard' |
| | | self.failUnless(must_encode(foo) is foo) |
| | | self.assertTrue(must_encode(foo) is foo) |
| | | |
| | | def test_must_encode_latin1(self): |
| | | from .._compat import must_encode |
| | | foo = b'b\xe2tard' |
| | | self.failUnless(must_encode(foo) is foo) |
| | | self.assertTrue(must_encode(foo) is foo) |
| | | |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class Test_get_api(_Base): |
| | | class Test_get_api(unittest.TestCase): |
| | | |
| | | def _callFUT(self, environ): |
| | | from repoze.who.api import get_api |
| | |
| | | def test___call___empty_environ(self): |
| | | environ = {} |
| | | api = self._callFUT(environ) |
| | | self.failUnless(api is None) |
| | | self.assertTrue(api is None) |
| | | |
| | | def test___call___w_api_in_environ(self): |
| | | expected = object() |
| | | environ = {'repoze.who.api': expected} |
| | | api = self._callFUT(environ) |
| | | self.failUnless(api is expected) |
| | | self.assertTrue(api is expected) |
| | | |
| | | class APIFactoryTests(_Base): |
| | | class APIFactoryTests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.api import APIFactory |
| | |
| | | environ = {} |
| | | factory = self._makeOne() |
| | | api = factory(environ) |
| | | self.failUnless(isinstance(api, API)) |
| | | self.failUnless(environ['repoze.who.api'] is api) |
| | | self.assertTrue(isinstance(api, API)) |
| | | self.assertTrue(environ['repoze.who.api'] is api) |
| | | |
| | | def test___call___w_api_in_environ(self): |
| | | expected = object() |
| | | environ = {'repoze.who.api': expected} |
| | | factory = self._makeOne() |
| | | api = factory(environ) |
| | | self.failUnless(api is expected) |
| | | self.assertTrue(api is expected) |
| | | |
| | | |
| | | class TestMakeRegistries(_Base): |
| | | class TestMakeRegistries(unittest.TestCase): |
| | | |
| | | def _callFUT(self, identifiers, authenticators, challengers, mdproviders): |
| | | from repoze.who.api import make_registries |
| | |
| | | self.assertEqual(name_reg['challenger'], dummy_challenger) |
| | | self.assertEqual(name_reg['mdprovider'], dummy_mdprovider) |
| | | |
| | | class TestMatchClassification(_Base): |
| | | class TestMatchClassification(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.api import match_classification |
| | |
| | | # any for either |
| | | self.assertEqual(f(IAuthenticator, plugins, 'buz'), [multi1, multi2]) |
| | | |
| | | class APITests(_Base): |
| | | class APITests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.api import API |
| | |
| | | logger=logger) |
| | | identity = api.authenticate() |
| | | self.assertEqual(identity['repoze.who.userid'], 'chrisid') |
| | | self.failUnless(identity['identifier'] is identifier) |
| | | self.failUnless(identity['authenticator'] is authenticator) |
| | | self.assertTrue(identity['identifier'] is identifier) |
| | | self.assertTrue(identity['authenticator'] is authenticator) |
| | | |
| | | self.assertEqual(len(logger._info), 1) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | |
| | | self.assertEqual(logger._info[0], 'request classification: match') |
| | | self.assertEqual(logger._info[1], 'no challenge app returned') |
| | | self.assertEqual(len(logger._debug), 2) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'challengers registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'challengers matched for ' |
| | | 'classification "match": [')) |
| | | |
| | |
| | | self.assertEqual(environ['challenged'], app) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(logger._info[0], 'request classification: match') |
| | | self.failUnless(logger._info[1].startswith('challenger plugin ')) |
| | | self.failUnless(logger._info[1].endswith( |
| | | self.assertTrue(logger._info[1].startswith('challenger plugin ')) |
| | | self.assertTrue(logger._info[1].endswith( |
| | | '"challenge" returned an app')) |
| | | self.assertEqual(len(logger._debug), 2) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'challengers registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'challengers matched for ' |
| | | 'classification "match": [')) |
| | | |
| | |
| | | self.assertEqual(result, None) |
| | | self.assertEqual(environ['challenged'], None) |
| | | self.assertEqual(identifier.forgotten, identity) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(len(logger._info), 3) |
| | | self.assertEqual(logger._info[0], 'request classification: match') |
| | | self.assertEqual(logger._info[1], 'no challenge app returned') |
| | | self.assertTrue(logger._info[1].startswith('forgetting via headers ')) |
| | | self.assertEqual(logger._info[2], 'no challenge app returned') |
| | | self.assertEqual(len(logger._debug), 2) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'challengers registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'challengers matched for ' |
| | | 'classification "match": [')) |
| | | |
| | |
| | | self.assertEqual(result, app) |
| | | self.assertEqual(environ['challenged'], app) |
| | | self.assertEqual(identifier.forgotten, identity) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(len(logger._info), 3) |
| | | self.assertEqual(logger._info[0], 'request classification: match') |
| | | self.failUnless(logger._info[1].startswith('challenger plugin ')) |
| | | self.failUnless(logger._info[1].endswith( |
| | | self.assertTrue(logger._info[1].startswith('forgetting via headers ')) |
| | | self.assertTrue(logger._info[2].startswith('challenger plugin ')) |
| | | self.assertTrue(logger._info[2].endswith( |
| | | '"challenge" returned an app')) |
| | | self.assertEqual(len(logger._debug), 2) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'challengers registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'challengers matched for ' |
| | | 'classification "match": [')) |
| | | |
| | |
| | | self.assertEqual(challenger._challenged_with[3], FORGET_HEADERS) |
| | | self.assertEqual(len(logger._info), 3) |
| | | self.assertEqual(logger._info[0], 'request classification: match') |
| | | self.failUnless(logger._info[1].startswith( |
| | | self.assertTrue(logger._info[1].startswith( |
| | | 'forgetting via headers from')) |
| | | self.failUnless(logger._info[1].endswith(repr(FORGET_HEADERS))) |
| | | self.failUnless(logger._info[2].startswith('challenger plugin ')) |
| | | self.failUnless(logger._info[2].endswith( |
| | | self.assertTrue(logger._info[1].endswith(repr(FORGET_HEADERS))) |
| | | self.assertTrue(logger._info[2].startswith('challenger plugin ')) |
| | | self.assertTrue(logger._info[2].endswith( |
| | | '"challenge" returned an app')) |
| | | self.assertEqual(len(logger._debug), 2) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'challengers registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'challengers matched for ' |
| | | 'classification "match": [')) |
| | | |
| | |
| | | self.assertEqual(identifier.forgotten, identity) |
| | | |
| | | def test_remember_identifier_plugin_returns_none(self): |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | return None |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return () |
| | | identity = {'identifier': _Identifier()} |
| | | identity = {'identifier': DummyNoResultsIdentifier()} |
| | | api = self._makeOne() |
| | | headers = api.remember(identity=identity) |
| | | self.assertEqual(tuple(headers), ()) |
| | |
| | | def test_remember_no_identity_passed_but_in_environ(self): |
| | | HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | logger = DummyLogger() |
| | | class _Identifier: |
| | | def remember(self, environ, identity): |
| | | return HEADERS |
| | | environ = self._makeEnviron() |
| | | environ['repoze.who.identity'] = {'identifier': _Identifier()} |
| | | environ['repoze.who.identity'] = { |
| | | 'identifier': DummyIdentifier(remember_headers=HEADERS)} |
| | | api = self._makeOne(environ=environ, logger=logger) |
| | | self.assertEqual(api.remember(), HEADERS) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.failUnless(logger._info[1].startswith( |
| | | self.assertTrue(logger._info[1].startswith( |
| | | 'remembering via headers from')) |
| | | self.failUnless(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertTrue(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertEqual(len(logger._debug), 0) |
| | | |
| | | def test_remember_w_identity_passed_no_identifier(self): |
| | |
| | | def test_remember_w_identity_passed_w_identifier(self): |
| | | HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | logger = DummyLogger() |
| | | class _Identifier: |
| | | def remember(self, environ, identity): |
| | | return HEADERS |
| | | environ = self._makeEnviron() |
| | | api = self._makeOne(environ=environ, logger=logger) |
| | | identity = {'identifier': _Identifier()} |
| | | identity = {'identifier': DummyIdentifier(remember_headers=HEADERS)} |
| | | self.assertEqual(api.remember(identity), HEADERS) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.failUnless(logger._info[1].startswith( |
| | | self.assertTrue(logger._info[1].startswith( |
| | | 'remembering via headers from')) |
| | | self.failUnless(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertTrue(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertEqual(len(logger._debug), 0) |
| | | |
| | | def test_forget_identifier_plugin_returns_none(self): |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | return None |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return () |
| | | identity = {'identifier': _Identifier()} |
| | | identity = {'identifier': DummyNoResultsIdentifier()} |
| | | api = self._makeOne() |
| | | headers = api.forget(identity=identity) |
| | | self.assertEqual(tuple(headers), ()) |
| | |
| | | def test_forget_no_identity_passed_but_in_environ(self): |
| | | HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | logger = DummyLogger() |
| | | class _Identifier: |
| | | def forget(self, environ, identity): |
| | | return HEADERS |
| | | environ = self._makeEnviron() |
| | | environ['repoze.who.identity'] = {'identifier': _Identifier()} |
| | | environ['repoze.who.identity'] = { |
| | | 'identifier': DummyIdentifier(forget_headers=HEADERS)} |
| | | api = self._makeOne(environ=environ, logger=logger) |
| | | self.assertEqual(api.forget(), HEADERS) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.failUnless(logger._info[1].startswith( |
| | | self.assertTrue(logger._info[1].startswith( |
| | | 'forgetting via headers from')) |
| | | self.failUnless(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertTrue(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertEqual(len(logger._debug), 0) |
| | | |
| | | def test_forget_w_identity_passed_no_identifier(self): |
| | |
| | | def test_forget_w_identity_passed_w_identifier(self): |
| | | HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | logger = DummyLogger() |
| | | class _Identifier: |
| | | def forget(self, environ, identity): |
| | | return HEADERS |
| | | environ = self._makeEnviron() |
| | | api = self._makeOne(environ=environ, logger=logger) |
| | | identity = {'identifier': _Identifier()} |
| | | identity = {'identifier': DummyIdentifier(forget_headers=HEADERS)} |
| | | self.assertEqual(api.forget(identity), HEADERS) |
| | | self.assertEqual(len(logger._info), 2) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.failUnless(logger._info[1].startswith( |
| | | self.assertTrue(logger._info[1].startswith( |
| | | 'forgetting via headers from')) |
| | | self.failUnless(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertTrue(logger._info[1].endswith(repr(HEADERS))) |
| | | self.assertEqual(len(logger._debug), 0) |
| | | |
| | | def test_login_w_identifier_name_hit(self): |
| | | REMEMBER_HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS[1:] |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS[:1] |
| | | def forget(self, environ, identity): |
| | | pass |
| | | authenticator = DummyAuthenticator('chrisid') |
| | | environ = self._makeEnviron() |
| | | identifiers = [('bogus', _BogusIdentifier()), |
| | | ('valid', _Identifier()), |
| | | identifiers = [('bogus', DummyNoResultsIdentifier()), |
| | | ('valid', DummyIdentifier( |
| | | remember_headers=REMEMBER_HEADERS)), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | authenticators=[('authentic', authenticator)], |
| | | environ=environ) |
| | | identity, headers = api.login({'login': 'chrisid'}, 'valid') |
| | | self.assertEqual(identity['repoze.who.userid'], 'chrisid') |
| | | self.assertEqual(headers, REMEMBER_HEADERS[1:]) |
| | | self.assertEqual(headers, REMEMBER_HEADERS) |
| | | |
| | | def test_login_wo_identifier_name_hit(self): |
| | | REMEMBER_HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS[1:] |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS[:1] |
| | | def forget(self, environ, identity): |
| | | pass |
| | | authenticator = DummyAuthenticator('chrisid') |
| | | environ = self._makeEnviron() |
| | | identifiers = [('bogus', _BogusIdentifier()), |
| | | ('valid', _Identifier()), |
| | | identifiers = [('bogus', DummyIdentifier( |
| | | remember_headers=REMEMBER_HEADERS[:1])), |
| | | ('valid', DummyIdentifier( |
| | | remember_headers=REMEMBER_HEADERS[1:])), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | authenticators=[('authentic', authenticator)], |
| | |
| | | def test_login_w_identifier_name_miss(self): |
| | | REMEMBER_HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return () |
| | | authenticator = DummyFailAuthenticator() |
| | | environ = self._makeEnviron() |
| | | identifiers = [('bogus', _BogusIdentifier()), |
| | | ('valid', _Identifier()), |
| | | identifiers = [('bogus', DummyNoResultsIdentifier()), |
| | | ('valid', DummyIdentifier( |
| | | remember_headers=REMEMBER_HEADERS, |
| | | forget_headers=FORGET_HEADERS)), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | authenticators=[('authentic', authenticator)], |
| | |
| | | |
| | | def test_logout_wo_identifier_name_miss(self): |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS[:1] |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS[1:] |
| | | environ = self._makeEnviron() |
| | | identifiers = [('valid', _Identifier()), |
| | | ('bogus', _BogusIdentifier()), |
| | | identifiers = [('valid', DummyIdentifier( |
| | | forget_headers=FORGET_HEADERS[:1])), |
| | | ('bogus', DummyIdentifier( |
| | | forget_headers=FORGET_HEADERS[1:])), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | environ=environ) |
| | |
| | | |
| | | def test_logout_w_identifier_name(self): |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return () |
| | | environ = self._makeEnviron() |
| | | identifiers = [('bogus', _BogusIdentifier()), |
| | | ('valid', _Identifier()), |
| | | identifiers = [('bogus', DummyNoResultsIdentifier()), |
| | | ('valid', DummyIdentifier( |
| | | forget_headers=FORGET_HEADERS)), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | environ=environ) |
| | |
| | | def test_logout_wo_identifier_name(self): |
| | | REMEMBER_HEADERS = [('Foo', 'Bar'), ('Baz', 'Qux')] |
| | | FORGET_HEADERS = [('Spam', 'Blah')] |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return REMEMBER_HEADERS |
| | | def forget(self, environ, identity): |
| | | return FORGET_HEADERS |
| | | class _BogusIdentifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def remember(self, environ, identity): |
| | | return () |
| | | def forget(self, environ, identity): |
| | | return () |
| | | authenticator = DummyFailAuthenticator() |
| | | environ = self._makeEnviron() |
| | | identifiers = [('valid', _Identifier()), |
| | | ('bogus', _BogusIdentifier()), |
| | | identifiers = [('bogus', DummyNoResultsIdentifier()), |
| | | ('valid', DummyIdentifier( |
| | | forget_headers=FORGET_HEADERS)), |
| | | ] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | authenticators=[('authentic', authenticator)], |
| | |
| | | self.assertEqual(headers, FORGET_HEADERS) |
| | | |
| | | def test_logout_removes_repoze_who_identity(self): |
| | | class _Identifier: |
| | | def identify(self, environ): |
| | | pass |
| | | def forget(self, environ, identity): |
| | | return () |
| | | def remember(self, environ, identity): |
| | | return () |
| | | authenticator = DummyFailAuthenticator() |
| | | environ = self._makeEnviron() |
| | | environ['repoze.who.identity'] = 'identity' |
| | | identifiers = [('valid', _Identifier())] |
| | | identifiers = [('valid', DummyNoResultsIdentifier())] |
| | | api = self._makeOne(identifiers=identifiers, |
| | | authenticators=[('authentic', authenticator)], |
| | | environ=environ) |
| | | api.logout() |
| | | self.failIf('repoze.who.identity' in environ) |
| | | self.assertFalse('repoze.who.identity' in environ) |
| | | |
| | | def test__identify_success(self): |
| | | environ = self._makeEnviron() |
| | |
| | | self.assertEqual(len(logger._info), 1) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.assertEqual(len(logger._debug), 4) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'identifier plugins registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'identifier plugins matched for ' |
| | | 'classification "browser": [')) |
| | | self.failUnless(logger._debug[2].startswith( |
| | | self.assertTrue(logger._debug[2].startswith( |
| | | 'no identity returned from <')) |
| | | self.failUnless(logger._debug[2].endswith('> (None)')) |
| | | self.assertTrue(logger._debug[2].endswith('> (None)')) |
| | | self.assertEqual(logger._debug[3], 'identities found: []') |
| | | |
| | | def test__identify_success_skip_noresults(self): |
| | |
| | | self.assertEqual(len(logger._info), 1) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.assertEqual(len(logger._debug), 5) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'authenticator plugins registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'authenticator plugins matched for ' |
| | | 'classification "browser": [')) |
| | | self.failUnless(logger._debug[2].startswith('no userid returned from')) |
| | | self.failUnless(logger._debug[3].startswith('userid returned from')) |
| | | self.failUnless(logger._debug[3].endswith('"chris"')) |
| | | self.failUnless(logger._debug[4].startswith( |
| | | self.assertTrue(logger._debug[2].startswith('no userid returned from')) |
| | | self.assertTrue(logger._debug[3].startswith('userid returned from')) |
| | | self.assertTrue(logger._debug[3].endswith('"chris"')) |
| | | self.assertTrue(logger._debug[4].startswith( |
| | | 'identities authenticated: [((1, 0),')) |
| | | |
| | | def test__authenticate_success_multiresult(self): |
| | |
| | | self.assertEqual(len(logger._info), 1) |
| | | self.assertEqual(logger._info[0], 'request classification: browser') |
| | | self.assertEqual(len(logger._debug), 5) |
| | | self.failUnless(logger._debug[0].startswith( |
| | | self.assertTrue(logger._debug[0].startswith( |
| | | 'authenticator plugins registered: [')) |
| | | self.failUnless(logger._debug[1].startswith( |
| | | self.assertTrue(logger._debug[1].startswith( |
| | | 'authenticator plugins matched for ' |
| | | 'classification "browser": [')) |
| | | self.failUnless(logger._debug[2].startswith('userid returned from')) |
| | | self.failUnless(logger._debug[2].endswith('"chris_id1"')) |
| | | self.failUnless(logger._debug[3].startswith('userid returned from')) |
| | | self.failUnless(logger._debug[3].endswith('"chris_id2"')) |
| | | self.failUnless(logger._debug[4].startswith( |
| | | self.assertTrue(logger._debug[2].startswith('userid returned from')) |
| | | self.assertTrue(logger._debug[2].endswith('"chris_id1"')) |
| | | self.assertTrue(logger._debug[3].startswith('userid returned from')) |
| | | self.assertTrue(logger._debug[3].endswith('"chris_id2"')) |
| | | self.assertTrue(logger._debug[4].startswith( |
| | | 'identities authenticated: [((0, 0),') |
| | | ) |
| | | |
| | |
| | | self.assertEqual(identity.get('fuz'), None) |
| | | |
| | | |
| | | class TestIdentityDict(_Base): |
| | | class TestIdentityDict(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.api import Identity |
| | |
| | | |
| | | def test_str(self): |
| | | identity = self._makeOne(foo=1) |
| | | self.failUnless(str(identity).startswith('<repoze.who identity')) |
| | | self.assertTrue(str(identity).startswith('<repoze.who identity')) |
| | | self.assertEqual(identity['foo'], 1) |
| | | |
| | | def test_repr(self): |
| | | identity = self._makeOne(foo=1) |
| | | self.failUnless(str(identity).startswith('<repoze.who identity')) |
| | | self.assertTrue(str(identity).startswith('<repoze.who identity')) |
| | | self.assertEqual(identity['foo'], 1) |
| | | |
| | | |
| | | |
| | | class DummyIdentifier: |
| | | class DummyIdentifier(object): |
| | | forgotten = False |
| | | remembered = False |
| | | |
| | | def __init__(self, credentials=None, remember_headers=None, |
| | | forget_headers=None, replace_app=None): |
| | | def __init__(self, credentials=None, |
| | | remember_headers=(), forget_headers=()): |
| | | self.credentials = credentials |
| | | self.remember_headers = remember_headers |
| | | self.forget_headers = forget_headers |
| | | self.replace_app = replace_app |
| | | |
| | | def identify(self, environ): |
| | | if self.replace_app: |
| | | environ['repoze.who.application'] = self.replace_app |
| | | return self.credentials |
| | | |
| | | def forget(self, environ, identity): |
| | |
| | | return self.remember_headers |
| | | |
| | | |
| | | class DummyNoResultsIdentifier: |
| | | class DummyNoResultsIdentifier(object): |
| | | |
| | | def identify(self, environ): |
| | | return None |
| | | |
| | | def remember(self, *arg, **kw): |
| | | pass |
| | | return () |
| | | |
| | | def forget(self, *arg, **kw): |
| | | pass |
| | | return () |
| | | |
| | | |
| | | class DummyAuthenticator: |
| | | class DummyAuthenticator(object): |
| | | def __init__(self, userid=None): |
| | | self.userid = userid |
| | | |
| | |
| | | return self.userid |
| | | |
| | | |
| | | class DummyFailAuthenticator: |
| | | class DummyFailAuthenticator(object): |
| | | def authenticate(self, environ, credentials): |
| | | return None |
| | | |
| | | |
| | | class DummyChallenger: |
| | | class DummyChallenger(object): |
| | | _challenged_with = None |
| | | def __init__(self, app=None): |
| | | self.app = app |
| | |
| | | return self.app |
| | | |
| | | |
| | | class DummyMDProvider: |
| | | class DummyMDProvider(object): |
| | | def __init__(self, metadata=None): |
| | | self._metadata = metadata |
| | | |
| | |
| | | return identity.update(self._metadata) |
| | | |
| | | |
| | | class DummyMultiPlugin: |
| | | class DummyMultiPlugin(object): |
| | | pass |
| | | |
| | | |
| | | class DummyRequestClassifier: |
| | | class DummyRequestClassifier(object): |
| | | def __call__(self, environ): |
| | | return 'browser' |
| | | |
| | | |
| | | class DummyChallengeDecider: |
| | | def __call__(self, environ, status, headers): |
| | | if status.startswith('401 '): |
| | | return True |
| | | class DummyChallengeDecider(object): |
| | | pass |
| | | |
| | | |
| | | class DummyLogger: |
| | | class DummyLogger(object): |
| | | _info = _debug = () |
| | | def info(self, msg): |
| | | self._info += (msg,) |
| | | def debug(self, msg): |
| | | self._debug += (msg,) |
| | | |
| | | class DummyApp: |
| | | class DummyApp(object): |
| | | environ = None |
| | | def __call__(self, environ, start_response): |
| | | self.environ = environ |
| | | return [] |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class TestDefaultRequestClassifier(_Base): |
| | | class TestDefaultRequestClassifier(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.classifiers import default_request_classifier |
| | |
| | | |
| | | def test_conforms_to_IRequestClassifier(self): |
| | | from repoze.who.interfaces import IRequestClassifier |
| | | self.failUnless(IRequestClassifier.providedBy(self._getFUT())) |
| | | self.assertTrue(IRequestClassifier.providedBy(self._getFUT())) |
| | | |
| | | def test_classify_dav_method(self): |
| | | classifier = self._getFUT() |
| | |
| | | self.assertEqual(result, 'browser') |
| | | |
| | | |
| | | class TestDefaultChallengeDecider(_Base): |
| | | class TestDefaultChallengeDecider(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.classifiers import default_challenge_decider |
| | | return default_challenge_decider |
| | | |
| | | def _makeEnviron(self, kw=None): |
| | | environ = {} |
| | | environ['wsgi.version'] = (1,0) |
| | | if kw is not None: |
| | | environ.update(kw) |
| | | return environ |
| | | |
| | | def test_conforms_to_IChallengeDecider(self): |
| | | from repoze.who.interfaces import IChallengeDecider |
| | | self.failUnless(IChallengeDecider.providedBy(self._getFUT())) |
| | | self.assertTrue(IChallengeDecider.providedBy(self._getFUT())) |
| | | |
| | | def test_challenges_on_401(self): |
| | | decider = self._getFUT() |
| | | self.failUnless(decider({}, '401 Unauthorized', [])) |
| | | self.assertTrue(decider({}, '401 Unauthorized', [])) |
| | | |
| | | def test_doesnt_challenges_on_non_401(self): |
| | | decider = self._getFUT() |
| | | self.failIf(decider({}, '200 Ok', [])) |
| | | self.assertFalse(decider({}, '200 Ok', [])) |
| | | |
| | | class TestPassthroughChallengeDecider(_Base): |
| | | class TestPassthroughChallengeDecider(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.classifiers import passthrough_challenge_decider |
| | | return passthrough_challenge_decider |
| | | |
| | | def _makeEnviron(self, kw=None): |
| | | environ = {} |
| | | environ['wsgi.version'] = (1,0) |
| | | if kw is not None: |
| | | environ.update(kw) |
| | | return environ |
| | | |
| | | def test_conforms_to_IChallengeDecider(self): |
| | | from repoze.who.interfaces import IChallengeDecider |
| | | self.failUnless(IChallengeDecider.providedBy(self._getFUT())) |
| | | self.assertTrue(IChallengeDecider.providedBy(self._getFUT())) |
| | | |
| | | def test_challenges_on_bare_401(self): |
| | | decider = self._getFUT() |
| | | self.failUnless(decider({}, '401 Unauthorized', [])) |
| | | self.assertTrue(decider({}, '401 Unauthorized', [])) |
| | | |
| | | def test_doesnt_challenges_on_non_401(self): |
| | | decider = self._getFUT() |
| | | self.failIf(decider({}, '200 Ok', [])) |
| | | self.assertFalse(decider({}, '200 Ok', [])) |
| | | |
| | | def test_doesnt_challenges_on_401_with_WWW_Authenticate(self): |
| | | decider = self._getFUT() |
| | | self.failIf(decider({}, '401 Ok', [('WWW-Authenticate', 'xxx')])) |
| | | self.assertFalse(decider({}, '401 Ok', [('WWW-Authenticate', 'xxx')])) |
| | | |
| | | def test_doesnt_challenges_on_401_with_text_html(self): |
| | | decider = self._getFUT() |
| | | self.failIf(decider({}, '401 Ok', [('Content-Type', 'text/html')])) |
| | | self.assertFalse(decider({}, '401 Ok', [('Content-Type', 'text/html')])) |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class TestWhoConfig(_Base): |
| | | class TestWhoConfig(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.config import WhoConfig |
| | |
| | | config = self._makeOne() |
| | | config.parse(PLUGINS_ONLY) |
| | | self.assertEqual(len(config.plugins), 2) |
| | | self.failUnless(isinstance(config.plugins['foo'], |
| | | self.assertTrue(isinstance(config.plugins['foo'], |
| | | DummyPlugin)) |
| | | bar = config.plugins['bar'] |
| | | self.failUnless(isinstance(bar, DummyPlugin)) |
| | | self.assertTrue(isinstance(bar, DummyPlugin)) |
| | | self.assertEqual(bar.credentials, 'qux') |
| | | |
| | | def test_parse_general_empty(self): |
| | |
| | | PLUGIN_CLASS = self._getDummyPluginClass(IDummy) |
| | | config = self._makeOne() |
| | | config.parse(GENERAL_ONLY) |
| | | self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS)) |
| | | self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(config.request_classifier, PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(config.challenge_decider, PLUGIN_CLASS)) |
| | | self.assertEqual(config.remote_user_key, 'ANOTHER_REMOTE_USER') |
| | | self.assertEqual(len(config.plugins), 0) |
| | | |
| | |
| | | PLUGIN_CLASS = self._getDummyPluginClass(IDummy) |
| | | config = self._makeOne() |
| | | config.parse(GENERAL_WITH_PLUGINS) |
| | | self.failUnless(isinstance(config.request_classifier, PLUGIN_CLASS)) |
| | | self.failUnless(isinstance(config.challenge_decider, PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(config.request_classifier, PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(config.challenge_decider, PLUGIN_CLASS)) |
| | | |
| | | def test_parse_identifiers_only(self): |
| | | from repoze.who.interfaces import IIdentifier |
| | |
| | | self.assertEqual(len(identifiers), 2) |
| | | first, second = identifiers |
| | | self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IIdentifier], 'klass1') |
| | | self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_identifiers_with_plugins(self): |
| | | from repoze.who.interfaces import IIdentifier |
| | |
| | | self.assertEqual(len(identifiers), 2) |
| | | first, second = identifiers |
| | | self.assertEqual(first[0], 'foo') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IIdentifier], 'klass1') |
| | | self.assertEqual(second[0], 'bar') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_authenticators_only(self): |
| | | from repoze.who.interfaces import IAuthenticator |
| | |
| | | self.assertEqual(len(authenticators), 2) |
| | | first, second = authenticators |
| | | self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IAuthenticator], 'klass1') |
| | | self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_authenticators_with_plugins(self): |
| | | from repoze.who.interfaces import IAuthenticator |
| | |
| | | self.assertEqual(len(authenticators), 2) |
| | | first, second = authenticators |
| | | self.assertEqual(first[0], 'foo') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IAuthenticator], 'klass1') |
| | | self.assertEqual(second[0], 'bar') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_challengers_only(self): |
| | | from repoze.who.interfaces import IChallenger |
| | |
| | | self.assertEqual(len(challengers), 2) |
| | | first, second = challengers |
| | | self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IChallenger], 'klass1') |
| | | self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_challengers_with_plugins(self): |
| | | from repoze.who.interfaces import IChallenger |
| | |
| | | self.assertEqual(len(challengers), 2) |
| | | first, second = challengers |
| | | self.assertEqual(first[0], 'foo') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IChallenger], 'klass1') |
| | | self.assertEqual(second[0], 'bar') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_mdproviders_only(self): |
| | | from repoze.who.interfaces import IMetadataProvider |
| | |
| | | self.assertEqual(len(mdproviders), 2) |
| | | first, second = mdproviders |
| | | self.assertEqual(first[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1') |
| | | self.assertEqual(second[0], 'repoze.who.tests.test_config:DummyPlugin') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_mdproviders_with_plugins(self): |
| | | from repoze.who.interfaces import IMetadataProvider |
| | |
| | | self.assertEqual(len(mdproviders), 2) |
| | | first, second = mdproviders |
| | | self.assertEqual(first[0], 'foo') |
| | | self.failUnless(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(first[1], PLUGIN_CLASS)) |
| | | self.assertEqual(len(first[1].classifications), 1) |
| | | self.assertEqual(first[1].classifications[IMetadataProvider], 'klass1') |
| | | self.assertEqual(second[0], 'bar') |
| | | self.failUnless(isinstance(second[1], PLUGIN_CLASS)) |
| | | self.assertTrue(isinstance(second[1], PLUGIN_CLASS)) |
| | | |
| | | def test_parse_make_plugin_names(self): |
| | | # see http://bugs.repoze.org/issue92 |
| | |
| | | config.parse(MAKE_PLUGIN_ARG_NAMES) |
| | | self.assertEqual(len(config.plugins), 1) |
| | | foo = config.plugins['foo'] |
| | | self.failUnless(isinstance(foo, DummyPlugin)) |
| | | self.assertTrue(isinstance(foo, DummyPlugin)) |
| | | self.assertEqual(foo.iface, 'iface') |
| | | self.assertEqual(foo.name, 'name') |
| | | self.assertEqual(foo.template, '%(template)s') |
| | |
| | | template_with_eq = template_with_eq = %%(template_with_eq)s |
| | | """ |
| | | |
| | | class TestConfigMiddleware(_Base): |
| | | class TestConfigMiddleware(unittest.TestCase): |
| | | _tempdir = None |
| | | |
| | | def setUp(self): |
| | |
| | | self.assertEqual(len(api_factory.authenticators), 1) |
| | | self.assertEqual(len(api_factory.challengers), 2) |
| | | self.assertEqual(len(api_factory.mdproviders), 0) |
| | | self.failUnless(middleware.logger, middleware.logger) |
| | | self.assertTrue(middleware.logger, middleware.logger) |
| | | self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG) |
| | | |
| | | def test_sample_config_no_log_level(self): |
| | |
| | | self.assertEqual(middleware.logger.getEffectiveLevel(), logging.WARN) |
| | | handlers = middleware.logger.handlers |
| | | self.assertEqual(len(handlers), 1) |
| | | self.failUnless(isinstance(handlers[0], logging.StreamHandler)) |
| | | self.assertTrue(isinstance(handlers[0], logging.StreamHandler)) |
| | | self.assertEqual(handlers[0].stream.name, logfile) |
| | | logging.shutdown() |
| | | handlers[0].stream.close() |
| | |
| | | self.assertEqual(middleware.logger.getEffectiveLevel(), logging.INFO) |
| | | handlers = middleware.logger.handlers |
| | | self.assertEqual(len(handlers), 1) |
| | | self.failUnless(isinstance(handlers[0], NullHandler)) |
| | | self.assertTrue(isinstance(handlers[0], NullHandler)) |
| | | logging.shutdown() |
| | | |
| | | class NullHandlerTests(_Base): |
| | | class NullHandlerTests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.config import NullHandler |
| | |
| | | def test_inheritance(self): |
| | | import logging |
| | | handler = self._makeOne() |
| | | self.failUnless(isinstance(handler, logging.Handler)) |
| | | self.assertTrue(isinstance(handler, logging.Handler)) |
| | | |
| | | def test_emit_doesnt_raise_NotImplementedError(self): |
| | | handler = self._makeOne() |
| | | handler.emit(object()) |
| | | |
| | | class Test_make_api_factory_with_config(_Base): |
| | | class Test_make_api_factory_with_config(unittest.TestCase): |
| | | _tempdir = None |
| | | _warning_filters = None |
| | | |
| | | def setUp(self): |
| | | pass |
| | |
| | | if self._tempdir is not None: |
| | | import shutil |
| | | shutil.rmtree(self._tempdir) |
| | | if self._warning_filters is not None: |
| | | import warnings |
| | | warnings.filters[:] = self._warning_filters |
| | | |
| | | def _getFactory(self): |
| | | from repoze.who.config import make_api_factory_with_config |
| | |
| | | self.assertEqual(len(api_factory.challengers), 0) |
| | | self.assertEqual(len(api_factory.mdproviders), 0) |
| | | self.assertEqual(api_factory.remote_user_key, 'REMOTE_USER') |
| | | self.failUnless(api_factory.logger is None) |
| | | self.failUnless(warned) |
| | | self.assertTrue(api_factory.logger is None) |
| | | self.assertTrue(warned) |
| | | |
| | | def test_bad_config_content(self): |
| | | import warnings |
| | |
| | | self.assertEqual(len(api_factory.challengers), 0) |
| | | self.assertEqual(len(api_factory.mdproviders), 0) |
| | | self.assertEqual(api_factory.remote_user_key, 'REMOTE_USER') |
| | | self.failUnless(api_factory.logger is None) |
| | | self.failUnless(warned) |
| | | self.assertTrue(api_factory.logger is None) |
| | | self.assertTrue(warned) |
| | | |
| | | def test_sample_config_no_logger(self): |
| | | factory = self._getFactory() |
| | |
| | | self.assertEqual(len(api_factory.challengers), 2) |
| | | self.assertEqual(len(api_factory.mdproviders), 0) |
| | | self.assertEqual(api_factory.remote_user_key, 'REMOTE_USER') |
| | | self.failUnless(api_factory.logger is None) |
| | | self.assertTrue(api_factory.logger is None) |
| | | |
| | | def test_sample_config_w_remote_user_key(self): |
| | | factory = self._getFactory() |
| | |
| | | self.assertEqual(len(api_factory.authenticators), 1) |
| | | self.assertEqual(len(api_factory.challengers), 2) |
| | | self.assertEqual(len(api_factory.mdproviders), 0) |
| | | self.failUnless(api_factory.logger is logger) |
| | | self.assertTrue(api_factory.logger is logger) |
| | | |
| | | SAMPLE_CONFIG = """\ |
| | | [plugin:redirector] |
| | |
| | | |
| | | class DummyApp: |
| | | environ = None |
| | | def __call__(self, environ, start_response): |
| | | self.environ = environ |
| | | return [] |
| | | |
| | | |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class TestMiddleware(_Base): |
| | | class TestMiddleware(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.middleware import PluggableAuthenticationMiddleware |
| | |
| | | start_response = DummyStartResponse() |
| | | result = b''.join(mw(environ, start_response)).decode('ascii') |
| | | self.assertEqual(environ['challenged'], challenge_app) |
| | | self.failUnless(result.startswith('401 Unauthorized')) |
| | | self.assertTrue(result.startswith('401 Unauthorized')) |
| | | |
| | | def test_call_401_challenger_and_identifier_no_authenticator(self): |
| | | from webob.exc import HTTPUnauthorized |
| | |
| | | |
| | | result = b''.join(mw(environ, start_response)).decode('ascii') |
| | | self.assertEqual(environ['challenged'], challenge_app) |
| | | self.failUnless(result.startswith('401 Unauthorized')) |
| | | self.assertTrue(result.startswith('401 Unauthorized')) |
| | | self.assertEqual(identifier.forgotten, False) |
| | | self.assertEqual(environ.get('REMOTE_USER'), None) |
| | | |
| | |
| | | start_response = DummyStartResponse() |
| | | result = b''.join(mw(environ, start_response)).decode('ascii') |
| | | self.assertEqual(environ['challenged'], challenge_app) |
| | | self.failUnless(result.startswith('401 Unauthorized')) |
| | | self.assertTrue(result.startswith('401 Unauthorized')) |
| | | # @@ unfuck |
| | | ## self.assertEqual(identifier.forgotten, identifier.credentials) |
| | | self.assertEqual(environ['REMOTE_USER'], 'chris') |
| | |
| | | mdproviders=mdproviders) |
| | | start_response = DummyStartResponse() |
| | | result = b''.join(mw(environ, start_response)).decode('ascii') |
| | | self.failUnless(result.startswith('302 Found')) |
| | | self.assertTrue(result.startswith('302 Found')) |
| | | self.assertEqual(start_response.status, '302 Found') |
| | | headers = start_response.headers |
| | | #self.assertEqual(len(headers), 3, headers) |
| | |
| | | self.assertEqual(headers[3], |
| | | ('a', '1')) |
| | | self.assertEqual(start_response.exc_info, None) |
| | | self.failIf('repoze.who.application' in environ) |
| | | self.assertFalse('repoze.who.application' in environ) |
| | | |
| | | def test_call_app_doesnt_call_start_response(self): |
| | | from webob.exc import HTTPUnauthorized |
| | |
| | | mdproviders=mdproviders) |
| | | start_response = DummyStartResponse() |
| | | result = b''.join(mw(environ, start_response)).decode('ascii') |
| | | self.failUnless(result.startswith('401 Unauthorized')) |
| | | self.failUnless(app._iterable._closed) |
| | | self.assertTrue(result.startswith('401 Unauthorized')) |
| | | self.assertTrue(app._iterable._closed) |
| | | |
| | | def test_call_w_challenge_but_no_challenger_still_closes_iterable(self): |
| | | environ = self._makeEnviron() |
| | |
| | | mdproviders=mdproviders) |
| | | start_response = DummyStartResponse() |
| | | self.assertRaises(RuntimeError, mw, environ, start_response) |
| | | self.failUnless(app._iterable._closed) |
| | | self.assertTrue(app._iterable._closed) |
| | | |
| | | # XXX need more call tests: |
| | | # - auth_id sorting |
| | | |
| | | class TestStartResponseWrapper(_Base): |
| | | class TestStartResponseWrapper(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.middleware import StartResponseWrapper |
| | |
| | | wrapper = self._makeOne(None) |
| | | self.assertEqual(wrapper.start_response, None) |
| | | self.assertEqual(wrapper.headers, []) |
| | | self.failUnless(wrapper.buffer) |
| | | self.assertTrue(wrapper.buffer) |
| | | |
| | | def test_finish_response(self): |
| | | from repoze.who._compat import StringIO |
| | |
| | | self.assertEqual(datases[0], 'written') |
| | | self.assertEqual(closededs[0], True) |
| | | |
| | | class WrapGeneratorTests(_Base): |
| | | class WrapGeneratorTests(unittest.TestCase): |
| | | |
| | | def _callFUT(self, iterable): |
| | | from repoze.who.middleware import wrap_generator |
| | |
| | | def test_w_empty_generator(self): |
| | | def gen(): |
| | | if False: |
| | | yield 'a' |
| | | yield 'a' # pragma: no cover |
| | | newgen = self._callFUT(gen()) |
| | | self.assertEqual(list(newgen), []) |
| | | |
| | |
| | | yield 'b' |
| | | iterable = DummyIterableWithClose(gen()) |
| | | newgen = self._callFUT(iterable) |
| | | self.failIf(iterable._closed) |
| | | self.assertFalse(iterable._closed) |
| | | self.assertEqual(list(newgen), ['a', 'b']) |
| | | self.failUnless(iterable._closed) |
| | | self.assertTrue(iterable._closed) |
| | | |
| | | class TestMakeTestMiddleware(_Base): |
| | | class TestMakeTestMiddleware(unittest.TestCase): |
| | | |
| | | def setUp(self): |
| | | import os |
| | | self._old_WHO_LOG = os.environ.get('WHO_LOG') |
| | | try: |
| | | del os.environ['WHO_LOG'] |
| | | except KeyError: |
| | | pass |
| | | |
| | | def tearDown(self): |
| | | import os |
| | | if self._old_WHO_LOG is not None: |
| | | os.environ['WHO_LOG'] = self._old_WHO_LOG |
| | | else: |
| | | if 'WHO_LOG' in os.environ: |
| | | del os.environ['WHO_LOG'] |
| | | try: |
| | | del os.environ['WHO_LOG'] |
| | | except KeyError: |
| | | pass |
| | | |
| | | def _getFactory(self): |
| | | from repoze.who.middleware import make_test_middleware |
| | |
| | | middleware = factory(app, global_conf) |
| | | self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG) |
| | | |
| | | class DummyApp: |
| | | class DummyApp(object): |
| | | environ = None |
| | | def __call__(self, environ, start_response): |
| | | self.environ = environ |
| | | return [] |
| | | |
| | | class DummyWorkingApp: |
| | | class DummyWorkingApp(object): |
| | | def __init__(self, status, headers): |
| | | self.status = status |
| | | self.headers = headers |
| | |
| | | start_response(self.status, self.headers) |
| | | return ['body'] |
| | | |
| | | class DummyGeneratorApp: |
| | | class DummyGeneratorApp(object): |
| | | def __init__(self, status, headers): |
| | | self.status = status |
| | | self.headers = headers |
| | |
| | | yield 'body' |
| | | return gen() |
| | | |
| | | class DummyIterableWithClose: |
| | | class DummyIterableWithClose(object): |
| | | _closed = False |
| | | def __init__(self, iterable): |
| | | self._iterable = iterable |
| | |
| | | def close(self): |
| | | self._closed = True |
| | | |
| | | class DummyIterableWithCloseApp: |
| | | class DummyIterableWithCloseApp(object): |
| | | def __init__(self, status, headers): |
| | | self.status = status |
| | | self.headers = headers |
| | |
| | | start_response(self.status, self.headers) |
| | | return self._iterable |
| | | |
| | | class DummyIdentityResetApp: |
| | | class DummyIdentityResetApp(object): |
| | | def __init__(self, status, headers, new_identity): |
| | | self.status = status |
| | | self.headers = headers |
| | |
| | | start_response(self.status, self.headers) |
| | | return ['body'] |
| | | |
| | | class DummyChallenger: |
| | | class DummyChallenger(object): |
| | | def __init__(self, app=None): |
| | | self.app = app |
| | | |
| | |
| | | environ['challenged'] = self.app |
| | | return self.app |
| | | |
| | | class DummyIdentifier: |
| | | class DummyIdentifier(object): |
| | | forgotten = False |
| | | remembered = False |
| | | |
| | |
| | | self.remembered = identity |
| | | return self.remember_headers |
| | | |
| | | class DummyAuthenticator: |
| | | def __init__(self, userid=None): |
| | | self.userid = userid |
| | | |
| | | class DummyAuthenticator(object): |
| | | def authenticate(self, environ, credentials): |
| | | if self.userid is None: |
| | | return credentials['login'] |
| | | return self.userid |
| | | return credentials['login'] |
| | | |
| | | class DummyFailAuthenticator: |
| | | def authenticate(self, environ, credentials): |
| | | return None |
| | | |
| | | class DummyRequestClassifier: |
| | | class DummyRequestClassifier(object): |
| | | def __call__(self, environ): |
| | | return 'browser' |
| | | |
| | | class DummyChallengeDecider: |
| | | class DummyChallengeDecider(object): |
| | | def __call__(self, environ, status, headers): |
| | | if status.startswith('401 '): |
| | | return True |
| | | |
| | | class DummyNoResultsIdentifier: |
| | | def identify(self, environ): |
| | | return None |
| | | |
| | | def remember(self, *arg, **kw): |
| | | pass |
| | | |
| | | def forget(self, *arg, **kw): |
| | | pass |
| | | |
| | | class DummyStartResponse: |
| | | class DummyStartResponse(object): |
| | | def __call__(self, status, headers, exc_info=None): |
| | | self.status = status |
| | | self.headers = headers |
| | | self.exc_info = exc_info |
| | | return [] |
| | | |
| | | class DummyMDProvider: |
| | | class DummyMDProvider(object): |
| | | def __init__(self, metadata=None): |
| | | self._metadata = metadata |
| | | |
| | | def add_metadata(self, environ, identity): |
| | | return identity.update(self._metadata) |
| | | |
| | | class DummyMultiPlugin: |
| | | pass |
| | |
| | | import unittest |
| | | |
| | | class _Base(unittest.TestCase): |
| | | |
| | | def failUnless(self, predicate, message=''): |
| | | self.assertTrue(predicate, message) # Nannies go home! |
| | | |
| | | def failIf(self, predicate, message=''): |
| | | self.assertFalse(predicate, message) # Nannies go home! |
| | | |
| | | class AuthenticatedPredicateTests(_Base): |
| | | class AuthenticatedPredicateTests(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.restrict import authenticated_predicate |
| | |
| | | def test___call___no_identity_returns_False(self): |
| | | predicate = self._getFUT() |
| | | environ = {} |
| | | self.failIf(predicate(environ)) |
| | | self.assertFalse(predicate(environ)) |
| | | |
| | | def test___call___w_REMOTE_AUTH_returns_True(self): |
| | | predicate = self._getFUT() |
| | | environ = {'REMOTE_USER': 'fred'} |
| | | self.failUnless(predicate(environ)) |
| | | self.assertTrue(predicate(environ)) |
| | | |
| | | def test___call___w_repoze_who_identity_returns_True(self): |
| | | predicate = self._getFUT() |
| | | environ = {'repoze.who.identity': {'login': 'fred'}} |
| | | self.failUnless(predicate(environ)) |
| | | self.assertTrue(predicate(environ)) |
| | | |
| | | class MakeAuthenticatedRestrictionTests(_Base): |
| | | class MakeAuthenticatedRestrictionTests(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.restrict import make_authenticated_restriction |
| | | return make_authenticated_restriction |
| | | |
| | | def test_enabled(self): |
| | | from repoze.who.restrict import authenticated_predicate |
| | | fut = self._getFUT() |
| | | app = DummyApp() |
| | | |
| | | filter = fut(app, {}, enabled=True) |
| | | |
| | | self.failUnless(filter.app is app) |
| | | self.failUnless(filter.enabled) |
| | | self.assertTrue(filter.app is app) |
| | | self.assertTrue(filter.enabled) |
| | | predicate = filter.predicate |
| | | self.failUnless(predicate({'REMOTE_USER': 'fred'})) |
| | | self.failUnless(predicate({'repoze.who.identity': {'login': 'fred'}})) |
| | | self.assertTrue(predicate({'REMOTE_USER': 'fred'})) |
| | | self.assertTrue(predicate({'repoze.who.identity': {'login': 'fred'}})) |
| | | |
| | | class PredicateRestrictionTests(_Base): |
| | | class PredicateRestrictionTests(unittest.TestCase): |
| | | |
| | | def _getTargetClass(self): |
| | | from repoze.who.restrict import PredicateRestriction |
| | |
| | | def test___call___disabled_predicate_false_calls_app_not_predicate(self): |
| | | _tested = [] |
| | | def _factory(): |
| | | def _predicate(env): |
| | | _tested.append(env) |
| | | return False |
| | | def _predicate(env): # pragma: no cover |
| | | assert False |
| | | return _predicate |
| | | |
| | | _started = [] |
| | | def _start_response(status, headers): |
| | | _started.append((status, headers)) |
| | | assert False # pragma: no cover |
| | | environ = {'testing': True} |
| | | |
| | | restrict = self._makeOne(predicate=_factory, enabled=False) |
| | | restrict(environ, _start_response) |
| | | |
| | | self.assertEqual(len(_tested), 0) |
| | | self.assertEqual(len(_started), 0) |
| | | self.assertEqual(restrict.app.environ, environ) |
| | | |
| | | def test___call___enabled_predicate_false_returns_401(self): |
| | |
| | | return True |
| | | return _predicate |
| | | |
| | | _started = [] |
| | | def _start_response(status, headers): |
| | | _started.append((status, headers)) |
| | | assert False # pragma: no cover |
| | | environ = {'testing': True, 'REMOTE_USER': 'fred'} |
| | | |
| | | restrict = self._makeOne(predicate=_factory) |
| | | restrict(environ, _start_response) |
| | | |
| | | self.assertEqual(len(_tested), 1) |
| | | self.assertEqual(len(_started), 0) |
| | | self.assertEqual(restrict.app.environ, environ) |
| | | |
| | | class MakePredicateRestrictionTests(_Base): |
| | | class MakePredicateRestrictionTests(unittest.TestCase): |
| | | |
| | | def _getFUT(self): |
| | | from repoze.who.restrict import make_predicate_restriction |
| | |
| | | fut = self._getFUT() |
| | | app = DummyApp() |
| | | def _predicate(env): |
| | | return True |
| | | return True # pragma: no cover |
| | | def _factory(): |
| | | return _predicate |
| | | |
| | | filter = fut(app, {}, predicate=_factory) |
| | | |
| | | self.failUnless(filter.app is app) |
| | | self.failUnless(filter.predicate is _predicate) |
| | | self.failUnless(filter.enabled) |
| | | self.assertTrue(filter.app is app) |
| | | self.assertTrue(filter.predicate is _predicate) |
| | | self.assertTrue(filter.enabled) |
| | | |
| | | def test_disabled_non_string_predicate_w_args(self): |
| | | fut = self._getFUT() |
| | |
| | | filter = fut(app, {}, predicate=DummyPredicate, enabled=False, |
| | | foo='Foo') |
| | | |
| | | self.failUnless(filter.app is app) |
| | | self.failUnless(isinstance(filter.predicate, DummyPredicate)) |
| | | self.assertTrue(filter.app is app) |
| | | self.assertTrue(isinstance(filter.predicate, DummyPredicate)) |
| | | self.assertEqual(filter.predicate.foo, 'Foo') |
| | | self.failIf(filter.enabled) |
| | | self.assertFalse(filter.enabled) |
| | | |
| | | def test_enabled_string_predicate_w_args(self): |
| | | fut = self._getFUT() |
| | |
| | | predicate='repoze.who.tests.test_restrict:DummyPredicate', |
| | | enabled=True, foo='Foo') |
| | | |
| | | self.failUnless(filter.app is app) |
| | | self.failUnless(isinstance(filter.predicate, DummyPredicate)) |
| | | self.assertTrue(filter.app is app) |
| | | self.assertTrue(isinstance(filter.predicate, DummyPredicate)) |
| | | self.assertEqual(filter.predicate.foo, 'Foo') |
| | | self.failUnless(filter.enabled) |
| | | self.assertTrue(filter.enabled) |
| | | |
| | | |
| | | class DummyApp: |
| | | class DummyApp(object): |
| | | environ = None |
| | | def __call__(self, environ, start_response): |
| | | self.environ = environ |
| | | return [] |
| | | |
| | | class DummyPredicate: |
| | | class DummyPredicate(object): |
| | | def __init__(self, **kw): |
| | | self.__dict__.update(kw) |
| | | def __call__(self, env): |
| | | return True |
| | |
| | | from setuptools import setup, find_packages |
| | | |
| | | here = os.path.abspath(os.path.dirname(__file__)) |
| | | README = open(os.path.join(here, 'README.rst')).read() |
| | | CHANGES = open(os.path.join(here, 'CHANGES.rst')).read() |
| | | def _read_file(filename): |
| | | try: |
| | | with open(os.path.join(here, filename)) as f: |
| | | return f.read() |
| | | except IOError: # Travis??? |
| | | return '' |
| | | |
| | | README = _read_file('README.rst') |
| | | CHANGES = _read_file('CHANGES.rst') |
| | | tests_require = ['WebOb', 'zope.interface'] |
| | | testing_extras = tests_require + ['nose', 'coverage'] |
| | | docs_extras = tests_require + ['Sphinx', 'repoze.sphinx.autointerface'] |
| | |
| | | 'framework for WSGI.'), |
| | | long_description='\n\n'.join([README, CHANGES]), |
| | | classifiers=[ |
| | | "Development Status :: 5 - Production/Stable", |
| | | "Intended Audience :: Developers", |
| | | "Programming Language :: Python :: 2", |
| | | "Programming Language :: Python :: 2.6", |
| | |
| | | "Programming Language :: Python :: 3", |
| | | "Programming Language :: Python :: 3.2", |
| | | "Programming Language :: Python :: 3.3", |
| | | "Programming Language :: Python :: 3.4", |
| | | "Programming Language :: Python :: Implementation :: CPython", |
| | | "Programming Language :: Python :: Implementation :: PyPy", |
| | | "Topic :: Internet :: WWW/HTTP", |
| | |
| | | [tox] |
| | | envlist = |
| | | py26,py27,py32,py33,pypy,cover,docs |
| | | py26,py27,pypy,py32,py33,py34,pypy3,cover,docs |
| | | |
| | | [testenv] |
| | | commands = |
| | |
| | | basepython = |
| | | python2.6 |
| | | commands = |
| | | nosetests --with-xunit --with-xcoverage |
| | | coverage erase |
| | | coverage run --timid --source=repoze setup.py test -q |
| | | coverage report --show-missing --omit="*fixture*" |
| | | coverage xml |
| | | deps = |
| | | zope.interface |
| | | WebOb |
| | | virtualenv |
| | | nose |
| | | coverage |
| | | nosexcover |
| | | |
| | | # we separate coverage into its own testenv because a) "last run wins" wrt |
| | | # cobertura jenkins reporting and b) pypy and jython can't handle any |