1import inspect
2import types
3import unittest
4import contextlib
5
6from test.support.import_helper import import_module
7from test.support import gc_collect
8asyncio = import_module("asyncio")
9
10
# Sentinel meaning "no default was supplied" to py_anext().  A unique object
# is compared by identity there, so None stays usable as an explicit default.
_no_default = object()
12
13
class AwaitException(Exception):
    """Exception that run_until_complete() throws into a coroutine.

    It is injected on the resumption that follows a ``('throw',)`` marker.
    """
16
17
@types.coroutine
def awaitable(*, throw=False):
    """Generator-based awaitable that suspends exactly once.

    The single yielded marker is ``('throw',)`` when *throw* is true and
    ``('result',)`` otherwise.
    """
    marker = ('throw',) if throw else ('result',)
    yield marker
24
25
def run_until_complete(coro):
    """Drive *coro* to completion without an event loop; return its result.

    The coroutine is resumed with ``send(None)``.  Whenever it yields the
    marker ``('throw',)`` the following resumption throws AwaitException
    into it instead.  Exceptions escaping the coroutine propagate to the
    caller.
    """
    throw_next = False
    while True:
        try:
            if throw_next:
                throw_next = False
                marker = coro.throw(AwaitException)
            else:
                marker = coro.send(None)
        except StopIteration as stop:
            # Use ``value`` rather than ``args[0]``: a coroutine finishing
            # without an explicit return value raises StopIteration with an
            # empty args tuple, which would make ``args[0]`` an IndexError.
            return stop.value

        if marker == ('throw',):
            throw_next = True
40
41
def to_list(gen):
    """Return a list of all items produced by the async iterable *gen*.

    The iteration is driven by run_until_complete(), so no event loop is
    involved.
    """
    async def collect():
        return [item async for item in gen]

    return run_until_complete(collect())
50
51
def py_anext(iterator, default=_no_default):
    """Pure-Python counterpart of the builtin anext(), for cross-testing.

    ``__anext__`` is looked up on the type of *iterator*; TypeError is
    raised if it is missing.  Without *default*, the object returned by
    ``__anext__`` is handed back unchanged.  With *default*, a coroutine is
    returned that awaits it and converts StopAsyncIteration into *default*.
    """
    iterator_type = type(iterator)
    try:
        anext_method = iterator_type.__anext__
    except AttributeError:
        raise TypeError(f'{iterator!r} is not an async iterator')

    async def anext_with_default():
        # The C implementation is far more low-level and implements every
        # method of the iterator protocol itself.  Staying at the level of
        # ordinary coroutines here is intentional: the point is to
        # cross-check a high-level Python version against the C one.
        try:
            return await anext_method(iterator)
        except StopAsyncIteration:
            return default

    if default is _no_default:
        return anext_method(iterator)
    return anext_with_default()
81
82
class AsyncGenSyntaxTest(unittest.TestCase):
    """Constructs that must be rejected inside ``async def`` generators."""

    def _assert_rejected(self, code, message_regex):
        # Each snippet is executed with fresh, empty globals and locals and
        # must fail with a SyntaxError whose message matches the regex.
        with self.assertRaisesRegex(SyntaxError, message_regex):
            exec(code, {}, {})

    def test_async_gen_syntax_01(self):
        # 'yield from' following an await.
        code = '''async def foo():
            await abc
            yield from 123
        '''

        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_02(self):
        # 'yield from' as the only statement.
        code = '''async def foo():
            yield from 123
        '''

        self._assert_rejected(code, 'yield from.*inside async')

    def test_async_gen_syntax_03(self):
        # 'return <value>' after an await and a yield.
        code = '''async def foo():
            await abc
            yield
            return 123
        '''

        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_04(self):
        # 'return <value>' directly after a yield.
        code = '''async def foo():
            yield
            return 123
        '''

        self._assert_rejected(code, 'return.*value.*async gen')

    def test_async_gen_syntax_05(self):
        # The yield sits under 'if 0' but the return value is still rejected.
        code = '''async def foo():
            if 0:
                yield
            return 12
        '''

        self._assert_rejected(code, 'return.*value.*async gen')
