1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import traceback 14import types 15import unittest 16import weakref 17from unittest import mock 18 19import asyncio 20from asyncio import coroutines 21from asyncio import futures 22from asyncio import tasks 23from test.test_asyncio import utils as test_utils 24from test import support 25from test.support.script_helper import assert_python_ok 26 27 28def tearDownModule(): 29 asyncio.set_event_loop_policy(None) 30 31 32async def coroutine_function(): 33 pass 34 35 36@contextlib.contextmanager 37def set_coroutine_debug(enabled): 38 coroutines = asyncio.coroutines 39 40 old_debug = coroutines._DEBUG 41 try: 42 coroutines._DEBUG = enabled 43 yield 44 finally: 45 coroutines._DEBUG = old_debug 46 47 48def format_coroutine(qualname, state, src, source_traceback, generator=False): 49 if generator: 50 state = '%s' % state 51 else: 52 state = '%s, defined' % state 53 if source_traceback is not None: 54 frame = source_traceback[-1] 55 return ('coro=<%s() %s at %s> created at %s:%s' 56 % (qualname, state, src, frame[0], frame[1])) 57 else: 58 return 'coro=<%s() %s at %s>' % (qualname, state, src) 59 60 61def get_innermost_context(exc): 62 """ 63 Return information about the innermost exception context in the chain. 
64 """ 65 depth = 0 66 while True: 67 context = exc.__context__ 68 if context is None: 69 break 70 71 exc = context 72 depth += 1 73 74 return (type(exc), exc.args, depth) 75 76 77class Dummy: 78 79 def __repr__(self): 80 return '<Dummy>' 81 82 def __call__(self, *args): 83 pass 84 85 86class CoroLikeObject: 87 def send(self, v): 88 raise StopIteration(42) 89 90 def throw(self, *exc): 91 pass 92 93 def close(self): 94 pass 95 96 def __await__(self): 97 return self 98 99 100# The following value can be used as a very small timeout: 101# it passes check "timeout > 0", but has almost 102# no effect on the test performance 103_EPSILON = 0.0001 104 105 106class BaseTaskTests: 107 108 Task = None 109 Future = None 110 111 def new_task(self, loop, coro, name='TestTask'): 112 return self.__class__.Task(coro, loop=loop, name=name) 113 114 def new_future(self, loop): 115 return self.__class__.Future(loop=loop) 116 117 def setUp(self): 118 super().setUp() 119 self.loop = self.new_test_loop() 120 self.loop.set_task_factory(self.new_task) 121 self.loop.create_future = lambda: self.new_future(self.loop) 122 123 def test_task_cancel_message_getter(self): 124 async def coro(): 125 pass 126 t = self.new_task(self.loop, coro()) 127 self.assertTrue(hasattr(t, '_cancel_message')) 128 self.assertEqual(t._cancel_message, None) 129 130 t.cancel('my message') 131 self.assertEqual(t._cancel_message, 'my message') 132 133 with self.assertRaises(asyncio.CancelledError): 134 self.loop.run_until_complete(t) 135 136 def test_task_cancel_message_setter(self): 137 async def coro(): 138 pass 139 t = self.new_task(self.loop, coro()) 140 t.cancel('my message') 141 t._cancel_message = 'my new message' 142 self.assertEqual(t._cancel_message, 'my new message') 143 144 with self.assertRaises(asyncio.CancelledError): 145 self.loop.run_until_complete(t) 146 147 def test_task_del_collect(self): 148 class Evil: 149 def __del__(self): 150 gc.collect() 151 152 async def run(): 153 return Evil() 154 155 
self.loop.run_until_complete( 156 asyncio.gather(*[ 157 self.new_task(self.loop, run()) for _ in range(100) 158 ])) 159 160 def test_other_loop_future(self): 161 other_loop = asyncio.new_event_loop() 162 fut = self.new_future(other_loop) 163 164 async def run(fut): 165 await fut 166 167 try: 168 with self.assertRaisesRegex(RuntimeError, 169 r'Task .* got Future .* attached'): 170 self.loop.run_until_complete(run(fut)) 171 finally: 172 other_loop.close() 173 174 def test_task_awaits_on_itself(self): 175 176 async def test(): 177 await task 178 179 task = asyncio.ensure_future(test(), loop=self.loop) 180 181 with self.assertRaisesRegex(RuntimeError, 182 'Task cannot await on itself'): 183 self.loop.run_until_complete(task) 184 185 def test_task_class(self): 186 async def notmuch(): 187 return 'ok' 188 t = self.new_task(self.loop, notmuch()) 189 self.loop.run_until_complete(t) 190 self.assertTrue(t.done()) 191 self.assertEqual(t.result(), 'ok') 192 self.assertIs(t._loop, self.loop) 193 self.assertIs(t.get_loop(), self.loop) 194 195 loop = asyncio.new_event_loop() 196 self.set_event_loop(loop) 197 t = self.new_task(loop, notmuch()) 198 self.assertIs(t._loop, loop) 199 loop.run_until_complete(t) 200 loop.close() 201 202 def test_ensure_future_coroutine(self): 203 with self.assertWarns(DeprecationWarning): 204 @asyncio.coroutine 205 def notmuch(): 206 return 'ok' 207 t = asyncio.ensure_future(notmuch(), loop=self.loop) 208 self.loop.run_until_complete(t) 209 self.assertTrue(t.done()) 210 self.assertEqual(t.result(), 'ok') 211 self.assertIs(t._loop, self.loop) 212 213 loop = asyncio.new_event_loop() 214 self.set_event_loop(loop) 215 t = asyncio.ensure_future(notmuch(), loop=loop) 216 self.assertIs(t._loop, loop) 217 loop.run_until_complete(t) 218 loop.close() 219 220 def test_ensure_future_future(self): 221 f_orig = self.new_future(self.loop) 222 f_orig.set_result('ko') 223 224 f = asyncio.ensure_future(f_orig) 225 self.loop.run_until_complete(f) 226 
self.assertTrue(f.done()) 227 self.assertEqual(f.result(), 'ko') 228 self.assertIs(f, f_orig) 229 230 loop = asyncio.new_event_loop() 231 self.set_event_loop(loop) 232 233 with self.assertRaises(ValueError): 234 f = asyncio.ensure_future(f_orig, loop=loop) 235 236 loop.close() 237 238 f = asyncio.ensure_future(f_orig, loop=self.loop) 239 self.assertIs(f, f_orig) 240 241 def test_ensure_future_task(self): 242 async def notmuch(): 243 return 'ok' 244 t_orig = self.new_task(self.loop, notmuch()) 245 t = asyncio.ensure_future(t_orig) 246 self.loop.run_until_complete(t) 247 self.assertTrue(t.done()) 248 self.assertEqual(t.result(), 'ok') 249 self.assertIs(t, t_orig) 250 251 loop = asyncio.new_event_loop() 252 self.set_event_loop(loop) 253 254 with self.assertRaises(ValueError): 255 t = asyncio.ensure_future(t_orig, loop=loop) 256 257 loop.close() 258 259 t = asyncio.ensure_future(t_orig, loop=self.loop) 260 self.assertIs(t, t_orig) 261 262 def test_ensure_future_awaitable(self): 263 class Aw: 264 def __init__(self, coro): 265 self.coro = coro 266 def __await__(self): 267 return (yield from self.coro) 268 269 with self.assertWarns(DeprecationWarning): 270 @asyncio.coroutine 271 def coro(): 272 return 'ok' 273 274 loop = asyncio.new_event_loop() 275 self.set_event_loop(loop) 276 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 277 loop.run_until_complete(fut) 278 assert fut.result() == 'ok' 279 280 def test_ensure_future_neither(self): 281 with self.assertRaises(TypeError): 282 asyncio.ensure_future('ok') 283 284 def test_ensure_future_error_msg(self): 285 loop = asyncio.new_event_loop() 286 f = self.new_future(self.loop) 287 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 288 'different loop than the one specified as ' 289 'the loop argument'): 290 asyncio.ensure_future(f, loop=loop) 291 loop.close() 292 293 def test_get_stack(self): 294 T = None 295 296 async def foo(): 297 await bar() 298 299 async def bar(): 300 # test get_stack() 301 f = 
T.get_stack(limit=1) 302 try: 303 self.assertEqual(f[0].f_code.co_name, 'foo') 304 finally: 305 f = None 306 307 # test print_stack() 308 file = io.StringIO() 309 T.print_stack(limit=1, file=file) 310 file.seek(0) 311 tb = file.read() 312 self.assertRegex(tb, r'foo\(\) running') 313 314 async def runner(): 315 nonlocal T 316 T = asyncio.ensure_future(foo(), loop=self.loop) 317 await T 318 319 self.loop.run_until_complete(runner()) 320 321 def test_task_repr(self): 322 self.loop.set_debug(False) 323 324 async def notmuch(): 325 return 'abc' 326 327 # test coroutine function 328 self.assertEqual(notmuch.__name__, 'notmuch') 329 self.assertRegex(notmuch.__qualname__, 330 r'\w+.test_task_repr.<locals>.notmuch') 331 self.assertEqual(notmuch.__module__, __name__) 332 333 filename, lineno = test_utils.get_function_source(notmuch) 334 src = "%s:%s" % (filename, lineno) 335 336 # test coroutine object 337 gen = notmuch() 338 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 339 self.assertEqual(gen.__name__, 'notmuch') 340 self.assertEqual(gen.__qualname__, coro_qualname) 341 342 # test pending Task 343 t = self.new_task(self.loop, gen) 344 t.add_done_callback(Dummy()) 345 346 coro = format_coroutine(coro_qualname, 'running', src, 347 t._source_traceback, generator=True) 348 self.assertEqual(repr(t), 349 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 350 351 # test cancelling Task 352 t.cancel() # Does not take immediate effect! 
353 self.assertEqual(repr(t), 354 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 355 356 # test cancelled Task 357 self.assertRaises(asyncio.CancelledError, 358 self.loop.run_until_complete, t) 359 coro = format_coroutine(coro_qualname, 'done', src, 360 t._source_traceback) 361 self.assertEqual(repr(t), 362 "<Task cancelled name='TestTask' %s>" % coro) 363 364 # test finished Task 365 t = self.new_task(self.loop, notmuch()) 366 self.loop.run_until_complete(t) 367 coro = format_coroutine(coro_qualname, 'done', src, 368 t._source_traceback) 369 self.assertEqual(repr(t), 370 "<Task finished name='TestTask' %s result='abc'>" % coro) 371 372 def test_task_repr_autogenerated(self): 373 async def notmuch(): 374 return 123 375 376 t1 = self.new_task(self.loop, notmuch(), None) 377 t2 = self.new_task(self.loop, notmuch(), None) 378 self.assertNotEqual(repr(t1), repr(t2)) 379 380 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 381 self.assertIsNotNone(match1) 382 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 383 self.assertIsNotNone(match2) 384 385 # Autogenerated task names should have monotonically increasing numbers 386 self.assertLess(int(match1.group(1)), int(match2.group(1))) 387 self.loop.run_until_complete(t1) 388 self.loop.run_until_complete(t2) 389 390 def test_task_repr_name_not_str(self): 391 async def notmuch(): 392 return 123 393 394 t = self.new_task(self.loop, notmuch()) 395 t.set_name({6}) 396 self.assertEqual(t.get_name(), '{6}') 397 self.loop.run_until_complete(t) 398 399 def test_task_repr_coro_decorator(self): 400 self.loop.set_debug(False) 401 402 with self.assertWarns(DeprecationWarning): 403 @asyncio.coroutine 404 def notmuch(): 405 # notmuch() function doesn't use yield from: it will be wrapped by 406 # @coroutine decorator 407 return 123 408 409 # test coroutine function 410 self.assertEqual(notmuch.__name__, 'notmuch') 411 self.assertRegex(notmuch.__qualname__, 412 
r'\w+.test_task_repr_coro_decorator' 413 r'\.<locals>\.notmuch') 414 self.assertEqual(notmuch.__module__, __name__) 415 416 # test coroutine object 417 gen = notmuch() 418 # On Python >= 3.5, generators now inherit the name of the 419 # function, as expected, and have a qualified name (__qualname__ 420 # attribute). 421 coro_name = 'notmuch' 422 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 423 '.<locals>.notmuch') 424 self.assertEqual(gen.__name__, coro_name) 425 self.assertEqual(gen.__qualname__, coro_qualname) 426 427 # test repr(CoroWrapper) 428 if coroutines._DEBUG: 429 # format the coroutine object 430 if coroutines._DEBUG: 431 filename, lineno = test_utils.get_function_source(notmuch) 432 frame = gen._source_traceback[-1] 433 coro = ('%s() running, defined at %s:%s, created at %s:%s' 434 % (coro_qualname, filename, lineno, 435 frame[0], frame[1])) 436 else: 437 code = gen.gi_code 438 coro = ('%s() running at %s:%s' 439 % (coro_qualname, code.co_filename, 440 code.co_firstlineno)) 441 442 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 443 444 # test pending Task 445 t = self.new_task(self.loop, gen) 446 t.add_done_callback(Dummy()) 447 448 # format the coroutine object 449 if coroutines._DEBUG: 450 src = '%s:%s' % test_utils.get_function_source(notmuch) 451 else: 452 code = gen.gi_code 453 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 454 coro = format_coroutine(coro_qualname, 'running', src, 455 t._source_traceback, 456 generator=not coroutines._DEBUG) 457 self.assertEqual(repr(t), 458 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 459 self.loop.run_until_complete(t) 460 461 def test_task_repr_wait_for(self): 462 self.loop.set_debug(False) 463 464 async def wait_for(fut): 465 return await fut 466 467 fut = self.new_future(self.loop) 468 task = self.new_task(self.loop, wait_for(fut)) 469 test_utils.run_briefly(self.loop) 470 self.assertRegex(repr(task), 471 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 472 473 
fut.set_result(None) 474 self.loop.run_until_complete(task) 475 476 def test_task_repr_partial_corowrapper(self): 477 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 478 # coroutine is a partial function 479 with set_coroutine_debug(True): 480 self.loop.set_debug(True) 481 482 async def func(x, y): 483 await asyncio.sleep(0) 484 485 with self.assertWarns(DeprecationWarning): 486 partial_func = asyncio.coroutine(functools.partial(func, 1)) 487 task = self.loop.create_task(partial_func(2)) 488 489 # make warnings quiet 490 task._log_destroy_pending = False 491 self.addCleanup(task._coro.close) 492 493 coro_repr = repr(task._coro) 494 expected = ( 495 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 496 r'\.<locals>\.func at' 497 ) 498 self.assertRegex(coro_repr, expected) 499 500 def test_task_basics(self): 501 502 async def outer(): 503 a = await inner1() 504 b = await inner2() 505 return a+b 506 507 async def inner1(): 508 return 42 509 510 async def inner2(): 511 return 1000 512 513 t = outer() 514 self.assertEqual(self.loop.run_until_complete(t), 1042) 515 516 def test_exception_chaining_after_await(self): 517 # Test that when awaiting on a task when an exception is already 518 # active, if the task raises an exception it will be chained 519 # with the original. 
520 loop = asyncio.new_event_loop() 521 self.set_event_loop(loop) 522 523 async def raise_error(): 524 raise ValueError 525 526 async def run(): 527 try: 528 raise KeyError(3) 529 except Exception as exc: 530 task = self.new_task(loop, raise_error()) 531 try: 532 await task 533 except Exception as exc: 534 self.assertEqual(type(exc), ValueError) 535 chained = exc.__context__ 536 self.assertEqual((type(chained), chained.args), 537 (KeyError, (3,))) 538 539 try: 540 task = self.new_task(loop, run()) 541 loop.run_until_complete(task) 542 finally: 543 loop.close() 544 545 def test_exception_chaining_after_await_with_context_cycle(self): 546 # Check trying to create an exception context cycle: 547 # https://bugs.python.org/issue40696 548 has_cycle = None 549 loop = asyncio.new_event_loop() 550 self.set_event_loop(loop) 551 552 async def process_exc(exc): 553 raise exc 554 555 async def run(): 556 nonlocal has_cycle 557 try: 558 raise KeyError('a') 559 except Exception as exc: 560 task = self.new_task(loop, process_exc(exc)) 561 try: 562 await task 563 except BaseException as exc: 564 has_cycle = (exc is exc.__context__) 565 # Prevent a hang if has_cycle is True. 566 exc.__context__ = None 567 568 try: 569 task = self.new_task(loop, run()) 570 loop.run_until_complete(task) 571 finally: 572 loop.close() 573 # This also distinguishes from the initial has_cycle=None. 
574 self.assertEqual(has_cycle, False) 575 576 def test_cancel(self): 577 578 def gen(): 579 when = yield 580 self.assertAlmostEqual(10.0, when) 581 yield 0 582 583 loop = self.new_test_loop(gen) 584 585 async def task(): 586 await asyncio.sleep(10.0) 587 return 12 588 589 t = self.new_task(loop, task()) 590 loop.call_soon(t.cancel) 591 with self.assertRaises(asyncio.CancelledError): 592 loop.run_until_complete(t) 593 self.assertTrue(t.done()) 594 self.assertTrue(t.cancelled()) 595 self.assertFalse(t.cancel()) 596 597 def test_cancel_with_message_then_future_result(self): 598 # Test Future.result() after calling cancel() with a message. 599 cases = [ 600 ((), ()), 601 ((None,), ()), 602 (('my message',), ('my message',)), 603 # Non-string values should roundtrip. 604 ((5,), (5,)), 605 ] 606 for cancel_args, expected_args in cases: 607 with self.subTest(cancel_args=cancel_args): 608 loop = asyncio.new_event_loop() 609 self.set_event_loop(loop) 610 611 async def sleep(): 612 await asyncio.sleep(10) 613 614 async def coro(): 615 task = self.new_task(loop, sleep()) 616 await asyncio.sleep(0) 617 task.cancel(*cancel_args) 618 done, pending = await asyncio.wait([task]) 619 task.result() 620 621 task = self.new_task(loop, coro()) 622 with self.assertRaises(asyncio.CancelledError) as cm: 623 loop.run_until_complete(task) 624 exc = cm.exception 625 self.assertEqual(exc.args, ()) 626 627 actual = get_innermost_context(exc) 628 self.assertEqual(actual, 629 (asyncio.CancelledError, expected_args, 2)) 630 631 def test_cancel_with_message_then_future_exception(self): 632 # Test Future.exception() after calling cancel() with a message. 633 cases = [ 634 ((), ()), 635 ((None,), ()), 636 (('my message',), ('my message',)), 637 # Non-string values should roundtrip. 
638 ((5,), (5,)), 639 ] 640 for cancel_args, expected_args in cases: 641 with self.subTest(cancel_args=cancel_args): 642 loop = asyncio.new_event_loop() 643 self.set_event_loop(loop) 644 645 async def sleep(): 646 await asyncio.sleep(10) 647 648 async def coro(): 649 task = self.new_task(loop, sleep()) 650 await asyncio.sleep(0) 651 task.cancel(*cancel_args) 652 done, pending = await asyncio.wait([task]) 653 task.exception() 654 655 task = self.new_task(loop, coro()) 656 with self.assertRaises(asyncio.CancelledError) as cm: 657 loop.run_until_complete(task) 658 exc = cm.exception 659 self.assertEqual(exc.args, ()) 660 661 actual = get_innermost_context(exc) 662 self.assertEqual(actual, 663 (asyncio.CancelledError, expected_args, 2)) 664 665 def test_cancel_with_message_before_starting_task(self): 666 loop = asyncio.new_event_loop() 667 self.set_event_loop(loop) 668 669 async def sleep(): 670 await asyncio.sleep(10) 671 672 async def coro(): 673 task = self.new_task(loop, sleep()) 674 # We deliberately leave out the sleep here. 
675 task.cancel('my message') 676 done, pending = await asyncio.wait([task]) 677 task.exception() 678 679 task = self.new_task(loop, coro()) 680 with self.assertRaises(asyncio.CancelledError) as cm: 681 loop.run_until_complete(task) 682 exc = cm.exception 683 self.assertEqual(exc.args, ()) 684 685 actual = get_innermost_context(exc) 686 self.assertEqual(actual, 687 (asyncio.CancelledError, ('my message',), 2)) 688 689 def test_cancel_yield(self): 690 with self.assertWarns(DeprecationWarning): 691 @asyncio.coroutine 692 def task(): 693 yield 694 yield 695 return 12 696 697 t = self.new_task(self.loop, task()) 698 test_utils.run_briefly(self.loop) # start coro 699 t.cancel() 700 self.assertRaises( 701 asyncio.CancelledError, self.loop.run_until_complete, t) 702 self.assertTrue(t.done()) 703 self.assertTrue(t.cancelled()) 704 self.assertFalse(t.cancel()) 705 706 def test_cancel_inner_future(self): 707 f = self.new_future(self.loop) 708 709 async def task(): 710 await f 711 return 12 712 713 t = self.new_task(self.loop, task()) 714 test_utils.run_briefly(self.loop) # start task 715 f.cancel() 716 with self.assertRaises(asyncio.CancelledError): 717 self.loop.run_until_complete(t) 718 self.assertTrue(f.cancelled()) 719 self.assertTrue(t.cancelled()) 720 721 def test_cancel_both_task_and_inner_future(self): 722 f = self.new_future(self.loop) 723 724 async def task(): 725 await f 726 return 12 727 728 t = self.new_task(self.loop, task()) 729 test_utils.run_briefly(self.loop) 730 731 f.cancel() 732 t.cancel() 733 734 with self.assertRaises(asyncio.CancelledError): 735 self.loop.run_until_complete(t) 736 737 self.assertTrue(t.done()) 738 self.assertTrue(f.cancelled()) 739 self.assertTrue(t.cancelled()) 740 741 def test_cancel_task_catching(self): 742 fut1 = self.new_future(self.loop) 743 fut2 = self.new_future(self.loop) 744 745 async def task(): 746 await fut1 747 try: 748 await fut2 749 except asyncio.CancelledError: 750 return 42 751 752 t = self.new_task(self.loop, 
task()) 753 test_utils.run_briefly(self.loop) 754 self.assertIs(t._fut_waiter, fut1) # White-box test. 755 fut1.set_result(None) 756 test_utils.run_briefly(self.loop) 757 self.assertIs(t._fut_waiter, fut2) # White-box test. 758 t.cancel() 759 self.assertTrue(fut2.cancelled()) 760 res = self.loop.run_until_complete(t) 761 self.assertEqual(res, 42) 762 self.assertFalse(t.cancelled()) 763 764 def test_cancel_task_ignoring(self): 765 fut1 = self.new_future(self.loop) 766 fut2 = self.new_future(self.loop) 767 fut3 = self.new_future(self.loop) 768 769 async def task(): 770 await fut1 771 try: 772 await fut2 773 except asyncio.CancelledError: 774 pass 775 res = await fut3 776 return res 777 778 t = self.new_task(self.loop, task()) 779 test_utils.run_briefly(self.loop) 780 self.assertIs(t._fut_waiter, fut1) # White-box test. 781 fut1.set_result(None) 782 test_utils.run_briefly(self.loop) 783 self.assertIs(t._fut_waiter, fut2) # White-box test. 784 t.cancel() 785 self.assertTrue(fut2.cancelled()) 786 test_utils.run_briefly(self.loop) 787 self.assertIs(t._fut_waiter, fut3) # White-box test. 788 fut3.set_result(42) 789 res = self.loop.run_until_complete(t) 790 self.assertEqual(res, 42) 791 self.assertFalse(fut3.cancelled()) 792 self.assertFalse(t.cancelled()) 793 794 def test_cancel_current_task(self): 795 loop = asyncio.new_event_loop() 796 self.set_event_loop(loop) 797 798 async def task(): 799 t.cancel() 800 self.assertTrue(t._must_cancel) # White-box test. 801 # The sleep should be cancelled immediately. 802 await asyncio.sleep(100) 803 return 12 804 805 t = self.new_task(loop, task()) 806 self.assertFalse(t.cancelled()) 807 self.assertRaises( 808 asyncio.CancelledError, loop.run_until_complete, t) 809 self.assertTrue(t.done()) 810 self.assertTrue(t.cancelled()) 811 self.assertFalse(t._must_cancel) # White-box test. 
812 self.assertFalse(t.cancel()) 813 814 def test_cancel_at_end(self): 815 """coroutine end right after task is cancelled""" 816 loop = asyncio.new_event_loop() 817 self.set_event_loop(loop) 818 819 async def task(): 820 t.cancel() 821 self.assertTrue(t._must_cancel) # White-box test. 822 return 12 823 824 t = self.new_task(loop, task()) 825 self.assertFalse(t.cancelled()) 826 self.assertRaises( 827 asyncio.CancelledError, loop.run_until_complete, t) 828 self.assertTrue(t.done()) 829 self.assertTrue(t.cancelled()) 830 self.assertFalse(t._must_cancel) # White-box test. 831 self.assertFalse(t.cancel()) 832 833 def test_cancel_awaited_task(self): 834 # This tests for a relatively rare condition when 835 # a task cancellation is requested for a task which is not 836 # currently blocked, such as a task cancelling itself. 837 # In this situation we must ensure that whatever next future 838 # or task the cancelled task blocks on is cancelled correctly 839 # as well. See also bpo-34872. 840 loop = asyncio.new_event_loop() 841 self.addCleanup(lambda: loop.close()) 842 843 task = nested_task = None 844 fut = self.new_future(loop) 845 846 async def nested(): 847 await fut 848 849 async def coro(): 850 nonlocal nested_task 851 # Create a sub-task and wait for it to run. 852 nested_task = self.new_task(loop, nested()) 853 await asyncio.sleep(0) 854 855 # Request the current task to be cancelled. 856 task.cancel() 857 # Block on the nested task, which should be immediately 858 # cancelled. 
859 await nested_task 860 861 task = self.new_task(loop, coro()) 862 with self.assertRaises(asyncio.CancelledError): 863 loop.run_until_complete(task) 864 865 self.assertTrue(task.cancelled()) 866 self.assertTrue(nested_task.cancelled()) 867 self.assertTrue(fut.cancelled()) 868 869 def assert_text_contains(self, text, substr): 870 if substr not in text: 871 raise RuntimeError(f'text {substr!r} not found in:\n>>>{text}<<<') 872 873 def test_cancel_traceback_for_future_result(self): 874 # When calling Future.result() on a cancelled task, check that the 875 # line of code that was interrupted is included in the traceback. 876 loop = asyncio.new_event_loop() 877 self.set_event_loop(loop) 878 879 async def nested(): 880 # This will get cancelled immediately. 881 await asyncio.sleep(10) 882 883 async def coro(): 884 task = self.new_task(loop, nested()) 885 await asyncio.sleep(0) 886 task.cancel() 887 await task # search target 888 889 task = self.new_task(loop, coro()) 890 try: 891 loop.run_until_complete(task) 892 except asyncio.CancelledError: 893 tb = traceback.format_exc() 894 self.assert_text_contains(tb, "await asyncio.sleep(10)") 895 # The intermediate await should also be included. 896 self.assert_text_contains(tb, "await task # search target") 897 else: 898 self.fail('CancelledError did not occur') 899 900 def test_cancel_traceback_for_future_exception(self): 901 # When calling Future.exception() on a cancelled task, check that the 902 # line of code that was interrupted is included in the traceback. 903 loop = asyncio.new_event_loop() 904 self.set_event_loop(loop) 905 906 async def nested(): 907 # This will get cancelled immediately. 
908 await asyncio.sleep(10) 909 910 async def coro(): 911 task = self.new_task(loop, nested()) 912 await asyncio.sleep(0) 913 task.cancel() 914 done, pending = await asyncio.wait([task]) 915 task.exception() # search target 916 917 task = self.new_task(loop, coro()) 918 try: 919 loop.run_until_complete(task) 920 except asyncio.CancelledError: 921 tb = traceback.format_exc() 922 self.assert_text_contains(tb, "await asyncio.sleep(10)") 923 # The intermediate await should also be included. 924 self.assert_text_contains(tb, 925 "task.exception() # search target") 926 else: 927 self.fail('CancelledError did not occur') 928 929 def test_stop_while_run_in_complete(self): 930 931 def gen(): 932 when = yield 933 self.assertAlmostEqual(0.1, when) 934 when = yield 0.1 935 self.assertAlmostEqual(0.2, when) 936 when = yield 0.1 937 self.assertAlmostEqual(0.3, when) 938 yield 0.1 939 940 loop = self.new_test_loop(gen) 941 942 x = 0 943 944 async def task(): 945 nonlocal x 946 while x < 10: 947 await asyncio.sleep(0.1) 948 x += 1 949 if x == 2: 950 loop.stop() 951 952 t = self.new_task(loop, task()) 953 with self.assertRaises(RuntimeError) as cm: 954 loop.run_until_complete(t) 955 self.assertEqual(str(cm.exception), 956 'Event loop stopped before Future completed.') 957 self.assertFalse(t.done()) 958 self.assertEqual(x, 2) 959 self.assertAlmostEqual(0.3, loop.time()) 960 961 t.cancel() 962 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 963 964 def test_log_traceback(self): 965 async def coro(): 966 pass 967 968 task = self.new_task(self.loop, coro()) 969 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 970 task._log_traceback = True 971 self.loop.run_until_complete(task) 972 973 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 974 def gen(): 975 when = yield 976 self.assertAlmostEqual(0, when) 977 978 loop = self.new_test_loop(gen) 979 980 fut = self.new_future(loop) 981 fut.set_result('done') 982 983 ret = 
loop.run_until_complete(asyncio.wait_for(fut, 0)) 984 985 self.assertEqual(ret, 'done') 986 self.assertTrue(fut.done()) 987 self.assertAlmostEqual(0, loop.time()) 988 989 def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 990 def gen(): 991 when = yield 992 self.assertAlmostEqual(0, when) 993 994 loop = self.new_test_loop(gen) 995 996 foo_started = False 997 998 async def foo(): 999 nonlocal foo_started 1000 foo_started = True 1001 1002 with self.assertRaises(asyncio.TimeoutError): 1003 loop.run_until_complete(asyncio.wait_for(foo(), 0)) 1004 1005 self.assertAlmostEqual(0, loop.time()) 1006 self.assertEqual(foo_started, False) 1007 1008 def test_wait_for_timeout_less_then_0_or_0(self): 1009 def gen(): 1010 when = yield 1011 self.assertAlmostEqual(0.2, when) 1012 when = yield 0 1013 self.assertAlmostEqual(0, when) 1014 1015 for timeout in [0, -1]: 1016 with self.subTest(timeout=timeout): 1017 loop = self.new_test_loop(gen) 1018 1019 foo_running = None 1020 1021 async def foo(): 1022 nonlocal foo_running 1023 foo_running = True 1024 try: 1025 await asyncio.sleep(0.2) 1026 finally: 1027 foo_running = False 1028 return 'done' 1029 1030 fut = self.new_task(loop, foo()) 1031 1032 with self.assertRaises(asyncio.TimeoutError): 1033 loop.run_until_complete(asyncio.wait_for(fut, timeout)) 1034 self.assertTrue(fut.done()) 1035 # it should have been cancelled due to the timeout 1036 self.assertTrue(fut.cancelled()) 1037 self.assertAlmostEqual(0, loop.time()) 1038 self.assertEqual(foo_running, False) 1039 1040 def test_wait_for(self): 1041 1042 def gen(): 1043 when = yield 1044 self.assertAlmostEqual(0.2, when) 1045 when = yield 0 1046 self.assertAlmostEqual(0.1, when) 1047 when = yield 0.1 1048 1049 loop = self.new_test_loop(gen) 1050 1051 foo_running = None 1052 1053 async def foo(): 1054 nonlocal foo_running 1055 foo_running = True 1056 try: 1057 await asyncio.sleep(0.2) 1058 finally: 1059 foo_running = False 1060 return 'done' 1061 1062 fut = 
self.new_task(loop, foo()) 1063 1064 with self.assertRaises(asyncio.TimeoutError): 1065 loop.run_until_complete(asyncio.wait_for(fut, 0.1)) 1066 self.assertTrue(fut.done()) 1067 # it should have been cancelled due to the timeout 1068 self.assertTrue(fut.cancelled()) 1069 self.assertAlmostEqual(0.1, loop.time()) 1070 self.assertEqual(foo_running, False) 1071 1072 def test_wait_for_blocking(self): 1073 loop = self.new_test_loop() 1074 1075 async def coro(): 1076 return 'done' 1077 1078 res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) 1079 self.assertEqual(res, 'done') 1080 1081 def test_wait_for_with_global_loop(self): 1082 1083 def gen(): 1084 when = yield 1085 self.assertAlmostEqual(0.2, when) 1086 when = yield 0 1087 self.assertAlmostEqual(0.01, when) 1088 yield 0.01 1089 1090 loop = self.new_test_loop(gen) 1091 1092 async def foo(): 1093 await asyncio.sleep(0.2) 1094 return 'done' 1095 1096 asyncio.set_event_loop(loop) 1097 try: 1098 fut = self.new_task(loop, foo()) 1099 with self.assertRaises(asyncio.TimeoutError): 1100 loop.run_until_complete(asyncio.wait_for(fut, 0.01)) 1101 finally: 1102 asyncio.set_event_loop(None) 1103 1104 self.assertAlmostEqual(0.01, loop.time()) 1105 self.assertTrue(fut.done()) 1106 self.assertTrue(fut.cancelled()) 1107 1108 def test_wait_for_race_condition(self): 1109 1110 def gen(): 1111 yield 0.1 1112 yield 0.1 1113 yield 0.1 1114 1115 loop = self.new_test_loop(gen) 1116 1117 fut = self.new_future(loop) 1118 task = asyncio.wait_for(fut, timeout=0.2) 1119 loop.call_later(0.1, fut.set_result, "ok") 1120 res = loop.run_until_complete(task) 1121 self.assertEqual(res, "ok") 1122 1123 def test_wait_for_cancellation_race_condition(self): 1124 def gen(): 1125 yield 0.1 1126 yield 0.1 1127 yield 0.1 1128 yield 0.1 1129 1130 loop = self.new_test_loop(gen) 1131 1132 fut = self.new_future(loop) 1133 loop.call_later(0.1, fut.set_result, "ok") 1134 task = loop.create_task(asyncio.wait_for(fut, timeout=1)) 1135 
loop.call_later(0.1, task.cancel) 1136 res = loop.run_until_complete(task) 1137 self.assertEqual(res, "ok") 1138 1139 def test_wait_for_waits_for_task_cancellation(self): 1140 loop = asyncio.new_event_loop() 1141 self.addCleanup(loop.close) 1142 1143 task_done = False 1144 1145 async def foo(): 1146 async def inner(): 1147 nonlocal task_done 1148 try: 1149 await asyncio.sleep(0.2) 1150 except asyncio.CancelledError: 1151 await asyncio.sleep(_EPSILON) 1152 raise 1153 finally: 1154 task_done = True 1155 1156 inner_task = self.new_task(loop, inner()) 1157 1158 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1159 1160 with self.assertRaises(asyncio.TimeoutError) as cm: 1161 loop.run_until_complete(foo()) 1162 1163 self.assertTrue(task_done) 1164 chained = cm.exception.__context__ 1165 self.assertEqual(type(chained), asyncio.CancelledError) 1166 1167 def test_wait_for_waits_for_task_cancellation_w_timeout_0(self): 1168 loop = asyncio.new_event_loop() 1169 self.addCleanup(loop.close) 1170 1171 task_done = False 1172 1173 async def foo(): 1174 async def inner(): 1175 nonlocal task_done 1176 try: 1177 await asyncio.sleep(10) 1178 except asyncio.CancelledError: 1179 await asyncio.sleep(_EPSILON) 1180 raise 1181 finally: 1182 task_done = True 1183 1184 inner_task = self.new_task(loop, inner()) 1185 await asyncio.sleep(_EPSILON) 1186 await asyncio.wait_for(inner_task, timeout=0) 1187 1188 with self.assertRaises(asyncio.TimeoutError) as cm: 1189 loop.run_until_complete(foo()) 1190 1191 self.assertTrue(task_done) 1192 chained = cm.exception.__context__ 1193 self.assertEqual(type(chained), asyncio.CancelledError) 1194 1195 def test_wait_for_reraises_exception_during_cancellation(self): 1196 loop = asyncio.new_event_loop() 1197 self.addCleanup(loop.close) 1198 1199 class FooException(Exception): 1200 pass 1201 1202 async def foo(): 1203 async def inner(): 1204 try: 1205 await asyncio.sleep(0.2) 1206 finally: 1207 raise FooException 1208 1209 inner_task = self.new_task(loop, 
inner()) 1210 1211 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1212 1213 with self.assertRaises(FooException): 1214 loop.run_until_complete(foo()) 1215 1216 def test_wait_for_raises_timeout_error_if_returned_during_cancellation(self): 1217 loop = asyncio.new_event_loop() 1218 self.addCleanup(loop.close) 1219 1220 async def foo(): 1221 async def inner(): 1222 try: 1223 await asyncio.sleep(0.2) 1224 except asyncio.CancelledError: 1225 return 42 1226 1227 inner_task = self.new_task(loop, inner()) 1228 1229 await asyncio.wait_for(inner_task, timeout=_EPSILON) 1230 1231 with self.assertRaises(asyncio.TimeoutError): 1232 loop.run_until_complete(foo()) 1233 1234 def test_wait_for_self_cancellation(self): 1235 loop = asyncio.new_event_loop() 1236 self.addCleanup(loop.close) 1237 1238 async def foo(): 1239 async def inner(): 1240 try: 1241 await asyncio.sleep(0.3) 1242 except asyncio.CancelledError: 1243 try: 1244 await asyncio.sleep(0.3) 1245 except asyncio.CancelledError: 1246 await asyncio.sleep(0.3) 1247 1248 return 42 1249 1250 inner_task = self.new_task(loop, inner()) 1251 1252 wait = asyncio.wait_for(inner_task, timeout=0.1) 1253 1254 # Test that wait_for itself is properly cancellable 1255 # even when the initial task holds up the initial cancellation. 
1256 task = self.new_task(loop, wait) 1257 await asyncio.sleep(0.2) 1258 task.cancel() 1259 1260 with self.assertRaises(asyncio.CancelledError): 1261 await task 1262 1263 self.assertEqual(await inner_task, 42) 1264 1265 loop.run_until_complete(foo()) 1266 1267 def test_wait(self): 1268 1269 def gen(): 1270 when = yield 1271 self.assertAlmostEqual(0.1, when) 1272 when = yield 0 1273 self.assertAlmostEqual(0.15, when) 1274 yield 0.15 1275 1276 loop = self.new_test_loop(gen) 1277 1278 a = self.new_task(loop, asyncio.sleep(0.1)) 1279 b = self.new_task(loop, asyncio.sleep(0.15)) 1280 1281 async def foo(): 1282 done, pending = await asyncio.wait([b, a]) 1283 self.assertEqual(done, set([a, b])) 1284 self.assertEqual(pending, set()) 1285 return 42 1286 1287 res = loop.run_until_complete(self.new_task(loop, foo())) 1288 self.assertEqual(res, 42) 1289 self.assertAlmostEqual(0.15, loop.time()) 1290 1291 # Doing it again should take no time and exercise a different path. 1292 res = loop.run_until_complete(self.new_task(loop, foo())) 1293 self.assertAlmostEqual(0.15, loop.time()) 1294 self.assertEqual(res, 42) 1295 1296 def test_wait_with_global_loop(self): 1297 1298 def gen(): 1299 when = yield 1300 self.assertAlmostEqual(0.01, when) 1301 when = yield 0 1302 self.assertAlmostEqual(0.015, when) 1303 yield 0.015 1304 1305 loop = self.new_test_loop(gen) 1306 1307 a = self.new_task(loop, asyncio.sleep(0.01)) 1308 b = self.new_task(loop, asyncio.sleep(0.015)) 1309 1310 async def foo(): 1311 done, pending = await asyncio.wait([b, a]) 1312 self.assertEqual(done, set([a, b])) 1313 self.assertEqual(pending, set()) 1314 return 42 1315 1316 asyncio.set_event_loop(loop) 1317 res = loop.run_until_complete( 1318 self.new_task(loop, foo())) 1319 1320 self.assertEqual(res, 42) 1321 1322 def test_wait_duplicate_coroutines(self): 1323 1324 with self.assertWarns(DeprecationWarning): 1325 @asyncio.coroutine 1326 def coro(s): 1327 return s 1328 c = coro('test') 1329 task = self.new_task( 1330 
self.loop, 1331 asyncio.wait([c, c, coro('spam')])) 1332 1333 with self.assertWarns(DeprecationWarning): 1334 done, pending = self.loop.run_until_complete(task) 1335 1336 self.assertFalse(pending) 1337 self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 1338 1339 def test_wait_errors(self): 1340 self.assertRaises( 1341 ValueError, self.loop.run_until_complete, 1342 asyncio.wait(set())) 1343 1344 # -1 is an invalid return_when value 1345 sleep_coro = asyncio.sleep(10.0) 1346 wait_coro = asyncio.wait([sleep_coro], return_when=-1) 1347 self.assertRaises(ValueError, 1348 self.loop.run_until_complete, wait_coro) 1349 1350 sleep_coro.close() 1351 1352 def test_wait_first_completed(self): 1353 1354 def gen(): 1355 when = yield 1356 self.assertAlmostEqual(10.0, when) 1357 when = yield 0 1358 self.assertAlmostEqual(0.1, when) 1359 yield 0.1 1360 1361 loop = self.new_test_loop(gen) 1362 1363 a = self.new_task(loop, asyncio.sleep(10.0)) 1364 b = self.new_task(loop, asyncio.sleep(0.1)) 1365 task = self.new_task( 1366 loop, 1367 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1368 1369 done, pending = loop.run_until_complete(task) 1370 self.assertEqual({b}, done) 1371 self.assertEqual({a}, pending) 1372 self.assertFalse(a.done()) 1373 self.assertTrue(b.done()) 1374 self.assertIsNone(b.result()) 1375 self.assertAlmostEqual(0.1, loop.time()) 1376 1377 # move forward to close generator 1378 loop.advance_time(10) 1379 loop.run_until_complete(asyncio.wait([a, b])) 1380 1381 def test_wait_really_done(self): 1382 # there is possibility that some tasks in the pending list 1383 # became done but their callbacks haven't all been called yet 1384 1385 async def coro1(): 1386 await asyncio.sleep(0) 1387 1388 async def coro2(): 1389 await asyncio.sleep(0) 1390 await asyncio.sleep(0) 1391 1392 a = self.new_task(self.loop, coro1()) 1393 b = self.new_task(self.loop, coro2()) 1394 task = self.new_task( 1395 self.loop, 1396 asyncio.wait([b, a], 
return_when=asyncio.FIRST_COMPLETED)) 1397 1398 done, pending = self.loop.run_until_complete(task) 1399 self.assertEqual({a, b}, done) 1400 self.assertTrue(a.done()) 1401 self.assertIsNone(a.result()) 1402 self.assertTrue(b.done()) 1403 self.assertIsNone(b.result()) 1404 1405 def test_wait_first_exception(self): 1406 1407 def gen(): 1408 when = yield 1409 self.assertAlmostEqual(10.0, when) 1410 yield 0 1411 1412 loop = self.new_test_loop(gen) 1413 1414 # first_exception, task already has exception 1415 a = self.new_task(loop, asyncio.sleep(10.0)) 1416 1417 async def exc(): 1418 raise ZeroDivisionError('err') 1419 1420 b = self.new_task(loop, exc()) 1421 task = self.new_task( 1422 loop, 1423 asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 1424 1425 done, pending = loop.run_until_complete(task) 1426 self.assertEqual({b}, done) 1427 self.assertEqual({a}, pending) 1428 self.assertAlmostEqual(0, loop.time()) 1429 1430 # move forward to close generator 1431 loop.advance_time(10) 1432 loop.run_until_complete(asyncio.wait([a, b])) 1433 1434 def test_wait_first_exception_in_wait(self): 1435 1436 def gen(): 1437 when = yield 1438 self.assertAlmostEqual(10.0, when) 1439 when = yield 0 1440 self.assertAlmostEqual(0.01, when) 1441 yield 0.01 1442 1443 loop = self.new_test_loop(gen) 1444 1445 # first_exception, exception during waiting 1446 a = self.new_task(loop, asyncio.sleep(10.0)) 1447 1448 async def exc(): 1449 await asyncio.sleep(0.01) 1450 raise ZeroDivisionError('err') 1451 1452 b = self.new_task(loop, exc()) 1453 task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 1454 1455 done, pending = loop.run_until_complete(task) 1456 self.assertEqual({b}, done) 1457 self.assertEqual({a}, pending) 1458 self.assertAlmostEqual(0.01, loop.time()) 1459 1460 # move forward to close generator 1461 loop.advance_time(10) 1462 loop.run_until_complete(asyncio.wait([a, b])) 1463 1464 def test_wait_with_exception(self): 1465 1466 def gen(): 1467 when = yield 1468 
self.assertAlmostEqual(0.1, when) 1469 when = yield 0 1470 self.assertAlmostEqual(0.15, when) 1471 yield 0.15 1472 1473 loop = self.new_test_loop(gen) 1474 1475 a = self.new_task(loop, asyncio.sleep(0.1)) 1476 1477 async def sleeper(): 1478 await asyncio.sleep(0.15) 1479 raise ZeroDivisionError('really') 1480 1481 b = self.new_task(loop, sleeper()) 1482 1483 async def foo(): 1484 done, pending = await asyncio.wait([b, a]) 1485 self.assertEqual(len(done), 2) 1486 self.assertEqual(pending, set()) 1487 errors = set(f for f in done if f.exception() is not None) 1488 self.assertEqual(len(errors), 1) 1489 1490 loop.run_until_complete(self.new_task(loop, foo())) 1491 self.assertAlmostEqual(0.15, loop.time()) 1492 1493 loop.run_until_complete(self.new_task(loop, foo())) 1494 self.assertAlmostEqual(0.15, loop.time()) 1495 1496 def test_wait_with_timeout(self): 1497 1498 def gen(): 1499 when = yield 1500 self.assertAlmostEqual(0.1, when) 1501 when = yield 0 1502 self.assertAlmostEqual(0.15, when) 1503 when = yield 0 1504 self.assertAlmostEqual(0.11, when) 1505 yield 0.11 1506 1507 loop = self.new_test_loop(gen) 1508 1509 a = self.new_task(loop, asyncio.sleep(0.1)) 1510 b = self.new_task(loop, asyncio.sleep(0.15)) 1511 1512 async def foo(): 1513 done, pending = await asyncio.wait([b, a], timeout=0.11) 1514 self.assertEqual(done, set([a])) 1515 self.assertEqual(pending, set([b])) 1516 1517 loop.run_until_complete(self.new_task(loop, foo())) 1518 self.assertAlmostEqual(0.11, loop.time()) 1519 1520 # move forward to close generator 1521 loop.advance_time(10) 1522 loop.run_until_complete(asyncio.wait([a, b])) 1523 1524 def test_wait_concurrent_complete(self): 1525 1526 def gen(): 1527 when = yield 1528 self.assertAlmostEqual(0.1, when) 1529 when = yield 0 1530 self.assertAlmostEqual(0.15, when) 1531 when = yield 0 1532 self.assertAlmostEqual(0.1, when) 1533 yield 0.1 1534 1535 loop = self.new_test_loop(gen) 1536 1537 a = self.new_task(loop, asyncio.sleep(0.1)) 1538 b = 
self.new_task(loop, asyncio.sleep(0.15)) 1539 1540 done, pending = loop.run_until_complete( 1541 asyncio.wait([b, a], timeout=0.1)) 1542 1543 self.assertEqual(done, set([a])) 1544 self.assertEqual(pending, set([b])) 1545 self.assertAlmostEqual(0.1, loop.time()) 1546 1547 # move forward to close generator 1548 loop.advance_time(10) 1549 loop.run_until_complete(asyncio.wait([a, b])) 1550 1551 def test_wait_with_iterator_of_tasks(self): 1552 1553 def gen(): 1554 when = yield 1555 self.assertAlmostEqual(0.1, when) 1556 when = yield 0 1557 self.assertAlmostEqual(0.15, when) 1558 yield 0.15 1559 1560 loop = self.new_test_loop(gen) 1561 1562 a = self.new_task(loop, asyncio.sleep(0.1)) 1563 b = self.new_task(loop, asyncio.sleep(0.15)) 1564 1565 async def foo(): 1566 done, pending = await asyncio.wait(iter([b, a])) 1567 self.assertEqual(done, set([a, b])) 1568 self.assertEqual(pending, set()) 1569 return 42 1570 1571 res = loop.run_until_complete(self.new_task(loop, foo())) 1572 self.assertEqual(res, 42) 1573 self.assertAlmostEqual(0.15, loop.time()) 1574 1575 def test_as_completed(self): 1576 1577 def gen(): 1578 yield 0 1579 yield 0 1580 yield 0.01 1581 yield 0 1582 1583 loop = self.new_test_loop(gen) 1584 # disable "slow callback" warning 1585 loop.slow_callback_duration = 1.0 1586 completed = set() 1587 time_shifted = False 1588 1589 with self.assertWarns(DeprecationWarning): 1590 @asyncio.coroutine 1591 def sleeper(dt, x): 1592 nonlocal time_shifted 1593 yield from asyncio.sleep(dt) 1594 completed.add(x) 1595 if not time_shifted and 'a' in completed and 'b' in completed: 1596 time_shifted = True 1597 loop.advance_time(0.14) 1598 return x 1599 1600 a = sleeper(0.01, 'a') 1601 b = sleeper(0.01, 'b') 1602 c = sleeper(0.15, 'c') 1603 1604 async def foo(): 1605 values = [] 1606 for f in asyncio.as_completed([b, c, a], loop=loop): 1607 values.append(await f) 1608 return values 1609 with self.assertWarns(DeprecationWarning) as w: 1610 res = 
loop.run_until_complete(self.new_task(loop, foo())) 1611 self.assertEqual(w.warnings[0].filename, __file__) 1612 self.assertAlmostEqual(0.15, loop.time()) 1613 self.assertTrue('a' in res[:2]) 1614 self.assertTrue('b' in res[:2]) 1615 self.assertEqual(res[2], 'c') 1616 1617 # Doing it again should take no time and exercise a different path. 1618 with self.assertWarns(DeprecationWarning): 1619 res = loop.run_until_complete(self.new_task(loop, foo())) 1620 self.assertAlmostEqual(0.15, loop.time()) 1621 1622 def test_as_completed_with_timeout(self): 1623 1624 def gen(): 1625 yield 1626 yield 0 1627 yield 0 1628 yield 0.1 1629 1630 loop = self.new_test_loop(gen) 1631 1632 a = loop.create_task(asyncio.sleep(0.1, 'a')) 1633 b = loop.create_task(asyncio.sleep(0.15, 'b')) 1634 1635 async def foo(): 1636 values = [] 1637 for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): 1638 if values: 1639 loop.advance_time(0.02) 1640 try: 1641 v = await f 1642 values.append((1, v)) 1643 except asyncio.TimeoutError as exc: 1644 values.append((2, exc)) 1645 return values 1646 1647 with self.assertWarns(DeprecationWarning): 1648 res = loop.run_until_complete(self.new_task(loop, foo())) 1649 self.assertEqual(len(res), 2, res) 1650 self.assertEqual(res[0], (1, 'a')) 1651 self.assertEqual(res[1][0], 2) 1652 self.assertIsInstance(res[1][1], asyncio.TimeoutError) 1653 self.assertAlmostEqual(0.12, loop.time()) 1654 1655 # move forward to close generator 1656 loop.advance_time(10) 1657 loop.run_until_complete(asyncio.wait([a, b])) 1658 1659 def test_as_completed_with_unused_timeout(self): 1660 1661 def gen(): 1662 yield 1663 yield 0 1664 yield 0.01 1665 1666 loop = self.new_test_loop(gen) 1667 1668 a = asyncio.sleep(0.01, 'a') 1669 1670 async def foo(): 1671 for f in asyncio.as_completed([a], timeout=1, loop=loop): 1672 v = await f 1673 self.assertEqual(v, 'a') 1674 1675 with self.assertWarns(DeprecationWarning): 1676 loop.run_until_complete(self.new_task(loop, foo())) 1677 1678 def 
test_as_completed_reverse_wait(self): 1679 1680 def gen(): 1681 yield 0 1682 yield 0.05 1683 yield 0 1684 1685 loop = self.new_test_loop(gen) 1686 1687 a = asyncio.sleep(0.05, 'a') 1688 b = asyncio.sleep(0.10, 'b') 1689 fs = {a, b} 1690 1691 with self.assertWarns(DeprecationWarning): 1692 futs = list(asyncio.as_completed(fs, loop=loop)) 1693 self.assertEqual(len(futs), 2) 1694 1695 x = loop.run_until_complete(futs[1]) 1696 self.assertEqual(x, 'a') 1697 self.assertAlmostEqual(0.05, loop.time()) 1698 loop.advance_time(0.05) 1699 y = loop.run_until_complete(futs[0]) 1700 self.assertEqual(y, 'b') 1701 self.assertAlmostEqual(0.10, loop.time()) 1702 1703 def test_as_completed_concurrent(self): 1704 1705 def gen(): 1706 when = yield 1707 self.assertAlmostEqual(0.05, when) 1708 when = yield 0 1709 self.assertAlmostEqual(0.05, when) 1710 yield 0.05 1711 1712 loop = self.new_test_loop(gen) 1713 1714 a = asyncio.sleep(0.05, 'a') 1715 b = asyncio.sleep(0.05, 'b') 1716 fs = {a, b} 1717 with self.assertWarns(DeprecationWarning): 1718 futs = list(asyncio.as_completed(fs, loop=loop)) 1719 self.assertEqual(len(futs), 2) 1720 waiter = asyncio.wait(futs) 1721 # Deprecation from passing coros in futs to asyncio.wait() 1722 with self.assertWarns(DeprecationWarning): 1723 done, pending = loop.run_until_complete(waiter) 1724 self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 1725 1726 def test_as_completed_duplicate_coroutines(self): 1727 1728 with self.assertWarns(DeprecationWarning): 1729 @asyncio.coroutine 1730 def coro(s): 1731 return s 1732 1733 with self.assertWarns(DeprecationWarning): 1734 @asyncio.coroutine 1735 def runner(): 1736 result = [] 1737 c = coro('ham') 1738 for f in asyncio.as_completed([c, c, coro('spam')], 1739 loop=self.loop): 1740 result.append((yield from f)) 1741 return result 1742 1743 with self.assertWarns(DeprecationWarning): 1744 fut = self.new_task(self.loop, runner()) 1745 self.loop.run_until_complete(fut) 1746 result = fut.result() 1747 
self.assertEqual(set(result), {'ham', 'spam'}) 1748 self.assertEqual(len(result), 2) 1749 1750 def test_sleep(self): 1751 1752 def gen(): 1753 when = yield 1754 self.assertAlmostEqual(0.05, when) 1755 when = yield 0.05 1756 self.assertAlmostEqual(0.1, when) 1757 yield 0.05 1758 1759 loop = self.new_test_loop(gen) 1760 1761 async def sleeper(dt, arg): 1762 await asyncio.sleep(dt/2) 1763 res = await asyncio.sleep(dt/2, arg) 1764 return res 1765 1766 t = self.new_task(loop, sleeper(0.1, 'yeah')) 1767 loop.run_until_complete(t) 1768 self.assertTrue(t.done()) 1769 self.assertEqual(t.result(), 'yeah') 1770 self.assertAlmostEqual(0.1, loop.time()) 1771 1772 def test_sleep_cancel(self): 1773 1774 def gen(): 1775 when = yield 1776 self.assertAlmostEqual(10.0, when) 1777 yield 0 1778 1779 loop = self.new_test_loop(gen) 1780 1781 t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 1782 1783 handle = None 1784 orig_call_later = loop.call_later 1785 1786 def call_later(delay, callback, *args): 1787 nonlocal handle 1788 handle = orig_call_later(delay, callback, *args) 1789 return handle 1790 1791 loop.call_later = call_later 1792 test_utils.run_briefly(loop) 1793 1794 self.assertFalse(handle._cancelled) 1795 1796 t.cancel() 1797 test_utils.run_briefly(loop) 1798 self.assertTrue(handle._cancelled) 1799 1800 def test_task_cancel_sleeping_task(self): 1801 1802 def gen(): 1803 when = yield 1804 self.assertAlmostEqual(0.1, when) 1805 when = yield 0 1806 self.assertAlmostEqual(5000, when) 1807 yield 0.1 1808 1809 loop = self.new_test_loop(gen) 1810 1811 async def sleep(dt): 1812 await asyncio.sleep(dt) 1813 1814 async def doit(): 1815 sleeper = self.new_task(loop, sleep(5000)) 1816 loop.call_later(0.1, sleeper.cancel) 1817 try: 1818 await sleeper 1819 except asyncio.CancelledError: 1820 return 'cancelled' 1821 else: 1822 return 'slept in' 1823 1824 doer = doit() 1825 self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1826 self.assertAlmostEqual(0.1, loop.time()) 1827 1828 
def test_task_cancel_waiter_future(self):
    # Cancelling a task that is blocked on a future must cancel that
    # future and clear the task's reference to it.
    fut = self.new_future(self.loop)

    async def coro():
        await fut

    blocked_task = self.new_task(self.loop, coro())
    test_utils.run_briefly(self.loop)
    self.assertIs(blocked_task._fut_waiter, fut)

    blocked_task.cancel()
    test_utils.run_briefly(self.loop)
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(blocked_task)
    self.assertIsNone(blocked_task._fut_waiter)
    self.assertTrue(fut.cancelled())

