1"""Unit tests for contextlib.py, and other context managers."""
2
3import io
4import sys
5import tempfile
6import threading
7import unittest
8from contextlib import *  # Tests __all__
9from test import support
10import weakref
11
12
class TestAbstractContextManager(unittest.TestCase):
    """Checks for the AbstractContextManager abstract base class."""

    def test_enter(self):
        # A subclass that only supplies __exit__ must still get an __enter__
        # that hands back the manager itself.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_details):
                super().__exit__(*exc_details)

        cm = DefaultEnter()
        entered = cm.__enter__()
        self.assertIs(entered, cm)

    def test_exit_is_abstract(self):
        # Without an __exit__ implementation the class cannot be instantiated.
        class MissingExit(AbstractContextManager):
            pass

        self.assertRaises(TypeError, MissingExit)

    def test_structural_subclassing(self):
        # A class defining both protocol methods counts as a subclass even
        # though it never inherits from the ABC.
        class ManagerFromScratch:
            def __enter__(self):
                return self
            def __exit__(self, exc_type, exc_value, traceback):
                return None

        recognised = issubclass(ManagerFromScratch, AbstractContextManager)
        self.assertTrue(recognised)

        # Real inheritance is, of course, recognised as well.
        class DefaultEnter(AbstractContextManager):
            def __exit__(self, *exc_details):
                super().__exit__(*exc_details)

        recognised = issubclass(DefaultEnter, AbstractContextManager)
        self.assertTrue(recognised)

        # Setting either protocol method to None opts the class out again.
        class NoEnter(ManagerFromScratch):
            __enter__ = None

        class NoExit(ManagerFromScratch):
            __exit__ = None

        recognised = issubclass(NoEnter, AbstractContextManager)
        self.assertFalse(recognised)

        recognised = issubclass(NoExit, AbstractContextManager)
        self.assertFalse(recognised)
