1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import traceback
14import types
15import unittest
16import weakref
17from unittest import mock
18
19import asyncio
20from asyncio import coroutines
21from asyncio import futures
22from asyncio import tasks
23from test.test_asyncio import utils as test_utils
24from test import support
25from test.support.script_helper import assert_python_ok
26
27
def tearDownModule():
    """Reset the asyncio event loop policy once this module's tests finish."""
    asyncio.set_event_loop_policy(policy=None)
30
31
async def coroutine_function():
    """Coroutine function that does nothing and returns None."""
    return None
34
35
@contextlib.contextmanager
def set_coroutine_debug(enabled):
    """Temporarily set ``asyncio.coroutines._DEBUG`` to *enabled*.

    The value found on entry is restored on exit, including when the
    body of the ``with`` statement raises.
    """
    module = asyncio.coroutines
    saved = module._DEBUG
    module._DEBUG = enabled
    try:
        yield
    finally:
        module._DEBUG = saved
46
47
def format_coroutine(qualname, state, src, source_traceback, generator=False):
    """Build the ``coro=<...>`` text that the tests expect in a Task repr.

    *state* is followed by ", defined" unless *generator* is true.  When
    *source_traceback* is not None, the first two items of its last entry
    are appended as " created at <item0>:<item1>".
    """
    state_template = '%s' if generator else '%s, defined'
    state = state_template % state
    text = 'coro=<%s() %s at %s>' % (qualname, state, src)
    if source_traceback is None:
        return text
    created = source_traceback[-1]
    return text + ' created at %s:%s' % (created[0], created[1])
59
60
def get_innermost_context(exc):
    """
    Describe the innermost exception in the ``__context__`` chain of *exc*.

    Returns a ``(type, args, depth)`` tuple, where *depth* is the number
    of ``__context__`` links followed (0 when *exc* has no context).
    """
    levels = 0
    innermost = exc
    while innermost.__context__ is not None:
        innermost = innermost.__context__
        levels += 1
    return (type(innermost), innermost.args, levels)
75
76
class Dummy:
    """Callable placeholder with a fixed repr (used as a done callback)."""

    def __call__(self, *args):
        # Accept any positional arguments and do nothing.
        return None

    def __repr__(self):
        return '<Dummy>'
84
85
class CoroLikeObject:
    """Minimal object providing send(), throw(), close() and __await__()."""

    def __await__(self):
        # The object acts as its own awaitable iterator.
        return self

    def send(self, v):
        # Finish immediately; the StopIteration value is 42.
        raise StopIteration(42)

    def throw(self, *exc):
        return None

    def close(self):
        return None
98
99
# The following value can be used as a very small timeout:
# it passes the "timeout > 0" check, but has almost
# no effect on test performance.
103_EPSILON = 0.0001
104
105
106class BaseTaskTests:
107
108    Task = None
109    Future = None
110
111    def new_task(self, loop, coro, name='TestTask'):
112        return self.__class__.Task(coro, loop=loop, name=name)
113
114    def new_future(self, loop):
115        return self.__class__.Future(loop=loop)
116
117    def setUp(self):
118        super().setUp()
119        self.loop = self.new_test_loop()
120        self.loop.set_task_factory(self.new_task)
121        self.loop.create_future = lambda: self.new_future(self.loop)
122
123    def test_task_cancel_message_getter(self):
124        async def coro():
125            pass
126        t = self.new_task(self.loop, coro())
127        self.assertTrue(hasattr(t, '_cancel_message'))
128        self.assertEqual(t._cancel_message, None)
129
130        t.cancel('my message')
131        self.assertEqual(t._cancel_message, 'my message')
132
133        with self.assertRaises(asyncio.CancelledError):
134            self.loop.run_until_complete(t)
135
136    def test_task_cancel_message_setter(self):
137        async def coro():
138            pass
139        t = self.new_task(self.loop, coro())
140        t.cancel('my message')
141        t._cancel_message = 'my new message'
142        self.assertEqual(t._cancel_message, 'my new message')
143
144        with self.assertRaises(asyncio.CancelledError):
145            self.loop.run_until_complete(t)
146
    def test_task_del_collect(self):
        # Run 100 tasks whose result objects call gc.collect() from
        # __del__; the test passes if running them all completes.
        class Evil:
            def __del__(self):
                # Force a collection while this object is being finalized.
                gc.collect()

        async def run():
            return Evil()

        self.loop.run_until_complete(
            asyncio.gather(*[
                self.new_task(self.loop, run()) for _ in range(100)
            ]))
159
    def test_other_loop_future(self):
        # Awaiting a future that was created for a different event loop
        # is expected to raise RuntimeError.
        other_loop = asyncio.new_event_loop()
        fut = self.new_future(other_loop)

        async def run(fut):
            await fut

        try:
            with self.assertRaisesRegex(RuntimeError,
                                        r'Task .* got Future .* attached'):
                self.loop.run_until_complete(run(fut))
        finally:
            # Always release the extra loop created by this test.
            other_loop.close()
173
174    def test_task_awaits_on_itself(self):
175
176        async def test():
177            await task
178
179        task = asyncio.ensure_future(test(), loop=self.loop)
180
181        with self.assertRaisesRegex(RuntimeError,
182                                    'Task cannot await on itself'):
183            self.loop.run_until_complete(task)
184
185    def test_task_class(self):
186        async def notmuch():
187            return 'ok'
188        t = self.new_task(self.loop, notmuch())
189        self.loop.run_until_complete(t)
190        self.assertTrue(t.done())
191        self.assertEqual(t.result(), 'ok')
192        self.assertIs(t._loop, self.loop)
193        self.assertIs(t.get_loop(), self.loop)
194
195        loop = asyncio.new_event_loop()
196        self.set_event_loop(loop)
197        t = self.new_task(loop, notmuch())
198        self.assertIs(t._loop, loop)
199        loop.run_until_complete(t)
200        loop.close()
201
    def test_ensure_future_coroutine(self):
        # Applying the asyncio.coroutine decorator is expected to emit a
        # DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def notmuch():
                return 'ok'
        # ensure_future() with the fixture loop passed explicitly.
        t = asyncio.ensure_future(notmuch(), loop=self.loop)
        self.loop.run_until_complete(t)
        self.assertTrue(t.done())
        self.assertEqual(t.result(), 'ok')
        self.assertIs(t._loop, self.loop)

        # Same check against a freshly created loop.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)
        t = asyncio.ensure_future(notmuch(), loop=loop)
        self.assertIs(t._loop, loop)
        loop.run_until_complete(t)
        loop.close()
219
    def test_ensure_future_future(self):
        # ensure_future() on an existing Future is expected to return that
        # same object, and to reject a different explicit loop.
        f_orig = self.new_future(self.loop)
        f_orig.set_result('ko')

        f = asyncio.ensure_future(f_orig)
        self.loop.run_until_complete(f)
        self.assertTrue(f.done())
        self.assertEqual(f.result(), 'ko')
        self.assertIs(f, f_orig)

        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        # Passing a loop other than the future's own must raise ValueError.
        with self.assertRaises(ValueError):
            f = asyncio.ensure_future(f_orig, loop=loop)

        loop.close()

        # Passing the future's own loop explicitly is accepted.
        f = asyncio.ensure_future(f_orig, loop=self.loop)
        self.assertIs(f, f_orig)
240
241    def test_ensure_future_task(self):
242        async def notmuch():
243            return 'ok'
244        t_orig = self.new_task(self.loop, notmuch())
245        t = asyncio.ensure_future(t_orig)
246        self.loop.run_until_complete(t)
247        self.assertTrue(t.done())
248        self.assertEqual(t.result(), 'ok')
249        self.assertIs(t, t_orig)
250
251        loop = asyncio.new_event_loop()
252        self.set_event_loop(loop)
253
254        with self.assertRaises(ValueError):
255            t = asyncio.ensure_future(t_orig, loop=loop)
256
257        loop.close()
258
259        t = asyncio.ensure_future(t_orig, loop=self.loop)
260        self.assertIs(t, t_orig)
261
262    def test_ensure_future_awaitable(self):
263        class Aw:
264            def __init__(self, coro):
265                self.coro = coro
266            def __await__(self):
267                return (yield from self.coro)
268
269        with self.assertWarns(DeprecationWarning):
270            @asyncio.coroutine
271            def coro():
272                return 'ok'
273
274        loop = asyncio.new_event_loop()
275        self.set_event_loop(loop)
276        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
277        loop.run_until_complete(fut)
278        assert fut.result() == 'ok'
279
280    def test_ensure_future_neither(self):
281        with self.assertRaises(TypeError):
282            asyncio.ensure_future('ok')
283
284    def test_ensure_future_error_msg(self):
285        loop = asyncio.new_event_loop()
286        f = self.new_future(self.loop)
287        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
288                                    'different loop than the one specified as '
289                                    'the loop argument'):
290            asyncio.ensure_future(f, loop=loop)
291        loop.close()
292
    def test_get_stack(self):
        # Exercise Task.get_stack() and Task.print_stack() from inside the
        # running task itself.
        # T is assigned by runner() before foo()/bar() start executing.
        T = None

        async def foo():
            await bar()

        async def bar():
            # test get_stack()
            f = T.get_stack(limit=1)
            try:
                self.assertEqual(f[0].f_code.co_name, 'foo')
            finally:
                # Drop the reference to the returned frames.
                f = None

            # test print_stack()
            file = io.StringIO()
            T.print_stack(limit=1, file=file)
            file.seek(0)
            tb = file.read()
            self.assertRegex(tb, r'foo\(\) running')

        async def runner():
            nonlocal T
            T = asyncio.ensure_future(foo(), loop=self.loop)
            await T

        self.loop.run_until_complete(runner())