def test_task_set_methods(self):
    # set_result()/set_exception() are rejected on a task, which still
    # finishes with the value returned by its coroutine.
    async def notmuch():
        return 'ko'

    task = self.new_task(self.loop, notmuch())

    with self.assertRaisesRegex(RuntimeError, 'not support set_result'):
        task.set_result('ok')

    with self.assertRaisesRegex(RuntimeError, 'not support set_exception'):
        task.set_exception(ValueError())

    outcome = self.loop.run_until_complete(task)
    self.assertEqual(outcome, 'ko')

def test_step_result(self):
    # Running a generator-based coroutine that yields bare values is
    # expected to fail with RuntimeError.
    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def notmuch():
            yield None
            yield 1
            return 'ko'

    bad_coro = notmuch()
    with self.assertRaises(RuntimeError):
        self.loop.run_until_complete(bad_coro)

def test_step_result_future(self):
    # If coroutine returns future, task waits on this future.

    class Fut(asyncio.Future):
        # Future subclass that records whether a done-callback was added.
        def __init__(self, *args, **kwds):
            self.cb_added = False
            super().__init__(*args, **kwds)

        def add_done_callback(self, *args, **kwargs):
            self.cb_added = True
            super().add_done_callback(*args, **kwargs)

    fut = Fut(loop=self.loop)
    result = None

    async def wait_for_future():
        nonlocal result
        result = await fut

    waiter_task = self.new_task(self.loop, wait_for_future())
    test_utils.run_briefly(self.loop)
    self.assertTrue(fut.cb_added)

    payload = object()
    fut.set_result(payload)
    test_utils.run_briefly(self.loop)
    self.assertIs(payload, result)
    self.assertTrue(waiter_task.done())
    self.assertIsNone(waiter_task.result())