54
55
class ContextManagerTestCase(unittest.TestCase):
    """Tests for the @contextmanager generator decorator."""

    def test_contextmanager_plain(self):
        # Code before the yield runs on entry, code after it on exit, and the
        # yielded value is what the "as" target receives.
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            yield 42
            state.append(999)
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_finally(self):
        # A finally clause around the yield must run when the body raises,
        # and the exception must keep propagating.
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            finally:
                state.append(999)
        with self.assertRaises(ZeroDivisionError):
            with woohoo() as x:
                self.assertEqual(state, [1])
                self.assertEqual(x, 42)
                state.append(x)
                raise ZeroDivisionError()
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_no_reraise(self):
        @contextmanager
        def whee():
            yield
        ctx = whee()
        ctx.__enter__()
        # Calling __exit__ should not result in an exception
        self.assertFalse(ctx.__exit__(TypeError, TypeError("foo"), None))

    def test_contextmanager_trap_yield_after_throw(self):
        # A generator that yields a second time after the exception was
        # thrown into it must be reported with RuntimeError.
        @contextmanager
        def whoo():
            try:
                yield
            except:  # deliberately bare: swallow whatever gets thrown in
                yield
        ctx = whoo()
        ctx.__enter__()
        self.assertRaises(
            RuntimeError, ctx.__exit__, TypeError, TypeError("foo"), None
        )

    def test_contextmanager_except(self):
        # The generator may catch the body's exception; if it then finishes
        # normally, nothing propagates out of the with statement.
        state = []
        @contextmanager
        def woohoo():
            state.append(1)
            try:
                yield 42
            except ZeroDivisionError as e:
                state.append(e.args[0])
                self.assertEqual(state, [1, 42, 999])
        with woohoo() as x:
            self.assertEqual(state, [1])
            self.assertEqual(x, 42)
            state.append(x)
            raise ZeroDivisionError(999)
        self.assertEqual(state, [1, 42, 999])

    def test_contextmanager_except_stopiter(self):
        # A StopIteration raised by the body must come back out unchanged
        # (the very same object), not be swallowed.
        stop_exc = StopIteration('spam')
        @contextmanager
        def woohoo():
            yield
        try:
            with self.assertWarnsRegex(DeprecationWarning,
                                       "StopIteration"):
                with woohoo():
                    raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_except_pep479(self):
        # Same check as above, for a generator compiled with the
        # generator_stop future import.
        code = """\
from __future__ import generator_stop
from contextlib import contextmanager
@contextmanager
def woohoo():
    yield
"""
        # Named "namespace" so the builtin locals() is not shadowed.
        namespace = {}
        exec(code, namespace, namespace)
        woohoo = namespace['woohoo']

        stop_exc = StopIteration('spam')
        try:
            with woohoo():
                raise stop_exc
        except Exception as ex:
            self.assertIs(ex, stop_exc)
        else:
            self.fail('StopIteration was suppressed')

    def test_contextmanager_do_not_unchain_non_stopiteration_exceptions(self):
        @contextmanager
        def test_issue29692():
            try:
                yield
            except Exception as exc:
                raise RuntimeError('issue29692:Chained') from exc

        # An ordinary exception: the generator's chained RuntimeError wins.
        try:
            with test_issue29692():
                raise ZeroDivisionError
        except Exception as ex:
            self.assertIs(type(ex), RuntimeError)
            self.assertEqual(ex.args[0], 'issue29692:Chained')
            self.assertIsInstance(ex.__cause__, ZeroDivisionError)
        else:
            # Without this branch the test would pass if nothing was raised.
            self.fail('RuntimeError was suppressed')

        # StopIteration: the original exception is expected back, unchained.
        try:
            with test_issue29692():
                raise StopIteration('issue29692:Unchained')
        except Exception as ex:
            self.assertIs(type(ex), StopIteration)
            self.assertEqual(ex.args[0], 'issue29692:Unchained')
            self.assertIsNone(ex.__cause__)
        else:
            self.fail('StopIteration was suppressed')

    def _create_contextmanager_attribs(self):
        # Build a @contextmanager function that carries a custom attribute
        # (foo='bar') and a docstring, for the metadata tests below.
        def attribs(**kw):
            def decorate(func):
                for k, v in kw.items():
                    setattr(func, k, v)
                return func
            return decorate
        @contextmanager
        @attribs(foo='bar')
        def baz(spam):
            """Whee!"""
        return baz

    def test_contextmanager_attribs(self):
        # The decorated function keeps its name and its custom attributes.
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__name__, 'baz')
        self.assertEqual(baz.foo, 'bar')

    @support.requires_docstrings
    def test_contextmanager_doc_attrib(self):
        baz = self._create_contextmanager_attribs()
        self.assertEqual(baz.__doc__, "Whee!")

    @support.requires_docstrings
    def test_instance_docstring_given_cm_docstring(self):
        # The manager object returned by calling baz reports baz's docstring.
        baz = self._create_contextmanager_attribs()(None)
        self.assertEqual(baz.__doc__, "Whee!")

    def test_keywords(self):
        # Ensure no keyword arguments are inhibited
        @contextmanager
        def woohoo(self, func, args, kwds):
            yield (self, func, args, kwds)
        with woohoo(self=11, func=22, args=33, kwds=44) as target:
            self.assertEqual(target, (11, 22, 33, 44))

    def test_nokeepref(self):
        # Once the generator has replaced its own references with weakrefs,
        # the argument objects must be gone: nothing else may keep them alive.
        class A:
            pass

        @contextmanager
        def woohoo(a, b):
            a = weakref.ref(a)
            b = weakref.ref(b)
            self.assertIsNone(a())
            self.assertIsNone(b())
            yield

        with woohoo(A(), b=A()):
            pass

    def test_param_errors(self):
        # Bad call signatures are rejected when the manager is created.
        @contextmanager
        def woohoo(a, *, b):
            yield

        with self.assertRaises(TypeError):
            woohoo()
        with self.assertRaises(TypeError):
            woohoo(3, 5)
        with self.assertRaises(TypeError):
            woohoo(b=3)

    def test_recursive(self):
        # Used as a decorator, the manager must work for nested (recursive)
        # calls: each level sees its own enter/exit pair.
        depth = 0
        @contextmanager
        def woohoo():
            nonlocal depth
            before = depth
            depth += 1
            yield
            depth -= 1
            self.assertEqual(depth, before)

        @woohoo()
        def recursive():
            if depth < 10:
                recursive()

        recursive()
        self.assertEqual(depth, 0)