320
321    def test_task_repr(self):
322        self.loop.set_debug(False)
323
324        async def notmuch():
325            return 'abc'
326
327        # test coroutine function
328        self.assertEqual(notmuch.__name__, 'notmuch')
329        self.assertRegex(notmuch.__qualname__,
330                         r'\w+.test_task_repr.<locals>.notmuch')
331        self.assertEqual(notmuch.__module__, __name__)
332
333        filename, lineno = test_utils.get_function_source(notmuch)
334        src = "%s:%s" % (filename, lineno)
335
336        # test coroutine object
337        gen = notmuch()
338        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
339        self.assertEqual(gen.__name__, 'notmuch')
340        self.assertEqual(gen.__qualname__, coro_qualname)
341
342        # test pending Task
343        t = self.new_task(self.loop, gen)
344        t.add_done_callback(Dummy())
345
346        coro = format_coroutine(coro_qualname, 'running', src,
347                                t._source_traceback, generator=True)
348        self.assertEqual(repr(t),
349                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
350
351        # test cancelling Task
352        t.cancel()  # Does not take immediate effect!
353        self.assertEqual(repr(t),
354                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
355
356        # test cancelled Task
357        self.assertRaises(asyncio.CancelledError,
358                          self.loop.run_until_complete, t)
359        coro = format_coroutine(coro_qualname, 'done', src,
360                                t._source_traceback)
361        self.assertEqual(repr(t),
362                         "<Task cancelled name='TestTask' %s>" % coro)
363
364        # test finished Task
365        t = self.new_task(self.loop, notmuch())
366        self.loop.run_until_complete(t)
367        coro = format_coroutine(coro_qualname, 'done', src,
368                                t._source_traceback)
369        self.assertEqual(repr(t),
370                         "<Task finished name='TestTask' %s result='abc'>" % coro)
371
372    def test_task_repr_autogenerated(self):
373        async def notmuch():
374            return 123
375
376        t1 = self.new_task(self.loop, notmuch(), None)
377        t2 = self.new_task(self.loop, notmuch(), None)
378        self.assertNotEqual(repr(t1), repr(t2))
379
380        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
381        self.assertIsNotNone(match1)
382        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
383        self.assertIsNotNone(match2)
384
385        # Autogenerated task names should have monotonically increasing numbers
386        self.assertLess(int(match1.group(1)), int(match2.group(1)))
387        self.loop.run_until_complete(t1)
388        self.loop.run_until_complete(t2)
389
390    def test_task_repr_name_not_str(self):
391        async def notmuch():
392            return 123
393
394        t = self.new_task(self.loop, notmuch())
395        t.set_name({6})
396        self.assertEqual(t.get_name(), '{6}')
397        self.loop.run_until_complete(t)
398
399    def test_task_repr_coro_decorator(self):
400        self.loop.set_debug(False)
401
402        with self.assertWarns(DeprecationWarning):
403            @asyncio.coroutine
404            def notmuch():
405                # notmuch() function doesn't use yield from: it will be wrapped by
406                # @coroutine decorator
407                return 123
408
409        # test coroutine function
410        self.assertEqual(notmuch.__name__, 'notmuch')
411        self.assertRegex(notmuch.__qualname__,
412                         r'\w+.test_task_repr_coro_decorator'
413                         r'\.<locals>\.notmuch')
414        self.assertEqual(notmuch.__module__, __name__)
415
416        # test coroutine object
417        gen = notmuch()
418        # On Python >= 3.5, generators now inherit the name of the
419        # function, as expected, and have a qualified name (__qualname__
420        # attribute).
421        coro_name = 'notmuch'
422        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
423                         '.<locals>.notmuch')
424        self.assertEqual(gen.__name__, coro_name)
425        self.assertEqual(gen.__qualname__, coro_qualname)
426
427        # test repr(CoroWrapper)
428        if coroutines._DEBUG:
429            # format the coroutine object
430            if coroutines._DEBUG:
431                filename, lineno = test_utils.get_function_source(notmuch)
432                frame = gen._source_traceback[-1]
433                coro = ('%s() running, defined at %s:%s, created at %s:%s'
434                        % (coro_qualname, filename, lineno,
435                           frame[0], frame[1]))
436            else:
437                code = gen.gi_code
438                coro = ('%s() running at %s:%s'
439                        % (coro_qualname, code.co_filename,
440                           code.co_firstlineno))
441
442            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
443
444        # test pending Task
445        t = self.new_task(self.loop, gen)
446        t.add_done_callback(Dummy())
447
448        # format the coroutine object
449        if coroutines._DEBUG:
450            src = '%s:%s' % test_utils.get_function_source(notmuch)
451        else:
452            code = gen.gi_code
453            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
454        coro = format_coroutine(coro_qualname, 'running', src,
455                                t._source_traceback,
456                                generator=not coroutines._DEBUG)
457        self.assertEqual(repr(t),
458                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
459        self.loop.run_until_complete(t)
460
    def test_task_repr_wait_for(self):
        # While a task is blocked on a future, its repr is expected to
        # include "wait_for=<repr of that future>".
        self.loop.set_debug(False)

        async def wait_for(fut):
            return await fut

        fut = self.new_future(self.loop)
        task = self.new_task(self.loop, wait_for(fut))
        # Let the task start so that it is waiting on fut.
        test_utils.run_briefly(self.loop)
        self.assertRegex(repr(task),
                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))

        # Resolve the future so the task can finish.
        fut.set_result(None)
        self.loop.run_until_complete(task)
475
    def test_task_repr_partial_corowrapper(self):
        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
        # coroutine is a partial function
        with set_coroutine_debug(True):
            self.loop.set_debug(True)

            async def func(x, y):
                await asyncio.sleep(0)

            # Wrapping with asyncio.coroutine is expected to emit a
            # DeprecationWarning.
            with self.assertWarns(DeprecationWarning):
                partial_func = asyncio.coroutine(functools.partial(func, 1))
            task = self.loop.create_task(partial_func(2))

            # make warnings quiet
            task._log_destroy_pending = False
            # The task is never run; close its coroutine at cleanup time.
            self.addCleanup(task._coro.close)

        # The repr must name the underlying 'func' coroutine.
        coro_repr = repr(task._coro)
        expected = (
            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
            r'\.<locals>\.func at'
        )
        self.assertRegex(coro_repr, expected)
499
500    def test_task_basics(self):
501
502        async def outer():
503            a = await inner1()
504            b = await inner2()
505            return a+b
506
507        async def inner1():
508            return 42
509
510        async def inner2():
511            return 1000
512
513        t = outer()
514        self.assertEqual(self.loop.run_until_complete(t), 1042)
515
516    def test_exception_chaining_after_await(self):
517        # Test that when awaiting on a task when an exception is already
518        # active, if the task raises an exception it will be chained
519        # with the original.
520        loop = asyncio.new_event_loop()
521        self.set_event_loop(loop)
522
523        async def raise_error():
524            raise ValueError
525
526        async def run():
527            try:
528                raise KeyError(3)
529            except Exception as exc:
530                task = self.new_task(loop, raise_error())
531                try:
532                    await task
533                except Exception as exc:
534                    self.assertEqual(type(exc), ValueError)
535                    chained = exc.__context__
536                    self.assertEqual((type(chained), chained.args),
537                        (KeyError, (3,)))
538
539        try:
540            task = self.new_task(loop, run())
541            loop.run_until_complete(task)
542        finally:
543            loop.close()
544
    def test_exception_chaining_after_await_with_context_cycle(self):
        # Check trying to create an exception context cycle:
        # https://bugs.python.org/issue40696
        has_cycle = None
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def process_exc(exc):
            # Re-raise, inside a task, the exception that is being handled
            # by the caller.
            raise exc

        async def run():
            nonlocal has_cycle
            try:
                raise KeyError('a')
            except Exception as exc:
                task = self.new_task(loop, process_exc(exc))
                try:
                    await task
                except BaseException as exc:
                    # A cycle exists if the exception is its own context.
                    has_cycle = (exc is exc.__context__)
                    # Prevent a hang if has_cycle is True.
                    exc.__context__ = None

        try:
            task = self.new_task(loop, run())
            loop.run_until_complete(task)
        finally:
            loop.close()
        # This also distinguishes from the initial has_cycle=None.
        self.assertEqual(has_cycle, False)
575
576    def test_cancel(self):
577
578        def gen():
579            when = yield
580            self.assertAlmostEqual(10.0, when)
581            yield 0
582
583        loop = self.new_test_loop(gen)
584
585        async def task():
586            await asyncio.sleep(10.0)
587            return 12
588
589        t = self.new_task(loop, task())
590        loop.call_soon(t.cancel)
591        with self.assertRaises(asyncio.CancelledError):
592            loop.run_until_complete(t)
593        self.assertTrue(t.done())
594        self.assertTrue(t.cancelled())
595        self.assertFalse(t.cancel())
596
597    def test_cancel_with_message_then_future_result(self):
598        # Test Future.result() after calling cancel() with a message.
599        cases = [
600            ((), ()),
601            ((None,), ()),
602            (('my message',), ('my message',)),
603            # Non-string values should roundtrip.
604            ((5,), (5,)),
605        ]
606        for cancel_args, expected_args in cases:
607            with self.subTest(cancel_args=cancel_args):
608                loop = asyncio.new_event_loop()
609                self.set_event_loop(loop)
610
611                async def sleep():
612                    await asyncio.sleep(10)
613
614                async def coro():
615                    task = self.new_task(loop, sleep())
616                    await asyncio.sleep(0)
617                    task.cancel(*cancel_args)
618                    done, pending = await asyncio.wait([task])
619                    task.result()
620
621                task = self.new_task(loop, coro())
622                with self.assertRaises(asyncio.CancelledError) as cm:
623                    loop.run_until_complete(task)
624                exc = cm.exception
625                self.assertEqual(exc.args, ())
626
627                actual = get_innermost_context(exc)
628                self.assertEqual(actual,
629                    (asyncio.CancelledError, expected_args, 2))
630
    def test_cancel_with_message_then_future_exception(self):
        # Test Future.exception() after calling cancel() with a message.
        # Each case pairs the arguments given to cancel() with the args
        # expected on the innermost CancelledError.
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.set_event_loop(loop)

                async def sleep():
                    await asyncio.sleep(10)

                async def coro():
                    task = self.new_task(loop, sleep())
                    # Yield once so the inner task gets a chance to start.
                    await asyncio.sleep(0)
                    task.cancel(*cancel_args)
                    done, pending = await asyncio.wait([task])
                    task.exception()

                task = self.new_task(loop, coro())
                with self.assertRaises(asyncio.CancelledError) as cm:
                    loop.run_until_complete(task)
                exc = cm.exception
                # The outermost exception carries no message...
                self.assertEqual(exc.args, ())

                # ...the cancel() arguments are found two levels down the
                # __context__ chain.
                actual = get_innermost_context(exc)
                self.assertEqual(actual,
                    (asyncio.CancelledError, expected_args, 2))
664
    def test_cancel_with_message_before_starting_task(self):
        # Cancel a task with a message before it has had a chance to
        # start, then check the message on the innermost CancelledError.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def sleep():
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, sleep())
            # We deliberately leave out the sleep here.
            task.cancel('my message')
            done, pending = await asyncio.wait([task])
            task.exception()

        task = self.new_task(loop, coro())
        with self.assertRaises(asyncio.CancelledError) as cm:
            loop.run_until_complete(task)
        exc = cm.exception
        # The outermost exception carries no message.
        self.assertEqual(exc.args, ())

        actual = get_innermost_context(exc)
        self.assertEqual(actual,
            (asyncio.CancelledError, ('my message',), 2))
688
    def test_cancel_yield(self):
        # Cancel a task whose generator-based coroutine is suspended at a
        # bare yield.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def task():
                yield
                yield
                return 12

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)  # start coro
        t.cancel()
        self.assertRaises(
            asyncio.CancelledError, self.loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        # cancel() on the already-cancelled task must return a false value.
        self.assertFalse(t.cancel())
705
706    def test_cancel_inner_future(self):
707        f = self.new_future(self.loop)
708
709        async def task():
710            await f
711            return 12
712
713        t = self.new_task(self.loop, task())
714        test_utils.run_briefly(self.loop)  # start task
715        f.cancel()
716        with self.assertRaises(asyncio.CancelledError):
717            self.loop.run_until_complete(t)
718        self.assertTrue(f.cancelled())
719        self.assertTrue(t.cancelled())
720
721    def test_cancel_both_task_and_inner_future(self):
722        f = self.new_future(self.loop)
723
724        async def task():
725            await f
726            return 12
727
728        t = self.new_task(self.loop, task())
729        test_utils.run_briefly(self.loop)
730
731        f.cancel()
732        t.cancel()
733
734        with self.assertRaises(asyncio.CancelledError):
735            self.loop.run_until_complete(t)
736
737        self.assertTrue(t.done())
738        self.assertTrue(f.cancelled())
739        self.assertTrue(t.cancelled())
740
    def test_cancel_task_catching(self):
        # A coroutine that catches CancelledError and returns a value ends
        # up finished with that value, not cancelled.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                return 42

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        # The cancellation must have reached the future being awaited.
        self.assertTrue(fut2.cancelled())
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(t.cancelled())
763
    def test_cancel_task_ignoring(self):
        # A coroutine that swallows CancelledError keeps running and can
        # go on to await another future and return its result.
        fut1 = self.new_future(self.loop)
        fut2 = self.new_future(self.loop)
        fut3 = self.new_future(self.loop)

        async def task():
            await fut1
            try:
                await fut2
            except asyncio.CancelledError:
                pass
            res = await fut3
            return res

        t = self.new_task(self.loop, task())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut1)  # White-box test.
        fut1.set_result(None)
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut2)  # White-box test.
        t.cancel()
        # The cancellation must have reached the future being awaited.
        self.assertTrue(fut2.cancelled())
        test_utils.run_briefly(self.loop)
        self.assertIs(t._fut_waiter, fut3)  # White-box test.
        fut3.set_result(42)
        res = self.loop.run_until_complete(t)
        self.assertEqual(res, 42)
        self.assertFalse(fut3.cancelled())
        self.assertFalse(t.cancelled())