def test_baseexception_during_cancel(self):
    # A SystemExit raised while the coroutine handles cancellation
    # escapes the loop run and is recorded as the task's exception.

    def gen():
        when = yield
        self.assertAlmostEqual(10.0, when)
        yield 0

    loop = self.new_test_loop(gen)

    async def sleeper():
        await asyncio.sleep(10)

    exit_exc = SystemExit()

    async def notmutch():
        try:
            await sleeper()
        except asyncio.CancelledError:
            raise exit_exc

    victim = self.new_task(loop, notmutch())
    test_utils.run_briefly(loop)

    victim.cancel()
    self.assertFalse(victim.done())

    with self.assertRaises(SystemExit):
        test_utils.run_briefly(loop)

    self.assertTrue(victim.done())
    self.assertFalse(victim.cancelled())
    self.assertIs(victim.exception(), exit_exc)

def test_iscoroutinefunction(self):
    # Check iscoroutinefunction() against a plain function, a plain
    # generator function, an @asyncio.coroutine function and a Mock.
    is_coro_func = asyncio.iscoroutinefunction

    def fn():
        pass

    self.assertFalse(is_coro_func(fn))

    def fn1():
        yield
    self.assertFalse(is_coro_func(fn1))

    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def fn2():
            yield
    self.assertTrue(is_coro_func(fn2))

    self.assertFalse(is_coro_func(mock.Mock()))