267
268
class ClosingTestCase(unittest.TestCase):
    """Tests for contextlib.closing."""

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: an instance must report the same docstring as the class
        instance = closing(None)
        self.assertEqual(instance.__doc__, closing.__doc__)

    def test_closing(self):
        # close() is called exactly once, when the with block ends.
        calls = []

        class Closable:
            def close(self):
                calls.append(1)

        resource = Closable()
        self.assertEqual(calls, [])
        with closing(resource) as entered:
            self.assertEqual(resource, entered)
        self.assertEqual(calls, [1])

    def test_closing_error(self):
        # close() is still called when the body raises, and the exception
        # is not swallowed.
        calls = []

        class Closable:
            def close(self):
                calls.append(1)

        resource = Closable()
        self.assertEqual(calls, [])

        def failing_body():
            with closing(resource) as entered:
                self.assertEqual(resource, entered)
                1 / 0

        self.assertRaises(ZeroDivisionError, failing_body)
        self.assertEqual(calls, [1])
301
302
class NullcontextTestCase(unittest.TestCase):
    def test_nullcontext(self):
        # Entering nullcontext(obj) must yield that very object.
        class Marker:
            pass
        sentinel = Marker()
        with nullcontext(sentinel) as entered:
            self.assertIs(entered, sentinel)
310
311
class FileContextTestCase(unittest.TestCase):
    """Checks that file objects work as context managers."""

    def testWithOpen(self):
        # Imported here so this block does not depend on a module-level import.
        import os

        # tempfile.mktemp() only returns a name, so another process could
        # create that path before we do.  A private temporary directory
        # avoids the race and removes the file for us when the block ends.
        with tempfile.TemporaryDirectory() as tmpdir:
            tfn = os.path.join(tmpdir, "booh.txt")

            # Normal exit: the file is open inside the block, closed after it.
            with open(tfn, "w") as f:
                self.assertFalse(f.closed)
                f.write("Booh\n")
            self.assertTrue(f.closed)

            # Exit through an exception: the file must be closed all the same.
            with self.assertRaises(ZeroDivisionError):
                with open(tfn, "r") as f:
                    self.assertFalse(f.closed)
                    self.assertEqual(f.read(), "Booh\n")
                    1 / 0
            self.assertTrue(f.closed)
331
class LockContextTestCase(unittest.TestCase):
    """Checks that threading primitives work as context managers."""

    def boilerPlate(self, lock, locked):
        # *locked* is a zero-argument callable reporting whether *lock* is
        # currently held.  The lock must be held inside a with block and
        # released afterwards, whether the block ends normally or by raising.
        self.assertFalse(locked())
        with lock:
            self.assertTrue(locked())
        self.assertFalse(locked())

        def raise_while_held():
            with lock:
                self.assertTrue(locked())
                1 / 0

        self.assertRaises(ZeroDivisionError, raise_while_held)
        self.assertFalse(locked())

    @staticmethod
    def _semaphore_probe(sem):
        # Semaphores expose no "is held" query, so try a non-blocking
        # acquire: success means it was free (and we give it straight back).
        def locked():
            acquired = sem.acquire(False)
            if acquired:
                sem.release()
            return not acquired
        return locked

    def testWithLock(self):
        mutex = threading.Lock()
        self.boilerPlate(mutex, mutex.locked)

    def testWithRLock(self):
        rlock = threading.RLock()
        self.boilerPlate(rlock, rlock._is_owned)

    def testWithCondition(self):
        cond = threading.Condition()
        self.boilerPlate(cond, lambda: cond._is_owned())

    def testWithSemaphore(self):
        sem = threading.Semaphore()
        self.boilerPlate(sem, self._semaphore_probe(sem))

    def testWithBoundedSemaphore(self):
        sem = threading.BoundedSemaphore()
        self.boilerPlate(sem, self._semaphore_probe(sem))
378
379
class mycontext(ContextDecorator):
    """Example decoration-compatible context manager for testing"""
    # Class-level defaults; an instance overrides them as it is used.
    catch = False    # value __exit__ returns (true means "suppress")
    exc = None       # arguments received by the most recent __exit__ call
    started = False  # set to True by __enter__

    def __enter__(self):
        # Record that the block was entered and expose the manager itself.
        self.started = True
        return self

    def __exit__(self, *exc):
        # Remember what we were called with, then answer with the flag.
        suppress = self.catch
        self.exc = exc
        return suppress