793
    def test_cancel_current_task(self):
        # A task that cancels itself while running is cancelled at its
        # next await.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def task():
            # 't' is bound below, before the loop runs this coroutine.
            t.cancel()
            self.assertTrue(t._must_cancel)  # White-box test.
            # The sleep should be cancelled immediately.
            await asyncio.sleep(100)
            return 12

        t = self.new_task(loop, task())
        self.assertFalse(t.cancelled())
        self.assertRaises(
            asyncio.CancelledError, loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        self.assertFalse(t._must_cancel)  # White-box test.
        # cancel() on the already-cancelled task must return a false value.
        self.assertFalse(t.cancel())
813
    def test_cancel_at_end(self):
        """coroutine end right after task is cancelled"""
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def task():
            # 't' is bound below, before the loop runs this coroutine.
            t.cancel()
            self.assertTrue(t._must_cancel)  # White-box test.
            # Return without awaiting anything after the cancel request.
            return 12

        t = self.new_task(loop, task())
        self.assertFalse(t.cancelled())
        # Although the coroutine returned a value, the task is expected
        # to end up cancelled.
        self.assertRaises(
            asyncio.CancelledError, loop.run_until_complete, t)
        self.assertTrue(t.done())
        self.assertTrue(t.cancelled())
        self.assertFalse(t._must_cancel)  # White-box test.
        self.assertFalse(t.cancel())
832
833    def test_cancel_awaited_task(self):
834        # This tests for a relatively rare condition when
835        # a task cancellation is requested for a task which is not
836        # currently blocked, such as a task cancelling itself.
837        # In this situation we must ensure that whatever next future
838        # or task the cancelled task blocks on is cancelled correctly
839        # as well.  See also bpo-34872.
840        loop = asyncio.new_event_loop()
841        self.addCleanup(lambda: loop.close())
842
843        task = nested_task = None
844        fut = self.new_future(loop)
845
846        async def nested():
847            await fut
848
849        async def coro():
850            nonlocal nested_task
851            # Create a sub-task and wait for it to run.
852            nested_task = self.new_task(loop, nested())
853            await asyncio.sleep(0)
854
855            # Request the current task to be cancelled.
856            task.cancel()
857            # Block on the nested task, which should be immediately
858            # cancelled.
859            await nested_task
860
861        task = self.new_task(loop, coro())
862        with self.assertRaises(asyncio.CancelledError):
863            loop.run_until_complete(task)
864
865        self.assertTrue(task.cancelled())
866        self.assertTrue(nested_task.cancelled())
867        self.assertTrue(fut.cancelled())
868
869    def assert_text_contains(self, text, substr):
870        if substr not in text:
871            raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<')
872
    def test_cancel_traceback_for_future_result(self):
        # When calling Future.result() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        # NOTE(review): coro() below awaits the task instead of calling
        # result() on it -- confirm the description above is still accurate.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            await asyncio.sleep(0)
            task.cancel()
            await task  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # The strings searched for are the literal source lines of
            # nested() and coro() above, so those lines must stay unchanged.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb, "await task  # search target")
        else:
            self.fail('CancelledError did not occur')
899
    def test_cancel_traceback_for_future_exception(self):
        # When calling Future.exception() on a cancelled task, check that the
        # line of code that was interrupted is included in the traceback.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def nested():
            # This will get cancelled immediately.
            await asyncio.sleep(10)

        async def coro():
            task = self.new_task(loop, nested())
            await asyncio.sleep(0)
            task.cancel()
            done, pending = await asyncio.wait([task])
            task.exception()  # search target

        task = self.new_task(loop, coro())
        try:
            loop.run_until_complete(task)
        except asyncio.CancelledError:
            # The strings searched for are the literal source lines of
            # nested() and coro() above, so those lines must stay unchanged.
            tb = traceback.format_exc()
            self.assert_text_contains(tb, "await asyncio.sleep(10)")
            # The intermediate await should also be included.
            self.assert_text_contains(tb,
                "task.exception()  # search target")
        else:
            self.fail('CancelledError did not occur')
928
929    def test_stop_while_run_in_complete(self):
930
931        def gen():
932            when = yield
933            self.assertAlmostEqual(0.1, when)
934            when = yield 0.1
935            self.assertAlmostEqual(0.2, when)
936            when = yield 0.1
937            self.assertAlmostEqual(0.3, when)
938            yield 0.1
939
940        loop = self.new_test_loop(gen)
941
942        x = 0
943
944        async def task():
945            nonlocal x
946            while x < 10:
947                await asyncio.sleep(0.1)
948                x += 1
949                if x == 2:
950                    loop.stop()
951
952        t = self.new_task(loop, task())
953        with self.assertRaises(RuntimeError) as cm:
954            loop.run_until_complete(t)
955        self.assertEqual(str(cm.exception),
956                         'Event loop stopped before Future completed.')
957        self.assertFalse(t.done())
958        self.assertEqual(x, 2)
959        self.assertAlmostEqual(0.3, loop.time())
960
961        t.cancel()
962        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
963
964    def test_log_traceback(self):
965        async def coro():
966            pass
967
968        task = self.new_task(self.loop, coro())
969        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
970            task._log_traceback = True
971        self.loop.run_until_complete(task)
972
973    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
974        def gen():
975            when = yield
976            self.assertAlmostEqual(0, when)
977
978        loop = self.new_test_loop(gen)
979
980        fut = self.new_future(loop)
981        fut.set_result('done')
982
983        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
984
985        self.assertEqual(ret, 'done')
986        self.assertTrue(fut.done())
987        self.assertAlmostEqual(0, loop.time())
988
989    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
990        def gen():
991            when = yield
992            self.assertAlmostEqual(0, when)
993
994        loop = self.new_test_loop(gen)
995
996        foo_started = False
997
998        async def foo():
999            nonlocal foo_started
1000            foo_started = True
1001
1002        with self.assertRaises(asyncio.TimeoutError):
1003            loop.run_until_complete(asyncio.wait_for(foo(), 0))
1004
1005        self.assertAlmostEqual(0, loop.time())
1006        self.assertEqual(foo_started, False)
1007
1008    def test_wait_for_timeout_less_then_0_or_0(self):
1009        def gen():
1010            when = yield
1011            self.assertAlmostEqual(0.2, when)
1012            when = yield 0
1013            self.assertAlmostEqual(0, when)
1014
1015        for timeout in [0, -1]:
1016            with self.subTest(timeout=timeout):
1017                loop = self.new_test_loop(gen)
1018
1019                foo_running = None
1020
1021                async def foo():
1022                    nonlocal foo_running
1023                    foo_running = True
1024                    try:
1025                        await asyncio.sleep(0.2)
1026                    finally:
1027                        foo_running = False
1028                    return 'done'
1029
1030                fut = self.new_task(loop, foo())
1031
1032                with self.assertRaises(asyncio.TimeoutError):
1033                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
1034                self.assertTrue(fut.done())
1035                # it should have been cancelled due to the timeout
1036                self.assertTrue(fut.cancelled())
1037                self.assertAlmostEqual(0, loop.time())
1038                self.assertEqual(foo_running, False)
1039
1040    def test_wait_for(self):
1041
1042        def gen():
1043            when = yield
1044            self.assertAlmostEqual(0.2, when)
1045            when = yield 0
1046            self.assertAlmostEqual(0.1, when)
1047            when = yield 0.1
1048
1049        loop = self.new_test_loop(gen)
1050
1051        foo_running = None
1052
1053        async def foo():
1054            nonlocal foo_running
1055            foo_running = True
1056            try:
1057                await asyncio.sleep(0.2)
1058            finally:
1059                foo_running = False
1060            return 'done'
1061
1062        fut = self.new_task(loop, foo())
1063
1064        with self.assertRaises(asyncio.TimeoutError):
1065            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
1066        self.assertTrue(fut.done())
1067        # it should have been cancelled due to the timeout
1068        self.assertTrue(fut.cancelled())
1069        self.assertAlmostEqual(0.1, loop.time())
1070        self.assertEqual(foo_running, False)
1071
1072    def test_wait_for_blocking(self):
1073        loop = self.new_test_loop()
1074
1075        async def coro():
1076            return 'done'
1077
1078        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
1079        self.assertEqual(res, 'done')
1080
1081    def test_wait_for_with_global_loop(self):
1082
1083        def gen():
1084            when = yield
1085            self.assertAlmostEqual(0.2, when)
1086            when = yield 0
1087            self.assertAlmostEqual(0.01, when)
1088            yield 0.01
1089
1090        loop = self.new_test_loop(gen)
1091
1092        async def foo():
1093            await asyncio.sleep(0.2)
1094            return 'done'
1095
1096        asyncio.set_event_loop(loop)
1097        try:
1098            fut = self.new_task(loop, foo())
1099            with self.assertRaises(asyncio.TimeoutError):
1100                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
1101        finally:
1102            asyncio.set_event_loop(None)
1103
1104        self.assertAlmostEqual(0.01, loop.time())
1105        self.assertTrue(fut.done())
1106        self.assertTrue(fut.cancelled())
1107
1108    def test_wait_for_race_condition(self):
1109
1110        def gen():
1111            yield 0.1
1112            yield 0.1
1113            yield 0.1
1114
1115        loop = self.new_test_loop(gen)
1116
1117        fut = self.new_future(loop)
1118        task = asyncio.wait_for(fut, timeout=0.2)
1119        loop.call_later(0.1, fut.set_result, "ok")
1120        res = loop.run_until_complete(task)
1121        self.assertEqual(res, "ok")
1122
1123    def test_wait_for_cancellation_race_condition(self):
1124        def gen():
1125            yield 0.1
1126            yield 0.1
1127            yield 0.1
1128            yield 0.1
1129
1130        loop = self.new_test_loop(gen)
1131
1132        fut = self.new_future(loop)
1133        loop.call_later(0.1, fut.set_result, "ok")
1134        task = loop.create_task(asyncio.wait_for(fut, timeout=1))
1135        loop.call_later(0.1, task.cancel)
1136        res = loop.run_until_complete(task)
1137        self.assertEqual(res, "ok")
1138
1139    def test_wait_for_waits_for_task_cancellation(self):
1140        loop = asyncio.new_event_loop()
1141        self.addCleanup(loop.close)
1142
1143        task_done = False
1144
1145        async def foo():
1146            async def inner():
1147                nonlocal task_done
1148                try:
1149                    await asyncio.sleep(0.2)
1150                except asyncio.CancelledError:
1151                    await asyncio.sleep(_EPSILON)
1152                    raise
1153                finally:
1154                    task_done = True
1155
1156            inner_task = self.new_task(loop, inner())
1157
1158            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1159
1160        with self.assertRaises(asyncio.TimeoutError) as cm:
1161            loop.run_until_complete(foo())
1162
1163        self.assertTrue(task_done)
1164        chained = cm.exception.__context__
1165        self.assertEqual(type(chained), asyncio.CancelledError)
1166
1167    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
1168        loop = asyncio.new_event_loop()
1169        self.addCleanup(loop.close)
1170
1171        task_done = False
1172
1173        async def foo():
1174            async def inner():
1175                nonlocal task_done
1176                try:
1177                    await asyncio.sleep(10)
1178                except asyncio.CancelledError:
1179                    await asyncio.sleep(_EPSILON)
1180                    raise
1181                finally:
1182                    task_done = True
1183
1184            inner_task = self.new_task(loop, inner())
1185            await asyncio.sleep(_EPSILON)
1186            await asyncio.wait_for(inner_task, timeout=0)
1187
1188        with self.assertRaises(asyncio.TimeoutError) as cm:
1189            loop.run_until_complete(foo())
1190
1191        self.assertTrue(task_done)
1192        chained = cm.exception.__context__
1193        self.assertEqual(type(chained), asyncio.CancelledError)
1194
1195    def test_wait_for_reraises_exception_during_cancellation(self):
1196        loop = asyncio.new_event_loop()
1197        self.addCleanup(loop.close)
1198
1199        class FooException(Exception):
1200            pass
1201
1202        async def foo():
1203            async def inner():
1204                try:
1205                    await asyncio.sleep(0.2)
1206                finally:
1207                    raise FooException
1208
1209            inner_task = self.new_task(loop, inner())
1210
1211            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1212
1213        with self.assertRaises(FooException):
1214            loop.run_until_complete(foo())
1215
1216    def test_wait_for_raises_timeout_error_if_returned_during_cancellation(self):
1217        loop = asyncio.new_event_loop()
1218        self.addCleanup(loop.close)
1219
1220        async def foo():
1221            async def inner():
1222                try:
1223                    await asyncio.sleep(0.2)
1224                except asyncio.CancelledError:
1225                    return 42
1226
1227            inner_task = self.new_task(loop, inner())
1228
1229            await asyncio.wait_for(inner_task, timeout=_EPSILON)
1230
1231        with self.assertRaises(asyncio.TimeoutError):
1232            loop.run_until_complete(foo())
1233
1234    def test_wait_for_self_cancellation(self):
1235        loop = asyncio.new_event_loop()
1236        self.addCleanup(loop.close)
1237
1238        async def foo():
1239            async def inner():
1240                try:
1241                    await asyncio.sleep(0.3)
1242                except asyncio.CancelledError:
1243                    try:
1244                        await asyncio.sleep(0.3)
1245                    except asyncio.CancelledError:
1246                        await asyncio.sleep(0.3)
1247
1248                return 42
1249
1250            inner_task = self.new_task(loop, inner())
1251
1252            wait = asyncio.wait_for(inner_task, timeout=0.1)
1253
1254            # Test that wait_for itself is properly cancellable
1255            # even when the initial task holds up the initial cancellation.
1256            task = self.new_task(loop, wait)
1257            await asyncio.sleep(0.2)
1258            task.cancel()
1259
1260            with self.assertRaises(asyncio.CancelledError):
1261                await task
1262
1263            self.assertEqual(await inner_task, 42)
1264
1265        loop.run_until_complete(foo())
1266
1267    def test_wait(self):
1268
1269        def gen():
1270            when = yield
1271            self.assertAlmostEqual(0.1, when)
1272            when = yield 0
1273            self.assertAlmostEqual(0.15, when)
1274            yield 0.15
1275
1276        loop = self.new_test_loop(gen)
1277
1278        a = self.new_task(loop, asyncio.sleep(0.1))
1279        b = self.new_task(loop, asyncio.sleep(0.15))
1280
1281        async def foo():
1282            done, pending = await asyncio.wait([b, a])
1283            self.assertEqual(done, set([a, b]))
1284            self.assertEqual(pending, set())
1285            return 42
1286
1287        res = loop.run_until_complete(self.new_task(loop, foo()))
1288        self.assertEqual(res, 42)
1289        self.assertAlmostEqual(0.15, loop.time())
1290
1291        # Doing it again should take no time and exercise a different path.
1292        res = loop.run_until_complete(self.new_task(loop, foo()))
1293        self.assertAlmostEqual(0.15, loop.time())
1294        self.assertEqual(res, 42)
1295
1296    def test_wait_with_global_loop(self):
1297
1298        def gen():
1299            when = yield
1300            self.assertAlmostEqual(0.01, when)
1301            when = yield 0
1302            self.assertAlmostEqual(0.015, when)
1303            yield 0.015
1304
1305        loop = self.new_test_loop(gen)
1306
1307        a = self.new_task(loop, asyncio.sleep(0.01))
1308        b = self.new_task(loop, asyncio.sleep(0.015))
1309
1310        async def foo():
1311            done, pending = await asyncio.wait([b, a])
1312            self.assertEqual(done, set([a, b]))
1313            self.assertEqual(pending, set())
1314            return 42
1315
1316        asyncio.set_event_loop(loop)
1317        res = loop.run_until_complete(
1318            self.new_task(loop, foo()))
1319
1320        self.assertEqual(res, 42)
1321
1322    def test_wait_duplicate_coroutines(self):
1323
1324        with self.assertWarns(DeprecationWarning):
1325            @asyncio.coroutine
1326            def coro(s):
1327                return s
1328        c = coro('test')
1329        task = self.new_task(
1330            self.loop,
1331            asyncio.wait([c, c, coro('spam')]))
1332
1333        with self.assertWarns(DeprecationWarning):
1334            done, pending = self.loop.run_until_complete(task)
1335
1336        self.assertFalse(pending)
1337        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1338
1339    def test_wait_errors(self):
1340        self.assertRaises(
1341            ValueError, self.loop.run_until_complete,
1342            asyncio.wait(set()))
1343
1344        # -1 is an invalid return_when value
1345        sleep_coro = asyncio.sleep(10.0)
1346        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1347        self.assertRaises(ValueError,
1348                          self.loop.run_until_complete, wait_coro)
1349
1350        sleep_coro.close()
1351
1352    def test_wait_first_completed(self):
1353
1354        def gen():
1355            when = yield
1356            self.assertAlmostEqual(10.0, when)
1357            when = yield 0
1358            self.assertAlmostEqual(0.1, when)
1359            yield 0.1
1360
1361        loop = self.new_test_loop(gen)
1362
1363        a = self.new_task(loop, asyncio.sleep(10.0))
1364        b = self.new_task(loop, asyncio.sleep(0.1))
1365        task = self.new_task(
1366            loop,
1367            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1368
1369        done, pending = loop.run_until_complete(task)
1370        self.assertEqual({b}, done)
1371        self.assertEqual({a}, pending)
1372        self.assertFalse(a.done())
1373        self.assertTrue(b.done())
1374        self.assertIsNone(b.result())
1375        self.assertAlmostEqual(0.1, loop.time())
1376
1377        # move forward to close generator
1378        loop.advance_time(10)
1379        loop.run_until_complete(asyncio.wait([a, b]))
1380
1381    def test_wait_really_done(self):
1382        # there is possibility that some tasks in the pending list
1383        # became done but their callbacks haven't all been called yet
1384
1385        async def coro1():
1386            await asyncio.sleep(0)
1387
1388        async def coro2():
1389            await asyncio.sleep(0)
1390            await asyncio.sleep(0)
1391
1392        a = self.new_task(self.loop, coro1())
1393        b = self.new_task(self.loop, coro2())
1394        task = self.new_task(
1395            self.loop,
1396            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1397
1398        done, pending = self.loop.run_until_complete(task)
1399        self.assertEqual({a, b}, done)
1400        self.assertTrue(a.done())
1401        self.assertIsNone(a.result())
1402        self.assertTrue(b.done())
1403        self.assertIsNone(b.result())
1404
1405    def test_wait_first_exception(self):
1406
1407        def gen():
1408            when = yield
1409            self.assertAlmostEqual(10.0, when)
1410            yield 0
1411
1412        loop = self.new_test_loop(gen)
1413
1414        # first_exception, task already has exception
1415        a = self.new_task(loop, asyncio.sleep(10.0))
1416
1417        async def exc():
1418            raise ZeroDivisionError('err')
1419
1420        b = self.new_task(loop, exc())
1421        task = self.new_task(
1422            loop,
1423            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1424
1425        done, pending = loop.run_until_complete(task)
1426        self.assertEqual({b}, done)
1427        self.assertEqual({a}, pending)
1428        self.assertAlmostEqual(0, loop.time())
1429
1430        # move forward to close generator
1431        loop.advance_time(10)
1432        loop.run_until_complete(asyncio.wait([a, b]))
1433
1434    def test_wait_first_exception_in_wait(self):
1435
1436        def gen():
1437            when = yield
1438            self.assertAlmostEqual(10.0, when)
1439            when = yield 0
1440            self.assertAlmostEqual(0.01, when)
1441            yield 0.01
1442
1443        loop = self.new_test_loop(gen)
1444
1445        # first_exception, exception during waiting
1446        a = self.new_task(loop, asyncio.sleep(10.0))
1447
1448        async def exc():
1449            await asyncio.sleep(0.01)
1450            raise ZeroDivisionError('err')
1451
1452        b = self.new_task(loop, exc())
1453        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1454
1455        done, pending = loop.run_until_complete(task)
1456        self.assertEqual({b}, done)
1457        self.assertEqual({a}, pending)
1458        self.assertAlmostEqual(0.01, loop.time())
1459
1460        # move forward to close generator
1461        loop.advance_time(10)
1462        loop.run_until_complete(asyncio.wait([a, b]))
1463
1464    def test_wait_with_exception(self):
1465
1466        def gen():
1467            when = yield
1468            self.assertAlmostEqual(0.1, when)
1469            when = yield 0
1470            self.assertAlmostEqual(0.15, when)
1471            yield 0.15
1472
1473        loop = self.new_test_loop(gen)
1474
1475        a = self.new_task(loop, asyncio.sleep(0.1))
1476
1477        async def sleeper():
1478            await asyncio.sleep(0.15)
1479            raise ZeroDivisionError('really')
1480
1481        b = self.new_task(loop, sleeper())
1482
1483        async def foo():
1484            done, pending = await asyncio.wait([b, a])
1485            self.assertEqual(len(done), 2)
1486            self.assertEqual(pending, set())
1487            errors = set(f for f in done if f.exception() is not None)
1488            self.assertEqual(len(errors), 1)
1489
1490        loop.run_until_complete(self.new_task(loop, foo()))
1491        self.assertAlmostEqual(0.15, loop.time())
1492
1493        loop.run_until_complete(self.new_task(loop, foo()))
1494        self.assertAlmostEqual(0.15, loop.time())
1495
1496    def test_wait_with_timeout(self):
1497
1498        def gen():
1499            when = yield
1500            self.assertAlmostEqual(0.1, when)
1501            when = yield 0
1502            self.assertAlmostEqual(0.15, when)
1503            when = yield 0
1504            self.assertAlmostEqual(0.11, when)
1505            yield 0.11
1506
1507        loop = self.new_test_loop(gen)
1508
1509        a = self.new_task(loop, asyncio.sleep(0.1))
1510        b = self.new_task(loop, asyncio.sleep(0.15))
1511
1512        async def foo():
1513            done, pending = await asyncio.wait([b, a], timeout=0.11)
1514            self.assertEqual(done, set([a]))
1515            self.assertEqual(pending, set([b]))
1516
1517        loop.run_until_complete(self.new_task(loop, foo()))
1518        self.assertAlmostEqual(0.11, loop.time())
1519
1520        # move forward to close generator
1521        loop.advance_time(10)
1522        loop.run_until_complete(asyncio.wait([a, b]))
1523
1524    def test_wait_concurrent_complete(self):
1525
1526        def gen():
1527            when = yield
1528            self.assertAlmostEqual(0.1, when)
1529            when = yield 0
1530            self.assertAlmostEqual(0.15, when)
1531            when = yield 0
1532            self.assertAlmostEqual(0.1, when)
1533            yield 0.1
1534
1535        loop = self.new_test_loop(gen)
1536
1537        a = self.new_task(loop, asyncio.sleep(0.1))
1538        b = self.new_task(loop, asyncio.sleep(0.15))
1539
1540        done, pending = loop.run_until_complete(
1541            asyncio.wait([b, a], timeout=0.1))
1542
1543        self.assertEqual(done, set([a]))
1544        self.assertEqual(pending, set([b]))
1545        self.assertAlmostEqual(0.1, loop.time())
1546
1547        # move forward to close generator
1548        loop.advance_time(10)
1549        loop.run_until_complete(asyncio.wait([a, b]))
1550
1551    def test_wait_with_iterator_of_tasks(self):
1552
1553        def gen():
1554            when = yield
1555            self.assertAlmostEqual(0.1, when)
1556            when = yield 0
1557            self.assertAlmostEqual(0.15, when)
1558            yield 0.15
1559
1560        loop = self.new_test_loop(gen)
1561
1562        a = self.new_task(loop, asyncio.sleep(0.1))
1563        b = self.new_task(loop, asyncio.sleep(0.15))
1564
1565        async def foo():
1566            done, pending = await asyncio.wait(iter([b, a]))
1567            self.assertEqual(done, set([a, b]))
1568            self.assertEqual(pending, set())
1569            return 42
1570
1571        res = loop.run_until_complete(self.new_task(loop, foo()))
1572        self.assertEqual(res, 42)
1573        self.assertAlmostEqual(0.15, loop.time())
1574
    def test_as_completed(self):
        # as_completed() must hand back results in completion order: the
        # two 0.01 sleeps (in either order) first, then the 0.15 sleep.

        def gen():
            # Handed to new_test_loop(); presumably each value yielded
            # advances the test clock -- confirm against test_utils.
            yield 0
            yield 0
            yield 0.01
            yield 0

        loop = self.new_test_loop(gen)
        # disable "slow callback" warning
        loop.slow_callback_duration = 1.0
        completed = set()
        time_shifted = False

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def sleeper(dt, x):
                nonlocal time_shifted
                yield from  asyncio.sleep(dt)
                completed.add(x)
                # Once both 'a' and 'b' have finished, jump the clock
                # forward a single time by 0.14.
                if not time_shifted and 'a' in completed and 'b' in completed:
                    time_shifted = True
                    loop.advance_time(0.14)
                return x

        # Bare coroutine objects, not tasks.
        a = sleeper(0.01, 'a')
        b = sleeper(0.01, 'b')
        c = sleeper(0.15, 'c')

        async def foo():
            # Collect the results in the order as_completed() yields them.
            values = []
            for f in asyncio.as_completed([b, c, a], loop=loop):
                values.append(await f)
            return values
        with self.assertWarns(DeprecationWarning) as w:
            res = loop.run_until_complete(self.new_task(loop, foo()))
        # The first recorded warning must be attributed to this file.
        self.assertEqual(w.warnings[0].filename, __file__)
        self.assertAlmostEqual(0.15, loop.time())
        self.assertTrue('a' in res[:2])
        self.assertTrue('b' in res[:2])
        self.assertEqual(res[2], 'c')

        # Doing it again should take no time and exercise a different path.
        # Only the clock is checked here; the second result is not inspected.
        with self.assertWarns(DeprecationWarning):
            res = loop.run_until_complete(self.new_task(loop, foo()))
        self.assertAlmostEqual(0.15, loop.time())
1621
1622    def test_as_completed_with_timeout(self):
1623
1624        def gen():
1625            yield
1626            yield 0
1627            yield 0
1628            yield 0.1
1629
1630        loop = self.new_test_loop(gen)
1631
1632        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1633        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1634
1635        async def foo():
1636            values = []
1637            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
1638                if values:
1639                    loop.advance_time(0.02)
1640                try:
1641                    v = await f
1642                    values.append((1, v))
1643                except asyncio.TimeoutError as exc:
1644                    values.append((2, exc))
1645            return values
1646
1647        with self.assertWarns(DeprecationWarning):
1648            res = loop.run_until_complete(self.new_task(loop, foo()))
1649        self.assertEqual(len(res), 2, res)
1650        self.assertEqual(res[0], (1, 'a'))
1651        self.assertEqual(res[1][0], 2)
1652        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1653        self.assertAlmostEqual(0.12, loop.time())
1654
1655        # move forward to close generator
1656        loop.advance_time(10)
1657        loop.run_until_complete(asyncio.wait([a, b]))
1658
1659    def test_as_completed_with_unused_timeout(self):
1660
1661        def gen():
1662            yield
1663            yield 0
1664            yield 0.01
1665
1666        loop = self.new_test_loop(gen)
1667
1668        a = asyncio.sleep(0.01, 'a')
1669
1670        async def foo():
1671            for f in asyncio.as_completed([a], timeout=1, loop=loop):
1672                v = await f
1673                self.assertEqual(v, 'a')
1674
1675        with self.assertWarns(DeprecationWarning):
1676            loop.run_until_complete(self.new_task(loop, foo()))
1677
1678    def test_as_completed_reverse_wait(self):
1679
1680        def gen():
1681            yield 0
1682            yield 0.05
1683            yield 0
1684
1685        loop = self.new_test_loop(gen)
1686
1687        a = asyncio.sleep(0.05, 'a')
1688        b = asyncio.sleep(0.10, 'b')
1689        fs = {a, b}
1690
1691        with self.assertWarns(DeprecationWarning):
1692            futs = list(asyncio.as_completed(fs, loop=loop))
1693        self.assertEqual(len(futs), 2)
1694
1695        x = loop.run_until_complete(futs[1])
1696        self.assertEqual(x, 'a')
1697        self.assertAlmostEqual(0.05, loop.time())
1698        loop.advance_time(0.05)
1699        y = loop.run_until_complete(futs[0])
1700        self.assertEqual(y, 'b')
1701        self.assertAlmostEqual(0.10, loop.time())
1702
1703    def test_as_completed_concurrent(self):
1704
1705        def gen():
1706            when = yield
1707            self.assertAlmostEqual(0.05, when)
1708            when = yield 0
1709            self.assertAlmostEqual(0.05, when)
1710            yield 0.05
1711
1712        loop = self.new_test_loop(gen)
1713
1714        a = asyncio.sleep(0.05, 'a')
1715        b = asyncio.sleep(0.05, 'b')
1716        fs = {a, b}
1717        with self.assertWarns(DeprecationWarning):
1718            futs = list(asyncio.as_completed(fs, loop=loop))
1719        self.assertEqual(len(futs), 2)
1720        waiter = asyncio.wait(futs)
1721        # Deprecation from passing coros in futs to asyncio.wait()
1722        with self.assertWarns(DeprecationWarning):
1723            done, pending = loop.run_until_complete(waiter)
1724        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1725
1726    def test_as_completed_duplicate_coroutines(self):
1727
1728        with self.assertWarns(DeprecationWarning):
1729            @asyncio.coroutine
1730            def coro(s):
1731                return s
1732
1733        with self.assertWarns(DeprecationWarning):
1734            @asyncio.coroutine
1735            def runner():
1736                result = []
1737                c = coro('ham')
1738                for f in asyncio.as_completed([c, c, coro('spam')],
1739                                              loop=self.loop):
1740                    result.append((yield from f))
1741                return result
1742
1743        with self.assertWarns(DeprecationWarning):
1744            fut = self.new_task(self.loop, runner())
1745            self.loop.run_until_complete(fut)
1746        result = fut.result()
1747        self.assertEqual(set(result), {'ham', 'spam'})
1748        self.assertEqual(len(result), 2)
1749
1750    def test_sleep(self):
1751
1752        def gen():
1753            when = yield
1754            self.assertAlmostEqual(0.05, when)
1755            when = yield 0.05
1756            self.assertAlmostEqual(0.1, when)
1757            yield 0.05
1758
1759        loop = self.new_test_loop(gen)
1760
1761        async def sleeper(dt, arg):
1762            await asyncio.sleep(dt/2)
1763            res = await asyncio.sleep(dt/2, arg)
1764            return res
1765
1766        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1767        loop.run_until_complete(t)
1768        self.assertTrue(t.done())
1769        self.assertEqual(t.result(), 'yeah')
1770        self.assertAlmostEqual(0.1, loop.time())
1771
1772    def test_sleep_cancel(self):
1773
1774        def gen():
1775            when = yield
1776            self.assertAlmostEqual(10.0, when)
1777            yield 0
1778
1779        loop = self.new_test_loop(gen)
1780
1781        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1782
1783        handle = None
1784        orig_call_later = loop.call_later
1785
1786        def call_later(delay, callback, *args):
1787            nonlocal handle
1788            handle = orig_call_later(delay, callback, *args)
1789            return handle
1790
1791        loop.call_later = call_later
1792        test_utils.run_briefly(loop)
1793
1794        self.assertFalse(handle._cancelled)
1795
1796        t.cancel()
1797        test_utils.run_briefly(loop)
1798        self.assertTrue(handle._cancelled)
1799
1800    def test_task_cancel_sleeping_task(self):
1801
1802        def gen():
1803            when = yield
1804            self.assertAlmostEqual(0.1, when)
1805            when = yield 0
1806            self.assertAlmostEqual(5000, when)
1807            yield 0.1
1808
1809        loop = self.new_test_loop(gen)
1810
1811        async def sleep(dt):
1812            await asyncio.sleep(dt)
1813
1814        async def doit():
1815            sleeper = self.new_task(loop, sleep(5000))
1816            loop.call_later(0.1, sleeper.cancel)
1817            try:
1818                await sleeper
1819            except asyncio.CancelledError:
1820                return 'cancelled'
1821            else:
1822                return 'slept in'
1823
1824        doer = doit()
1825        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1826        self.assertAlmostEqual(0.1, loop.time())
1827
1828    def test_task_cancel_waiter_future(self):
1829        fut = self.new_future(self.loop)
1830
1831        async def coro():
1832            await fut
1833
1834        task = self.new_task(self.loop, coro())
1835        test_utils.run_briefly(self.loop)
1836        self.assertIs(task._fut_waiter, fut)
1837
1838        task.cancel()
1839        test_utils.run_briefly(self.loop)
1840        self.assertRaises(
1841            asyncio.CancelledError, self.loop.run_until_complete, task)
1842        self.assertIsNone(task._fut_waiter)
1843        self.assertTrue(fut.cancelled())
1844
1845    def test_task_set_methods(self):
1846        async def notmuch():
1847            return 'ko'
1848
1849        gen = notmuch()
1850        task = self.new_task(self.loop, gen)
1851
1852        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1853            task.set_result('ok')
1854
1855        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1856            task.set_exception(ValueError())
1857
1858        self.assertEqual(
1859            self.loop.run_until_complete(task),
1860            'ko')
1861
1862    def test_step_result(self):
1863        with self.assertWarns(DeprecationWarning):
1864            @asyncio.coroutine
1865            def notmuch():
1866                yield None
1867                yield 1
1868                return 'ko'
1869
1870        self.assertRaises(
1871            RuntimeError, self.loop.run_until_complete, notmuch())
1872
1873    def test_step_result_future(self):
1874        # If coroutine returns future, task waits on this future.
1875
1876        class Fut(asyncio.Future):
1877            def __init__(self, *args, **kwds):
1878                self.cb_added = False
1879                super().__init__(*args, **kwds)
1880
1881            def add_done_callback(self, *args, **kwargs):
1882                self.cb_added = True
1883                super().add_done_callback(*args, **kwargs)
1884
1885        fut = Fut(loop=self.loop)
1886        result = None
1887
1888        async def wait_for_future():
1889            nonlocal result
1890            result = await fut
1891
1892        t = self.new_task(self.loop, wait_for_future())
1893        test_utils.run_briefly(self.loop)
1894        self.assertTrue(fut.cb_added)
1895
1896        res = object()
1897        fut.set_result(res)
1898        test_utils.run_briefly(self.loop)
1899        self.assertIs(res, result)
1900        self.assertTrue(t.done())
1901        self.assertIsNone(t.result())
1902
1903    def test_baseexception_during_cancel(self):
1904
1905        def gen():
1906            when = yield
1907            self.assertAlmostEqual(10.0, when)
1908            yield 0
1909
1910        loop = self.new_test_loop(gen)
1911
1912        async def sleeper():
1913            await asyncio.sleep(10)
1914
1915        base_exc = SystemExit()
1916
1917        async def notmutch():
1918            try:
1919                await sleeper()
1920            except asyncio.CancelledError:
1921                raise base_exc
1922
1923        task = self.new_task(loop, notmutch())
1924        test_utils.run_briefly(loop)
1925
1926        task.cancel()
1927        self.assertFalse(task.done())
1928
1929        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1930
1931        self.assertTrue(task.done())
1932        self.assertFalse(task.cancelled())
1933        self.assertIs(task.exception(), base_exc)
1934
1935    def test_iscoroutinefunction(self):
1936        def fn():
1937            pass
1938
1939        self.assertFalse(asyncio.iscoroutinefunction(fn))
1940
1941        def fn1():
1942            yield
1943        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1944
1945        with self.assertWarns(DeprecationWarning):
1946            @asyncio.coroutine
1947            def fn2():
1948                yield
1949        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1950
1951        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1952
1953    def test_yield_vs_yield_from(self):
1954        fut = self.new_future(self.loop)
1955
1956        with self.assertWarns(DeprecationWarning):
1957            @asyncio.coroutine
1958            def wait_for_future():
1959                yield fut
1960
1961        task = wait_for_future()
1962        with self.assertRaises(RuntimeError):
1963            self.loop.run_until_complete(task)
1964
1965        self.assertFalse(fut.done())
1966
1967    def test_yield_vs_yield_from_generator(self):
1968        with self.assertWarns(DeprecationWarning):
1969            @asyncio.coroutine
1970            def coro():
1971                yield
1972
1973        with self.assertWarns(DeprecationWarning):
1974            @asyncio.coroutine
1975            def wait_for_future():
1976                gen = coro()
1977                try:
1978                    yield gen
1979                finally:
1980                    gen.close()
1981
1982        task = wait_for_future()
1983        self.assertRaises(
1984            RuntimeError,
1985            self.loop.run_until_complete, task)
1986
1987    def test_coroutine_non_gen_function(self):
1988        with self.assertWarns(DeprecationWarning):
1989            @asyncio.coroutine
1990            def func():
1991                return 'test'
1992
1993        self.assertTrue(asyncio.iscoroutinefunction(func))
1994
1995        coro = func()
1996        self.assertTrue(asyncio.iscoroutine(coro))
1997
1998        res = self.loop.run_until_complete(coro)
1999        self.assertEqual(res, 'test')
2000
2001    def test_coroutine_non_gen_function_return_future(self):
2002        fut = self.new_future(self.loop)
2003
2004        with self.assertWarns(DeprecationWarning):
2005            @asyncio.coroutine
2006            def func():
2007                return fut
2008
2009        async def coro():
2010            fut.set_result('test')
2011
2012        t1 = self.new_task(self.loop, func())
2013        t2 = self.new_task(self.loop, coro())
2014        res = self.loop.run_until_complete(t1)
2015        self.assertEqual(res, 'test')
2016        self.assertIsNone(t2.result())
2017
2018    def test_current_task(self):
2019        self.assertIsNone(asyncio.current_task(loop=self.loop))
2020
2021        async def coro(loop):
2022            self.assertIs(asyncio.current_task(loop=loop), task)
2023
2024            self.assertIs(asyncio.current_task(None), task)
2025            self.assertIs(asyncio.current_task(), task)
2026
2027        task = self.new_task(self.loop, coro(self.loop))
2028        self.loop.run_until_complete(task)
2029        self.assertIsNone(asyncio.current_task(loop=self.loop))
2030
2031    def test_current_task_with_interleaving_tasks(self):
2032        self.assertIsNone(asyncio.current_task(loop=self.loop))
2033
2034        fut1 = self.new_future(self.loop)
2035        fut2 = self.new_future(self.loop)
2036
2037        async def coro1(loop):
2038            self.assertTrue(asyncio.current_task(loop=loop) is task1)
2039            await fut1
2040            self.assertTrue(asyncio.current_task(loop=loop) is task1)
2041            fut2.set_result(True)
2042
2043        async def coro2(loop):
2044            self.assertTrue(asyncio.current_task(loop=loop) is task2)
2045            fut1.set_result(True)
2046            await fut2
2047            self.assertTrue(asyncio.current_task(loop=loop) is task2)
2048
2049        task1 = self.new_task(self.loop, coro1(self.loop))
2050        task2 = self.new_task(self.loop, coro2(self.loop))
2051
2052        self.loop.run_until_complete(asyncio.wait((task1, task2)))
2053        self.assertIsNone(asyncio.current_task(loop=self.loop))
2054
2055    # Some thorough tests for cancellation propagation through
2056    # coroutines, tasks and wait().
2057
2058    def test_yield_future_passes_cancel(self):
2059        # Cancelling outer() cancels inner() cancels waiter.
2060        proof = 0
2061        waiter = self.new_future(self.loop)
2062
2063        async def inner():
2064            nonlocal proof
2065            try:
2066                await waiter
2067            except asyncio.CancelledError:
2068                proof += 1
2069                raise
2070            else:
2071                self.fail('got past sleep() in inner()')
2072
2073        async def outer():
2074            nonlocal proof
2075            try:
2076                await inner()
2077            except asyncio.CancelledError:
2078                proof += 100  # Expect this path.
2079            else:
2080                proof += 10
2081
2082        f = asyncio.ensure_future(outer(), loop=self.loop)
2083        test_utils.run_briefly(self.loop)
2084        f.cancel()
2085        self.loop.run_until_complete(f)
2086        self.assertEqual(proof, 101)
2087        self.assertTrue(waiter.cancelled())
2088
2089    def test_yield_wait_does_not_shield_cancel(self):
2090        # Cancelling outer() makes wait() return early, leaves inner()
2091        # running.
2092        proof = 0
2093        waiter = self.new_future(self.loop)
2094
2095        async def inner():
2096            nonlocal proof
2097            await waiter
2098            proof += 1
2099
2100        async def outer():
2101            nonlocal proof
2102            with self.assertWarns(DeprecationWarning):
2103                d, p = await asyncio.wait([inner()])
2104            proof += 100
2105
2106        f = asyncio.ensure_future(outer(), loop=self.loop)
2107        test_utils.run_briefly(self.loop)
2108        f.cancel()
2109        self.assertRaises(
2110            asyncio.CancelledError, self.loop.run_until_complete, f)
2111        waiter.set_result(None)
2112        test_utils.run_briefly(self.loop)
2113        self.assertEqual(proof, 1)
2114
2115    def test_shield_result(self):
2116        inner = self.new_future(self.loop)
2117        outer = asyncio.shield(inner)
2118        inner.set_result(42)
2119        res = self.loop.run_until_complete(outer)
2120        self.assertEqual(res, 42)
2121
2122    def test_shield_exception(self):
2123        inner = self.new_future(self.loop)
2124        outer = asyncio.shield(inner)
2125        test_utils.run_briefly(self.loop)
2126        exc = RuntimeError('expected')
2127        inner.set_exception(exc)
2128        test_utils.run_briefly(self.loop)
2129        self.assertIs(outer.exception(), exc)
2130
2131    def test_shield_cancel_inner(self):
2132        inner = self.new_future(self.loop)
2133        outer = asyncio.shield(inner)
2134        test_utils.run_briefly(self.loop)
2135        inner.cancel()
2136        test_utils.run_briefly(self.loop)
2137        self.assertTrue(outer.cancelled())
2138
2139    def test_shield_cancel_outer(self):
2140        inner = self.new_future(self.loop)
2141        outer = asyncio.shield(inner)
2142        test_utils.run_briefly(self.loop)
2143        outer.cancel()
2144        test_utils.run_briefly(self.loop)
2145        self.assertTrue(outer.cancelled())
2146        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
2147
2148    def test_shield_shortcut(self):
2149        fut = self.new_future(self.loop)
2150        fut.set_result(42)
2151        res = self.loop.run_until_complete(asyncio.shield(fut))
2152        self.assertEqual(res, 42)
2153
2154    def test_shield_effect(self):
2155        # Cancelling outer() does not affect inner().
2156        proof = 0
2157        waiter = self.new_future(self.loop)
2158
2159        async def inner():
2160            nonlocal proof
2161            await waiter
2162            proof += 1
2163
2164        async def outer():
2165            nonlocal proof
2166            await asyncio.shield(inner())
2167            proof += 100
2168
2169        f = asyncio.ensure_future(outer(), loop=self.loop)
2170        test_utils.run_briefly(self.loop)
2171        f.cancel()
2172        with self.assertRaises(asyncio.CancelledError):
2173            self.loop.run_until_complete(f)
2174        waiter.set_result(None)
2175        test_utils.run_briefly(self.loop)
2176        self.assertEqual(proof, 1)
2177
2178    def test_shield_gather(self):
2179        child1 = self.new_future(self.loop)
2180        child2 = self.new_future(self.loop)
2181        parent = asyncio.gather(child1, child2)
2182        outer = asyncio.shield(parent)
2183        test_utils.run_briefly(self.loop)
2184        outer.cancel()
2185        test_utils.run_briefly(self.loop)
2186        self.assertTrue(outer.cancelled())
2187        child1.set_result(1)
2188        child2.set_result(2)
2189        test_utils.run_briefly(self.loop)
2190        self.assertEqual(parent.result(), [1, 2])
2191
2192    def test_gather_shield(self):
2193        child1 = self.new_future(self.loop)
2194        child2 = self.new_future(self.loop)
2195        inner1 = asyncio.shield(child1)
2196        inner2 = asyncio.shield(child2)
2197        parent = asyncio.gather(inner1, inner2)
2198        test_utils.run_briefly(self.loop)
2199        parent.cancel()
2200        # This should cancel inner1 and inner2 but bot child1 and child2.
2201        test_utils.run_briefly(self.loop)
2202        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
2203        self.assertTrue(inner1.cancelled())
2204        self.assertTrue(inner2.cancelled())
2205        child1.set_result(1)
2206        child2.set_result(2)
2207        test_utils.run_briefly(self.loop)
2208
2209    def test_as_completed_invalid_args(self):
2210        fut = self.new_future(self.loop)
2211
2212        # as_completed() expects a list of futures, not a future instance
2213        self.assertRaises(TypeError, self.loop.run_until_complete,
2214            asyncio.as_completed(fut, loop=self.loop))
2215        coro = coroutine_function()
2216        self.assertRaises(TypeError, self.loop.run_until_complete,
2217            asyncio.as_completed(coro, loop=self.loop))
2218        coro.close()
2219
2220    def test_wait_invalid_args(self):
2221        fut = self.new_future(self.loop)
2222
2223        # wait() expects a list of futures, not a future instance
2224        self.assertRaises(TypeError, self.loop.run_until_complete,
2225            asyncio.wait(fut))
2226        coro = coroutine_function()
2227        self.assertRaises(TypeError, self.loop.run_until_complete,
2228            asyncio.wait(coro))
2229        coro.close()
2230
2231        # wait() expects at least a future
2232        self.assertRaises(ValueError, self.loop.run_until_complete,
2233            asyncio.wait([]))
2234
2235    def test_corowrapper_mocks_generator(self):
2236
2237        def check():
2238            # A function that asserts various things.
2239            # Called twice, with different debug flag values.
2240
2241            with self.assertWarns(DeprecationWarning):
2242                @asyncio.coroutine
2243                def coro():
2244                    # The actual coroutine.
2245                    self.assertTrue(gen.gi_running)
2246                    yield from fut
2247
2248            # A completed Future used to run the coroutine.
2249            fut = self.new_future(self.loop)
2250            fut.set_result(None)
2251
2252            # Call the coroutine.
2253            gen = coro()
2254
2255            # Check some properties.
2256            self.assertTrue(asyncio.iscoroutine(gen))
2257            self.assertIsInstance(gen.gi_frame, types.FrameType)
2258            self.assertFalse(gen.gi_running)
2259            self.assertIsInstance(gen.gi_code, types.CodeType)
2260
2261            # Run it.
2262            self.loop.run_until_complete(gen)
2263
2264            # The frame should have changed.
2265            self.assertIsNone(gen.gi_frame)
2266
2267        # Test with debug flag cleared.
2268        with set_coroutine_debug(False):
2269            check()
2270
2271        # Test with debug flag set.
2272        with set_coroutine_debug(True):
2273            check()
2274
2275    def test_yield_from_corowrapper(self):
2276        with set_coroutine_debug(True):
2277            with self.assertWarns(DeprecationWarning):
2278                @asyncio.coroutine
2279                def t1():
2280                    return (yield from t2())
2281
2282            with self.assertWarns(DeprecationWarning):
2283                @asyncio.coroutine
2284                def t2():
2285                    f = self.new_future(self.loop)
2286                    self.new_task(self.loop, t3(f))
2287                    return (yield from f)
2288
2289            with self.assertWarns(DeprecationWarning):
2290                @asyncio.coroutine
2291                def t3(f):
2292                    f.set_result((1, 2, 3))
2293
2294            task = self.new_task(self.loop, t1())
2295            val = self.loop.run_until_complete(task)
2296            self.assertEqual(val, (1, 2, 3))
2297
2298    def test_yield_from_corowrapper_send(self):
2299        def foo():
2300            a = yield
2301            return a
2302
2303        def call(arg):
2304            cw = asyncio.coroutines.CoroWrapper(foo())
2305            cw.send(None)
2306            try:
2307                cw.send(arg)
2308            except StopIteration as ex:
2309                return ex.args[0]
2310            else:
2311                raise AssertionError('StopIteration was expected')
2312
2313        self.assertEqual(call((1, 2)), (1, 2))
2314        self.assertEqual(call('spam'), 'spam')
2315
2316    def test_corowrapper_weakref(self):
2317        wd = weakref.WeakValueDictionary()
2318        def foo(): yield from []
2319        cw = asyncio.coroutines.CoroWrapper(foo())
2320        wd['cw'] = cw  # Would fail without __weakref__ slot.
2321        cw.gen = None  # Suppress warning from __del__.
2322
2323    def test_corowrapper_throw(self):
2324        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
2325        def foo():
2326            value = None
2327            while True:
2328                try:
2329                    value = yield value
2330                except Exception as e:
2331                    value = e
2332
2333        exception = Exception("foo")
2334        cw = asyncio.coroutines.CoroWrapper(foo())
2335        cw.send(None)
2336        self.assertIs(exception, cw.throw(exception))
2337
2338        cw = asyncio.coroutines.CoroWrapper(foo())
2339        cw.send(None)
2340        self.assertIs(exception, cw.throw(Exception, exception))
2341
2342        cw = asyncio.coroutines.CoroWrapper(foo())
2343        cw.send(None)
2344        exception = cw.throw(Exception, "foo")
2345        self.assertIsInstance(exception, Exception)
2346        self.assertEqual(exception.args, ("foo", ))
2347
2348        cw = asyncio.coroutines.CoroWrapper(foo())
2349        cw.send(None)
2350        exception = cw.throw(Exception, "foo", None)
2351        self.assertIsInstance(exception, Exception)
2352        self.assertEqual(exception.args, ("foo", ))
2353
    def test_log_destroyed_pending_task(self):
        # A pending task that loses its last reference and is garbage
        # collected must be reported to the loop's exception handler with
        # the message 'Task was destroyed but it is pending!'.
        # NOTE: this test depends on exact reference lifetimes; do not keep
        # extra references to the task, coroutine or future.
        # NOTE(review): the local name Task is not used further below.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        # Debug mode is enabled so that the task carries a source
        # traceback, which is compared in the handler call below.
        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # Save the traceback before dropping the task; it is needed for
        # the expected handler context.
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        # The handler receives the loop and a context dict describing the
        # destroyed task.
        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()
2399
    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_not_called_after_cancel(self, m_log):
        # A task whose coroutine raises TypeError is cancelled and released
        # by runner(); the patched asyncio.base_events logger (m_log) must
        # not have received any error() call.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def coro():
            raise TypeError

        async def runner():
            task = self.new_task(loop, coro())
            # Give the task time to run before cancelling it.
            await asyncio.sleep(0.05)
            task.cancel()
            # Drop the local reference to the task.
            task = None

        loop.run_until_complete(runner())
        self.assertFalse(m_log.error.called)
2416
    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # Creating a coroutine object in debug mode and discarding it
        # without using it must log an error on the patched
        # asyncio.coroutines logger (m_log); the message format is checked
        # against the regex built below.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        # NOTE: tb_lineno is computed as "this line + 2", i.e. the line of
        # the coro_noop() call below. Do not insert or remove lines between
        # the tb_lineno assignment and that call.
        tb_filename = __file__
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        # First positional argument of the most recent error() call.
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # Expected layout: the CoroWrapper repr with the definition site,
        # then the creation traceback ending at the coro_noop() call above.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                    r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
                 r'    coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        # DOTALL lets '.*' span the intermediate traceback lines.
        self.assertRegex(message, re.compile(regex, re.DOTALL))
2446
2447    def test_return_coroutine_from_coroutine(self):
2448        """Return of @asyncio.coroutine()-wrapped function generator object
2449        from @asyncio.coroutine()-wrapped function should have same effect as
2450        returning generator object or Future."""
2451        def check():
2452            with self.assertWarns(DeprecationWarning):
2453                @asyncio.coroutine
2454                def outer_coro():
2455                    with self.assertWarns(DeprecationWarning):
2456                        @asyncio.coroutine
2457                        def inner_coro():
2458                            return 1
2459
2460                    return inner_coro()
2461
2462            result = self.loop.run_until_complete(outer_coro())
2463            self.assertEqual(result, 1)
2464
2465        # Test with debug flag cleared.
2466        with set_coroutine_debug(False):
2467            check()
2468
2469        # Test with debug flag set.
2470        with set_coroutine_debug(True):
2471            check()
2472
    def test_task_source_traceback(self):
        # In debug mode a task records where it was created in
        # _source_traceback; the checked entry must point at the
        # new_task() call in this test.
        self.loop.set_debug(True)

        # NOTE: lineno is "the line below minus 1", i.e. the new_task()
        # call. Keep these two statements on adjacent lines.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        # The second-to-last entry is checked (file, line, function name);
        # presumably the last entry belongs to the new_task() helper
        # itself -- verify against its definition.
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)
2484
2485    def _test_cancel_wait_for(self, timeout):
2486        loop = asyncio.new_event_loop()
2487        self.addCleanup(loop.close)
2488
2489        async def blocking_coroutine():
2490            fut = self.new_future(loop)
2491            # Block: fut result is never set
2492            await fut
2493
2494        task = loop.create_task(blocking_coroutine())
2495
2496        wait = loop.create_task(asyncio.wait_for(task, timeout))
2497        loop.call_soon(wait.cancel)
2498
2499        self.assertRaises(asyncio.CancelledError,
2500                          loop.run_until_complete, wait)
2501
2502        # Python issue #23219: cancelling the wait must also cancel the task
2503        self.assertTrue(task.cancelled())
2504
2505    def test_cancel_blocking_wait_for(self):
2506        self._test_cancel_wait_for(None)
2507
2508    def test_cancel_wait_for(self):
2509        self._test_cancel_wait_for(60.0)
2510
2511    def test_cancel_gather_1(self):
2512        """Ensure that a gathering future refuses to be cancelled once all
2513        children are done"""
2514        loop = asyncio.new_event_loop()
2515        self.addCleanup(loop.close)
2516
2517        fut = self.new_future(loop)
2518        # The indirection fut->child_coro is needed since otherwise the
2519        # gathering task is done at the same time as the child future
2520        def child_coro():
2521            return (yield from fut)
2522        with self.assertWarns(DeprecationWarning):
2523            gather_future = asyncio.gather(child_coro(), loop=loop)
2524        gather_task = asyncio.ensure_future(gather_future, loop=loop)
2525
2526        cancel_result = None
2527        def cancelling_callback(_):
2528            nonlocal cancel_result
2529            cancel_result = gather_task.cancel()
2530        fut.add_done_callback(cancelling_callback)
2531
2532        fut.set_result(42) # calls the cancelling_callback after fut is done()
2533
2534        # At this point the task should complete.
2535        loop.run_until_complete(gather_task)
2536
2537        # Python issue #26923: asyncio.gather drops cancellation
2538        self.assertEqual(cancel_result, False)
2539        self.assertFalse(gather_task.cancelled())
2540        self.assertEqual(gather_task.result(), [42])
2541
    def test_cancel_gather_2(self):
        # Cancelling a task that is awaiting gather() must propagate the
        # cancellation out of the loop run, and the arguments passed to
        # cancel() must be found on the innermost CancelledError in the
        # __context__ chain.
        # Each case is (arguments for Task.cancel(), expected .args of the
        # innermost CancelledError).
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.addCleanup(loop.close)

                async def test():
                    # Repeatedly gather a short sleep until about one
                    # second has been accumulated (or until cancelled).
                    time = 0
                    while True:
                        time += 0.05
                        with self.assertWarns(DeprecationWarning):
                            await asyncio.gather(asyncio.sleep(0.05),
                                             return_exceptions=True,
                                             loop=loop)
                        if time > 1:
                            return

                async def main():
                    # Cancel the gathering task while it is still looping,
                    # then await it so the cancellation surfaces here.
                    qwe = self.new_task(loop, test())
                    await asyncio.sleep(0.2)
                    qwe.cancel(*cancel_args)
                    await qwe

                try:
                    loop.run_until_complete(main())
                except asyncio.CancelledError as exc:
                    # The outermost exception carries no arguments; the
                    # cancel() arguments live on the innermost context.
                    self.assertEqual(exc.args, ())
                    exc_type, exc_args, depth = get_innermost_context(exc)
                    self.assertEqual((exc_type, exc_args),
                        (asyncio.CancelledError, expected_args))
                    # The exact traceback seems to vary in CI.
                    self.assertIn(depth, (2, 3))
                else:
                    self.fail('gather did not propagate the cancellation '
                              'request')
2584
2585    def test_exception_traceback(self):
2586        # See http://bugs.python.org/issue28843
2587
2588        async def foo():
2589            1 / 0
2590
2591        async def main():
2592            task = self.new_task(self.loop, foo())
2593            await asyncio.sleep(0)  # skip one loop iteration
2594            self.assertIsNotNone(task.exception().__traceback__)
2595
2596        self.loop.run_until_complete(main())
2597
    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # If loop.call_soon() raises while a task is being created, the
        # error propagates to the caller, the half-created task is later
        # reported as 'Task was destroyed but it is pending' on the patched
        # asyncio.base_events logger (m_log), and no task stays registered.
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        # Replace the loop's scheduler with one that always fails.
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                # The coroutine never ran; close it explicitly.
                gen.close()
        gc.collect()  # For PyPy or other GCs.

        self.assertTrue(m_log.error.called)
        # First positional argument of the most recent error() call.
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())
2624
2625    def test_create_task_with_noncoroutine(self):
2626        with self.assertRaisesRegex(TypeError,
2627                                    "a coroutine was expected, got 123"):
2628            self.new_task(self.loop, 123)
2629
2630        # test it for the second time to ensure that caching
2631        # in asyncio.iscoroutine() doesn't break things.
2632        with self.assertRaisesRegex(TypeError,
2633                                    "a coroutine was expected, got 123"):
2634            self.new_task(self.loop, 123)
2635
2636    def test_create_task_with_oldstyle_coroutine(self):
2637
2638        with self.assertWarns(DeprecationWarning):
2639            @asyncio.coroutine
2640            def coro():
2641                pass
2642
2643        task = self.new_task(self.loop, coro())
2644        self.assertIsInstance(task, self.Task)
2645        self.loop.run_until_complete(task)
2646
2647        # test it for the second time to ensure that caching
2648        # in asyncio.iscoroutine() doesn't break things.
2649        task = self.new_task(self.loop, coro())
2650        self.assertIsInstance(task, self.Task)
2651        self.loop.run_until_complete(task)
2652
2653    def test_create_task_with_async_function(self):
2654
2655        async def coro():
2656            pass
2657
2658        task = self.new_task(self.loop, coro())
2659        self.assertIsInstance(task, self.Task)
2660        self.loop.run_until_complete(task)
2661
2662        # test it for the second time to ensure that caching
2663        # in asyncio.iscoroutine() doesn't break things.
2664        task = self.new_task(self.loop, coro())
2665        self.assertIsInstance(task, self.Task)
2666        self.loop.run_until_complete(task)
2667
2668    def test_create_task_with_asynclike_function(self):
2669        task = self.new_task(self.loop, CoroLikeObject())
2670        self.assertIsInstance(task, self.Task)
2671        self.assertEqual(self.loop.run_until_complete(task), 42)
2672
2673        # test it for the second time to ensure that caching
2674        # in asyncio.iscoroutine() doesn't break things.
2675        task = self.new_task(self.loop, CoroLikeObject())
2676        self.assertIsInstance(task, self.Task)
2677        self.assertEqual(self.loop.run_until_complete(task), 42)
2678
2679    def test_bare_create_task(self):
2680
2681        async def inner():
2682            return 1
2683
2684        async def coro():
2685            task = asyncio.create_task(inner())
2686            self.assertIsInstance(task, self.Task)
2687            ret = await task
2688            self.assertEqual(1, ret)
2689
2690        self.loop.run_until_complete(coro())
2691
2692    def test_bare_create_named_task(self):
2693
2694        async def coro_noop():
2695            pass
2696
2697        async def coro():
2698            task = asyncio.create_task(coro_noop(), name='No-op')
2699            self.assertEqual(task.get_name(), 'No-op')
2700            await task
2701
2702        self.loop.run_until_complete(coro())
2703
2704    def test_context_1(self):
2705        cvar = contextvars.ContextVar('cvar', default='nope')
2706
2707        async def sub():
2708            await asyncio.sleep(0.01)
2709            self.assertEqual(cvar.get(), 'nope')
2710            cvar.set('something else')
2711
2712        async def main():
2713            self.assertEqual(cvar.get(), 'nope')
2714            subtask = self.new_task(loop, sub())
2715            cvar.set('yes')
2716            self.assertEqual(cvar.get(), 'yes')
2717            await subtask
2718            self.assertEqual(cvar.get(), 'yes')
2719
2720        loop = asyncio.new_event_loop()
2721        try:
2722            task = self.new_task(loop, main())
2723            loop.run_until_complete(task)
2724        finally:
2725            loop.close()
2726
    def test_context_2(self):
        # Done-callbacks registered from within a task may change a
        # ContextVar without the task itself observing that change, and
        # nothing done inside the task may leak into the test's own context.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def main():
            def fut_on_done(fut):
                # This change must not pollute the context
                # of the "main()" task.
                cvar.set('something else')

            self.assertEqual(cvar.get(), 'nope')

            for j in range(2):
                fut = self.new_future(loop)
                fut.add_done_callback(fut_on_done)
                # Set after the callback was registered, before the future
                # is resolved by the loop.
                cvar.set(f'yes{j}')
                loop.call_soon(fut.set_result, None)
                await fut
                # fut_on_done() has had its chance to run by now; the value
                # seen by this task must be unchanged.
                self.assertEqual(cvar.get(), f'yes{j}')

                for i in range(3):
                    # Test that task passed its context to add_done_callback:
                    cvar.set(f'yes{i}-{j}')
                    await asyncio.sleep(0.001)
                    self.assertEqual(cvar.get(), f'yes{i}-{j}')

        # A private loop is used and always closed, even on failure.
        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

        # The context of the test itself still holds the default.
        self.assertEqual(cvar.get(), 'nope')
2760
2761    def test_context_3(self):
2762        # Run 100 Tasks in parallel, each modifying cvar.
2763
2764        cvar = contextvars.ContextVar('cvar', default=-1)
2765
2766        async def sub(num):
2767            for i in range(10):
2768                cvar.set(num + i)
2769                await asyncio.sleep(random.uniform(0.001, 0.05))
2770                self.assertEqual(cvar.get(), num + i)
2771
2772        async def main():
2773            tasks = []
2774            for i in range(100):
2775                task = loop.create_task(sub(random.randint(0, 10)))
2776                tasks.append(task)
2777
2778            await asyncio.gather(*tasks)
2779
2780        loop = asyncio.new_event_loop()
2781        try:
2782            loop.run_until_complete(main())
2783        finally:
2784            loop.close()
2785
2786        self.assertEqual(cvar.get(), -1)
2787
2788    def test_get_coro(self):
2789        loop = asyncio.new_event_loop()
2790        coro = coroutine_function()
2791        try:
2792            task = self.new_task(loop, coro)
2793            loop.run_until_complete(task)
2794            self.assertIs(task.get_coro(), coro)
2795        finally:
2796            loop.close()
2797
2798
def add_subclass_tests(cls):
    """Class decorator: make *cls* run against Task/Future subclasses.

    ``cls.Task`` and ``cls.Future`` are replaced by subclasses that count
    calls to ``add_done_callback()``, an extra test relying on those
    counters is attached, and ``test_task_source_traceback`` is disabled.
    When ``cls.Task`` or ``cls.Future`` is None, *cls* is returned untouched.
    """
    task_base, future_base = cls.Task, cls.Future
    if task_base is None or future_base is None:
        return cls

    class CommonFuture:
        # Mixin that records how often instrumented methods are called.
        def __init__(self, *args, **kwargs):
            # Per-method call counter; unknown keys read as 0.
            self.calls = collections.defaultdict(int)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    # NOTE(review): keep the class names "Task" and "Future" as they are;
    # other tests may depend on the class name (e.g. in reprs) -- verify
    # before renaming.
    class Task(CommonFuture, task_base):
        pass

    class Future(CommonFuture, future_base):
        pass

    def test_subclasses_ctask_cfuture(self):
        fut = self.Future(loop=self.loop)

        async def waiter():
            # Resolve the future from the loop, then wait for it.
            self.loop.call_soon(fut.set_result, 'spam')
            return await fut

        task = self.Task(waiter(), loop=self.loop)
        self.assertEqual(self.loop.run_until_complete(task), 'spam')

        # Exactly one add_done_callback() call is expected on each object.
        for instrumented in (task, fut):
            self.assertEqual(
                dict(instrumented.calls),
                {'add_done_callback': 1})

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Add patched Task & Future back to the test case
    cls.Task = Task
    cls.Future = Future

    return cls
2855
2856
class SetMethodsTest:
    """Mixin: Future.set_result()/set_exception() applied directly to a task.

    The host class supplies ``Future``, ``loop`` and ``new_task()``.  Both
    tests expect the forced outcome to win and the task's step to report an
    InvalidStateError through the loop's exception handler.
    """

    def _assert_step_failure_reported(self, handler):
        # The handler must have been called exactly once, with a context
        # dict whose 'exception' entry is the expected InvalidStateError.
        handler.assert_called_once()
        reported = handler.call_args[0][0]['exception']
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise reported

    def test_set_result_causes_invalid_state(self):
        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        # Bypass the task and set the result through the Future class.
        future_cls.set_result(task, 'spam')

        outcome = self.loop.run_until_complete(task)
        self.assertEqual(outcome, 'spam')

        self._assert_step_failure_reported(handler)

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        class MyExc(Exception):
            pass

        future_cls = type(self).Future
        handler = mock.Mock()
        self.loop.call_exception_handler = handler

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        # Bypass the task and set the exception through the Future class.
        future_cls.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        self._assert_step_failure_reported(handler)

        coro.close()
2908
2909
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):
    # Runs the shared task tests plus SetMethodsTest with the
    # tasks._CTask / futures._CFuture pair; skipped when either is missing.

    # getattr() with a default keeps the class body importable even when
    # the attributes do not exist (the class is then skipped).
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calling __init__() again on an already finished task must not leak
        # references: after 100 rounds the total reference count may differ
        # from the baseline by at most 10.
        # NOTE(review): get_attribute() presumably skips the test when
        # sys.gettotalrefcount is unavailable -- confirm in test.support.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        # The baseline is taken only after one complete run of the task.
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute must raise
        # AttributeError (the test name suggests this once crashed).
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending
2939
2940
@unittest.skipUnless(
    hasattr(tasks, '_CTask') and hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests starting from tasks._CTask and futures._CFuture.

    The add_subclass_tests decorator post-processes the two attributes.
    """

    Future = getattr(futures, '_CFuture', None)
    Task = getattr(tasks, '_CTask', None)
2949
2950
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests starting from tasks._CTask and futures._PyFuture.

    The add_subclass_tests decorator post-processes the two attributes.
    """

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2958
2959
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests starting from tasks._PyTask and futures._CFuture.

    The add_subclass_tests decorator post-processes the two attributes.
    """

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)
2967
2968
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with tasks._CTask paired with futures._PyFuture."""

    Future = futures._PyFuture
    Task = getattr(tasks, '_CTask', None)
2975
2976
@unittest.skipUnless(
    hasattr(futures, '_CFuture'),
    'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests with tasks._PyTask paired with futures._CFuture."""

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask
2983
2984
class PyTask_PyFuture_Tests(
        BaseTaskTests, SetMethodsTest, test_utils.TestCase):
    """BaseTaskTests and SetMethodsTest with tasks._PyTask/futures._PyFuture."""

    Future = futures._PyFuture
    Task = tasks._PyTask
2990
2991
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    """BaseTaskTests starting from tasks._PyTask and futures._PyFuture.

    The add_subclass_tests decorator post-processes the two attributes.
    """

    Future = futures._PyFuture
    Task = tasks._PyTask
2996
2997
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):
    """Checks run only when tasks._CTask is available."""

    def test_foobar(self):
        # A Future subclass whose ``get_loop`` attribute lookup raises
        # AttributeError must still be awaitable from a task created by
        # the loop.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        loop = self.loop = asyncio.new_event_loop()
        # closing() guarantees loop.close() even when the run fails.
        with contextlib.closing(loop):
            fut = Fut(loop=loop)
            loop.call_later(0.1, fut.set_result, 1)
            res = loop.run_until_complete(loop.create_task(coro()))

        self.assertEqual(res, 'spam')
3022
3023
class BaseTaskIntrospectionTests:
    """Shared tests for task registration and current-task bookkeeping.

    Subclasses bind the four hooks below to one implementation of the
    register/unregister/enter/leave functions; results are observed through
    asyncio.all_tasks() and asyncio.current_task().
    """
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # The fake task exposes its loop only through a ``_loop`` property.
        class TaskLike:
            @property
            def _loop(self):
                return fake_loop

            def done(self):
                return False

        fake_loop = mock.Mock()
        fake_task = TaskLike()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), {fake_task})
        self._unregister_task(fake_task)

    def test__register_task_2(self):
        # Same as above, but the loop is exposed through get_loop().
        class TaskLike:
            def get_loop(self):
                return fake_loop

            def done(self):
                return False

        fake_loop = mock.Mock()
        fake_task = TaskLike()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), {fake_task})
        self._unregister_task(fake_task)

    def test__register_task_3(self):
        # A registered task reporting done() == True must not be listed.
        class TaskLike:
            def get_loop(self):
                return fake_loop

            def done(self):
                return True

        fake_loop = mock.Mock()
        fake_task = TaskLike()

        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._register_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())
        self._unregister_task(fake_task)

    def test__enter_task(self):
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self.assertIsNone(asyncio.current_task(fake_loop))
        self._enter_task(fake_loop, fake_task)
        self.assertIs(asyncio.current_task(fake_loop), fake_task)
        # Leave again so the bookkeeping is clean afterwards.
        self._leave_task(fake_loop, fake_task)

    def test__enter_task_failure(self):
        # Entering a second task while one is current must be rejected and
        # leave the first task in place.
        fake_loop = mock.Mock()
        first = mock.Mock()
        second = mock.Mock()
        self._enter_task(fake_loop, first)
        with self.assertRaises(RuntimeError):
            self._enter_task(fake_loop, second)
        self.assertIs(asyncio.current_task(fake_loop), first)
        self._leave_task(fake_loop, first)

    def test__leave_task(self):
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self._enter_task(fake_loop, fake_task)
        self._leave_task(fake_loop, fake_task)
        self.assertIsNone(asyncio.current_task(fake_loop))

    def test__leave_task_failure1(self):
        # Leaving with a task other than the current one must be rejected.
        fake_loop = mock.Mock()
        first = mock.Mock()
        second = mock.Mock()
        self._enter_task(fake_loop, first)
        with self.assertRaises(RuntimeError):
            self._leave_task(fake_loop, second)
        self.assertIs(asyncio.current_task(fake_loop), first)
        self._leave_task(fake_loop, first)

    def test__leave_task_failure2(self):
        # Leaving when no task was entered must be rejected as well.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(fake_loop, fake_task)
        self.assertIsNone(asyncio.current_task(fake_loop))

    def test__unregister_task(self):
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        fake_task.get_loop = lambda: fake_loop
        self._register_task(fake_task)
        self._unregister_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered must not raise.
        fake_loop = mock.Mock()
        fake_task = mock.Mock()
        self._unregister_task(fake_task)
        self.assertEqual(asyncio.all_tasks(fake_loop), set())