def test_yield_vs_yield_from(self):
    # Yielding a future directly makes the run fail with RuntimeError
    # and leaves the future unresolved.
    fut = self.new_future(self.loop)

    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def wait_for_future():
            yield fut

    coro_obj = wait_for_future()
    self.assertRaises(
        RuntimeError, self.loop.run_until_complete, coro_obj)

    self.assertFalse(fut.done())

def test_yield_vs_yield_from_generator(self):
    # Yielding a coroutine object directly is rejected with RuntimeError.
    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def coro():
            yield

    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def wait_for_future():
            inner_gen = coro()
            try:
                yield inner_gen
            finally:
                inner_gen.close()

    coro_obj = wait_for_future()
    with self.assertRaises(RuntimeError):
        self.loop.run_until_complete(coro_obj)

def test_coroutine_non_gen_function(self):
    # A non-generator function wrapped by @asyncio.coroutine counts as a
    # coroutine function, and its call result can be run to completion.
    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def func():
            return 'test'

    self.assertTrue(asyncio.iscoroutinefunction(func))

    coro_obj = func()
    self.assertTrue(asyncio.iscoroutine(coro_obj))

    outcome = self.loop.run_until_complete(coro_obj)
    self.assertEqual(outcome, 'test')

def test_coroutine_non_gen_function_return_future(self):
    # A wrapped non-generator function that returns a future yields the
    # future's result once another task resolves it.
    fut = self.new_future(self.loop)

    with self.assertWarns(DeprecationWarning):
        @asyncio.coroutine
        def func():
            return fut

    async def coro():
        fut.set_result('test')

    # Creation order is kept: the waiting task first, the resolver second.
    waiting = self.new_task(self.loop, func())
    resolver = self.new_task(self.loop, coro())
    outcome = self.loop.run_until_complete(waiting)
    self.assertEqual(outcome, 'test')
    self.assertIsNone(resolver.result())

