1import inspect 2import types 3import unittest 4import contextlib 5 6from test.support.import_helper import import_module 7from test.support import gc_collect 8asyncio = import_module("asyncio") 9 10 11_no_default = object() 12 13 14class AwaitException(Exception): 15 pass 16 17 18@types.coroutine 19def awaitable(*, throw=False): 20 if throw: 21 yield ('throw',) 22 else: 23 yield ('result',) 24 25 26def run_until_complete(coro): 27 exc = False 28 while True: 29 try: 30 if exc: 31 exc = False 32 fut = coro.throw(AwaitException) 33 else: 34 fut = coro.send(None) 35 except StopIteration as ex: 36 return ex.args[0] 37 38 if fut == ('throw',): 39 exc = True 40 41 42def to_list(gen): 43 async def iterate(): 44 res = [] 45 async for i in gen: 46 res.append(i) 47 return res 48 49 return run_until_complete(iterate()) 50 51 52def py_anext(iterator, default=_no_default): 53 """Pure-Python implementation of anext() for testing purposes. 54 55 Closely matches the builtin anext() C implementation. 56 Can be used to compare the built-in implementation of the inner 57 coroutines machinery to C-implementation of __anext__() and send() 58 or throw() on the returned generator. 59 """ 60 61 try: 62 __anext__ = type(iterator).__anext__ 63 except AttributeError: 64 raise TypeError(f'{iterator!r} is not an async iterator') 65 66 if default is _no_default: 67 return __anext__(iterator) 68 69 async def anext_impl(): 70 try: 71 # The C code is way more low-level than this, as it implements 72 # all methods of the iterator protocol. In this implementation 73 # we're relying on higher-level coroutine concepts, but that's 74 # exactly what we want -- crosstest pure-Python high-level 75 # implementation and low-level C anext() iterators. 76 return await __anext__(iterator) 77 except StopAsyncIteration: 78 return default 79 80 return anext_impl() 81 82 83class AsyncGenSyntaxTest(unittest.TestCase): 84 85 def test_async_gen_syntax_01(self): 86 code = '''async def foo(): 87 await abc 88 yield from 123 89 ''' 90 91 with self.assertRaisesRegex(SyntaxError, 'yield from.*inside async'): 92 exec(code, {}, {}) 93 94 def test_async_gen_syntax_02(self): 95 code = '''async def foo(): 96 yield from 123 97 ''' 98 99 with self.assertRaisesRegex(SyntaxError, 'yield from.*inside async'): 100 exec(code, {}, {}) 101 102 def test_async_gen_syntax_03(self): 103 code = '''async def foo(): 104 await abc 105 yield 106 return 123 107 ''' 108 109 with self.assertRaisesRegex(SyntaxError, 'return.*value.*async gen'): 110 exec(code, {}, {}) 111 112 def test_async_gen_syntax_04(self): 113 code = '''async def foo(): 114 yield 115 return 123 116 ''' 117 118 with self.assertRaisesRegex(SyntaxError, 'return.*value.*async gen'): 119 exec(code, {}, {}) 120 121 def test_async_gen_syntax_05(self): 122 code = '''async def foo(): 123 if 0: 124 yield 125 return 12 126 ''' 127 128 with self.assertRaisesRegex(SyntaxError, 'return.*value.*async gen'): 129 exec(code, {}, {}) 130 131 132class AsyncGenTest(unittest.TestCase): 133 134 def compare_generators(self, sync_gen, async_gen): 135 def sync_iterate(g): 136 res = [] 137 while True: 138 try: 139 res.append(g.__next__()) 140 except StopIteration: 141 res.append('STOP') 142 break 143 except Exception as ex: 144 res.append(str(type(ex))) 145 return res 146 147 def async_iterate(g): 148 res = [] 149 while True: 150 an = g.__anext__() 151 try: 152 while True: 153 try: 154 an.__next__() 155 except StopIteration as ex: 156 if ex.args: 157 res.append(ex.args[0]) 158 break 159 else: 160 res.append('EMPTY StopIteration') 161 break 162 except StopAsyncIteration: 163 raise 164 except Exception as ex: 165 res.append(str(type(ex))) 166 break 167 except StopAsyncIteration: 168 res.append('STOP') 169 break 170 return res 171 172 sync_gen_result = sync_iterate(sync_gen) 173 async_gen_result = async_iterate(async_gen) 174 self.assertEqual(sync_gen_result, async_gen_result) 175 return async_gen_result 176 177 def test_async_gen_iteration_01(self): 178 async def gen(): 179 await awaitable() 180 a = yield 123 181 self.assertIs(a, None) 182 await awaitable() 183 yield 456 184 await awaitable() 185 yield 789 186 187 self.assertEqual(to_list(gen()), [123, 456, 789]) 188 189 def test_async_gen_iteration_02(self): 190 async def gen(): 191 await awaitable() 192 yield 123 193 await awaitable() 194 195 g = gen() 196 ai = g.__aiter__() 197 198 an = ai.__anext__() 199 self.assertEqual(an.__next__(), ('result',)) 200 201 try: 202 an.__next__() 203 except StopIteration as ex: 204 self.assertEqual(ex.args[0], 123) 205 else: 206 self.fail('StopIteration was not raised') 207 208 an = ai.__anext__() 209 self.assertEqual(an.__next__(), ('result',)) 210 211 try: 212 an.__next__() 213 except StopAsyncIteration as ex: 214 self.assertFalse(ex.args) 215 else: 216 self.fail('StopAsyncIteration was not raised') 217 218 def test_async_gen_exception_03(self): 219 async def gen(): 220 await awaitable() 221 yield 123 222 await awaitable(throw=True) 223 yield 456 224 225 with self.assertRaises(AwaitException): 226 to_list(gen()) 227 228 def test_async_gen_exception_04(self): 229 async def gen(): 230 await awaitable() 231 yield 123 232 1 / 0 233 234 g = gen() 235 ai = g.__aiter__() 236 an = ai.__anext__() 237 self.assertEqual(an.__next__(), ('result',)) 238 239 try: 240 an.__next__() 241 except StopIteration as ex: 242 self.assertEqual(ex.args[0], 123) 243 else: 244 self.fail('StopIteration was not raised') 245 246 with self.assertRaises(ZeroDivisionError): 247 ai.__anext__().__next__() 248 249 def test_async_gen_exception_05(self): 250 async def gen(): 251 yield 123 252 raise StopAsyncIteration 253 254 with self.assertRaisesRegex(RuntimeError, 255 'async generator.*StopAsyncIteration'): 256 to_list(gen()) 257 258 def test_async_gen_exception_06(self): 259 async def gen(): 260 yield 123 261 raise StopIteration 262 263 with self.assertRaisesRegex(RuntimeError, 264 'async generator.*StopIteration'): 265 to_list(gen()) 266 267 def test_async_gen_exception_07(self): 268 def sync_gen(): 269 try: 270 yield 1 271 1 / 0 272 finally: 273 yield 2 274 yield 3 275 276 yield 100 277 278 async def async_gen(): 279 try: 280 yield 1 281 1 / 0 282 finally: 283 yield 2 284 yield 3 285 286 yield 100 287 288 self.compare_generators(sync_gen(), async_gen()) 289 290 def test_async_gen_exception_08(self): 291 def sync_gen(): 292 try: 293 yield 1 294 finally: 295 yield 2 296 1 / 0 297 yield 3 298 299 yield 100 300 301 async def async_gen(): 302 try: 303 yield 1 304 await awaitable() 305 finally: 306 await awaitable() 307 yield 2 308 1 / 0 309 yield 3 310 311 yield 100 312 313 self.compare_generators(sync_gen(), async_gen()) 314 315 def test_async_gen_exception_09(self): 316 def sync_gen(): 317 try: 318 yield 1 319 1 / 0 320 finally: 321 yield 2 322 yield 3 323 324 yield 100 325 326 async def async_gen(): 327 try: 328 await awaitable() 329 yield 1 330 1 / 0 331 finally: 332 yield 2 333 await awaitable() 334 yield 3 335 336 yield 100 337 338 self.compare_generators(sync_gen(), async_gen()) 339 340 def test_async_gen_exception_10(self): 341 async def gen(): 342 yield 123 343 with self.assertRaisesRegex(TypeError, 344 "non-None value .* async generator"): 345 gen().__anext__().send(100) 346 347 def test_async_gen_exception_11(self): 348 def sync_gen(): 349 yield 10 350 yield 20 351 352 def sync_gen_wrapper(): 353 yield 1 354 sg = sync_gen() 355 sg.send(None) 356 try: 357 sg.throw(GeneratorExit()) 358 except GeneratorExit: 359 yield 2 360 yield 3 361 362 async def async_gen(): 363 yield 10 364 yield 20 365 366 async def async_gen_wrapper(): 367 yield 1 368 asg = async_gen() 369 await asg.asend(None) 370 try: 371 await asg.athrow(GeneratorExit()) 372 except GeneratorExit: 373 yield 2 374 yield 3 375 376 self.compare_generators(sync_gen_wrapper(), async_gen_wrapper()) 377 378 def test_async_gen_api_01(self): 379 async def gen(): 380 yield 123 381 382 g = gen() 383 384 self.assertEqual(g.__name__, 'gen') 385 g.__name__ = '123' 386 self.assertEqual(g.__name__, '123') 387 388 self.assertIn('.gen', g.__qualname__) 389 g.__qualname__ = '123' 390 self.assertEqual(g.__qualname__, '123') 391 392 self.assertIsNone(g.ag_await) 393 self.assertIsInstance(g.ag_frame, types.FrameType) 394 self.assertFalse(g.ag_running) 395 self.assertIsInstance(g.ag_code, types.CodeType) 396 397 self.assertTrue(inspect.isawaitable(g.aclose())) 398 399 400class AsyncGenAsyncioTest(unittest.TestCase): 401 402 def setUp(self): 403 self.loop = asyncio.new_event_loop() 404 asyncio.set_event_loop(None) 405 406 def tearDown(self): 407 self.loop.close() 408 self.loop = None 409 asyncio.set_event_loop_policy(None) 410 411 def check_async_iterator_anext(self, ait_class): 412 with self.subTest(anext="pure-Python"): 413 self._check_async_iterator_anext(ait_class, py_anext) 414 with self.subTest(anext="builtin"): 415 self._check_async_iterator_anext(ait_class, anext) 416 417 def _check_async_iterator_anext(self, ait_class, anext): 418 g = ait_class() 419 async def consume(): 420 results = [] 421 results.append(await anext(g)) 422 results.append(await anext(g)) 423 results.append(await anext(g, 'buckle my shoe')) 424 return results 425 res = self.loop.run_until_complete(consume()) 426 self.assertEqual(res, [1, 2, 'buckle my shoe']) 427 with self.assertRaises(StopAsyncIteration): 428 self.loop.run_until_complete(consume()) 429 430 async def test_2(): 431 g1 = ait_class() 432 self.assertEqual(await anext(g1), 1) 433 self.assertEqual(await anext(g1), 2) 434 with self.assertRaises(StopAsyncIteration): 435 await anext(g1) 436 with self.assertRaises(StopAsyncIteration): 437 await anext(g1) 438 439 g2 = ait_class() 440 self.assertEqual(await anext(g2, "default"), 1) 441 self.assertEqual(await anext(g2, "default"), 2) 442 self.assertEqual(await anext(g2, "default"), "default") 443 self.assertEqual(await anext(g2, "default"), "default") 444 445 return "completed" 446 447 result = self.loop.run_until_complete(test_2()) 448 self.assertEqual(result, "completed") 449 450 def test_send(): 451 p = ait_class() 452 obj = anext(p, "completed") 453 with self.assertRaises(StopIteration): 454 with contextlib.closing(obj.__await__()) as g: 455 g.send(None) 456 457 test_send() 458 459 async def test_throw(): 460 p = ait_class() 461 obj = anext(p, "completed") 462 self.assertRaises(SyntaxError, obj.throw, SyntaxError) 463 return "completed" 464 465 result = self.loop.run_until_complete(test_throw()) 466 self.assertEqual(result, "completed") 467 468 def test_async_generator_anext(self): 469 async def agen(): 470 yield 1 471 yield 2 472 self.check_async_iterator_anext(agen) 473 474 def test_python_async_iterator_anext(self): 475 class MyAsyncIter: 476 """Asynchronously yield 1, then 2.""" 477 def __init__(self): 478 self.yielded = 0 479 def __aiter__(self): 480 return self 481 async def __anext__(self): 482 if self.yielded >= 2: 483 raise StopAsyncIteration() 484 else: 485 self.yielded += 1 486 return self.yielded 487 self.check_async_iterator_anext(MyAsyncIter) 488 489 def test_python_async_iterator_types_coroutine_anext(self): 490 import types 491 class MyAsyncIterWithTypesCoro: 492 """Asynchronously yield 1, then 2.""" 493 def __init__(self): 494 self.yielded = 0 495 def __aiter__(self): 496 return self 497 @types.coroutine 498 def __anext__(self): 499 if False: 500 yield "this is a generator-based coroutine" 501 if self.yielded >= 2: 502 raise StopAsyncIteration() 503 else: 504 self.yielded += 1 505 return self.yielded 506 self.check_async_iterator_anext(MyAsyncIterWithTypesCoro) 507 508 def test_async_gen_aiter(self): 509 async def gen(): 510 yield 1 511 yield 2 512 g = gen() 513 async def consume(): 514 return [i async for i in aiter(g)] 515 res = self.loop.run_until_complete(consume()) 516 self.assertEqual(res, [1, 2]) 517 518 def test_async_gen_aiter_class(self): 519 results = [] 520 class Gen: 521 async def __aiter__(self): 522 yield 1 523 yield 2 524 g = Gen() 525 async def consume(): 526 ait = aiter(g) 527 while True: 528 try: 529 results.append(await anext(ait)) 530 except StopAsyncIteration: 531 break 532 self.loop.run_until_complete(consume()) 533 self.assertEqual(results, [1, 2]) 534 535 def test_aiter_idempotent(self): 536 async def gen(): 537 yield 1 538 applied_once = aiter(gen()) 539 applied_twice = aiter(applied_once) 540 self.assertIs(applied_once, applied_twice) 541 542 def test_anext_bad_args(self): 543 async def gen(): 544 yield 1 545 async def call_with_too_few_args(): 546 await anext() 547 async def call_with_too_many_args(): 548 await anext(gen(), 1, 3) 549 async def call_with_wrong_type_args(): 550 await anext(1, gen()) 551 async def call_with_kwarg(): 552 await anext(aiterator=gen()) 553 with self.assertRaises(TypeError): 554 self.loop.run_until_complete(call_with_too_few_args()) 555 with self.assertRaises(TypeError): 556 self.loop.run_until_complete(call_with_too_many_args()) 557 with self.assertRaises(TypeError): 558 self.loop.run_until_complete(call_with_wrong_type_args()) 559 with self.assertRaises(TypeError): 560 self.loop.run_until_complete(call_with_kwarg()) 561 562 def test_anext_bad_await(self): 563 async def bad_awaitable(): 564 class BadAwaitable: 565 def __await__(self): 566 return 42 567 class MyAsyncIter: 568 def __aiter__(self): 569 return self 570 def __anext__(self): 571 return BadAwaitable() 572 regex = r"__await__.*iterator" 573 awaitable = anext(MyAsyncIter(), "default") 574 with self.assertRaisesRegex(TypeError, regex): 575 await awaitable 576 awaitable = anext(MyAsyncIter()) 577 with self.assertRaisesRegex(TypeError, regex): 578 await awaitable 579 return "completed" 580 result = self.loop.run_until_complete(bad_awaitable()) 581 self.assertEqual(result, "completed") 582 583 async def check_anext_returning_iterator(self, aiter_class): 584 awaitable = anext(aiter_class(), "default") 585 with self.assertRaises(TypeError): 586 await awaitable 587 awaitable = anext(aiter_class()) 588 with self.assertRaises(TypeError): 589 await awaitable 590 return "completed" 591 592 def test_anext_return_iterator(self): 593 class WithIterAnext: 594 def __aiter__(self): 595 return self 596 def __anext__(self): 597 return iter("abc") 598 result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithIterAnext)) 599 self.assertEqual(result, "completed") 600 601 def test_anext_return_generator(self): 602 class WithGenAnext: 603 def __aiter__(self): 604 return self 605 def __anext__(self): 606 yield 607 result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithGenAnext)) 608 self.assertEqual(result, "completed") 609 610 def test_anext_await_raises(self): 611 class RaisingAwaitable: 612 def __await__(self): 613 raise ZeroDivisionError() 614 yield 615 class WithRaisingAwaitableAnext: 616 def __aiter__(self): 617 return self 618 def __anext__(self): 619 return RaisingAwaitable() 620 async def do_test(): 621 awaitable = anext(WithRaisingAwaitableAnext()) 622 with self.assertRaises(ZeroDivisionError): 623 await awaitable 624 awaitable = anext(WithRaisingAwaitableAnext(), "default") 625 with self.assertRaises(ZeroDivisionError): 626 await awaitable 627 return "completed" 628 result = self.loop.run_until_complete(do_test()) 629 self.assertEqual(result, "completed") 630 631 def test_anext_iter(self): 632 @types.coroutine 633 def _async_yield(v): 634 return (yield v) 635 636 class MyError(Exception): 637 pass 638 639 async def agenfn(): 640 try: 641 await _async_yield(1) 642 except MyError: 643 await _async_yield(2) 644 return 645 yield 646 647 def test1(anext): 648 agen = agenfn() 649 with contextlib.closing(anext(agen, "default").__await__()) as g: 650 self.assertEqual(g.send(None), 1) 651 self.assertEqual(g.throw(MyError, MyError(), None), 2) 652 try: 653 g.send(None) 654 except StopIteration as e: 655 err = e 656 else: 657 self.fail('StopIteration was not raised') 658 self.assertEqual(err.value, "default") 659 660 def test2(anext): 661 agen = agenfn() 662 with contextlib.closing(anext(agen, "default").__await__()) as g: 663 self.assertEqual(g.send(None), 1) 664 self.assertEqual(g.throw(MyError, MyError(), None), 2) 665 with self.assertRaises(MyError): 666 g.throw(MyError, MyError(), None) 667 668 def test3(anext): 669 agen = agenfn() 670 with contextlib.closing(anext(agen, "default").__await__()) as g: 671 self.assertEqual(g.send(None), 1) 672 g.close() 673 with self.assertRaisesRegex(RuntimeError, 'cannot reuse'): 674 self.assertEqual(g.send(None), 1) 675 676 def test4(anext): 677 @types.coroutine 678 def _async_yield(v): 679 yield v * 10 680 return (yield (v * 10 + 1)) 681 682 async def agenfn(): 683 try: 684 await _async_yield(1) 685 except MyError: 686 await _async_yield(2) 687 return 688 yield 689 690 agen = agenfn() 691 with contextlib.closing(anext(agen, "default").__await__()) as g: 692 self.assertEqual(g.send(None), 10) 693 self.assertEqual(g.throw(MyError, MyError(), None), 20) 694 with self.assertRaisesRegex(MyError, 'val'): 695 g.throw(MyError, MyError('val'), None) 696 697 def test5(anext): 698 @types.coroutine 699 def _async_yield(v): 700 yield v * 10 701 return (yield (v * 10 + 1)) 702 703 async def agenfn(): 704 try: 705 await _async_yield(1) 706 except MyError: 707 return 708 yield 'aaa' 709 710 agen = agenfn() 711 with contextlib.closing(anext(agen, "default").__await__()) as g: 712 self.assertEqual(g.send(None), 10) 713 with self.assertRaisesRegex(StopIteration, 'default'): 714 g.throw(MyError, MyError(), None) 715 716 def test6(anext): 717 @types.coroutine 718 def _async_yield(v): 719 yield v * 10 720 return (yield (v * 10 + 1)) 721 722 async def agenfn(): 723 await _async_yield(1) 724 yield 'aaa' 725 726 agen = agenfn() 727 with contextlib.closing(anext(agen, "default").__await__()) as g: 728 with self.assertRaises(MyError): 729 g.throw(MyError, MyError(), None) 730 731 def run_test(test): 732 with self.subTest('pure-Python anext()'): 733 test(py_anext) 734 with self.subTest('builtin anext()'): 735 test(anext) 736 737 run_test(test1) 738 run_test(test2) 739 run_test(test3) 740 run_test(test4) 741 run_test(test5) 742 run_test(test6) 743 744 def test_aiter_bad_args(self): 745 async def gen(): 746 yield 1 747 async def call_with_too_few_args(): 748 await aiter() 749 async def call_with_too_many_args(): 750 await aiter(gen(), 1) 751 async def call_with_wrong_type_arg(): 752 await aiter(1) 753 with self.assertRaises(TypeError): 754 self.loop.run_until_complete(call_with_too_few_args()) 755 with self.assertRaises(TypeError): 756 self.loop.run_until_complete(call_with_too_many_args()) 757 with self.assertRaises(TypeError): 758 self.loop.run_until_complete(call_with_wrong_type_arg()) 759 760 async def to_list(self, gen): 761 res = [] 762 async for i in gen: 763 res.append(i) 764 return res 765 766 def test_async_gen_asyncio_01(self): 767 async def gen(): 768 yield 1 769 await asyncio.sleep(0.01) 770 yield 2 771 await asyncio.sleep(0.01) 772 return 773 yield 3 774 775 res = self.loop.run_until_complete(self.to_list(gen())) 776 self.assertEqual(res, [1, 2]) 777 778 def test_async_gen_asyncio_02(self): 779 async def gen(): 780 yield 1 781 await asyncio.sleep(0.01) 782 yield 2 783 1 / 0 784 yield 3 785 786 with self.assertRaises(ZeroDivisionError): 787 self.loop.run_until_complete(self.to_list(gen())) 788 789 def test_async_gen_asyncio_03(self): 790 loop = self.loop 791 792 class Gen: 793 async def __aiter__(self): 794 yield 1 795 await asyncio.sleep(0.01) 796 yield 2 797 798 res = loop.run_until_complete(self.to_list(Gen())) 799 self.assertEqual(res, [1, 2]) 800 801 def test_async_gen_asyncio_anext_04(self): 802 async def foo(): 803 yield 1 804 await asyncio.sleep(0.01) 805 try: 806 yield 2 807 yield 3 808 except ZeroDivisionError: 809 yield 1000 810 await asyncio.sleep(0.01) 811 yield 4 812 813 async def run1(): 814 it = foo().__aiter__() 815 816 self.assertEqual(await it.__anext__(), 1) 817 self.assertEqual(await it.__anext__(), 2) 818 self.assertEqual(await it.__anext__(), 3) 819 self.assertEqual(await it.__anext__(), 4) 820 with self.assertRaises(StopAsyncIteration): 821 await it.__anext__() 822 with self.assertRaises(StopAsyncIteration): 823 await it.__anext__() 824 825 async def run2(): 826 it = foo().__aiter__() 827 828 self.assertEqual(await it.__anext__(), 1) 829 self.assertEqual(await it.__anext__(), 2) 830 try: 831 it.__anext__().throw(ZeroDivisionError) 832 except StopIteration as ex: 833 self.assertEqual(ex.args[0], 1000) 834 else: 835 self.fail('StopIteration was not raised') 836 self.assertEqual(await it.__anext__(), 4) 837 with self.assertRaises(StopAsyncIteration): 838 await it.__anext__() 839 840 self.loop.run_until_complete(run1()) 841 self.loop.run_until_complete(run2()) 842 843 def test_async_gen_asyncio_anext_05(self): 844 async def foo(): 845 v = yield 1 846 v = yield v 847 yield v * 100 848 849 async def run(): 850 it = foo().__aiter__() 851 852 try: 853 it.__anext__().send(None) 854 except StopIteration as ex: 855 self.assertEqual(ex.args[0], 1) 856 else: 857 self.fail('StopIteration was not raised') 858 859 try: 860 it.__anext__().send(10) 861 except StopIteration as ex: 862 self.assertEqual(ex.args[0], 10) 863 else: 864 self.fail('StopIteration was not raised') 865 866 try: 867 it.__anext__().send(12) 868 except StopIteration as ex: 869 self.assertEqual(ex.args[0], 1200) 870 else: 871 self.fail('StopIteration was not raised') 872 873 with self.assertRaises(StopAsyncIteration): 874 await it.__anext__() 875 876 self.loop.run_until_complete(run()) 877 878 def test_async_gen_asyncio_anext_06(self): 879 DONE = 0 880 881 # test synchronous generators 882 def foo(): 883 try: 884 yield 885 except: 886 pass 887 g = foo() 888 g.send(None) 889 with self.assertRaises(StopIteration): 890 g.send(None) 891 892 # now with asynchronous generators 893 894 async def gen(): 895 nonlocal DONE 896 try: 897 yield 898 except: 899 pass 900 DONE = 1 901 902 async def run(): 903 nonlocal DONE 904 g = gen() 905 await g.asend(None) 906 with self.assertRaises(StopAsyncIteration): 907 await g.asend(None) 908 DONE += 10 909 910 self.loop.run_until_complete(run()) 911 self.assertEqual(DONE, 11) 912 913 def test_async_gen_asyncio_anext_tuple(self): 914 async def foo(): 915 try: 916 yield (1,) 917 except ZeroDivisionError: 918 yield (2,) 919 920 async def run(): 921 it = foo().__aiter__() 922 923 self.assertEqual(await it.__anext__(), (1,)) 924 with self.assertRaises(StopIteration) as cm: 925 it.__anext__().throw(ZeroDivisionError) 926 self.assertEqual(cm.exception.args[0], (2,)) 927 with self.assertRaises(StopAsyncIteration): 928 await it.__anext__() 929 930 self.loop.run_until_complete(run()) 931 932 def test_async_gen_asyncio_anext_stopiteration(self): 933 async def foo(): 934 try: 935 yield StopIteration(1) 936 except ZeroDivisionError: 937 yield StopIteration(3) 938 939 async def run(): 940 it = foo().__aiter__() 941 942 v = await it.__anext__() 943 self.assertIsInstance(v, StopIteration) 944 self.assertEqual(v.value, 1) 945 with self.assertRaises(StopIteration) as cm: 946 it.__anext__().throw(ZeroDivisionError) 947 v = cm.exception.args[0] 948 self.assertIsInstance(v, StopIteration) 949 self.assertEqual(v.value, 3) 950 with self.assertRaises(StopAsyncIteration): 951 await it.__anext__() 952 953 self.loop.run_until_complete(run()) 954 955 def test_async_gen_asyncio_aclose_06(self): 956 async def foo(): 957 try: 958 yield 1 959 1 / 0 960 finally: 961 await asyncio.sleep(0.01) 962 yield 12 963 964 async def run(): 965 gen = foo() 966 it = gen.__aiter__() 967 await it.__anext__() 968 await gen.aclose() 969 970 with self.assertRaisesRegex( 971 RuntimeError, 972 "async generator ignored GeneratorExit"): 973 self.loop.run_until_complete(run()) 974 975 def test_async_gen_asyncio_aclose_07(self): 976 DONE = 0 977 978 async def foo(): 979 nonlocal DONE 980 try: 981 yield 1 982 1 / 0 983 finally: 984 await asyncio.sleep(0.01) 985 await asyncio.sleep(0.01) 986 DONE += 1 987 DONE += 1000 988 989 async def run(): 990 gen = foo() 991 it = gen.__aiter__() 992 await it.__anext__() 993 await gen.aclose() 994 995 self.loop.run_until_complete(run()) 996 self.assertEqual(DONE, 1) 997 998 def test_async_gen_asyncio_aclose_08(self): 999 DONE = 0 1000 1001 fut = asyncio.Future(loop=self.loop) 1002 1003 async def foo(): 1004 nonlocal DONE 1005 try: 1006 yield 1 1007 await fut 1008 DONE += 1000 1009 yield 2 1010 finally: 1011 await asyncio.sleep(0.01) 1012 await asyncio.sleep(0.01) 1013 DONE += 1 1014 DONE += 1000 1015 1016 async def run(): 1017 gen = foo() 1018 it = gen.__aiter__() 1019 self.assertEqual(await it.__anext__(), 1) 1020 await gen.aclose() 1021 1022 self.loop.run_until_complete(run()) 1023 self.assertEqual(DONE, 1) 1024 1025 # Silence ResourceWarnings 1026 fut.cancel() 1027 self.loop.run_until_complete(asyncio.sleep(0.01)) 1028 1029 def test_async_gen_asyncio_gc_aclose_09(self): 1030 DONE = 0 1031 1032 async def gen(): 1033 nonlocal DONE 1034 try: 1035 while True: 1036 yield 1 1037 finally: 1038 await asyncio.sleep(0.01) 1039 await asyncio.sleep(0.01) 1040 DONE = 1 1041 1042 async def run(): 1043 g = gen() 1044 await g.__anext__() 1045 await g.__anext__() 1046 del g 1047 gc_collect() # For PyPy or other GCs. 1048 1049 await asyncio.sleep(0.1) 1050 1051 self.loop.run_until_complete(run()) 1052 self.assertEqual(DONE, 1) 1053 1054 def test_async_gen_asyncio_aclose_10(self): 1055 DONE = 0 1056 1057 # test synchronous generators 1058 def foo(): 1059 try: 1060 yield 1061 except: 1062 pass 1063 g = foo() 1064 g.send(None) 1065 g.close() 1066 1067 # now with asynchronous generators 1068 1069 async def gen(): 1070 nonlocal DONE 1071 try: 1072 yield 1073 except: 1074 pass 1075 DONE = 1 1076 1077 async def run(): 1078 nonlocal DONE 1079 g = gen() 1080 await g.asend(None) 1081 await g.aclose() 1082 DONE += 10 1083 1084 self.loop.run_until_complete(run()) 1085 self.assertEqual(DONE, 11) 1086 1087 def test_async_gen_asyncio_aclose_11(self): 1088 DONE = 0 1089 1090 # test synchronous generators 1091 def foo(): 1092 try: 1093 yield 1094 except: 1095 pass 1096 yield 1097 g = foo() 1098 g.send(None) 1099 with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'): 1100 g.close() 1101 1102 # now with asynchronous generators 1103 1104 async def gen(): 1105 nonlocal DONE 1106 try: 1107 yield 1108 except: 1109 pass 1110 yield 1111 DONE += 1 1112 1113 async def run(): 1114 nonlocal DONE 1115 g = gen() 1116 await g.asend(None) 1117 with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'): 1118 await g.aclose() 1119 DONE += 10 1120 1121 self.loop.run_until_complete(run()) 1122 self.assertEqual(DONE, 10) 1123 1124 def test_async_gen_asyncio_aclose_12(self): 1125 DONE = 0 1126 1127 async def target(): 1128 await asyncio.sleep(0.01) 1129 1 / 0 1130 1131 async def foo(): 1132 nonlocal DONE 1133 task = asyncio.create_task(target()) 1134 try: 1135 yield 1 1136 finally: 1137 try: 1138 await task 1139 except ZeroDivisionError: 1140 DONE = 1 1141 1142 async def run(): 1143 gen = foo() 1144 it = gen.__aiter__() 1145 await it.__anext__() 1146 await gen.aclose() 1147 1148 self.loop.run_until_complete(run()) 1149 self.assertEqual(DONE, 1) 1150 1151 def test_async_gen_asyncio_asend_01(self): 1152 DONE = 0 1153 1154 # Sanity check: 1155 def sgen(): 1156 v = yield 1 1157 yield v * 2 1158 sg = sgen() 1159 v = sg.send(None) 1160 self.assertEqual(v, 1) 1161 v = sg.send(100) 1162 self.assertEqual(v, 200) 1163 1164 async def gen(): 1165 nonlocal DONE 1166 try: 1167 await asyncio.sleep(0.01) 1168 v = yield 1 1169 await asyncio.sleep(0.01) 1170 yield v * 2 1171 await asyncio.sleep(0.01) 1172 return 1173 finally: 1174 await asyncio.sleep(0.01) 1175 await asyncio.sleep(0.01) 1176 DONE = 1 1177 1178 async def run(): 1179 g = gen() 1180 1181 v = await g.asend(None) 1182 self.assertEqual(v, 1) 1183 1184 v = await g.asend(100) 1185 self.assertEqual(v, 200) 1186 1187 with self.assertRaises(StopAsyncIteration): 1188 await g.asend(None) 1189 1190 self.loop.run_until_complete(run()) 1191 self.assertEqual(DONE, 1) 1192 1193 def test_async_gen_asyncio_asend_02(self): 1194 DONE = 0 1195 1196 async def sleep_n_crash(delay): 1197 await asyncio.sleep(delay) 1198 1 / 0 1199 1200 async def gen(): 1201 nonlocal DONE 1202 try: 1203 await asyncio.sleep(0.01) 1204 v = yield 1 1205 await sleep_n_crash(0.01) 1206 DONE += 1000 1207 yield v * 2 1208 finally: 1209 await asyncio.sleep(0.01) 1210 await asyncio.sleep(0.01) 1211 DONE = 1 1212 1213 async def run(): 1214 g = gen() 1215 1216 v = await g.asend(None) 1217 self.assertEqual(v, 1) 1218 1219 await g.asend(100) 1220 1221 with self.assertRaises(ZeroDivisionError): 1222 self.loop.run_until_complete(run()) 1223 self.assertEqual(DONE, 1) 1224 1225 def test_async_gen_asyncio_asend_03(self): 1226 DONE = 0 1227 1228 async def sleep_n_crash(delay): 1229 fut = asyncio.ensure_future(asyncio.sleep(delay), 1230 loop=self.loop) 1231 self.loop.call_later(delay / 2, lambda: fut.cancel()) 1232 return await fut 1233 1234 async def gen(): 1235 nonlocal DONE 1236 try: 1237 await asyncio.sleep(0.01) 1238 v = yield 1 1239 await sleep_n_crash(0.01) 1240 DONE += 1000 1241 yield v * 2 1242 finally: 1243 await asyncio.sleep(0.01) 1244 await asyncio.sleep(0.01) 1245 DONE = 1 1246 1247 async def run(): 1248 g = gen() 1249 1250 v = await g.asend(None) 1251 self.assertEqual(v, 1) 1252 1253 await g.asend(100) 1254 1255 with self.assertRaises(asyncio.CancelledError): 1256 self.loop.run_until_complete(run()) 1257 self.assertEqual(DONE, 1) 1258 1259 def test_async_gen_asyncio_athrow_01(self): 1260 DONE = 0 1261 1262 class FooEr(Exception): 1263 pass 1264 1265 # Sanity check: 1266 def sgen(): 1267 try: 1268 v = yield 1 1269 except FooEr: 1270 v = 1000 1271 yield v * 2 1272 sg = sgen() 1273 v = sg.send(None) 1274 self.assertEqual(v, 1) 1275 v = sg.throw(FooEr) 1276 self.assertEqual(v, 2000) 1277 with self.assertRaises(StopIteration): 1278 sg.send(None) 1279 1280 async def gen(): 1281 nonlocal DONE 1282 try: 1283 await asyncio.sleep(0.01) 1284 try: 1285 v = yield 1 1286 except FooEr: 1287 v = 1000 1288 await asyncio.sleep(0.01) 1289 yield v * 2 1290 await asyncio.sleep(0.01) 1291 # return 1292 finally: 1293 await asyncio.sleep(0.01) 1294 await asyncio.sleep(0.01) 1295 DONE = 1 1296 1297 async def run(): 1298 g = gen() 1299 1300 v = await g.asend(None) 1301 self.assertEqual(v, 1) 1302 1303 v = await g.athrow(FooEr) 1304 self.assertEqual(v, 2000) 1305 1306 with self.assertRaises(StopAsyncIteration): 1307 await g.asend(None) 1308 1309 self.loop.run_until_complete(run()) 1310 self.assertEqual(DONE, 1) 1311 1312 def test_async_gen_asyncio_athrow_02(self): 1313 DONE = 0 1314 1315 class FooEr(Exception): 1316 pass 1317 1318 async def sleep_n_crash(delay): 1319 fut = asyncio.ensure_future(asyncio.sleep(delay), 1320 loop=self.loop) 1321 self.loop.call_later(delay / 2, lambda: fut.cancel()) 1322 return await fut 1323 1324 async def gen(): 1325 nonlocal DONE 1326 try: 1327 await asyncio.sleep(0.01) 1328 try: 1329 v = yield 1 1330 except FooEr: 1331 await sleep_n_crash(0.01) 1332 yield v * 2 1333 await asyncio.sleep(0.01) 1334 # return 1335 finally: 1336 await asyncio.sleep(0.01) 1337 await asyncio.sleep(0.01) 1338 DONE = 1 1339 1340 async def run(): 1341 g = gen() 1342 1343 v = await g.asend(None) 1344 self.assertEqual(v, 1) 1345 1346 try: 1347 await g.athrow(FooEr) 1348 except asyncio.CancelledError: 1349 self.assertEqual(DONE, 1) 1350 raise 1351 else: 1352 self.fail('CancelledError was not raised') 1353 1354 with self.assertRaises(asyncio.CancelledError): 1355 self.loop.run_until_complete(run()) 1356 self.assertEqual(DONE, 1) 1357 1358 def test_async_gen_asyncio_athrow_03(self): 1359 DONE = 0 1360 1361 # test synchronous generators 1362 def foo(): 1363 try: 1364 yield 1365 except: 1366 pass 1367 g = foo() 1368 g.send(None) 1369 with self.assertRaises(StopIteration): 1370 g.throw(ValueError) 1371 1372 # now with asynchronous generators 1373 1374 async def gen(): 1375 nonlocal DONE 1376 try: 1377 yield 1378 except: 1379 pass 1380 DONE = 1 1381 1382 async def run(): 1383 nonlocal DONE 1384 g = gen() 1385 await g.asend(None) 1386 with self.assertRaises(StopAsyncIteration): 1387 await g.athrow(ValueError) 1388 DONE += 10 1389 1390 self.loop.run_until_complete(run()) 1391 self.assertEqual(DONE, 11) 1392 1393 def test_async_gen_asyncio_athrow_tuple(self): 1394 async def gen(): 1395 try: 1396 yield 1 1397 except ZeroDivisionError: 1398 yield (2,) 1399 1400 async def run(): 1401 g = gen() 1402 v = await g.asend(None) 1403 self.assertEqual(v, 1) 1404 v = await g.athrow(ZeroDivisionError) 1405 self.assertEqual(v, (2,)) 1406 with self.assertRaises(StopAsyncIteration): 1407 await g.asend(None) 1408 1409 self.loop.run_until_complete(run()) 1410 1411 def test_async_gen_asyncio_athrow_stopiteration(self): 1412 async def gen(): 1413 try: 1414 yield 1 1415 except ZeroDivisionError: 1416 yield StopIteration(2) 1417 1418 async def run(): 1419 g = gen() 1420 v = await g.asend(None) 1421 self.assertEqual(v, 1) 1422 v = await g.athrow(ZeroDivisionError) 1423 self.assertIsInstance(v, StopIteration) 1424 self.assertEqual(v.value, 2) 1425 with self.assertRaises(StopAsyncIteration): 1426 await g.asend(None) 1427 1428 self.loop.run_until_complete(run()) 1429 1430 def test_async_gen_asyncio_shutdown_01(self): 1431 finalized = 0 1432 1433 async def waiter(timeout): 1434 nonlocal finalized 1435 try: 1436 await asyncio.sleep(timeout) 1437 yield 1 1438 finally: 1439 await asyncio.sleep(0) 1440 finalized += 1 1441 1442 async def wait(): 1443 async for _ in waiter(1): 1444 pass 1445 1446 t1 = self.loop.create_task(wait()) 1447 t2 = self.loop.create_task(wait()) 1448 1449 self.loop.run_until_complete(asyncio.sleep(0.1)) 1450 1451 # Silence warnings 1452 t1.cancel() 1453 t2.cancel() 1454 1455 with self.assertRaises(asyncio.CancelledError): 1456 self.loop.run_until_complete(t1) 1457 with self.assertRaises(asyncio.CancelledError): 1458 self.loop.run_until_complete(t2) 1459 1460 self.loop.run_until_complete(self.loop.shutdown_asyncgens()) 1461 1462 self.assertEqual(finalized, 2) 1463 1464 def test_async_gen_asyncio_shutdown_02(self): 1465 messages = [] 1466 1467 def exception_handler(loop, context): 1468 messages.append(context) 1469 1470 async def async_iterate(): 1471 yield 1 1472 yield 2 1473 1474 it = async_iterate() 1475 async def main(): 1476 loop = asyncio.get_running_loop() 1477 loop.set_exception_handler(exception_handler) 1478 1479 async for i in it: 1480 break 1481 1482 asyncio.run(main()) 1483 1484 self.assertEqual(messages, []) 1485 1486 def test_async_gen_asyncio_shutdown_exception_01(self): 1487 messages = [] 1488 1489 def exception_handler(loop, context): 1490 messages.append(context) 1491 1492 async def async_iterate(): 1493 try: 1494 yield 1 1495 yield 2 1496 finally: 1497 1/0 1498 1499 it = async_iterate() 1500 async def main(): 1501 loop = asyncio.get_running_loop() 1502 loop.set_exception_handler(exception_handler) 1503 1504 async for i in it: 1505 break 1506 1507 asyncio.run(main()) 1508 1509 message, = messages 1510 self.assertEqual(message['asyncgen'], it) 1511 self.assertIsInstance(message['exception'], ZeroDivisionError) 1512 self.assertIn('an error occurred during closing of asynchronous generator', 1513 message['message']) 1514 1515 def test_async_gen_asyncio_shutdown_exception_02(self): 1516 messages = [] 1517 1518 def exception_handler(loop, context): 1519 messages.append(context) 1520 1521 async def async_iterate(): 1522 try: 1523 yield 1 1524 yield 2 1525 finally: 1526 1/0 1527 1528 async def main(): 1529 loop = asyncio.get_running_loop() 1530 loop.set_exception_handler(exception_handler) 1531 1532 async for i in async_iterate(): 1533 break 1534 gc_collect() 1535 1536 asyncio.run(main()) 1537 1538 message, = messages 1539 self.assertIsInstance(message['exception'], ZeroDivisionError) 1540 self.assertIn('unhandled exception during asyncio.run() shutdown', 1541 message['message']) 1542 1543 def test_async_gen_expression_01(self): 1544 async def arange(n): 1545 for i in range(n): 1546 await asyncio.sleep(0.01) 1547 yield i 1548 1549 def make_arange(n): 1550 # This syntax is legal starting with Python 3.7 1551 return (i * 2 async for i in arange(n)) 1552 1553 async def run(): 1554 return [i async for i in make_arange(10)] 1555 1556 res = self.loop.run_until_complete(run()) 1557 self.assertEqual(res, [i * 2 for i in range(10)]) 1558 1559 def test_async_gen_expression_02(self): 1560 async def wrap(n): 1561 await asyncio.sleep(0.01) 1562 return n 1563 1564 def make_arange(n): 1565 # This syntax is legal starting with Python 3.7 1566 return (i * 2 for i in range(n) if await wrap(i)) 1567 1568 async def run(): 1569 return [i async for i in make_arange(10)] 1570 1571 res = self.loop.run_until_complete(run()) 1572 self.assertEqual(res, [i * 2 for i in range(1, 10)]) 1573 1574 def test_asyncgen_nonstarted_hooks_are_cancellable(self): 1575 # See https://bugs.python.org/issue38013 1576 messages = [] 1577 1578 def exception_handler(loop, context): 1579 messages.append(context) 1580 1581 async def async_iterate(): 1582 yield 1 1583 yield 2 1584 1585 async def main(): 1586 loop = asyncio.get_running_loop() 1587 loop.set_exception_handler(exception_handler) 1588 1589 async for i in async_iterate(): 1590 break 1591 1592 asyncio.run(main()) 1593 1594 self.assertEqual([], messages) 1595 1596 def test_async_gen_await_same_anext_coro_twice(self): 1597 async def async_iterate(): 1598 yield 1 1599 yield 2 1600 1601 async def run(): 1602 it = async_iterate() 1603 nxt = it.__anext__() 1604 await nxt 1605 with self.assertRaisesRegex( 1606 RuntimeError, 1607 r"cannot reuse already awaited __anext__\(\)/asend\(\)" 1608 ): 1609 await nxt 1610 1611 await it.aclose() # prevent unfinished iterator warning 1612 1613 self.loop.run_until_complete(run()) 1614 1615 def test_async_gen_await_same_aclose_coro_twice(self): 1616 async def async_iterate(): 1617 yield 1 1618 yield 2 1619 1620 async def run(): 1621 it = async_iterate() 1622 nxt = it.aclose() 1623 await nxt 1624 with self.assertRaisesRegex( 1625 RuntimeError, 1626 r"cannot reuse already awaited aclose\(\)/athrow\(\)" 1627 ): 1628 await nxt 1629 1630 self.loop.run_until_complete(run()) 1631 1632 def test_async_gen_aclose_twice_with_different_coros(self): 1633 # Regression test for https://bugs.python.org/issue39606 1634 async def async_iterate(): 1635 yield 1 1636 yield 2 1637 1638 async def run(): 1639 it = async_iterate() 1640 await it.aclose() 1641 await it.aclose() 1642 1643 self.loop.run_until_complete(run()) 1644 1645 def test_async_gen_aclose_after_exhaustion(self): 1646 # Regression test for https://bugs.python.org/issue39606 1647 async def async_iterate(): 1648 yield 1 1649 yield 2 1650 1651 async def run(): 1652 it = async_iterate() 1653 async for _ in it: 1654 pass 1655 await it.aclose() 1656 1657 self.loop.run_until_complete(run()) 1658 1659 def test_async_gen_aclose_compatible_with_get_stack(self): 1660 async def async_generator(): 1661 yield object() 1662 1663 async def run(): 1664 ag = async_generator() 1665 asyncio.create_task(ag.aclose()) 1666 tasks = asyncio.all_tasks() 1667 for task in tasks: 1668 # No AttributeError raised 1669 task.get_stack() 1670 1671 self.loop.run_until_complete(run()) 1672 1673 1674if __name__ == "__main__": 1675 unittest.main() 1676