1import unittest
2
3
4class TestMiddleware(unittest.TestCase):
5
6    def _getTargetClass(self):
7        from repoze.who.middleware import PluggableAuthenticationMiddleware
8        return PluggableAuthenticationMiddleware
9
10    def _makeOne(self,
11                 app=None,
12                 identifiers=None,
13                 authenticators=None,
14                 challengers=None,
15                 request_classifier=None,
16                 mdproviders=None,
17                 challenge_decider=None,
18                 log_stream=None,
19                 log_level=None,
20                 remote_user_key='REMOTE_USER',
21                 ):
22        if app is None:
23            app = DummyApp()
24        if identifiers is None:
25            identifiers = []
26        if authenticators is None:
27            authenticators = []
28        if challengers is None:
29            challengers = []
30        if request_classifier is None:
31            request_classifier = DummyRequestClassifier()
32        if mdproviders is None:
33            mdproviders = []
34        if challenge_decider is None:
35            challenge_decider = DummyChallengeDecider()
36        if log_level is None:
37            import logging
38            log_level = logging.DEBUG
39        mw = self._getTargetClass()(app,
40                                    identifiers,
41                                    authenticators,
42                                    challengers,
43                                    mdproviders,
44                                    request_classifier,
45                                    challenge_decider,
46                                    log_stream,
47                                    log_level=logging.DEBUG,
48                                    remote_user_key=remote_user_key,
49                                   )
50        return mw
51
52    def _makeEnviron(self, kw=None):
53        from wsgiref.util import setup_testing_defaults
54        environ = {}
55        setup_testing_defaults(environ)
56        if kw is not None:
57            environ.update(kw)
58        return environ
59
60    def test_ctor_positional_args(self):
61        klass = self._getTargetClass()
62        app = DummyApp()
63        identifiers = []
64        authenticators = []
65        challengers = []
66        request_classifier = DummyRequestClassifier()
67        mdproviders = []
68        challenge_decider = DummyChallengeDecider()
69        mw = klass(app,
70                   identifiers,
71                   authenticators,
72                   challengers,
73                   mdproviders,
74                   request_classifier,
75                   challenge_decider,
76                  )
77        self.assertEqual(mw.app, app)
78        af = mw.api_factory
79        self.assertEqual(af.identifiers, identifiers)
80        self.assertEqual(af.authenticators, authenticators)
81        self.assertEqual(af.challengers, challengers)
82        self.assertEqual(af.mdproviders, mdproviders)
83        self.assertEqual(af.request_classifier, request_classifier)
84        self.assertEqual(af.challenge_decider, challenge_decider)
85
86    def test_ctor_wo_request_classifier_or_classifier_raises(self):
87        # BBB for old argument name
88        klass = self._getTargetClass()
89        app = DummyApp()
90        identifiers = []
91        authenticators = []
92        challengers = []
93        mdproviders = []
94        challenge_decider = DummyChallengeDecider()
95        self.assertRaises(ValueError,
96                          klass,
97                          app,
98                          identifiers,
99                          authenticators,
100                          challengers,
101                          mdproviders,
102                          challenge_decider = challenge_decider,
103                          )
104
105    def test_ctor_w_request_classifier_and_classifier_raises(self):
106        # BBB for old argument name
107        klass = self._getTargetClass()
108        app = DummyApp()
109        identifiers = []
110        authenticators = []
111        challengers = []
112        request_classifier = DummyRequestClassifier()
113        mdproviders = []
114        challenge_decider = DummyChallengeDecider()
115        self.assertRaises(ValueError,
116                          klass,
117                          app,
118                          identifiers,
119                          authenticators,
120                          challengers,
121                          mdproviders,
122                          request_classifier,
123                          challenge_decider,
124                          classifier = object()
125                          )
126
127    def test_ctor_wo_challenge_decider_raises(self):
128        # BBB for old argument name
129        klass = self._getTargetClass()
130        app = DummyApp()
131        identifiers = []
132        authenticators = []
133        challengers = []
134        request_classifier = DummyRequestClassifier()
135        mdproviders = []
136        self.assertRaises(ValueError,
137                          klass,
138                          app,
139                          identifiers,
140                          authenticators,
141                          challengers,
142                          mdproviders,
143                          classifier = request_classifier,
144                          )
145
146    def test_ctor_w_classifier(self):
147        # BBB for old argument name
148        klass = self._getTargetClass()
149        app = DummyApp()
150        identifiers = []
151        authenticators = []
152        challengers = []
153        request_classifier = DummyRequestClassifier()
154        mdproviders = []
155        challenge_decider = DummyChallengeDecider()
156        mw = klass(app,
157                   identifiers,
158                   authenticators,
159                   challengers,
160                   mdproviders,
161                   classifier = request_classifier,
162                   challenge_decider = challenge_decider,
163                  )
164        self.assertEqual(mw.app, app)
165        af = mw.api_factory
166        self.assertEqual(af.identifiers, identifiers)
167        self.assertEqual(af.authenticators, authenticators)
168        self.assertEqual(af.challengers, challengers)
169        self.assertEqual(af.mdproviders, mdproviders)
170        self.assertEqual(af.request_classifier, request_classifier)
171        self.assertEqual(af.challenge_decider, challenge_decider)
172
173    def test_ctor_accepts_logger(self):
174        import logging
175        restore = logging.raiseExceptions
176        logging.raiseExceptions = 0
177        try:
178            logger = logging.Logger('something')
179            logger.setLevel(logging.INFO)
180            mw = self._makeOne(log_stream=logger)
181            self.assertEqual(logger, mw.logger)
182        finally:
183            logging.raiseExceptions = restore
184
185    def test_call_remoteuser_already_set(self):
186        environ = self._makeEnviron({'REMOTE_USER':'admin'})
187        mw = self._makeOne()
188        result = mw(environ, None)
189        self.assertEqual(mw.app.environ, environ)
190        self.assertEqual(result, [])
191
192    def test_call_200_no_plugins(self):
193        environ = self._makeEnviron()
194        headers = [('a', '1')]
195        app = DummyWorkingApp('200 OK', headers)
196        mw = self._makeOne(app=app)
197        start_response = DummyStartResponse()
198        result = mw(environ, start_response)
199        self.assertEqual(mw.app.environ, environ)
200        self.assertEqual(result, ['body'])
201        self.assertEqual(start_response.status, '200 OK')
202        self.assertEqual(start_response.headers, headers)
203
204    def test_call_401_no_challengers(self):
205        environ = self._makeEnviron()
206        headers = [('a', '1')]
207        app = DummyWorkingApp('401 Unauthorized', headers)
208        mw = self._makeOne(app=app)
209        start_response = DummyStartResponse()
210        self.assertRaises(RuntimeError, mw, environ, start_response)
211
212    def test_call_200_no_challengers(self):
213        environ = self._makeEnviron()
214        headers = [('a', '1')]
215        app = DummyWorkingApp('200 OK', headers)
216        credentials = {'login':'chris', 'password':'password'}
217        identifier = DummyIdentifier(credentials)
218        identifiers = [ ('identifier', identifier) ]
219        mw = self._makeOne(app=app, identifiers=identifiers)
220        start_response = DummyStartResponse()
221        result = mw(environ, start_response)
222        self.assertEqual(mw.app.environ, environ)
223        self.assertEqual(result, ['body'])
224        self.assertEqual(start_response.status, '200 OK')
225        self.assertEqual(start_response.headers, headers)
226
227    def test_call_200_no_challengers_app_calls_forget(self):
228        # See https://github.com/repoze/repoze.who/issues/21
229        environ = self._makeEnviron()
230        remember_headers = [('remember', '1')]
231        forget_headers = [('forget', '1')]
232        app = DummyLogoutApp('200 OK')
233        credentials = {'login':'chris', 'password':'password'}
234        identifier = DummyIdentifier(
235            credentials,
236            remember_headers=remember_headers,
237            forget_headers=forget_headers)
238        identifiers = [ ('identifier', identifier) ]
239        authenticator = DummyAuthenticator()
240        authenticators = [ ('authenticator', authenticator) ]
241        mw = self._makeOne(
242            app=app, identifiers=identifiers, authenticators=authenticators)
243        start_response = DummyStartResponse()
244        result = mw(environ, start_response)
245        self.assertEqual(mw.app.environ, environ)
246        self.assertEqual(result, ['body'])
247        self.assertEqual(start_response.status, '200 OK')
248        self.assertEqual(start_response.headers, forget_headers)
249
250    def test_call_401_no_identifiers(self):
251        from webob.exc import HTTPUnauthorized
252        environ = self._makeEnviron()
253        headers = [('a', '1')]
254        app = DummyWorkingApp('401 Unauthorized', headers)
255        challenge_app = HTTPUnauthorized()
256        challenge = DummyChallenger(challenge_app)
257        challengers = [ ('challenge', challenge) ]
258        mw = self._makeOne(app=app, challengers=challengers)
259        start_response = DummyStartResponse()
260        result = b''.join(mw(environ, start_response)).decode('ascii')
261        self.assertEqual(environ['challenged'], challenge_app)
262        self.assertTrue(result.startswith('401 Unauthorized'))
263
264    def test_call_401_challenger_and_identifier_no_authenticator(self):
265        from webob.exc import HTTPUnauthorized
266        environ = self._makeEnviron()
267        headers = [('a', '1')]
268        app = DummyWorkingApp('401 Unauthorized', headers)
269        challenge_app = HTTPUnauthorized()
270        challenge = DummyChallenger(challenge_app)
271        challengers = [ ('challenge', challenge) ]
272        credentials = {'login':'a', 'password':'b'}
273        identifier = DummyIdentifier(credentials)
274        identifiers = [ ('identifier', identifier) ]
275        mw = self._makeOne(app=app, challengers=challengers,
276                           identifiers=identifiers)
277        start_response = DummyStartResponse()
278
279        result = b''.join(mw(environ, start_response)).decode('ascii')
280        self.assertEqual(environ['challenged'], challenge_app)
281        self.assertTrue(result.startswith('401 Unauthorized'))
282        self.assertEqual(identifier.forgotten, False)
283        self.assertEqual(environ.get('REMOTE_USER'), None)
284
285    def test_call_401_challenger_and_identifier_and_authenticator(self):
286        from webob.exc import HTTPUnauthorized
287        environ = self._makeEnviron()
288        headers = [('a', '1')]
289        app = DummyWorkingApp('401 Unauthorized', headers)
290        challenge_app = HTTPUnauthorized()
291        challenge = DummyChallenger(challenge_app)
292        challengers = [ ('challenge', challenge) ]
293        credentials = {'login':'chris', 'password':'password'}
294        identifier = DummyIdentifier(credentials)
295        identifiers = [ ('identifier', identifier) ]
296        authenticator = DummyAuthenticator()
297        authenticators = [ ('authenticator', authenticator) ]
298        mw = self._makeOne(app=app, challengers=challengers,
299                           identifiers=identifiers,
300                           authenticators=authenticators)
301        start_response = DummyStartResponse()
302        result = b''.join(mw(environ, start_response)).decode('ascii')
303        self.assertEqual(environ['challenged'], challenge_app)
304        self.assertTrue(result.startswith('401 Unauthorized'))
305        # @@ unfuck
306##         self.assertEqual(identifier.forgotten, identifier.credentials)
307        self.assertEqual(environ['REMOTE_USER'], 'chris')
308##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
309
310    def test_call_200_challenger_and_identifier_and_authenticator(self):
311        from webob.exc import HTTPUnauthorized
312        environ = self._makeEnviron()
313        headers = [('a', '1')]
314        app = DummyWorkingApp('200 OK', headers)
315        challenge_app = HTTPUnauthorized()
316        challenge = DummyChallenger(challenge_app)
317        challengers = [ ('challenge', challenge) ]
318        credentials = {'login':'chris', 'password':'password'}
319        identifier = DummyIdentifier(credentials)
320        identifiers = [ ('identifier', identifier) ]
321        authenticator = DummyAuthenticator()
322        authenticators = [ ('authenticator', authenticator) ]
323        mw = self._makeOne(app=app, challengers=challengers,
324                           identifiers=identifiers,
325                           authenticators=authenticators)
326        start_response = DummyStartResponse()
327        result = mw(environ, start_response)
328        self.assertEqual(environ.get('challenged'), None)
329        self.assertEqual(identifier.forgotten, False)
330        # @@ figure out later
331##         self.assertEqual(dict(identifier.remembered)['login'], dict(identifier.credentials)['login'])
332##         self.assertEqual(dict(identifier.remembered)['password'], dict(identifier.credentials)['password'])
333        self.assertEqual(environ['REMOTE_USER'], 'chris')
334##         self.assertEqual(environ['repoze.who.identity'], identifier.credentials)
335
336
337    def test_call_200_identity_reset(self):
338        from webob.exc import HTTPUnauthorized
339        environ = self._makeEnviron()
340        headers = [('a', '1')]
341        new_identity = {'user_id':'foo', 'password':'bar'}
342        app = DummyIdentityResetApp('200 OK', headers, new_identity)
343        challenge_app = HTTPUnauthorized()
344        challenge = DummyChallenger(challenge_app)
345        challengers = [ ('challenge', challenge) ]
346        credentials = {'login':'chris', 'password':'password'}
347        identifier = DummyIdentifier(credentials)
348        identifiers = [ ('identifier', identifier) ]
349        authenticator = DummyAuthenticator()
350        authenticators = [ ('authenticator', authenticator) ]
351        mw = self._makeOne(app=app, challengers=challengers,
352                           identifiers=identifiers,
353                           authenticators=authenticators)
354        start_response = DummyStartResponse()
355        result = mw(environ, start_response)
356        self.assertEqual(environ.get('challenged'), None)
357        self.assertEqual(identifier.forgotten, False)
358        new_credentials = identifier.credentials.copy()
359        new_credentials['login'] = 'fred'
360        new_credentials['password'] = 'schooled'
361        # @@ unfuck
362##         self.assertEqual(identifier.remembered, new_credentials)
363        self.assertEqual(environ['REMOTE_USER'], 'chris')
364##         self.assertEqual(environ['repoze.who.identity'], new_credentials)
365
366    def test_call_200_with_metadata(self):
367        from webob.exc import HTTPUnauthorized
368        environ = self._makeEnviron()
369        headers = [('a', '1')]
370        app = DummyWorkingApp('200 OK', headers)
371        challenge_app = HTTPUnauthorized()
372        challenge = DummyChallenger(challenge_app)
373        challengers = [ ('challenge', challenge) ]
374        credentials = {'login':'chris', 'password':'password'}
375        identifier = DummyIdentifier(credentials)
376        identifiers = [ ('identifier', identifier) ]
377        authenticator = DummyAuthenticator()
378        authenticators = [ ('authenticator', authenticator) ]
379        mdprovider = DummyMDProvider({'foo':'bar'})
380        mdproviders = [ ('mdprovider', mdprovider) ]
381        mw = self._makeOne(app=app, challengers=challengers,
382                           identifiers=identifiers,
383                           authenticators=authenticators,
384                           mdproviders=mdproviders)
385        start_response = DummyStartResponse()
386        result = mw(environ, start_response)
387        # metadata
388        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
389
390    def test_call_ingress_plugin_replaces_application(self):
391        from webob.exc import HTTPFound
392        environ = self._makeEnviron()
393        headers = [('a', '1')]
394        app = DummyWorkingApp('200 OK', headers)
395        challengers = []
396        credentials = {'login':'chris', 'password':'password'}
397        identifier = DummyIdentifier(
398            credentials,
399            remember_headers=[('a', '1')],
400            replace_app = HTTPFound('http://example.com/redirect')
401            )
402        identifiers = [ ('identifier', identifier) ]
403        authenticator = DummyAuthenticator()
404        authenticators = [ ('authenticator', authenticator) ]
405        mdproviders = []
406        mw = self._makeOne(app=app,
407                           challengers=challengers,
408                           identifiers=identifiers,
409                           authenticators=authenticators,
410                           mdproviders=mdproviders)
411        start_response = DummyStartResponse()
412        result = b''.join(mw(environ, start_response)).decode('ascii')
413        self.assertTrue(result.startswith('302 Found'))
414        self.assertEqual(start_response.status, '302 Found')
415        headers = start_response.headers
416        #self.assertEqual(len(headers), 3, headers)
417        #self.assertEqual(headers[0],
418        #                 ('Location', 'http://example.com/redirect'))
419        self.assertEqual(headers[2],
420                         ('Content-Type', 'text/plain; charset=UTF-8'))
421        self.assertEqual(headers[3],
422                         ('a', '1'))
423        self.assertEqual(start_response.exc_info, None)
424        self.assertFalse('repoze.who.application' in environ)
425
426    def test_call_app_doesnt_call_start_response(self):
427        from webob.exc import HTTPUnauthorized
428        environ = self._makeEnviron()
429        headers = [('a', '1')]
430        app = DummyGeneratorApp('200 OK', headers)
431        challenge_app = HTTPUnauthorized()
432        challenge = DummyChallenger(challenge_app)
433        challengers = [ ('challenge', challenge) ]
434        credentials = {'login':'chris', 'password':'password'}
435        identifier = DummyIdentifier(credentials)
436        identifiers = [ ('identifier', identifier) ]
437        authenticator = DummyAuthenticator()
438        authenticators = [ ('authenticator', authenticator) ]
439        mdprovider = DummyMDProvider({'foo':'bar'})
440        mdproviders = [ ('mdprovider', mdprovider) ]
441        mw = self._makeOne(app=app, challengers=challengers,
442                           identifiers=identifiers,
443                           authenticators=authenticators,
444                           mdproviders=mdproviders)
445        start_response = DummyStartResponse()
446        result = mw(environ, start_response)
447        # metadata
448        self.assertEqual(environ['repoze.who.identity']['foo'], 'bar')
449
450    def test_call_w_challenge_closes_iterable(self):
451        from webob.exc import HTTPUnauthorized
452        environ = self._makeEnviron()
453        headers = [('a', '1')]
454        app = DummyIterableWithCloseApp('401 Unauthorized', headers)
455        challenge_app = HTTPUnauthorized()
456        challenge = DummyChallenger(challenge_app)
457        challengers = [ ('challenge', challenge) ]
458        credentials = {'login':'chris', 'password':'password'}
459        identifier = DummyIdentifier(credentials)
460        identifiers = [ ('identifier', identifier) ]
461        authenticator = DummyAuthenticator()
462        authenticators = [ ('authenticator', authenticator) ]
463        mdprovider = DummyMDProvider({'foo':'bar'})
464        mdproviders = [ ('mdprovider', mdprovider) ]
465        mw = self._makeOne(app=app, challengers=challengers,
466                           identifiers=identifiers,
467                           authenticators=authenticators,
468                           mdproviders=mdproviders)
469        start_response = DummyStartResponse()
470        result = b''.join(mw(environ, start_response)).decode('ascii')
471        self.assertTrue(result.startswith('401 Unauthorized'))
472        self.assertTrue(app._iterable._closed)
473
474    def test_call_w_challenge_but_no_challenger_still_closes_iterable(self):
475        environ = self._makeEnviron()
476        headers = [('a', '1')]
477        app = DummyIterableWithCloseApp('401 Unauthorized', headers)
478        challengers = []
479        credentials = {'login':'chris', 'password':'password'}
480        identifier = DummyIdentifier(credentials)
481        identifiers = [ ('identifier', identifier) ]
482        authenticator = DummyAuthenticator()
483        authenticators = [ ('authenticator', authenticator) ]
484        mdprovider = DummyMDProvider({'foo':'bar'})
485        mdproviders = [ ('mdprovider', mdprovider) ]
486        mw = self._makeOne(app=app, challengers=challengers,
487                           identifiers=identifiers,
488                           authenticators=authenticators,
489                           mdproviders=mdproviders)
490        start_response = DummyStartResponse()
491        self.assertRaises(RuntimeError, mw, environ, start_response)
492        self.assertTrue(app._iterable._closed)
493
494    # XXX need more call tests:
495    #  - auth_id sorting
496
497class TestStartResponseWrapper(unittest.TestCase):
498
499    def _getTargetClass(self):
500        from repoze.who.middleware import StartResponseWrapper
501        return StartResponseWrapper
502
503    def _makeOne(self, *arg, **kw):
504        plugin = self._getTargetClass()(*arg, **kw)
505        return plugin
506
507    def test_ctor(self):
508        wrapper = self._makeOne(None)
509        self.assertEqual(wrapper.start_response, None)
510        self.assertEqual(wrapper.headers, [])
511        self.assertTrue(wrapper.buffer)
512
513    def test_finish_response(self):
514        from repoze.who._compat import StringIO
515        statuses = []
516        headerses = []
517        datases = []
518        closededs = []
519        def write(data):
520            datases.append(data)
521        def close():
522            closededs.append(True)
523        write.close = close
524
525        def start_response(status, headers, exc_info=None):
526            statuses.append(status)
527            headerses.append(headers)
528            return write
529
530        wrapper = self._makeOne(start_response)
531        wrapper.status = '401 Unauthorized'
532        wrapper.headers = [('a', '1')]
533        wrapper.buffer = StringIO('written')
534        extra_headers = [('b', '2')]
535        result = wrapper.finish_response(extra_headers)
536        self.assertEqual(result, None)
537        self.assertEqual(headerses[0], wrapper.headers + extra_headers)
538        self.assertEqual(statuses[0], wrapper.status)
539        self.assertEqual(datases[0], 'written')
540        self.assertEqual(closededs[0], True)
541
542class WrapGeneratorTests(unittest.TestCase):
543
544    def _callFUT(self, iterable):
545        from repoze.who.middleware import wrap_generator
546        return wrap_generator(iterable)
547
548    def test_w_generator(self):
549        L = []
550        def gen(L=L):
551            L.append('yo!')
552            yield 'a'
553            yield 'b'
554        newgen = self._callFUT(gen())
555        self.assertEqual(L, ['yo!'])
556        self.assertEqual(list(newgen), ['a', 'b'])
557
558    def test_w_empty_generator(self):
559        def gen():
560            if False:
561                yield 'a'  # pragma: no cover
562        newgen = self._callFUT(gen())
563        self.assertEqual(list(newgen), [])
564
565    def test_w_iterator_having_close(self):
566        def gen():
567            yield 'a'
568            yield 'b'
569        iterable = DummyIterableWithClose(gen())
570        newgen = self._callFUT(iterable)
571        self.assertFalse(iterable._closed)
572        self.assertEqual(list(newgen), ['a', 'b'])
573        self.assertTrue(iterable._closed)
574
575class TestMakeTestMiddleware(unittest.TestCase):
576
577    def setUp(self):
578        import os
579        try:
580            del os.environ['WHO_LOG']
581        except KeyError:
582            pass
583
584    def tearDown(self):
585        import os
586        try:
587            del os.environ['WHO_LOG']
588        except KeyError:
589            pass
590
591    def _getFactory(self):
592        from repoze.who.middleware import make_test_middleware
593        return make_test_middleware
594
595    def test_it_no_WHO_LOG_in_environ(self):
596        app = DummyApp()
597        factory = self._getFactory()
598        global_conf = {'here': '/'}
599        middleware = factory(app, global_conf)
600        api_factory = middleware.api_factory
601        self.assertEqual(len(api_factory.identifiers), 2)
602        self.assertEqual(len(api_factory.authenticators), 1)
603        self.assertEqual(len(api_factory.challengers), 2)
604        self.assertEqual(len(api_factory.mdproviders), 0)
605        self.assertEqual(middleware.logger, None)
606
607    def test_it_w_WHO_LOG_in_environ(self):
608        import logging
609        import os
610        os.environ['WHO_LOG'] = '1'
611        app = DummyApp()
612        factory = self._getFactory()
613        global_conf = {'here': '/'}
614        middleware = factory(app, global_conf)
615        self.assertEqual(middleware.logger.getEffectiveLevel(), logging.DEBUG)
616
617class DummyApp(object):
618    environ = None
619    def __call__(self, environ, start_response):
620        self.environ = environ
621        return []
622
623class DummyWorkingApp(object):
624    def __init__(self, status, headers):
625        self.status = status
626        self.headers = headers
627
628    def __call__(self, environ, start_response):
629        self.environ = environ
630        start_response(self.status, self.headers)
631        return ['body']
632
633class DummyLogoutApp(object):
634    def __init__(self, status):
635        self.status = status
636
637    def __call__(self, environ, start_response):
638        self.environ = environ
639        api = environ['repoze.who.api']
640        headers = api.logout()
641        start_response(self.status, headers)
642        return ['body']
643
644class DummyGeneratorApp(object):
645    def __init__(self, status, headers):
646        self.status = status
647        self.headers = headers
648
649    def __call__(self, environ, start_response):
650        def gen(self=self, start_response=start_response):
651            self.environ = environ
652            start_response(self.status, self.headers)
653            yield 'body'
654        return gen()
655
656class DummyIterableWithClose(object):
657    _closed = False
658    def __init__(self, iterable):
659        self._iterable = iterable
660    def __iter__(self):
661        return iter(self._iterable)
662    def close(self):
663        self._closed = True
664
665class DummyIterableWithCloseApp(object):
666    def __init__(self, status, headers):
667        self.status = status
668        self.headers = headers
669        self._iterable = DummyIterableWithClose(['body'])
670
671    def __call__(self, environ, start_response):
672        self.environ = environ
673        start_response(self.status, self.headers)
674        return self._iterable
675
676class DummyIdentityResetApp(object):
677    def __init__(self, status, headers, new_identity):
678        self.status = status
679        self.headers = headers
680        self.new_identity = new_identity
681
682    def __call__(self, environ, start_response):
683        self.environ = environ
684        environ['repoze.who.identity']['login'] = 'fred'
685        environ['repoze.who.identity']['password'] = 'schooled'
686        start_response(self.status, self.headers)
687        return ['body']
688
689class DummyChallenger(object):
690    def __init__(self, app=None):
691        self.app = app
692
693    def challenge(self, environ, status, app_headers, forget_headers):
694        environ['challenged'] = self.app
695        return self.app
696
697class DummyIdentifier(object):
698    forgotten = False
699    remembered = False
700
701    def __init__(self, credentials=None, remember_headers=None,
702                 forget_headers=None, replace_app=None):
703        self.credentials = credentials
704        self.remember_headers = remember_headers
705        self.forget_headers = forget_headers
706        self.replace_app = replace_app
707
708    def identify(self, environ):
709        if self.replace_app:
710            environ['repoze.who.application'] = self.replace_app
711        return self.credentials
712
713    def forget(self, environ, identity):
714        self.forgotten = identity
715        return self.forget_headers
716
717    def remember(self, environ, identity):
718        self.remembered = identity
719        return self.remember_headers
720
721class DummyAuthenticator(object):
722    def authenticate(self, environ, credentials):
723        return credentials['login']
724
725class DummyRequestClassifier(object):
726    def __call__(self, environ):
727        return 'browser'
728
729class DummyChallengeDecider(object):
730    def __call__(self, environ, status, headers):
731        if status.startswith('401 '):
732            return True
733
734class DummyStartResponse(object):
735    def __call__(self, status, headers, exc_info=None):
736        self.status = status
737        self.headers = headers
738        self.exc_info = exc_info
739        return []
740
741class DummyMDProvider(object):
742    def __init__(self, metadata=None):
743        self._metadata = metadata
744
745    def add_metadata(self, environ, identity):
746        return identity.update(self._metadata)
747