def test_current_task(self):
    # current_task() is None outside a running task and returns the
    # running task from inside it, whichever way the loop is given.
    self.assertIsNone(asyncio.current_task(loop=self.loop))

    async def coro(loop):
        by_loop = asyncio.current_task(loop=loop)
        self.assertIs(by_loop, running)

        by_none = asyncio.current_task(None)
        self.assertIs(by_none, running)
        by_default = asyncio.current_task()
        self.assertIs(by_default, running)

    running = self.new_task(self.loop, coro(self.loop))
    self.loop.run_until_complete(running)
    self.assertIsNone(asyncio.current_task(loop=self.loop))

def test_current_task_with_interleaving_tasks(self):
    # Two tasks hand control back and forth through a pair of futures;
    # each must see itself as the current task before and after waiting.
    self.assertIsNone(asyncio.current_task(loop=self.loop))

    gate1 = self.new_future(self.loop)
    gate2 = self.new_future(self.loop)

    async def coro1(loop):
        self.assertTrue(asyncio.current_task(loop=loop) is task1)
        await gate1
        self.assertTrue(asyncio.current_task(loop=loop) is task1)
        gate2.set_result(True)

    async def coro2(loop):
        self.assertTrue(asyncio.current_task(loop=loop) is task2)
        gate1.set_result(True)
        await gate2
        self.assertTrue(asyncio.current_task(loop=loop) is task2)

    task1 = self.new_task(self.loop, coro1(self.loop))
    task2 = self.new_task(self.loop, coro2(self.loop))

    both = asyncio.wait((task1, task2))
    self.loop.run_until_complete(both)
    self.assertIsNone(asyncio.current_task(loop=self.loop))

# Some thorough tests for cancellation propagation through
# coroutines, tasks and wait().

def test_yield_future_passes_cancel(self):
    # Cancelling outer() cancels inner() cancels waiter.
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        try:
            await waiter
        except asyncio.CancelledError:
            proof += 1
            raise
        # Only reachable if the await finished without being cancelled.
        self.fail('got past sleep() in inner()')

    async def outer():
        nonlocal proof
        try:
            await inner()
        except asyncio.CancelledError:
            proof += 100  # Expect this path.
        else:
            proof += 10

    outer_fut = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer_fut.cancel()
    self.loop.run_until_complete(outer_fut)
    self.assertEqual(proof, 101)
    self.assertTrue(waiter.cancelled())

def test_yield_wait_does_not_shield_cancel(self):
    # Cancelling outer() makes wait() return early, leaves inner()
    # running.
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        await waiter
        proof += 1

    async def outer():
        nonlocal proof
        with self.assertWarns(DeprecationWarning):
            _done, _pending = await asyncio.wait([inner()])
        proof += 100

    outer_fut = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer_fut.cancel()
    with self.assertRaises(asyncio.CancelledError):
        self.loop.run_until_complete(outer_fut)
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertEqual(proof, 1)

def test_shield_result(self):
    # The shielding future delivers the inner future's result.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    inner.set_result(42)
    self.assertEqual(self.loop.run_until_complete(outer), 42)

def test_shield_exception(self):
    # The shielding future carries the very exception set on the inner one.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    failure = RuntimeError('expected')
    inner.set_exception(failure)
    test_utils.run_briefly(self.loop)
    self.assertIs(outer.exception(), failure)

def test_shield_cancel_inner(self):
    # Cancelling the inner future cancels the shielding future too.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    inner.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())

def test_shield_cancel_outer(self):
    # Cancelling the shielding future leaves it cancelled and with no
    # callbacks attached.
    inner = self.new_future(self.loop)
    outer = asyncio.shield(inner)
    test_utils.run_briefly(self.loop)
    outer.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())
    callbacks = outer._callbacks
    remaining = len(callbacks) if callbacks is not None else 0
    self.assertEqual(0, remaining)

def test_shield_shortcut(self):
    # Shielding an already-completed future still yields its result.
    fut = self.new_future(self.loop)
    fut.set_result(42)
    shielded = asyncio.shield(fut)
    self.assertEqual(self.loop.run_until_complete(shielded), 42)

def test_shield_effect(self):
    # Cancelling outer() does not affect inner().
    proof = 0
    waiter = self.new_future(self.loop)

    async def inner():
        nonlocal proof
        await waiter
        proof += 1

    async def outer():
        nonlocal proof
        await asyncio.shield(inner())
        proof += 100

    outer_fut = asyncio.ensure_future(outer(), loop=self.loop)
    test_utils.run_briefly(self.loop)
    outer_fut.cancel()
    self.assertRaises(
        asyncio.CancelledError, self.loop.run_until_complete, outer_fut)
    waiter.set_result(None)
    test_utils.run_briefly(self.loop)
    self.assertEqual(proof, 1)

def test_shield_gather(self):
    # Cancelling a shield around gather() cancels only the shield; the
    # gather still completes with its children's results.
    first = self.new_future(self.loop)
    second = self.new_future(self.loop)
    parent = asyncio.gather(first, second)
    outer = asyncio.shield(parent)
    test_utils.run_briefly(self.loop)
    outer.cancel()
    test_utils.run_briefly(self.loop)
    self.assertTrue(outer.cancelled())
    first.set_result(1)
    second.set_result(2)
    test_utils.run_briefly(self.loop)
    self.assertEqual(parent.result(), [1, 2])

def test_gather_shield(self):
    # Cancelling a gather() of shields cancels the shields while the
    # shielded futures remain settable afterwards.
    child1 = self.new_future(self.loop)
    child2 = self.new_future(self.loop)
    inner1 = asyncio.shield(child1)
    inner2 = asyncio.shield(child2)
    parent = asyncio.gather(inner1, inner2)
    test_utils.run_briefly(self.loop)
    parent.cancel()
    # This should cancel inner1 and inner2 but not child1 and child2.
    test_utils.run_briefly(self.loop)
    self.assertIsInstance(parent.exception(), asyncio.CancelledError)
    for shielded in (inner1, inner2):
        self.assertTrue(shielded.cancelled())
    child1.set_result(1)
    child2.set_result(2)
    test_utils.run_briefly(self.loop)

def test_as_completed_invalid_args(self):
    fut = self.new_future(self.loop)

    # as_completed() expects a list of futures, not a future instance
    from_future = asyncio.as_completed(fut, loop=self.loop)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(from_future)
    coro = coroutine_function()
    from_coro = asyncio.as_completed(coro, loop=self.loop)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(from_coro)
    coro.close()

def test_wait_invalid_args(self):
    fut = self.new_future(self.loop)

    # wait() expects a list of futures, not a future instance
    wait_on_future = asyncio.wait(fut)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(wait_on_future)
    coro = coroutine_function()
    wait_on_coro = asyncio.wait(coro)
    with self.assertRaises(TypeError):
        self.loop.run_until_complete(wait_on_coro)
    coro.close()

    # wait() expects at least a future
    wait_on_nothing = asyncio.wait([])
    with self.assertRaises(ValueError):
        self.loop.run_until_complete(wait_on_nothing)

def test_corowrapper_mocks_generator(self):

    def check():
        # A function that asserts various things.
        # Called twice, with different debug flag values.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                # The actual coroutine.
                self.assertTrue(gen.gi_running)
                yield from fut

        # A completed Future used to run the coroutine.
        fut = self.new_future(self.loop)
        fut.set_result(None)

        # Call the coroutine.
        gen = coro()

        # Check some properties.
        self.assertTrue(asyncio.iscoroutine(gen))
        self.assertIsInstance(gen.gi_frame, types.FrameType)
        self.assertFalse(gen.gi_running)
        self.assertIsInstance(gen.gi_code, types.CodeType)

        # Run it.
        self.loop.run_until_complete(gen)

        # The frame should have changed.
        self.assertIsNone(gen.gi_frame)

    # Test with debug flag cleared, then with debug flag set.
    for debug_enabled in (False, True):
        with set_coroutine_debug(debug_enabled):
            check()

def test_yield_from_corowrapper(self):
    # With coroutine debug on, a chain of "yield from" through wrapped
    # coroutines still delivers the value set on the innermost future.
    with set_coroutine_debug(True):
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def t1():
                return (yield from t2())

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def t2():
                f = self.new_future(self.loop)
                self.new_task(self.loop, t3(f))
                return (yield from f)

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def t3(f):
                f.set_result((1, 2, 3))

        chain_task = self.new_task(self.loop, t1())
        delivered = self.loop.run_until_complete(chain_task)
        self.assertEqual(delivered, (1, 2, 3))

def test_yield_from_corowrapper_send(self):
    # CoroWrapper.send() must pass the sent value through unchanged,
    # including when that value is a tuple.
    def foo():
        a = yield
        return a

    def call(arg):
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        try:
            cw.send(arg)
        except StopIteration as ex:
            return ex.args[0]
        # Reached only when send() returned instead of raising.
        raise AssertionError('StopIteration was expected')

    for sent in ((1, 2), 'spam'):
        self.assertEqual(call(sent), sent)

