Tres Seaver
2010-10-01 d7f61380bb37d911239adf667ce0dd7d773b5ca3
commit | author | age
f7efc0 1 import unittest
TS 2
afe561 3 class TestMiddleware(unittest.TestCase):
f7efc0 4
TS 5     def _getTargetClass(self):
6         from repoze.who.middleware import PluggableAuthenticationMiddleware
7         return PluggableAuthenticationMiddleware
8
9     def _makeOne(self,
10                  app=None,
11                  identifiers=None,
12                  authenticators=None,
13                  challengers=None,
cbc983 14                  request_classifier=None,
f7efc0 15                  mdproviders=None,
TS 16                  challenge_decider=None,
17                  log_stream=None,
18                  log_level=None,
b482a1 19                  remote_user_key='REMOTE_USER',
f7efc0 20                  ):
TS 21         if app is None:
22             app = DummyApp()
23         if identifiers is None:
24             identifiers = []
25         if authenticators is None:
26             authenticators = []
27         if challengers is None:
28             challengers = []
cbc983 29         if request_classifier is None:
TS 30             request_classifier = DummyRequestClassifier()
f7efc0 31         if mdproviders is None:
TS 32             mdproviders = []
33         if challenge_decider is None:
34             challenge_decider = DummyChallengeDecider()
35         if log_level is None:
36             import logging
37             log_level = logging.DEBUG
38         mw = self._getTargetClass()(app,
39                                     identifiers,
40                                     authenticators,
41                                     challengers,
42                                     mdproviders,
cbc983 43                                     request_classifier,
f7efc0 44                                     challenge_decider,
TS 45                                     log_stream,
b482a1 46                                     log_level=logging.DEBUG,
TS 47                                     remote_user_key=remote_user_key,
48                                    )
f7efc0 49         return mw
TS 50
afe561 51     def _makeEnviron(self, kw=None):
TS 52         environ = {}
53         environ['wsgi.version'] = (1,0)
54         if kw is not None:
55             environ.update(kw)
56         return environ
57
cbc983 58     def test_ctor_positional_args(self):
TS 59         klass = self._getTargetClass()
60         app = DummyApp()
61         identifiers = []
62         authenticators = []
63         challengers = []
64         request_classifier = DummyRequestClassifier()
65         mdproviders = []
66         challenge_decider = DummyChallengeDecider()
67         mw = klass(app,
68                    identifiers,
69                    authenticators,
70                    challengers,
71                    mdproviders,
72                    request_classifier,
73                    challenge_decider,
74                   )
75         self.assertEqual(mw.app, app)
76         af = mw.api_factory
77         self.assertEqual(af.identifiers, identifiers)
78         self.assertEqual(af.authenticators, authenticators)
79         self.assertEqual(af.challengers, challengers)
80         self.assertEqual(af.mdproviders, mdproviders)
81         self.assertEqual(af.request_classifier, request_classifier)
82         self.assertEqual(af.challenge_decider, challenge_decider)
83
84     def test_ctor_wo_request_classifier_or_classifier_raises(self):
85         # BBB for old argument name
86         klass = self._getTargetClass()
87         app = DummyApp()
88         identifiers = []
89         authenticators = []
90         challengers = []
91         mdproviders = []
92         challenge_decider = DummyChallengeDecider()
93         self.assertRaises(ValueError,
94                           klass,
95                           app,
96                           identifiers,
97                           authenticators,
98                           challengers,
99                           mdproviders,
100                           challenge_decider = challenge_decider,
101                           )
102
103     def test_ctor_w_request_classifier_and_classifier_raises(self):
104         # BBB for old argument name
105         klass = self._getTargetClass()
106         app = DummyApp()
107         identifiers = []
108         authenticators = []
109         challengers = []
110         request_classifier = DummyRequestClassifier()
111         mdproviders = []
112         challenge_decider = DummyChallengeDecider()
113         self.assertRaises(ValueError,
114                           klass,
115                           app,
116                           identifiers,
117                           authenticators,
118                           challengers,
119                           mdproviders,
120                           request_classifier,
121                           challenge_decider,
122                           classifier = object()
123                           )
124
125     def test_ctor_wo_challenge_decider_raises(self):
126         # BBB for old argument name
127         klass = self._getTargetClass()
128         app = DummyApp()
129         identifiers = []
130         authenticators = []
131         challengers = []
132         request_classifier = DummyRequestClassifier()
133         mdproviders = []
134         self.assertRaises(ValueError,
135                           klass,
136                           app,
137                           identifiers,
138                           authenticators,
139                           challengers,
140                           mdproviders,
141                           classifier = request_classifier,
142                           )
143
144     def test_ctor_w_classifier(self):
145         # BBB for old argument name
146         klass = self._getTargetClass()
147         app = DummyApp()
148         identifiers = []
149         authenticators = []
150         challengers = []
151         request_classifier = DummyRequestClassifier()
152         mdproviders = []
153         challenge_decider = DummyChallengeDecider()
154         mw = klass(app,
155                    identifiers,
156                    authenticators,
157                    challengers,
158                    mdproviders,
159                    classifier = request_classifier,
160                    challenge_decider = challenge_decider,
161                   )
162         self.assertEqual(mw.app, app)
163         af = mw.api_factory
164         self.assertEqual(af.identifiers, identifiers)
165         self.assertEqual(af.authenticators, authenticators)
166         self.assertEqual(af.challengers, challengers)
167         self.assertEqual(af.mdproviders, mdproviders)
168         self.assertEqual(af.request_classifier, request_classifier)
169         self.assertEqual(af.challenge_decider, challenge_decider)
170
171     def test_ctor_accepts_logger(self):
f7efc0 172         import logging
140fb9 173         restore = logging.raiseExceptions
TS 174         logging.raiseExceptions = 0
175         try:
176             logger = logging.Logger('something')
177             logger.setLevel(logging.INFO)
178             mw = self._makeOne(log_stream=logger)
179             self.assertEqual(logger, mw.logger)
180         finally:
181             logging.raiseExceptions = restore
f7efc0 182
TS 183     def test_call_remoteuser_already_set(self):
184         environ = self._makeEnviron({'REMOTE_USER':'admin'})
185         mw = self._makeOne()
186         result = mw(environ, None)
187         self.assertEqual(mw.app.environ, environ)
188         self.assertEqual(result, [])
189
190     def test_call_200_no_plugins(self):
191         environ = self._makeEnviron()
192         headers = [('a', '1')]
193         app = DummyWorkingApp('200 OK', headers)
194         mw = self._makeOne(app=app)
195         start_response = DummyStartResponse()
196         result = mw(environ, start_response)
197         self.assertEqual(mw.app.environ, environ)
198         self.assertEqual(result, ['body'])
199         self.assertEqual(start_response.status, '200 OK')
200         self.assertEqual(start_response.headers, headers)
201
202     def test_call_401_no_challengers(self):
203         environ = self._makeEnviron()
204         headers = [('a', '1')]
205         app = DummyWorkingApp('401 Unauthorized', headers)
206         mw = self._makeOne(app=app)
207         start_response = DummyStartResponse()
208         self.assertRaises(RuntimeError, mw, environ, start_response)
209
210     def test_call_200_no_challengers(self):
211         environ = self._makeEnviron()
212         headers = [('a', '1')]
213         app = DummyWorkingApp('200 OK', headers)
214         credentials = {'login':'chris', 'password':'password'}
215         identifier = DummyIdentifier(credentials)
216         identifiers = [ ('identifier', identifier) ]
217         mw = self._makeOne(app=app, identifiers=identifiers)
218         start_response = DummyStartResponse()
219         result = mw(environ, start_response)
220         self.assertEqual(mw.app.environ, environ)
221         self.assertEqual(result, ['body'])
222         self.assertEqual(start_response.status, '200 OK')
223         self.assertEqual(start_response.headers, headers)
224
225     def test_call_401_no_identifiers(self):
b482a1 226         from paste.httpexceptions import HTTPUnauthorized
f7efc0 227         environ = self._makeEnviron()
TS 228         headers = [('a', '1')]
229         app = DummyWorkingApp('401 Unauthorized', headers)
230         challenge_app = HTTPUnauthorized()
231         challenge = DummyChallenger(challenge_app)
232         challengers = [ ('challenge', challenge) ]
233         mw = self._makeOne(app=app, challengers=challengers)
234         start_response = DummyStartResponse()
235         result = mw(environ, start_response)
236         self.assertEqual(environ['challenged'], challenge_app)
237         self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
238
239     def test_call_401_challenger_and_identifier_no_authenticator(self):
b482a1 240         from paste.httpexceptions import HTTPUnauthorized
f7efc0 241         environ = self._makeEnviron()
TS 242         headers = [('a', '1')]
243         app = DummyWorkingApp('401 Unauthorized', headers)
244         challenge_app = HTTPUnauthorized()
245         challenge = DummyChallenger(challenge_app)
246         challengers = [ ('challenge', challenge) ]
247         credentials = {'login':'a', 'password':'b'}
248         identifier = DummyIdentifier(credentials)
249         identifiers = [ ('identifier', identifier) ]
250         mw = self._makeOne(app=app, challengers=challengers,
251                            identifiers=identifiers)
252         start_response = DummyStartResponse()
253
254         result = mw(environ, start_response)
255         self.assertEqual(environ['challenged'], challenge_app)
256         self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
257         self.assertEqual(identifier.forgotten, False)
258         self.assertEqual(environ.get('REMOTE_USER'), None)
259
260     def test_call_401_challenger_and_identifier_and_authenticator(self):
b482a1 261         from paste.httpexceptions import HTTPUnauthorized
f7efc0 262         environ = self._makeEnviron()
TS 263         headers = [('a', '1')]
264         app = DummyWorkingApp('401 Unauthorized', headers)
265         challenge_app = HTTPUnauthorized()
266         challenge = DummyChallenger(challenge_app)
267         challengers = [ ('challenge', challenge) ]
268         credentials = {'login':'chris', 'password':'password'}
269         identifier = DummyIdentifier(credentials)
270         identifiers = [ ('identifier', identifier) ]
271         authenticator = DummyAuthenticator()
272         authenticators = [ ('authenticator', authenticator) ]
273         mw = self._makeOne(app=app, challengers=challengers,
274                            identifiers=identifiers,
275                            authenticators=authenticators)
276         start_response = DummyStartResponse()
277         result = mw(environ, start_response)
278         self.assertEqual(environ['challenged'], challenge_app)
279         self.failUnless(result[0].startswith('401 Unauthorized\r\n'))
280         # @@ unfuck
281 ##         self.assertEqual(identifier.forgotten, identifier.credentials)
282         self.assertEqual(environ['REMOTE_USER'], 'chris')
283 ##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
284
285     def test_call_200_challenger_and_identifier_and_authenticator(self):
b482a1 286         from paste.httpexceptions import HTTPUnauthorized
f7efc0 287         environ = self._makeEnviron()
TS 288         headers = [('a', '1')]
289         app = DummyWorkingApp('200 OK', headers)
290         challenge_app = HTTPUnauthorized()
291         challenge = DummyChallenger(challenge_app)
292         challengers = [ ('challenge', challenge) ]
293         credentials = {'login':'chris', 'password':'password'}
294         identifier = DummyIdentifier(credentials)
295         identifiers = [ ('identifier', identifier) ]
296         authenticator = DummyAuthenticator()
297         authenticators = [ ('authenticator', authenticator) ]
298         mw = self._makeOne(app=app, challengers=challengers,
299                            identifiers=identifiers,
300                            authenticators=authenticators)
301         start_response = DummyStartResponse()
302         result = mw(environ, start_response)
303         self.assertEqual(environ.get('challenged'), None)
304         self.assertEqual(identifier.forgotten, False)
305         # @@ figure out later
306 ##         self.assertEqual(dict(identifier.remembered)['login'], dict(identifier.credentials)['login'])
307 ##         self.assertEqual(dict(identifier.remembered)['password'], dict(identifier.credentials)['password'])
308         self.assertEqual(environ['REMOTE_USER'], 'chris')
309 ##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
310
311
312     def test_call_200_identity_reset(self):
b482a1 313         from paste.httpexceptions import HTTPUnauthorized
f7efc0 314         environ = self._makeEnviron()
TS 315         headers = [('a', '1')]
316         new_identity = {'user_id':'foo', 'password':'bar'}
317         app = DummyIdentityResetApp('200 OK', headers, new_identity)
318         challenge_app = HTTPUnauthorized()
319         challenge = DummyChallenger(challenge_app)
320         challengers = [ ('challenge', challenge) ]
321         credentials = {'login':'chris', 'password':'password'}
322         identifier = DummyIdentifier(credentials)
323         identifiers = [ ('identifier', identifier) ]
324         authenticator = DummyAuthenticator()
325         authenticators = [ ('authenticator', authenticator) ]
326         mw = self._makeOne(app=app, challengers=challengers,
327                            identifiers=identifiers,
328                            authenticators=authenticators)
329         start_response = DummyStartResponse()
330         result = mw(environ, start_response)
331         self.assertEqual(environ.get('challenged'), None)
332         self.assertEqual(identifier.forgotten, False)
333         new_credentials = identifier.credentials.copy()
334         new_credentials['login'] = 'fred'
335         new_credentials['password'] = 'schooled'
336         # @@ unfuck
337 ##         self.assertEqual(identifier.remembered, new_credentials)
338         self.assertEqual(environ['REMOTE_USER'], 'chris')
339 ##         self.assertEqual(environ['repoze.who.identity'], new_credentials)
340
341     def test_call_200_with_metadata(self):
b482a1 342         from paste.httpexceptions import HTTPUnauthorized
f7efc0 343         environ = self._makeEnviron()
TS 344         headers = [('a', '1')]
345         app = DummyWorkingApp('200 OK', headers)
346         challenge_app = HTTPUnauthorized()
347         challenge = DummyChallenger(challenge_app)
348         challengers = [ ('challenge', challenge) ]
349         credentials = {'login':'chris', 'password':'password'}
350         identifier = DummyIdentifier(credentials)
351         identifiers = [ ('identifier', identifier) ]
352         authenticator = DummyAuthenticator()
353         authenticators = [ ('authenticator', authenticator) ]
354         mdprovider = DummyMDProvider({'foo':'bar'})
355         mdproviders = [ ('mdprovider', mdprovider) ]
356         mw = self._makeOne(app=app, challengers=challengers,
357                            identifiers=identifiers,
358                            authenticators=authenticators,
359                            mdproviders=mdproviders)
360         start_response = DummyStartResponse()
361         result = mw(environ, start_response)
362         # metadata
363         self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
364
365     def test_call_ingress_plugin_replaces_application(self):
b482a1 366         from paste.httpexceptions import HTTPFound
f7efc0 367         environ = self._makeEnviron()
TS 368         headers = [('a', '1')]
369         app = DummyWorkingApp('200 OK', headers)
370         challengers = []
371         credentials = {'login':'chris', 'password':'password'}
372         identifier = DummyIdentifier(
373             credentials,
374             remember_headers=[('a', '1')],
375             replace_app = HTTPFound('http://example.com/redirect')
376             )
377         identifiers = [ ('identifier', identifier) ]
378         authenticator = DummyAuthenticator()
379         authenticators = [ ('authenticator', authenticator) ]
380         mdproviders = []
381         mw = self._makeOne(app=app,
382                            challengers=challengers,
383                            identifiers=identifiers,
384                            authenticators=authenticators,
385                            mdproviders=mdproviders)
386         start_response = DummyStartResponse()
387         result = ''.join(mw(environ, start_response))
388         self.failUnless(result.startswith('302 Found'))
389         self.assertEqual(start_response.status, '302 Found')
390         headers = start_response.headers
b482a1 391         self.assertEqual(len(headers), 3, headers)
f7efc0 392         self.assertEqual(headers[0],
TS 393                          ('location', 'http://example.com/redirect'))
394         self.assertEqual(headers[1],
395                          ('content-type', 'text/plain; charset=utf8'))
396         self.assertEqual(headers[2],
397                          ('a', '1'))
398         self.assertEqual(start_response.exc_info, None)
399         self.failIf(environ.has_key('repoze.who.application'))
400
401     def test_call_app_doesnt_call_start_response(self):
b482a1 402         from paste.httpexceptions import HTTPUnauthorized
f7efc0 403         environ = self._makeEnviron()
TS 404         headers = [('a', '1')]
405         app = DummyGeneratorApp('200 OK', headers)
406         challenge_app = HTTPUnauthorized()
407         challenge = DummyChallenger(challenge_app)
408         challengers = [ ('challenge', challenge) ]
409         credentials = {'login':'chris', 'password':'password'}
410         identifier = DummyIdentifier(credentials)
411         identifiers = [ ('identifier', identifier) ]
412         authenticator = DummyAuthenticator()
413         authenticators = [ ('authenticator', authenticator) ]
414         mdprovider = DummyMDProvider({'foo':'bar'})
415         mdproviders = [ ('mdprovider', mdprovider) ]
416         mw = self._makeOne(app=app, challengers=challengers,
417                            identifiers=identifiers,
418                            authenticators=authenticators,
419                            mdproviders=mdproviders)
420         start_response = DummyStartResponse()
421         result = mw(environ, start_response)
422         # metadata
423         self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
424
425     # XXX need more call tests:
426     #  - auth_id sorting
427
428 class TestStartResponseWrapper(unittest.TestCase):
429
430     def _getTargetClass(self):
431         from repoze.who.middleware import StartResponseWrapper
432         return StartResponseWrapper
433
434     def _makeOne(self, *arg, **kw):
435         plugin = self._getTargetClass()(*arg, **kw)
436         return plugin
437
438     def test_ctor(self):
439         wrapper = self._makeOne(None)
440         self.assertEqual(wrapper.start_response, None)
441         self.assertEqual(wrapper.headers, [])
442         self.failUnless(wrapper.buffer)
443
444     def test_finish_response(self):
b482a1 445         from StringIO import StringIO
f7efc0 446         statuses = []
TS 447         headerses = []
448         datases = []
449         closededs = []
450         def write(data):
451             datases.append(data)
452         def close():
453             closededs.append(True)
454         write.close = close
455
456         def start_response(status, headers, exc_info=None):
457             statuses.append(status)
458             headerses.append(headers)
459             return write
460
461         wrapper = self._makeOne(start_response)
462         wrapper.status = '401 Unauthorized'
463         wrapper.headers = [('a', '1')]
464         wrapper.buffer = StringIO('written')
465         extra_headers = [('b', '2')]
466         result = wrapper.finish_response(extra_headers)
467         self.assertEqual(result, None)
468         self.assertEqual(headerses[0], wrapper.headers + extra_headers)
469         self.assertEqual(statuses[0], wrapper.status)
470         self.assertEqual(datases[0], 'written')
471         self.assertEqual(closededs[0], True)
472
473 class WrapGeneratorTests(unittest.TestCase):
474
475     def _getFUT(self):
476         from repoze.who.middleware import wrap_generator
477         return wrap_generator
478
479     def test_it(self):
480         L = []
481         def gen(L=L):
482             L.append('yo!')
483             yield 'a'
484             yield 'b'
485         wrap_generator = self._getFUT()
486         newgen = wrap_generator(gen())
487         self.assertEqual(L, ['yo!'])
488         self.assertEqual(list(newgen), ['a', 'b'])
489
d32c12 490 class TestMakeTestMiddleware(unittest.TestCase):
TS 491
492     def setUp(self):
493         import os
494         self._old_WHO_LOG = os.environ.get('WHO_LOG')
495
496     def tearDown(self):
497         import os
498         if self._old_WHO_LOG is not None:
499             os.environ['WHO_LOG'] = self._old_WHO_LOG
500         else:
501             if 'WHO_LOG' in os.environ:
502                 del os.environ['WHO_LOG']
503
504     def _getFactory(self):
505         from repoze.who.middleware import make_test_middleware
506         return make_test_middleware
507
508     def test_it_no_WHO_LOG_in_environ(self):
509         app = DummyApp()
510         factory = self._getFactory()
511         global_conf = {'here': '/'}
512         middleware = factory(app, global_conf)
993216 513         api_factory = middleware.api_factory
d7f613 514         self.assertEqual(len(api_factory.identifiers), 2)
993216 515         self.assertEqual(len(api_factory.authenticators), 1)
TS 516         self.assertEqual(len(api_factory.challengers), 2)
517         self.assertEqual(len(api_factory.mdproviders), 0)
d32c12 518         self.assertEqual(middleware.logger, None)
TS 519
520     def test_it_w_WHO_LOG_in_environ(self):
521         import logging
522         import os
523         os.environ['WHO_LOG'] = '1'
524         app = DummyApp()
525         factory = self._getFactory()
526         global_conf = {'here': '/'}
527         middleware = factory(app, global_conf)
528         self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG)
529
f7efc0 530 class DummyApp:
TS 531     environ = None
532     def __call__(self, environ, start_response):
533         self.environ = environ
534         return []
535
536 class DummyWorkingApp:
537     def __init__(self, status, headers):
538         self.status = status
539         self.headers = headers
540
541     def __call__(self, environ, start_response):
542         self.environ = environ
543         start_response(self.status, self.headers)
544         return ['body']
545
546 class DummyGeneratorApp:
547     def __init__(self, status, headers):
548         self.status = status
549         self.headers = headers
550
551     def __call__(self, environ, start_response):
552         def gen(self=self, start_response=start_response):
553             self.environ = environ
554             start_response(self.status, self.headers)
555             yield 'body'
556         return gen()
557
558 class DummyIdentityResetApp:
559     def __init__(self, status, headers, new_identity):
560         self.status = status
561         self.headers = headers
562         self.new_identity = new_identity
563
564     def __call__(self, environ, start_response):
565         self.environ = environ
566         environ['repoze.who.identity']['login'] = 'fred'
567         environ['repoze.who.identity']['password'] = 'schooled'
568         start_response(self.status, self.headers)
569         return ['body']
570
571 class DummyChallenger:
572     def __init__(self, app=None):
573         self.app = app
574
575     def challenge(self, environ, status, app_headers, forget_headers):
576         environ['challenged'] = self.app
577         return self.app
578
579 class DummyIdentifier:
580     forgotten = False
581     remembered = False
582
583     def __init__(self, credentials=None, remember_headers=None,
584                  forget_headers=None, replace_app=None):
585         self.credentials = credentials
586         self.remember_headers = remember_headers
587         self.forget_headers = forget_headers
588         self.replace_app = replace_app
589
590     def identify(self, environ):
591         if self.replace_app:
592             environ['repoze.who.application'] = self.replace_app
593         return self.credentials
594
595     def forget(self, environ, identity):
596         self.forgotten = identity
597         return self.forget_headers
598
599     def remember(self, environ, identity):
600         self.remembered = identity
601         return self.remember_headers
602
603 class DummyAuthenticator:
604     def __init__(self, userid=None):
605         self.userid = userid
606
607     def authenticate(self, environ, credentials):
608         if self.userid is None:
609             return credentials['login']
610         return self.userid
611
612 class DummyFailAuthenticator:
613     def authenticate(self, environ, credentials):
614         return None
615
616 class DummyRequestClassifier:
617     def __call__(self, environ):
618         return 'browser'
619
620 class DummyChallengeDecider:
621     def __call__(self, environ, status, headers):
622         if status.startswith('401 '):
623             return True
624
625 class DummyNoResultsIdentifier:
626     def identify(self, environ):
627         return None
628
629     def remember(self, *arg, **kw):
630         pass
631
632     def forget(self, *arg, **kw):
633         pass
634
635 class DummyStartResponse:
636     def __call__(self, status, headers, exc_info=None):
637         self.status = status
638         self.headers = headers
639         self.exc_info = exc_info
640         return []
641
642 class DummyMDProvider:
643     def __init__(self, metadata=None):
644         self._metadata = metadata
645
646     def add_metadata(self, environ, identity):
647         return identity.update(self._metadata)
648
649 class DummyMultiPlugin:
650     pass