3134
3135
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the tasks._py_* functions."""

    # staticmethod() keeps the functions from being bound to the instance.
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
3141
3142
@unittest.skipUnless(
    hasattr(tasks, '_c_register_task'),
    'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    """Introspection tests bound to the tasks._c_* functions."""

    # The class body is executed before the skip decorator is applied, so
    # the attribute lookups have to stay guarded.
    if not hasattr(tasks, '_c_register_task'):
        _register_task = _unregister_task = _enter_task = _leave_task = None
    else:
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
3153
3154
class BaseCurrentLoopTests:
    """Mixin with asyncio.current_task() checks.

    Subclasses choose the Task implementation by overriding new_task().
    """

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def new_task(self, coro):
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop and nothing running, there is no current
        # task.
        current = asyncio.current_task(loop=self.loop)
        self.assertIsNone(current)

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and without a running loop the call fails.
        with self.assertRaises(RuntimeError):
            asyncio.current_task()

    def test_current_task_with_implicit_loop(self):
        async def body():
            # All three spellings must resolve to the task running us.
            self.assertIs(asyncio.current_task(loop=self.loop), running)

            self.assertIs(asyncio.current_task(None), running)
            self.assertIs(asyncio.current_task(), running)

        running = self.new_task(body())
        self.loop.run_until_complete(running)
        # Once the task is finished nothing is current any more.
        self.assertIsNone(asyncio.current_task(loop=self.loop))
3182
3183
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() checks with tasks created from tasks._PyTask."""

    def new_task(self, coro):
        task_cls = tasks._PyTask
        return task_cls(coro, loop=self.loop)