def test_corowrapper_weakref(self):
    # A CoroWrapper instance can be stored in a WeakValueDictionary.
    registry = weakref.WeakValueDictionary()

    def foo():
        yield from []

    cw = asyncio.coroutines.CoroWrapper(foo())
    registry['cw'] = cw  # Would fail without __weakref__ slot.
    cw.gen = None  # Suppress warning from __del__.
    def test_corowrapper_throw(self):
        # Issue 429: CoroWrapper.throw must be compatible with gen.throw
        #
        # foo() catches whatever is thrown into it and yields that exception
        # object back, so every throw() call below returns the exception the
        # wrapped generator actually received.
        def foo():
            value = None
            while True:
                try:
                    value = yield value
                except Exception as e:
                    value = e

        # throw(instance)
        exception = Exception("foo")
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(exception))

        # throw(type, instance)
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        self.assertIs(exception, cw.throw(Exception, exception))

        # throw(type, value): the generator should receive an Exception
        # whose args are ("foo",)
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo")
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

        # throw(type, value, traceback) with an explicit None traceback
        cw = asyncio.coroutines.CoroWrapper(foo())
        cw.send(None)
        exception = cw.throw(Exception, "foo", None)
        self.assertIsInstance(exception, Exception)
        self.assertEqual(exception.args, ("foo", ))

    def test_log_destroyed_pending_task(self):
        # When a still-pending task is garbage collected, the loop's
        # exception handler must be called with the
        # 'Task was destroyed but it is pending!' message and the task's
        # source traceback.

        # NOTE(review): this local is not used in the rest of the test.
        Task = self.__class__.Task

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def kill_me(loop):
                future = self.new_future(loop)
                yield from future
                # at this point, the only reference to kill_me() task is
                # the Task._wakeup() method in future._callbacks
                raise Exception("code never reached")

        mock_handler = mock.Mock()
        self.loop.set_debug(True)
        self.loop.set_exception_handler(mock_handler)

        # schedule the task
        coro = kill_me(self.loop)
        task = asyncio.ensure_future(coro, loop=self.loop)

        self.assertEqual(asyncio.all_tasks(loop=self.loop), {task})

        asyncio.set_event_loop(None)

        # execute the task so it waits for future
        self.loop._run_once()
        self.assertEqual(len(self.loop._ready), 0)

        # remove the future used in kill_me(), and references to the task
        del coro.gi_frame.f_locals['future']
        coro = None
        # keep the traceback so it can be compared after the task is gone
        source_traceback = task._source_traceback
        task = None

        # no more reference to kill_me() task: the task is destroyed by the GC
        support.gc_collect()

        self.assertEqual(asyncio.all_tasks(loop=self.loop), set())

        mock_handler.assert_called_with(self.loop, {
            'message': 'Task was destroyed but it is pending!',
            'task': mock.ANY,
            'source_traceback': source_traceback,
        })
        mock_handler.reset_mock()

    @mock.patch('asyncio.base_events.logger')
    def test_tb_logger_not_called_after_cancel(self, m_log):
        # runner() starts a task whose coroutine raises TypeError, calls
        # cancel() on it after a short sleep and drops its only reference;
        # the (mocked) asyncio.base_events logger must not have error()
        # called as a result.
        loop = asyncio.new_event_loop()
        self.set_event_loop(loop)

        async def coro():
            raise TypeError

        async def runner():
            task = self.new_task(loop, coro())
            await asyncio.sleep(0.05)
            task.cancel()
            task = None

        loop.run_until_complete(runner())
        self.assertFalse(m_log.error.called)

    @mock.patch('asyncio.coroutines.logger')
    def test_coroutine_never_yielded(self, m_log):
        # A generator-based coroutine object created in coroutine debug mode
        # and then dropped without being used must be reported through
        # asyncio.coroutines.logger.error(); the message has to name the
        # function and the place where the object was created.
        with set_coroutine_debug(True):
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def coro_noop():
                    pass

        # tb_lineno is computed relative to the statement that reads
        # f_lineno: the coro_noop() call must stay exactly two lines below
        # it, so do not insert or remove lines in between.
        tb_filename = __file__
        tb_lineno = sys._getframe().f_lineno + 2
        # create a coroutine object but don't use it
        coro_noop()
        support.gc_collect()

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        func_filename, func_lineno = test_utils.get_function_source(coro_noop)

        # NOTE(review): the literal fragments below are reproduced exactly as
        # they appear in this copy of the file, where runs of whitespace are
        # collapsed; confirm the leading spaces of the 'File "..."' and
        # 'coro_noop()' fragments against the real traceback layout.
        regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> '
                 r'was never yielded from\n'
                 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n'
                 r'.*\n'
                 r' File "%s", line %s, in test_coroutine_never_yielded\n'
                 r' coro_noop\(\)$'
                 % (re.escape(coro_noop.__qualname__),
                    re.escape(func_filename), func_lineno,
                    re.escape(tb_filename), tb_lineno))

        # DOTALL lets '.*' span the multi-line traceback in the message.
        self.assertRegex(message, re.compile(regex, re.DOTALL))

    def test_return_coroutine_from_coroutine(self):
        """Return of @asyncio.coroutine()-wrapped function generator object
        from @asyncio.coroutine()-wrapped function should have same effect as
        returning generator object or Future."""
        def check():
            with self.assertWarns(DeprecationWarning):
                @asyncio.coroutine
                def outer_coro():
                    with self.assertWarns(DeprecationWarning):
                        @asyncio.coroutine
                        def inner_coro():
                            return 1

                    return inner_coro()

            # the value returned by inner_coro() must come out of outer_coro()
            result = self.loop.run_until_complete(outer_coro())
            self.assertEqual(result, 1)

        # Test with debug flag cleared.
        with set_coroutine_debug(False):
            check()

        # Test with debug flag set.
        with set_coroutine_debug(True):
            check()

    def test_task_source_traceback(self):
        # With loop debug enabled the task records a _source_traceback list;
        # its second-to-last entry must be (file, line, function) of the
        # new_task() call made in this test.
        self.loop.set_debug(True)

        # lineno is derived from the line directly below the new_task()
        # call: keep these two statements adjacent.
        task = self.new_task(self.loop, coroutine_function())
        lineno = sys._getframe().f_lineno - 1
        self.assertIsInstance(task._source_traceback, list)
        self.assertEqual(task._source_traceback[-2][:3],
                         (__file__,
                          lineno,
                          'test_task_source_traceback'))
        self.loop.run_until_complete(task)

    def _test_cancel_wait_for(self, timeout):
        # Helper: wrap a task that never completes in
        # asyncio.wait_for(task, timeout), cancel the wait_for() task and
        # check that the inner task ends up cancelled as well.
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        async def blocking_coroutine():
            fut = self.new_future(loop)
            # Block: fut result is never set
            await fut

        task = loop.create_task(blocking_coroutine())

        wait = loop.create_task(asyncio.wait_for(task, timeout))
        loop.call_soon(wait.cancel)

        self.assertRaises(asyncio.CancelledError,
                          loop.run_until_complete, wait)

        # Python issue #23219: cancelling the wait must also cancel the task
        self.assertTrue(task.cancelled())

    def test_cancel_blocking_wait_for(self):
        # wait_for() without a timeout
        self._test_cancel_wait_for(None)

    def test_cancel_wait_for(self):
        # wait_for() with a 60 second timeout
        self._test_cancel_wait_for(60.0)

    def test_cancel_gather_1(self):
        """Ensure that a gathering future refuses to be cancelled once all
        children are done"""
        loop = asyncio.new_event_loop()
        self.addCleanup(loop.close)

        fut = self.new_future(loop)
        # The indirection fut->child_coro is needed since otherwise the
        # gathering task is done at the same time as the child future
        def child_coro():
            return (yield from fut)
        with self.assertWarns(DeprecationWarning):
            gather_future = asyncio.gather(child_coro(), loop=loop)
        gather_task = asyncio.ensure_future(gather_future, loop=loop)

        # records what gather_task.cancel() returned when called from the
        # done callback of fut
        cancel_result = None
        def cancelling_callback(_):
            nonlocal cancel_result
            cancel_result = gather_task.cancel()
        fut.add_done_callback(cancelling_callback)

        fut.set_result(42)  # calls the cancelling_callback after fut is done()

        # At this point the task should complete.
        loop.run_until_complete(gather_task)

        # Python issue #26923: asyncio.gather drops cancellation
        self.assertEqual(cancel_result, False)
        self.assertFalse(gather_task.cancelled())
        self.assertEqual(gather_task.result(), [42])

    def test_cancel_gather_2(self):
        # Cancel a task that is awaiting gather() and check that the
        # CancelledError reaches the caller of run_until_complete(); the
        # innermost exception of its __context__ chain must carry the
        # arguments that were passed to cancel().

        # (args passed to cancel(), args expected on the innermost error)
        cases = [
            ((), ()),
            ((None,), ()),
            (('my message',), ('my message',)),
            # Non-string values should roundtrip.
            ((5,), (5,)),
        ]
        for cancel_args, expected_args in cases:
            with self.subTest(cancel_args=cancel_args):
                loop = asyncio.new_event_loop()
                self.addCleanup(loop.close)

                async def test():
                    # keeps awaiting gather() until about one second of
                    # accumulated sleep time, unless cancelled first
                    time = 0
                    while True:
                        time += 0.05
                        with self.assertWarns(DeprecationWarning):
                            await asyncio.gather(asyncio.sleep(0.05),
                                                 return_exceptions=True,
                                                 loop=loop)
                        if time > 1:
                            return

                async def main():
                    qwe = self.new_task(loop, test())
                    await asyncio.sleep(0.2)
                    qwe.cancel(*cancel_args)
                    await qwe

                try:
                    loop.run_until_complete(main())
                except asyncio.CancelledError as exc:
                    # the outermost error itself carries no arguments
                    self.assertEqual(exc.args, ())
                    exc_type, exc_args, depth = get_innermost_context(exc)
                    self.assertEqual((exc_type, exc_args),
                        (asyncio.CancelledError, expected_args))
                    # The exact traceback seems to vary in CI.
                    self.assertIn(depth, (2, 3))
                else:
                    self.fail('gather did not propagate the cancellation '
                              'request')

    def test_exception_traceback(self):
        # See http://bugs.python.org/issue28843
        #
        # The exception stored on a failed task must still have its
        # __traceback__ set.

        async def foo():
            1 / 0

        async def main():
            task = self.new_task(self.loop, foo())
            await asyncio.sleep(0)  # skip one loop iteration
            self.assertIsNotNone(task.exception().__traceback__)

        self.loop.run_until_complete(main())

    @mock.patch('asyncio.base_events.logger')
    def test_error_in_call_soon(self, m_log):
        # loop.call_soon is replaced by a function that raises ValueError,
        # so that creating a task fails with ValueError.  The
        # 'Task was destroyed but it is pending' error must then be logged
        # and no task may remain registered for the loop.
        def call_soon(callback, *args, **kwargs):
            raise ValueError
        self.loop.call_soon = call_soon

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        self.assertFalse(m_log.error.called)

        with self.assertRaises(ValueError):
            gen = coro()
            try:
                self.new_task(self.loop, gen)
            finally:
                # close the generator explicitly whether or not the task
                # constructor raised
                gen.close()
        gc.collect()  # For PyPy or other GCs.

        self.assertTrue(m_log.error.called)
        message = m_log.error.call_args[0][0]
        self.assertIn('Task was destroyed but it is pending', message)

        self.assertEqual(asyncio.all_tasks(self.loop), set())

    def test_create_task_with_noncoroutine(self):
        # Passing a non-coroutine (the int 123) to the task constructor
        # must raise TypeError.
        with self.assertRaisesRegex(TypeError,
                                    "a coroutine was expected, got 123"):
            self.new_task(self.loop, 123)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        with self.assertRaisesRegex(TypeError,
                                    "a coroutine was expected, got 123"):
            self.new_task(self.loop, 123)

    def test_create_task_with_oldstyle_coroutine(self):
        # An object produced by an @asyncio.coroutine function is accepted
        # by the task constructor.

        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro():
                pass

        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

    def test_create_task_with_async_function(self):
        # A native coroutine object (from an "async def" function) is
        # accepted by the task constructor.

        async def coro():
            pass

        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, coro())
        self.assertIsInstance(task, self.Task)
        self.loop.run_until_complete(task)

    def test_create_task_with_asynclike_function(self):
        # CoroLikeObject (module level) only implements send(), throw(),
        # close() and __await__(); its send() raises StopIteration(42), so
        # the task is expected to finish with the result 42.
        task = self.new_task(self.loop, CoroLikeObject())
        self.assertIsInstance(task, self.Task)
        self.assertEqual(self.loop.run_until_complete(task), 42)

        # test it for the second time to ensure that caching
        # in asyncio.iscoroutine() doesn't break things.
        task = self.new_task(self.loop, CoroLikeObject())
        self.assertIsInstance(task, self.Task)
        self.assertEqual(self.loop.run_until_complete(task), 42)

    def test_bare_create_task(self):
        # asyncio.create_task() called from a running coroutine must return
        # an instance of the Task class under test (setUp() installs
        # new_task as the loop's task factory).

        async def inner():
            return 1

        async def coro():
            task = asyncio.create_task(inner())
            self.assertIsInstance(task, self.Task)
            ret = await task
            self.assertEqual(1, ret)

        self.loop.run_until_complete(coro())

    def test_bare_create_named_task(self):
        # The name= argument of asyncio.create_task() must be reflected by
        # task.get_name().

        async def coro_noop():
            pass

        async def coro():
            task = asyncio.create_task(coro_noop(), name='No-op')
            self.assertEqual(task.get_name(), 'No-op')
            await task

        self.loop.run_until_complete(coro())

    def test_context_1(self):
        # A subtask sees the context variable as it was when the subtask was
        # created; later changes made by the parent or by the subtask are
        # not visible to the other one.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def sub():
            await asyncio.sleep(0.01)
            # main() set 'yes' only after this task had been created
            self.assertEqual(cvar.get(), 'nope')
            cvar.set('something else')

        async def main():
            self.assertEqual(cvar.get(), 'nope')
            subtask = self.new_task(loop, sub())
            cvar.set('yes')
            self.assertEqual(cvar.get(), 'yes')
            await subtask
            # the value set inside sub() must not show up here
            self.assertEqual(cvar.get(), 'yes')

        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

    def test_context_2(self):
        # Changes to a context variable made inside a future's done
        # callback must not be visible in the task that awaits the future,
        # nor in the code that ran the loop.
        cvar = contextvars.ContextVar('cvar', default='nope')

        async def main():
            def fut_on_done(fut):
                # This change must not pollute the context
                # of the "main()" task.
                cvar.set('something else')

            self.assertEqual(cvar.get(), 'nope')

            for j in range(2):
                fut = self.new_future(loop)
                fut.add_done_callback(fut_on_done)
                cvar.set(f'yes{j}')
                loop.call_soon(fut.set_result, None)
                await fut
                self.assertEqual(cvar.get(), f'yes{j}')

                for i in range(3):
                    # Test that task passed its context to add_done_callback:
                    cvar.set(f'yes{i}-{j}')
                    await asyncio.sleep(0.001)
                    self.assertEqual(cvar.get(), f'yes{i}-{j}')

        loop = asyncio.new_event_loop()
        try:
            task = self.new_task(loop, main())
            loop.run_until_complete(task)
        finally:
            loop.close()

        # nothing set while the loop was running leaked out to this context
        self.assertEqual(cvar.get(), 'nope')

    def test_context_3(self):
        # Run 100 Tasks in parallel, each modifying cvar.

        cvar = contextvars.ContextVar('cvar', default=-1)

        async def sub(num):
            for i in range(10):
                cvar.set(num + i)
                await asyncio.sleep(random.uniform(0.001, 0.05))
                # other tasks ran during the sleep; this task must still
                # read back its own value
                self.assertEqual(cvar.get(), num + i)

        async def main():
            tasks = []
            for i in range(100):
                task = loop.create_task(sub(random.randint(0, 10)))
                tasks.append(task)

            await asyncio.gather(*tasks)

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(main())
        finally:
            loop.close()

        # the default is still what this (outer) context sees
        self.assertEqual(cvar.get(), -1)

    def test_get_coro(self):
        # Task.get_coro() must return the very coroutine object the task was
        # created with.
        loop = asyncio.new_event_loop()
        coro = coroutine_function()
        try:
            task = self.new_task(loop, coro)
            loop.run_until_complete(task)
            self.assertIs(task.get_coro(), coro)
        finally:
            loop.close()


def add_subclass_tests(cls):
    """Class decorator that makes a test case run against subclasses.

    cls.Task and cls.Future are replaced by subclasses that count calls to
    add_done_callback() in a ``calls`` mapping, an extra test using that
    counter is attached, and test_task_source_traceback is disabled.
    If cls.Task or cls.Future is None, cls is returned unchanged.
    """
    BaseTask = cls.Task
    BaseFuture = cls.Future

    if BaseTask is None or BaseFuture is None:
        return cls

    class CommonFuture:
        # Mixin placed before the real base class so that its methods run
        # first and then delegate through super().
        def __init__(self, *args, **kwargs):
            self.calls = collections.defaultdict(lambda: 0)
            super().__init__(*args, **kwargs)

        def add_done_callback(self, *args, **kwargs):
            self.calls['add_done_callback'] += 1
            return super().add_done_callback(*args, **kwargs)

    class Task(CommonFuture, BaseTask):
        pass

    class Future(CommonFuture, BaseFuture):
        pass

    def test_subclasses_ctask_cfuture(self):
        # Awaiting the future from the task must go through the overridden
        # add_done_callback() exactly once on each of the two objects.
        fut = self.Future(loop=self.loop)

        async def func():
            self.loop.call_soon(lambda: fut.set_result('spam'))
            return await fut

        task = self.Task(func(), loop=self.loop)

        result = self.loop.run_until_complete(task)

        self.assertEqual(result, 'spam')

        self.assertEqual(
            dict(task.calls),
            {'add_done_callback': 1})

        self.assertEqual(
            dict(fut.calls),
            {'add_done_callback': 1})

    # Add patched Task & Future back to the test case
    cls.Task = Task
    cls.Future = Future

    # Add an extra unit-test
    cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture

    # Disable the "test_task_source_traceback" test
    # (the test is hardcoded for a particular call stack, which
    # is slightly different for Task subclasses)
    cls.test_task_source_traceback = None

    return cls


