import pytest

from .. import aclosing, async_generator, yield_, asynccontextmanager


@async_generator
async def async_range(count, closed_slot):
    """Yield the integers ``0 .. count - 1`` one at a time.

    If a ``GeneratorExit`` is caught while iterating, ``closed_slot[0]``
    is set to ``True`` so that callers can observe it.
    """
    try:
        for value in range(count):  # pragma: no branch
            await yield_(value)
    except GeneratorExit:
        closed_slot[0] = True


async def test_aclosing():
    """``aclosing`` closes the generator when the block is left early."""
    # Case 1: the loop is left with ``break``.
    closed_slot = [False]
    async with aclosing(async_range(10, closed_slot)) as agen:
        expected = iter(range(10))
        async for value in agen:  # pragma: no branch
            assert value == next(expected)
            if value == 4:
                break
    assert closed_slot[0]

    # Case 2: the loop is left because an exception propagates out.
    closed_slot = [False]
    try:
        async with aclosing(async_range(10, closed_slot)) as agen:
            expected = iter(range(10))
            async for value in agen:  # pragma: no branch
                assert value == next(expected)
                if value == 4:
                    raise ValueError()
    except ValueError:
        pass
    assert closed_slot[0]


async def test_contextmanager_do_not_unchain_non_stopiteration_exceptions():
    """Check exception chaining through an ``asynccontextmanager``."""

    @asynccontextmanager
    @async_generator
    async def manager_issue29692():
        try:
            await yield_()
        except Exception as err:
            raise RuntimeError('issue29692:Chained') from err

    with pytest.raises(RuntimeError) as caught:
        async with manager_issue29692():
            raise ZeroDivisionError
    assert caught.value.args[0] == 'issue29692:Chained'
    assert isinstance(caught.value.__cause__, ZeroDivisionError)

    # This is a little funky because of implementation details in
    # async_generator. It can all go away once we stop supporting
    # Python 3.5.
    with pytest.raises(RuntimeError) as caught:
        async with manager_issue29692():
            stop_exc = StopIteration('issue29692:Unchained')
            raise stop_exc
    assert caught.value.args[0] == 'issue29692:Chained'
    wrapped = caught.value.__cause__
    assert wrapped.args[0] == 'generator raised StopIteration'
    assert wrapped.__cause__ is stop_exc

    with pytest.raises(StopAsyncIteration) as caught:
        async with manager_issue29692():
            raise StopAsyncIteration('issue29692:Unchained')
    assert caught.value.args[0] == 'issue29692:Unchained'
    assert caught.value.__cause__ is None

    @asynccontextmanager
    @async_generator
    async def noop_async_context_manager():
        await yield_()

    with pytest.raises(StopIteration):
        async with noop_async_context_manager():
            raise StopIteration


# Native async generators are only available from Python 3.6 and onwards.
# The definition is compiled from a string so that this module still
# imports when that syntax is rejected with a SyntaxError.
try:
    exec(
        """
@asynccontextmanager
async def manager_issue29692_2():
    try:
        yield
    except Exception as exc:
        raise RuntimeError('issue29692:Chained') from exc
"""
    )
except SyntaxError:
    nativeasyncgenerators = False
else:
    nativeasyncgenerators = True


@pytest.mark.skipif(
    not nativeasyncgenerators,
    reason="Python < 3.6 doesn't have native async generators"
)
async def test_native_contextmanager_do_not_unchain_non_stopiteration_exceptions():
    """Same chaining checks, using the native async generator manager."""
    with pytest.raises(RuntimeError) as caught:
        async with manager_issue29692_2():
            raise ZeroDivisionError
    assert caught.value.args[0] == 'issue29692:Chained'
    assert isinstance(caught.value.__cause__, ZeroDivisionError)

    for stop_type in (StopIteration, StopAsyncIteration):
        with pytest.raises(stop_type) as caught:
            async with manager_issue29692_2():
                raise stop_type('issue29692:Unchained')
        assert caught.value.args[0] == 'issue29692:Unchained'
        assert caught.value.__cause__ is None


async def test_asynccontextmanager_exception_passthrough():
    """Exceptions raised in the body pass through a no-op manager."""

    # This was the cause of annoying coverage flapping, see gh-140
    @asynccontextmanager
    @async_generator
    async def noop_async_context_manager():
        await yield_()

    for exc_type in (StopAsyncIteration, RuntimeError, ValueError):
        with pytest.raises(exc_type):
            async with noop_async_context_manager():
                raise exc_type

    # And let's also check a boring nothing pass-through while we're at it
    async with noop_async_context_manager():
        pass


async def test_asynccontextmanager_catches_exception():
    """A manager that handles the body's exception suppresses it."""

    @asynccontextmanager
    @async_generator
    async def catch_it():
        with pytest.raises(ValueError):
            await yield_()

    async with catch_it():
        raise ValueError


async def test_asynccontextmanager_different_exception():
    """A manager may replace the body's exception with another one."""

    @asynccontextmanager
    @async_generator
    async def switch_it():
        try:
            await yield_()
        except KeyError:
            raise ValueError

    with pytest.raises(ValueError):
        async with switch_it():
            raise KeyError


async def test_asynccontextmanager_nice_message_on_sync_enter():
    """A plain ``with`` raises RuntimeError mentioning ``async with``."""

    @asynccontextmanager
    @async_generator
    async def xxx():  # pragma: no cover
        await yield_()

    manager = xxx()

    with pytest.raises(RuntimeError) as caught:
        with manager:
            pass  # pragma: no cover

    assert "async with" in str(caught.value)

    # The same manager object is then entered with ``async with``.
    async with manager:
        pass


async def test_asynccontextmanager_no_yield():
    """Entering a manager whose generator never yields is an error."""

    @asynccontextmanager
    @async_generator
    async def yeehaw():
        pass

    with pytest.raises(RuntimeError) as caught:
        async with yeehaw():
            assert False  # pragma: no cover

    assert "didn't yield" in str(caught.value)


async def test_asynccontextmanager_too_many_yields():
    """A generator that yields a second time on exit is an error."""
    closed_count = 0

    @asynccontextmanager
    @async_generator
    async def doubleyield():
        nonlocal closed_count
        try:
            await yield_()
        except Exception:
            pass
        try:
            await yield_()
        finally:
            # Counts how many times this second section was wound up.
            closed_count += 1

    # Normal exit from the body.
    with pytest.raises(RuntimeError) as caught:
        async with doubleyield():
            pass

    assert "didn't stop" in str(caught.value)
    assert closed_count == 1

    # Exit from the body via an exception.
    with pytest.raises(RuntimeError) as caught:
        async with doubleyield():
            raise ValueError

    assert "didn't stop after athrow" in str(caught.value)
    assert closed_count == 2


async def test_asynccontextmanager_requires_asyncgenfunction():
    """Decorating a synchronous generator function raises TypeError."""
    with pytest.raises(TypeError):

        @asynccontextmanager
        def syncgen():  # pragma: no cover
            yield