3188
3189
@unittest.skipUnless(
    hasattr(tasks, '_CTask'),
    'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
    """current_task() checks with tasks created from tasks._CTask."""

    def new_task(self, coro):
        # The attribute is looked up at call time; the class is skipped
        # when it does not exist.
        task_cls = tasks._CTask
        return task_cls(coro, loop=self.loop)
3196
3197
class GenericTaskTests(test_utils.TestCase):
    """Checks that do not depend on a particular Task implementation."""

    def test_future_subclass(self):
        is_future_subclass = issubclass(asyncio.Task, asyncio.Future)
        self.assertTrue(is_future_subclass)

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            self.skipTest('C modules are not available')

        # skipTest() raises, so this point is reached only when all the
        # probe modules above could be imported.
        try:
            import _asyncio
        except ImportError:
            self.fail('_asyncio module is missing')
3220
3221
class GatherTestsBase:
    """asyncio.gather() tests shared by the concrete gather test classes.

    Subclasses provide ``wrap_futures()``, which turns the futures created
    here into the awaitables actually handed to gather().
    """

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        # Only one_loop becomes the current event loop.
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep running brief iterations until the ready queue is empty.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        a, b, c = (self.one_loop.create_future() for _ in range(3))
        gathered = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
        on_done = test_utils.MockCallback()
        gathered.add_done_callback(on_done)
        # Children complete out of order; the result list must nevertheless
        # follow the order of the arguments.
        b.set_result(1)
        a.set_result(2)
        self._run_loop(self.one_loop)
        self.assertEqual(on_done.called, False)
        self.assertFalse(gathered.done())
        c.set_result(3)
        self._run_loop(self.one_loop)
        on_done.assert_called_once_with(gathered)
        self.assertEqual(gathered.result(), [2, 1, 3])

    def test_success(self):
        # Default keyword arguments first, then an explicit False.
        for extra in ({}, {'return_exceptions': False}):
            self._check_success(**extra)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        gathered = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
        on_done = test_utils.MockCallback()
        gathered.add_done_callback(on_done)
        boom = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(boom)
        self._run_loop(self.one_loop)
        # The first exception completes the outer future right away.
        self.assertTrue(gathered.done())
        on_done.assert_called_once_with(gathered)
        self.assertIs(gathered.exception(), boom)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_return_exceptions(self):
        a, b, c, d = (self.one_loop.create_future() for _ in range(4))
        gathered = asyncio.gather(*self.wrap_futures(a, b, c, d),
                                  return_exceptions=True)
        on_done = test_utils.MockCallback()
        gathered.add_done_callback(on_done)
        first_error = ZeroDivisionError()
        second_error = RuntimeError()
        b.set_result(1)
        c.set_exception(first_error)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # With return_exceptions=True an exception does not end the wait.
        self.assertFalse(gathered.done())
        d.set_exception(second_error)
        self._run_loop(self.one_loop)
        self.assertTrue(gathered.done())
        on_done.assert_called_once_with(gathered)
        self.assertEqual(gathered.result(),
                         [3, 1, first_error, second_error])

    def test_env_var_debug(self):
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        debug_empty = {'PYTHONASYNCIODEBUG': '', 'PYTHONDEVMODE': ''}
        debug_on = {'PYTHONASYNCIODEBUG': '1', 'PYTHONDEVMODE': ''}

        # (interpreter arguments, environment overrides, expected output)
        cases = (
            # Test with -E to not fail if the unit test was run with
            # PYTHONASYNCIODEBUG set to a non-empty string
            (('-E', '-c', code), {}, b'False'),
            (('-c', code), debug_empty, b'False'),
            (('-c', code), debug_on, b'True'),
            (('-E', '-c', code), debug_on, b'False'),
            # -X dev
            (('-E', '-X', 'dev', '-c', code), {}, b'True'),
        )
        for args, env, expected in cases:
            sts, stdout, stderr = assert_python_ok(*args, **env)
            self.assertEqual(stdout.rstrip(), expected)
3322
3323
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
    """gather() tests in which the children are passed as plain futures."""

    def wrap_futures(self, *futures):
        # Futures are handed to gather() unchanged.
        return futures

    def _check_empty_sequence(self, seq_or_iter):
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        # Without a loop argument the current loop (one_loop) is used.
        implicit = asyncio.gather(*seq_or_iter)
        self.assertIsInstance(implicit, asyncio.Future)
        self.assertIs(implicit._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(implicit.done())
        self.assertEqual(implicit.result(), [])
        # An explicit loop is honoured, but passing it is deprecated.
        with self.assertWarns(DeprecationWarning):
            explicit = asyncio.gather(*seq_or_iter, loop=self.other_loop)
        self.assertIs(explicit._loop, self.other_loop)

    def test_constructor_empty_sequence(self):
        for empty in ([], (), set(), iter("")):
            self._check_empty_sequence(empty)

    def test_constructor_heterogenous_futures(self):
        on_one = self.one_loop.create_future()
        on_other = self.other_loop.create_future()
        # Children bound to different loops are rejected ...
        with self.assertRaises(ValueError):
            asyncio.gather(on_one, on_other)
        # ... and so is a child whose loop differs from the explicit one.
        with self.assertRaises(ValueError), \
                self.assertWarns(DeprecationWarning):
            asyncio.gather(on_one, loop=self.other_loop)

    def test_constructor_homogenous_futures(self):
        children = [self.other_loop.create_future() for _ in range(3)]
        # The loop is taken from the children, not from the current loop.
        gathered = asyncio.gather(*children)
        self.assertIs(gathered._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(gathered.done())
        with self.assertWarns(DeprecationWarning):
            gathered = asyncio.gather(*children, loop=self.other_loop)
        self.assertIs(gathered._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(gathered.done())

    def test_one_cancellation(self):
        a, b, c, d, e = (self.one_loop.create_future() for _ in range(5))
        gathered = asyncio.gather(a, b, c, d, e)
        on_done = test_utils.MockCallback()
        gathered.add_done_callback(on_done)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(gathered.done())
        on_done.assert_called_once_with(gathered)
        # The outer future is not cancelled itself; the cancellation is
        # stored as its exception.
        self.assertFalse(gathered.cancelled())
        self.assertIsInstance(gathered.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        a, b, c, d, e, f = (self.one_loop.create_future()
                            for _ in range(6))
        gathered = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        on_done = test_utils.MockCallback()
        gathered.add_done_callback(on_done)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(gathered.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(gathered)
        # Cancelled children show up as CancelledError instances; blank
        # them out so the remaining entries can be compared by equality.
        for cancelled_index in (2, 4):
            self.assertIsInstance(res[cancelled_index],
                                  asyncio.CancelledError)
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        on_done.assert_called_once_with(gathered)
3409
3410
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
    # gather() tests in which the children are passed as coroutine objects.

    def setUp(self):
        super().setUp()
        # Make one_loop the current event loop for asyncio as well.
        asyncio.set_event_loop(self.one_loop)

    def wrap_futures(self, *futures):
        # Wrap every future in a coroutine object that simply awaits it.
        coros = []
        for fut in futures:
            # The default argument binds the current future to this
            # particular coroutine function.
            async def coro(fut=fut):
                return await fut
            coros.append(coro())
        return coros

    def test_constructor_loop_selection(self):
        async def coro():
            return 'abc'
        gen1 = coro()
        gen2 = coro()
        # No loop given: the current loop (one_loop) is expected.
        fut = asyncio.gather(gen1, gen2)
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

        self.set_event_loop(self.other_loop, cleanup=False)
        gen3 = coro()
        gen4 = coro()
        # Passing loop= explicitly is expected to warn.
        with self.assertWarns(DeprecationWarning):
            fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
        self.assertIs(fut2._loop, self.other_loop)
        self.other_loop.run_until_complete(fut2)

    def test_duplicate_coroutines(self):
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        # The same coroutine object is passed three times; its result must
        # appear at each of those positions.
        c = coro('abc')
        with self.assertWarns(DeprecationWarning):
            fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        # 'proof' stays 0 only if neither inner() nor outer() got past
        # its await.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        # Let the tasks start so that they are suspended on their awaits.
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        # A second cancel() on the gathering future must report False.
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(self.one_loop)
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        # Release the two inner coroutines one after the other; each of
        # them then raises RuntimeError.
        a.set_result(None)
        test_utils.run_briefly(self.one_loop)
        b.set_result(None)
        test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)
3505
3506
3507class RunCoroutineThreadsafeTests(test_utils.TestCase):
3508    """Test case for asyncio.run_coroutine_threadsafe."""
3509
3510    def setUp(self):
3511        super().setUp()
3512        self.loop = asyncio.new_event_loop()
3513        self.set_event_loop(self.loop) # Will cleanup properly
3514
3515    async def add(self, a, b, fail=False, cancel=False):
3516        """Wait 0.05 second and return a + b."""
3517        await asyncio.sleep(0.05)
3518        if fail:
3519            raise RuntimeError("Fail!")
3520        if cancel:
3521            asyncio.current_task(self.loop).cancel()
3522            await asyncio.sleep(0)
3523        return a + b
3524
3525    def target(self, fail=False, cancel=False, timeout=None,
3526               advance_coro=False):
3527        """Run add coroutine in the event loop."""
3528        coro = self.add(1, 2, fail=fail, cancel=cancel)
3529        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
3530        if advance_coro:
3531            # this is for test_run_coroutine_threadsafe_task_factory_exception;
3532            # otherwise it spills errors and breaks **other** unittests, since
3533            # 'target' is interacting with threads.
3534
3535            # With this call, `coro` will be advanced, so that
3536            # CoroWrapper.__del__ won't do anything when asyncio tests run
3537            # in debug mode.
3538            self.loop.call_soon_threadsafe(coro.send, None)
3539        try:
3540            return future.result(timeout)
3541        finally:
3542            future.done() or future.cancel()
3543
3544    def test_run_coroutine_threadsafe(self):
3545        """Test coroutine submission from a thread to an event loop."""
3546        future = self.loop.run_in_executor(None, self.target)
3547        result = self.loop.run_until_complete(future)
3548        self.assertEqual(result, 3)
3549
3550    def test_run_coroutine_threadsafe_with_exception(self):
3551        """Test coroutine submission from a thread to an event loop
3552        when an exception is raised."""
3553        future = self.loop.run_in_executor(None, self.target, True)
3554        with self.assertRaises(RuntimeError) as exc_context:
3555            self.loop.run_until_complete(future)
3556        self.assertIn("Fail!", exc_context.exception.args)
3557
3558    def test_run_coroutine_threadsafe_with_timeout(self):
3559        """Test coroutine submission from a thread to an event loop
3560        when a timeout is raised."""
3561        callback = lambda: self.target(timeout=0)
3562        future = self.loop.run_in_executor(None, callback)
3563        with self.assertRaises(asyncio.TimeoutError):
3564            self.loop.run_until_complete(future)
3565        test_utils.run_briefly(self.loop)
3566        # Check that there's no pending task (add has been cancelled)
3567        for task in asyncio.all_tasks(self.loop):
3568            self.assertTrue(task.done())
3569
3570    def test_run_coroutine_threadsafe_task_cancelled(self):
3571        """Test coroutine submission from a thread to an event loop
3572        when the task is cancelled."""
3573        callback = lambda: self.target(cancel=True)
3574        future = self.loop.run_in_executor(None, callback)
3575        with self.assertRaises(asyncio.CancelledError):
3576            self.loop.run_until_complete(future)
3577
3578    def test_run_coroutine_threadsafe_task_factory_exception(self):
3579        """Test coroutine submission from a thread to an event loop
3580        when the task factory raise an exception."""
3581
3582        def task_factory(loop, coro):
3583            raise NameError
3584
3585        run = self.loop.run_in_executor(
3586            None, lambda: self.target(advance_coro=True))
3587
3588        # Set exception handler
3589        callback = test_utils.MockCallback()
3590        self.loop.set_exception_handler(callback)
3591
3592        # Set corrupted task factory
3593        self.addCleanup(self.loop.set_task_factory,
3594                        self.loop.get_task_factory())
3595        self.loop.set_task_factory(task_factory)
3596
3597        # Run event loop
3598        with self.assertRaises(NameError) as exc_context:
3599            self.loop.run_until_complete(run)
3600
3601        # Check exceptions
3602        self.assertEqual(len(callback.call_args_list), 1)
3603        (loop, context), kwargs = callback.call_args
3604        self.assertEqual(context['exception'], exc_context.exception)
3605
3606
class SleepTests(test_utils.TestCase):
    # Tests for asyncio.sleep(), each run on a fresh event loop.

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        loop = self.loop
        loop.close()
        self.loop = None
        super().tearDown()

    def test_sleep_zero(self):
        total = 0

        def bump(amount):
            nonlocal total
            total += amount

        async def runner():
            self.loop.call_soon(bump, 1)
            # The scheduled callback has not run yet.
            self.assertEqual(total, 0)
            slept = await asyncio.sleep(0, result=10)
            # The call_soon callback ran while this coroutine was suspended.
            self.assertEqual(total, 1)
            # `slept` is the 10 passed as result=, so the total becomes 11.
            bump(slept)

        self.loop.run_until_complete(runner())
        self.assertEqual(total, 11)

    def test_loop_argument_is_deprecated(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            sleeper = asyncio.sleep(0.01, loop=self.loop)
            self.loop.run_until_complete(sleeper)
3639
3640
class WaitTests(test_utils.TestCase):
    # Deprecation checks for asyncio.wait() and asyncio.wait_for(),
    # each run on a fresh event loop.

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        loop = self.loop
        loop.close()
        self.loop = None
        super().tearDown()

    def test_loop_argument_is_deprecated_in_wait(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait([coroutine_function()], loop=self.loop)
            self.loop.run_until_complete(waiter)

    def test_loop_argument_is_deprecated_in_wait_for(self):
        # Remove test when loop argument is removed in Python 3.10
        with self.assertWarns(DeprecationWarning):
            waiter = asyncio.wait_for(
                coroutine_function(), 0.01, loop=self.loop)
            self.loop.run_until_complete(waiter)

    def test_coro_is_deprecated_in_wait(self):
        # Remove test when passing coros to asyncio.wait() is removed in 3.11

        # A bare coroutine on its own.
        with self.assertWarns(DeprecationWarning):
            only_coro = asyncio.wait([coroutine_function()])
            self.loop.run_until_complete(only_coro)

        # A bare coroutine mixed with a real task.
        task = self.loop.create_task(coroutine_function())
        with self.assertWarns(DeprecationWarning):
            mixed = asyncio.wait([task, coroutine_function()])
            self.loop.run_until_complete(mixed)
3674
3675
class CompatibilityTests(test_utils.TestCase):
    # Checks that generator-based coroutines created with
    # @asyncio.coroutine interoperate with async/await code.

    def setUp(self):
        super().setUp()
        loop = asyncio.new_event_loop()
        self.loop = loop
        self.set_event_loop(loop)

    def tearDown(self):
        loop = self.loop
        loop.close()
        self.loop = None
        super().tearDown()

    def test_yield_from_awaitable(self):
        # Applying the decorator is what must emit the DeprecationWarning.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def legacy():
                yield from asyncio.sleep(0)
                return 'ok'

        outcome = self.loop.run_until_complete(legacy())
        self.assertEqual('ok', outcome)

    def test_await_old_style_coro(self):
        # Each decoration is checked in its own block so that both of
        # them have to warn.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def immediate():
                return 'ok1'

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def delayed():
                yield from asyncio.sleep(0)
                return 'ok2'

        async def gather_both():
            return await asyncio.gather(immediate(), delayed())

        outcome = self.loop.run_until_complete(gather_both())
        self.assertEqual(['ok1', 'ok2'], outcome)

    def test_debug_mode_interop(self):
        # https://bugs.python.org/issue32636
        # The script runs in a child interpreter with PYTHONASYNCIODEBUG=1
        # and DeprecationWarning ignored.
        script = textwrap.dedent("""
            import asyncio

            async def native_coro():
                pass

            @asyncio.coroutine
            def old_style_coro():
                yield from native_coro()

            asyncio.run(old_style_coro())
        """)

        assert_python_ok(
            "-Wignore::DeprecationWarning", "-c", script,
            PYTHONASYNCIODEBUG="1")
3737
3738
# Run the tests in this module when the file is executed as a script.
if __name__ == '__main__':
    unittest.main()
3741