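# Listing header (apparently from an annotate/blame view; not module code):
# Tres Seaver, 2016-05-31, 455778d138ea623d224c9206e5001fd2a1fd7e1c
# Columns of the original listing: commit | author | age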
f7efc0 1 import unittest
TS 2
9c1e6f 3
class TestMiddleware(unittest.TestCase):
    """Tests for ``repoze.who.middleware.PluggableAuthenticationMiddleware``.

    Each test wires the middleware to the ``Dummy*`` collaborators defined
    at module scope in this file.
    """

    def _getTargetClass(self):
        """Return the class under test (imported lazily)."""
        from repoze.who.middleware import PluggableAuthenticationMiddleware
        return PluggableAuthenticationMiddleware

    def _makeOne(self,
                 app=None,
                 identifiers=None,
                 authenticators=None,
                 challengers=None,
                 request_classifier=None,
                 mdproviders=None,
                 challenge_decider=None,
                 log_stream=None,
                 log_level=None,
                 remote_user_key='REMOTE_USER',
                 ):
        """Build a middleware instance, defaulting every unset argument.

        Plugin lists default to empty lists; ``app``, ``request_classifier``
        and ``challenge_decider`` default to fresh dummy objects;
        ``log_level`` defaults to ``logging.DEBUG``.
        """
        # Imported unconditionally: a function-level import binds a local
        # name, so importing only inside a branch leaves it unbound on the
        # other path.
        import logging
        if app is None:
            app = DummyApp()
        if identifiers is None:
            identifiers = []
        if authenticators is None:
            authenticators = []
        if challengers is None:
            challengers = []
        if request_classifier is None:
            request_classifier = DummyRequestClassifier()
        if mdproviders is None:
            mdproviders = []
        if challenge_decider is None:
            challenge_decider = DummyChallengeDecider()
        if log_level is None:
            log_level = logging.DEBUG
        return self._getTargetClass()(app,
                                      identifiers,
                                      authenticators,
                                      challengers,
                                      mdproviders,
                                      request_classifier,
                                      challenge_decider,
                                      log_stream,
                                      # Forward the caller's level instead
                                      # of a hard-coded DEBUG.
                                      log_level=log_level,
                                      remote_user_key=remote_user_key,
                                      )

    def _makeEnviron(self, kw=None):
        """Return a WSGI environ with testing defaults, updated from *kw*."""
        from wsgiref.util import setup_testing_defaults
        environ = {}
        setup_testing_defaults(environ)
        if kw is not None:
            environ.update(kw)
        return environ

    def test_ctor_positional_args(self):
        # All collaborators passed positionally end up on mw / api_factory.
        klass = self._getTargetClass()
        app = DummyApp()
        identifiers = []
        authenticators = []
        challengers = []
        request_classifier = DummyRequestClassifier()
        mdproviders = []
        challenge_decider = DummyChallengeDecider()
        mw = klass(app,
                   identifiers,
                   authenticators,
                   challengers,
                   mdproviders,
                   request_classifier,
                   challenge_decider,
                   )
        self.assertEqual(mw.app, app)
        af = mw.api_factory
        self.assertEqual(af.identifiers, identifiers)
        self.assertEqual(af.authenticators, authenticators)
        self.assertEqual(af.challengers, challengers)
        self.assertEqual(af.mdproviders, mdproviders)
        self.assertEqual(af.request_classifier, request_classifier)
        self.assertEqual(af.challenge_decider, challenge_decider)

    def test_ctor_wo_request_classifier_or_classifier_raises(self):
        # BBB for old argument name
        # Neither 'request_classifier' nor 'classifier' given -> ValueError.
        klass = self._getTargetClass()
        app = DummyApp()
        identifiers = []
        authenticators = []
        challengers = []
        mdproviders = []
        challenge_decider = DummyChallengeDecider()
        self.assertRaises(ValueError,
                          klass,
                          app,
                          identifiers,
                          authenticators,
                          challengers,
                          mdproviders,
                          challenge_decider=challenge_decider,
                          )

    def test_ctor_w_request_classifier_and_classifier_raises(self):
        # BBB for old argument name
        # Both 'request_classifier' and 'classifier' given -> ValueError.
        klass = self._getTargetClass()
        app = DummyApp()
        identifiers = []
        authenticators = []
        challengers = []
        request_classifier = DummyRequestClassifier()
        mdproviders = []
        challenge_decider = DummyChallengeDecider()
        self.assertRaises(ValueError,
                          klass,
                          app,
                          identifiers,
                          authenticators,
                          challengers,
                          mdproviders,
                          request_classifier,
                          challenge_decider,
                          classifier=object()
                          )

    def test_ctor_wo_challenge_decider_raises(self):
        # BBB for old argument name
        # No challenge decider given -> ValueError.
        klass = self._getTargetClass()
        app = DummyApp()
        identifiers = []
        authenticators = []
        challengers = []
        request_classifier = DummyRequestClassifier()
        mdproviders = []
        self.assertRaises(ValueError,
                          klass,
                          app,
                          identifiers,
                          authenticators,
                          challengers,
                          mdproviders,
                          classifier=request_classifier,
                          )

    def test_ctor_w_classifier(self):
        # BBB for old argument name
        # The old 'classifier' keyword is stored as 'request_classifier'.
        klass = self._getTargetClass()
        app = DummyApp()
        identifiers = []
        authenticators = []
        challengers = []
        request_classifier = DummyRequestClassifier()
        mdproviders = []
        challenge_decider = DummyChallengeDecider()
        mw = klass(app,
                   identifiers,
                   authenticators,
                   challengers,
                   mdproviders,
                   classifier=request_classifier,
                   challenge_decider=challenge_decider,
                   )
        self.assertEqual(mw.app, app)
        af = mw.api_factory
        self.assertEqual(af.identifiers, identifiers)
        self.assertEqual(af.authenticators, authenticators)
        self.assertEqual(af.challengers, challengers)
        self.assertEqual(af.mdproviders, mdproviders)
        self.assertEqual(af.request_classifier, request_classifier)
        self.assertEqual(af.challenge_decider, challenge_decider)

    def test_ctor_accepts_logger(self):
        # A Logger passed as 'log_stream' becomes mw.logger.
        import logging
        restore = logging.raiseExceptions
        logging.raiseExceptions = 0
        try:
            logger = logging.Logger('something')
            logger.setLevel(logging.INFO)
            mw = self._makeOne(log_stream=logger)
            self.assertEqual(logger, mw.logger)
        finally:
            # Always put the module-level flag back.
            logging.raiseExceptions = restore

    def test_call_remoteuser_already_set(self):
        # With REMOTE_USER preset, the wrapped app is called with the
        # environ and its result is returned.
        environ = self._makeEnviron({'REMOTE_USER': 'admin'})
        mw = self._makeOne()
        result = mw(environ, None)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, [])

    def test_call_200_no_plugins(self):
        # A 200 response passes through untouched when no plugins are set.
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)

    def test_call_401_no_challengers(self):
        # A 401 with no challenger configured is expected to raise.
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        mw = self._makeOne(app=app)
        start_response = DummyStartResponse()
        self.assertRaises(RuntimeError, mw, environ, start_response)

    def test_call_200_no_challengers(self):
        # Identifier present, no challengers: 200 passes through.
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        mw = self._makeOne(app=app, identifiers=identifiers)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, headers)

    def test_call_200_no_challengers_app_calls_forget(self):
        # See https://github.com/repoze/repoze.who/issues/21
        # Expects only the forget headers (no remember headers) when the
        # app itself calls api.logout().
        environ = self._makeEnviron()
        remember_headers = [('remember', '1')]
        forget_headers = [('forget', '1')]
        app = DummyLogoutApp('200 OK')
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(
            credentials,
            remember_headers=remember_headers,
            forget_headers=forget_headers)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mw = self._makeOne(
            app=app, identifiers=identifiers, authenticators=authenticators)
        start_response = DummyStartResponse()
        result = mw(environ, start_response)
        self.assertEqual(mw.app.environ, environ)
        self.assertEqual(result, ['body'])
        self.assertEqual(start_response.status, '200 OK')
        self.assertEqual(start_response.headers, forget_headers)

    def test_call_401_no_identifiers(self):
        # 401 with a challenger but no identifiers: challenge app responds.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        mw = self._makeOne(app=app, challengers=challengers)
        start_response = DummyStartResponse()
        result = b''.join(mw(environ, start_response)).decode('ascii')
        self.assertEqual(environ['challenged'], challenge_app)
        self.assertTrue(result.startswith('401 Unauthorized'))

    def test_call_401_challenger_and_identifier_no_authenticator(self):
        # Without an authenticator nothing is forgotten and REMOTE_USER
        # stays unset.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'a', 'password': 'b'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers)
        start_response = DummyStartResponse()

        result = b''.join(mw(environ, start_response)).decode('ascii')
        self.assertEqual(environ['challenged'], challenge_app)
        self.assertTrue(result.startswith('401 Unauthorized'))
        self.assertEqual(identifier.forgotten, False)
        self.assertEqual(environ.get('REMOTE_USER'), None)

    def test_call_401_challenger_and_identifier_and_authenticator(self):
        # Full plugin chain with a 401 app: challenged, REMOTE_USER set.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('401 Unauthorized', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        result = b''.join(mw(environ, start_response)).decode('ascii')
        self.assertEqual(environ['challenged'], challenge_app)
        self.assertTrue(result.startswith('401 Unauthorized'))
        # @@ TODO: fix and re-enable the disabled assertions below
##         self.assertEqual(identifier.forgotten, identifier.credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)

    def test_call_200_challenger_and_identifier_and_authenticator(self):
        # Full plugin chain with a 200 app: no challenge, REMOTE_USER set.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        # @@ figure out later
##         self.assertEqual(dict(identifier.remembered)['login'], dict(identifier.credentials)['login'])
##         self.assertEqual(dict(identifier.remembered)['password'], dict(identifier.credentials)['password'])
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)

    def test_call_200_identity_reset(self):
        # The app mutates the identity in place; REMOTE_USER is unchanged.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        new_identity = {'user_id': 'foo', 'password': 'bar'}
        app = DummyIdentityResetApp('200 OK', headers, new_identity)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators)
        start_response = DummyStartResponse()
        mw(environ, start_response)
        self.assertEqual(environ.get('challenged'), None)
        self.assertEqual(identifier.forgotten, False)
        # Only referenced by the disabled assertions below.
        new_credentials = identifier.credentials.copy()
        new_credentials['login'] = 'fred'
        new_credentials['password'] = 'schooled'
        # @@ TODO: fix and re-enable the disabled assertions below
##         self.assertEqual(identifier.remembered, new_credentials)
        self.assertEqual(environ['REMOTE_USER'], 'chris')
##         self.assertEqual(environ['repoze.who.identity'], new_credentials)

    def test_call_200_with_metadata(self):
        # Metadata provider output is merged into the identity.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mdprovider = DummyMDProvider({'foo': 'bar'})
        mdproviders = [('mdprovider', mdprovider)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')

    def test_call_ingress_plugin_replaces_application(self):
        # An identifier that sets 'repoze.who.application' during identify
        # causes that app (a redirect here) to answer instead.
        from webob.exc import HTTPFound
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyWorkingApp('200 OK', headers)
        challengers = []
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(
            credentials,
            remember_headers=[('a', '1')],
            replace_app=HTTPFound('http://example.com/redirect')
            )
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mdproviders = []
        mw = self._makeOne(app=app,
                           challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = b''.join(mw(environ, start_response)).decode('ascii')
        self.assertTrue(result.startswith('302 Found'))
        self.assertEqual(start_response.status, '302 Found')
        headers = start_response.headers
        #self.assertEqual(len(headers), 3, headers)
        #self.assertEqual(headers[0],
        #                 ('Location', 'http://example.com/redirect'))
        self.assertEqual(headers[2],
                         ('Content-Type', 'text/plain; charset=UTF-8'))
        self.assertEqual(headers[3],
                         ('a', '1'))
        self.assertEqual(start_response.exc_info, None)
        self.assertFalse('repoze.who.application' in environ)

    def test_call_app_doesnt_call_start_response(self):
        # The app is a generator that only calls start_response once it is
        # iterated; metadata must still reach the identity.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyGeneratorApp('200 OK', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mdprovider = DummyMDProvider({'foo': 'bar'})
        mdproviders = [('mdprovider', mdprovider)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        mw(environ, start_response)
        # metadata
        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')

    def test_call_w_challenge_closes_iterable(self):
        # When a challenge replaces the response, the app's iterable must
        # have been closed.
        from webob.exc import HTTPUnauthorized
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyIterableWithCloseApp('401 Unauthorized', headers)
        challenge_app = HTTPUnauthorized()
        challenge = DummyChallenger(challenge_app)
        challengers = [('challenge', challenge)]
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mdprovider = DummyMDProvider({'foo': 'bar'})
        mdproviders = [('mdprovider', mdprovider)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        result = b''.join(mw(environ, start_response)).decode('ascii')
        self.assertTrue(result.startswith('401 Unauthorized'))
        self.assertTrue(app._iterable._closed)

    def test_call_w_challenge_but_no_challenger_still_closes_iterable(self):
        # Even on the RuntimeError path the app's iterable must be closed.
        environ = self._makeEnviron()
        headers = [('a', '1')]
        app = DummyIterableWithCloseApp('401 Unauthorized', headers)
        challengers = []
        credentials = {'login': 'chris', 'password': 'password'}
        identifier = DummyIdentifier(credentials)
        identifiers = [('identifier', identifier)]
        authenticator = DummyAuthenticator()
        authenticators = [('authenticator', authenticator)]
        mdprovider = DummyMDProvider({'foo': 'bar'})
        mdproviders = [('mdprovider', mdprovider)]
        mw = self._makeOne(app=app, challengers=challengers,
                           identifiers=identifiers,
                           authenticators=authenticators,
                           mdproviders=mdproviders)
        start_response = DummyStartResponse()
        self.assertRaises(RuntimeError, mw, environ, start_response)
        self.assertTrue(app._iterable._closed)

    # XXX need more call tests:
    #  - auth_id sorting
496
class TestStartResponseWrapper(unittest.TestCase):
    """Tests for ``repoze.who.middleware.StartResponseWrapper``."""

    def _getTargetClass(self):
        """Return the class under test (imported lazily)."""
        from repoze.who.middleware import StartResponseWrapper
        return StartResponseWrapper

    def _makeOne(self, *arg, **kw):
        """Instantiate the class under test with the given arguments."""
        return self._getTargetClass()(*arg, **kw)

    def test_ctor(self):
        # A fresh wrapper stores start_response, has no headers and has a
        # truthy buffer object.
        wrapper = self._makeOne(None)
        self.assertEqual(wrapper.start_response, None)
        self.assertEqual(wrapper.headers, [])
        self.assertTrue(wrapper.buffer)

    def test_finish_response(self):
        # finish_response() must call the real start_response with the
        # stored status and headers plus the extra headers, write the
        # buffered data and close the write callable.
        from repoze.who._compat import StringIO
        statuses = []
        headerses = []
        datases = []
        closededs = []

        def write(data):
            datases.append(data)

        def close():
            closededs.append(True)

        # The write callable doubles as the closable object.
        write.close = close

        def start_response(status, headers, exc_info=None):
            statuses.append(status)
            headerses.append(headers)
            return write

        wrapper = self._makeOne(start_response)
        wrapper.status = '401 Unauthorized'
        wrapper.headers = [('a', '1')]
        wrapper.buffer = StringIO('written')
        extra_headers = [('b', '2')]
        result = wrapper.finish_response(extra_headers)
        self.assertEqual(result, None)
        self.assertEqual(headerses[0], wrapper.headers + extra_headers)
        self.assertEqual(statuses[0], wrapper.status)
        self.assertEqual(datases[0], 'written')
        self.assertEqual(closededs[0], True)
541
class WrapGeneratorTests(unittest.TestCase):
    """Tests for ``repoze.who.middleware.wrap_generator``."""

    def _callFUT(self, iterable):
        """Call the function under test on *iterable*."""
        from repoze.who.middleware import wrap_generator
        return wrap_generator(iterable)

    def test_w_generator(self):
        # Wrapping must already have started the generator (side effect
        # visible in ``seen``) while still yielding every item afterwards.
        seen = []

        def gen():
            seen.append('yo!')
            yield 'a'
            yield 'b'

        newgen = self._callFUT(gen())
        self.assertEqual(seen, ['yo!'])
        self.assertEqual(list(newgen), ['a', 'b'])

    def test_w_empty_generator(self):
        # A generator that yields nothing wraps to an empty iterable.
        def gen():
            if False:
                yield 'a'  # pragma: no cover

        newgen = self._callFUT(gen())
        self.assertEqual(list(newgen), [])

    def test_w_iterator_having_close(self):
        # close() on the wrapped iterable is only called once the wrapper
        # has been exhausted.
        def gen():
            yield 'a'
            yield 'b'

        iterable = DummyIterableWithClose(gen())
        newgen = self._callFUT(iterable)
        self.assertFalse(iterable._closed)
        self.assertEqual(list(newgen), ['a', 'b'])
        self.assertTrue(iterable._closed)
f7efc0 574
class TestMakeTestMiddleware(unittest.TestCase):
    """Tests for ``repoze.who.middleware.make_test_middleware``."""

    def setUp(self):
        # Start every test without WHO_LOG in the process environment.
        import os
        os.environ.pop('WHO_LOG', None)

    def tearDown(self):
        # Remove WHO_LOG again in case a test set it.
        import os
        os.environ.pop('WHO_LOG', None)

    def _getFactory(self):
        """Return the factory under test (imported lazily)."""
        from repoze.who.middleware import make_test_middleware
        return make_test_middleware

    def test_it_no_WHO_LOG_in_environ(self):
        # Without WHO_LOG the expected plugin counts are present and no
        # logger is configured.
        app = DummyApp()
        factory = self._getFactory()
        global_conf = {'here': '/'}
        middleware = factory(app, global_conf)
        api_factory = middleware.api_factory
        self.assertEqual(len(api_factory.identifiers), 2)
        self.assertEqual(len(api_factory.authenticators), 1)
        self.assertEqual(len(api_factory.challengers), 2)
        self.assertEqual(len(api_factory.mdproviders), 0)
        self.assertEqual(middleware.logger, None)

    def test_it_w_WHO_LOG_in_environ(self):
        # With WHO_LOG set, the middleware logger works at DEBUG level.
        import logging
        import os
        os.environ['WHO_LOG'] = '1'
        app = DummyApp()
        factory = self._getFactory()
        global_conf = {'here': '/'}
        middleware = factory(app, global_conf)
        self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG)
616
class DummyApp(object):
    """WSGI app stub that records the environ and returns an empty body."""

    # Last environ the app was called with; None until the first call.
    environ = None

    def __call__(self, environ, start_response):
        # start_response is deliberately never invoked.
        self.environ = environ
        return []
622
class DummyWorkingApp(object):
    """WSGI app stub answering with a fixed status, headers and body."""

    def __init__(self, status, headers):
        self.status = status
        self.headers = headers

    def __call__(self, environ, start_response):
        # Record the environ, start the response, return a one-chunk body.
        self.environ = environ
        start_response(self.status, self.headers)
        return ['body']
632
class DummyLogoutApp(object):
    """WSGI app stub that logs out through the API found in the environ."""

    def __init__(self, status):
        self.status = status

    def __call__(self, environ, start_response):
        self.environ = environ
        # The object under 'repoze.who.api' must provide logout(); whatever
        # it returns is sent as the response headers.
        api = environ['repoze.who.api']
        headers = api.logout()
        start_response(self.status, headers)
        return ['body']
643
class DummyGeneratorApp(object):
    """WSGI app stub whose body is a generator.

    ``start_response`` is not called (and ``environ`` is not recorded)
    until the returned generator is first iterated.
    """

    def __init__(self, status, headers):
        self.status = status
        self.headers = headers

    def __call__(self, environ, start_response):
        def gen():
            # Runs lazily, on the first next() of the generator.
            self.environ = environ
            start_response(self.status, self.headers)
            yield 'body'
        return gen()
655
class DummyIterableWithClose(object):
    """Iterable wrapper that records whether ``close()`` was called."""

    # Flipped to True by close().
    _closed = False

    def __init__(self, iterable):
        self._iterable = iterable

    def __iter__(self):
        return iter(self._iterable)

    def close(self):
        self._closed = True
664
class DummyIterableWithCloseApp(object):
    """WSGI app stub whose body iterable exposes ``close()``.

    The same ``DummyIterableWithClose`` instance is returned on every call
    and kept on ``_iterable`` so tests can inspect its ``_closed`` flag.
    """

    def __init__(self, status, headers):
        self.status = status
        self.headers = headers
        self._iterable = DummyIterableWithClose(['body'])

    def __call__(self, environ, start_response):
        self.environ = environ
        start_response(self.status, self.headers)
        return self._iterable
675
class DummyIdentityResetApp(object):
    """WSGI app stub that rewrites the identity found in the environ."""

    def __init__(self, status, headers, new_identity):
        self.status = status
        self.headers = headers
        # NOTE: stored but never read by __call__, which writes fixed
        # 'login' / 'password' values instead.
        self.new_identity = new_identity

    def __call__(self, environ, start_response):
        self.environ = environ
        # Mutate the existing identity mapping in place.
        identity = environ['repoze.who.identity']
        identity['login'] = 'fred'
        identity['password'] = 'schooled'
        start_response(self.status, self.headers)
        return ['body']
688
class DummyChallenger(object):
    """Challenger stub that always answers with a preset app."""

    def __init__(self, app=None):
        self.app = app

    def challenge(self, environ, status, app_headers, forget_headers):
        # Leave a marker in the environ so tests can tell a challenge ran.
        environ['challenged'] = self.app
        return self.app
696
class DummyIdentifier(object):
    """Identifier stub with preset credentials and headers.

    ``forgotten`` / ``remembered`` start as False and are overwritten with
    the identity passed to ``forget()`` / ``remember()``.
    """

    forgotten = False
    remembered = False

    def __init__(self, credentials=None, remember_headers=None,
                 forget_headers=None, replace_app=None):
        self.credentials = credentials
        self.remember_headers = remember_headers
        self.forget_headers = forget_headers
        self.replace_app = replace_app

    def identify(self, environ):
        # A truthy replace_app is installed under 'repoze.who.application'.
        if self.replace_app:
            environ['repoze.who.application'] = self.replace_app
        return self.credentials

    def forget(self, environ, identity):
        self.forgotten = identity
        return self.forget_headers

    def remember(self, environ, identity):
        self.remembered = identity
        return self.remember_headers
720
class DummyAuthenticator(object):
    """Authenticator stub that accepts any credentials."""

    def authenticate(self, environ, credentials):
        # The user id is simply the 'login' value; a missing key raises
        # KeyError.
        return credentials['login']
f7efc0 724
class DummyRequestClassifier(object):
    """Request classifier stub that labels every request 'browser'."""

    def __call__(self, environ):
        return 'browser'
728
class DummyChallengeDecider(object):
    """Challenge decider stub: challenge exactly on '401 ' statuses."""

    def __call__(self, environ, status, headers):
        # Always return a real bool rather than True / implicit None.
        return status.startswith('401 ')
733
class DummyStartResponse(object):
    """``start_response`` stub that records what it was called with."""

    def __call__(self, status, headers, exc_info=None):
        self.status = status
        self.headers = headers
        self.exc_info = exc_info
        # Returns an empty list in place of a write callable.
        return []
740
class DummyMDProvider(object):
    """Metadata provider stub that merges preset metadata into identities."""

    def __init__(self, metadata=None):
        self._metadata = metadata

    def add_metadata(self, environ, identity):
        # Built with the default metadata=None, this is a no-op rather than
        # a TypeError from identity.update(None). Returns None either way.
        if self._metadata is not None:
            identity.update(self._metadata)