1"""Tests for tasks.py."""
2
3import collections
4import contextlib
5import contextvars
6import functools
7import gc
8import io
9import random
10import re
11import sys
12import textwrap
13import types
14import unittest
15import weakref
16from unittest import mock
17
18import asyncio
19from asyncio import coroutines
20from asyncio import futures
21from asyncio import tasks
22from test.test_asyncio import utils as test_utils
23from test import support
24from test.support.script_helper import assert_python_ok
25
26
27def tearDownModule():
28    asyncio.set_event_loop_policy(None)
29
30
31async def coroutine_function():
32    pass
33
34
35@contextlib.contextmanager
36def set_coroutine_debug(enabled):
37    coroutines = asyncio.coroutines
38
39    old_debug = coroutines._DEBUG
40    try:
41        coroutines._DEBUG = enabled
42        yield
43    finally:
44        coroutines._DEBUG = old_debug
45
46
47def format_coroutine(qualname, state, src, source_traceback, generator=False):
48    if generator:
49        state = '%s' % state
50    else:
51        state = '%s, defined' % state
52    if source_traceback is not None:
53        frame = source_traceback[-1]
54        return ('coro=<%s() %s at %s> created at %s:%s'
55                % (qualname, state, src, frame[0], frame[1]))
56    else:
57        return 'coro=<%s() %s at %s>' % (qualname, state, src)
58
59
60class Dummy:
61
62    def __repr__(self):
63        return '<Dummy>'
64
65    def __call__(self, *args):
66        pass
67
68
69class CoroLikeObject:
70    def send(self, v):
71        raise StopIteration(42)
72
73    def throw(self, *exc):
74        pass
75
76    def close(self):
77        pass
78
79    def __await__(self):
80        return self
81
82
83class BaseTaskTests:
84
85    Task = None
86    Future = None
87
88    def new_task(self, loop, coro, name='TestTask'):
89        return self.__class__.Task(coro, loop=loop, name=name)
90
91    def new_future(self, loop):
92        return self.__class__.Future(loop=loop)
93
94    def setUp(self):
95        super().setUp()
96        self.loop = self.new_test_loop()
97        self.loop.set_task_factory(self.new_task)
98        self.loop.create_future = lambda: self.new_future(self.loop)
99
100    def test_task_del_collect(self):
101        class Evil:
102            def __del__(self):
103                gc.collect()
104
105        async def run():
106            return Evil()
107
108        self.loop.run_until_complete(
109            asyncio.gather(*[
110                self.new_task(self.loop, run()) for _ in range(100)
111            ], loop=self.loop))
112
113    def test_other_loop_future(self):
114        other_loop = asyncio.new_event_loop()
115        fut = self.new_future(other_loop)
116
117        async def run(fut):
118            await fut
119
120        try:
121            with self.assertRaisesRegex(RuntimeError,
122                                        r'Task .* got Future .* attached'):
123                self.loop.run_until_complete(run(fut))
124        finally:
125            other_loop.close()
126
127    def test_task_awaits_on_itself(self):
128
129        async def test():
130            await task
131
132        task = asyncio.ensure_future(test(), loop=self.loop)
133
134        with self.assertRaisesRegex(RuntimeError,
135                                    'Task cannot await on itself'):
136            self.loop.run_until_complete(task)
137
138    def test_task_class(self):
139        async def notmuch():
140            return 'ok'
141        t = self.new_task(self.loop, notmuch())
142        self.loop.run_until_complete(t)
143        self.assertTrue(t.done())
144        self.assertEqual(t.result(), 'ok')
145        self.assertIs(t._loop, self.loop)
146        self.assertIs(t.get_loop(), self.loop)
147
148        loop = asyncio.new_event_loop()
149        self.set_event_loop(loop)
150        t = self.new_task(loop, notmuch())
151        self.assertIs(t._loop, loop)
152        loop.run_until_complete(t)
153        loop.close()
154
155    def test_ensure_future_coroutine(self):
156        with self.assertWarns(DeprecationWarning):
157            @asyncio.coroutine
158            def notmuch():
159                return 'ok'
160        t = asyncio.ensure_future(notmuch(), loop=self.loop)
161        self.loop.run_until_complete(t)
162        self.assertTrue(t.done())
163        self.assertEqual(t.result(), 'ok')
164        self.assertIs(t._loop, self.loop)
165
166        loop = asyncio.new_event_loop()
167        self.set_event_loop(loop)
168        t = asyncio.ensure_future(notmuch(), loop=loop)
169        self.assertIs(t._loop, loop)
170        loop.run_until_complete(t)
171        loop.close()
172
173    def test_ensure_future_future(self):
174        f_orig = self.new_future(self.loop)
175        f_orig.set_result('ko')
176
177        f = asyncio.ensure_future(f_orig)
178        self.loop.run_until_complete(f)
179        self.assertTrue(f.done())
180        self.assertEqual(f.result(), 'ko')
181        self.assertIs(f, f_orig)
182
183        loop = asyncio.new_event_loop()
184        self.set_event_loop(loop)
185
186        with self.assertRaises(ValueError):
187            f = asyncio.ensure_future(f_orig, loop=loop)
188
189        loop.close()
190
191        f = asyncio.ensure_future(f_orig, loop=self.loop)
192        self.assertIs(f, f_orig)
193
194    def test_ensure_future_task(self):
195        async def notmuch():
196            return 'ok'
197        t_orig = self.new_task(self.loop, notmuch())
198        t = asyncio.ensure_future(t_orig)
199        self.loop.run_until_complete(t)
200        self.assertTrue(t.done())
201        self.assertEqual(t.result(), 'ok')
202        self.assertIs(t, t_orig)
203
204        loop = asyncio.new_event_loop()
205        self.set_event_loop(loop)
206
207        with self.assertRaises(ValueError):
208            t = asyncio.ensure_future(t_orig, loop=loop)
209
210        loop.close()
211
212        t = asyncio.ensure_future(t_orig, loop=self.loop)
213        self.assertIs(t, t_orig)
214
215    def test_ensure_future_awaitable(self):
216        class Aw:
217            def __init__(self, coro):
218                self.coro = coro
219            def __await__(self):
220                return (yield from self.coro)
221
222        with self.assertWarns(DeprecationWarning):
223            @asyncio.coroutine
224            def coro():
225                return 'ok'
226
227        loop = asyncio.new_event_loop()
228        self.set_event_loop(loop)
229        fut = asyncio.ensure_future(Aw(coro()), loop=loop)
230        loop.run_until_complete(fut)
231        assert fut.result() == 'ok'
232
233    def test_ensure_future_neither(self):
234        with self.assertRaises(TypeError):
235            asyncio.ensure_future('ok')
236
237    def test_ensure_future_error_msg(self):
238        loop = asyncio.new_event_loop()
239        f = self.new_future(self.loop)
240        with self.assertRaisesRegex(ValueError, 'The future belongs to a '
241                                    'different loop than the one specified as '
242                                    'the loop argument'):
243            asyncio.ensure_future(f, loop=loop)
244        loop.close()
245
246    def test_get_stack(self):
247        T = None
248
249        async def foo():
250            await bar()
251
252        async def bar():
253            # test get_stack()
254            f = T.get_stack(limit=1)
255            try:
256                self.assertEqual(f[0].f_code.co_name, 'foo')
257            finally:
258                f = None
259
260            # test print_stack()
261            file = io.StringIO()
262            T.print_stack(limit=1, file=file)
263            file.seek(0)
264            tb = file.read()
265            self.assertRegex(tb, r'foo\(\) running')
266
267        async def runner():
268            nonlocal T
269            T = asyncio.ensure_future(foo(), loop=self.loop)
270            await T
271
272        self.loop.run_until_complete(runner())
273
274    def test_task_repr(self):
275        self.loop.set_debug(False)
276
277        async def notmuch():
278            return 'abc'
279
280        # test coroutine function
281        self.assertEqual(notmuch.__name__, 'notmuch')
282        self.assertRegex(notmuch.__qualname__,
283                         r'\w+.test_task_repr.<locals>.notmuch')
284        self.assertEqual(notmuch.__module__, __name__)
285
286        filename, lineno = test_utils.get_function_source(notmuch)
287        src = "%s:%s" % (filename, lineno)
288
289        # test coroutine object
290        gen = notmuch()
291        coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch'
292        self.assertEqual(gen.__name__, 'notmuch')
293        self.assertEqual(gen.__qualname__, coro_qualname)
294
295        # test pending Task
296        t = self.new_task(self.loop, gen)
297        t.add_done_callback(Dummy())
298
299        coro = format_coroutine(coro_qualname, 'running', src,
300                                t._source_traceback, generator=True)
301        self.assertEqual(repr(t),
302                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
303
304        # test cancelling Task
305        t.cancel()  # Does not take immediate effect!
306        self.assertEqual(repr(t),
307                         "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro)
308
309        # test cancelled Task
310        self.assertRaises(asyncio.CancelledError,
311                          self.loop.run_until_complete, t)
312        coro = format_coroutine(coro_qualname, 'done', src,
313                                t._source_traceback)
314        self.assertEqual(repr(t),
315                         "<Task cancelled name='TestTask' %s>" % coro)
316
317        # test finished Task
318        t = self.new_task(self.loop, notmuch())
319        self.loop.run_until_complete(t)
320        coro = format_coroutine(coro_qualname, 'done', src,
321                                t._source_traceback)
322        self.assertEqual(repr(t),
323                         "<Task finished name='TestTask' %s result='abc'>" % coro)
324
325    def test_task_repr_autogenerated(self):
326        async def notmuch():
327            return 123
328
329        t1 = self.new_task(self.loop, notmuch(), None)
330        t2 = self.new_task(self.loop, notmuch(), None)
331        self.assertNotEqual(repr(t1), repr(t2))
332
333        match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1))
334        self.assertIsNotNone(match1)
335        match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2))
336        self.assertIsNotNone(match2)
337
338        # Autogenerated task names should have monotonically increasing numbers
339        self.assertLess(int(match1.group(1)), int(match2.group(1)))
340        self.loop.run_until_complete(t1)
341        self.loop.run_until_complete(t2)
342
343    def test_task_repr_name_not_str(self):
344        async def notmuch():
345            return 123
346
347        t = self.new_task(self.loop, notmuch())
348        t.set_name({6})
349        self.assertEqual(t.get_name(), '{6}')
350        self.loop.run_until_complete(t)
351
352    def test_task_repr_coro_decorator(self):
353        self.loop.set_debug(False)
354
355        with self.assertWarns(DeprecationWarning):
356            @asyncio.coroutine
357            def notmuch():
358                # notmuch() function doesn't use yield from: it will be wrapped by
359                # @coroutine decorator
360                return 123
361
362        # test coroutine function
363        self.assertEqual(notmuch.__name__, 'notmuch')
364        self.assertRegex(notmuch.__qualname__,
365                         r'\w+.test_task_repr_coro_decorator'
366                         r'\.<locals>\.notmuch')
367        self.assertEqual(notmuch.__module__, __name__)
368
369        # test coroutine object
370        gen = notmuch()
371        # On Python >= 3.5, generators now inherit the name of the
372        # function, as expected, and have a qualified name (__qualname__
373        # attribute).
374        coro_name = 'notmuch'
375        coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator'
376                         '.<locals>.notmuch')
377        self.assertEqual(gen.__name__, coro_name)
378        self.assertEqual(gen.__qualname__, coro_qualname)
379
380        # test repr(CoroWrapper)
381        if coroutines._DEBUG:
382            # format the coroutine object
383            if coroutines._DEBUG:
384                filename, lineno = test_utils.get_function_source(notmuch)
385                frame = gen._source_traceback[-1]
386                coro = ('%s() running, defined at %s:%s, created at %s:%s'
387                        % (coro_qualname, filename, lineno,
388                           frame[0], frame[1]))
389            else:
390                code = gen.gi_code
391                coro = ('%s() running at %s:%s'
392                        % (coro_qualname, code.co_filename,
393                           code.co_firstlineno))
394
395            self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro)
396
397        # test pending Task
398        t = self.new_task(self.loop, gen)
399        t.add_done_callback(Dummy())
400
401        # format the coroutine object
402        if coroutines._DEBUG:
403            src = '%s:%s' % test_utils.get_function_source(notmuch)
404        else:
405            code = gen.gi_code
406            src = '%s:%s' % (code.co_filename, code.co_firstlineno)
407        coro = format_coroutine(coro_qualname, 'running', src,
408                                t._source_traceback,
409                                generator=not coroutines._DEBUG)
410        self.assertEqual(repr(t),
411                         "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro)
412        self.loop.run_until_complete(t)
413
414    def test_task_repr_wait_for(self):
415        self.loop.set_debug(False)
416
417        async def wait_for(fut):
418            return await fut
419
420        fut = self.new_future(self.loop)
421        task = self.new_task(self.loop, wait_for(fut))
422        test_utils.run_briefly(self.loop)
423        self.assertRegex(repr(task),
424                         '<Task .* wait_for=%s>' % re.escape(repr(fut)))
425
426        fut.set_result(None)
427        self.loop.run_until_complete(task)
428
429    def test_task_repr_partial_corowrapper(self):
430        # Issue #222: repr(CoroWrapper) must not fail in debug mode if the
431        # coroutine is a partial function
432        with set_coroutine_debug(True):
433            self.loop.set_debug(True)
434
435            async def func(x, y):
436                await asyncio.sleep(0)
437
438            with self.assertWarns(DeprecationWarning):
439                partial_func = asyncio.coroutine(functools.partial(func, 1))
440            task = self.loop.create_task(partial_func(2))
441
442            # make warnings quiet
443            task._log_destroy_pending = False
444            self.addCleanup(task._coro.close)
445
446        coro_repr = repr(task._coro)
447        expected = (
448            r'<coroutine object \w+\.test_task_repr_partial_corowrapper'
449            r'\.<locals>\.func at'
450        )
451        self.assertRegex(coro_repr, expected)
452
453    def test_task_basics(self):
454
455        async def outer():
456            a = await inner1()
457            b = await inner2()
458            return a+b
459
460        async def inner1():
461            return 42
462
463        async def inner2():
464            return 1000
465
466        t = outer()
467        self.assertEqual(self.loop.run_until_complete(t), 1042)
468
469    def test_cancel(self):
470
471        def gen():
472            when = yield
473            self.assertAlmostEqual(10.0, when)
474            yield 0
475
476        loop = self.new_test_loop(gen)
477
478        async def task():
479            await asyncio.sleep(10.0)
480            return 12
481
482        t = self.new_task(loop, task())
483        loop.call_soon(t.cancel)
484        with self.assertRaises(asyncio.CancelledError):
485            loop.run_until_complete(t)
486        self.assertTrue(t.done())
487        self.assertTrue(t.cancelled())
488        self.assertFalse(t.cancel())
489
490    def test_cancel_yield(self):
491        with self.assertWarns(DeprecationWarning):
492            @asyncio.coroutine
493            def task():
494                yield
495                yield
496                return 12
497
498        t = self.new_task(self.loop, task())
499        test_utils.run_briefly(self.loop)  # start coro
500        t.cancel()
501        self.assertRaises(
502            asyncio.CancelledError, self.loop.run_until_complete, t)
503        self.assertTrue(t.done())
504        self.assertTrue(t.cancelled())
505        self.assertFalse(t.cancel())
506
507    def test_cancel_inner_future(self):
508        f = self.new_future(self.loop)
509
510        async def task():
511            await f
512            return 12
513
514        t = self.new_task(self.loop, task())
515        test_utils.run_briefly(self.loop)  # start task
516        f.cancel()
517        with self.assertRaises(asyncio.CancelledError):
518            self.loop.run_until_complete(t)
519        self.assertTrue(f.cancelled())
520        self.assertTrue(t.cancelled())
521
522    def test_cancel_both_task_and_inner_future(self):
523        f = self.new_future(self.loop)
524
525        async def task():
526            await f
527            return 12
528
529        t = self.new_task(self.loop, task())
530        test_utils.run_briefly(self.loop)
531
532        f.cancel()
533        t.cancel()
534
535        with self.assertRaises(asyncio.CancelledError):
536            self.loop.run_until_complete(t)
537
538        self.assertTrue(t.done())
539        self.assertTrue(f.cancelled())
540        self.assertTrue(t.cancelled())
541
542    def test_cancel_task_catching(self):
543        fut1 = self.new_future(self.loop)
544        fut2 = self.new_future(self.loop)
545
546        async def task():
547            await fut1
548            try:
549                await fut2
550            except asyncio.CancelledError:
551                return 42
552
553        t = self.new_task(self.loop, task())
554        test_utils.run_briefly(self.loop)
555        self.assertIs(t._fut_waiter, fut1)  # White-box test.
556        fut1.set_result(None)
557        test_utils.run_briefly(self.loop)
558        self.assertIs(t._fut_waiter, fut2)  # White-box test.
559        t.cancel()
560        self.assertTrue(fut2.cancelled())
561        res = self.loop.run_until_complete(t)
562        self.assertEqual(res, 42)
563        self.assertFalse(t.cancelled())
564
565    def test_cancel_task_ignoring(self):
566        fut1 = self.new_future(self.loop)
567        fut2 = self.new_future(self.loop)
568        fut3 = self.new_future(self.loop)
569
570        async def task():
571            await fut1
572            try:
573                await fut2
574            except asyncio.CancelledError:
575                pass
576            res = await fut3
577            return res
578
579        t = self.new_task(self.loop, task())
580        test_utils.run_briefly(self.loop)
581        self.assertIs(t._fut_waiter, fut1)  # White-box test.
582        fut1.set_result(None)
583        test_utils.run_briefly(self.loop)
584        self.assertIs(t._fut_waiter, fut2)  # White-box test.
585        t.cancel()
586        self.assertTrue(fut2.cancelled())
587        test_utils.run_briefly(self.loop)
588        self.assertIs(t._fut_waiter, fut3)  # White-box test.
589        fut3.set_result(42)
590        res = self.loop.run_until_complete(t)
591        self.assertEqual(res, 42)
592        self.assertFalse(fut3.cancelled())
593        self.assertFalse(t.cancelled())
594
595    def test_cancel_current_task(self):
596        loop = asyncio.new_event_loop()
597        self.set_event_loop(loop)
598
599        async def task():
600            t.cancel()
601            self.assertTrue(t._must_cancel)  # White-box test.
602            # The sleep should be cancelled immediately.
603            await asyncio.sleep(100)
604            return 12
605
606        t = self.new_task(loop, task())
607        self.assertFalse(t.cancelled())
608        self.assertRaises(
609            asyncio.CancelledError, loop.run_until_complete, t)
610        self.assertTrue(t.done())
611        self.assertTrue(t.cancelled())
612        self.assertFalse(t._must_cancel)  # White-box test.
613        self.assertFalse(t.cancel())
614
615    def test_cancel_at_end(self):
616        """coroutine end right after task is cancelled"""
617        loop = asyncio.new_event_loop()
618        self.set_event_loop(loop)
619
620        async def task():
621            t.cancel()
622            self.assertTrue(t._must_cancel)  # White-box test.
623            return 12
624
625        t = self.new_task(loop, task())
626        self.assertFalse(t.cancelled())
627        self.assertRaises(
628            asyncio.CancelledError, loop.run_until_complete, t)
629        self.assertTrue(t.done())
630        self.assertTrue(t.cancelled())
631        self.assertFalse(t._must_cancel)  # White-box test.
632        self.assertFalse(t.cancel())
633
634    def test_cancel_awaited_task(self):
635        # This tests for a relatively rare condition when
636        # a task cancellation is requested for a task which is not
637        # currently blocked, such as a task cancelling itself.
638        # In this situation we must ensure that whatever next future
639        # or task the cancelled task blocks on is cancelled correctly
640        # as well.  See also bpo-34872.
641        loop = asyncio.new_event_loop()
642        self.addCleanup(lambda: loop.close())
643
644        task = nested_task = None
645        fut = self.new_future(loop)
646
647        async def nested():
648            await fut
649
650        async def coro():
651            nonlocal nested_task
652            # Create a sub-task and wait for it to run.
653            nested_task = self.new_task(loop, nested())
654            await asyncio.sleep(0)
655
656            # Request the current task to be cancelled.
657            task.cancel()
658            # Block on the nested task, which should be immediately
659            # cancelled.
660            await nested_task
661
662        task = self.new_task(loop, coro())
663        with self.assertRaises(asyncio.CancelledError):
664            loop.run_until_complete(task)
665
666        self.assertTrue(task.cancelled())
667        self.assertTrue(nested_task.cancelled())
668        self.assertTrue(fut.cancelled())
669
670    def test_stop_while_run_in_complete(self):
671
672        def gen():
673            when = yield
674            self.assertAlmostEqual(0.1, when)
675            when = yield 0.1
676            self.assertAlmostEqual(0.2, when)
677            when = yield 0.1
678            self.assertAlmostEqual(0.3, when)
679            yield 0.1
680
681        loop = self.new_test_loop(gen)
682
683        x = 0
684
685        async def task():
686            nonlocal x
687            while x < 10:
688                await asyncio.sleep(0.1)
689                x += 1
690                if x == 2:
691                    loop.stop()
692
693        t = self.new_task(loop, task())
694        with self.assertRaises(RuntimeError) as cm:
695            loop.run_until_complete(t)
696        self.assertEqual(str(cm.exception),
697                         'Event loop stopped before Future completed.')
698        self.assertFalse(t.done())
699        self.assertEqual(x, 2)
700        self.assertAlmostEqual(0.3, loop.time())
701
702        t.cancel()
703        self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t)
704
705    def test_log_traceback(self):
706        async def coro():
707            pass
708
709        task = self.new_task(self.loop, coro())
710        with self.assertRaisesRegex(ValueError, 'can only be set to False'):
711            task._log_traceback = True
712        self.loop.run_until_complete(task)
713
714    def test_wait_for_timeout_less_then_0_or_0_future_done(self):
715        def gen():
716            when = yield
717            self.assertAlmostEqual(0, when)
718
719        loop = self.new_test_loop(gen)
720
721        fut = self.new_future(loop)
722        fut.set_result('done')
723
724        ret = loop.run_until_complete(asyncio.wait_for(fut, 0))
725
726        self.assertEqual(ret, 'done')
727        self.assertTrue(fut.done())
728        self.assertAlmostEqual(0, loop.time())
729
730    def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self):
731        def gen():
732            when = yield
733            self.assertAlmostEqual(0, when)
734
735        loop = self.new_test_loop(gen)
736
737        foo_started = False
738
739        async def foo():
740            nonlocal foo_started
741            foo_started = True
742
743        with self.assertRaises(asyncio.TimeoutError):
744            loop.run_until_complete(asyncio.wait_for(foo(), 0))
745
746        self.assertAlmostEqual(0, loop.time())
747        self.assertEqual(foo_started, False)
748
749    def test_wait_for_timeout_less_then_0_or_0(self):
750        def gen():
751            when = yield
752            self.assertAlmostEqual(0.2, when)
753            when = yield 0
754            self.assertAlmostEqual(0, when)
755
756        for timeout in [0, -1]:
757            with self.subTest(timeout=timeout):
758                loop = self.new_test_loop(gen)
759
760                foo_running = None
761
762                async def foo():
763                    nonlocal foo_running
764                    foo_running = True
765                    try:
766                        await asyncio.sleep(0.2)
767                    finally:
768                        foo_running = False
769                    return 'done'
770
771                fut = self.new_task(loop, foo())
772
773                with self.assertRaises(asyncio.TimeoutError):
774                    loop.run_until_complete(asyncio.wait_for(fut, timeout))
775                self.assertTrue(fut.done())
776                # it should have been cancelled due to the timeout
777                self.assertTrue(fut.cancelled())
778                self.assertAlmostEqual(0, loop.time())
779                self.assertEqual(foo_running, False)
780
781    def test_wait_for(self):
782
783        def gen():
784            when = yield
785            self.assertAlmostEqual(0.2, when)
786            when = yield 0
787            self.assertAlmostEqual(0.1, when)
788            when = yield 0.1
789
790        loop = self.new_test_loop(gen)
791
792        foo_running = None
793
794        async def foo():
795            nonlocal foo_running
796            foo_running = True
797            try:
798                await asyncio.sleep(0.2)
799            finally:
800                foo_running = False
801            return 'done'
802
803        fut = self.new_task(loop, foo())
804
805        with self.assertRaises(asyncio.TimeoutError):
806            loop.run_until_complete(asyncio.wait_for(fut, 0.1))
807        self.assertTrue(fut.done())
808        # it should have been cancelled due to the timeout
809        self.assertTrue(fut.cancelled())
810        self.assertAlmostEqual(0.1, loop.time())
811        self.assertEqual(foo_running, False)
812
813    def test_wait_for_blocking(self):
814        loop = self.new_test_loop()
815
816        async def coro():
817            return 'done'
818
819        res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None))
820        self.assertEqual(res, 'done')
821
822    def test_wait_for_with_global_loop(self):
823
824        def gen():
825            when = yield
826            self.assertAlmostEqual(0.2, when)
827            when = yield 0
828            self.assertAlmostEqual(0.01, when)
829            yield 0.01
830
831        loop = self.new_test_loop(gen)
832
833        async def foo():
834            await asyncio.sleep(0.2)
835            return 'done'
836
837        asyncio.set_event_loop(loop)
838        try:
839            fut = self.new_task(loop, foo())
840            with self.assertRaises(asyncio.TimeoutError):
841                loop.run_until_complete(asyncio.wait_for(fut, 0.01))
842        finally:
843            asyncio.set_event_loop(None)
844
845        self.assertAlmostEqual(0.01, loop.time())
846        self.assertTrue(fut.done())
847        self.assertTrue(fut.cancelled())
848
849    def test_wait_for_race_condition(self):
850
851        def gen():
852            yield 0.1
853            yield 0.1
854            yield 0.1
855
856        loop = self.new_test_loop(gen)
857
858        fut = self.new_future(loop)
859        task = asyncio.wait_for(fut, timeout=0.2)
860        loop.call_later(0.1, fut.set_result, "ok")
861        res = loop.run_until_complete(task)
862        self.assertEqual(res, "ok")
863
864    def test_wait_for_cancellation_race_condition(self):
865        def gen():
866            yield 0.1
867            yield 0.1
868            yield 0.1
869            yield 0.1
870
871        loop = self.new_test_loop(gen)
872
873        fut = self.new_future(loop)
874        loop.call_later(0.1, fut.set_result, "ok")
875        task = loop.create_task(asyncio.wait_for(fut, timeout=1))
876        loop.call_later(0.1, task.cancel)
877        res = loop.run_until_complete(task)
878        self.assertEqual(res, "ok")
879
880    def test_wait_for_waits_for_task_cancellation(self):
881        loop = asyncio.new_event_loop()
882        self.addCleanup(loop.close)
883
884        task_done = False
885
886        async def foo():
887            async def inner():
888                nonlocal task_done
889                try:
890                    await asyncio.sleep(0.2)
891                except asyncio.CancelledError:
892                    await asyncio.sleep(0.1)
893                    raise
894                finally:
895                    task_done = True
896
897            inner_task = self.new_task(loop, inner())
898
899            with self.assertRaises(asyncio.TimeoutError):
900                await asyncio.wait_for(inner_task, timeout=0.1)
901
902            self.assertTrue(task_done)
903
904        loop.run_until_complete(foo())
905
906    def test_wait_for_waits_for_task_cancellation_w_timeout_0(self):
907        loop = asyncio.new_event_loop()
908        self.addCleanup(loop.close)
909
910        task_done = False
911
912        async def foo():
913            async def inner():
914                nonlocal task_done
915                try:
916                    await asyncio.sleep(10)
917                except asyncio.CancelledError:
918                    await asyncio.sleep(0.1)
919                    raise
920                finally:
921                    task_done = True
922
923            inner_task = self.new_task(loop, inner())
924            await asyncio.sleep(0.1)
925            await asyncio.wait_for(inner_task, timeout=0)
926
927        with self.assertRaises(asyncio.TimeoutError) as cm:
928            loop.run_until_complete(foo())
929
930        self.assertTrue(task_done)
931        chained = cm.exception.__context__
932        self.assertEqual(type(chained), asyncio.CancelledError)
933
934    def test_wait_for_self_cancellation(self):
935        loop = asyncio.new_event_loop()
936        self.addCleanup(loop.close)
937
938        async def foo():
939            async def inner():
940                try:
941                    await asyncio.sleep(0.3)
942                except asyncio.CancelledError:
943                    try:
944                        await asyncio.sleep(0.3)
945                    except asyncio.CancelledError:
946                        await asyncio.sleep(0.3)
947
948                return 42
949
950            inner_task = self.new_task(loop, inner())
951
952            wait = asyncio.wait_for(inner_task, timeout=0.1)
953
954            # Test that wait_for itself is properly cancellable
955            # even when the initial task holds up the initial cancellation.
956            task = self.new_task(loop, wait)
957            await asyncio.sleep(0.2)
958            task.cancel()
959
960            with self.assertRaises(asyncio.CancelledError):
961                await task
962
963            self.assertEqual(await inner_task, 42)
964
965        loop.run_until_complete(foo())
966
967    def test_wait(self):
968
969        def gen():
970            when = yield
971            self.assertAlmostEqual(0.1, when)
972            when = yield 0
973            self.assertAlmostEqual(0.15, when)
974            yield 0.15
975
976        loop = self.new_test_loop(gen)
977
978        a = self.new_task(loop, asyncio.sleep(0.1))
979        b = self.new_task(loop, asyncio.sleep(0.15))
980
981        async def foo():
982            done, pending = await asyncio.wait([b, a])
983            self.assertEqual(done, set([a, b]))
984            self.assertEqual(pending, set())
985            return 42
986
987        res = loop.run_until_complete(self.new_task(loop, foo()))
988        self.assertEqual(res, 42)
989        self.assertAlmostEqual(0.15, loop.time())
990
991        # Doing it again should take no time and exercise a different path.
992        res = loop.run_until_complete(self.new_task(loop, foo()))
993        self.assertAlmostEqual(0.15, loop.time())
994        self.assertEqual(res, 42)
995
996    def test_wait_with_global_loop(self):
997
998        def gen():
999            when = yield
1000            self.assertAlmostEqual(0.01, when)
1001            when = yield 0
1002            self.assertAlmostEqual(0.015, when)
1003            yield 0.015
1004
1005        loop = self.new_test_loop(gen)
1006
1007        a = self.new_task(loop, asyncio.sleep(0.01))
1008        b = self.new_task(loop, asyncio.sleep(0.015))
1009
1010        async def foo():
1011            done, pending = await asyncio.wait([b, a])
1012            self.assertEqual(done, set([a, b]))
1013            self.assertEqual(pending, set())
1014            return 42
1015
1016        asyncio.set_event_loop(loop)
1017        res = loop.run_until_complete(
1018            self.new_task(loop, foo()))
1019
1020        self.assertEqual(res, 42)
1021
1022    def test_wait_duplicate_coroutines(self):
1023
1024        with self.assertWarns(DeprecationWarning):
1025            @asyncio.coroutine
1026            def coro(s):
1027                return s
1028        c = coro('test')
1029
1030        task =self.new_task(
1031            self.loop,
1032            asyncio.wait([c, c, coro('spam')]))
1033
1034        done, pending = self.loop.run_until_complete(task)
1035
1036        self.assertFalse(pending)
1037        self.assertEqual(set(f.result() for f in done), {'test', 'spam'})
1038
1039    def test_wait_errors(self):
1040        self.assertRaises(
1041            ValueError, self.loop.run_until_complete,
1042            asyncio.wait(set()))
1043
1044        # -1 is an invalid return_when value
1045        sleep_coro = asyncio.sleep(10.0)
1046        wait_coro = asyncio.wait([sleep_coro], return_when=-1)
1047        self.assertRaises(ValueError,
1048                          self.loop.run_until_complete, wait_coro)
1049
1050        sleep_coro.close()
1051
1052    def test_wait_first_completed(self):
1053
1054        def gen():
1055            when = yield
1056            self.assertAlmostEqual(10.0, when)
1057            when = yield 0
1058            self.assertAlmostEqual(0.1, when)
1059            yield 0.1
1060
1061        loop = self.new_test_loop(gen)
1062
1063        a = self.new_task(loop, asyncio.sleep(10.0))
1064        b = self.new_task(loop, asyncio.sleep(0.1))
1065        task = self.new_task(
1066            loop,
1067            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1068
1069        done, pending = loop.run_until_complete(task)
1070        self.assertEqual({b}, done)
1071        self.assertEqual({a}, pending)
1072        self.assertFalse(a.done())
1073        self.assertTrue(b.done())
1074        self.assertIsNone(b.result())
1075        self.assertAlmostEqual(0.1, loop.time())
1076
1077        # move forward to close generator
1078        loop.advance_time(10)
1079        loop.run_until_complete(asyncio.wait([a, b]))
1080
1081    def test_wait_really_done(self):
1082        # there is possibility that some tasks in the pending list
1083        # became done but their callbacks haven't all been called yet
1084
1085        async def coro1():
1086            await asyncio.sleep(0)
1087
1088        async def coro2():
1089            await asyncio.sleep(0)
1090            await asyncio.sleep(0)
1091
1092        a = self.new_task(self.loop, coro1())
1093        b = self.new_task(self.loop, coro2())
1094        task = self.new_task(
1095            self.loop,
1096            asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED))
1097
1098        done, pending = self.loop.run_until_complete(task)
1099        self.assertEqual({a, b}, done)
1100        self.assertTrue(a.done())
1101        self.assertIsNone(a.result())
1102        self.assertTrue(b.done())
1103        self.assertIsNone(b.result())
1104
1105    def test_wait_first_exception(self):
1106
1107        def gen():
1108            when = yield
1109            self.assertAlmostEqual(10.0, when)
1110            yield 0
1111
1112        loop = self.new_test_loop(gen)
1113
1114        # first_exception, task already has exception
1115        a = self.new_task(loop, asyncio.sleep(10.0))
1116
1117        async def exc():
1118            raise ZeroDivisionError('err')
1119
1120        b = self.new_task(loop, exc())
1121        task = self.new_task(
1122            loop,
1123            asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION))
1124
1125        done, pending = loop.run_until_complete(task)
1126        self.assertEqual({b}, done)
1127        self.assertEqual({a}, pending)
1128        self.assertAlmostEqual(0, loop.time())
1129
1130        # move forward to close generator
1131        loop.advance_time(10)
1132        loop.run_until_complete(asyncio.wait([a, b]))
1133
1134    def test_wait_first_exception_in_wait(self):
1135
1136        def gen():
1137            when = yield
1138            self.assertAlmostEqual(10.0, when)
1139            when = yield 0
1140            self.assertAlmostEqual(0.01, when)
1141            yield 0.01
1142
1143        loop = self.new_test_loop(gen)
1144
1145        # first_exception, exception during waiting
1146        a = self.new_task(loop, asyncio.sleep(10.0))
1147
1148        async def exc():
1149            await asyncio.sleep(0.01)
1150            raise ZeroDivisionError('err')
1151
1152        b = self.new_task(loop, exc())
1153        task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)
1154
1155        done, pending = loop.run_until_complete(task)
1156        self.assertEqual({b}, done)
1157        self.assertEqual({a}, pending)
1158        self.assertAlmostEqual(0.01, loop.time())
1159
1160        # move forward to close generator
1161        loop.advance_time(10)
1162        loop.run_until_complete(asyncio.wait([a, b]))
1163
1164    def test_wait_with_exception(self):
1165
1166        def gen():
1167            when = yield
1168            self.assertAlmostEqual(0.1, when)
1169            when = yield 0
1170            self.assertAlmostEqual(0.15, when)
1171            yield 0.15
1172
1173        loop = self.new_test_loop(gen)
1174
1175        a = self.new_task(loop, asyncio.sleep(0.1))
1176
1177        async def sleeper():
1178            await asyncio.sleep(0.15)
1179            raise ZeroDivisionError('really')
1180
1181        b = self.new_task(loop, sleeper())
1182
1183        async def foo():
1184            done, pending = await asyncio.wait([b, a])
1185            self.assertEqual(len(done), 2)
1186            self.assertEqual(pending, set())
1187            errors = set(f for f in done if f.exception() is not None)
1188            self.assertEqual(len(errors), 1)
1189
1190        loop.run_until_complete(self.new_task(loop, foo()))
1191        self.assertAlmostEqual(0.15, loop.time())
1192
1193        loop.run_until_complete(self.new_task(loop, foo()))
1194        self.assertAlmostEqual(0.15, loop.time())
1195
1196    def test_wait_with_timeout(self):
1197
1198        def gen():
1199            when = yield
1200            self.assertAlmostEqual(0.1, when)
1201            when = yield 0
1202            self.assertAlmostEqual(0.15, when)
1203            when = yield 0
1204            self.assertAlmostEqual(0.11, when)
1205            yield 0.11
1206
1207        loop = self.new_test_loop(gen)
1208
1209        a = self.new_task(loop, asyncio.sleep(0.1))
1210        b = self.new_task(loop, asyncio.sleep(0.15))
1211
1212        async def foo():
1213            done, pending = await asyncio.wait([b, a], timeout=0.11)
1214            self.assertEqual(done, set([a]))
1215            self.assertEqual(pending, set([b]))
1216
1217        loop.run_until_complete(self.new_task(loop, foo()))
1218        self.assertAlmostEqual(0.11, loop.time())
1219
1220        # move forward to close generator
1221        loop.advance_time(10)
1222        loop.run_until_complete(asyncio.wait([a, b]))
1223
1224    def test_wait_concurrent_complete(self):
1225
1226        def gen():
1227            when = yield
1228            self.assertAlmostEqual(0.1, when)
1229            when = yield 0
1230            self.assertAlmostEqual(0.15, when)
1231            when = yield 0
1232            self.assertAlmostEqual(0.1, when)
1233            yield 0.1
1234
1235        loop = self.new_test_loop(gen)
1236
1237        a = self.new_task(loop, asyncio.sleep(0.1))
1238        b = self.new_task(loop, asyncio.sleep(0.15))
1239
1240        done, pending = loop.run_until_complete(
1241            asyncio.wait([b, a], timeout=0.1))
1242
1243        self.assertEqual(done, set([a]))
1244        self.assertEqual(pending, set([b]))
1245        self.assertAlmostEqual(0.1, loop.time())
1246
1247        # move forward to close generator
1248        loop.advance_time(10)
1249        loop.run_until_complete(asyncio.wait([a, b]))
1250
1251    def test_as_completed(self):
1252
1253        def gen():
1254            yield 0
1255            yield 0
1256            yield 0.01
1257            yield 0
1258
1259        loop = self.new_test_loop(gen)
1260        # disable "slow callback" warning
1261        loop.slow_callback_duration = 1.0
1262        completed = set()
1263        time_shifted = False
1264
1265        with self.assertWarns(DeprecationWarning):
1266            @asyncio.coroutine
1267            def sleeper(dt, x):
1268                nonlocal time_shifted
1269                yield from  asyncio.sleep(dt)
1270                completed.add(x)
1271                if not time_shifted and 'a' in completed and 'b' in completed:
1272                    time_shifted = True
1273                    loop.advance_time(0.14)
1274                return x
1275
1276        a = sleeper(0.01, 'a')
1277        b = sleeper(0.01, 'b')
1278        c = sleeper(0.15, 'c')
1279
1280        async def foo():
1281            values = []
1282            for f in asyncio.as_completed([b, c, a], loop=loop):
1283                values.append(await f)
1284            return values
1285        with self.assertWarns(DeprecationWarning):
1286            res = loop.run_until_complete(self.new_task(loop, foo()))
1287        self.assertAlmostEqual(0.15, loop.time())
1288        self.assertTrue('a' in res[:2])
1289        self.assertTrue('b' in res[:2])
1290        self.assertEqual(res[2], 'c')
1291
1292        # Doing it again should take no time and exercise a different path.
1293        with self.assertWarns(DeprecationWarning):
1294            res = loop.run_until_complete(self.new_task(loop, foo()))
1295        self.assertAlmostEqual(0.15, loop.time())
1296
1297    def test_as_completed_with_timeout(self):
1298
1299        def gen():
1300            yield
1301            yield 0
1302            yield 0
1303            yield 0.1
1304
1305        loop = self.new_test_loop(gen)
1306
1307        a = loop.create_task(asyncio.sleep(0.1, 'a'))
1308        b = loop.create_task(asyncio.sleep(0.15, 'b'))
1309
1310        async def foo():
1311            values = []
1312            for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop):
1313                if values:
1314                    loop.advance_time(0.02)
1315                try:
1316                    v = await f
1317                    values.append((1, v))
1318                except asyncio.TimeoutError as exc:
1319                    values.append((2, exc))
1320            return values
1321
1322        with self.assertWarns(DeprecationWarning):
1323            res = loop.run_until_complete(self.new_task(loop, foo()))
1324        self.assertEqual(len(res), 2, res)
1325        self.assertEqual(res[0], (1, 'a'))
1326        self.assertEqual(res[1][0], 2)
1327        self.assertIsInstance(res[1][1], asyncio.TimeoutError)
1328        self.assertAlmostEqual(0.12, loop.time())
1329
1330        # move forward to close generator
1331        loop.advance_time(10)
1332        loop.run_until_complete(asyncio.wait([a, b]))
1333
1334    def test_as_completed_with_unused_timeout(self):
1335
1336        def gen():
1337            yield
1338            yield 0
1339            yield 0.01
1340
1341        loop = self.new_test_loop(gen)
1342
1343        a = asyncio.sleep(0.01, 'a')
1344
1345        async def foo():
1346            for f in asyncio.as_completed([a], timeout=1, loop=loop):
1347                v = await f
1348                self.assertEqual(v, 'a')
1349
1350        with self.assertWarns(DeprecationWarning):
1351            loop.run_until_complete(self.new_task(loop, foo()))
1352
1353    def test_as_completed_reverse_wait(self):
1354
1355        def gen():
1356            yield 0
1357            yield 0.05
1358            yield 0
1359
1360        loop = self.new_test_loop(gen)
1361
1362        a = asyncio.sleep(0.05, 'a')
1363        b = asyncio.sleep(0.10, 'b')
1364        fs = {a, b}
1365
1366        with self.assertWarns(DeprecationWarning):
1367            futs = list(asyncio.as_completed(fs, loop=loop))
1368        self.assertEqual(len(futs), 2)
1369
1370        x = loop.run_until_complete(futs[1])
1371        self.assertEqual(x, 'a')
1372        self.assertAlmostEqual(0.05, loop.time())
1373        loop.advance_time(0.05)
1374        y = loop.run_until_complete(futs[0])
1375        self.assertEqual(y, 'b')
1376        self.assertAlmostEqual(0.10, loop.time())
1377
1378    def test_as_completed_concurrent(self):
1379
1380        def gen():
1381            when = yield
1382            self.assertAlmostEqual(0.05, when)
1383            when = yield 0
1384            self.assertAlmostEqual(0.05, when)
1385            yield 0.05
1386
1387        loop = self.new_test_loop(gen)
1388
1389        a = asyncio.sleep(0.05, 'a')
1390        b = asyncio.sleep(0.05, 'b')
1391        fs = {a, b}
1392        with self.assertWarns(DeprecationWarning):
1393            futs = list(asyncio.as_completed(fs, loop=loop))
1394        self.assertEqual(len(futs), 2)
1395        waiter = asyncio.wait(futs)
1396        done, pending = loop.run_until_complete(waiter)
1397        self.assertEqual(set(f.result() for f in done), {'a', 'b'})
1398
1399    def test_as_completed_duplicate_coroutines(self):
1400
1401        with self.assertWarns(DeprecationWarning):
1402            @asyncio.coroutine
1403            def coro(s):
1404                return s
1405
1406        with self.assertWarns(DeprecationWarning):
1407            @asyncio.coroutine
1408            def runner():
1409                result = []
1410                c = coro('ham')
1411                for f in asyncio.as_completed([c, c, coro('spam')],
1412                                              loop=self.loop):
1413                    result.append((yield from f))
1414                return result
1415
1416        with self.assertWarns(DeprecationWarning):
1417            fut = self.new_task(self.loop, runner())
1418            self.loop.run_until_complete(fut)
1419        result = fut.result()
1420        self.assertEqual(set(result), {'ham', 'spam'})
1421        self.assertEqual(len(result), 2)
1422
1423    def test_sleep(self):
1424
1425        def gen():
1426            when = yield
1427            self.assertAlmostEqual(0.05, when)
1428            when = yield 0.05
1429            self.assertAlmostEqual(0.1, when)
1430            yield 0.05
1431
1432        loop = self.new_test_loop(gen)
1433
1434        async def sleeper(dt, arg):
1435            await asyncio.sleep(dt/2)
1436            res = await asyncio.sleep(dt/2, arg)
1437            return res
1438
1439        t = self.new_task(loop, sleeper(0.1, 'yeah'))
1440        loop.run_until_complete(t)
1441        self.assertTrue(t.done())
1442        self.assertEqual(t.result(), 'yeah')
1443        self.assertAlmostEqual(0.1, loop.time())
1444
1445    def test_sleep_cancel(self):
1446
1447        def gen():
1448            when = yield
1449            self.assertAlmostEqual(10.0, when)
1450            yield 0
1451
1452        loop = self.new_test_loop(gen)
1453
1454        t = self.new_task(loop, asyncio.sleep(10.0, 'yeah'))
1455
1456        handle = None
1457        orig_call_later = loop.call_later
1458
1459        def call_later(delay, callback, *args):
1460            nonlocal handle
1461            handle = orig_call_later(delay, callback, *args)
1462            return handle
1463
1464        loop.call_later = call_later
1465        test_utils.run_briefly(loop)
1466
1467        self.assertFalse(handle._cancelled)
1468
1469        t.cancel()
1470        test_utils.run_briefly(loop)
1471        self.assertTrue(handle._cancelled)
1472
1473    def test_task_cancel_sleeping_task(self):
1474
1475        def gen():
1476            when = yield
1477            self.assertAlmostEqual(0.1, when)
1478            when = yield 0
1479            self.assertAlmostEqual(5000, when)
1480            yield 0.1
1481
1482        loop = self.new_test_loop(gen)
1483
1484        async def sleep(dt):
1485            await asyncio.sleep(dt)
1486
1487        async def doit():
1488            sleeper = self.new_task(loop, sleep(5000))
1489            loop.call_later(0.1, sleeper.cancel)
1490            try:
1491                await sleeper
1492            except asyncio.CancelledError:
1493                return 'cancelled'
1494            else:
1495                return 'slept in'
1496
1497        doer = doit()
1498        self.assertEqual(loop.run_until_complete(doer), 'cancelled')
1499        self.assertAlmostEqual(0.1, loop.time())
1500
1501    def test_task_cancel_waiter_future(self):
1502        fut = self.new_future(self.loop)
1503
1504        async def coro():
1505            await fut
1506
1507        task = self.new_task(self.loop, coro())
1508        test_utils.run_briefly(self.loop)
1509        self.assertIs(task._fut_waiter, fut)
1510
1511        task.cancel()
1512        test_utils.run_briefly(self.loop)
1513        self.assertRaises(
1514            asyncio.CancelledError, self.loop.run_until_complete, task)
1515        self.assertIsNone(task._fut_waiter)
1516        self.assertTrue(fut.cancelled())
1517
1518    def test_task_set_methods(self):
1519        async def notmuch():
1520            return 'ko'
1521
1522        gen = notmuch()
1523        task = self.new_task(self.loop, gen)
1524
1525        with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
1526            task.set_result('ok')
1527
1528        with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
1529            task.set_exception(ValueError())
1530
1531        self.assertEqual(
1532            self.loop.run_until_complete(task),
1533            'ko')
1534
1535    def test_step_result(self):
1536        with self.assertWarns(DeprecationWarning):
1537            @asyncio.coroutine
1538            def notmuch():
1539                yield None
1540                yield 1
1541                return 'ko'
1542
1543        self.assertRaises(
1544            RuntimeError, self.loop.run_until_complete, notmuch())
1545
1546    def test_step_result_future(self):
1547        # If coroutine returns future, task waits on this future.
1548
1549        class Fut(asyncio.Future):
1550            def __init__(self, *args, **kwds):
1551                self.cb_added = False
1552                super().__init__(*args, **kwds)
1553
1554            def add_done_callback(self, *args, **kwargs):
1555                self.cb_added = True
1556                super().add_done_callback(*args, **kwargs)
1557
1558        fut = Fut(loop=self.loop)
1559        result = None
1560
1561        async def wait_for_future():
1562            nonlocal result
1563            result = await fut
1564
1565        t = self.new_task(self.loop, wait_for_future())
1566        test_utils.run_briefly(self.loop)
1567        self.assertTrue(fut.cb_added)
1568
1569        res = object()
1570        fut.set_result(res)
1571        test_utils.run_briefly(self.loop)
1572        self.assertIs(res, result)
1573        self.assertTrue(t.done())
1574        self.assertIsNone(t.result())
1575
1576    def test_baseexception_during_cancel(self):
1577
1578        def gen():
1579            when = yield
1580            self.assertAlmostEqual(10.0, when)
1581            yield 0
1582
1583        loop = self.new_test_loop(gen)
1584
1585        async def sleeper():
1586            await asyncio.sleep(10)
1587
1588        base_exc = SystemExit()
1589
1590        async def notmutch():
1591            try:
1592                await sleeper()
1593            except asyncio.CancelledError:
1594                raise base_exc
1595
1596        task = self.new_task(loop, notmutch())
1597        test_utils.run_briefly(loop)
1598
1599        task.cancel()
1600        self.assertFalse(task.done())
1601
1602        self.assertRaises(SystemExit, test_utils.run_briefly, loop)
1603
1604        self.assertTrue(task.done())
1605        self.assertFalse(task.cancelled())
1606        self.assertIs(task.exception(), base_exc)
1607
1608    def test_iscoroutinefunction(self):
1609        def fn():
1610            pass
1611
1612        self.assertFalse(asyncio.iscoroutinefunction(fn))
1613
1614        def fn1():
1615            yield
1616        self.assertFalse(asyncio.iscoroutinefunction(fn1))
1617
1618        with self.assertWarns(DeprecationWarning):
1619            @asyncio.coroutine
1620            def fn2():
1621                yield
1622        self.assertTrue(asyncio.iscoroutinefunction(fn2))
1623
1624        self.assertFalse(asyncio.iscoroutinefunction(mock.Mock()))
1625
1626    def test_yield_vs_yield_from(self):
1627        fut = self.new_future(self.loop)
1628
1629        with self.assertWarns(DeprecationWarning):
1630            @asyncio.coroutine
1631            def wait_for_future():
1632                yield fut
1633
1634        task = wait_for_future()
1635        with self.assertRaises(RuntimeError):
1636            self.loop.run_until_complete(task)
1637
1638        self.assertFalse(fut.done())
1639
1640    def test_yield_vs_yield_from_generator(self):
1641        with self.assertWarns(DeprecationWarning):
1642            @asyncio.coroutine
1643            def coro():
1644                yield
1645
1646        with self.assertWarns(DeprecationWarning):
1647            @asyncio.coroutine
1648            def wait_for_future():
1649                gen = coro()
1650                try:
1651                    yield gen
1652                finally:
1653                    gen.close()
1654
1655        task = wait_for_future()
1656        self.assertRaises(
1657            RuntimeError,
1658            self.loop.run_until_complete, task)
1659
1660    def test_coroutine_non_gen_function(self):
1661        with self.assertWarns(DeprecationWarning):
1662            @asyncio.coroutine
1663            def func():
1664                return 'test'
1665
1666        self.assertTrue(asyncio.iscoroutinefunction(func))
1667
1668        coro = func()
1669        self.assertTrue(asyncio.iscoroutine(coro))
1670
1671        res = self.loop.run_until_complete(coro)
1672        self.assertEqual(res, 'test')
1673
1674    def test_coroutine_non_gen_function_return_future(self):
1675        fut = self.new_future(self.loop)
1676
1677        with self.assertWarns(DeprecationWarning):
1678            @asyncio.coroutine
1679            def func():
1680                return fut
1681
1682        async def coro():
1683            fut.set_result('test')
1684
1685        t1 = self.new_task(self.loop, func())
1686        t2 = self.new_task(self.loop, coro())
1687        res = self.loop.run_until_complete(t1)
1688        self.assertEqual(res, 'test')
1689        self.assertIsNone(t2.result())
1690
1691
1692    def test_current_task_deprecated(self):
1693        Task = self.__class__.Task
1694
1695        with self.assertWarns(DeprecationWarning):
1696            self.assertIsNone(Task.current_task(loop=self.loop))
1697
1698        async def coro(loop):
1699            with self.assertWarns(DeprecationWarning):
1700                self.assertIs(Task.current_task(loop=loop), task)
1701
1702            # See http://bugs.python.org/issue29271 for details:
1703            asyncio.set_event_loop(loop)
1704            try:
1705                with self.assertWarns(DeprecationWarning):
1706                    self.assertIs(Task.current_task(None), task)
1707                with self.assertWarns(DeprecationWarning):
1708                    self.assertIs(Task.current_task(), task)
1709            finally:
1710                asyncio.set_event_loop(None)
1711
1712        task = self.new_task(self.loop, coro(self.loop))
1713        self.loop.run_until_complete(task)
1714        with self.assertWarns(DeprecationWarning):
1715            self.assertIsNone(Task.current_task(loop=self.loop))
1716
1717    def test_current_task(self):
1718        self.assertIsNone(asyncio.current_task(loop=self.loop))
1719
1720        async def coro(loop):
1721            self.assertIs(asyncio.current_task(loop=loop), task)
1722
1723            self.assertIs(asyncio.current_task(None), task)
1724            self.assertIs(asyncio.current_task(), task)
1725
1726        task = self.new_task(self.loop, coro(self.loop))
1727        self.loop.run_until_complete(task)
1728        self.assertIsNone(asyncio.current_task(loop=self.loop))
1729
1730    def test_current_task_with_interleaving_tasks(self):
1731        self.assertIsNone(asyncio.current_task(loop=self.loop))
1732
1733        fut1 = self.new_future(self.loop)
1734        fut2 = self.new_future(self.loop)
1735
1736        async def coro1(loop):
1737            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1738            await fut1
1739            self.assertTrue(asyncio.current_task(loop=loop) is task1)
1740            fut2.set_result(True)
1741
1742        async def coro2(loop):
1743            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1744            fut1.set_result(True)
1745            await fut2
1746            self.assertTrue(asyncio.current_task(loop=loop) is task2)
1747
1748        task1 = self.new_task(self.loop, coro1(self.loop))
1749        task2 = self.new_task(self.loop, coro2(self.loop))
1750
1751        self.loop.run_until_complete(asyncio.wait((task1, task2)))
1752        self.assertIsNone(asyncio.current_task(loop=self.loop))
1753
1754    # Some thorough tests for cancellation propagation through
1755    # coroutines, tasks and wait().
1756
1757    def test_yield_future_passes_cancel(self):
1758        # Cancelling outer() cancels inner() cancels waiter.
1759        proof = 0
1760        waiter = self.new_future(self.loop)
1761
1762        async def inner():
1763            nonlocal proof
1764            try:
1765                await waiter
1766            except asyncio.CancelledError:
1767                proof += 1
1768                raise
1769            else:
1770                self.fail('got past sleep() in inner()')
1771
1772        async def outer():
1773            nonlocal proof
1774            try:
1775                await inner()
1776            except asyncio.CancelledError:
1777                proof += 100  # Expect this path.
1778            else:
1779                proof += 10
1780
1781        f = asyncio.ensure_future(outer(), loop=self.loop)
1782        test_utils.run_briefly(self.loop)
1783        f.cancel()
1784        self.loop.run_until_complete(f)
1785        self.assertEqual(proof, 101)
1786        self.assertTrue(waiter.cancelled())
1787
1788    def test_yield_wait_does_not_shield_cancel(self):
1789        # Cancelling outer() makes wait() return early, leaves inner()
1790        # running.
1791        proof = 0
1792        waiter = self.new_future(self.loop)
1793
1794        async def inner():
1795            nonlocal proof
1796            await waiter
1797            proof += 1
1798
1799        async def outer():
1800            nonlocal proof
1801            d, p = await asyncio.wait([inner()])
1802            proof += 100
1803
1804        f = asyncio.ensure_future(outer(), loop=self.loop)
1805        test_utils.run_briefly(self.loop)
1806        f.cancel()
1807        self.assertRaises(
1808            asyncio.CancelledError, self.loop.run_until_complete, f)
1809        waiter.set_result(None)
1810        test_utils.run_briefly(self.loop)
1811        self.assertEqual(proof, 1)
1812
1813    def test_shield_result(self):
1814        inner = self.new_future(self.loop)
1815        outer = asyncio.shield(inner)
1816        inner.set_result(42)
1817        res = self.loop.run_until_complete(outer)
1818        self.assertEqual(res, 42)
1819
1820    def test_shield_exception(self):
1821        inner = self.new_future(self.loop)
1822        outer = asyncio.shield(inner)
1823        test_utils.run_briefly(self.loop)
1824        exc = RuntimeError('expected')
1825        inner.set_exception(exc)
1826        test_utils.run_briefly(self.loop)
1827        self.assertIs(outer.exception(), exc)
1828
1829    def test_shield_cancel_inner(self):
1830        inner = self.new_future(self.loop)
1831        outer = asyncio.shield(inner)
1832        test_utils.run_briefly(self.loop)
1833        inner.cancel()
1834        test_utils.run_briefly(self.loop)
1835        self.assertTrue(outer.cancelled())
1836
1837    def test_shield_cancel_outer(self):
1838        inner = self.new_future(self.loop)
1839        outer = asyncio.shield(inner)
1840        test_utils.run_briefly(self.loop)
1841        outer.cancel()
1842        test_utils.run_briefly(self.loop)
1843        self.assertTrue(outer.cancelled())
1844        self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks))
1845
1846    def test_shield_shortcut(self):
1847        fut = self.new_future(self.loop)
1848        fut.set_result(42)
1849        res = self.loop.run_until_complete(asyncio.shield(fut))
1850        self.assertEqual(res, 42)
1851
1852    def test_shield_effect(self):
1853        # Cancelling outer() does not affect inner().
1854        proof = 0
1855        waiter = self.new_future(self.loop)
1856
1857        async def inner():
1858            nonlocal proof
1859            await waiter
1860            proof += 1
1861
1862        async def outer():
1863            nonlocal proof
1864            await asyncio.shield(inner())
1865            proof += 100
1866
1867        f = asyncio.ensure_future(outer(), loop=self.loop)
1868        test_utils.run_briefly(self.loop)
1869        f.cancel()
1870        with self.assertRaises(asyncio.CancelledError):
1871            self.loop.run_until_complete(f)
1872        waiter.set_result(None)
1873        test_utils.run_briefly(self.loop)
1874        self.assertEqual(proof, 1)
1875
1876    def test_shield_gather(self):
1877        child1 = self.new_future(self.loop)
1878        child2 = self.new_future(self.loop)
1879        parent = asyncio.gather(child1, child2)
1880        outer = asyncio.shield(parent)
1881        test_utils.run_briefly(self.loop)
1882        outer.cancel()
1883        test_utils.run_briefly(self.loop)
1884        self.assertTrue(outer.cancelled())
1885        child1.set_result(1)
1886        child2.set_result(2)
1887        test_utils.run_briefly(self.loop)
1888        self.assertEqual(parent.result(), [1, 2])
1889
1890    def test_gather_shield(self):
1891        child1 = self.new_future(self.loop)
1892        child2 = self.new_future(self.loop)
1893        inner1 = asyncio.shield(child1)
1894        inner2 = asyncio.shield(child2)
1895        parent = asyncio.gather(inner1, inner2)
1896        test_utils.run_briefly(self.loop)
1897        parent.cancel()
1898        # This should cancel inner1 and inner2 but bot child1 and child2.
1899        test_utils.run_briefly(self.loop)
1900        self.assertIsInstance(parent.exception(), asyncio.CancelledError)
1901        self.assertTrue(inner1.cancelled())
1902        self.assertTrue(inner2.cancelled())
1903        child1.set_result(1)
1904        child2.set_result(2)
1905        test_utils.run_briefly(self.loop)
1906
1907    def test_as_completed_invalid_args(self):
1908        fut = self.new_future(self.loop)
1909
1910        # as_completed() expects a list of futures, not a future instance
1911        self.assertRaises(TypeError, self.loop.run_until_complete,
1912            asyncio.as_completed(fut, loop=self.loop))
1913        coro = coroutine_function()
1914        self.assertRaises(TypeError, self.loop.run_until_complete,
1915            asyncio.as_completed(coro, loop=self.loop))
1916        coro.close()
1917
1918    def test_wait_invalid_args(self):
1919        fut = self.new_future(self.loop)
1920
1921        # wait() expects a list of futures, not a future instance
1922        self.assertRaises(TypeError, self.loop.run_until_complete,
1923            asyncio.wait(fut))
1924        coro = coroutine_function()
1925        self.assertRaises(TypeError, self.loop.run_until_complete,
1926            asyncio.wait(coro))
1927        coro.close()
1928
1929        # wait() expects at least a future
1930        self.assertRaises(ValueError, self.loop.run_until_complete,
1931            asyncio.wait([]))
1932
1933    def test_corowrapper_mocks_generator(self):
1934
1935        def check():
1936            # A function that asserts various things.
1937            # Called twice, with different debug flag values.
1938
1939            with self.assertWarns(DeprecationWarning):
1940                @asyncio.coroutine
1941                def coro():
1942                    # The actual coroutine.
1943                    self.assertTrue(gen.gi_running)
1944                    yield from fut
1945
1946            # A completed Future used to run the coroutine.
1947            fut = self.new_future(self.loop)
1948            fut.set_result(None)
1949
1950            # Call the coroutine.
1951            gen = coro()
1952
1953            # Check some properties.
1954            self.assertTrue(asyncio.iscoroutine(gen))
1955            self.assertIsInstance(gen.gi_frame, types.FrameType)
1956            self.assertFalse(gen.gi_running)
1957            self.assertIsInstance(gen.gi_code, types.CodeType)
1958
1959            # Run it.
1960            self.loop.run_until_complete(gen)
1961
1962            # The frame should have changed.
1963            self.assertIsNone(gen.gi_frame)
1964
1965        # Test with debug flag cleared.
1966        with set_coroutine_debug(False):
1967            check()
1968
1969        # Test with debug flag set.
1970        with set_coroutine_debug(True):
1971            check()
1972
1973    def test_yield_from_corowrapper(self):
1974        with set_coroutine_debug(True):
1975            with self.assertWarns(DeprecationWarning):
1976                @asyncio.coroutine
1977                def t1():
1978                    return (yield from t2())
1979
1980            with self.assertWarns(DeprecationWarning):
1981                @asyncio.coroutine
1982                def t2():
1983                    f = self.new_future(self.loop)
1984                    self.new_task(self.loop, t3(f))
1985                    return (yield from f)
1986
1987            with self.assertWarns(DeprecationWarning):
1988                @asyncio.coroutine
1989                def t3(f):
1990                    f.set_result((1, 2, 3))
1991
1992            task = self.new_task(self.loop, t1())
1993            val = self.loop.run_until_complete(task)
1994            self.assertEqual(val, (1, 2, 3))
1995
1996    def test_yield_from_corowrapper_send(self):
1997        def foo():
1998            a = yield
1999            return a
2000
2001        def call(arg):
2002            cw = asyncio.coroutines.CoroWrapper(foo())
2003            cw.send(None)
2004            try:
2005                cw.send(arg)
2006            except StopIteration as ex:
2007                return ex.args[0]
2008            else:
2009                raise AssertionError('StopIteration was expected')
2010
2011        self.assertEqual(call((1, 2)), (1, 2))
2012        self.assertEqual(call('spam'), 'spam')
2013
2014    def test_corowrapper_weakref(self):
2015        wd = weakref.WeakValueDictionary()
2016        def foo(): yield from []
2017        cw = asyncio.coroutines.CoroWrapper(foo())
2018        wd['cw'] = cw  # Would fail without __weakref__ slot.
2019        cw.gen = None  # Suppress warning from __del__.
2020
2021    def test_corowrapper_throw(self):
2022        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
2023        def foo():
2024            value = None
2025            while True:
2026                try:
2027                    value = yield value
2028                except Exception as e:
2029                    value = e
2030
2031        exception = Exception("foo")
2032        cw = asyncio.coroutines.CoroWrapper(foo())
2033        cw.send(None)
2034        self.assertIs(exception, cw.throw(exception))
2035
2036        cw = asyncio.coroutines.CoroWrapper(foo())
2037        cw.send(None)
2038        self.assertIs(exception, cw.throw(Exception, exception))
2039
2040        cw = asyncio.coroutines.CoroWrapper(foo())
2041        cw.send(None)
2042        exception = cw.throw(Exception, "foo")
2043        self.assertIsInstance(exception, Exception)
2044        self.assertEqual(exception.args, ("foo", ))
2045
2046        cw = asyncio.coroutines.CoroWrapper(foo())
2047        cw.send(None)
2048        exception = cw.throw(Exception, "foo", None)
2049        self.assertIsInstance(exception, Exception)
2050        self.assertEqual(exception.args, ("foo", ))
2051
2052    def test_all_tasks_deprecated(self):
2053        Task = self.__class__.Task
2054
2055        async def coro():
2056            with self.assertWarns(DeprecationWarning):
2057                assert Task.all_tasks(self.loop) == {t}
2058
2059        t = self.new_task(self.loop, coro())
2060        self.loop.run_until_complete(t)
2061
2062    def test_log_destroyed_pending_task(self):
2063        Task = self.__class__.Task
2064
2065        with self.assertWarns(DeprecationWarning):
2066            @asyncio.coroutine
2067            def kill_me(loop):
2068                future = self.new_future(loop)
2069                yield from future
2070                # at this point, the only reference to kill_me() task is
2071                # the Task._wakeup() method in future._callbacks
2072                raise Exception("code never reached")
2073
2074        mock_handler = mock.Mock()
2075        self.loop.set_debug(True)
2076        self.loop.set_exception_handler(mock_handler)
2077
2078        # schedule the task
2079        coro = kill_me(self.loop)
2080        task = asyncio.ensure_future(coro, loop=self.loop)
2081
2082        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})
2083
2084        # See http://bugs.python.org/issue29271 for details:
2085        asyncio.set_event_loop(self.loop)
2086        try:
2087            with self.assertWarns(DeprecationWarning):
2088                self.assertEqual(Task.all_tasks(), {task})
2089            with self.assertWarns(DeprecationWarning):
2090                self.assertEqual(Task.all_tasks(None), {task})
2091        finally:
2092            asyncio.set_event_loop(None)
2093
2094        # execute the task so it waits for future
2095        self.loop._run_once()
2096        self.assertEqual(len(self.loop._ready), 0)
2097
2098        # remove the future used in kill_me(), and references to the task
2099        del coro.gi_frame.f_locals['future']
2100        coro = None
2101        source_traceback = task._source_traceback
2102        task = None
2103
2104        # no more reference to kill_me() task: the task is destroyed by the GC
2105        support.gc_collect()
2106
2107        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())
2108
2109        mock_handler.assert_called_with(self.loop, {
2110            'message': 'Task was destroyed but it is pending!',
2111            'task': mock.ANY,
2112            'source_traceback': source_traceback,
2113        })
2114        mock_handler.reset_mock()
2115
2116    @mock.patch('asyncio.base_events.logger')
2117    def test_tb_logger_not_called_after_cancel(self, m_log):
2118        loop = asyncio.new_event_loop()
2119        self.set_event_loop(loop)
2120
2121        async def coro():
2122            raise TypeError
2123
2124        async def runner():
2125            task = self.new_task(loop, coro())
2126            await asyncio.sleep(0.05)
2127            task.cancel()
2128            task = None
2129
2130        loop.run_until_complete(runner())
2131        self.assertFalse(m_log.error.called)
2132
2133    @mock.patch('asyncio.coroutines.logger')
2134    def test_coroutine_never_yielded(self, m_log):
2135        with set_coroutine_debug(True):
2136            with self.assertWarns(DeprecationWarning):
2137                @asyncio.coroutine
2138                def coro_noop():
2139                    pass
2140
2141        tb_filename = __file__
2142        tb_lineno = sys._getframe().f_lineno + 2
2143        # create a coroutine object but don't use it
2144        coro_noop()
2145        support.gc_collect()
2146
2147        self.assertTrue(m_log.error.called)
2148        message = m_log.error.call_args[0][0]
2149        func_filename, func_lineno = test_utils.get_function_source(coro_noop)
2150
2151        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
2152                    r'was never yielded from\n'
2153                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
2154                 r'.*\n'
2155                 r'  File "%s", line %s, in test_coroutine_never_yielded\n'
2156                 r'    coro_noop\(\)$'
2157                 % (re.escape(coro_noop.__qualname__),
2158                    re.escape(func_filename), func_lineno,
2159                    re.escape(tb_filename), tb_lineno))
2160
2161        self.assertRegex(message, re.compile(regex, re.DOTALL))
2162
2163    def test_return_coroutine_from_coroutine(self):
2164        """Return of @asyncio.coroutine()-wrapped function generator object
2165        from @asyncio.coroutine()-wrapped function should have same effect as
2166        returning generator object or Future."""
2167        def check():
2168            with self.assertWarns(DeprecationWarning):
2169                @asyncio.coroutine
2170                def outer_coro():
2171                    with self.assertWarns(DeprecationWarning):
2172                        @asyncio.coroutine
2173                        def inner_coro():
2174                            return 1
2175
2176                    return inner_coro()
2177
2178            result = self.loop.run_until_complete(outer_coro())
2179            self.assertEqual(result, 1)
2180
2181        # Test with debug flag cleared.
2182        with set_coroutine_debug(False):
2183            check()
2184
2185        # Test with debug flag set.
2186        with set_coroutine_debug(True):
2187            check()
2188
2189    def test_task_source_traceback(self):
2190        self.loop.set_debug(True)
2191
2192        task = self.new_task(self.loop, coroutine_function())
2193        lineno = sys._getframe().f_lineno - 1
2194        self.assertIsInstance(task._source_traceback, list)
2195        self.assertEqual(task._source_traceback[-2][:3],
2196                         (__file__,
2197                          lineno,
2198                          'test_task_source_traceback'))
2199        self.loop.run_until_complete(task)
2200
2201    def _test_cancel_wait_for(self, timeout):
2202        loop = asyncio.new_event_loop()
2203        self.addCleanup(loop.close)
2204
2205        async def blocking_coroutine():
2206            fut = self.new_future(loop)
2207            # Block: fut result is never set
2208            await fut
2209
2210        task = loop.create_task(blocking_coroutine())
2211
2212        wait = loop.create_task(asyncio.wait_for(task, timeout))
2213        loop.call_soon(wait.cancel)
2214
2215        self.assertRaises(asyncio.CancelledError,
2216                          loop.run_until_complete, wait)
2217
2218        # Python issue #23219: cancelling the wait must also cancel the task
2219        self.assertTrue(task.cancelled())
2220
2221    def test_cancel_blocking_wait_for(self):
2222        self._test_cancel_wait_for(None)
2223
2224    def test_cancel_wait_for(self):
2225        self._test_cancel_wait_for(60.0)
2226
2227    def test_cancel_gather_1(self):
2228        """Ensure that a gathering future refuses to be cancelled once all
2229        children are done"""
2230        loop = asyncio.new_event_loop()
2231        self.addCleanup(loop.close)
2232
2233        fut = self.new_future(loop)
2234        # The indirection fut->child_coro is needed since otherwise the
2235        # gathering task is done at the same time as the child future
2236        def child_coro():
2237            return (yield from fut)
2238        gather_future = asyncio.gather(child_coro(), loop=loop)
2239        gather_task = asyncio.ensure_future(gather_future, loop=loop)
2240
2241        cancel_result = None
2242        def cancelling_callback(_):
2243            nonlocal cancel_result
2244            cancel_result = gather_task.cancel()
2245        fut.add_done_callback(cancelling_callback)
2246
2247        fut.set_result(42) # calls the cancelling_callback after fut is done()
2248
2249        # At this point the task should complete.
2250        loop.run_until_complete(gather_task)
2251
2252        # Python issue #26923: asyncio.gather drops cancellation
2253        self.assertEqual(cancel_result, False)
2254        self.assertFalse(gather_task.cancelled())
2255        self.assertEqual(gather_task.result(), [42])
2256
2257    def test_cancel_gather_2(self):
2258        loop = asyncio.new_event_loop()
2259        self.addCleanup(loop.close)
2260
2261        async def test():
2262            time = 0
2263            while True:
2264                time += 0.05
2265                await asyncio.gather(asyncio.sleep(0.05),
2266                                     return_exceptions=True,
2267                                     loop=loop)
2268                if time > 1:
2269                    return
2270
2271        async def main():
2272            qwe = self.new_task(loop, test())
2273            await asyncio.sleep(0.2)
2274            qwe.cancel()
2275            try:
2276                await qwe
2277            except asyncio.CancelledError:
2278                pass
2279            else:
2280                self.fail('gather did not propagate the cancellation request')
2281
2282        loop.run_until_complete(main())
2283
2284    def test_exception_traceback(self):
2285        # See http://bugs.python.org/issue28843
2286
2287        async def foo():
2288            1 / 0
2289
2290        async def main():
2291            task = self.new_task(self.loop, foo())
2292            await asyncio.sleep(0)  # skip one loop iteration
2293            self.assertIsNotNone(task.exception().__traceback__)
2294
2295        self.loop.run_until_complete(main())
2296
2297    @mock.patch('asyncio.base_events.logger')
2298    def test_error_in_call_soon(self, m_log):
2299        def call_soon(callback, *args, **kwargs):
2300            raise ValueError
2301        self.loop.call_soon = call_soon
2302
2303        with self.assertWarns(DeprecationWarning):
2304            @asyncio.coroutine
2305            def coro():
2306                pass
2307
2308        self.assertFalse(m_log.error.called)
2309
2310        with self.assertRaises(ValueError):
2311            gen = coro()
2312            try:
2313                self.new_task(self.loop, gen)
2314            finally:
2315                gen.close()
2316
2317        self.assertTrue(m_log.error.called)
2318        message = m_log.error.call_args[0][0]
2319        self.assertIn('Task was destroyed but it is pending', message)
2320
2321        self.assertEqual(asyncio.all_tasks(self.loop), set())
2322
2323    def test_create_task_with_noncoroutine(self):
2324        with self.assertRaisesRegex(TypeError,
2325                                    "a coroutine was expected, got 123"):
2326            self.new_task(self.loop, 123)
2327
2328        # test it for the second time to ensure that caching
2329        # in asyncio.iscoroutine() doesn't break things.
2330        with self.assertRaisesRegex(TypeError,
2331                                    "a coroutine was expected, got 123"):
2332            self.new_task(self.loop, 123)
2333
2334    def test_create_task_with_oldstyle_coroutine(self):
2335
2336        with self.assertWarns(DeprecationWarning):
2337            @asyncio.coroutine
2338            def coro():
2339                pass
2340
2341        task = self.new_task(self.loop, coro())
2342        self.assertIsInstance(task, self.Task)
2343        self.loop.run_until_complete(task)
2344
2345        # test it for the second time to ensure that caching
2346        # in asyncio.iscoroutine() doesn't break things.
2347        task = self.new_task(self.loop, coro())
2348        self.assertIsInstance(task, self.Task)
2349        self.loop.run_until_complete(task)
2350
2351    def test_create_task_with_async_function(self):
2352
2353        async def coro():
2354            pass
2355
2356        task = self.new_task(self.loop, coro())
2357        self.assertIsInstance(task, self.Task)
2358        self.loop.run_until_complete(task)
2359
2360        # test it for the second time to ensure that caching
2361        # in asyncio.iscoroutine() doesn't break things.
2362        task = self.new_task(self.loop, coro())
2363        self.assertIsInstance(task, self.Task)
2364        self.loop.run_until_complete(task)
2365
2366    def test_create_task_with_asynclike_function(self):
2367        task = self.new_task(self.loop, CoroLikeObject())
2368        self.assertIsInstance(task, self.Task)
2369        self.assertEqual(self.loop.run_until_complete(task), 42)
2370
2371        # test it for the second time to ensure that caching
2372        # in asyncio.iscoroutine() doesn't break things.
2373        task = self.new_task(self.loop, CoroLikeObject())
2374        self.assertIsInstance(task, self.Task)
2375        self.assertEqual(self.loop.run_until_complete(task), 42)
2376
2377    def test_bare_create_task(self):
2378
2379        async def inner():
2380            return 1
2381
2382        async def coro():
2383            task = asyncio.create_task(inner())
2384            self.assertIsInstance(task, self.Task)
2385            ret = await task
2386            self.assertEqual(1, ret)
2387
2388        self.loop.run_until_complete(coro())
2389
2390    def test_bare_create_named_task(self):
2391
2392        async def coro_noop():
2393            pass
2394
2395        async def coro():
2396            task = asyncio.create_task(coro_noop(), name='No-op')
2397            self.assertEqual(task.get_name(), 'No-op')
2398            await task
2399
2400        self.loop.run_until_complete(coro())
2401
2402    def test_context_1(self):
2403        cvar = contextvars.ContextVar('cvar', default='nope')
2404
2405        async def sub():
2406            await asyncio.sleep(0.01)
2407            self.assertEqual(cvar.get(), 'nope')
2408            cvar.set('something else')
2409
2410        async def main():
2411            self.assertEqual(cvar.get(), 'nope')
2412            subtask = self.new_task(loop, sub())
2413            cvar.set('yes')
2414            self.assertEqual(cvar.get(), 'yes')
2415            await subtask
2416            self.assertEqual(cvar.get(), 'yes')
2417
2418        loop = asyncio.new_event_loop()
2419        try:
2420            task = self.new_task(loop, main())
2421            loop.run_until_complete(task)
2422        finally:
2423            loop.close()
2424
2425    def test_context_2(self):
2426        cvar = contextvars.ContextVar('cvar', default='nope')
2427
2428        async def main():
2429            def fut_on_done(fut):
2430                # This change must not pollute the context
2431                # of the "main()" task.
2432                cvar.set('something else')
2433
2434            self.assertEqual(cvar.get(), 'nope')
2435
2436            for j in range(2):
2437                fut = self.new_future(loop)
2438                fut.add_done_callback(fut_on_done)
2439                cvar.set(f'yes{j}')
2440                loop.call_soon(fut.set_result, None)
2441                await fut
2442                self.assertEqual(cvar.get(), f'yes{j}')
2443
2444                for i in range(3):
2445                    # Test that task passed its context to add_done_callback:
2446                    cvar.set(f'yes{i}-{j}')
2447                    await asyncio.sleep(0.001)
2448                    self.assertEqual(cvar.get(), f'yes{i}-{j}')
2449
2450        loop = asyncio.new_event_loop()
2451        try:
2452            task = self.new_task(loop, main())
2453            loop.run_until_complete(task)
2454        finally:
2455            loop.close()
2456
2457        self.assertEqual(cvar.get(), 'nope')
2458
2459    def test_context_3(self):
2460        # Run 100 Tasks in parallel, each modifying cvar.
2461
2462        cvar = contextvars.ContextVar('cvar', default=-1)
2463
2464        async def sub(num):
2465            for i in range(10):
2466                cvar.set(num + i)
2467                await asyncio.sleep(random.uniform(0.001, 0.05))
2468                self.assertEqual(cvar.get(), num + i)
2469
2470        async def main():
2471            tasks = []
2472            for i in range(100):
2473                task = loop.create_task(sub(random.randint(0, 10)))
2474                tasks.append(task)
2475
2476            await asyncio.gather(*tasks, loop=loop)
2477
2478        loop = asyncio.new_event_loop()
2479        try:
2480            loop.run_until_complete(main())
2481        finally:
2482            loop.close()
2483
2484        self.assertEqual(cvar.get(), -1)
2485
2486    def test_get_coro(self):
2487        loop = asyncio.new_event_loop()
2488        coro = coroutine_function()
2489        try:
2490            task = self.new_task(loop, coro)
2491            loop.run_until_complete(task)
2492            self.assertIs(task.get_coro(), coro)
2493        finally:
2494            loop.close()
2495
2496
2497def add_subclass_tests(cls):
2498    BaseTask = cls.Task
2499    BaseFuture = cls.Future
2500
2501    if BaseTask is None or BaseFuture is None:
2502        return cls
2503
2504    class CommonFuture:
2505        def __init__(self, *args, **kwargs):
2506            self.calls = collections.defaultdict(lambda: 0)
2507            super().__init__(*args, **kwargs)
2508
2509        def add_done_callback(self, *args, **kwargs):
2510            self.calls['add_done_callback'] += 1
2511            return super().add_done_callback(*args, **kwargs)
2512
2513    class Task(CommonFuture, BaseTask):
2514        pass
2515
2516    class Future(CommonFuture, BaseFuture):
2517        pass
2518
2519    def test_subclasses_ctask_cfuture(self):
2520        fut = self.Future(loop=self.loop)
2521
2522        async def func():
2523            self.loop.call_soon(lambda: fut.set_result('spam'))
2524            return await fut
2525
2526        task = self.Task(func(), loop=self.loop)
2527
2528        result = self.loop.run_until_complete(task)
2529
2530        self.assertEqual(result, 'spam')
2531
2532        self.assertEqual(
2533            dict(task.calls),
2534            {'add_done_callback': 1})
2535
2536        self.assertEqual(
2537            dict(fut.calls),
2538            {'add_done_callback': 1})
2539
2540    # Add patched Task & Future back to the test case
2541    cls.Task = Task
2542    cls.Future = Future
2543
2544    # Add an extra unit-test
2545    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture
2546
2547    # Disable the "test_task_source_traceback" test
2548    # (the test is hardcoded for a particular call stack, which
2549    # is slightly different for Task subclasses)
2550    cls.test_task_source_traceback = None
2551
2552    return cls
2553
2554
2555class SetMethodsTest:
2556
2557    def test_set_result_causes_invalid_state(self):
2558        Future = type(self).Future
2559        self.loop.call_exception_handler = exc_handler = mock.Mock()
2560
2561        async def foo():
2562            await asyncio.sleep(0.1)
2563            return 10
2564
2565        coro = foo()
2566        task = self.new_task(self.loop, coro)
2567        Future.set_result(task, 'spam')
2568
2569        self.assertEqual(
2570            self.loop.run_until_complete(task),
2571            'spam')
2572
2573        exc_handler.assert_called_once()
2574        exc = exc_handler.call_args[0][0]['exception']
2575        with self.assertRaisesRegex(asyncio.InvalidStateError,
2576                                    r'step\(\): already done'):
2577            raise exc
2578
2579        coro.close()
2580
2581    def test_set_exception_causes_invalid_state(self):
2582        class MyExc(Exception):
2583            pass
2584
2585        Future = type(self).Future
2586        self.loop.call_exception_handler = exc_handler = mock.Mock()
2587
2588        async def foo():
2589            await asyncio.sleep(0.1)
2590            return 10
2591
2592        coro = foo()
2593        task = self.new_task(self.loop, coro)
2594        Future.set_exception(task, MyExc())
2595
2596        with self.assertRaises(MyExc):
2597            self.loop.run_until_complete(task)
2598
2599        exc_handler.assert_called_once()
2600        exc = exc_handler.call_args[0][0]['exception']
2601        with self.assertRaisesRegex(asyncio.InvalidStateError,
2602                                    r'step\(\): already done'):
2603            raise exc
2604
2605        coro.close()
2606
2607
2608@unittest.skipUnless(hasattr(futures, '_CFuture') and
2609                     hasattr(tasks, '_CTask'),
2610                     'requires the C _asyncio module')
2611class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
2612                          test_utils.TestCase):
2613
2614    Task = getattr(tasks, '_CTask', None)
2615    Future = getattr(futures, '_CFuture', None)
2616
2617    @support.refcount_test
2618    def test_refleaks_in_task___init__(self):
2619        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
2620        async def coro():
2621            pass
2622        task = self.new_task(self.loop, coro())
2623        self.loop.run_until_complete(task)
2624        refs_before = gettotalrefcount()
2625        for i in range(100):
2626            task.__init__(coro(), loop=self.loop)
2627            self.loop.run_until_complete(task)
2628        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)
2629
2630    def test_del__log_destroy_pending_segfault(self):
2631        async def coro():
2632            pass
2633        task = self.new_task(self.loop, coro())
2634        self.loop.run_until_complete(task)
2635        with self.assertRaises(AttributeError):
2636            del task._log_destroy_pending
2637
2638
2639@unittest.skipUnless(hasattr(futures, '_CFuture') and
2640                     hasattr(tasks, '_CTask'),
2641                     'requires the C _asyncio module')
2642@add_subclass_tests
2643class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2644
2645    Task = getattr(tasks, '_CTask', None)
2646    Future = getattr(futures, '_CFuture', None)
2647
2648
2649@unittest.skipUnless(hasattr(tasks, '_CTask'),
2650                     'requires the C _asyncio module')
2651@add_subclass_tests
2652class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2653
2654    Task = getattr(tasks, '_CTask', None)
2655    Future = futures._PyFuture
2656
2657
2658@unittest.skipUnless(hasattr(futures, '_CFuture'),
2659                     'requires the C _asyncio module')
2660@add_subclass_tests
2661class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):
2662
2663    Future = getattr(futures, '_CFuture', None)
2664    Task = tasks._PyTask
2665
2666
2667@unittest.skipUnless(hasattr(tasks, '_CTask'),
2668                     'requires the C _asyncio module')
2669class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):
2670
2671    Task = getattr(tasks, '_CTask', None)
2672    Future = futures._PyFuture
2673
2674
2675@unittest.skipUnless(hasattr(futures, '_CFuture'),
2676                     'requires the C _asyncio module')
2677class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):
2678
2679    Task = tasks._PyTask
2680    Future = getattr(futures, '_CFuture', None)
2681
2682
2683class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
2684                            test_utils.TestCase):
2685
2686    Task = tasks._PyTask
2687    Future = futures._PyFuture
2688
2689
2690@add_subclass_tests
2691class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
2692    Task = tasks._PyTask
2693    Future = futures._PyFuture
2694
2695
2696@unittest.skipUnless(hasattr(tasks, '_CTask'),
2697                     'requires the C _asyncio module')
2698class CTask_Future_Tests(test_utils.TestCase):
2699
2700    def test_foobar(self):
2701        class Fut(asyncio.Future):
2702            @property
2703            def get_loop(self):
2704                raise AttributeError
2705
2706        async def coro():
2707            await fut
2708            return 'spam'
2709
2710        self.loop = asyncio.new_event_loop()
2711        try:
2712            fut = Fut(loop=self.loop)
2713            self.loop.call_later(0.1, fut.set_result, 1)
2714            task = self.loop.create_task(coro())
2715            res = self.loop.run_until_complete(task)
2716        finally:
2717            self.loop.close()
2718
2719        self.assertEqual(res, 'spam')
2720
2721
2722class BaseTaskIntrospectionTests:
2723    _register_task = None
2724    _unregister_task = None
2725    _enter_task = None
2726    _leave_task = None
2727
2728    def test__register_task_1(self):
2729        class TaskLike:
2730            @property
2731            def _loop(self):
2732                return loop
2733
2734            def done(self):
2735                return False
2736
2737        task = TaskLike()
2738        loop = mock.Mock()
2739
2740        self.assertEqual(asyncio.all_tasks(loop), set())
2741        self._register_task(task)
2742        self.assertEqual(asyncio.all_tasks(loop), {task})
2743        self._unregister_task(task)
2744
2745    def test__register_task_2(self):
2746        class TaskLike:
2747            def get_loop(self):
2748                return loop
2749
2750            def done(self):
2751                return False
2752
2753        task = TaskLike()
2754        loop = mock.Mock()
2755
2756        self.assertEqual(asyncio.all_tasks(loop), set())
2757        self._register_task(task)
2758        self.assertEqual(asyncio.all_tasks(loop), {task})
2759        self._unregister_task(task)
2760
2761    def test__register_task_3(self):
2762        class TaskLike:
2763            def get_loop(self):
2764                return loop
2765
2766            def done(self):
2767                return True
2768
2769        task = TaskLike()
2770        loop = mock.Mock()
2771
2772        self.assertEqual(asyncio.all_tasks(loop), set())
2773        self._register_task(task)
2774        self.assertEqual(asyncio.all_tasks(loop), set())
2775        with self.assertWarns(DeprecationWarning):
2776            self.assertEqual(asyncio.Task.all_tasks(loop), {task})
2777        self._unregister_task(task)
2778
2779    def test__enter_task(self):
2780        task = mock.Mock()
2781        loop = mock.Mock()
2782        self.assertIsNone(asyncio.current_task(loop))
2783        self._enter_task(loop, task)
2784        self.assertIs(asyncio.current_task(loop), task)
2785        self._leave_task(loop, task)
2786
2787    def test__enter_task_failure(self):
2788        task1 = mock.Mock()
2789        task2 = mock.Mock()
2790        loop = mock.Mock()
2791        self._enter_task(loop, task1)
2792        with self.assertRaises(RuntimeError):
2793            self._enter_task(loop, task2)
2794        self.assertIs(asyncio.current_task(loop), task1)
2795        self._leave_task(loop, task1)
2796
2797    def test__leave_task(self):
2798        task = mock.Mock()
2799        loop = mock.Mock()
2800        self._enter_task(loop, task)
2801        self._leave_task(loop, task)
2802        self.assertIsNone(asyncio.current_task(loop))
2803
2804    def test__leave_task_failure1(self):
2805        task1 = mock.Mock()
2806        task2 = mock.Mock()
2807        loop = mock.Mock()
2808        self._enter_task(loop, task1)
2809        with self.assertRaises(RuntimeError):
2810            self._leave_task(loop, task2)
2811        self.assertIs(asyncio.current_task(loop), task1)
2812        self._leave_task(loop, task1)
2813
2814    def test__leave_task_failure2(self):
2815        task = mock.Mock()
2816        loop = mock.Mock()
2817        with self.assertRaises(RuntimeError):
2818            self._leave_task(loop, task)
2819        self.assertIsNone(asyncio.current_task(loop))
2820
2821    def test__unregister_task(self):
2822        task = mock.Mock()
2823        loop = mock.Mock()
2824        task.get_loop = lambda: loop
2825        self._register_task(task)
2826        self._unregister_task(task)
2827        self.assertEqual(asyncio.all_tasks(loop), set())
2828
2829    def test__unregister_task_not_registered(self):
2830        task = mock.Mock()
2831        loop = mock.Mock()
2832        self._unregister_task(task)
2833        self.assertEqual(asyncio.all_tasks(loop), set())
2834
2835
2836class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
2837    _register_task = staticmethod(tasks._py_register_task)
2838    _unregister_task = staticmethod(tasks._py_unregister_task)
2839    _enter_task = staticmethod(tasks._py_enter_task)
2840    _leave_task = staticmethod(tasks._py_leave_task)
2841
2842
2843@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
2844                     'requires the C _asyncio module')
2845class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
2846    if hasattr(tasks, '_c_register_task'):
2847        _register_task = staticmethod(tasks._c_register_task)
2848        _unregister_task = staticmethod(tasks._c_unregister_task)
2849        _enter_task = staticmethod(tasks._c_enter_task)
2850        _leave_task = staticmethod(tasks._c_leave_task)
2851    else:
2852        _register_task = _unregister_task = _enter_task = _leave_task = None
2853
2854
2855class BaseCurrentLoopTests:
2856
2857    def setUp(self):
2858        super().setUp()
2859        self.loop = asyncio.new_event_loop()
2860        self.set_event_loop(self.loop)
2861
2862    def new_task(self, coro):
2863        raise NotImplementedError
2864
2865    def test_current_task_no_running_loop(self):
2866        self.assertIsNone(asyncio.current_task(loop=self.loop))
2867
2868    def test_current_task_no_running_loop_implicit(self):
2869        with self.assertRaises(RuntimeError):
2870            asyncio.current_task()
2871
2872    def test_current_task_with_implicit_loop(self):
2873        async def coro():
2874            self.assertIs(asyncio.current_task(loop=self.loop), task)
2875
2876            self.assertIs(asyncio.current_task(None), task)
2877            self.assertIs(asyncio.current_task(), task)
2878
2879        task = self.new_task(coro())
2880        self.loop.run_until_complete(task)
2881        self.assertIsNone(asyncio.current_task(loop=self.loop))
2882
2883
2884class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
2885
2886    def new_task(self, coro):
2887        return tasks._PyTask(coro, loop=self.loop)
2888
2889
2890@unittest.skipUnless(hasattr(tasks, '_CTask'),
2891                     'requires the C _asyncio module')
2892class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):
2893
2894    def new_task(self, coro):
2895        return getattr(tasks, '_CTask')(coro, loop=self.loop)
2896
2897
2898class GenericTaskTests(test_utils.TestCase):
2899
2900    def test_future_subclass(self):
2901        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))
2902
2903    def test_asyncio_module_compiled(self):
2904        # Because of circular imports it's easy to make _asyncio
2905        # module non-importable.  This is a simple test that will
2906        # fail on systems where C modules were successfully compiled
2907        # (hence the test for _functools), but _asyncio somehow didn't.
2908        try:
2909            import _functools
2910        except ImportError:
2911            pass
2912        else:
2913            try:
2914                import _asyncio
2915            except ImportError:
2916                self.fail('_asyncio module is missing')
2917
2918
2919class GatherTestsBase:
2920
2921    def setUp(self):
2922        super().setUp()
2923        self.one_loop = self.new_test_loop()
2924        self.other_loop = self.new_test_loop()
2925        self.set_event_loop(self.one_loop, cleanup=False)
2926
2927    def _run_loop(self, loop):
2928        while loop._ready:
2929            test_utils.run_briefly(loop)
2930
2931    def _check_success(self, **kwargs):
2932        a, b, c = [self.one_loop.create_future() for i in range(3)]
2933        fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
2934        cb = test_utils.MockCallback()
2935        fut.add_done_callback(cb)
2936        b.set_result(1)
2937        a.set_result(2)
2938        self._run_loop(self.one_loop)
2939        self.assertEqual(cb.called, False)
2940        self.assertFalse(fut.done())
2941        c.set_result(3)
2942        self._run_loop(self.one_loop)
2943        cb.assert_called_once_with(fut)
2944        self.assertEqual(fut.result(), [2, 1, 3])
2945
2946    def test_success(self):
2947        self._check_success()
2948        self._check_success(return_exceptions=False)
2949
2950    def test_result_exception_success(self):
2951        self._check_success(return_exceptions=True)
2952
2953    def test_one_exception(self):
2954        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
2955        fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
2956        cb = test_utils.MockCallback()
2957        fut.add_done_callback(cb)
2958        exc = ZeroDivisionError()
2959        a.set_result(1)
2960        b.set_exception(exc)
2961        self._run_loop(self.one_loop)
2962        self.assertTrue(fut.done())
2963        cb.assert_called_once_with(fut)
2964        self.assertIs(fut.exception(), exc)
2965        # Does nothing
2966        c.set_result(3)
2967        d.cancel()
2968        e.set_exception(RuntimeError())
2969        e.exception()
2970
2971    def test_return_exceptions(self):
2972        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
2973        fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
2974                             return_exceptions=True)
2975        cb = test_utils.MockCallback()
2976        fut.add_done_callback(cb)
2977        exc = ZeroDivisionError()
2978        exc2 = RuntimeError()
2979        b.set_result(1)
2980        c.set_exception(exc)
2981        a.set_result(3)
2982        self._run_loop(self.one_loop)
2983        self.assertFalse(fut.done())
2984        d.set_exception(exc2)
2985        self._run_loop(self.one_loop)
2986        self.assertTrue(fut.done())
2987        cb.assert_called_once_with(fut)
2988        self.assertEqual(fut.result(), [3, 1, exc, exc2])
2989
2990    def test_env_var_debug(self):
2991        code = '\n'.join((
2992            'import asyncio.coroutines',
2993            'print(asyncio.coroutines._DEBUG)'))
2994
2995        # Test with -E to not fail if the unit test was run with
2996        # PYTHONASYNCIODEBUG set to a non-empty string
2997        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
2998        self.assertEqual(stdout.rstrip(), b'False')
2999
3000        sts, stdout, stderr = assert_python_ok('-c', code,
3001                                               PYTHONASYNCIODEBUG='',
3002                                               PYTHONDEVMODE='')
3003        self.assertEqual(stdout.rstrip(), b'False')
3004
3005        sts, stdout, stderr = assert_python_ok('-c', code,
3006                                               PYTHONASYNCIODEBUG='1',
3007                                               PYTHONDEVMODE='')
3008        self.assertEqual(stdout.rstrip(), b'True')
3009
3010        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
3011                                               PYTHONASYNCIODEBUG='1',
3012                                               PYTHONDEVMODE='')
3013        self.assertEqual(stdout.rstrip(), b'False')
3014
3015        # -X dev
3016        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
3017                                               '-c', code)
3018        self.assertEqual(stdout.rstrip(), b'True')
3019
3020
3021class FutureGatherTests(GatherTestsBase, test_utils.TestCase):
3022
3023    def wrap_futures(self, *futures):
3024        return futures
3025
3026    def _check_empty_sequence(self, seq_or_iter):
3027        asyncio.set_event_loop(self.one_loop)
3028        self.addCleanup(asyncio.set_event_loop, None)
3029        fut = asyncio.gather(*seq_or_iter)
3030        self.assertIsInstance(fut, asyncio.Future)
3031        self.assertIs(fut._loop, self.one_loop)
3032        self._run_loop(self.one_loop)
3033        self.assertTrue(fut.done())
3034        self.assertEqual(fut.result(), [])
3035        with self.assertWarns(DeprecationWarning):
3036            fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
3037        self.assertIs(fut._loop, self.other_loop)
3038
3039    def test_constructor_empty_sequence(self):
3040        self._check_empty_sequence([])
3041        self._check_empty_sequence(())
3042        self._check_empty_sequence(set())
3043        self._check_empty_sequence(iter(""))
3044
3045    def test_constructor_heterogenous_futures(self):
3046        fut1 = self.one_loop.create_future()
3047        fut2 = self.other_loop.create_future()
3048        with self.assertRaises(ValueError):
3049            asyncio.gather(fut1, fut2)
3050        with self.assertRaises(ValueError):
3051            asyncio.gather(fut1, loop=self.other_loop)
3052
3053    def test_constructor_homogenous_futures(self):
3054        children = [self.other_loop.create_future() for i in range(3)]
3055        fut = asyncio.gather(*children)
3056        self.assertIs(fut._loop, self.other_loop)
3057        self._run_loop(self.other_loop)
3058        self.assertFalse(fut.done())
3059        fut = asyncio.gather(*children, loop=self.other_loop)
3060        self.assertIs(fut._loop, self.other_loop)
3061        self._run_loop(self.other_loop)
3062        self.assertFalse(fut.done())
3063
3064    def test_one_cancellation(self):
3065        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
3066        fut = asyncio.gather(a, b, c, d, e)
3067        cb = test_utils.MockCallback()
3068        fut.add_done_callback(cb)
3069        a.set_result(1)
3070        b.cancel()
3071        self._run_loop(self.one_loop)
3072        self.assertTrue(fut.done())
3073        cb.assert_called_once_with(fut)
3074        self.assertFalse(fut.cancelled())
3075        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
3076        # Does nothing
3077        c.set_result(3)
3078        d.cancel()
3079        e.set_exception(RuntimeError())
3080        e.exception()
3081
3082    def test_result_exception_one_cancellation(self):
3083        a, b, c, d, e, f = [self.one_loop.create_future()
3084                            for i in range(6)]
3085        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
3086        cb = test_utils.MockCallback()
3087        fut.add_done_callback(cb)
3088        a.set_result(1)
3089        zde = ZeroDivisionError()
3090        b.set_exception(zde)
3091        c.cancel()
3092        self._run_loop(self.one_loop)
3093        self.assertFalse(fut.done())
3094        d.set_result(3)
3095        e.cancel()
3096        rte = RuntimeError()
3097        f.set_exception(rte)
3098        res = self.one_loop.run_until_complete(fut)
3099        self.assertIsInstance(res[2], asyncio.CancelledError)
3100        self.assertIsInstance(res[4], asyncio.CancelledError)
3101        res[2] = res[4] = None
3102        self.assertEqual(res, [1, zde, None, 3, None, rte])
3103        cb.assert_called_once_with(fut)
3104
3105
3106class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):
3107
3108    def setUp(self):
3109        super().setUp()
3110        asyncio.set_event_loop(self.one_loop)
3111
3112    def wrap_futures(self, *futures):
3113        coros = []
3114        for fut in futures:
3115            async def coro(fut=fut):
3116                return await fut
3117            coros.append(coro())
3118        return coros
3119
3120    def test_constructor_loop_selection(self):
3121        async def coro():
3122            return 'abc'
3123        gen1 = coro()
3124        gen2 = coro()
3125        fut = asyncio.gather(gen1, gen2)
3126        self.assertIs(fut._loop, self.one_loop)
3127        self.one_loop.run_until_complete(fut)
3128
3129        self.set_event_loop(self.other_loop, cleanup=False)
3130        gen3 = coro()
3131        gen4 = coro()
3132        fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
3133        self.assertIs(fut2._loop, self.other_loop)
3134        self.other_loop.run_until_complete(fut2)
3135
3136    def test_duplicate_coroutines(self):
3137        with self.assertWarns(DeprecationWarning):
3138            @asyncio.coroutine
3139            def coro(s):
3140                return s
3141        c = coro('abc')
3142        fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
3143        self._run_loop(self.one_loop)
3144        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])
3145
3146    def test_cancellation_broadcast(self):
3147        # Cancelling outer() cancels all children.
3148        proof = 0
3149        waiter = self.one_loop.create_future()
3150
3151        async def inner():
3152            nonlocal proof
3153            await waiter
3154            proof += 1
3155
3156        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
3157        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
3158        gatherer = None
3159
3160        async def outer():
3161            nonlocal proof, gatherer
3162            gatherer = asyncio.gather(child1, child2, loop=self.one_loop)
3163            await gatherer
3164            proof += 100
3165
3166        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3167        test_utils.run_briefly(self.one_loop)
3168        self.assertTrue(f.cancel())
3169        with self.assertRaises(asyncio.CancelledError):
3170            self.one_loop.run_until_complete(f)
3171        self.assertFalse(gatherer.cancel())
3172        self.assertTrue(waiter.cancelled())
3173        self.assertTrue(child1.cancelled())
3174        self.assertTrue(child2.cancelled())
3175        test_utils.run_briefly(self.one_loop)
3176        self.assertEqual(proof, 0)
3177
3178    def test_exception_marking(self):
3179        # Test for the first line marked "Mark exception retrieved."
3180
3181        async def inner(f):
3182            await f
3183            raise RuntimeError('should not be ignored')
3184
3185        a = self.one_loop.create_future()
3186        b = self.one_loop.create_future()
3187
3188        async def outer():
3189            await asyncio.gather(inner(a), inner(b), loop=self.one_loop)
3190
3191        f = asyncio.ensure_future(outer(), loop=self.one_loop)
3192        test_utils.run_briefly(self.one_loop)
3193        a.set_result(None)
3194        test_utils.run_briefly(self.one_loop)
3195        b.set_result(None)
3196        test_utils.run_briefly(self.one_loop)
3197        self.assertIsInstance(f.exception(), RuntimeError)
3198
3199
3200class RunCoroutineThreadsafeTests(test_utils.TestCase):
3201    """Test case for asyncio.run_coroutine_threadsafe."""
3202
3203    def setUp(self):
3204        super().setUp()
3205        self.loop = asyncio.new_event_loop()
3206        self.set_event_loop(self.loop) # Will cleanup properly
3207
3208    async def add(self, a, b, fail=False, cancel=False):
3209        """Wait 0.05 second and return a + b."""
3210        await asyncio.sleep(0.05)
3211        if fail:
3212            raise RuntimeError("Fail!")
3213        if cancel:
3214            asyncio.current_task(self.loop).cancel()
3215            await asyncio.sleep(0)
3216        return a + b
3217
3218    def target(self, fail=False, cancel=False, timeout=None,
3219               advance_coro=False):
3220        """Run add coroutine in the event loop."""
3221        coro = self.add(1, 2, fail=fail, cancel=cancel)
3222        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
3223        if advance_coro:
3224            # this is for test_run_coroutine_threadsafe_task_factory_exception;
3225            # otherwise it spills errors and breaks **other** unittests, since
3226            # 'target' is interacting with threads.
3227
3228            # With this call, `coro` will be advanced, so that
3229            # CoroWrapper.__del__ won't do anything when asyncio tests run
3230            # in debug mode.
3231            self.loop.call_soon_threadsafe(coro.send, None)
3232        try:
3233            return future.result(timeout)
3234        finally:
3235            future.done() or future.cancel()
3236
3237    def test_run_coroutine_threadsafe(self):
3238        """Test coroutine submission from a thread to an event loop."""
3239        future = self.loop.run_in_executor(None, self.target)
3240        result = self.loop.run_until_complete(future)
3241        self.assertEqual(result, 3)
3242
3243    def test_run_coroutine_threadsafe_with_exception(self):
3244        """Test coroutine submission from a thread to an event loop
3245        when an exception is raised."""
3246        future = self.loop.run_in_executor(None, self.target, True)
3247        with self.assertRaises(RuntimeError) as exc_context:
3248            self.loop.run_until_complete(future)
3249        self.assertIn("Fail!", exc_context.exception.args)
3250
3251    def test_run_coroutine_threadsafe_with_timeout(self):
3252        """Test coroutine submission from a thread to an event loop
3253        when a timeout is raised."""
3254        callback = lambda: self.target(timeout=0)
3255        future = self.loop.run_in_executor(None, callback)
3256        with self.assertRaises(asyncio.TimeoutError):
3257            self.loop.run_until_complete(future)
3258        test_utils.run_briefly(self.loop)
3259        # Check that there's no pending task (add has been cancelled)
3260        for task in asyncio.all_tasks(self.loop):
3261            self.assertTrue(task.done())
3262
3263    def test_run_coroutine_threadsafe_task_cancelled(self):
3264        """Test coroutine submission from a tread to an event loop
3265        when the task is cancelled."""
3266        callback = lambda: self.target(cancel=True)
3267        future = self.loop.run_in_executor(None, callback)
3268        with self.assertRaises(asyncio.CancelledError):
3269            self.loop.run_until_complete(future)
3270
3271    def test_run_coroutine_threadsafe_task_factory_exception(self):
3272        """Test coroutine submission from a tread to an event loop
3273        when the task factory raise an exception."""
3274
3275        def task_factory(loop, coro):
3276            raise NameError
3277
3278        run = self.loop.run_in_executor(
3279            None, lambda: self.target(advance_coro=True))
3280
3281        # Set exception handler
3282        callback = test_utils.MockCallback()
3283        self.loop.set_exception_handler(callback)
3284
3285        # Set corrupted task factory
3286        self.loop.set_task_factory(task_factory)
3287
3288        # Run event loop
3289        with self.assertRaises(NameError) as exc_context:
3290            self.loop.run_until_complete(run)
3291
3292        # Check exceptions
3293        self.assertEqual(len(callback.call_args_list), 1)
3294        (loop, context), kwargs = callback.call_args
3295        self.assertEqual(context['exception'], exc_context.exception)
3296
3297
3298class SleepTests(test_utils.TestCase):
3299    def setUp(self):
3300        super().setUp()
3301        self.loop = asyncio.new_event_loop()
3302        self.set_event_loop(self.loop)
3303
3304    def tearDown(self):
3305        self.loop.close()
3306        self.loop = None
3307        super().tearDown()
3308
3309    def test_sleep_zero(self):
3310        result = 0
3311
3312        def inc_result(num):
3313            nonlocal result
3314            result += num
3315
3316        async def coro():
3317            self.loop.call_soon(inc_result, 1)
3318            self.assertEqual(result, 0)
3319            num = await asyncio.sleep(0, result=10)
3320            self.assertEqual(result, 1) # inc'ed by call_soon
3321            inc_result(num) # num should be 11
3322
3323        self.loop.run_until_complete(coro())
3324        self.assertEqual(result, 11)
3325
3326    def test_loop_argument_is_deprecated(self):
3327        # Remove test when loop argument is removed in Python 3.10
3328        with self.assertWarns(DeprecationWarning):
3329            self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop))
3330
3331
3332class WaitTests(test_utils.TestCase):
3333    def setUp(self):
3334        super().setUp()
3335        self.loop = asyncio.new_event_loop()
3336        self.set_event_loop(self.loop)
3337
3338    def tearDown(self):
3339        self.loop.close()
3340        self.loop = None
3341        super().tearDown()
3342
3343    def test_loop_argument_is_deprecated_in_wait(self):
3344        # Remove test when loop argument is removed in Python 3.10
3345        with self.assertWarns(DeprecationWarning):
3346            self.loop.run_until_complete(
3347                asyncio.wait([coroutine_function()], loop=self.loop))
3348
3349    def test_loop_argument_is_deprecated_in_wait_for(self):
3350        # Remove test when loop argument is removed in Python 3.10
3351        with self.assertWarns(DeprecationWarning):
3352            self.loop.run_until_complete(
3353                asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop))
3354
3355
3356class CompatibilityTests(test_utils.TestCase):
3357    # Tests for checking a bridge between old-styled coroutines
3358    # and async/await syntax
3359
3360    def setUp(self):
3361        super().setUp()
3362        self.loop = asyncio.new_event_loop()
3363        self.set_event_loop(self.loop)
3364
3365    def tearDown(self):
3366        self.loop.close()
3367        self.loop = None
3368        super().tearDown()
3369
3370    def test_yield_from_awaitable(self):
3371
3372        with self.assertWarns(DeprecationWarning):
3373            @asyncio.coroutine
3374            def coro():
3375                yield from asyncio.sleep(0)
3376                return 'ok'
3377
3378        result = self.loop.run_until_complete(coro())
3379        self.assertEqual('ok', result)
3380
3381    def test_await_old_style_coro(self):
3382
3383        with self.assertWarns(DeprecationWarning):
3384            @asyncio.coroutine
3385            def coro1():
3386                return 'ok1'
3387
3388        with self.assertWarns(DeprecationWarning):
3389            @asyncio.coroutine
3390            def coro2():
3391                yield from asyncio.sleep(0)
3392                return 'ok2'
3393
3394        async def inner():
3395            return await asyncio.gather(coro1(), coro2(), loop=self.loop)
3396
3397        result = self.loop.run_until_complete(inner())
3398        self.assertEqual(['ok1', 'ok2'], result)
3399
3400    def test_debug_mode_interop(self):
3401        # https://bugs.python.org/issue32636
3402        code = textwrap.dedent("""
3403            import asyncio
3404
3405            async def native_coro():
3406                pass
3407
3408            @asyncio.coroutine
3409            def old_style_coro():
3410                yield from native_coro()
3411
3412            asyncio.run(old_style_coro())
3413        """)
3414
3415        assert_python_ok("-Wignore::DeprecationWarning", "-c", code,
3416                         PYTHONASYNCIODEBUG="1")
3417
3418
3419if __name__ == '__main__':
3420    unittest.main()
3421