393
394
395class TestContextDecorator(unittest.TestCase):
396
397    @support.requires_docstrings
398    def test_instance_docs(self):
399        # Issue 19330: ensure context manager instances have good docstrings
400        cm_docstring = mycontext.__doc__
401        obj = mycontext()
402        self.assertEqual(obj.__doc__, cm_docstring)
403
404    def test_contextdecorator(self):
405        context = mycontext()
406        with context as result:
407            self.assertIs(result, context)
408            self.assertTrue(context.started)
409
410        self.assertEqual(context.exc, (None, None, None))
411
412
413    def test_contextdecorator_with_exception(self):
414        context = mycontext()
415
416        with self.assertRaisesRegex(NameError, 'foo'):
417            with context:
418                raise NameError('foo')
419        self.assertIsNotNone(context.exc)
420        self.assertIs(context.exc[0], NameError)
421
422        context = mycontext()
423        context.catch = True
424        with context:
425            raise NameError('foo')
426        self.assertIsNotNone(context.exc)
427        self.assertIs(context.exc[0], NameError)
428
429
430    def test_decorator(self):
431        context = mycontext()
432
433        @context
434        def test():
435            self.assertIsNone(context.exc)
436            self.assertTrue(context.started)
437        test()
438        self.assertEqual(context.exc, (None, None, None))
439
440
441    def test_decorator_with_exception(self):
442        context = mycontext()
443
444        @context
445        def test():
446            self.assertIsNone(context.exc)
447            self.assertTrue(context.started)
448            raise NameError('foo')
449
450        with self.assertRaisesRegex(NameError, 'foo'):
451            test()
452        self.assertIsNotNone(context.exc)
453        self.assertIs(context.exc[0], NameError)
454
455
456    def test_decorating_method(self):
457        context = mycontext()
458
459        class Test(object):
460
461            @context
462            def method(self, a, b, c=None):
463                self.a = a
464                self.b = b
465                self.c = c
466
467        # these tests are for argument passing when used as a decorator
468        test = Test()
469        test.method(1, 2)
470        self.assertEqual(test.a, 1)
471        self.assertEqual(test.b, 2)
472        self.assertEqual(test.c, None)
473
474        test = Test()
475        test.method('a', 'b', 'c')
476        self.assertEqual(test.a, 'a')
477        self.assertEqual(test.b, 'b')
478        self.assertEqual(test.c, 'c')
479
480        test = Test()
481        test.method(a=1, b=2)
482        self.assertEqual(test.a, 1)
483        self.assertEqual(test.b, 2)
484
485
486    def test_typo_enter(self):
487        class mycontext(ContextDecorator):
488            def __unter__(self):
489                pass
490            def __exit__(self, *exc):
491                pass
492
493        with self.assertRaises(AttributeError):
494            with mycontext():
495                pass
496
497
498    def test_typo_exit(self):
499        class mycontext(ContextDecorator):
500            def __enter__(self):
501                pass
502            def __uxit__(self, *exc):
503                pass
504
505        with self.assertRaises(AttributeError):
506            with mycontext():
507                pass
508
509
510    def test_contextdecorator_as_mixin(self):
511        class somecontext(object):
512            started = False
513            exc = None
514
515            def __enter__(self):
516                self.started = True
517                return self
518
519            def __exit__(self, *exc):
520                self.exc = exc
521
522        class mycontext(somecontext, ContextDecorator):
523            pass
524
525        context = mycontext()
526        @context
527        def test():
528            self.assertIsNone(context.exc)
529            self.assertTrue(context.started)
530        test()
531        self.assertEqual(context.exc, (None, None, None))
532
533
534    def test_contextmanager_as_decorator(self):
535        @contextmanager
536        def woohoo(y):
537            state.append(y)
538            yield
539            state.append(999)
540
541        state = []
542        @woohoo(1)
543        def test(x):
544            self.assertEqual(state, [1])
545            state.append(x)
546        test('something')
547        self.assertEqual(state, [1, 'something', 999])
548
549        # Issue #11647: Ensure the decorated function is 'reusable'
550        state = []
551        test('something else')
552        self.assertEqual(state, [1, 'something else', 999])
553
554
555class TestBaseExitStack:
556    exit_stack = None
557
558    @support.requires_docstrings
559    def test_instance_docs(self):
560        # Issue 19330: ensure context manager instances have good docstrings
561        cm_docstring = self.exit_stack.__doc__
562        obj = self.exit_stack()
563        self.assertEqual(obj.__doc__, cm_docstring)
564
565    def test_no_resources(self):
566        with self.exit_stack():
567            pass
568
569    def test_callback(self):
570        expected = [
571            ((), {}),
572            ((1,), {}),
573            ((1,2), {}),
574            ((), dict(example=1)),
575            ((1,), dict(example=1)),
576            ((1,2), dict(example=1)),
577            ((1,2), dict(self=3, callback=4)),
578        ]
579        result = []
580        def _exit(*args, **kwds):
581            """Test metadata propagation"""
582            result.append((args, kwds))
583        with self.exit_stack() as stack:
584            for args, kwds in reversed(expected):
585                if args and kwds:
586                    f = stack.callback(_exit, *args, **kwds)
587                elif args:
588                    f = stack.callback(_exit, *args)
589                elif kwds:
590                    f = stack.callback(_exit, **kwds)
591                else:
592                    f = stack.callback(_exit)
593                self.assertIs(f, _exit)
594            for wrapper in stack._exit_callbacks:
595                self.assertIs(wrapper[1].__wrapped__, _exit)
596                self.assertNotEqual(wrapper[1].__name__, _exit.__name__)
597                self.assertIsNone(wrapper[1].__doc__, _exit.__doc__)
598        self.assertEqual(result, expected)
599
600        result = []
601        with self.exit_stack() as stack:
602            with self.assertRaises(TypeError):
603                stack.callback(arg=1)
604            with self.assertRaises(TypeError):
605                self.exit_stack.callback(arg=2)
606            with self.assertWarns(DeprecationWarning):
607                stack.callback(callback=_exit, arg=3)
608        self.assertEqual(result, [((), {'arg': 3})])
609
610    def test_push(self):
611        exc_raised = ZeroDivisionError
612        def _expect_exc(exc_type, exc, exc_tb):
613            self.assertIs(exc_type, exc_raised)
614        def _suppress_exc(*exc_details):
615            return True
616        def _expect_ok(exc_type, exc, exc_tb):
617            self.assertIsNone(exc_type)
618            self.assertIsNone(exc)
619            self.assertIsNone(exc_tb)
620        class ExitCM(object):
621            def __init__(self, check_exc):
622                self.check_exc = check_exc
623            def __enter__(self):
624                self.fail("Should not be called!")
625            def __exit__(self, *exc_details):
626                self.check_exc(*exc_details)
627        with self.exit_stack() as stack:
628            stack.push(_expect_ok)
629            self.assertIs(stack._exit_callbacks[-1][1], _expect_ok)
630            cm = ExitCM(_expect_ok)
631            stack.push(cm)
632            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
633            stack.push(_suppress_exc)
634            self.assertIs(stack._exit_callbacks[-1][1], _suppress_exc)
635            cm = ExitCM(_expect_exc)
636            stack.push(cm)
637            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
638            stack.push(_expect_exc)
639            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
640            stack.push(_expect_exc)
641            self.assertIs(stack._exit_callbacks[-1][1], _expect_exc)
642            1/0
643
644    def test_enter_context(self):
645        class TestCM(object):
646            def __enter__(self):
647                result.append(1)
648            def __exit__(self, *exc_details):
649                result.append(3)
650
651        result = []
652        cm = TestCM()
653        with self.exit_stack() as stack:
654            @stack.callback  # Registered first => cleaned up last
655            def _exit():
656                result.append(4)
657            self.assertIsNotNone(_exit)
658            stack.enter_context(cm)
659            self.assertIs(stack._exit_callbacks[-1][1].__self__, cm)
660            result.append(2)
661        self.assertEqual(result, [1, 2, 3, 4])
662
663    def test_close(self):
664        result = []
665        with self.exit_stack() as stack:
666            @stack.callback
667            def _exit():
668                result.append(1)
669            self.assertIsNotNone(_exit)
670            stack.close()
671            result.append(2)
672        self.assertEqual(result, [1, 2])
673
674    def test_pop_all(self):
675        result = []
676        with self.exit_stack() as stack:
677            @stack.callback
678            def _exit():
679                result.append(3)
680            self.assertIsNotNone(_exit)
681            new_stack = stack.pop_all()
682            result.append(1)
683        result.append(2)
684        new_stack.close()
685        self.assertEqual(result, [1, 2, 3])
686
687    def test_exit_raise(self):
688        with self.assertRaises(ZeroDivisionError):
689            with self.exit_stack() as stack:
690                stack.push(lambda *exc: False)
691                1/0
692
693    def test_exit_suppress(self):
694        with self.exit_stack() as stack:
695            stack.push(lambda *exc: True)
696            1/0
697
698    def test_exit_exception_chaining_reference(self):
699        # Sanity check to make sure that ExitStack chaining matches
700        # actual nested with statements
701        class RaiseExc:
702            def __init__(self, exc):
703                self.exc = exc
704            def __enter__(self):
705                return self
706            def __exit__(self, *exc_details):
707                raise self.exc
708
709        class RaiseExcWithContext:
710            def __init__(self, outer, inner):
711                self.outer = outer
712                self.inner = inner
713            def __enter__(self):
714                return self
715            def __exit__(self, *exc_details):
716                try:
717                    raise self.inner
718                except:
719                    raise self.outer
720
721        class SuppressExc:
722            def __enter__(self):
723                return self
724            def __exit__(self, *exc_details):
725                type(self).saved_details = exc_details
726                return True
727
728        try:
729            with RaiseExc(IndexError):
730                with RaiseExcWithContext(KeyError, AttributeError):
731                    with SuppressExc():
732                        with RaiseExc(ValueError):
733                            1 / 0
734        except IndexError as exc:
735            self.assertIsInstance(exc.__context__, KeyError)
736            self.assertIsInstance(exc.__context__.__context__, AttributeError)
737            # Inner exceptions were suppressed
738            self.assertIsNone(exc.__context__.__context__.__context__)
739        else:
740            self.fail("Expected IndexError, but no exception was raised")
741        # Check the inner exceptions
742        inner_exc = SuppressExc.saved_details[1]
743        self.assertIsInstance(inner_exc, ValueError)
744        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
745
746    def test_exit_exception_chaining(self):
747        # Ensure exception chaining matches the reference behaviour
748        def raise_exc(exc):
749            raise exc
750
751        saved_details = None
752        def suppress_exc(*exc_details):
753            nonlocal saved_details
754            saved_details = exc_details
755            return True
756
757        try:
758            with self.exit_stack() as stack:
759                stack.callback(raise_exc, IndexError)
760                stack.callback(raise_exc, KeyError)
761                stack.callback(raise_exc, AttributeError)
762                stack.push(suppress_exc)
763                stack.callback(raise_exc, ValueError)
764                1 / 0
765        except IndexError as exc:
766            self.assertIsInstance(exc.__context__, KeyError)
767            self.assertIsInstance(exc.__context__.__context__, AttributeError)
768            # Inner exceptions were suppressed
769            self.assertIsNone(exc.__context__.__context__.__context__)
770        else:
771            self.fail("Expected IndexError, but no exception was raised")
772        # Check the inner exceptions
773        inner_exc = saved_details[1]
774        self.assertIsInstance(inner_exc, ValueError)
775        self.assertIsInstance(inner_exc.__context__, ZeroDivisionError)
776
777    def test_exit_exception_non_suppressing(self):
778        # http://bugs.python.org/issue19092
779        def raise_exc(exc):
780            raise exc
781
782        def suppress_exc(*exc_details):
783            return True
784
785        try:
786            with self.exit_stack() as stack:
787                stack.callback(lambda: None)
788                stack.callback(raise_exc, IndexError)
789        except Exception as exc:
790            self.assertIsInstance(exc, IndexError)
791        else:
792            self.fail("Expected IndexError, but no exception was raised")
793
794        try:
795            with self.exit_stack() as stack:
796                stack.callback(raise_exc, KeyError)
797                stack.push(suppress_exc)
798                stack.callback(raise_exc, IndexError)
799        except Exception as exc:
800            self.assertIsInstance(exc, KeyError)
801        else:
802            self.fail("Expected KeyError, but no exception was raised")
803
804    def test_exit_exception_with_correct_context(self):
805        # http://bugs.python.org/issue20317
806        @contextmanager
807        def gets_the_context_right(exc):
808            try:
809                yield
810            finally:
811                raise exc
812
813        exc1 = Exception(1)
814        exc2 = Exception(2)
815        exc3 = Exception(3)
816        exc4 = Exception(4)
817
818        # The contextmanager already fixes the context, so prior to the
819        # fix, ExitStack would try to fix it *again* and get into an
820        # infinite self-referential loop
821        try:
822            with self.exit_stack() as stack:
823                stack.enter_context(gets_the_context_right(exc4))
824                stack.enter_context(gets_the_context_right(exc3))
825                stack.enter_context(gets_the_context_right(exc2))
826                raise exc1
827        except Exception as exc:
828            self.assertIs(exc, exc4)
829            self.assertIs(exc.__context__, exc3)
830            self.assertIs(exc.__context__.__context__, exc2)
831            self.assertIs(exc.__context__.__context__.__context__, exc1)
832            self.assertIsNone(
833                       exc.__context__.__context__.__context__.__context__)
834
835    def test_exit_exception_with_existing_context(self):
836        # Addresses a lack of test coverage discovered after checking in a
837        # fix for issue 20317 that still contained debugging code.
838        def raise_nested(inner_exc, outer_exc):
839            try:
840                raise inner_exc
841            finally:
842                raise outer_exc
843        exc1 = Exception(1)
844        exc2 = Exception(2)
845        exc3 = Exception(3)
846        exc4 = Exception(4)
847        exc5 = Exception(5)
848        try:
849            with self.exit_stack() as stack:
850                stack.callback(raise_nested, exc4, exc5)
851                stack.callback(raise_nested, exc2, exc3)
852                raise exc1
853        except Exception as exc:
854            self.assertIs(exc, exc5)
855            self.assertIs(exc.__context__, exc4)
856            self.assertIs(exc.__context__.__context__, exc3)
857            self.assertIs(exc.__context__.__context__.__context__, exc2)
858            self.assertIs(
859                 exc.__context__.__context__.__context__.__context__, exc1)
860            self.assertIsNone(
861                exc.__context__.__context__.__context__.__context__.__context__)
862
863    def test_body_exception_suppress(self):
864        def suppress_exc(*exc_details):
865            return True
866        try:
867            with self.exit_stack() as stack:
868                stack.push(suppress_exc)
869                1/0
870        except IndexError as exc:
871            self.fail("Expected no exception, got IndexError")
872
873    def test_exit_exception_chaining_suppress(self):
874        with self.exit_stack() as stack:
875            stack.push(lambda *exc: True)
876            stack.push(lambda *exc: 1/0)
877            stack.push(lambda *exc: {}[1])
878
879    def test_excessive_nesting(self):
880        # The original implementation would die with RecursionError here
881        with self.exit_stack() as stack:
882            for i in range(10000):
883                stack.callback(int)
884
885    def test_instance_bypass(self):
886        class Example(object): pass
887        cm = Example()
888        cm.__exit__ = object()
889        stack = self.exit_stack()
890        self.assertRaises(AttributeError, stack.enter_context, cm)
891        stack.push(cm)
892        self.assertIs(stack._exit_callbacks[-1][1], cm)
893
894    def test_dont_reraise_RuntimeError(self):
895        # https://bugs.python.org/issue27122
896        class UniqueException(Exception): pass
897        class UniqueRuntimeError(RuntimeError): pass
898
899        @contextmanager
900        def second():
901            try:
902                yield 1
903            except Exception as exc:
904                raise UniqueException("new exception") from exc
905
906        @contextmanager
907        def first():
908            try:
909                yield 1
910            except Exception as exc:
911                raise exc
912
913        # The UniqueRuntimeError should be caught by second()'s exception
914        # handler which chain raised a new UniqueException.
915        with self.assertRaises(UniqueException) as err_ctx:
916            with self.exit_stack() as es_ctx:
917                es_ctx.enter_context(second())
918                es_ctx.enter_context(first())
919                raise UniqueRuntimeError("please no infinite loop.")
920
921        exc = err_ctx.exception
922        self.assertIsInstance(exc, UniqueException)
923        self.assertIsInstance(exc.__context__, UniqueRuntimeError)
924        self.assertIsNone(exc.__context__.__context__)
925        self.assertIsNone(exc.__context__.__cause__)
926        self.assertIs(exc.__cause__, exc.__context__)
927
928
# Runs the checks inherited from TestBaseExitStack with ExitStack as the
# class under test.
class TestExitStack(TestBaseExitStack, unittest.TestCase):
    exit_stack = ExitStack