class SetMethodsTest:
    """Mixin with tests calling Future.set_result()/set_exception() on a Task.

    The methods are invoked through the Future class of the test case
    (``type(self).Future``) with a task as the instance.
    """

    def test_set_result_causes_invalid_state(self):
        # Setting a result on a pending task through Future.set_result()
        # makes run_until_complete() return that value; the loop's exception
        # handler is then called once with an InvalidStateError whose
        # message matches 'step(): already done'.
        Future = type(self).Future
        self.loop.call_exception_handler = exc_handler = mock.Mock()

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_result(task, 'spam')

        self.assertEqual(
            self.loop.run_until_complete(task),
            'spam')

        exc_handler.assert_called_once()
        exc = exc_handler.call_args[0][0]['exception']
        # re-raise the recorded exception to check its type and message
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

        coro.close()

    def test_set_exception_causes_invalid_state(self):
        # Same as test_set_result_causes_invalid_state, but an exception is
        # set through Future.set_exception() and run_until_complete() is
        # expected to raise it.
        class MyExc(Exception):
            pass

        Future = type(self).Future
        self.loop.call_exception_handler = exc_handler = mock.Mock()

        async def foo():
            await asyncio.sleep(0.1)
            return 10

        coro = foo()
        task = self.new_task(self.loop, coro)
        Future.set_exception(task, MyExc())

        with self.assertRaises(MyExc):
            self.loop.run_until_complete(task)

        exc_handler.assert_called_once()
        exc = exc_handler.call_args[0][0]['exception']
        # re-raise the recorded exception to check its type and message
        with self.assertRaisesRegex(asyncio.InvalidStateError,
                                    r'step\(\): already done'):
            raise exc

        coro.close()


# BaseTaskTests and SetMethodsTest run with tasks._CTask and futures._CFuture.
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest,
                          test_utils.TestCase):

    # getattr() with a default keeps the class body importable when the
    # attributes are missing; the class is skipped in that case.
    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)

    @support.refcount_test
    def test_refleaks_in_task___init__(self):
        # Calling __init__() again on an existing task 100 times must not
        # change the total reference count by more than 10.
        gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount')
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        refs_before = gettotalrefcount()
        for i in range(100):
            task.__init__(coro(), loop=self.loop)
            self.loop.run_until_complete(task)
        self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10)

    def test_del__log_destroy_pending_segfault(self):
        # Deleting the _log_destroy_pending attribute must raise
        # AttributeError.
        async def coro():
            pass
        task = self.new_task(self.loop, coro())
        self.loop.run_until_complete(task)
        with self.assertRaises(AttributeError):
            del task._log_destroy_pending


# BaseTaskTests run with subclasses of tasks._CTask and futures._CFuture
# (the subclasses are created by add_subclass_tests).
@unittest.skipUnless(hasattr(futures, '_CFuture') and
                     hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = getattr(futures, '_CFuture', None)
# BaseTaskTests run with a subclass of tasks._CTask and a subclass of
# futures._PyFuture (the subclasses are created by add_subclass_tests).
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
@add_subclass_tests
class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


# BaseTaskTests run with a subclass of tasks._PyTask and a subclass of
# futures._CFuture (the subclasses are created by add_subclass_tests).
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
@add_subclass_tests
class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase):

    Future = getattr(futures, '_CFuture', None)
    Task = tasks._PyTask


# BaseTaskTests run with tasks._CTask and futures._PyFuture.
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = getattr(tasks, '_CTask', None)
    Future = futures._PyFuture


# BaseTaskTests run with tasks._PyTask and futures._CFuture.
@unittest.skipUnless(hasattr(futures, '_CFuture'),
                     'requires the C _asyncio module')
class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase):

    Task = tasks._PyTask
    Future = getattr(futures, '_CFuture', None)


# BaseTaskTests and SetMethodsTest run with tasks._PyTask and
# futures._PyFuture.
class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest,
                            test_utils.TestCase):

    Task = tasks._PyTask
    Future = futures._PyFuture


# BaseTaskTests run with subclasses of tasks._PyTask and futures._PyFuture
# (the subclasses are created by add_subclass_tests).
@add_subclass_tests
class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase):
    Task = tasks._PyTask
    Future = futures._PyFuture


@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CTask_Future_Tests(test_utils.TestCase):

    def test_foobar(self):
        # A task created with loop.create_task() must be able to await a
        # Future subclass whose "get_loop" attribute is a property that
        # raises AttributeError, and still finish with its own result.
        class Fut(asyncio.Future):
            @property
            def get_loop(self):
                raise AttributeError

        async def coro():
            await fut
            return 'spam'

        self.loop = asyncio.new_event_loop()
        try:
            fut = Fut(loop=self.loop)
            # complete the future a little later so that the task really
            # has to wait for it
            self.loop.call_later(0.1, fut.set_result, 1)
            task = self.loop.create_task(coro())
            res = self.loop.run_until_complete(task)
        finally:
            self.loop.close()

        self.assertEqual(res, 'spam')


class BaseTaskIntrospectionTests:
    """Tests for the task registration and enter/leave functions.

    Concrete subclasses provide the implementation under test through the
    _register_task, _unregister_task, _enter_task and _leave_task
    attributes.  Results are observed with asyncio.all_tasks() and
    asyncio.current_task().
    """
    _register_task = None
    _unregister_task = None
    _enter_task = None
    _leave_task = None

    def test__register_task_1(self):
        # A registered task-like object that exposes its loop through a
        # "_loop" property is reported by all_tasks(loop).
        class TaskLike:
            @property
            def _loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_2(self):
        # Same as above, but the loop is exposed through get_loop().
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return False

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), {task})
        self._unregister_task(task)

    def test__register_task_3(self):
        # A registered task whose done() returns True is not reported by
        # all_tasks(loop).
        class TaskLike:
            def get_loop(self):
                return loop

            def done(self):
                return True

        task = TaskLike()
        loop = mock.Mock()

        self.assertEqual(asyncio.all_tasks(loop), set())
        self._register_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())
        self._unregister_task(task)

    def test__enter_task(self):
        # After _enter_task(loop, task), current_task(loop) is that task.
        task = mock.Mock()
        loop = mock.Mock()
        self.assertIsNone(asyncio.current_task(loop))
        self._enter_task(loop, task)
        self.assertIs(asyncio.current_task(loop), task)
        self._leave_task(loop, task)

    def test__enter_task_failure(self):
        # Entering a second task while another one is current raises
        # RuntimeError and leaves the first task as the current one.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._enter_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task(self):
        # After _leave_task(loop, task), the loop has no current task.
        task = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task)
        self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__leave_task_failure1(self):
        # Leaving a task other than the current one raises RuntimeError and
        # leaves the current task unchanged.
        task1 = mock.Mock()
        task2 = mock.Mock()
        loop = mock.Mock()
        self._enter_task(loop, task1)
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task2)
        self.assertIs(asyncio.current_task(loop), task1)
        self._leave_task(loop, task1)

    def test__leave_task_failure2(self):
        # Leaving a task when the loop has no current task raises
        # RuntimeError.
        task = mock.Mock()
        loop = mock.Mock()
        with self.assertRaises(RuntimeError):
            self._leave_task(loop, task)
        self.assertIsNone(asyncio.current_task(loop))

    def test__unregister_task(self):
        # A task that was registered and then unregistered is no longer
        # reported by all_tasks(loop).
        task = mock.Mock()
        loop = mock.Mock()
        task.get_loop = lambda: loop
        self._register_task(task)
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())

    def test__unregister_task_not_registered(self):
        # Unregistering a task that was never registered does not raise.
        task = mock.Mock()
        loop = mock.Mock()
        self._unregister_task(task)
        self.assertEqual(asyncio.all_tasks(loop), set())


# Runs the introspection tests against the tasks._py_* functions.
class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # staticmethod() keeps the plain functions from being bound to the
    # test instance when they are called as self._register_task(...).
    _register_task = staticmethod(tasks._py_register_task)
    _unregister_task = staticmethod(tasks._py_unregister_task)
    _enter_task = staticmethod(tasks._py_enter_task)
    _leave_task = staticmethod(tasks._py_leave_task)


# Runs the introspection tests against the tasks._c_* functions.
@unittest.skipUnless(hasattr(tasks, '_c_register_task'),
                     'requires the C _asyncio module')
class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests):
    # The class body is executed even when the class ends up skipped, so
    # the tasks._c_* attributes are only read if they exist.
    if hasattr(tasks, '_c_register_task'):
        _register_task = staticmethod(tasks._c_register_task)
        _unregister_task = staticmethod(tasks._c_unregister_task)
        _enter_task = staticmethod(tasks._c_enter_task)
        _leave_task = staticmethod(tasks._c_leave_task)
    else:
        _register_task = _unregister_task = _enter_task = _leave_task = None
