1import pytest
2
3from .. import aclosing, async_generator, yield_, asynccontextmanager
4
5
6@async_generator
7async def async_range(count, closed_slot):
8    try:
9        for i in range(count):  # pragma: no branch
10            await yield_(i)
11    except GeneratorExit:
12        closed_slot[0] = True
13
14
15async def test_aclosing():
16    closed_slot = [False]
17    async with aclosing(async_range(10, closed_slot)) as gen:
18        it = iter(range(10))
19        async for item in gen:  # pragma: no branch
20            assert item == next(it)
21            if item == 4:
22                break
23    assert closed_slot[0]
24
25    closed_slot = [False]
26    try:
27        async with aclosing(async_range(10, closed_slot)) as gen:
28            it = iter(range(10))
29            async for item in gen:  # pragma: no branch
30                assert item == next(it)
31                if item == 4:
32                    raise ValueError()
33    except ValueError:
34        pass
35    assert closed_slot[0]
36
37
38async def test_contextmanager_do_not_unchain_non_stopiteration_exceptions():
39    @asynccontextmanager
40    @async_generator
41    async def manager_issue29692():
42        try:
43            await yield_()
44        except Exception as exc:
45            raise RuntimeError('issue29692:Chained') from exc
46
47    with pytest.raises(RuntimeError) as excinfo:
48        async with manager_issue29692():
49            raise ZeroDivisionError
50    assert excinfo.value.args[0] == 'issue29692:Chained'
51    assert isinstance(excinfo.value.__cause__, ZeroDivisionError)
52
53    # This is a little funky because of implementation details in
54    # async_generator It can all go away once we stop supporting Python3.5
55    with pytest.raises(RuntimeError) as excinfo:
56        async with manager_issue29692():
57            exc = StopIteration('issue29692:Unchained')
58            raise exc
59    assert excinfo.value.args[0] == 'issue29692:Chained'
60    cause = excinfo.value.__cause__
61    assert cause.args[0] == 'generator raised StopIteration'
62    assert cause.__cause__ is exc
63
64    with pytest.raises(StopAsyncIteration) as excinfo:
65        async with manager_issue29692():
66            raise StopAsyncIteration('issue29692:Unchained')
67    assert excinfo.value.args[0] == 'issue29692:Unchained'
68    assert excinfo.value.__cause__ is None
69
70    @asynccontextmanager
71    @async_generator
72    async def noop_async_context_manager():
73        await yield_()
74
75    with pytest.raises(StopIteration):
76        async with noop_async_context_manager():
77            raise StopIteration
78
79
80# Native async generators are only available from Python 3.6 and onwards
81nativeasyncgenerators = True
82try:
83    exec(
84        """
85@asynccontextmanager
86async def manager_issue29692_2():
87    try:
88        yield
89    except Exception as exc:
90        raise RuntimeError('issue29692:Chained') from exc
91"""
92    )
93except SyntaxError:
94    nativeasyncgenerators = False
95
96
97@pytest.mark.skipif(
98    not nativeasyncgenerators,
99    reason="Python < 3.6 doesn't have native async generators"
100)
101async def test_native_contextmanager_do_not_unchain_non_stopiteration_exceptions(
102):
103
104    with pytest.raises(RuntimeError) as excinfo:
105        async with manager_issue29692_2():
106            raise ZeroDivisionError
107    assert excinfo.value.args[0] == 'issue29692:Chained'
108    assert isinstance(excinfo.value.__cause__, ZeroDivisionError)
109
110    for cls in [StopIteration, StopAsyncIteration]:
111        with pytest.raises(cls) as excinfo:
112            async with manager_issue29692_2():
113                raise cls('issue29692:Unchained')
114        assert excinfo.value.args[0] == 'issue29692:Unchained'
115        assert excinfo.value.__cause__ is None
116
117
118async def test_asynccontextmanager_exception_passthrough():
119    # This was the cause of annoying coverage flapping, see gh-140
120    @asynccontextmanager
121    @async_generator
122    async def noop_async_context_manager():
123        await yield_()
124
125    for exc_type in [StopAsyncIteration, RuntimeError, ValueError]:
126        with pytest.raises(exc_type):
127            async with noop_async_context_manager():
128                raise exc_type
129
130    # And let's also check a boring nothing pass-through while we're at it
131    async with noop_async_context_manager():
132        pass
133
134
135async def test_asynccontextmanager_catches_exception():
136    @asynccontextmanager
137    @async_generator
138    async def catch_it():
139        with pytest.raises(ValueError):
140            await yield_()
141
142    async with catch_it():
143        raise ValueError
144
145
146async def test_asynccontextmanager_different_exception():
147    @asynccontextmanager
148    @async_generator
149    async def switch_it():
150        try:
151            await yield_()
152        except KeyError:
153            raise ValueError
154
155    with pytest.raises(ValueError):
156        async with switch_it():
157            raise KeyError
158
159
160async def test_asynccontextmanager_nice_message_on_sync_enter():
161    @asynccontextmanager
162    @async_generator
163    async def xxx():  # pragma: no cover
164        await yield_()
165
166    cm = xxx()
167
168    with pytest.raises(RuntimeError) as excinfo:
169        with cm:
170            pass  # pragma: no cover
171
172    assert "async with" in str(excinfo.value)
173
174    async with cm:
175        pass
176
177
178async def test_asynccontextmanager_no_yield():
179    @asynccontextmanager
180    @async_generator
181    async def yeehaw():
182        pass
183
184    with pytest.raises(RuntimeError) as excinfo:
185        async with yeehaw():
186            assert False  # pragma: no cover
187
188    assert "didn't yield" in str(excinfo.value)
189
190
191async def test_asynccontextmanager_too_many_yields():
192    closed_count = 0
193
194    @asynccontextmanager
195    @async_generator
196    async def doubleyield():
197        try:
198            await yield_()
199        except Exception:
200            pass
201        try:
202            await yield_()
203        finally:
204            nonlocal closed_count
205            closed_count += 1
206
207    with pytest.raises(RuntimeError) as excinfo:
208        async with doubleyield():
209            pass
210
211    assert "didn't stop" in str(excinfo.value)
212    assert closed_count == 1
213
214    with pytest.raises(RuntimeError) as excinfo:
215        async with doubleyield():
216            raise ValueError
217
218    assert "didn't stop after athrow" in str(excinfo.value)
219    assert closed_count == 2
220
221
222async def test_asynccontextmanager_requires_asyncgenfunction():
223    with pytest.raises(TypeError):
224
225        @asynccontextmanager
226        def syncgen():  # pragma: no cover
227            yield
228