931
932
933class TestRedirectStream:
934
935    redirect_stream = None
936    orig_stream = None
937
938    @support.requires_docstrings
939    def test_instance_docs(self):
940        # Issue 19330: ensure context manager instances have good docstrings
941        cm_docstring = self.redirect_stream.__doc__
942        obj = self.redirect_stream(None)
943        self.assertEqual(obj.__doc__, cm_docstring)
944
945    def test_no_redirect_in_init(self):
946        orig_stdout = getattr(sys, self.orig_stream)
947        self.redirect_stream(None)
948        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
949
950    def test_redirect_to_string_io(self):
951        f = io.StringIO()
952        msg = "Consider an API like help(), which prints directly to stdout"
953        orig_stdout = getattr(sys, self.orig_stream)
954        with self.redirect_stream(f):
955            print(msg, file=getattr(sys, self.orig_stream))
956        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
957        s = f.getvalue().strip()
958        self.assertEqual(s, msg)
959
960    def test_enter_result_is_target(self):
961        f = io.StringIO()
962        with self.redirect_stream(f) as enter_result:
963            self.assertIs(enter_result, f)
964
965    def test_cm_is_reusable(self):
966        f = io.StringIO()
967        write_to_f = self.redirect_stream(f)
968        orig_stdout = getattr(sys, self.orig_stream)
969        with write_to_f:
970            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
971        with write_to_f:
972            print("World!", file=getattr(sys, self.orig_stream))
973        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
974        s = f.getvalue()
975        self.assertEqual(s, "Hello World!\n")
976
977    def test_cm_is_reentrant(self):
978        f = io.StringIO()
979        write_to_f = self.redirect_stream(f)
980        orig_stdout = getattr(sys, self.orig_stream)
981        with write_to_f:
982            print("Hello", end=" ", file=getattr(sys, self.orig_stream))
983            with write_to_f:
984                print("World!", file=getattr(sys, self.orig_stream))
985        self.assertIs(getattr(sys, self.orig_stream), orig_stdout)
986        s = f.getvalue()
987        self.assertEqual(s, "Hello World!\n")
988
989
class TestRedirectStdout(TestRedirectStream, unittest.TestCase):
    # Runs the checks inherited from TestRedirectStream with redirect_stdout
    # as the context manager and "stdout" as the sys attribute to inspect.

    orig_stream = "stdout"
    redirect_stream = redirect_stdout