class BaseCurrentLoopTests:
    """Tests for asyncio.current_task(); subclasses implement new_task()."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)

    def new_task(self, coro):
        # Overridden by the concrete test classes to build a task for
        # self.loop with the Task implementation under test.
        raise NotImplementedError

    def test_current_task_no_running_loop(self):
        # With an explicit loop that is not running, there is no current
        # task.
        self.assertIsNone(asyncio.current_task(loop=self.loop))

    def test_current_task_no_running_loop_implicit(self):
        # Without a loop argument and with no loop running, current_task()
        # raises RuntimeError.
        with self.assertRaises(RuntimeError):
            asyncio.current_task()

    def test_current_task_with_implicit_loop(self):
        # Inside a running task, current_task() returns that task whether
        # the loop is passed explicitly, passed as None or omitted.
        async def coro():
            self.assertIs(asyncio.current_task(loop=self.loop), task)

            self.assertIs(asyncio.current_task(None), task)
            self.assertIs(asyncio.current_task(), task)

        task = self.new_task(coro())
        self.loop.run_until_complete(task)
        # once the task has finished there is no current task any more
        self.assertIsNone(asyncio.current_task(loop=self.loop))


# current_task() tests with tasks._PyTask.
class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):

    def new_task(self, coro):
        return tasks._PyTask(coro, loop=self.loop)


# current_task() tests with tasks._CTask.
@unittest.skipUnless(hasattr(tasks, '_CTask'),
                     'requires the C _asyncio module')
class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase):

    def new_task(self, coro):
        return getattr(tasks, '_CTask')(coro, loop=self.loop)


class GenericTaskTests(test_utils.TestCase):

    def test_future_subclass(self):
        # asyncio.Task must be a subclass of asyncio.Future.
        self.assertTrue(issubclass(asyncio.Task, asyncio.Future))

    @support.cpython_only
    def test_asyncio_module_compiled(self):
        # Because of circular imports it's easy to make _asyncio
        # module non-importable.  This is a simple test that will
        # fail on systems where C modules were successfully compiled
        # (hence the test for _functools etc), but _asyncio somehow didn't.
        try:
            import _functools
            import _json
            import _pickle
        except ImportError:
            self.skipTest('C modules are not available')
        else:
            try:
                import _asyncio
            except ImportError:
                self.fail('_asyncio module is missing')


class GatherTestsBase:
    """Shared asyncio.gather() tests.

    Subclasses provide wrap_futures(), which turns the futures created by a
    test into the awaitables that are actually passed to gather().
    """

    def setUp(self):
        super().setUp()
        self.one_loop = self.new_test_loop()
        self.other_loop = self.new_test_loop()
        self.set_event_loop(self.one_loop, cleanup=False)

    def _run_loop(self, loop):
        # Keep running the loop briefly until its _ready queue is empty.
        while loop._ready:
            test_utils.run_briefly(loop)

    def _check_success(self, **kwargs):
        # gather() must finish only after all children are done, and its
        # result list must follow argument order, not completion order.
        a, b, c = [self.one_loop.create_future() for i in range(3)]
        fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        b.set_result(1)
        a.set_result(2)
        self._run_loop(self.one_loop)
        # c is still pending, so the gather future must not be done yet
        self.assertEqual(cb.called, False)
        self.assertFalse(fut.done())
        c.set_result(3)
        self._run_loop(self.one_loop)
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [2, 1, 3])

    def test_success(self):
        self._check_success()
        self._check_success(return_exceptions=False)

    def test_result_exception_success(self):
        self._check_success(return_exceptions=True)

    def test_one_exception(self):
        # Without return_exceptions, the first child exception finishes the
        # gather future with that exception; what happens to the remaining
        # children afterwards has no effect on it.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e))
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        a.set_result(1)
        b.set_exception(exc)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertIs(fut.exception(), exc)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        # NOTE(review): presumably retrieved so that no "exception was never
        # retrieved" message is logged for e -- confirm.
        e.exception()

    def test_return_exceptions(self):
        # With return_exceptions=True, child exceptions are returned in the
        # result list and the gather future waits for every child.
        a, b, c, d = [self.one_loop.create_future() for i in range(4)]
        fut = asyncio.gather(*self.wrap_futures(a, b, c, d),
                             return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        exc = ZeroDivisionError()
        exc2 = RuntimeError()
        b.set_result(1)
        c.set_exception(exc)
        a.set_result(3)
        self._run_loop(self.one_loop)
        # d is still pending
        self.assertFalse(fut.done())
        d.set_exception(exc2)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertEqual(fut.result(), [3, 1, exc, exc2])

    def test_env_var_debug(self):
        # Run a child interpreter that prints asyncio.coroutines._DEBUG
        # under different environment and command line settings.
        code = '\n'.join((
            'import asyncio.coroutines',
            'print(asyncio.coroutines._DEBUG)'))

        # Test with -E to not fail if the unit test was run with
        # PYTHONASYNCIODEBUG set to a non-empty string
        sts, stdout, stderr = assert_python_ok('-E', '-c', code)
        self.assertEqual(stdout.rstrip(), b'False')

        # empty PYTHONASYNCIODEBUG: debug stays off
        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # PYTHONASYNCIODEBUG=1: debug is on
        sts, stdout, stderr = assert_python_ok('-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'True')

        # with -E the PYTHONASYNCIODEBUG=1 setting has no effect
        sts, stdout, stderr = assert_python_ok('-E', '-c', code,
                                               PYTHONASYNCIODEBUG='1',
                                               PYTHONDEVMODE='')
        self.assertEqual(stdout.rstrip(), b'False')

        # -X dev
        sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev',
                                               '-c', code)
        self.assertEqual(stdout.rstrip(), b'True')


# gather() tests where the futures are passed to gather() directly.
class FutureGatherTests(GatherTestsBase, test_utils.TestCase):

    def wrap_futures(self, *futures):
        return futures

    def _check_empty_sequence(self, seq_or_iter):
        # gather() with no awaitables returns a Future bound to the current
        # event loop (or to the loop given with loop=) that completes with
        # an empty list.
        asyncio.set_event_loop(self.one_loop)
        self.addCleanup(asyncio.set_event_loop, None)
        fut = asyncio.gather(*seq_or_iter)
        self.assertIsInstance(fut, asyncio.Future)
        self.assertIs(fut._loop, self.one_loop)
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        self.assertEqual(fut.result(), [])
        with self.assertWarns(DeprecationWarning):
            fut = asyncio.gather(*seq_or_iter, loop=self.other_loop)
        self.assertIs(fut._loop, self.other_loop)

    def test_constructor_empty_sequence(self):
        self._check_empty_sequence([])
        self._check_empty_sequence(())
        self._check_empty_sequence(set())
        self._check_empty_sequence(iter(""))

    def test_constructor_heterogenous_futures(self):
        # Futures that belong to different loops, or a future whose loop
        # differs from the loop= argument, are rejected with ValueError.
        fut1 = self.one_loop.create_future()
        fut2 = self.other_loop.create_future()
        with self.assertRaises(ValueError):
            asyncio.gather(fut1, fut2)
        with self.assertRaises(ValueError):
            with self.assertWarns(DeprecationWarning):
                asyncio.gather(fut1, loop=self.other_loop)

    def test_constructor_homogenous_futures(self):
        # When all children belong to other_loop, the gather future is
        # bound to that loop, with or without an explicit loop= argument.
        children = [self.other_loop.create_future() for i in range(3)]
        fut = asyncio.gather(*children)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())
        with self.assertWarns(DeprecationWarning):
            fut = asyncio.gather(*children, loop=self.other_loop)
        self.assertIs(fut._loop, self.other_loop)
        self._run_loop(self.other_loop)
        self.assertFalse(fut.done())

    def test_one_cancellation(self):
        # A cancelled child finishes the gather future with a
        # CancelledError as its exception, but the gather future itself is
        # not marked as cancelled.
        a, b, c, d, e = [self.one_loop.create_future() for i in range(5)]
        fut = asyncio.gather(a, b, c, d, e)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        b.cancel()
        self._run_loop(self.one_loop)
        self.assertTrue(fut.done())
        cb.assert_called_once_with(fut)
        self.assertFalse(fut.cancelled())
        self.assertIsInstance(fut.exception(), asyncio.CancelledError)
        # Does nothing
        c.set_result(3)
        d.cancel()
        e.set_exception(RuntimeError())
        e.exception()

    def test_result_exception_one_cancellation(self):
        # With return_exceptions=True, cancelled children show up as
        # CancelledError instances in the result list and the gather future
        # waits for every child.
        a, b, c, d, e, f = [self.one_loop.create_future()
                            for i in range(6)]
        fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True)
        cb = test_utils.MockCallback()
        fut.add_done_callback(cb)
        a.set_result(1)
        zde = ZeroDivisionError()
        b.set_exception(zde)
        c.cancel()
        self._run_loop(self.one_loop)
        self.assertFalse(fut.done())
        d.set_result(3)
        e.cancel()
        rte = RuntimeError()
        f.set_exception(rte)
        res = self.one_loop.run_until_complete(fut)
        self.assertIsInstance(res[2], asyncio.CancelledError)
        self.assertIsInstance(res[4], asyncio.CancelledError)
        # the two CancelledError entries were checked by type above; blank
        # them out so that the rest of the list can be compared directly
        res[2] = res[4] = None
        self.assertEqual(res, [1, zde, None, 3, None, rte])
        cb.assert_called_once_with(fut)


# gather() tests where each future is wrapped in a coroutine first.
class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase):

    def setUp(self):
        super().setUp()
        asyncio.set_event_loop(self.one_loop)

    def wrap_futures(self, *futures):
        coros = []
        for fut in futures:
            # fut=fut binds the current future as a default argument, so
            # each coroutine awaits its own future
            async def coro(fut=fut):
                return await fut
            coros.append(coro())
        return coros

    def test_constructor_loop_selection(self):
        # Without loop=, gather() of coroutines is bound to the current
        # event loop (one_loop, set in setUp); with loop= it is bound to
        # the given loop.
        async def coro():
            return 'abc'
        gen1 = coro()
        gen2 = coro()
        fut = asyncio.gather(gen1, gen2)
        self.assertIs(fut._loop, self.one_loop)
        self.one_loop.run_until_complete(fut)

        self.set_event_loop(self.other_loop, cleanup=False)
        gen3 = coro()
        gen4 = coro()
        with self.assertWarns(DeprecationWarning):
            fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop)
        self.assertIs(fut2._loop, self.other_loop)
        self.other_loop.run_until_complete(fut2)

    def test_duplicate_coroutines(self):
        # The same coroutine object may be passed to gather() several
        # times; every position in the result list gets its result.
        with self.assertWarns(DeprecationWarning):
            @asyncio.coroutine
            def coro(s):
                return s
        c = coro('abc')
        with self.assertWarns(DeprecationWarning):
            fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop)
        self._run_loop(self.one_loop)
        self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc'])

    def test_cancellation_broadcast(self):
        # Cancelling outer() cancels all children.
        # "proof" stays 0 only if none of the coroutines ran past its await.
        proof = 0
        waiter = self.one_loop.create_future()

        async def inner():
            nonlocal proof
            await waiter
            proof += 1

        child1 = asyncio.ensure_future(inner(), loop=self.one_loop)
        child2 = asyncio.ensure_future(inner(), loop=self.one_loop)
        gatherer = None

        async def outer():
            nonlocal proof, gatherer
            gatherer = asyncio.gather(child1, child2)
            await gatherer
            proof += 100

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        self.assertTrue(f.cancel())
        with self.assertRaises(asyncio.CancelledError):
            self.one_loop.run_until_complete(f)
        # cancel() on the gather future returns False at this point
        self.assertFalse(gatherer.cancel())
        self.assertTrue(waiter.cancelled())
        self.assertTrue(child1.cancelled())
        self.assertTrue(child2.cancelled())
        test_utils.run_briefly(self.one_loop)
        self.assertEqual(proof, 0)

    def test_exception_marking(self):
        # Test for the first line marked "Mark exception retrieved."

        async def inner(f):
            await f
            raise RuntimeError('should not be ignored')

        a = self.one_loop.create_future()
        b = self.one_loop.create_future()

        async def outer():
            await asyncio.gather(inner(a), inner(b))

        f = asyncio.ensure_future(outer(), loop=self.one_loop)
        test_utils.run_briefly(self.one_loop)
        a.set_result(None)
        test_utils.run_briefly(self.one_loop)
        b.set_result(None)
        test_utils.run_briefly(self.one_loop)
        self.assertIsInstance(f.exception(), RuntimeError)


class RunCoroutineThreadsafeTests(test_utils.TestCase):
    """Test case for asyncio.run_coroutine_threadsafe."""

    def setUp(self):
        super().setUp()
        self.loop = asyncio.new_event_loop()
        self.set_event_loop(self.loop)  # Will cleanup properly

    async def add(self, a, b, fail=False, cancel=False):
        """Wait 0.05 second and return a + b."""
        await asyncio.sleep(0.05)
        if fail:
            raise RuntimeError("Fail!")
        if cancel:
            asyncio.current_task(self.loop).cancel()
            await asyncio.sleep(0)
        return a + b

    def target(self, fail=False, cancel=False, timeout=None,
               advance_coro=False):
        """Run add coroutine in the event loop."""
        coro = self.add(1, 2, fail=fail, cancel=cancel)
        future = asyncio.run_coroutine_threadsafe(coro, self.loop)
        if advance_coro:
            # this is for test_run_coroutine_threadsafe_task_factory_exception;
            # otherwise it spills errors and breaks **other** unittests, since
            # 'target' is interacting with threads.

            # With this call, `coro` will be advanced, so that
            # CoroWrapper.__del__ won't do anything when asyncio tests run
            # in debug mode.
3538 self.loop.call_soon_threadsafe(coro.send, None) 3539 try: 3540 return future.result(timeout) 3541 finally: 3542 future.done() or future.cancel() 3543 3544 def test_run_coroutine_threadsafe(self): 3545 """Test coroutine submission from a thread to an event loop.""" 3546 future = self.loop.run_in_executor(None, self.target) 3547 result = self.loop.run_until_complete(future) 3548 self.assertEqual(result, 3) 3549 3550 def test_run_coroutine_threadsafe_with_exception(self): 3551 """Test coroutine submission from a thread to an event loop 3552 when an exception is raised.""" 3553 future = self.loop.run_in_executor(None, self.target, True) 3554 with self.assertRaises(RuntimeError) as exc_context: 3555 self.loop.run_until_complete(future) 3556 self.assertIn("Fail!", exc_context.exception.args) 3557 3558 def test_run_coroutine_threadsafe_with_timeout(self): 3559 """Test coroutine submission from a thread to an event loop 3560 when a timeout is raised.""" 3561 callback = lambda: self.target(timeout=0) 3562 future = self.loop.run_in_executor(None, callback) 3563 with self.assertRaises(asyncio.TimeoutError): 3564 self.loop.run_until_complete(future) 3565 test_utils.run_briefly(self.loop) 3566 # Check that there's no pending task (add has been cancelled) 3567 for task in asyncio.all_tasks(self.loop): 3568 self.assertTrue(task.done()) 3569 3570 def test_run_coroutine_threadsafe_task_cancelled(self): 3571 """Test coroutine submission from a thread to an event loop 3572 when the task is cancelled.""" 3573 callback = lambda: self.target(cancel=True) 3574 future = self.loop.run_in_executor(None, callback) 3575 with self.assertRaises(asyncio.CancelledError): 3576 self.loop.run_until_complete(future) 3577 3578 def test_run_coroutine_threadsafe_task_factory_exception(self): 3579 """Test coroutine submission from a thread to an event loop 3580 when the task factory raise an exception.""" 3581 3582 def task_factory(loop, coro): 3583 raise NameError 3584 3585 run = 
self.loop.run_in_executor( 3586 None, lambda: self.target(advance_coro=True)) 3587 3588 # Set exception handler 3589 callback = test_utils.MockCallback() 3590 self.loop.set_exception_handler(callback) 3591 3592 # Set corrupted task factory 3593 self.addCleanup(self.loop.set_task_factory, 3594 self.loop.get_task_factory()) 3595 self.loop.set_task_factory(task_factory) 3596 3597 # Run event loop 3598 with self.assertRaises(NameError) as exc_context: 3599 self.loop.run_until_complete(run) 3600 3601 # Check exceptions 3602 self.assertEqual(len(callback.call_args_list), 1) 3603 (loop, context), kwargs = callback.call_args 3604 self.assertEqual(context['exception'], exc_context.exception) 3605 3606 3607class SleepTests(test_utils.TestCase): 3608 def setUp(self): 3609 super().setUp() 3610 self.loop = asyncio.new_event_loop() 3611 self.set_event_loop(self.loop) 3612 3613 def tearDown(self): 3614 self.loop.close() 3615 self.loop = None 3616 super().tearDown() 3617 3618 def test_sleep_zero(self): 3619 result = 0 3620 3621 def inc_result(num): 3622 nonlocal result 3623 result += num 3624 3625 async def coro(): 3626 self.loop.call_soon(inc_result, 1) 3627 self.assertEqual(result, 0) 3628 num = await asyncio.sleep(0, result=10) 3629 self.assertEqual(result, 1) # inc'ed by call_soon 3630 inc_result(num) # num should be 11 3631 3632 self.loop.run_until_complete(coro()) 3633 self.assertEqual(result, 11) 3634 3635 def test_loop_argument_is_deprecated(self): 3636 # Remove test when loop argument is removed in Python 3.10 3637 with self.assertWarns(DeprecationWarning): 3638 self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop)) 3639 3640 3641class WaitTests(test_utils.TestCase): 3642 def setUp(self): 3643 super().setUp() 3644 self.loop = asyncio.new_event_loop() 3645 self.set_event_loop(self.loop) 3646 3647 def tearDown(self): 3648 self.loop.close() 3649 self.loop = None 3650 super().tearDown() 3651 3652 def test_loop_argument_is_deprecated_in_wait(self): 3653 # Remove 
test when loop argument is removed in Python 3.10 3654 with self.assertWarns(DeprecationWarning): 3655 self.loop.run_until_complete( 3656 asyncio.wait([coroutine_function()], loop=self.loop)) 3657 3658 def test_loop_argument_is_deprecated_in_wait_for(self): 3659 # Remove test when loop argument is removed in Python 3.10 3660 with self.assertWarns(DeprecationWarning): 3661 self.loop.run_until_complete( 3662 asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop)) 3663 3664 def test_coro_is_deprecated_in_wait(self): 3665 # Remove test when passing coros to asyncio.wait() is removed in 3.11 3666 with self.assertWarns(DeprecationWarning): 3667 self.loop.run_until_complete( 3668 asyncio.wait([coroutine_function()])) 3669 3670 task = self.loop.create_task(coroutine_function()) 3671 with self.assertWarns(DeprecationWarning): 3672 self.loop.run_until_complete( 3673 asyncio.wait([task, coroutine_function()])) 3674 3675 3676class CompatibilityTests(test_utils.TestCase): 3677 # Tests for checking a bridge between old-styled coroutines 3678 # and async/await syntax 3679 3680 def setUp(self): 3681 super().setUp() 3682 self.loop = asyncio.new_event_loop() 3683 self.set_event_loop(self.loop) 3684 3685 def tearDown(self): 3686 self.loop.close() 3687 self.loop = None 3688 super().tearDown() 3689 3690 def test_yield_from_awaitable(self): 3691 3692 with self.assertWarns(DeprecationWarning): 3693 @asyncio.coroutine 3694 def coro(): 3695 yield from asyncio.sleep(0) 3696 return 'ok' 3697 3698 result = self.loop.run_until_complete(coro()) 3699 self.assertEqual('ok', result) 3700 3701 def test_await_old_style_coro(self): 3702 3703 with self.assertWarns(DeprecationWarning): 3704 @asyncio.coroutine 3705 def coro1(): 3706 return 'ok1' 3707 3708 with self.assertWarns(DeprecationWarning): 3709 @asyncio.coroutine 3710 def coro2(): 3711 yield from asyncio.sleep(0) 3712 return 'ok2' 3713 3714 async def inner(): 3715 return await asyncio.gather(coro1(), coro2()) 3716 3717 result = 
self.loop.run_until_complete(inner()) 3718 self.assertEqual(['ok1', 'ok2'], result) 3719 3720 def test_debug_mode_interop(self): 3721 # https://bugs.python.org/issue32636 3722 code = textwrap.dedent(""" 3723 import asyncio 3724 3725 async def native_coro(): 3726 pass 3727 3728 @asyncio.coroutine 3729 def old_style_coro(): 3730 yield from native_coro() 3731 3732 asyncio.run(old_style_coro()) 3733 """) 3734 3735 assert_python_ok("-Wignore::DeprecationWarning", "-c", code, 3736 PYTHONASYNCIODEBUG="1") 3737 3738 3739if __name__ == '__main__': 3740 unittest.main() 3741