Tres Seaver
2009-04-18 9242ca3e503017c73b6e77f7b9b8c42e7b64dbeb
Move tests for plugins subpackage into that package.
1 files added
3 files modified
4 files renamed
2592 ■■■■ changed files
TODO.txt 6 ●●●● patch | view | raw | blame | history
repoze/who/plugins/fixtures/__init__.py patch | view | raw | blame | history
repoze/who/plugins/fixtures/form.html patch | view | raw | blame | history
repoze/who/plugins/fixtures/test.htpasswd patch | view | raw | blame | history
repoze/who/plugins/fixtures/testapp.py patch | view | raw | blame | history
repoze/who/plugins/tests.py 1337 ●●●●● patch | view | raw | blame | history
repoze/who/tests.py 1247 ●●●●● patch | view | raw | blame | history
setup.py 2 ●●● patch | view | raw | blame | history
TODO.txt
@@ -1,2 +1,6 @@
- Config file parser.
repoze.who TODOs
================
- [X} Config file parser.
- [_] Allow the ZODB plugin to connect read-only.
repoze/who/plugins/fixtures/__init__.py
repoze/who/plugins/fixtures/form.html
repoze/who/plugins/fixtures/test.htpasswd
repoze/who/plugins/fixtures/testapp.py
repoze/who/plugins/tests.py
New file
@@ -0,0 +1,1337 @@
import os
import unittest
class Base(unittest.TestCase):
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
class TestBasicAuthPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.basicauth import BasicAuthPlugin
        return BasicAuthPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IChallenger
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IChallenger, klass)
        verifyClass(IIdentifier, klass)
    def test_challenge(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        result = plugin.challenge(environ, '401 Unauthorized', [], [])
        self.assertNotEqual(result, None)
        app_iter = result(environ, lambda *arg: None)
        items = []
        for item in app_iter:
            items.append(item)
        response = ''.join(items)
        self.failUnless(response.startswith('401 Unauthorized'))
    def test_identify_noauthinfo(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_nonbasic(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_badencoding(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_badrepr(self):
        plugin = self._makeOne('realm')
        value = 'foo'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_ok(self):
        plugin = self._makeOne('realm')
        value = 'foo:bar'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        creds = plugin.identify(environ)
        self.assertEqual(creds, {'login':'foo', 'password':'bar'})
    def test_remember(self):
        plugin = self._makeOne('realm')
        creds = {}
        environ = self._makeEnviron()
        result = plugin.remember(environ, creds)
        self.assertEqual(result, None)
    def test_forget(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        result = plugin.forget(environ, creds)
        self.assertEqual(result, [('WWW-Authenticate', 'Basic realm="realm"')] )
    def test_challenge_forgetheaders_includes(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        forget = plugin._get_wwwauth()
        result = plugin.challenge(environ, '401 Unauthorized', [], forget)
        self.assertEqual(result.headers, forget)
    def test_challenge_forgetheaders_omits(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        forget = plugin._get_wwwauth()
        result = plugin.challenge(environ, '401 Unauthorized', [], [])
        self.assertEqual(result.headers, forget)
    def test_factory(self):
        from repoze.who.plugins.basicauth import make_plugin
        plugin = make_plugin('realm')
        self.assertEqual(plugin.realm, 'realm')
class TestHTPasswdPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.htpasswd import HTPasswdPlugin
        return HTPasswdPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IAuthenticator
        klass = self._getTargetClass()
        verifyClass(IAuthenticator, klass)
    def test_authenticate_nocreds(self):
        from StringIO import StringIO
        io = StringIO()
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_nolines(self):
        from StringIO import StringIO
        io = StringIO()
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_nousermatch(self):
        from StringIO import StringIO
        io = StringIO('nobody:foo')
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_match(self):
        from StringIO import StringIO
        io = StringIO('chrism:pass')
        def check(password, hashed):
            return True
        plugin = self._makeOne(io, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_badline(self):
        from StringIO import StringIO
        io = StringIO('badline\nchrism:pass')
        def check(password, hashed):
            return True
        plugin = self._makeOne(io, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_filename(self):
        here = os.path.abspath(os.path.dirname(__file__))
        htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd')
        def check(password, hashed):
            return True
        plugin = self._makeOne(htpasswd, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_bad_filename_logs_to_repoze_who_logger(self):
        here = os.path.abspath(os.path.dirname(__file__))
        htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd.nonesuch')
        def check(password, hashed):
            return True
        plugin = self._makeOne(htpasswd, check)
        environ = self._makeEnviron()
        class DummyLogger:
            warnings = []
            def warn(self, msg):
                self.warnings.append(msg)
        logger = environ['repoze.who.logger'] = DummyLogger()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
        self.assertEqual(len(logger.warnings), 1)
        self.failUnless('could not open htpasswd' in logger.warnings[0])
    def test_crypt_check(self):
        import sys
        # win32 does not have a crypt library, don't
        # fail here
        if "win32" == sys.platform:
            return
        from crypt import crypt
        salt = '123'
        hashed = crypt('password', salt)
        from repoze.who.plugins.htpasswd import crypt_check
        self.assertEqual(crypt_check('password', hashed), True)
        self.assertEqual(crypt_check('notpassword', hashed), False)
    def test_factory(self):
        from repoze.who.plugins.htpasswd import make_plugin
        from repoze.who.plugins.htpasswd import crypt_check
        plugin = make_plugin('foo',
                             'repoze.who.plugins.htpasswd:crypt_check')
        self.assertEqual(plugin.filename, 'foo')
        self.assertEqual(plugin.check, crypt_check)
class TestInsecureCookiePlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.cookie import InsecureCookiePlugin
        return InsecureCookiePlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
    def test_identify_nocookies(self):
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_badcookies(self):
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_success(self):
        plugin = self._makeOne('oatmeal')
        auth = 'foo:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'foo', 'password':'password'})
    def test_remember_creds_same(self):
        plugin = self._makeOne('oatmeal')
        creds = {'login':'foo', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        auth = 'oatmeal=%s;' % auth
        environ = self._makeEnviron({'HTTP_COOKIE':auth})
        result = plugin.remember(environ, creds)
        self.assertEqual(result, None)
    def test_remember_creds_different(self):
        plugin = self._makeOne('oatmeal')
        creds = {'login':'bar', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        creds_auth = 'bar:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.remember(environ, creds)
        expected = 'oatmeal=%s; Path=/;' % creds_auth
        self.assertEqual(result, [('Set-Cookie', expected)])
    def test_factory(self):
        from repoze.who.plugins.cookie import make_plugin
        plugin = make_plugin('foo')
        self.assertEqual(plugin.cookie_name, 'foo')
    def test_forget(self):
        plugin = self._makeOne('oatmeal')
        headers = plugin.forget({}, None)
        self.assertEqual(len(headers), 1)
        header = headers[0]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value,
            'oatmeal=""; Path=/; Expires=Sun, 10-May-1971 11:59:00 GMT')
class TestFormPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.form import FormPlugin
        return FormPlugin
    def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie',
                 formbody=None):
        plugin = self._getTargetClass()(login_form_qs, rememberer_name,
                                        formbody)
        return plugin
    def _makeFormEnviron(self, login=None, password=None, do_login=False):
        from StringIO import StringIO
        fields = []
        if login:
            fields.append(('login', login))
        if password:
            fields.append(('password', password))
        content_type, body = encode_multipart_formdata(fields)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        extra = {'wsgi.input':StringIO(body),
                 'wsgi.url_scheme': 'http',
                 'SERVER_NAME': 'localhost',
                 'SERVER_PORT': '8080',
                 'CONTENT_TYPE':content_type,
                 'CONTENT_LENGTH':len(body),
                 'REQUEST_METHOD':'POST',
                 'repoze.who.plugins': {'cookie':identifier},
                 'PATH_INFO': '/protected',
                 'QUERY_STRING':'',
                 }
        if do_login:
            extra['QUERY_STRING'] = '__do_login=true'
        environ = self._makeEnviron(extra)
        return environ
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
        verifyClass(IChallenger, klass)
    def test_identify_noqs(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_qs_no_values(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True)
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_nologin(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, login='chris')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_nopassword(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_success(self):
        from paste.httpexceptions import HTTPFound
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.failUnless(isinstance(app, HTTPFound))
        self.assertEqual(app.location(), 'http://localhost:8080/protected')
    def test_remember(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.remember(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].remembered,
                         identity)
    def test_forget(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.forget(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].forgotten,
                         identity
                         )
    def test_challenge_defaultform(self):
        from repoze.who.plugins.form import _DEFAULT_FORM
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [], [])
        sr = DummyStartResponse()
        result = app(environ, sr)
        self.assertEqual(''.join(result), _DEFAULT_FORM)
        self.assertEqual(len(sr.headers), 2)
        cl = str(len(_DEFAULT_FORM))
        self.assertEqual(sr.headers[0], ('Content-Length', cl))
        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
        self.assertEqual(sr.status, '200 OK')
    def test_challenge_customform(self):
        here = os.path.dirname(__file__)
        fixtures = os.path.join(here, 'fixtures')
        form = os.path.join(fixtures, 'form.html')
        formbody = open(form).read()
        plugin = self._makeOne(formbody=formbody)
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [], [])
        sr = DummyStartResponse()
        result = app(environ, sr)
        self.assertEqual(''.join(result), formbody)
        self.assertEqual(len(sr.headers), 2)
        cl = str(len(formbody))
        self.assertEqual(sr.headers[0], ('Content-Length', cl))
        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
        self.assertEqual(sr.status, '200 OK')
    def test_challenge_with_location(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized',
                               [('Location', 'http://foo/bar')],
                               [('Set-Cookie', 'a=123')])
        sr = DummyStartResponse()
        app(environ, sr)
        headers = sorted(sr.headers)
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0], ('Location', 'http://foo/bar'))
        self.assertEqual(headers[1],
                         ('Set-Cookie', 'a=123'))
        self.assertEqual(headers[2],
                         ('content-type', 'text/plain; charset=utf8'))
        self.assertEqual(sr.status, '302 Found')
    def test_factory_withform(self):
        from repoze.who.plugins.form import make_plugin
        here = os.path.dirname(__file__)
        fixtures = os.path.join(here, 'fixtures')
        form = os.path.join(fixtures, 'form.html')
        formbody = open(form).read()
        plugin = make_plugin('__login', 'cookie', form)
        self.assertEqual(plugin.login_form_qs, '__login')
        self.assertEqual(plugin.rememberer_name, 'cookie')
        self.assertEqual(plugin.formbody, formbody)
    def test_factory_defaultform(self):
        from repoze.who.plugins.form import make_plugin
        plugin = make_plugin('__login', 'cookie')
        self.assertEqual(plugin.login_form_qs, '__login')
        self.assertEqual(plugin.rememberer_name, 'cookie')
        self.assertEqual(plugin.formbody, None)
class TestRedirectingFormPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.form import RedirectingFormPlugin
        return RedirectingFormPlugin
    def _makeOne(self, login_form_url='http://example.com/login.html',
                 login_handler_path = '/login_handler',
                 logout_handler_path = '/logout_handler',
                 rememberer_name='cookie',
                 reason_param='reason'):
        plugin = self._getTargetClass()(login_form_url, login_handler_path,
                                        logout_handler_path,
                                        rememberer_name, reason_param)
        return plugin
    def _makeFormEnviron(self, login=None, password=None, came_from=None,
                         path_info='/', identifier=None):
        from StringIO import StringIO
        fields = []
        if login:
            fields.append(('login', login))
        if password:
            fields.append(('password', password))
        if came_from:
            fields.append(('came_from', came_from))
        if identifier is None:
            credentials = {'login':'chris', 'password':'password'}
            identifier = DummyIdentifier(credentials)
        content_type, body = encode_multipart_formdata(fields)
        extra = {'wsgi.input':StringIO(body),
                 'wsgi.url_scheme':'http',
                 'SERVER_NAME':'www.example.com',
                 'SERVER_PORT':'80',
                 'CONTENT_TYPE':content_type,
                 'CONTENT_LENGTH':len(body),
                 'REQUEST_METHOD':'POST',
                 'repoze.who.plugins': {'cookie':identifier},
                 'QUERY_STRING':'default=1',
                 'PATH_INFO':path_info,
                 }
        environ = self._makeEnviron(extra)
        return environ
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
        verifyClass(IChallenger, klass)
    def test_identify_pathinfo_miss(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/not_login_handler')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        self.failIf(environ.get('repoze.who.application'))
    def test_identify_via_login_handler(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password',
                                        came_from='http://example.com')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, 'http://example.com')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_username_pass(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, '/')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_came_from_no_http_referer(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, '/')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_came_from(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password')
        environ['HTTP_REFERER'] = 'http://foo.bar'
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, 'http://foo.bar')
        self.assertEqual(app.code, 302)
    def test_identify_via_logout_handler(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password',
                                        came_from='http://example.com')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], 'http://example.com')
    def test_identify_via_logout_handler_no_came_from_no_http_referer(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], '/')
    def test_identify_via_logout_handler_no_came_from(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password')
        environ['HTTP_REFERER'] = 'http://example.com/referer'
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], 'http://example.com/referer')
    def test_remember(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.remember(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].remembered,
                         identity)
    def test_forget(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.forget(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].forgotten,
                         identity
                         )
    def test_challenge(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [('app', '1')],
                               [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
        headers = sr.headers
        self.assertEqual(len(headers), 3)
        self.assertEqual(sr.headers[1][0], 'forget')
        self.assertEqual(sr.headers[1][1], '1')
        self.assertEqual(sr.headers[2][0], 'content-type')
        self.assertEqual(sr.headers[2][1], 'text/plain; charset=utf8')
        self.assertEqual(sr.status, '302 Found')
    def test_challenge_came_from_in_environ(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(environ, '401 Unauthorized', [('app', '1')],
                               [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
    def test_challenge_with_reason_header(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        came_from_key, came_from_value = parts_qsl[0]
        reason_key, reason_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
        self.assertEqual(reason_key, 'reason')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_reason_and_custom_reason_param(self):
        plugin = self._makeOne(reason_param='auth_failure')
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        reason_key, reason_value = parts_qsl[0]
        came_from_key, came_from_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
        self.assertEqual(reason_key, 'auth_failure')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_setcookie_from_app(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(
            environ,
            '401 Unauthorized',
            [('app', '1'), ('set-cookie','a'), ('set-cookie','b')],
            [])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(sr.headers[1][0], 'set-cookie')
        self.assertEqual(sr.headers[1][1], 'a')
        self.assertEqual(sr.headers[2][0], 'set-cookie')
        self.assertEqual(sr.headers[2][1], 'b')
class TestAuthTktCookiePlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
        return AuthTktCookiePlugin
    def _makeEnviron(self, *arg, **kw):
        environ = Base._makeEnviron(self, *arg, **kw)
        environ['REMOTE_ADDR'] = '1.1.1.1'
        environ['SERVER_NAME'] = 'localhost'
        return environ
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def _makeTicket(self, userid='userid', remote_addr='0.0.0.0',
                    tokens = [], userdata='userdata',
                    cookie_name='auth_tkt', secure=False):
        from paste.auth import auth_tkt
        ticket = auth_tkt.AuthTicket(
            'secret',
            userid,
            remote_addr,
            tokens=tokens,
            user_data=userdata,
            cookie_name=cookie_name,
            secure=secure)
        return ticket.cookie_value()
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
    def test_identify_nocookie(self):
        plugin = self._makeOne('secret')
        environ = self._makeEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_good_cookie_include_ip(self):
        plugin = self._makeOne('secret', include_ip=True)
        val = self._makeTicket(remote_addr='1.1.1.1')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userdata')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_dont_include_ip(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket()
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userdata')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_int_useridtype(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket(userid='1', userdata='userid_type:int')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 1)
        self.assertEqual(result['userdata'], 'userid_type:int')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:int')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_unknown_useridtype(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket(userid='userid', userdata='userid_type:unknown')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userid_type:unknown')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:unknown')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_bad_cookie(self):
        plugin = self._makeOne('secret', include_ip=True)
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=bogus'})
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_remember_creds_same(self):
        plugin = self._makeOne('secret')
        val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.remember(environ, {'repoze.who.userid':'userid',
                                           'userdata':'userdata'})
        self.assertEqual(result, None)
    def test_remember_creds_different(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='other', userdata='userdata')
        result = plugin.remember(environ, {'repoze.who.userid':'other',
                                           'userdata':'userdata'})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
        self.assertEqual(result[1],
                         ('Set-Cookie',
                           'auth_tkt="%s"; Path=/; Domain=localhost' % new_val))
        self.assertEqual(result[2],
                         ('Set-Cookie',
                           'auth_tkt="%s"; Path=/; Domain=.localhost' % new_val))
    def test_remember_creds_different_int_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='1', userdata='userid_type:int')
        result = plugin.remember(environ, {'repoze.who.userid':1,
                                           'userdata':''})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_remember_creds_different_long_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='1', userdata='userid_type:int')
        result = plugin.remember(environ, {'repoze.who.userid':long(1),
                                           'userdata':''})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_remember_creds_different_unicode_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        userid = unicode('\xc2\xa9', 'utf-8')
        new_val = self._makeTicket(userid=userid.encode('utf-8'),
                                   userdata='userid_type:unicode')
        result = plugin.remember(environ, {'repoze.who.userid':userid,
                                           'userdata':''})
        self.assertEqual(type(result[0][1]), str)
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_forget(self):
        plugin = self._makeOne('secret')
        environ = self._makeEnviron()
        headers = plugin.forget(environ, None)
        self.assertEqual(len(headers), 3)
        header = headers[0]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/')
        header = headers[1]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=localhost')
        header = headers[2]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=.localhost')
    def test_factory(self):
        from repoze.who.plugins.auth_tkt import make_plugin
        plugin = make_plugin('secret')
        self.assertEqual(plugin.cookie_name, 'auth_tkt')
        self.assertEqual(plugin.secret, 'secret')
        self.assertEqual(plugin.include_ip, False)
        self.assertEqual(plugin.secure, False)
class TestSQLAuthenticatorPlugin(unittest.TestCase):
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
    def _getTargetClass(self):
        from repoze.who.plugins.sql import SQLAuthenticatorPlugin
        return SQLAuthenticatorPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IAuthenticator
        klass = self._getTargetClass()
        verifyClass(IAuthenticator, klass, tentative=True)
    def test_authenticate_noresults(self):
        dummy_factory = DummyConnectionFactory([])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {'login':'foo', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_comparefail(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_fail)
        environ = self._makeEnviron()
        identity = {'login':'fred', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_comparesuccess(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {'login':'fred', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, 'userid')
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_nologin(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, None)
        self.assertEqual(dummy_factory.closed, False)
class TestDefaultPasswordCompare(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import default_password_compare
        return default_password_compare
    def test_shaprefix_success(self):
        import sha
        stored = sha.new('password').hexdigest()
        stored = '{SHA}' + stored
        compare = self._getFUT()
        result = compare('password', stored)
        self.assertEqual(result, True)
    def test_shaprefix_fail(self):
        import sha
        stored = sha.new('password').hexdigest()
        stored = '{SHA}' + stored
        compare = self._getFUT()
        result = compare('notpassword', stored)
        self.assertEqual(result, False)
    def test_noprefix_success(self):
        stored = 'password'
        compare = self._getFUT()
        result = compare('password', stored)
        self.assertEqual(result, True)
    def test_noprefix_fail(self):
        stored = 'password'
        compare = self._getFUT()
        result = compare('notpassword', stored)
        self.assertEqual(result, False)
class TestSQLMetadataProviderPlugin(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.plugins.sql import SQLMetadataProviderPlugin
        return SQLMetadataProviderPlugin
    def _makeOne(self, *arg, **kw):
        klass = self._getTargetClass()
        return klass(*arg, **kw)
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IMetadataProvider
        klass = self._getTargetClass()
        verifyClass(IMetadataProvider, klass, tentative=True)
    def test_add_metadata(self):
        dummy_factory = DummyConnectionFactory([ [1, 2, 3] ])
        def dummy_filter(results):
            return results
        plugin = self._makeOne('md', 'select foo from bar', dummy_factory,
                               dummy_filter)
        environ = {}
        identity = {'repoze.who.userid':1}
        plugin.add_metadata(environ, identity)
        self.assertEqual(dummy_factory.closed, True)
        self.assertEqual(identity['md'], [ [1,2,3] ])
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.failIf(identity.has_key('__userid'))
class TestMakeSQLAuthenticatorPlugin(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import make_authenticator_plugin
        return make_authenticator_plugin
    def test_noquery(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, None, 'conn', 'compare')
    def test_no_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'statement', None, 'compare')
    def test_bad_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'statement', 'does.not:exist', None)
    def test_connfactory_specd(self):
        f = self._getFUT()
        plugin = f('statement',
                   'repoze.who.plugins.tests:make_dummy_connfactory',
                   None)
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        from repoze.who.plugins.sql import default_password_compare
        self.assertEqual(plugin.compare_fn, default_password_compare)
    def test_comparefunc_specd(self):
        f = self._getFUT()
        plugin = f('statement',
                   'repoze.who.plugins.tests:make_dummy_connfactory',
                   'repoze.who.plugins.tests:make_dummy_connfactory')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.compare_fn, make_dummy_connfactory)
class TestMakeSQLMetadataProviderPlugin(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import make_metadata_plugin
        return make_metadata_plugin
    def test_no_name(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f)
    def test_no_query(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'name', None, None)
    def test_bad_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'name', 'statement',
                          'does.not:exist', None)
    def test_connfactory_specd(self):
        f = self._getFUT()
        plugin = f('name', 'statement',
                   'repoze.who.plugins.tests:make_dummy_connfactory', None)
        self.assertEqual(plugin.name, 'name')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.filter, None)
    def test_comparefn_specd(self):
        f = self._getFUT()
        plugin = f('name', 'statement',
                   'repoze.who.plugins.tests:make_dummy_connfactory',
                   'repoze.who.plugins.tests:make_dummy_connfactory')
        self.assertEqual(plugin.name, 'name')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.filter, make_dummy_connfactory)
class DummyApp:
    environ = None
    def __call__(self, environ, start_response):
        self.environ = environ
        return []
class DummyWorkingApp:
    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
    def __call__(self, environ, start_response):
        self.environ = environ
        start_response(self.status, self.headers)
        return ['body']
class DummyIdentityResetApp:
    def __init__(self, status, headers, new_identity):
        self.status = status
        self.headers = headers
        self.new_identity = new_identity
    def __call__(self, environ, start_response):
        self.environ = environ
        environ['repoze.who.identity']['login'] = 'fred'
        environ['repoze.who.identity']['password'] = 'schooled'
        start_response(self.status, self.headers)
        return ['body']
class DummyRequestClassifier:
    def __call__(self, environ):
        return 'browser'
class DummyIdentifier:
    forgotten = False
    remembered = False
    def __init__(self, credentials=None, remember_headers=None,
                 forget_headers=None, replace_app=None):
        self.credentials = credentials
        self.remember_headers = remember_headers
        self.forget_headers = forget_headers
        self.replace_app = replace_app
    def identify(self, environ):
        if self.replace_app:
            environ['repoze.who.application'] = self.replace_app
        return self.credentials
    def forget(self, environ, identity):
        self.forgotten = identity
        return self.forget_headers
    def remember(self, environ, identity):
        self.remembered = identity
        return self.remember_headers
class DummyNoResultsIdentifier:
    def identify(self, environ):
        return None
    def remember(self, *arg, **kw):
        pass
    def forget(self, *arg, **kw):
        pass
class DummyAuthenticator:
    def __init__(self, userid=None):
        self.userid = userid
    def authenticate(self, environ, credentials):
        if self.userid is None:
            return credentials['login']
        return self.userid
class DummyMultiPlugin:
    pass
class DummyFailAuthenticator:
    def authenticate(self, environ, credentials):
        return None
class DummyChallenger:
    def __init__(self, app=None):
        self.app = app
    def challenge(self, environ, status, app_headers, forget_headers):
        environ['challenged'] = self.app
        return self.app
class DummyMDProvider:
    def __init__(self, metadata=None):
        self._metadata = metadata
    def add_metadata(self, environ, identity):
        return identity.update(self._metadata)
class DummyChallengeDecider:
    def __call__(self, environ, status, headers):
        if status.startswith('401 '):
            return True
class DummyStartResponse:
    def __call__(self, status, headers, exc_info=None):
        self.status = status
        self.headers = headers
        self.exc_info = exc_info
        return []
class DummyConnectionFactory:
    # acts as all of: a factory, a connection, and a cursor
    closed = False
    query = None
    def __init__(self, results):
        self.results = results
    def __call__(self):
        return self
    def cursor(self):
        return self
    def execute(self, query, *arg):
        self.query = query
        self.bindargs = arg
    def fetchall(self):
        return self.results
    def fetchone(self):
        if self.results:
            return self.results[0]
        return []
    def close(self):
        self.closed = True
def compare_fail(cleartext, stored):
    return False
def compare_succeed(cleartext, stored):
    return True
class _DummyConnFactory:
    pass
DummyConnFactory = _DummyConnFactory()
def make_dummy_connfactory(**kw):
    return DummyConnFactory
def encode_multipart_formdata(fields):
    BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'
    CRLF = '\r\n'
    L = []
    for (key, value) in fields:
        L.append('--' + BOUNDARY)
        L.append('Content-Disposition: form-data; name="%s"' % key)
        L.append('')
        L.append(value)
    L.append('--' + BOUNDARY + '--')
    L.append('')
    body = CRLF.join(L)
    content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
    return content_type, body
repoze/who/tests.py
@@ -1,4 +1,3 @@
import os
import unittest
class Base(unittest.TestCase):
@@ -11,6 +10,7 @@
class TestMiddleware(Base):
    def _getTargetClass(self):
        from repoze.who.middleware import PluggableAuthenticationMiddleware
        return PluggableAuthenticationMiddleware
@@ -337,7 +337,6 @@
        self.assertEqual(creds['login'], 'chris')
        self.assertEqual(creds['password'], 'password')
        self.assertEqual(userid, 'chris_id2')
    def test_challenge_noidentifier_withapp(self):
        environ = self._makeEnviron()
@@ -685,6 +684,7 @@
    #  - auth_id sorting
class TestMatchClassification(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import match_classification
        return match_classification
@@ -711,6 +711,7 @@
        self.assertEqual(f(IAuthenticator, plugins, 'buz'), [multi1, multi2])
class TestStartResponseWrapper(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import StartResponseWrapper
        return StartResponseWrapper
@@ -754,954 +755,8 @@
        self.assertEqual(datases[0], 'written')
        self.assertEqual(closededs[0], True)
class TestBasicAuthPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.basicauth import BasicAuthPlugin
        return BasicAuthPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IChallenger
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IChallenger, klass)
        verifyClass(IIdentifier, klass)
    def test_challenge(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        result = plugin.challenge(environ, '401 Unauthorized', [], [])
        self.assertNotEqual(result, None)
        app_iter = result(environ, lambda *arg: None)
        items = []
        for item in app_iter:
            items.append(item)
        response = ''.join(items)
        self.failUnless(response.startswith('401 Unauthorized'))
    def test_identify_noauthinfo(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron()
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_nonbasic(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Digest abc'})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_badencoding(self):
        plugin = self._makeOne('realm')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic abc'})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_badrepr(self):
        plugin = self._makeOne('realm')
        value = 'foo'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        creds = plugin.identify(environ)
        self.assertEqual(creds, None)
    def test_identify_basic_ok(self):
        plugin = self._makeOne('realm')
        value = 'foo:bar'.encode('base64')
        environ = self._makeEnviron({'HTTP_AUTHORIZATION':'Basic %s' % value})
        creds = plugin.identify(environ)
        self.assertEqual(creds, {'login':'foo', 'password':'bar'})
    def test_remember(self):
        plugin = self._makeOne('realm')
        creds = {}
        environ = self._makeEnviron()
        result = plugin.remember(environ, creds)
        self.assertEqual(result, None)
    def test_forget(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        result = plugin.forget(environ, creds)
        self.assertEqual(result, [('WWW-Authenticate', 'Basic realm="realm"')] )
    def test_challenge_forgetheaders_includes(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        forget = plugin._get_wwwauth()
        result = plugin.challenge(environ, '401 Unauthorized', [], forget)
        self.assertEqual(result.headers, forget)
    def test_challenge_forgetheaders_omits(self):
        plugin = self._makeOne('realm')
        creds = {'login':'foo', 'password':'password'}
        environ = self._makeEnviron()
        forget = plugin._get_wwwauth()
        result = plugin.challenge(environ, '401 Unauthorized', [], [])
        self.assertEqual(result.headers, forget)
    def test_factory(self):
        from repoze.who.plugins.basicauth import make_plugin
        plugin = make_plugin('realm')
        self.assertEqual(plugin.realm, 'realm')
class TestHTPasswdPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.htpasswd import HTPasswdPlugin
        return HTPasswdPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IAuthenticator
        klass = self._getTargetClass()
        verifyClass(IAuthenticator, klass)
    def test_authenticate_nocreds(self):
        from StringIO import StringIO
        io = StringIO()
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_nolines(self):
        from StringIO import StringIO
        io = StringIO()
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_nousermatch(self):
        from StringIO import StringIO
        io = StringIO('nobody:foo')
        plugin = self._makeOne(io, None)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
    def test_authenticate_match(self):
        from StringIO import StringIO
        io = StringIO('chrism:pass')
        def check(password, hashed):
            return True
        plugin = self._makeOne(io, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_badline(self):
        from StringIO import StringIO
        io = StringIO('badline\nchrism:pass')
        def check(password, hashed):
            return True
        plugin = self._makeOne(io, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_filename(self):
        here = os.path.abspath(os.path.dirname(__file__))
        htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd')
        def check(password, hashed):
            return True
        plugin = self._makeOne(htpasswd, check)
        environ = self._makeEnviron()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, 'chrism')
    def test_authenticate_bad_filename_logs_to_repoze_who_logger(self):
        here = os.path.abspath(os.path.dirname(__file__))
        htpasswd = os.path.join(here, 'fixtures', 'test.htpasswd.nonesuch')
        def check(password, hashed):
            return True
        plugin = self._makeOne(htpasswd, check)
        environ = self._makeEnviron()
        class DummyLogger:
            warnings = []
            def warn(self, msg):
                self.warnings.append(msg)
        logger = environ['repoze.who.logger'] = DummyLogger()
        creds = {'login':'chrism', 'password':'pass'}
        result = plugin.authenticate(environ, creds)
        self.assertEqual(result, None)
        self.assertEqual(len(logger.warnings), 1)
        self.failUnless('could not open htpasswd' in logger.warnings[0])
    def test_crypt_check(self):
        import sys
        # win32 does not have a crypt library, don't
        # fail here
        if "win32" == sys.platform:
            return
        from crypt import crypt
        salt = '123'
        hashed = crypt('password', salt)
        from repoze.who.plugins.htpasswd import crypt_check
        self.assertEqual(crypt_check('password', hashed), True)
        self.assertEqual(crypt_check('notpassword', hashed), False)
    def test_factory(self):
        from repoze.who.plugins.htpasswd import make_plugin
        from repoze.who.plugins.htpasswd import crypt_check
        plugin = make_plugin('foo',
                             'repoze.who.plugins.htpasswd:crypt_check')
        self.assertEqual(plugin.filename, 'foo')
        self.assertEqual(plugin.check, crypt_check)
class TestInsecureCookiePlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.cookie import InsecureCookiePlugin
        return InsecureCookiePlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
    def test_identify_nocookies(self):
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_badcookies(self):
        plugin = self._makeOne('oatmeal')
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=a'})
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_success(self):
        plugin = self._makeOne('oatmeal')
        auth = 'foo:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'foo', 'password':'password'})
    def test_remember_creds_same(self):
        plugin = self._makeOne('oatmeal')
        creds = {'login':'foo', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        auth = 'oatmeal=%s;' % auth
        environ = self._makeEnviron({'HTTP_COOKIE':auth})
        result = plugin.remember(environ, creds)
        self.assertEqual(result, None)
    def test_remember_creds_different(self):
        plugin = self._makeOne('oatmeal')
        creds = {'login':'bar', 'password':'password'}
        auth = 'foo:password'.encode('base64').rstrip()
        creds_auth = 'bar:password'.encode('base64').rstrip()
        environ = self._makeEnviron({'HTTP_COOKIE':'oatmeal=%s;' % auth})
        result = plugin.remember(environ, creds)
        expected = 'oatmeal=%s; Path=/;' % creds_auth
        self.assertEqual(result, [('Set-Cookie', expected)])
    def test_factory(self):
        from repoze.who.plugins.cookie import make_plugin
        plugin = make_plugin('foo')
        self.assertEqual(plugin.cookie_name, 'foo')
    def test_forget(self):
        plugin = self._makeOne('oatmeal')
        headers = plugin.forget({}, None)
        self.assertEqual(len(headers), 1)
        header = headers[0]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value,
            'oatmeal=""; Path=/; Expires=Sun, 10-May-1971 11:59:00 GMT')
class TestFormPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.form import FormPlugin
        return FormPlugin
    def _makeOne(self, login_form_qs='__do_login', rememberer_name='cookie',
                 formbody=None):
        plugin = self._getTargetClass()(login_form_qs, rememberer_name,
                                        formbody)
        return plugin
    def _makeFormEnviron(self, login=None, password=None, do_login=False):
        from StringIO import StringIO
        fields = []
        if login:
            fields.append(('login', login))
        if password:
            fields.append(('password', password))
        content_type, body = encode_multipart_formdata(fields)
        credentials = {'login':'chris', 'password':'password'}
        identifier = DummyIdentifier(credentials)
        extra = {'wsgi.input':StringIO(body),
                 'wsgi.url_scheme': 'http',
                 'SERVER_NAME': 'localhost',
                 'SERVER_PORT': '8080',
                 'CONTENT_TYPE':content_type,
                 'CONTENT_LENGTH':len(body),
                 'REQUEST_METHOD':'POST',
                 'repoze.who.plugins': {'cookie':identifier},
                 'PATH_INFO': '/protected',
                 'QUERY_STRING':'',
                 }
        if do_login:
            extra['QUERY_STRING'] = '__do_login=true'
        environ = self._makeEnviron(extra)
        return environ
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
        verifyClass(IChallenger, klass)
    def test_identify_noqs(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_qs_no_values(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True)
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_nologin(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, login='chris')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_nopassword(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_success(self):
        from paste.httpexceptions import HTTPFound
        plugin = self._makeOne()
        environ = self._makeFormEnviron(do_login=True, login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.failUnless(isinstance(app, HTTPFound))
        self.assertEqual(app.location(), 'http://localhost:8080/protected')
    def test_remember(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.remember(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].remembered,
                         identity)
    def test_forget(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.forget(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].forgotten,
                         identity
                         )
    def test_challenge_defaultform(self):
        from repoze.who.plugins.form import _DEFAULT_FORM
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [], [])
        sr = DummyStartResponse()
        result = app(environ, sr)
        self.assertEqual(''.join(result), _DEFAULT_FORM)
        self.assertEqual(len(sr.headers), 2)
        cl = str(len(_DEFAULT_FORM))
        self.assertEqual(sr.headers[0], ('Content-Length', cl))
        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
        self.assertEqual(sr.status, '200 OK')
    def test_challenge_customform(self):
        here = os.path.dirname(__file__)
        fixtures = os.path.join(here, 'fixtures')
        form = os.path.join(fixtures, 'form.html')
        formbody = open(form).read()
        plugin = self._makeOne(formbody=formbody)
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [], [])
        sr = DummyStartResponse()
        result = app(environ, sr)
        self.assertEqual(''.join(result), formbody)
        self.assertEqual(len(sr.headers), 2)
        cl = str(len(formbody))
        self.assertEqual(sr.headers[0], ('Content-Length', cl))
        self.assertEqual(sr.headers[1], ('Content-Type', 'text/html'))
        self.assertEqual(sr.status, '200 OK')
    def test_challenge_with_location(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized',
                               [('Location', 'http://foo/bar')],
                               [('Set-Cookie', 'a=123')])
        sr = DummyStartResponse()
        app(environ, sr)
        headers = sorted(sr.headers)
        self.assertEqual(len(headers), 3)
        self.assertEqual(headers[0], ('Location', 'http://foo/bar'))
        self.assertEqual(headers[1],
                         ('Set-Cookie', 'a=123'))
        self.assertEqual(headers[2],
                         ('content-type', 'text/plain; charset=utf8'))
        self.assertEqual(sr.status, '302 Found')
    def test_factory_withform(self):
        from repoze.who.plugins.form import make_plugin
        here = os.path.dirname(__file__)
        fixtures = os.path.join(here, 'fixtures')
        form = os.path.join(fixtures, 'form.html')
        formbody = open(form).read()
        plugin = make_plugin('__login', 'cookie', form)
        self.assertEqual(plugin.login_form_qs, '__login')
        self.assertEqual(plugin.rememberer_name, 'cookie')
        self.assertEqual(plugin.formbody, formbody)
    def test_factory_defaultform(self):
        from repoze.who.plugins.form import make_plugin
        plugin = make_plugin('__login', 'cookie')
        self.assertEqual(plugin.login_form_qs, '__login')
        self.assertEqual(plugin.rememberer_name, 'cookie')
        self.assertEqual(plugin.formbody, None)
class TestRedirectingFormPlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.form import RedirectingFormPlugin
        return RedirectingFormPlugin
    def _makeOne(self, login_form_url='http://example.com/login.html',
                 login_handler_path = '/login_handler',
                 logout_handler_path = '/logout_handler',
                 rememberer_name='cookie',
                 reason_param='reason'):
        plugin = self._getTargetClass()(login_form_url, login_handler_path,
                                        logout_handler_path,
                                        rememberer_name, reason_param)
        return plugin
    def _makeFormEnviron(self, login=None, password=None, came_from=None,
                         path_info='/', identifier=None):
        from StringIO import StringIO
        fields = []
        if login:
            fields.append(('login', login))
        if password:
            fields.append(('password', password))
        if came_from:
            fields.append(('came_from', came_from))
        if identifier is None:
            credentials = {'login':'chris', 'password':'password'}
            identifier = DummyIdentifier(credentials)
        content_type, body = encode_multipart_formdata(fields)
        extra = {'wsgi.input':StringIO(body),
                 'wsgi.url_scheme':'http',
                 'SERVER_NAME':'www.example.com',
                 'SERVER_PORT':'80',
                 'CONTENT_TYPE':content_type,
                 'CONTENT_LENGTH':len(body),
                 'REQUEST_METHOD':'POST',
                 'repoze.who.plugins': {'cookie':identifier},
                 'QUERY_STRING':'default=1',
                 'PATH_INFO':path_info,
                 }
        environ = self._makeEnviron(extra)
        return environ
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        from repoze.who.interfaces import IChallenger
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
        verifyClass(IChallenger, klass)
    def test_identify_pathinfo_miss(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/not_login_handler')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        self.failIf(environ.get('repoze.who.application'))
    def test_identify_via_login_handler(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password',
                                        came_from='http://example.com')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, 'http://example.com')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_username_pass(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, '/')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_came_from_no_http_referer(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, '/')
        self.assertEqual(app.code, 302)
    def test_identify_via_login_handler_no_came_from(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/login_handler',
                                        login='chris',
                                        password='password')
        environ['HTTP_REFERER'] = 'http://foo.bar'
        result = plugin.identify(environ)
        self.assertEqual(result, {'login':'chris', 'password':'password'})
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 1)
        name, value = app.headers[0]
        self.assertEqual(name, 'location')
        self.assertEqual(value, 'http://foo.bar')
        self.assertEqual(app.code, 302)
    def test_identify_via_logout_handler(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password',
                                        came_from='http://example.com')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], 'http://example.com')
    def test_identify_via_logout_handler_no_came_from_no_http_referer(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password')
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], '/')
    def test_identify_via_logout_handler_no_came_from(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron(path_info='/logout_handler',
                                        login='chris',
                                        password='password')
        environ['HTTP_REFERER'] = 'http://example.com/referer'
        result = plugin.identify(environ)
        self.assertEqual(result, None)
        app = environ['repoze.who.application']
        self.assertEqual(len(app.headers), 0)
        self.assertEqual(app.code, 401)
        self.assertEqual(environ['came_from'], 'http://example.com/referer')
    def test_remember(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.remember(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].remembered,
                         identity)
    def test_forget(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        identity = {}
        result = plugin.forget(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(environ['repoze.who.plugins']['cookie'].forgotten,
                         identity
                         )
    def test_challenge(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(environ, '401 Unauthorized', [('app', '1')],
                               [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://www.example.com/?default=1')
        headers = sr.headers
        self.assertEqual(len(headers), 3)
        self.assertEqual(sr.headers[1][0], 'forget')
        self.assertEqual(sr.headers[1][1], '1')
        self.assertEqual(sr.headers[2][0], 'content-type')
        self.assertEqual(sr.headers[2][1], 'text/plain; charset=utf8')
        self.assertEqual(sr.status, '302 Found')
    def test_challenge_came_from_in_environ(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(environ, '401 Unauthorized', [('app', '1')],
                               [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 1)
        came_from_key, came_from_value = parts_qsl[0]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
    def test_challenge_with_reason_header(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        came_from_key, came_from_value = parts_qsl[0]
        reason_key, reason_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
        self.assertEqual(reason_key, 'reason')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_reason_and_custom_reason_param(self):
        plugin = self._makeOne(reason_param='auth_failure')
        environ = self._makeFormEnviron()
        environ['came_from'] = 'http://example.com/came_from'
        app = plugin.challenge(
            environ, '401 Unauthorized',
            [('X-Authorization-Failure-Reason', 'you are ugly')],
            [('forget', '1')])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(len(sr.headers), 3)
        self.assertEqual(sr.headers[0][0], 'Location')
        url = sr.headers[0][1]
        import urlparse
        import cgi
        parts = urlparse.urlparse(url)
        parts_qsl = cgi.parse_qsl(parts[4])
        self.assertEqual(len(parts_qsl), 2)
        parts_qsl.sort()
        reason_key, reason_value = parts_qsl[0]
        came_from_key, came_from_value = parts_qsl[1]
        self.assertEqual(parts[0], 'http')
        self.assertEqual(parts[1], 'example.com')
        self.assertEqual(parts[2], '/login.html')
        self.assertEqual(parts[3], '')
        self.assertEqual(came_from_key, 'came_from')
        self.assertEqual(came_from_value, 'http://example.com/came_from')
        self.assertEqual(reason_key, 'auth_failure')
        self.assertEqual(reason_value, 'you are ugly')
    def test_challenge_with_setcookie_from_app(self):
        plugin = self._makeOne()
        environ = self._makeFormEnviron()
        app = plugin.challenge(
            environ,
            '401 Unauthorized',
            [('app', '1'), ('set-cookie','a'), ('set-cookie','b')],
            [])
        sr = DummyStartResponse()
        result = ''.join(app(environ, sr))
        self.failUnless(result.startswith('302 Found'))
        self.assertEqual(sr.headers[1][0], 'set-cookie')
        self.assertEqual(sr.headers[1][1], 'a')
        self.assertEqual(sr.headers[2][0], 'set-cookie')
        self.assertEqual(sr.headers[2][1], 'b')
class TestAuthTktCookiePlugin(Base):
    def _getTargetClass(self):
        from repoze.who.plugins.auth_tkt import AuthTktCookiePlugin
        return AuthTktCookiePlugin
    def _makeEnviron(self, *arg, **kw):
        environ = Base._makeEnviron(self, *arg, **kw)
        environ['REMOTE_ADDR'] = '1.1.1.1'
        environ['SERVER_NAME'] = 'localhost'
        return environ
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def _makeTicket(self, userid='userid', remote_addr='0.0.0.0',
                    tokens = [], userdata='userdata',
                    cookie_name='auth_tkt', secure=False):
        from paste.auth import auth_tkt
        ticket = auth_tkt.AuthTicket(
            'secret',
            userid,
            remote_addr,
            tokens=tokens,
            user_data=userdata,
            cookie_name=cookie_name,
            secure=secure)
        return ticket.cookie_value()
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IIdentifier
        klass = self._getTargetClass()
        verifyClass(IIdentifier, klass)
    def test_identify_nocookie(self):
        plugin = self._makeOne('secret')
        environ = self._makeEnviron()
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_identify_good_cookie_include_ip(self):
        plugin = self._makeOne('secret', include_ip=True)
        val = self._makeTicket(remote_addr='1.1.1.1')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userdata')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_dont_include_ip(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket()
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userdata')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userdata')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_int_useridtype(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket(userid='1', userdata='userid_type:int')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 1)
        self.assertEqual(result['userdata'], 'userid_type:int')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:int')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_good_cookie_unknown_useridtype(self):
        plugin = self._makeOne('secret', include_ip=False)
        val = self._makeTicket(userid='userid', userdata='userid_type:unknown')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.identify(environ)
        self.assertEqual(len(result), 4)
        self.assertEqual(result['tokens'], [''])
        self.assertEqual(result['repoze.who.userid'], 'userid')
        self.assertEqual(result['userdata'], 'userid_type:unknown')
        self.failUnless('timestamp' in result)
        self.assertEqual(environ['REMOTE_USER_TOKENS'], [''])
        self.assertEqual(environ['REMOTE_USER_DATA'],'userid_type:unknown')
        self.assertEqual(environ['AUTH_TYPE'],'cookie')
    def test_identify_bad_cookie(self):
        plugin = self._makeOne('secret', include_ip=True)
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=bogus'})
        result = plugin.identify(environ)
        self.assertEqual(result, None)
    def test_remember_creds_same(self):
        plugin = self._makeOne('secret')
        val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % val})
        result = plugin.remember(environ, {'repoze.who.userid':'userid',
                                           'userdata':'userdata'})
        self.assertEqual(result, None)
    def test_remember_creds_different(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='other', userdata='userdata')
        result = plugin.remember(environ, {'repoze.who.userid':'other',
                                           'userdata':'userdata'})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
        self.assertEqual(result[1],
                         ('Set-Cookie',
                           'auth_tkt="%s"; Path=/; Domain=localhost' % new_val))
        self.assertEqual(result[2],
                         ('Set-Cookie',
                           'auth_tkt="%s"; Path=/; Domain=.localhost' % new_val))
    def test_remember_creds_different_int_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='1', userdata='userid_type:int')
        result = plugin.remember(environ, {'repoze.who.userid':1,
                                           'userdata':''})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_remember_creds_different_long_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        new_val = self._makeTicket(userid='1', userdata='userid_type:int')
        result = plugin.remember(environ, {'repoze.who.userid':long(1),
                                           'userdata':''})
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_remember_creds_different_unicode_userid(self):
        plugin = self._makeOne('secret')
        old_val = self._makeTicket(userid='userid')
        environ = self._makeEnviron({'HTTP_COOKIE':'auth_tkt=%s' % old_val})
        userid = unicode('\xc2\xa9', 'utf-8')
        new_val = self._makeTicket(userid=userid.encode('utf-8'),
                                   userdata='userid_type:unicode')
        result = plugin.remember(environ, {'repoze.who.userid':userid,
                                           'userdata':''})
        self.assertEqual(type(result[0][1]), str)
        self.assertEqual(len(result), 3)
        self.assertEqual(result[0],
                         ('Set-Cookie',
                          'auth_tkt="%s"; Path=/' % new_val))
    def test_forget(self):
        plugin = self._makeOne('secret')
        environ = self._makeEnviron()
        headers = plugin.forget(environ, None)
        self.assertEqual(len(headers), 3)
        header = headers[0]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/')
        header = headers[1]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=localhost')
        header = headers[2]
        name, value = header
        self.assertEqual(name, 'Set-Cookie')
        self.assertEqual(value, 'auth_tkt=""""; Path=/; Domain=.localhost')
    def test_factory(self):
        from repoze.who.plugins.auth_tkt import make_plugin
        plugin = make_plugin('secret')
        self.assertEqual(plugin.cookie_name, 'auth_tkt')
        self.assertEqual(plugin.secret, 'secret')
        self.assertEqual(plugin.include_ip, False)
        self.assertEqual(plugin.secure, False)
class TestDefaultRequestClassifier(Base):
    def _getFUT(self):
        from repoze.who.classifiers import default_request_classifier
        return default_request_classifier
@@ -1733,6 +788,7 @@
        self.assertEqual(result, 'browser')
class TestMakeRegistries(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.middleware import make_registries
        return make_registries
@@ -1773,208 +829,8 @@
        self.assertEqual(name_reg['auth'], dummy_auth)
        self.assertEqual(name_reg['challenger'], dummy_challenger)
class TestSQLAuthenticatorPlugin(unittest.TestCase):
    def _makeEnviron(self, kw=None):
        environ = {}
        environ['wsgi.version'] = (1,0)
        if kw is not None:
            environ.update(kw)
        return environ
    def _getTargetClass(self):
        from repoze.who.plugins.sql import SQLAuthenticatorPlugin
        return SQLAuthenticatorPlugin
    def _makeOne(self, *arg, **kw):
        plugin = self._getTargetClass()(*arg, **kw)
        return plugin
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IAuthenticator
        klass = self._getTargetClass()
        verifyClass(IAuthenticator, klass, tentative=True)
    def test_authenticate_noresults(self):
        dummy_factory = DummyConnectionFactory([])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {'login':'foo', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_comparefail(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_fail)
        environ = self._makeEnviron()
        identity = {'login':'fred', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_comparesuccess(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {'login':'fred', 'password':'bar'}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, 'userid')
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.assertEqual(dummy_factory.closed, True)
    def test_authenticate_nologin(self):
        dummy_factory = DummyConnectionFactory([ ['userid', 'password'] ])
        plugin = self._makeOne('select foo from bar', dummy_factory,
                               compare_succeed)
        environ = self._makeEnviron()
        identity = {}
        result = plugin.authenticate(environ, identity)
        self.assertEqual(result, None)
        self.assertEqual(dummy_factory.query, None)
        self.assertEqual(dummy_factory.closed, False)
class TestDefaultPasswordCompare(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import default_password_compare
        return default_password_compare
    def test_shaprefix_success(self):
        import sha
        stored = sha.new('password').hexdigest()
        stored = '{SHA}' + stored
        compare = self._getFUT()
        result = compare('password', stored)
        self.assertEqual(result, True)
    def test_shaprefix_fail(self):
        import sha
        stored = sha.new('password').hexdigest()
        stored = '{SHA}' + stored
        compare = self._getFUT()
        result = compare('notpassword', stored)
        self.assertEqual(result, False)
    def test_noprefix_success(self):
        stored = 'password'
        compare = self._getFUT()
        result = compare('password', stored)
        self.assertEqual(result, True)
    def test_noprefix_fail(self):
        stored = 'password'
        compare = self._getFUT()
        result = compare('notpassword', stored)
        self.assertEqual(result, False)
class TestSQLMetadataProviderPlugin(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.plugins.sql import SQLMetadataProviderPlugin
        return SQLMetadataProviderPlugin
    def _makeOne(self, *arg, **kw):
        klass = self._getTargetClass()
        return klass(*arg, **kw)
    def test_implements(self):
        from zope.interface.verify import verifyClass
        from repoze.who.interfaces import IMetadataProvider
        klass = self._getTargetClass()
        verifyClass(IMetadataProvider, klass, tentative=True)
    def test_add_metadata(self):
        dummy_factory = DummyConnectionFactory([ [1, 2, 3] ])
        def dummy_filter(results):
            return results
        plugin = self._makeOne('md', 'select foo from bar', dummy_factory,
                               dummy_filter)
        environ = {}
        identity = {'repoze.who.userid':1}
        plugin.add_metadata(environ, identity)
        self.assertEqual(dummy_factory.closed, True)
        self.assertEqual(identity['md'], [ [1,2,3] ])
        self.assertEqual(dummy_factory.query, 'select foo from bar')
        self.failIf(identity.has_key('__userid'))
class TestMakeSQLAuthenticatorPlugin(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import make_authenticator_plugin
        return make_authenticator_plugin
    def test_noquery(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, None, 'conn', 'compare')
    def test_no_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'statement', None, 'compare')
    def test_bad_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'statement', 'does.not:exist', None)
    def test_connfactory_specd(self):
        f = self._getFUT()
        plugin = f('statement',
                   'repoze.who.tests:make_dummy_connfactory',
                   None)
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        from repoze.who.plugins.sql import default_password_compare
        self.assertEqual(plugin.compare_fn, default_password_compare)
    def test_comparefunc_specd(self):
        f = self._getFUT()
        plugin = f('statement',
                   'repoze.who.tests:make_dummy_connfactory',
                   'repoze.who.tests:make_dummy_connfactory')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.compare_fn, make_dummy_connfactory)
class TestMakeSQLMetadataProviderPlugin(unittest.TestCase):
    def _getFUT(self):
        from repoze.who.plugins.sql import make_metadata_plugin
        return make_metadata_plugin
    def test_no_name(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f)
    def test_no_query(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'name', None, None)
    def test_bad_connfactory(self):
        f = self._getFUT()
        self.assertRaises(ValueError, f, 'name', 'statement',
                          'does.not:exist', None)
    def test_connfactory_specd(self):
        f = self._getFUT()
        plugin = f('name', 'statement',
                   'repoze.who.tests:make_dummy_connfactory', None)
        self.assertEqual(plugin.name, 'name')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.filter, None)
    def test_comparefn_specd(self):
        f = self._getFUT()
        plugin = f('name', 'statement',
                   'repoze.who.tests:make_dummy_connfactory',
                   'repoze.who.tests:make_dummy_connfactory')
        self.assertEqual(plugin.name, 'name')
        self.assertEqual(plugin.query, 'statement')
        self.assertEqual(plugin.conn_factory, DummyConnFactory)
        self.assertEqual(plugin.filter, make_dummy_connfactory)
class TestIdentityDict(unittest.TestCase):
    def _getTargetClass(self):
        from repoze.who.middleware import Identity
        return Identity
@@ -2538,45 +1394,14 @@
        self.assertEqual(filter.predicate.foo, 'Foo')
        self.failUnless(filter.enabled)
# XXX need make_middleware tests
class DummyPredicate:
    def __init__(self, **kw):
        self.__dict__.update(kw)
    def __call__(self, env):
        return True
def compare_success(*arg):
    return True
def compare_fail(*arg):
    return False
class DummyCursor:
    def __init__(self, result):
        self.result = result
        self.statement = None
        self.bindargs = None
        self.closed = False
    def execute(self, statement, bindargs):
        self.statement = statement
        self.bindargs = bindargs
    def close(self):
        self.closed = True
    def fetchone(self):
        return self.result
class DummyConnection:
    def __init__(self, dsn, cursor):
        self.dsn = dsn
        self.curs = cursor
    def cursor(self):
        return self.curs
# XXX need make_middleware tests
class DummyApp:
    environ = None
@@ -2688,59 +1513,3 @@
        self.exc_info = exc_info
        return []
class DummyConnectionFactory:
    # acts as all of: a factory, a connection, and a cursor
    closed = False
    query = None
    def __init__(self, results):
        self.results = results
    def __call__(self):
        return self
    def cursor(self):
        return self
    def execute(self, query, *arg):
        self.query = query
        self.bindargs = arg
    def fetchall(self):
        return self.results
    def fetchone(self):
        if self.results:
            return self.results[0]
        return []
    def close(self):
        self.closed = True
def compare_fail(cleartext, stored):
    return False
def compare_succeed(cleartext, stored):
    return True
class _DummyConnFactory:
    pass
DummyConnFactory = _DummyConnFactory()
def make_dummy_connfactory(**kw):
    return DummyConnFactory
def encode_multipart_formdata(fields):
    BOUNDARY = '----------ThIs_Is_tHe_bouNdaRY_$'
    CRLF = '\r\n'
    L = []
    for (key, value) in fields:
        L.append('--' + BOUNDARY)
        L.append('Content-Disposition: form-data; name="%s"' % key)
        L.append('')
        L.append(value)
    L.append('--' + BOUNDARY + '--')
    L.append('')
    body = CRLF.join(L)
    content_type = 'multipart/form-data; boundary=%s' % BOUNDARY
    return content_type, body
setup.py
@@ -49,7 +49,7 @@
      zip_safe=False,
      tests_require = ['Paste', 'zope.interface'],
      install_requires=['Paste', 'zope.interface', 'setuptools'],
      test_suite="repoze.who.tests",
      test_suite="repoze.who",
      entry_points = """\
      [paste.filter_app_factory]
      test = repoze.who.middleware:make_test_middleware