994
995
class TestRedirectStderr(TestRedirectStream, unittest.TestCase):
    # Runs the checks inherited from TestRedirectStream with redirect_stderr
    # as the context manager and "stderr" as the sys attribute to inspect.

    orig_stream = "stderr"
    redirect_stream = redirect_stderr
1000
1001
class TestSuppress(unittest.TestCase):
    # Tests for the suppress() context manager.

    @support.requires_docstrings
    def test_instance_docs(self):
        # Issue 19330: ensure context manager instances have good docstrings
        cm_docstring = suppress.__doc__
        obj = suppress()
        self.assertEqual(obj.__doc__, cm_docstring)

    def test_no_result_from_enter(self):
        # Entering the context manager binds None to the "as" target.
        with suppress(ValueError) as enter_result:
            self.assertIsNone(enter_result)

    def test_no_exception(self):
        # A body that raises nothing runs normally.
        with suppress(ValueError):
            self.assertEqual(pow(2, 5), 32)

    def test_exact_exception(self):
        # len(5) raises TypeError, which is exactly the listed class.
        with suppress(TypeError):
            len(5)

    def test_exception_hierarchy(self):
        # The IndexError raised here is a subclass of the listed LookupError.
        with suppress(LookupError):
            'Hello'[50]

    def test_other_exception(self):
        # Exceptions that are not listed must propagate.
        with self.assertRaises(ZeroDivisionError):
            with suppress(TypeError):
                1/0

    def test_no_args(self):
        # With no exception classes given, nothing is suppressed.
        with self.assertRaises(ZeroDivisionError):
            with suppress():
                1/0

    def test_multiple_exception_args(self):
        # Each of several listed classes is suppressed.
        with suppress(ZeroDivisionError, TypeError):
            1/0
        with suppress(ZeroDivisionError, TypeError):
            len(5)

    def test_cm_is_reentrant(self):
        ignore_exceptions = suppress(Exception)
        # Bound up front so that a regression in nested usage is reported as
        # an assertion failure instead of an UnboundLocalError at the final
        # check.
        outer_continued = False
        with ignore_exceptions:
            pass
        with ignore_exceptions:
            len(5)
        with ignore_exceptions:
            with ignore_exceptions: # Check nested usage
                len(5)
            # Reached only if the inner block swallowed the TypeError.
            outer_continued = True
            1/0
        self.assertTrue(outer_continued)
1055
# Run every test in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1058