130
131
class AsyncGenTest(unittest.TestCase):
    """Async-generator behaviour, stepped by hand without an event loop."""

    def compare_generators(self, sync_gen, async_gen):
        """Exhaust both generators and assert that their traces are equal.

        A trace lists each produced value, the ``str()`` of the type of any
        other exception raised while stepping, and a final ``'STOP'`` once
        the generator reports exhaustion.  The trace of *async_gen* is
        returned.
        """
        def drain_sync(gen):
            trace = []
            while True:
                try:
                    value = gen.__next__()
                except StopIteration:
                    trace.append('STOP')
                    return trace
                except Exception as exc:
                    trace.append(str(type(exc)))
                else:
                    trace.append(value)

        def drain_async(agen):
            trace = []
            while True:
                step = agen.__anext__()
                # Keep resuming this step until it finishes one way or
                # another; plain awaits inside the generator just make
                # __next__() return and the loop go round again.
                while True:
                    try:
                        step.__next__()
                    except StopIteration as stop:
                        # The yielded value travels in StopIteration.args.
                        if stop.args:
                            trace.append(stop.args[0])
                        else:
                            trace.append('EMPTY StopIteration')
                        break
                    except StopAsyncIteration:
                        trace.append('STOP')
                        return trace
                    except Exception as exc:
                        trace.append(str(type(exc)))
                        break

        sync_trace = drain_sync(sync_gen)
        async_trace = drain_async(async_gen)
        self.assertEqual(sync_trace, async_trace)
        return async_trace

    def test_async_gen_iteration_01(self):
        # Awaits interleaved with yields; nothing is ever sent in.
        async def gen():
            await awaitable()
            received = yield 123
            self.assertIs(received, None)
            await awaitable()
            yield 456
            await awaitable()
            yield 789

        collected = to_list(gen())
        self.assertEqual(collected, [123, 456, 789])

    def test_async_gen_iteration_02(self):
        # Step __anext__() awaitables manually through their await points.
        async def gen():
            await awaitable()
            yield 123
            await awaitable()

        agen = gen()
        aiterator = agen.__aiter__()

        # First step: one suspension at the await, then the yielded value.
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopIteration as stop:
            yielded = stop.args[0]
        else:
            self.fail('StopIteration was not raised')
        self.assertEqual(yielded, 123)

        # Second step: one suspension, then the generator is exhausted.
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopAsyncIteration as stop:
            stop_args = stop.args
        else:
            self.fail('StopAsyncIteration was not raised')
        self.assertFalse(stop_args)

    def test_async_gen_exception_03(self):
        # An exception thrown in at an await escapes the iteration.
        async def gen():
            await awaitable()
            yield 123
            await awaitable(throw=True)
            yield 456

        with self.assertRaises(AwaitException):
            to_list(gen())

    def test_async_gen_exception_04(self):
        # An exception raised after a yield surfaces on the next step.
        async def gen():
            await awaitable()
            yield 123
            1 / 0

        agen = gen()
        aiterator = agen.__aiter__()
        step = aiterator.__anext__()
        self.assertEqual(step.__next__(), ('result',))

        try:
            step.__next__()
        except StopIteration as stop:
            yielded = stop.args[0]
        else:
            self.fail('StopIteration was not raised')
        self.assertEqual(yielded, 123)

        with self.assertRaises(ZeroDivisionError):
            aiterator.__anext__().__next__()

    def test_async_gen_exception_05(self):
        # StopAsyncIteration raised by the body is reported as RuntimeError.
        async def gen():
            yield 123
            raise StopAsyncIteration

        expected_message = 'async generator.*StopAsyncIteration'
        with self.assertRaisesRegex(RuntimeError, expected_message):
            to_list(gen())

    def test_async_gen_exception_06(self):
        # StopIteration raised by the body is reported as RuntimeError.
        async def gen():
            yield 123
            raise StopIteration

        expected_message = 'async generator.*StopIteration'
        with self.assertRaisesRegex(RuntimeError, expected_message):
            to_list(gen())

    def test_async_gen_exception_07(self):
        # Yields inside 'finally' while an exception is in flight.
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_08(self):
        # Exception raised inside 'finally'; the async version also awaits.
        def sync_gen():
            try:
                yield 1
            finally:
                yield 2
                1 / 0
                yield 3

            yield 100

        async def async_gen():
            try:
                yield 1
                await awaitable()
            finally:
                await awaitable()
                yield 2
                1 / 0
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_09(self):
        # Like _07, with awaits added to the async version.
        def sync_gen():
            try:
                yield 1
                1 / 0
            finally:
                yield 2
                yield 3

            yield 100

        async def async_gen():
            try:
                await awaitable()
                yield 1
                1 / 0
            finally:
                yield 2
                await awaitable()
                yield 3

            yield 100

        self.compare_generators(sync_gen(), async_gen())

    def test_async_gen_exception_10(self):
        # Sending a non-None value into a fresh generator is a TypeError.
        async def gen():
            yield 123

        first_step = gen().__anext__()
        with self.assertRaisesRegex(TypeError,
                                    "non-None value .* async generator"):
            first_step.send(100)

    def test_async_gen_exception_11(self):
        # GeneratorExit thrown into an inner generator and caught outside.
        def sync_gen():
            yield 10
            yield 20

        def sync_gen_wrapper():
            yield 1
            inner = sync_gen()
            inner.send(None)
            try:
                inner.throw(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        async def async_gen():
            yield 10
            yield 20

        async def async_gen_wrapper():
            yield 1
            inner = async_gen()
            await inner.asend(None)
            try:
                await inner.athrow(GeneratorExit())
            except GeneratorExit:
                yield 2
            yield 3

        self.compare_generators(sync_gen_wrapper(), async_gen_wrapper())

    def test_async_gen_api_01(self):
        # Introspection attributes of an async generator object.
        async def gen():
            yield 123

        agen = gen()

        # __name__ and __qualname__ are readable and writable.
        self.assertEqual(agen.__name__, 'gen')
        agen.__name__ = '123'
        self.assertEqual(agen.__name__, '123')

        self.assertIn('.gen', agen.__qualname__)
        agen.__qualname__ = '123'
        self.assertEqual(agen.__qualname__, '123')

        # The ag_* attributes of a generator that has not been started.
        self.assertIsNone(agen.ag_await)
        self.assertIsInstance(agen.ag_frame, types.FrameType)
        self.assertFalse(agen.ag_running)
        self.assertIsInstance(agen.ag_code, types.CodeType)

        closer = agen.aclose()
        self.assertTrue(inspect.isawaitable(closer))
398
399
400class AsyncGenAsyncioTest(unittest.TestCase):
401
    def setUp(self):
        # Every test gets its own event loop in self.loop.  The current
        # event loop is then set to None, so the tests in this class drive
        # self.loop explicitly through self.loop.run_until_complete().
        self.loop = asyncio.new_event_loop()
        asyncio.set_event_loop(None)
405
    def tearDown(self):
        # Close the per-test loop created in setUp() and drop the reference
        # to it.
        self.loop.close()
        self.loop = None
        # Passing None resets the event loop policy -- presumably so that no
        # asyncio state leaks into tests that run afterwards.
        asyncio.set_event_loop_policy(None)
410
411    def check_async_iterator_anext(self, ait_class):
412        with self.subTest(anext="pure-Python"):
413            self._check_async_iterator_anext(ait_class, py_anext)
414        with self.subTest(anext="builtin"):
415            self._check_async_iterator_anext(ait_class, anext)
416
417    def _check_async_iterator_anext(self, ait_class, anext):
418        g = ait_class()
419        async def consume():
420            results = []
421            results.append(await anext(g))
422            results.append(await anext(g))
423            results.append(await anext(g, 'buckle my shoe'))
424            return results
425        res = self.loop.run_until_complete(consume())
426        self.assertEqual(res, [1, 2, 'buckle my shoe'])
427        with self.assertRaises(StopAsyncIteration):
428            self.loop.run_until_complete(consume())
429
430        async def test_2():
431            g1 = ait_class()
432            self.assertEqual(await anext(g1), 1)
433            self.assertEqual(await anext(g1), 2)
434            with self.assertRaises(StopAsyncIteration):
435                await anext(g1)
436            with self.assertRaises(StopAsyncIteration):
437                await anext(g1)
438
439            g2 = ait_class()
440            self.assertEqual(await anext(g2, "default"), 1)
441            self.assertEqual(await anext(g2, "default"), 2)
442            self.assertEqual(await anext(g2, "default"), "default")
443            self.assertEqual(await anext(g2, "default"), "default")
444
445            return "completed"
446
447        result = self.loop.run_until_complete(test_2())
448        self.assertEqual(result, "completed")
449
450        def test_send():
451            p = ait_class()
452            obj = anext(p, "completed")
453            with self.assertRaises(StopIteration):
454                with contextlib.closing(obj.__await__()) as g:
455                    g.send(None)
456
457        test_send()
458
459        async def test_throw():
460            p = ait_class()
461            obj = anext(p, "completed")
462            self.assertRaises(SyntaxError, obj.throw, SyntaxError)
463            return "completed"
464
465        result = self.loop.run_until_complete(test_throw())
466        self.assertEqual(result, "completed")
467
468    def test_async_generator_anext(self):
469        async def agen():
470            yield 1
471            yield 2
472        self.check_async_iterator_anext(agen)
473
474    def test_python_async_iterator_anext(self):
475        class MyAsyncIter:
476            """Asynchronously yield 1, then 2."""
477            def __init__(self):
478                self.yielded = 0
479            def __aiter__(self):
480                return self
481            async def __anext__(self):
482                if self.yielded >= 2:
483                    raise StopAsyncIteration()
484                else:
485                    self.yielded += 1
486                    return self.yielded
487        self.check_async_iterator_anext(MyAsyncIter)
488
489    def test_python_async_iterator_types_coroutine_anext(self):
490        import types
491        class MyAsyncIterWithTypesCoro:
492            """Asynchronously yield 1, then 2."""
493            def __init__(self):
494                self.yielded = 0
495            def __aiter__(self):
496                return self
497            @types.coroutine
498            def __anext__(self):
499                if False:
500                    yield "this is a generator-based coroutine"
501                if self.yielded >= 2:
502                    raise StopAsyncIteration()
503                else:
504                    self.yielded += 1
505                    return self.yielded
506        self.check_async_iterator_anext(MyAsyncIterWithTypesCoro)
507
508    def test_async_gen_aiter(self):
509        async def gen():
510            yield 1
511            yield 2
512        g = gen()
513        async def consume():
514            return [i async for i in aiter(g)]
515        res = self.loop.run_until_complete(consume())
516        self.assertEqual(res, [1, 2])
517
518    def test_async_gen_aiter_class(self):
519        results = []
520        class Gen:
521            async def __aiter__(self):
522                yield 1
523                yield 2
524        g = Gen()
525        async def consume():
526            ait = aiter(g)
527            while True:
528                try:
529                    results.append(await anext(ait))
530                except StopAsyncIteration:
531                    break
532        self.loop.run_until_complete(consume())
533        self.assertEqual(results, [1, 2])
534
535    def test_aiter_idempotent(self):
536        async def gen():
537            yield 1
538        applied_once = aiter(gen())
539        applied_twice = aiter(applied_once)
540        self.assertIs(applied_once, applied_twice)
541
542    def test_anext_bad_args(self):
543        async def gen():
544            yield 1
545        async def call_with_too_few_args():
546            await anext()
547        async def call_with_too_many_args():
548            await anext(gen(), 1, 3)
549        async def call_with_wrong_type_args():
550            await anext(1, gen())
551        async def call_with_kwarg():
552            await anext(aiterator=gen())
553        with self.assertRaises(TypeError):
554            self.loop.run_until_complete(call_with_too_few_args())
555        with self.assertRaises(TypeError):
556            self.loop.run_until_complete(call_with_too_many_args())
557        with self.assertRaises(TypeError):
558            self.loop.run_until_complete(call_with_wrong_type_args())
559        with self.assertRaises(TypeError):
560            self.loop.run_until_complete(call_with_kwarg())
561
562    def test_anext_bad_await(self):
563        async def bad_awaitable():
564            class BadAwaitable:
565                def __await__(self):
566                    return 42
567            class MyAsyncIter:
568                def __aiter__(self):
569                    return self
570                def __anext__(self):
571                    return BadAwaitable()
572            regex = r"__await__.*iterator"
573            awaitable = anext(MyAsyncIter(), "default")
574            with self.assertRaisesRegex(TypeError, regex):
575                await awaitable
576            awaitable = anext(MyAsyncIter())
577            with self.assertRaisesRegex(TypeError, regex):
578                await awaitable
579            return "completed"
580        result = self.loop.run_until_complete(bad_awaitable())
581        self.assertEqual(result, "completed")
582
583    async def check_anext_returning_iterator(self, aiter_class):
584        awaitable = anext(aiter_class(), "default")
585        with self.assertRaises(TypeError):
586            await awaitable
587        awaitable = anext(aiter_class())
588        with self.assertRaises(TypeError):
589            await awaitable
590        return "completed"
591
592    def test_anext_return_iterator(self):
593        class WithIterAnext:
594            def __aiter__(self):
595                return self
596            def __anext__(self):
597                return iter("abc")
598        result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithIterAnext))
599        self.assertEqual(result, "completed")
600
601    def test_anext_return_generator(self):
602        class WithGenAnext:
603            def __aiter__(self):
604                return self
605            def __anext__(self):
606                yield
607        result = self.loop.run_until_complete(self.check_anext_returning_iterator(WithGenAnext))
608        self.assertEqual(result, "completed")
609
610    def test_anext_await_raises(self):
611        class RaisingAwaitable:
612            def __await__(self):
613                raise ZeroDivisionError()
614                yield
615        class WithRaisingAwaitableAnext:
616            def __aiter__(self):
617                return self
618            def __anext__(self):
619                return RaisingAwaitable()
620        async def do_test():
621            awaitable = anext(WithRaisingAwaitableAnext())
622            with self.assertRaises(ZeroDivisionError):
623                await awaitable
624            awaitable = anext(WithRaisingAwaitableAnext(), "default")
625            with self.assertRaises(ZeroDivisionError):
626                await awaitable
627            return "completed"
628        result = self.loop.run_until_complete(do_test())
629        self.assertEqual(result, "completed")
630
631    def test_anext_iter(self):
632        @types.coroutine
633        def _async_yield(v):
634            return (yield v)
635
636        class MyError(Exception):
637            pass
638
639        async def agenfn():
640            try:
641                await _async_yield(1)
642            except MyError:
643                await _async_yield(2)
644            return
645            yield
646
647        def test1(anext):
648            agen = agenfn()
649            with contextlib.closing(anext(agen, "default").__await__()) as g:
650                self.assertEqual(g.send(None), 1)
651                self.assertEqual(g.throw(MyError, MyError(), None), 2)
652                try:
653                    g.send(None)
654                except StopIteration as e:
655                    err = e
656                else:
657                    self.fail('StopIteration was not raised')
658                self.assertEqual(err.value, "default")
659
660        def test2(anext):
661            agen = agenfn()
662            with contextlib.closing(anext(agen, "default").__await__()) as g:
663                self.assertEqual(g.send(None), 1)
664                self.assertEqual(g.throw(MyError, MyError(), None), 2)
665                with self.assertRaises(MyError):
666                    g.throw(MyError, MyError(), None)
667
668        def test3(anext):
669            agen = agenfn()
670            with contextlib.closing(anext(agen, "default").__await__()) as g:
671                self.assertEqual(g.send(None), 1)
672                g.close()
673                with self.assertRaisesRegex(RuntimeError, 'cannot reuse'):
674                    self.assertEqual(g.send(None), 1)
675
676        def test4(anext):
677            @types.coroutine
678            def _async_yield(v):
679                yield v * 10
680                return (yield (v * 10 + 1))
681
682            async def agenfn():
683                try:
684                    await _async_yield(1)
685                except MyError:
686                    await _async_yield(2)
687                return
688                yield
689
690            agen = agenfn()
691            with contextlib.closing(anext(agen, "default").__await__()) as g:
692                self.assertEqual(g.send(None), 10)
693                self.assertEqual(g.throw(MyError, MyError(), None), 20)
694                with self.assertRaisesRegex(MyError, 'val'):
695                    g.throw(MyError, MyError('val'), None)
696
697        def test5(anext):
698            @types.coroutine
699            def _async_yield(v):
700                yield v * 10
701                return (yield (v * 10 + 1))
702
703            async def agenfn():
704                try:
705                    await _async_yield(1)
706                except MyError:
707                    return
708                yield 'aaa'
709
710            agen = agenfn()
711            with contextlib.closing(anext(agen, "default").__await__()) as g:
712                self.assertEqual(g.send(None), 10)
713                with self.assertRaisesRegex(StopIteration, 'default'):
714                    g.throw(MyError, MyError(), None)
715
716        def test6(anext):
717            @types.coroutine
718            def _async_yield(v):
719                yield v * 10
720                return (yield (v * 10 + 1))
721
722            async def agenfn():
723                await _async_yield(1)
724                yield 'aaa'
725
726            agen = agenfn()
727            with contextlib.closing(anext(agen, "default").__await__()) as g:
728                with self.assertRaises(MyError):
729                    g.throw(MyError, MyError(), None)
730
731        def run_test(test):
732            with self.subTest('pure-Python anext()'):
733                test(py_anext)
734            with self.subTest('builtin anext()'):
735                test(anext)
736
737        run_test(test1)
738        run_test(test2)
739        run_test(test3)
740        run_test(test4)
741        run_test(test5)
742        run_test(test6)
743
744    def test_aiter_bad_args(self):
745        async def gen():
746            yield 1
747        async def call_with_too_few_args():
748            await aiter()
749        async def call_with_too_many_args():
750            await aiter(gen(), 1)
751        async def call_with_wrong_type_arg():
752            await aiter(1)
753        with self.assertRaises(TypeError):
754            self.loop.run_until_complete(call_with_too_few_args())
755        with self.assertRaises(TypeError):
756            self.loop.run_until_complete(call_with_too_many_args())
757        with self.assertRaises(TypeError):
758            self.loop.run_until_complete(call_with_wrong_type_arg())
759
760    async def to_list(self, gen):
761        res = []
762        async for i in gen:
763            res.append(i)
764        return res
765
766    def test_async_gen_asyncio_01(self):
767        async def gen():
768            yield 1
769            await asyncio.sleep(0.01)
770            yield 2
771            await asyncio.sleep(0.01)
772            return
773            yield 3
774
775        res = self.loop.run_until_complete(self.to_list(gen()))
776        self.assertEqual(res, [1, 2])
777
778    def test_async_gen_asyncio_02(self):
779        async def gen():
780            yield 1
781            await asyncio.sleep(0.01)
782            yield 2
783            1 / 0
784            yield 3
785
786        with self.assertRaises(ZeroDivisionError):
787            self.loop.run_until_complete(self.to_list(gen()))
788
789    def test_async_gen_asyncio_03(self):
790        loop = self.loop
791
792        class Gen:
793            async def __aiter__(self):
794                yield 1
795                await asyncio.sleep(0.01)
796                yield 2
797
798        res = loop.run_until_complete(self.to_list(Gen()))
799        self.assertEqual(res, [1, 2])
800
801    def test_async_gen_asyncio_anext_04(self):
802        async def foo():
803            yield 1
804            await asyncio.sleep(0.01)
805            try:
806                yield 2
807                yield 3
808            except ZeroDivisionError:
809                yield 1000
810            await asyncio.sleep(0.01)
811            yield 4
812
813        async def run1():
814            it = foo().__aiter__()
815
816            self.assertEqual(await it.__anext__(), 1)
817            self.assertEqual(await it.__anext__(), 2)
818            self.assertEqual(await it.__anext__(), 3)
819            self.assertEqual(await it.__anext__(), 4)
820            with self.assertRaises(StopAsyncIteration):
821                await it.__anext__()
822            with self.assertRaises(StopAsyncIteration):
823                await it.__anext__()
824
825        async def run2():
826            it = foo().__aiter__()
827
828            self.assertEqual(await it.__anext__(), 1)
829            self.assertEqual(await it.__anext__(), 2)
830            try:
831                it.__anext__().throw(ZeroDivisionError)
832            except StopIteration as ex:
833                self.assertEqual(ex.args[0], 1000)
834            else:
835                self.fail('StopIteration was not raised')
836            self.assertEqual(await it.__anext__(), 4)
837            with self.assertRaises(StopAsyncIteration):
838                await it.__anext__()
839
840        self.loop.run_until_complete(run1())
841        self.loop.run_until_complete(run2())
842
843    def test_async_gen_asyncio_anext_05(self):
844        async def foo():
845            v = yield 1
846            v = yield v
847            yield v * 100
848
849        async def run():
850            it = foo().__aiter__()
851
852            try:
853                it.__anext__().send(None)
854            except StopIteration as ex:
855                self.assertEqual(ex.args[0], 1)
856            else:
857                self.fail('StopIteration was not raised')
858
859            try:
860                it.__anext__().send(10)
861            except StopIteration as ex:
862                self.assertEqual(ex.args[0], 10)
863            else:
864                self.fail('StopIteration was not raised')
865
866            try:
867                it.__anext__().send(12)
868            except StopIteration as ex:
869                self.assertEqual(ex.args[0], 1200)
870            else:
871                self.fail('StopIteration was not raised')
872
873            with self.assertRaises(StopAsyncIteration):
874                await it.__anext__()
875
876        self.loop.run_until_complete(run())
877
878    def test_async_gen_asyncio_anext_06(self):
879        DONE = 0
880
881        # test synchronous generators
882        def foo():
883            try:
884                yield
885            except:
886                pass
887        g = foo()
888        g.send(None)
889        with self.assertRaises(StopIteration):
890            g.send(None)
891
892        # now with asynchronous generators
893
894        async def gen():
895            nonlocal DONE
896            try:
897                yield
898            except:
899                pass
900            DONE = 1
901
902        async def run():
903            nonlocal DONE
904            g = gen()
905            await g.asend(None)
906            with self.assertRaises(StopAsyncIteration):
907                await g.asend(None)
908            DONE += 10
909
910        self.loop.run_until_complete(run())
911        self.assertEqual(DONE, 11)
912
913    def test_async_gen_asyncio_anext_tuple(self):
914        async def foo():
915            try:
916                yield (1,)
917            except ZeroDivisionError:
918                yield (2,)
919
920        async def run():
921            it = foo().__aiter__()
922
923            self.assertEqual(await it.__anext__(), (1,))
924            with self.assertRaises(StopIteration) as cm:
925                it.__anext__().throw(ZeroDivisionError)
926            self.assertEqual(cm.exception.args[0], (2,))
927            with self.assertRaises(StopAsyncIteration):
928                await it.__anext__()
929
930        self.loop.run_until_complete(run())
931
932    def test_async_gen_asyncio_anext_stopiteration(self):
933        async def foo():
934            try:
935                yield StopIteration(1)
936            except ZeroDivisionError:
937                yield StopIteration(3)
938
939        async def run():
940            it = foo().__aiter__()
941
942            v = await it.__anext__()
943            self.assertIsInstance(v, StopIteration)
944            self.assertEqual(v.value, 1)
945            with self.assertRaises(StopIteration) as cm:
946                it.__anext__().throw(ZeroDivisionError)
947            v = cm.exception.args[0]
948            self.assertIsInstance(v, StopIteration)
949            self.assertEqual(v.value, 3)
950            with self.assertRaises(StopAsyncIteration):
951                await it.__anext__()
952
953        self.loop.run_until_complete(run())
954
955    def test_async_gen_asyncio_aclose_06(self):
956        async def foo():
957            try:
958                yield 1
959                1 / 0
960            finally:
961                await asyncio.sleep(0.01)
962                yield 12
963
964        async def run():
965            gen = foo()
966            it = gen.__aiter__()
967            await it.__anext__()
968            await gen.aclose()
969
970        with self.assertRaisesRegex(
971                RuntimeError,
972                "async generator ignored GeneratorExit"):
973            self.loop.run_until_complete(run())
974
975    def test_async_gen_asyncio_aclose_07(self):
976        DONE = 0
977
978        async def foo():
979            nonlocal DONE
980            try:
981                yield 1
982                1 / 0
983            finally:
984                await asyncio.sleep(0.01)
985                await asyncio.sleep(0.01)
986                DONE += 1
987            DONE += 1000
988
989        async def run():
990            gen = foo()
991            it = gen.__aiter__()
992            await it.__anext__()
993            await gen.aclose()
994
995        self.loop.run_until_complete(run())
996        self.assertEqual(DONE, 1)
997
998    def test_async_gen_asyncio_aclose_08(self):
999        DONE = 0
1000
1001        fut = asyncio.Future(loop=self.loop)
1002
1003        async def foo():
1004            nonlocal DONE
1005            try:
1006                yield 1
1007                await fut
1008                DONE += 1000
1009                yield 2
1010            finally:
1011                await asyncio.sleep(0.01)
1012                await asyncio.sleep(0.01)
1013                DONE += 1
1014            DONE += 1000
1015
1016        async def run():
1017            gen = foo()
1018            it = gen.__aiter__()
1019            self.assertEqual(await it.__anext__(), 1)
1020            await gen.aclose()
1021
1022        self.loop.run_until_complete(run())
1023        self.assertEqual(DONE, 1)
1024
1025        # Silence ResourceWarnings
1026        fut.cancel()
1027        self.loop.run_until_complete(asyncio.sleep(0.01))
1028
1029    def test_async_gen_asyncio_gc_aclose_09(self):
1030        DONE = 0
1031
1032        async def gen():
1033            nonlocal DONE
1034            try:
1035                while True:
1036                    yield 1
1037            finally:
1038                await asyncio.sleep(0.01)
1039                await asyncio.sleep(0.01)
1040                DONE = 1
1041
1042        async def run():
1043            g = gen()
1044            await g.__anext__()
1045            await g.__anext__()
1046            del g
1047            gc_collect()  # For PyPy or other GCs.
1048
1049            await asyncio.sleep(0.1)
1050
1051        self.loop.run_until_complete(run())
1052        self.assertEqual(DONE, 1)
1053
1054    def test_async_gen_asyncio_aclose_10(self):
1055        DONE = 0
1056
1057        # test synchronous generators
1058        def foo():
1059            try:
1060                yield
1061            except:
1062                pass
1063        g = foo()
1064        g.send(None)
1065        g.close()
1066
1067        # now with asynchronous generators
1068
1069        async def gen():
1070            nonlocal DONE
1071            try:
1072                yield
1073            except:
1074                pass
1075            DONE = 1
1076
1077        async def run():
1078            nonlocal DONE
1079            g = gen()
1080            await g.asend(None)
1081            await g.aclose()
1082            DONE += 10
1083
1084        self.loop.run_until_complete(run())
1085        self.assertEqual(DONE, 11)
1086
1087    def test_async_gen_asyncio_aclose_11(self):
1088        DONE = 0
1089
1090        # test synchronous generators
1091        def foo():
1092            try:
1093                yield
1094            except:
1095                pass
1096            yield
1097        g = foo()
1098        g.send(None)
1099        with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'):
1100            g.close()
1101
1102        # now with asynchronous generators
1103
1104        async def gen():
1105            nonlocal DONE
1106            try:
1107                yield
1108            except:
1109                pass
1110            yield
1111            DONE += 1
1112
1113        async def run():
1114            nonlocal DONE
1115            g = gen()
1116            await g.asend(None)
1117            with self.assertRaisesRegex(RuntimeError, 'ignored GeneratorExit'):
1118                await g.aclose()
1119            DONE += 10
1120
1121        self.loop.run_until_complete(run())
1122        self.assertEqual(DONE, 10)
1123
1124    def test_async_gen_asyncio_aclose_12(self):
1125        DONE = 0
1126
1127        async def target():
1128            await asyncio.sleep(0.01)
1129            1 / 0
1130
1131        async def foo():
1132            nonlocal DONE
1133            task = asyncio.create_task(target())
1134            try:
1135                yield 1
1136            finally:
1137                try:
1138                    await task
1139                except ZeroDivisionError:
1140                    DONE = 1
1141
1142        async def run():
1143            gen = foo()
1144            it = gen.__aiter__()
1145            await it.__anext__()
1146            await gen.aclose()
1147
1148        self.loop.run_until_complete(run())
1149        self.assertEqual(DONE, 1)
1150
1151    def test_async_gen_asyncio_asend_01(self):
1152        DONE = 0
1153
1154        # Sanity check:
1155        def sgen():
1156            v = yield 1
1157            yield v * 2
1158        sg = sgen()
1159        v = sg.send(None)
1160        self.assertEqual(v, 1)
1161        v = sg.send(100)
1162        self.assertEqual(v, 200)
1163
1164        async def gen():
1165            nonlocal DONE
1166            try:
1167                await asyncio.sleep(0.01)
1168                v = yield 1
1169                await asyncio.sleep(0.01)
1170                yield v * 2
1171                await asyncio.sleep(0.01)
1172                return
1173            finally:
1174                await asyncio.sleep(0.01)
1175                await asyncio.sleep(0.01)
1176                DONE = 1
1177
1178        async def run():
1179            g = gen()
1180
1181            v = await g.asend(None)
1182            self.assertEqual(v, 1)
1183
1184            v = await g.asend(100)
1185            self.assertEqual(v, 200)
1186
1187            with self.assertRaises(StopAsyncIteration):
1188                await g.asend(None)
1189
1190        self.loop.run_until_complete(run())
1191        self.assertEqual(DONE, 1)
1192
1193    def test_async_gen_asyncio_asend_02(self):
1194        DONE = 0
1195
1196        async def sleep_n_crash(delay):
1197            await asyncio.sleep(delay)
1198            1 / 0
1199
1200        async def gen():
1201            nonlocal DONE
1202            try:
1203                await asyncio.sleep(0.01)
1204                v = yield 1
1205                await sleep_n_crash(0.01)
1206                DONE += 1000
1207                yield v * 2
1208            finally:
1209                await asyncio.sleep(0.01)
1210                await asyncio.sleep(0.01)
1211                DONE = 1
1212
1213        async def run():
1214            g = gen()
1215
1216            v = await g.asend(None)
1217            self.assertEqual(v, 1)
1218
1219            await g.asend(100)
1220
1221        with self.assertRaises(ZeroDivisionError):
1222            self.loop.run_until_complete(run())
1223        self.assertEqual(DONE, 1)
1224
1225    def test_async_gen_asyncio_asend_03(self):
1226        DONE = 0
1227
1228        async def sleep_n_crash(delay):
1229            fut = asyncio.ensure_future(asyncio.sleep(delay),
1230                                        loop=self.loop)
1231            self.loop.call_later(delay / 2, lambda: fut.cancel())
1232            return await fut
1233
1234        async def gen():
1235            nonlocal DONE
1236            try:
1237                await asyncio.sleep(0.01)
1238                v = yield 1
1239                await sleep_n_crash(0.01)
1240                DONE += 1000
1241                yield v * 2
1242            finally:
1243                await asyncio.sleep(0.01)
1244                await asyncio.sleep(0.01)
1245                DONE = 1
1246
1247        async def run():
1248            g = gen()
1249
1250            v = await g.asend(None)
1251            self.assertEqual(v, 1)
1252
1253            await g.asend(100)
1254
1255        with self.assertRaises(asyncio.CancelledError):
1256            self.loop.run_until_complete(run())
1257        self.assertEqual(DONE, 1)
1258
1259    def test_async_gen_asyncio_athrow_01(self):
1260        DONE = 0
1261
1262        class FooEr(Exception):
1263            pass
1264
1265        # Sanity check:
1266        def sgen():
1267            try:
1268                v = yield 1
1269            except FooEr:
1270                v = 1000
1271            yield v * 2
1272        sg = sgen()
1273        v = sg.send(None)
1274        self.assertEqual(v, 1)
1275        v = sg.throw(FooEr)
1276        self.assertEqual(v, 2000)
1277        with self.assertRaises(StopIteration):
1278            sg.send(None)
1279
1280        async def gen():
1281            nonlocal DONE
1282            try:
1283                await asyncio.sleep(0.01)
1284                try:
1285                    v = yield 1
1286                except FooEr:
1287                    v = 1000
1288                    await asyncio.sleep(0.01)
1289                yield v * 2
1290                await asyncio.sleep(0.01)
1291                # return
1292            finally:
1293                await asyncio.sleep(0.01)
1294                await asyncio.sleep(0.01)
1295                DONE = 1
1296
1297        async def run():
1298            g = gen()
1299
1300            v = await g.asend(None)
1301            self.assertEqual(v, 1)
1302
1303            v = await g.athrow(FooEr)
1304            self.assertEqual(v, 2000)
1305
1306            with self.assertRaises(StopAsyncIteration):
1307                await g.asend(None)
1308
1309        self.loop.run_until_complete(run())
1310        self.assertEqual(DONE, 1)
1311
1312    def test_async_gen_asyncio_athrow_02(self):
1313        DONE = 0
1314
1315        class FooEr(Exception):
1316            pass
1317
1318        async def sleep_n_crash(delay):
1319            fut = asyncio.ensure_future(asyncio.sleep(delay),
1320                                        loop=self.loop)
1321            self.loop.call_later(delay / 2, lambda: fut.cancel())
1322            return await fut
1323
1324        async def gen():
1325            nonlocal DONE
1326            try:
1327                await asyncio.sleep(0.01)
1328                try:
1329                    v = yield 1
1330                except FooEr:
1331                    await sleep_n_crash(0.01)
1332                yield v * 2
1333                await asyncio.sleep(0.01)
1334                # return
1335            finally:
1336                await asyncio.sleep(0.01)
1337                await asyncio.sleep(0.01)
1338                DONE = 1
1339
1340        async def run():
1341            g = gen()
1342
1343            v = await g.asend(None)
1344            self.assertEqual(v, 1)
1345
1346            try:
1347                await g.athrow(FooEr)
1348            except asyncio.CancelledError:
1349                self.assertEqual(DONE, 1)
1350                raise
1351            else:
1352                self.fail('CancelledError was not raised')
1353
1354        with self.assertRaises(asyncio.CancelledError):
1355            self.loop.run_until_complete(run())
1356        self.assertEqual(DONE, 1)
1357
1358    def test_async_gen_asyncio_athrow_03(self):
1359        DONE = 0
1360
1361        # test synchronous generators
1362        def foo():
1363            try:
1364                yield
1365            except:
1366                pass
1367        g = foo()
1368        g.send(None)
1369        with self.assertRaises(StopIteration):
1370            g.throw(ValueError)
1371
1372        # now with asynchronous generators
1373
1374        async def gen():
1375            nonlocal DONE
1376            try:
1377                yield
1378            except:
1379                pass
1380            DONE = 1
1381
1382        async def run():
1383            nonlocal DONE
1384            g = gen()
1385            await g.asend(None)
1386            with self.assertRaises(StopAsyncIteration):
1387                await g.athrow(ValueError)
1388            DONE += 10
1389
1390        self.loop.run_until_complete(run())
1391        self.assertEqual(DONE, 11)
1392
1393    def test_async_gen_asyncio_athrow_tuple(self):
1394        async def gen():
1395            try:
1396                yield 1
1397            except ZeroDivisionError:
1398                yield (2,)
1399
1400        async def run():
1401            g = gen()
1402            v = await g.asend(None)
1403            self.assertEqual(v, 1)
1404            v = await g.athrow(ZeroDivisionError)
1405            self.assertEqual(v, (2,))
1406            with self.assertRaises(StopAsyncIteration):
1407                await g.asend(None)
1408
1409        self.loop.run_until_complete(run())
1410
1411    def test_async_gen_asyncio_athrow_stopiteration(self):
1412        async def gen():
1413            try:
1414                yield 1
1415            except ZeroDivisionError:
1416                yield StopIteration(2)
1417
1418        async def run():
1419            g = gen()
1420            v = await g.asend(None)
1421            self.assertEqual(v, 1)
1422            v = await g.athrow(ZeroDivisionError)
1423            self.assertIsInstance(v, StopIteration)
1424            self.assertEqual(v.value, 2)
1425            with self.assertRaises(StopAsyncIteration):
1426                await g.asend(None)
1427
1428        self.loop.run_until_complete(run())
1429
1430    def test_async_gen_asyncio_shutdown_01(self):
1431        finalized = 0
1432
1433        async def waiter(timeout):
1434            nonlocal finalized
1435            try:
1436                await asyncio.sleep(timeout)
1437                yield 1
1438            finally:
1439                await asyncio.sleep(0)
1440                finalized += 1
1441
1442        async def wait():
1443            async for _ in waiter(1):
1444                pass
1445
1446        t1 = self.loop.create_task(wait())
1447        t2 = self.loop.create_task(wait())
1448
1449        self.loop.run_until_complete(asyncio.sleep(0.1))
1450
1451        # Silence warnings
1452        t1.cancel()
1453        t2.cancel()
1454
1455        with self.assertRaises(asyncio.CancelledError):
1456            self.loop.run_until_complete(t1)
1457        with self.assertRaises(asyncio.CancelledError):
1458            self.loop.run_until_complete(t2)
1459
1460        self.loop.run_until_complete(self.loop.shutdown_asyncgens())
1461
1462        self.assertEqual(finalized, 2)
1463
1464    def test_async_gen_asyncio_shutdown_02(self):
1465        messages = []
1466
1467        def exception_handler(loop, context):
1468            messages.append(context)
1469
1470        async def async_iterate():
1471            yield 1
1472            yield 2
1473
1474        it = async_iterate()
1475        async def main():
1476            loop = asyncio.get_running_loop()
1477            loop.set_exception_handler(exception_handler)
1478
1479            async for i in it:
1480                break
1481
1482        asyncio.run(main())
1483
1484        self.assertEqual(messages, [])
1485
1486    def test_async_gen_asyncio_shutdown_exception_01(self):
1487        messages = []
1488
1489        def exception_handler(loop, context):
1490            messages.append(context)
1491
1492        async def async_iterate():
1493            try:
1494                yield 1
1495                yield 2
1496            finally:
1497                1/0
1498
1499        it = async_iterate()
1500        async def main():
1501            loop = asyncio.get_running_loop()
1502            loop.set_exception_handler(exception_handler)
1503
1504            async for i in it:
1505                break
1506
1507        asyncio.run(main())
1508
1509        message, = messages
1510        self.assertEqual(message['asyncgen'], it)
1511        self.assertIsInstance(message['exception'], ZeroDivisionError)
1512        self.assertIn('an error occurred during closing of asynchronous generator',
1513                      message['message'])
1514
1515    def test_async_gen_asyncio_shutdown_exception_02(self):
1516        messages = []
1517
1518        def exception_handler(loop, context):
1519            messages.append(context)
1520
1521        async def async_iterate():
1522            try:
1523                yield 1
1524                yield 2
1525            finally:
1526                1/0
1527
1528        async def main():
1529            loop = asyncio.get_running_loop()
1530            loop.set_exception_handler(exception_handler)
1531
1532            async for i in async_iterate():
1533                break
1534            gc_collect()
1535
1536        asyncio.run(main())
1537
1538        message, = messages
1539        self.assertIsInstance(message['exception'], ZeroDivisionError)
1540        self.assertIn('unhandled exception during asyncio.run() shutdown',
1541                      message['message'])
1542
1543    def test_async_gen_expression_01(self):
1544        async def arange(n):
1545            for i in range(n):
1546                await asyncio.sleep(0.01)
1547                yield i
1548
1549        def make_arange(n):
1550            # This syntax is legal starting with Python 3.7
1551            return (i * 2 async for i in arange(n))
1552
1553        async def run():
1554            return [i async for i in make_arange(10)]
1555
1556        res = self.loop.run_until_complete(run())
1557        self.assertEqual(res, [i * 2 for i in range(10)])
1558
1559    def test_async_gen_expression_02(self):
1560        async def wrap(n):
1561            await asyncio.sleep(0.01)
1562            return n
1563
1564        def make_arange(n):
1565            # This syntax is legal starting with Python 3.7
1566            return (i * 2 for i in range(n) if await wrap(i))
1567
1568        async def run():
1569            return [i async for i in make_arange(10)]
1570
1571        res = self.loop.run_until_complete(run())
1572        self.assertEqual(res, [i * 2 for i in range(1, 10)])
1573
1574    def test_asyncgen_nonstarted_hooks_are_cancellable(self):
1575        # See https://bugs.python.org/issue38013
1576        messages = []
1577
1578        def exception_handler(loop, context):
1579            messages.append(context)
1580
1581        async def async_iterate():
1582            yield 1
1583            yield 2
1584
1585        async def main():
1586            loop = asyncio.get_running_loop()
1587            loop.set_exception_handler(exception_handler)
1588
1589            async for i in async_iterate():
1590                break
1591
1592        asyncio.run(main())
1593
1594        self.assertEqual([], messages)
1595
1596    def test_async_gen_await_same_anext_coro_twice(self):
1597        async def async_iterate():
1598            yield 1
1599            yield 2
1600
1601        async def run():
1602            it = async_iterate()
1603            nxt = it.__anext__()
1604            await nxt
1605            with self.assertRaisesRegex(
1606                    RuntimeError,
1607                    r"cannot reuse already awaited __anext__\(\)/asend\(\)"
1608            ):
1609                await nxt
1610
1611            await it.aclose()  # prevent unfinished iterator warning
1612
1613        self.loop.run_until_complete(run())
1614
1615    def test_async_gen_await_same_aclose_coro_twice(self):
1616        async def async_iterate():
1617            yield 1
1618            yield 2
1619
1620        async def run():
1621            it = async_iterate()
1622            nxt = it.aclose()
1623            await nxt
1624            with self.assertRaisesRegex(
1625                    RuntimeError,
1626                    r"cannot reuse already awaited aclose\(\)/athrow\(\)"
1627            ):
1628                await nxt
1629
1630        self.loop.run_until_complete(run())
1631
1632    def test_async_gen_aclose_twice_with_different_coros(self):
1633        # Regression test for https://bugs.python.org/issue39606
1634        async def async_iterate():
1635            yield 1
1636            yield 2
1637
1638        async def run():
1639            it = async_iterate()
1640            await it.aclose()
1641            await it.aclose()
1642
1643        self.loop.run_until_complete(run())
1644
1645    def test_async_gen_aclose_after_exhaustion(self):
1646        # Regression test for https://bugs.python.org/issue39606
1647        async def async_iterate():
1648            yield 1
1649            yield 2
1650
1651        async def run():
1652            it = async_iterate()
1653            async for _ in it:
1654                pass
1655            await it.aclose()
1656
1657        self.loop.run_until_complete(run())
1658
1659    def test_async_gen_aclose_compatible_with_get_stack(self):
1660        async def async_generator():
1661            yield object()
1662
1663        async def run():
1664            ag = async_generator()
1665            asyncio.create_task(ag.aclose())
1666            tasks = asyncio.all_tasks()
1667            for task in tasks:
1668                # No AttributeError raised
1669                task.get_stack()
1670
1671        self.loop.run_until_complete(run())
1672
1673
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1676