1"""Tests for tasks.py.""" 2 3import collections 4import contextlib 5import contextvars 6import functools 7import gc 8import io 9import random 10import re 11import sys 12import textwrap 13import types 14import unittest 15import weakref 16from unittest import mock 17 18import asyncio 19from asyncio import coroutines 20from asyncio import futures 21from asyncio import tasks 22from test.test_asyncio import utils as test_utils 23from test import support 24from test.support.script_helper import assert_python_ok 25 26 27def tearDownModule(): 28 asyncio.set_event_loop_policy(None) 29 30 31async def coroutine_function(): 32 pass 33 34 35@contextlib.contextmanager 36def set_coroutine_debug(enabled): 37 coroutines = asyncio.coroutines 38 39 old_debug = coroutines._DEBUG 40 try: 41 coroutines._DEBUG = enabled 42 yield 43 finally: 44 coroutines._DEBUG = old_debug 45 46 47def format_coroutine(qualname, state, src, source_traceback, generator=False): 48 if generator: 49 state = '%s' % state 50 else: 51 state = '%s, defined' % state 52 if source_traceback is not None: 53 frame = source_traceback[-1] 54 return ('coro=<%s() %s at %s> created at %s:%s' 55 % (qualname, state, src, frame[0], frame[1])) 56 else: 57 return 'coro=<%s() %s at %s>' % (qualname, state, src) 58 59 60class Dummy: 61 62 def __repr__(self): 63 return '<Dummy>' 64 65 def __call__(self, *args): 66 pass 67 68 69class CoroLikeObject: 70 def send(self, v): 71 raise StopIteration(42) 72 73 def throw(self, *exc): 74 pass 75 76 def close(self): 77 pass 78 79 def __await__(self): 80 return self 81 82 83class BaseTaskTests: 84 85 Task = None 86 Future = None 87 88 def new_task(self, loop, coro, name='TestTask'): 89 return self.__class__.Task(coro, loop=loop, name=name) 90 91 def new_future(self, loop): 92 return self.__class__.Future(loop=loop) 93 94 def setUp(self): 95 super().setUp() 96 self.loop = self.new_test_loop() 97 self.loop.set_task_factory(self.new_task) 98 self.loop.create_future = lambda: self.new_future(self.loop) 99 100 def test_task_del_collect(self): 101 class Evil: 102 def __del__(self): 103 gc.collect() 104 105 async def run(): 106 return Evil() 107 108 self.loop.run_until_complete( 109 asyncio.gather(*[ 110 self.new_task(self.loop, run()) for _ in range(100) 111 ], loop=self.loop)) 112 113 def test_other_loop_future(self): 114 other_loop = asyncio.new_event_loop() 115 fut = self.new_future(other_loop) 116 117 async def run(fut): 118 await fut 119 120 try: 121 with self.assertRaisesRegex(RuntimeError, 122 r'Task .* got Future .* attached'): 123 self.loop.run_until_complete(run(fut)) 124 finally: 125 other_loop.close() 126 127 def test_task_awaits_on_itself(self): 128 129 async def test(): 130 await task 131 132 task = asyncio.ensure_future(test(), loop=self.loop) 133 134 with self.assertRaisesRegex(RuntimeError, 135 'Task cannot await on itself'): 136 self.loop.run_until_complete(task) 137 138 def test_task_class(self): 139 async def notmuch(): 140 return 'ok' 141 t = self.new_task(self.loop, notmuch()) 142 self.loop.run_until_complete(t) 143 self.assertTrue(t.done()) 144 self.assertEqual(t.result(), 'ok') 145 self.assertIs(t._loop, self.loop) 146 self.assertIs(t.get_loop(), self.loop) 147 148 loop = asyncio.new_event_loop() 149 self.set_event_loop(loop) 150 t = self.new_task(loop, notmuch()) 151 self.assertIs(t._loop, loop) 152 loop.run_until_complete(t) 153 loop.close() 154 155 def test_ensure_future_coroutine(self): 156 with self.assertWarns(DeprecationWarning): 157 @asyncio.coroutine 158 def notmuch(): 159 return 'ok' 160 t = asyncio.ensure_future(notmuch(), loop=self.loop) 161 self.loop.run_until_complete(t) 162 self.assertTrue(t.done()) 163 self.assertEqual(t.result(), 'ok') 164 self.assertIs(t._loop, self.loop) 165 166 loop = asyncio.new_event_loop() 167 self.set_event_loop(loop) 168 t = asyncio.ensure_future(notmuch(), loop=loop) 169 self.assertIs(t._loop, loop) 170 loop.run_until_complete(t) 171 loop.close() 172 173 def test_ensure_future_future(self): 174 f_orig = self.new_future(self.loop) 175 f_orig.set_result('ko') 176 177 f = asyncio.ensure_future(f_orig) 178 self.loop.run_until_complete(f) 179 self.assertTrue(f.done()) 180 self.assertEqual(f.result(), 'ko') 181 self.assertIs(f, f_orig) 182 183 loop = asyncio.new_event_loop() 184 self.set_event_loop(loop) 185 186 with self.assertRaises(ValueError): 187 f = asyncio.ensure_future(f_orig, loop=loop) 188 189 loop.close() 190 191 f = asyncio.ensure_future(f_orig, loop=self.loop) 192 self.assertIs(f, f_orig) 193 194 def test_ensure_future_task(self): 195 async def notmuch(): 196 return 'ok' 197 t_orig = self.new_task(self.loop, notmuch()) 198 t = asyncio.ensure_future(t_orig) 199 self.loop.run_until_complete(t) 200 self.assertTrue(t.done()) 201 self.assertEqual(t.result(), 'ok') 202 self.assertIs(t, t_orig) 203 204 loop = asyncio.new_event_loop() 205 self.set_event_loop(loop) 206 207 with self.assertRaises(ValueError): 208 t = asyncio.ensure_future(t_orig, loop=loop) 209 210 loop.close() 211 212 t = asyncio.ensure_future(t_orig, loop=self.loop) 213 self.assertIs(t, t_orig) 214 215 def test_ensure_future_awaitable(self): 216 class Aw: 217 def __init__(self, coro): 218 self.coro = coro 219 def __await__(self): 220 return (yield from self.coro) 221 222 with self.assertWarns(DeprecationWarning): 223 @asyncio.coroutine 224 def coro(): 225 return 'ok' 226 227 loop = asyncio.new_event_loop() 228 self.set_event_loop(loop) 229 fut = asyncio.ensure_future(Aw(coro()), loop=loop) 230 loop.run_until_complete(fut) 231 assert fut.result() == 'ok' 232 233 def test_ensure_future_neither(self): 234 with self.assertRaises(TypeError): 235 asyncio.ensure_future('ok') 236 237 def test_ensure_future_error_msg(self): 238 loop = asyncio.new_event_loop() 239 f = self.new_future(self.loop) 240 with self.assertRaisesRegex(ValueError, 'The future belongs to a ' 241 'different loop than the one specified as ' 242 'the loop argument'): 243 asyncio.ensure_future(f, loop=loop) 244 loop.close() 245 246 def test_get_stack(self): 247 T = None 248 249 async def foo(): 250 await bar() 251 252 async def bar(): 253 # test get_stack() 254 f = T.get_stack(limit=1) 255 try: 256 self.assertEqual(f[0].f_code.co_name, 'foo') 257 finally: 258 f = None 259 260 # test print_stack() 261 file = io.StringIO() 262 T.print_stack(limit=1, file=file) 263 file.seek(0) 264 tb = file.read() 265 self.assertRegex(tb, r'foo\(\) running') 266 267 async def runner(): 268 nonlocal T 269 T = asyncio.ensure_future(foo(), loop=self.loop) 270 await T 271 272 self.loop.run_until_complete(runner()) 273 274 def test_task_repr(self): 275 self.loop.set_debug(False) 276 277 async def notmuch(): 278 return 'abc' 279 280 # test coroutine function 281 self.assertEqual(notmuch.__name__, 'notmuch') 282 self.assertRegex(notmuch.__qualname__, 283 r'\w+.test_task_repr.<locals>.notmuch') 284 self.assertEqual(notmuch.__module__, __name__) 285 286 filename, lineno = test_utils.get_function_source(notmuch) 287 src = "%s:%s" % (filename, lineno) 288 289 # test coroutine object 290 gen = notmuch() 291 coro_qualname = 'BaseTaskTests.test_task_repr.<locals>.notmuch' 292 self.assertEqual(gen.__name__, 'notmuch') 293 self.assertEqual(gen.__qualname__, coro_qualname) 294 295 # test pending Task 296 t = self.new_task(self.loop, gen) 297 t.add_done_callback(Dummy()) 298 299 coro = format_coroutine(coro_qualname, 'running', src, 300 t._source_traceback, generator=True) 301 self.assertEqual(repr(t), 302 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 303 304 # test cancelling Task 305 t.cancel() # Does not take immediate effect! 306 self.assertEqual(repr(t), 307 "<Task cancelling name='TestTask' %s cb=[<Dummy>()]>" % coro) 308 309 # test cancelled Task 310 self.assertRaises(asyncio.CancelledError, 311 self.loop.run_until_complete, t) 312 coro = format_coroutine(coro_qualname, 'done', src, 313 t._source_traceback) 314 self.assertEqual(repr(t), 315 "<Task cancelled name='TestTask' %s>" % coro) 316 317 # test finished Task 318 t = self.new_task(self.loop, notmuch()) 319 self.loop.run_until_complete(t) 320 coro = format_coroutine(coro_qualname, 'done', src, 321 t._source_traceback) 322 self.assertEqual(repr(t), 323 "<Task finished name='TestTask' %s result='abc'>" % coro) 324 325 def test_task_repr_autogenerated(self): 326 async def notmuch(): 327 return 123 328 329 t1 = self.new_task(self.loop, notmuch(), None) 330 t2 = self.new_task(self.loop, notmuch(), None) 331 self.assertNotEqual(repr(t1), repr(t2)) 332 333 match1 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t1)) 334 self.assertIsNotNone(match1) 335 match2 = re.match(r"^<Task pending name='Task-(\d+)'", repr(t2)) 336 self.assertIsNotNone(match2) 337 338 # Autogenerated task names should have monotonically increasing numbers 339 self.assertLess(int(match1.group(1)), int(match2.group(1))) 340 self.loop.run_until_complete(t1) 341 self.loop.run_until_complete(t2) 342 343 def test_task_repr_name_not_str(self): 344 async def notmuch(): 345 return 123 346 347 t = self.new_task(self.loop, notmuch()) 348 t.set_name({6}) 349 self.assertEqual(t.get_name(), '{6}') 350 self.loop.run_until_complete(t) 351 352 def test_task_repr_coro_decorator(self): 353 self.loop.set_debug(False) 354 355 with self.assertWarns(DeprecationWarning): 356 @asyncio.coroutine 357 def notmuch(): 358 # notmuch() function doesn't use yield from: it will be wrapped by 359 # @coroutine decorator 360 return 123 361 362 # test coroutine function 363 self.assertEqual(notmuch.__name__, 'notmuch') 364 self.assertRegex(notmuch.__qualname__, 365 r'\w+.test_task_repr_coro_decorator' 366 r'\.<locals>\.notmuch') 367 self.assertEqual(notmuch.__module__, __name__) 368 369 # test coroutine object 370 gen = notmuch() 371 # On Python >= 3.5, generators now inherit the name of the 372 # function, as expected, and have a qualified name (__qualname__ 373 # attribute). 374 coro_name = 'notmuch' 375 coro_qualname = ('BaseTaskTests.test_task_repr_coro_decorator' 376 '.<locals>.notmuch') 377 self.assertEqual(gen.__name__, coro_name) 378 self.assertEqual(gen.__qualname__, coro_qualname) 379 380 # test repr(CoroWrapper) 381 if coroutines._DEBUG: 382 # format the coroutine object 383 if coroutines._DEBUG: 384 filename, lineno = test_utils.get_function_source(notmuch) 385 frame = gen._source_traceback[-1] 386 coro = ('%s() running, defined at %s:%s, created at %s:%s' 387 % (coro_qualname, filename, lineno, 388 frame[0], frame[1])) 389 else: 390 code = gen.gi_code 391 coro = ('%s() running at %s:%s' 392 % (coro_qualname, code.co_filename, 393 code.co_firstlineno)) 394 395 self.assertEqual(repr(gen), '<CoroWrapper %s>' % coro) 396 397 # test pending Task 398 t = self.new_task(self.loop, gen) 399 t.add_done_callback(Dummy()) 400 401 # format the coroutine object 402 if coroutines._DEBUG: 403 src = '%s:%s' % test_utils.get_function_source(notmuch) 404 else: 405 code = gen.gi_code 406 src = '%s:%s' % (code.co_filename, code.co_firstlineno) 407 coro = format_coroutine(coro_qualname, 'running', src, 408 t._source_traceback, 409 generator=not coroutines._DEBUG) 410 self.assertEqual(repr(t), 411 "<Task pending name='TestTask' %s cb=[<Dummy>()]>" % coro) 412 self.loop.run_until_complete(t) 413 414 def test_task_repr_wait_for(self): 415 self.loop.set_debug(False) 416 417 async def wait_for(fut): 418 return await fut 419 420 fut = self.new_future(self.loop) 421 task = self.new_task(self.loop, wait_for(fut)) 422 test_utils.run_briefly(self.loop) 423 self.assertRegex(repr(task), 424 '<Task .* wait_for=%s>' % re.escape(repr(fut))) 425 426 fut.set_result(None) 427 self.loop.run_until_complete(task) 428 429 def test_task_repr_partial_corowrapper(self): 430 # Issue #222: repr(CoroWrapper) must not fail in debug mode if the 431 # coroutine is a partial function 432 with set_coroutine_debug(True): 433 self.loop.set_debug(True) 434 435 async def func(x, y): 436 await asyncio.sleep(0) 437 438 with self.assertWarns(DeprecationWarning): 439 partial_func = asyncio.coroutine(functools.partial(func, 1)) 440 task = self.loop.create_task(partial_func(2)) 441 442 # make warnings quiet 443 task._log_destroy_pending = False 444 self.addCleanup(task._coro.close) 445 446 coro_repr = repr(task._coro) 447 expected = ( 448 r'<coroutine object \w+\.test_task_repr_partial_corowrapper' 449 r'\.<locals>\.func at' 450 ) 451 self.assertRegex(coro_repr, expected) 452 453 def test_task_basics(self): 454 455 async def outer(): 456 a = await inner1() 457 b = await inner2() 458 return a+b 459 460 async def inner1(): 461 return 42 462 463 async def inner2(): 464 return 1000 465 466 t = outer() 467 self.assertEqual(self.loop.run_until_complete(t), 1042) 468 469 def test_cancel(self): 470 471 def gen(): 472 when = yield 473 self.assertAlmostEqual(10.0, when) 474 yield 0 475 476 loop = self.new_test_loop(gen) 477 478 async def task(): 479 await asyncio.sleep(10.0) 480 return 12 481 482 t = self.new_task(loop, task()) 483 loop.call_soon(t.cancel) 484 with self.assertRaises(asyncio.CancelledError): 485 loop.run_until_complete(t) 486 self.assertTrue(t.done()) 487 self.assertTrue(t.cancelled()) 488 self.assertFalse(t.cancel()) 489 490 def test_cancel_yield(self): 491 with self.assertWarns(DeprecationWarning): 492 @asyncio.coroutine 493 def task(): 494 yield 495 yield 496 return 12 497 498 t = self.new_task(self.loop, task()) 499 test_utils.run_briefly(self.loop) # start coro 500 t.cancel() 501 self.assertRaises( 502 asyncio.CancelledError, self.loop.run_until_complete, t) 503 self.assertTrue(t.done()) 504 self.assertTrue(t.cancelled()) 505 self.assertFalse(t.cancel()) 506 507 def test_cancel_inner_future(self): 508 f = self.new_future(self.loop) 509 510 async def task(): 511 await f 512 return 12 513 514 t = self.new_task(self.loop, task()) 515 test_utils.run_briefly(self.loop) # start task 516 f.cancel() 517 with self.assertRaises(asyncio.CancelledError): 518 self.loop.run_until_complete(t) 519 self.assertTrue(f.cancelled()) 520 self.assertTrue(t.cancelled()) 521 522 def test_cancel_both_task_and_inner_future(self): 523 f = self.new_future(self.loop) 524 525 async def task(): 526 await f 527 return 12 528 529 t = self.new_task(self.loop, task()) 530 test_utils.run_briefly(self.loop) 531 532 f.cancel() 533 t.cancel() 534 535 with self.assertRaises(asyncio.CancelledError): 536 self.loop.run_until_complete(t) 537 538 self.assertTrue(t.done()) 539 self.assertTrue(f.cancelled()) 540 self.assertTrue(t.cancelled()) 541 542 def test_cancel_task_catching(self): 543 fut1 = self.new_future(self.loop) 544 fut2 = self.new_future(self.loop) 545 546 async def task(): 547 await fut1 548 try: 549 await fut2 550 except asyncio.CancelledError: 551 return 42 552 553 t = self.new_task(self.loop, task()) 554 test_utils.run_briefly(self.loop) 555 self.assertIs(t._fut_waiter, fut1) # White-box test. 556 fut1.set_result(None) 557 test_utils.run_briefly(self.loop) 558 self.assertIs(t._fut_waiter, fut2) # White-box test. 559 t.cancel() 560 self.assertTrue(fut2.cancelled()) 561 res = self.loop.run_until_complete(t) 562 self.assertEqual(res, 42) 563 self.assertFalse(t.cancelled()) 564 565 def test_cancel_task_ignoring(self): 566 fut1 = self.new_future(self.loop) 567 fut2 = self.new_future(self.loop) 568 fut3 = self.new_future(self.loop) 569 570 async def task(): 571 await fut1 572 try: 573 await fut2 574 except asyncio.CancelledError: 575 pass 576 res = await fut3 577 return res 578 579 t = self.new_task(self.loop, task()) 580 test_utils.run_briefly(self.loop) 581 self.assertIs(t._fut_waiter, fut1) # White-box test. 582 fut1.set_result(None) 583 test_utils.run_briefly(self.loop) 584 self.assertIs(t._fut_waiter, fut2) # White-box test. 585 t.cancel() 586 self.assertTrue(fut2.cancelled()) 587 test_utils.run_briefly(self.loop) 588 self.assertIs(t._fut_waiter, fut3) # White-box test. 589 fut3.set_result(42) 590 res = self.loop.run_until_complete(t) 591 self.assertEqual(res, 42) 592 self.assertFalse(fut3.cancelled()) 593 self.assertFalse(t.cancelled()) 594 595 def test_cancel_current_task(self): 596 loop = asyncio.new_event_loop() 597 self.set_event_loop(loop) 598 599 async def task(): 600 t.cancel() 601 self.assertTrue(t._must_cancel) # White-box test. 602 # The sleep should be cancelled immediately. 603 await asyncio.sleep(100) 604 return 12 605 606 t = self.new_task(loop, task()) 607 self.assertFalse(t.cancelled()) 608 self.assertRaises( 609 asyncio.CancelledError, loop.run_until_complete, t) 610 self.assertTrue(t.done()) 611 self.assertTrue(t.cancelled()) 612 self.assertFalse(t._must_cancel) # White-box test. 613 self.assertFalse(t.cancel()) 614 615 def test_cancel_at_end(self): 616 """coroutine end right after task is cancelled""" 617 loop = asyncio.new_event_loop() 618 self.set_event_loop(loop) 619 620 async def task(): 621 t.cancel() 622 self.assertTrue(t._must_cancel) # White-box test. 623 return 12 624 625 t = self.new_task(loop, task()) 626 self.assertFalse(t.cancelled()) 627 self.assertRaises( 628 asyncio.CancelledError, loop.run_until_complete, t) 629 self.assertTrue(t.done()) 630 self.assertTrue(t.cancelled()) 631 self.assertFalse(t._must_cancel) # White-box test. 632 self.assertFalse(t.cancel()) 633 634 def test_cancel_awaited_task(self): 635 # This tests for a relatively rare condition when 636 # a task cancellation is requested for a task which is not 637 # currently blocked, such as a task cancelling itself. 638 # In this situation we must ensure that whatever next future 639 # or task the cancelled task blocks on is cancelled correctly 640 # as well. See also bpo-34872. 641 loop = asyncio.new_event_loop() 642 self.addCleanup(lambda: loop.close()) 643 644 task = nested_task = None 645 fut = self.new_future(loop) 646 647 async def nested(): 648 await fut 649 650 async def coro(): 651 nonlocal nested_task 652 # Create a sub-task and wait for it to run. 653 nested_task = self.new_task(loop, nested()) 654 await asyncio.sleep(0) 655 656 # Request the current task to be cancelled. 657 task.cancel() 658 # Block on the nested task, which should be immediately 659 # cancelled. 660 await nested_task 661 662 task = self.new_task(loop, coro()) 663 with self.assertRaises(asyncio.CancelledError): 664 loop.run_until_complete(task) 665 666 self.assertTrue(task.cancelled()) 667 self.assertTrue(nested_task.cancelled()) 668 self.assertTrue(fut.cancelled()) 669 670 def test_stop_while_run_in_complete(self): 671 672 def gen(): 673 when = yield 674 self.assertAlmostEqual(0.1, when) 675 when = yield 0.1 676 self.assertAlmostEqual(0.2, when) 677 when = yield 0.1 678 self.assertAlmostEqual(0.3, when) 679 yield 0.1 680 681 loop = self.new_test_loop(gen) 682 683 x = 0 684 685 async def task(): 686 nonlocal x 687 while x < 10: 688 await asyncio.sleep(0.1) 689 x += 1 690 if x == 2: 691 loop.stop() 692 693 t = self.new_task(loop, task()) 694 with self.assertRaises(RuntimeError) as cm: 695 loop.run_until_complete(t) 696 self.assertEqual(str(cm.exception), 697 'Event loop stopped before Future completed.') 698 self.assertFalse(t.done()) 699 self.assertEqual(x, 2) 700 self.assertAlmostEqual(0.3, loop.time()) 701 702 t.cancel() 703 self.assertRaises(asyncio.CancelledError, loop.run_until_complete, t) 704 705 def test_log_traceback(self): 706 async def coro(): 707 pass 708 709 task = self.new_task(self.loop, coro()) 710 with self.assertRaisesRegex(ValueError, 'can only be set to False'): 711 task._log_traceback = True 712 self.loop.run_until_complete(task) 713 714 def test_wait_for_timeout_less_then_0_or_0_future_done(self): 715 def gen(): 716 when = yield 717 self.assertAlmostEqual(0, when) 718 719 loop = self.new_test_loop(gen) 720 721 fut = self.new_future(loop) 722 fut.set_result('done') 723 724 ret = loop.run_until_complete(asyncio.wait_for(fut, 0)) 725 726 self.assertEqual(ret, 'done') 727 self.assertTrue(fut.done()) 728 self.assertAlmostEqual(0, loop.time()) 729 730 def test_wait_for_timeout_less_then_0_or_0_coroutine_do_not_started(self): 731 def gen(): 732 when = yield 733 self.assertAlmostEqual(0, when) 734 735 loop = self.new_test_loop(gen) 736 737 foo_started = False 738 739 async def foo(): 740 nonlocal foo_started 741 foo_started = True 742 743 with self.assertRaises(asyncio.TimeoutError): 744 loop.run_until_complete(asyncio.wait_for(foo(), 0)) 745 746 self.assertAlmostEqual(0, loop.time()) 747 self.assertEqual(foo_started, False) 748 749 def test_wait_for_timeout_less_then_0_or_0(self): 750 def gen(): 751 when = yield 752 self.assertAlmostEqual(0.2, when) 753 when = yield 0 754 self.assertAlmostEqual(0, when) 755 756 for timeout in [0, -1]: 757 with self.subTest(timeout=timeout): 758 loop = self.new_test_loop(gen) 759 760 foo_running = None 761 762 async def foo(): 763 nonlocal foo_running 764 foo_running = True 765 try: 766 await asyncio.sleep(0.2) 767 finally: 768 foo_running = False 769 return 'done' 770 771 fut = self.new_task(loop, foo()) 772 773 with self.assertRaises(asyncio.TimeoutError): 774 loop.run_until_complete(asyncio.wait_for(fut, timeout)) 775 self.assertTrue(fut.done()) 776 # it should have been cancelled due to the timeout 777 self.assertTrue(fut.cancelled()) 778 self.assertAlmostEqual(0, loop.time()) 779 self.assertEqual(foo_running, False) 780 781 def test_wait_for(self): 782 783 def gen(): 784 when = yield 785 self.assertAlmostEqual(0.2, when) 786 when = yield 0 787 self.assertAlmostEqual(0.1, when) 788 when = yield 0.1 789 790 loop = self.new_test_loop(gen) 791 792 foo_running = None 793 794 async def foo(): 795 nonlocal foo_running 796 foo_running = True 797 try: 798 await asyncio.sleep(0.2) 799 finally: 800 foo_running = False 801 return 'done' 802 803 fut = self.new_task(loop, foo()) 804 805 with self.assertRaises(asyncio.TimeoutError): 806 loop.run_until_complete(asyncio.wait_for(fut, 0.1)) 807 self.assertTrue(fut.done()) 808 # it should have been cancelled due to the timeout 809 self.assertTrue(fut.cancelled()) 810 self.assertAlmostEqual(0.1, loop.time()) 811 self.assertEqual(foo_running, False) 812 813 def test_wait_for_blocking(self): 814 loop = self.new_test_loop() 815 816 async def coro(): 817 return 'done' 818 819 res = loop.run_until_complete(asyncio.wait_for(coro(), timeout=None)) 820 self.assertEqual(res, 'done') 821 822 def test_wait_for_with_global_loop(self): 823 824 def gen(): 825 when = yield 826 self.assertAlmostEqual(0.2, when) 827 when = yield 0 828 self.assertAlmostEqual(0.01, when) 829 yield 0.01 830 831 loop = self.new_test_loop(gen) 832 833 async def foo(): 834 await asyncio.sleep(0.2) 835 return 'done' 836 837 asyncio.set_event_loop(loop) 838 try: 839 fut = self.new_task(loop, foo()) 840 with self.assertRaises(asyncio.TimeoutError): 841 loop.run_until_complete(asyncio.wait_for(fut, 0.01)) 842 finally: 843 asyncio.set_event_loop(None) 844 845 self.assertAlmostEqual(0.01, loop.time()) 846 self.assertTrue(fut.done()) 847 self.assertTrue(fut.cancelled()) 848 849 def test_wait_for_race_condition(self): 850 851 def gen(): 852 yield 0.1 853 yield 0.1 854 yield 0.1 855 856 loop = self.new_test_loop(gen) 857 858 fut = self.new_future(loop) 859 task = asyncio.wait_for(fut, timeout=0.2) 860 loop.call_later(0.1, fut.set_result, "ok") 861 res = loop.run_until_complete(task) 862 self.assertEqual(res, "ok") 863 864 def test_wait_for_cancellation_race_condition(self): 865 def gen(): 866 yield 0.1 867 yield 0.1 868 yield 0.1 869 yield 0.1 870 871 loop = self.new_test_loop(gen) 872 873 fut = self.new_future(loop) 874 loop.call_later(0.1, fut.set_result, "ok") 875 task = loop.create_task(asyncio.wait_for(fut, timeout=1)) 876 loop.call_later(0.1, task.cancel) 877 res = loop.run_until_complete(task) 878 self.assertEqual(res, "ok") 879 880 def test_wait_for_waits_for_task_cancellation(self): 881 loop = asyncio.new_event_loop() 882 self.addCleanup(loop.close) 883 884 task_done = False 885 886 async def foo(): 887 async def inner(): 888 nonlocal task_done 889 try: 890 await asyncio.sleep(0.2) 891 except asyncio.CancelledError: 892 await asyncio.sleep(0.1) 893 raise 894 finally: 895 task_done = True 896 897 inner_task = self.new_task(loop, inner()) 898 899 with self.assertRaises(asyncio.TimeoutError): 900 await asyncio.wait_for(inner_task, timeout=0.1) 901 902 self.assertTrue(task_done) 903 904 loop.run_until_complete(foo()) 905 906 def test_wait_for_waits_for_task_cancellation_w_timeout_0(self): 907 loop = asyncio.new_event_loop() 908 self.addCleanup(loop.close) 909 910 task_done = False 911 912 async def foo(): 913 async def inner(): 914 nonlocal task_done 915 try: 916 await asyncio.sleep(10) 917 except asyncio.CancelledError: 918 await asyncio.sleep(0.1) 919 raise 920 finally: 921 task_done = True 922 923 inner_task = self.new_task(loop, inner()) 924 await asyncio.sleep(0.1) 925 await asyncio.wait_for(inner_task, timeout=0) 926 927 with self.assertRaises(asyncio.TimeoutError) as cm: 928 loop.run_until_complete(foo()) 929 930 self.assertTrue(task_done) 931 chained = cm.exception.__context__ 932 self.assertEqual(type(chained), asyncio.CancelledError) 933 934 def test_wait_for_self_cancellation(self): 935 loop = asyncio.new_event_loop() 936 self.addCleanup(loop.close) 937 938 async def foo(): 939 async def inner(): 940 try: 941 await asyncio.sleep(0.3) 942 except asyncio.CancelledError: 943 try: 944 await asyncio.sleep(0.3) 945 except asyncio.CancelledError: 946 await asyncio.sleep(0.3) 947 948 return 42 949 950 inner_task = self.new_task(loop, inner()) 951 952 wait = asyncio.wait_for(inner_task, timeout=0.1) 953 954 # Test that wait_for itself is properly cancellable 955 # even when the initial task holds up the initial cancellation. 956 task = self.new_task(loop, wait) 957 await asyncio.sleep(0.2) 958 task.cancel() 959 960 with self.assertRaises(asyncio.CancelledError): 961 await task 962 963 self.assertEqual(await inner_task, 42) 964 965 loop.run_until_complete(foo()) 966 967 def test_wait(self): 968 969 def gen(): 970 when = yield 971 self.assertAlmostEqual(0.1, when) 972 when = yield 0 973 self.assertAlmostEqual(0.15, when) 974 yield 0.15 975 976 loop = self.new_test_loop(gen) 977 978 a = self.new_task(loop, asyncio.sleep(0.1)) 979 b = self.new_task(loop, asyncio.sleep(0.15)) 980 981 async def foo(): 982 done, pending = await asyncio.wait([b, a]) 983 self.assertEqual(done, set([a, b])) 984 self.assertEqual(pending, set()) 985 return 42 986 987 res = loop.run_until_complete(self.new_task(loop, foo())) 988 self.assertEqual(res, 42) 989 self.assertAlmostEqual(0.15, loop.time()) 990 991 # Doing it again should take no time and exercise a different path. 992 res = loop.run_until_complete(self.new_task(loop, foo())) 993 self.assertAlmostEqual(0.15, loop.time()) 994 self.assertEqual(res, 42) 995 996 def test_wait_with_global_loop(self): 997 998 def gen(): 999 when = yield 1000 self.assertAlmostEqual(0.01, when) 1001 when = yield 0 1002 self.assertAlmostEqual(0.015, when) 1003 yield 0.015 1004 1005 loop = self.new_test_loop(gen) 1006 1007 a = self.new_task(loop, asyncio.sleep(0.01)) 1008 b = self.new_task(loop, asyncio.sleep(0.015)) 1009 1010 async def foo(): 1011 done, pending = await asyncio.wait([b, a]) 1012 self.assertEqual(done, set([a, b])) 1013 self.assertEqual(pending, set()) 1014 return 42 1015 1016 asyncio.set_event_loop(loop) 1017 res = loop.run_until_complete( 1018 self.new_task(loop, foo())) 1019 1020 self.assertEqual(res, 42) 1021 1022 def test_wait_duplicate_coroutines(self): 1023 1024 with self.assertWarns(DeprecationWarning): 1025 @asyncio.coroutine 1026 def coro(s): 1027 return s 1028 c = coro('test') 1029 1030 task =self.new_task( 1031 self.loop, 1032 asyncio.wait([c, c, coro('spam')])) 1033 1034 done, pending = self.loop.run_until_complete(task) 1035 1036 self.assertFalse(pending) 1037 self.assertEqual(set(f.result() for f in done), {'test', 'spam'}) 1038 1039 def test_wait_errors(self): 1040 self.assertRaises( 1041 ValueError, self.loop.run_until_complete, 1042 asyncio.wait(set())) 1043 1044 # -1 is an invalid return_when value 1045 sleep_coro = asyncio.sleep(10.0) 1046 wait_coro = asyncio.wait([sleep_coro], return_when=-1) 1047 self.assertRaises(ValueError, 1048 self.loop.run_until_complete, wait_coro) 1049 1050 sleep_coro.close() 1051 1052 def test_wait_first_completed(self): 1053 1054 def gen(): 1055 when = yield 1056 self.assertAlmostEqual(10.0, when) 1057 when = yield 0 1058 self.assertAlmostEqual(0.1, when) 1059 yield 0.1 1060 1061 loop = self.new_test_loop(gen) 1062 1063 a = self.new_task(loop, asyncio.sleep(10.0)) 1064 b = self.new_task(loop, asyncio.sleep(0.1)) 1065 task = self.new_task( 1066 loop, 1067 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1068 1069 done, pending = loop.run_until_complete(task) 1070 self.assertEqual({b}, done) 1071 self.assertEqual({a}, pending) 1072 self.assertFalse(a.done()) 1073 self.assertTrue(b.done()) 1074 self.assertIsNone(b.result()) 1075 self.assertAlmostEqual(0.1, loop.time()) 1076 1077 # move forward to close generator 1078 loop.advance_time(10) 1079 loop.run_until_complete(asyncio.wait([a, b])) 1080 1081 def test_wait_really_done(self): 1082 # there is possibility that some tasks in the pending list 1083 # became done but their callbacks haven't all been called yet 1084 1085 async def coro1(): 1086 await asyncio.sleep(0) 1087 1088 async def coro2(): 1089 await asyncio.sleep(0) 1090 await asyncio.sleep(0) 1091 1092 a = self.new_task(self.loop, coro1()) 1093 b = self.new_task(self.loop, coro2()) 1094 task = self.new_task( 1095 self.loop, 1096 asyncio.wait([b, a], return_when=asyncio.FIRST_COMPLETED)) 1097 1098 done, pending = self.loop.run_until_complete(task) 1099 self.assertEqual({a, b}, done) 1100 self.assertTrue(a.done()) 1101 self.assertIsNone(a.result()) 1102 self.assertTrue(b.done()) 1103 self.assertIsNone(b.result()) 1104 1105 def test_wait_first_exception(self): 1106 1107 def gen(): 1108 when = yield 1109 self.assertAlmostEqual(10.0, when) 1110 yield 0 1111 1112 loop = self.new_test_loop(gen) 1113 1114 # first_exception, task already has exception 1115 a = self.new_task(loop, asyncio.sleep(10.0)) 1116 1117 async def exc(): 1118 raise ZeroDivisionError('err') 1119 1120 b = self.new_task(loop, exc()) 1121 task = self.new_task( 1122 loop, 1123 asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION)) 1124 1125 done, pending = loop.run_until_complete(task) 1126 self.assertEqual({b}, done) 1127 self.assertEqual({a}, pending) 1128 self.assertAlmostEqual(0, loop.time()) 1129 1130 # move forward to close generator 1131 loop.advance_time(10) 1132 loop.run_until_complete(asyncio.wait([a, b])) 1133 1134 def test_wait_first_exception_in_wait(self): 1135 1136 def gen(): 1137 when = yield 1138 self.assertAlmostEqual(10.0, when) 1139 when = yield 0 1140 self.assertAlmostEqual(0.01, when) 1141 yield 0.01 1142 1143 loop = self.new_test_loop(gen) 1144 1145 # first_exception, exception during waiting 1146 a = self.new_task(loop, asyncio.sleep(10.0)) 1147 1148 async def exc(): 1149 await asyncio.sleep(0.01) 1150 raise ZeroDivisionError('err') 1151 1152 b = self.new_task(loop, exc()) 1153 task = asyncio.wait([b, a], return_when=asyncio.FIRST_EXCEPTION) 1154 1155 done, pending = loop.run_until_complete(task) 1156 self.assertEqual({b}, done) 1157 self.assertEqual({a}, pending) 1158 self.assertAlmostEqual(0.01, loop.time()) 1159 1160 # move forward to close generator 1161 loop.advance_time(10) 1162 loop.run_until_complete(asyncio.wait([a, b])) 1163 1164 def test_wait_with_exception(self): 1165 1166 def gen(): 1167 when = yield 1168 self.assertAlmostEqual(0.1, when) 1169 when = yield 0 1170 self.assertAlmostEqual(0.15, when) 1171 yield 0.15 1172 1173 loop = self.new_test_loop(gen) 1174 1175 a = self.new_task(loop, asyncio.sleep(0.1)) 1176 1177 async def sleeper(): 1178 await asyncio.sleep(0.15) 1179 raise ZeroDivisionError('really') 1180 1181 b = self.new_task(loop, sleeper()) 1182 1183 async def foo(): 1184 done, pending = await asyncio.wait([b, a]) 1185 self.assertEqual(len(done), 2) 1186 self.assertEqual(pending, set()) 1187 errors = set(f for f in done if f.exception() is not None) 1188 self.assertEqual(len(errors), 1) 1189 1190 loop.run_until_complete(self.new_task(loop, foo())) 1191 self.assertAlmostEqual(0.15, loop.time()) 1192 1193 loop.run_until_complete(self.new_task(loop, foo())) 1194 self.assertAlmostEqual(0.15, loop.time()) 1195 1196 def test_wait_with_timeout(self): 1197 1198 def gen(): 1199 when = yield 1200 self.assertAlmostEqual(0.1, when) 1201 when = yield 0 1202 self.assertAlmostEqual(0.15, when) 1203 when = yield 0 1204 self.assertAlmostEqual(0.11, when) 1205 yield 0.11 1206 1207 loop = self.new_test_loop(gen) 1208 1209 a = self.new_task(loop, asyncio.sleep(0.1)) 1210 b = self.new_task(loop, asyncio.sleep(0.15)) 1211 1212 async def foo(): 1213 done, pending = await asyncio.wait([b, a], timeout=0.11) 1214 self.assertEqual(done, set([a])) 1215 self.assertEqual(pending, set([b])) 1216 1217 loop.run_until_complete(self.new_task(loop, foo())) 1218 self.assertAlmostEqual(0.11, loop.time()) 1219 1220 # move forward to close generator 1221 loop.advance_time(10) 1222 loop.run_until_complete(asyncio.wait([a, b])) 1223 1224 def test_wait_concurrent_complete(self): 1225 1226 def gen(): 1227 when = yield 1228 self.assertAlmostEqual(0.1, when) 1229 when = yield 0 1230 self.assertAlmostEqual(0.15, when) 1231 when = yield 0 1232 self.assertAlmostEqual(0.1, when) 1233 yield 0.1 1234 1235 loop = self.new_test_loop(gen) 1236 1237 a = self.new_task(loop, asyncio.sleep(0.1)) 1238 b = self.new_task(loop, asyncio.sleep(0.15)) 1239 1240 done, pending = loop.run_until_complete( 1241 asyncio.wait([b, a], timeout=0.1)) 1242 1243 self.assertEqual(done, set([a])) 1244 self.assertEqual(pending, set([b])) 1245 self.assertAlmostEqual(0.1, loop.time()) 1246 1247 # move forward to close generator 1248 loop.advance_time(10) 1249 loop.run_until_complete(asyncio.wait([a, b])) 1250 1251 def test_as_completed(self): 1252 1253 def gen(): 1254 yield 0 1255 yield 0 1256 yield 0.01 1257 yield 0 1258 1259 loop = self.new_test_loop(gen) 1260 # disable "slow callback" warning 1261 loop.slow_callback_duration = 1.0 1262 completed = set() 1263 time_shifted = False 1264 1265 with self.assertWarns(DeprecationWarning): 1266 @asyncio.coroutine 1267 def sleeper(dt, x): 1268 nonlocal time_shifted 1269 yield from asyncio.sleep(dt) 1270 completed.add(x) 1271 if not time_shifted and 'a' in completed and 'b' in completed: 1272 time_shifted = True 1273 loop.advance_time(0.14) 1274 return x 1275 1276 a = sleeper(0.01, 'a') 1277 b = sleeper(0.01, 'b') 1278 c = sleeper(0.15, 'c') 1279 1280 async def foo(): 1281 values = [] 1282 for f in asyncio.as_completed([b, c, a], loop=loop): 1283 values.append(await f) 1284 return values 1285 with self.assertWarns(DeprecationWarning): 1286 res = loop.run_until_complete(self.new_task(loop, foo())) 1287 self.assertAlmostEqual(0.15, loop.time()) 1288 self.assertTrue('a' in res[:2]) 1289 self.assertTrue('b' in res[:2]) 1290 self.assertEqual(res[2], 'c') 1291 1292 # Doing it again should take no time and exercise a different path. 1293 with self.assertWarns(DeprecationWarning): 1294 res = loop.run_until_complete(self.new_task(loop, foo())) 1295 self.assertAlmostEqual(0.15, loop.time()) 1296 1297 def test_as_completed_with_timeout(self): 1298 1299 def gen(): 1300 yield 1301 yield 0 1302 yield 0 1303 yield 0.1 1304 1305 loop = self.new_test_loop(gen) 1306 1307 a = loop.create_task(asyncio.sleep(0.1, 'a')) 1308 b = loop.create_task(asyncio.sleep(0.15, 'b')) 1309 1310 async def foo(): 1311 values = [] 1312 for f in asyncio.as_completed([a, b], timeout=0.12, loop=loop): 1313 if values: 1314 loop.advance_time(0.02) 1315 try: 1316 v = await f 1317 values.append((1, v)) 1318 except asyncio.TimeoutError as exc: 1319 values.append((2, exc)) 1320 return values 1321 1322 with self.assertWarns(DeprecationWarning): 1323 res = loop.run_until_complete(self.new_task(loop, foo())) 1324 self.assertEqual(len(res), 2, res) 1325 self.assertEqual(res[0], (1, 'a')) 1326 self.assertEqual(res[1][0], 2) 1327 self.assertIsInstance(res[1][1], asyncio.TimeoutError) 1328 self.assertAlmostEqual(0.12, loop.time()) 1329 1330 # move forward to close generator 1331 loop.advance_time(10) 1332 loop.run_until_complete(asyncio.wait([a, b])) 1333 1334 def test_as_completed_with_unused_timeout(self): 1335 1336 def gen(): 1337 yield 1338 yield 0 1339 yield 0.01 1340 1341 loop = self.new_test_loop(gen) 1342 1343 a = asyncio.sleep(0.01, 'a') 1344 1345 async def foo(): 1346 for f in asyncio.as_completed([a], timeout=1, loop=loop): 1347 v = await f 1348 self.assertEqual(v, 'a') 1349 1350 with self.assertWarns(DeprecationWarning): 1351 loop.run_until_complete(self.new_task(loop, foo())) 1352 1353 def test_as_completed_reverse_wait(self): 1354 1355 def gen(): 1356 yield 0 1357 yield 0.05 1358 yield 0 1359 1360 loop = self.new_test_loop(gen) 1361 1362 a = asyncio.sleep(0.05, 'a') 1363 b = asyncio.sleep(0.10, 'b') 1364 fs = {a, b} 1365 1366 with self.assertWarns(DeprecationWarning): 1367 futs = list(asyncio.as_completed(fs, loop=loop)) 1368 self.assertEqual(len(futs), 2) 1369 1370 x = loop.run_until_complete(futs[1]) 1371 self.assertEqual(x, 'a') 1372 self.assertAlmostEqual(0.05, loop.time()) 1373 loop.advance_time(0.05) 1374 y = loop.run_until_complete(futs[0]) 1375 self.assertEqual(y, 'b') 1376 self.assertAlmostEqual(0.10, loop.time()) 1377 1378 def test_as_completed_concurrent(self): 1379 1380 def gen(): 1381 when = yield 1382 self.assertAlmostEqual(0.05, when) 1383 when = yield 0 1384 self.assertAlmostEqual(0.05, when) 1385 yield 0.05 1386 1387 loop = self.new_test_loop(gen) 1388 1389 a = asyncio.sleep(0.05, 'a') 1390 b = asyncio.sleep(0.05, 'b') 1391 fs = {a, b} 1392 with self.assertWarns(DeprecationWarning): 1393 futs = list(asyncio.as_completed(fs, loop=loop)) 1394 self.assertEqual(len(futs), 2) 1395 waiter = asyncio.wait(futs) 1396 done, pending = loop.run_until_complete(waiter) 1397 self.assertEqual(set(f.result() for f in done), {'a', 'b'}) 1398 1399 def test_as_completed_duplicate_coroutines(self): 1400 1401 with self.assertWarns(DeprecationWarning): 1402 @asyncio.coroutine 1403 def coro(s): 1404 return s 1405 1406 with self.assertWarns(DeprecationWarning): 1407 @asyncio.coroutine 1408 def runner(): 1409 result = [] 1410 c = coro('ham') 1411 for f in asyncio.as_completed([c, c, coro('spam')], 1412 loop=self.loop): 1413 result.append((yield from f)) 1414 return result 1415 1416 with self.assertWarns(DeprecationWarning): 1417 fut = self.new_task(self.loop, runner()) 1418 self.loop.run_until_complete(fut) 1419 result = fut.result() 1420 self.assertEqual(set(result), {'ham', 'spam'}) 1421 self.assertEqual(len(result), 2) 1422 1423 def test_sleep(self): 1424 1425 def gen(): 1426 when = yield 1427 self.assertAlmostEqual(0.05, when) 1428 when = yield 0.05 1429 self.assertAlmostEqual(0.1, when) 1430 yield 0.05 1431 1432 loop = self.new_test_loop(gen) 1433 1434 async def sleeper(dt, arg): 1435 await asyncio.sleep(dt/2) 1436 res = await asyncio.sleep(dt/2, arg) 1437 return res 1438 1439 t = self.new_task(loop, sleeper(0.1, 'yeah')) 1440 loop.run_until_complete(t) 1441 self.assertTrue(t.done()) 1442 self.assertEqual(t.result(), 'yeah') 1443 self.assertAlmostEqual(0.1, loop.time()) 1444 1445 def test_sleep_cancel(self): 1446 1447 def gen(): 1448 when = yield 1449 self.assertAlmostEqual(10.0, when) 1450 yield 0 1451 1452 loop = self.new_test_loop(gen) 1453 1454 t = self.new_task(loop, asyncio.sleep(10.0, 'yeah')) 1455 1456 handle = None 1457 orig_call_later = loop.call_later 1458 1459 def call_later(delay, callback, *args): 1460 nonlocal handle 1461 handle = orig_call_later(delay, callback, *args) 1462 return handle 1463 1464 loop.call_later = call_later 1465 test_utils.run_briefly(loop) 1466 1467 self.assertFalse(handle._cancelled) 1468 1469 t.cancel() 1470 test_utils.run_briefly(loop) 1471 self.assertTrue(handle._cancelled) 1472 1473 def test_task_cancel_sleeping_task(self): 1474 1475 def gen(): 1476 when = yield 1477 self.assertAlmostEqual(0.1, when) 1478 when = yield 0 1479 self.assertAlmostEqual(5000, when) 1480 yield 0.1 1481 1482 loop = self.new_test_loop(gen) 1483 1484 async def sleep(dt): 1485 await asyncio.sleep(dt) 1486 1487 async def doit(): 1488 sleeper = self.new_task(loop, sleep(5000)) 1489 loop.call_later(0.1, sleeper.cancel) 1490 try: 1491 await sleeper 1492 except asyncio.CancelledError: 1493 return 'cancelled' 1494 else: 1495 return 'slept in' 1496 1497 doer = doit() 1498 self.assertEqual(loop.run_until_complete(doer), 'cancelled') 1499 self.assertAlmostEqual(0.1, loop.time()) 1500 1501 def test_task_cancel_waiter_future(self): 1502 fut = self.new_future(self.loop) 1503 1504 async def coro(): 1505 await fut 1506 1507 task = self.new_task(self.loop, coro()) 1508 test_utils.run_briefly(self.loop) 1509 self.assertIs(task._fut_waiter, fut) 1510 1511 task.cancel() 1512 test_utils.run_briefly(self.loop) 1513 self.assertRaises( 1514 asyncio.CancelledError, self.loop.run_until_complete, task) 1515 self.assertIsNone(task._fut_waiter) 1516 self.assertTrue(fut.cancelled()) 1517 1518 def test_task_set_methods(self): 1519 async def notmuch(): 1520 return 'ko' 1521 1522 gen = notmuch() 1523 task = self.new_task(self.loop, gen) 1524 1525 with self.assertRaisesRegex(RuntimeError, 'not support set_result'): 1526 task.set_result('ok') 1527 1528 with self.assertRaisesRegex(RuntimeError, 'not support set_exception'): 1529 task.set_exception(ValueError()) 1530 1531 self.assertEqual( 1532 self.loop.run_until_complete(task), 1533 'ko') 1534 1535 def test_step_result(self): 1536 with self.assertWarns(DeprecationWarning): 1537 @asyncio.coroutine 1538 def notmuch(): 1539 yield None 1540 yield 1 1541 return 'ko' 1542 1543 self.assertRaises( 1544 RuntimeError, self.loop.run_until_complete, notmuch()) 1545 1546 def test_step_result_future(self): 1547 # If coroutine returns future, task waits on this future. 1548 1549 class Fut(asyncio.Future): 1550 def __init__(self, *args, **kwds): 1551 self.cb_added = False 1552 super().__init__(*args, **kwds) 1553 1554 def add_done_callback(self, *args, **kwargs): 1555 self.cb_added = True 1556 super().add_done_callback(*args, **kwargs) 1557 1558 fut = Fut(loop=self.loop) 1559 result = None 1560 1561 async def wait_for_future(): 1562 nonlocal result 1563 result = await fut 1564 1565 t = self.new_task(self.loop, wait_for_future()) 1566 test_utils.run_briefly(self.loop) 1567 self.assertTrue(fut.cb_added) 1568 1569 res = object() 1570 fut.set_result(res) 1571 test_utils.run_briefly(self.loop) 1572 self.assertIs(res, result) 1573 self.assertTrue(t.done()) 1574 self.assertIsNone(t.result()) 1575 1576 def test_baseexception_during_cancel(self): 1577 1578 def gen(): 1579 when = yield 1580 self.assertAlmostEqual(10.0, when) 1581 yield 0 1582 1583 loop = self.new_test_loop(gen) 1584 1585 async def sleeper(): 1586 await asyncio.sleep(10) 1587 1588 base_exc = SystemExit() 1589 1590 async def notmutch(): 1591 try: 1592 await sleeper() 1593 except asyncio.CancelledError: 1594 raise base_exc 1595 1596 task = self.new_task(loop, notmutch()) 1597 test_utils.run_briefly(loop) 1598 1599 task.cancel() 1600 self.assertFalse(task.done()) 1601 1602 self.assertRaises(SystemExit, test_utils.run_briefly, loop) 1603 1604 self.assertTrue(task.done()) 1605 self.assertFalse(task.cancelled()) 1606 self.assertIs(task.exception(), base_exc) 1607 1608 def test_iscoroutinefunction(self): 1609 def fn(): 1610 pass 1611 1612 self.assertFalse(asyncio.iscoroutinefunction(fn)) 1613 1614 def fn1(): 1615 yield 1616 self.assertFalse(asyncio.iscoroutinefunction(fn1)) 1617 1618 with self.assertWarns(DeprecationWarning): 1619 @asyncio.coroutine 1620 def fn2(): 1621 yield 1622 self.assertTrue(asyncio.iscoroutinefunction(fn2)) 1623 1624 self.assertFalse(asyncio.iscoroutinefunction(mock.Mock())) 1625 1626 def test_yield_vs_yield_from(self): 1627 fut = self.new_future(self.loop) 1628 1629 with self.assertWarns(DeprecationWarning): 1630 @asyncio.coroutine 1631 def wait_for_future(): 1632 yield fut 1633 1634 task = wait_for_future() 1635 with self.assertRaises(RuntimeError): 1636 self.loop.run_until_complete(task) 1637 1638 self.assertFalse(fut.done()) 1639 1640 def test_yield_vs_yield_from_generator(self): 1641 with self.assertWarns(DeprecationWarning): 1642 @asyncio.coroutine 1643 def coro(): 1644 yield 1645 1646 with self.assertWarns(DeprecationWarning): 1647 @asyncio.coroutine 1648 def wait_for_future(): 1649 gen = coro() 1650 try: 1651 yield gen 1652 finally: 1653 gen.close() 1654 1655 task = wait_for_future() 1656 self.assertRaises( 1657 RuntimeError, 1658 self.loop.run_until_complete, task) 1659 1660 def test_coroutine_non_gen_function(self): 1661 with self.assertWarns(DeprecationWarning): 1662 @asyncio.coroutine 1663 def func(): 1664 return 'test' 1665 1666 self.assertTrue(asyncio.iscoroutinefunction(func)) 1667 1668 coro = func() 1669 self.assertTrue(asyncio.iscoroutine(coro)) 1670 1671 res = self.loop.run_until_complete(coro) 1672 self.assertEqual(res, 'test') 1673 1674 def test_coroutine_non_gen_function_return_future(self): 1675 fut = self.new_future(self.loop) 1676 1677 with self.assertWarns(DeprecationWarning): 1678 @asyncio.coroutine 1679 def func(): 1680 return fut 1681 1682 async def coro(): 1683 fut.set_result('test') 1684 1685 t1 = self.new_task(self.loop, func()) 1686 t2 = self.new_task(self.loop, coro()) 1687 res = self.loop.run_until_complete(t1) 1688 self.assertEqual(res, 'test') 1689 self.assertIsNone(t2.result()) 1690 1691 1692 def test_current_task_deprecated(self): 1693 Task = self.__class__.Task 1694 1695 with self.assertWarns(DeprecationWarning): 1696 self.assertIsNone(Task.current_task(loop=self.loop)) 1697 1698 async def coro(loop): 1699 with self.assertWarns(DeprecationWarning): 1700 self.assertIs(Task.current_task(loop=loop), task) 1701 1702 # See http://bugs.python.org/issue29271 for details: 1703 asyncio.set_event_loop(loop) 1704 try: 1705 with self.assertWarns(DeprecationWarning): 1706 self.assertIs(Task.current_task(None), task) 1707 with self.assertWarns(DeprecationWarning): 1708 self.assertIs(Task.current_task(), task) 1709 finally: 1710 asyncio.set_event_loop(None) 1711 1712 task = self.new_task(self.loop, coro(self.loop)) 1713 self.loop.run_until_complete(task) 1714 with self.assertWarns(DeprecationWarning): 1715 self.assertIsNone(Task.current_task(loop=self.loop)) 1716 1717 def test_current_task(self): 1718 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1719 1720 async def coro(loop): 1721 self.assertIs(asyncio.current_task(loop=loop), task) 1722 1723 self.assertIs(asyncio.current_task(None), task) 1724 self.assertIs(asyncio.current_task(), task) 1725 1726 task = self.new_task(self.loop, coro(self.loop)) 1727 self.loop.run_until_complete(task) 1728 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1729 1730 def test_current_task_with_interleaving_tasks(self): 1731 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1732 1733 fut1 = self.new_future(self.loop) 1734 fut2 = self.new_future(self.loop) 1735 1736 async def coro1(loop): 1737 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1738 await fut1 1739 self.assertTrue(asyncio.current_task(loop=loop) is task1) 1740 fut2.set_result(True) 1741 1742 async def coro2(loop): 1743 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1744 fut1.set_result(True) 1745 await fut2 1746 self.assertTrue(asyncio.current_task(loop=loop) is task2) 1747 1748 task1 = self.new_task(self.loop, coro1(self.loop)) 1749 task2 = self.new_task(self.loop, coro2(self.loop)) 1750 1751 self.loop.run_until_complete(asyncio.wait((task1, task2))) 1752 self.assertIsNone(asyncio.current_task(loop=self.loop)) 1753 1754 # Some thorough tests for cancellation propagation through 1755 # coroutines, tasks and wait(). 1756 1757 def test_yield_future_passes_cancel(self): 1758 # Cancelling outer() cancels inner() cancels waiter. 1759 proof = 0 1760 waiter = self.new_future(self.loop) 1761 1762 async def inner(): 1763 nonlocal proof 1764 try: 1765 await waiter 1766 except asyncio.CancelledError: 1767 proof += 1 1768 raise 1769 else: 1770 self.fail('got past sleep() in inner()') 1771 1772 async def outer(): 1773 nonlocal proof 1774 try: 1775 await inner() 1776 except asyncio.CancelledError: 1777 proof += 100 # Expect this path. 1778 else: 1779 proof += 10 1780 1781 f = asyncio.ensure_future(outer(), loop=self.loop) 1782 test_utils.run_briefly(self.loop) 1783 f.cancel() 1784 self.loop.run_until_complete(f) 1785 self.assertEqual(proof, 101) 1786 self.assertTrue(waiter.cancelled()) 1787 1788 def test_yield_wait_does_not_shield_cancel(self): 1789 # Cancelling outer() makes wait() return early, leaves inner() 1790 # running. 1791 proof = 0 1792 waiter = self.new_future(self.loop) 1793 1794 async def inner(): 1795 nonlocal proof 1796 await waiter 1797 proof += 1 1798 1799 async def outer(): 1800 nonlocal proof 1801 d, p = await asyncio.wait([inner()]) 1802 proof += 100 1803 1804 f = asyncio.ensure_future(outer(), loop=self.loop) 1805 test_utils.run_briefly(self.loop) 1806 f.cancel() 1807 self.assertRaises( 1808 asyncio.CancelledError, self.loop.run_until_complete, f) 1809 waiter.set_result(None) 1810 test_utils.run_briefly(self.loop) 1811 self.assertEqual(proof, 1) 1812 1813 def test_shield_result(self): 1814 inner = self.new_future(self.loop) 1815 outer = asyncio.shield(inner) 1816 inner.set_result(42) 1817 res = self.loop.run_until_complete(outer) 1818 self.assertEqual(res, 42) 1819 1820 def test_shield_exception(self): 1821 inner = self.new_future(self.loop) 1822 outer = asyncio.shield(inner) 1823 test_utils.run_briefly(self.loop) 1824 exc = RuntimeError('expected') 1825 inner.set_exception(exc) 1826 test_utils.run_briefly(self.loop) 1827 self.assertIs(outer.exception(), exc) 1828 1829 def test_shield_cancel_inner(self): 1830 inner = self.new_future(self.loop) 1831 outer = asyncio.shield(inner) 1832 test_utils.run_briefly(self.loop) 1833 inner.cancel() 1834 test_utils.run_briefly(self.loop) 1835 self.assertTrue(outer.cancelled()) 1836 1837 def test_shield_cancel_outer(self): 1838 inner = self.new_future(self.loop) 1839 outer = asyncio.shield(inner) 1840 test_utils.run_briefly(self.loop) 1841 outer.cancel() 1842 test_utils.run_briefly(self.loop) 1843 self.assertTrue(outer.cancelled()) 1844 self.assertEqual(0, 0 if outer._callbacks is None else len(outer._callbacks)) 1845 1846 def test_shield_shortcut(self): 1847 fut = self.new_future(self.loop) 1848 fut.set_result(42) 1849 res = self.loop.run_until_complete(asyncio.shield(fut)) 1850 self.assertEqual(res, 42) 1851 1852 def test_shield_effect(self): 1853 # Cancelling outer() does not affect inner(). 1854 proof = 0 1855 waiter = self.new_future(self.loop) 1856 1857 async def inner(): 1858 nonlocal proof 1859 await waiter 1860 proof += 1 1861 1862 async def outer(): 1863 nonlocal proof 1864 await asyncio.shield(inner()) 1865 proof += 100 1866 1867 f = asyncio.ensure_future(outer(), loop=self.loop) 1868 test_utils.run_briefly(self.loop) 1869 f.cancel() 1870 with self.assertRaises(asyncio.CancelledError): 1871 self.loop.run_until_complete(f) 1872 waiter.set_result(None) 1873 test_utils.run_briefly(self.loop) 1874 self.assertEqual(proof, 1) 1875 1876 def test_shield_gather(self): 1877 child1 = self.new_future(self.loop) 1878 child2 = self.new_future(self.loop) 1879 parent = asyncio.gather(child1, child2) 1880 outer = asyncio.shield(parent) 1881 test_utils.run_briefly(self.loop) 1882 outer.cancel() 1883 test_utils.run_briefly(self.loop) 1884 self.assertTrue(outer.cancelled()) 1885 child1.set_result(1) 1886 child2.set_result(2) 1887 test_utils.run_briefly(self.loop) 1888 self.assertEqual(parent.result(), [1, 2]) 1889 1890 def test_gather_shield(self): 1891 child1 = self.new_future(self.loop) 1892 child2 = self.new_future(self.loop) 1893 inner1 = asyncio.shield(child1) 1894 inner2 = asyncio.shield(child2) 1895 parent = asyncio.gather(inner1, inner2) 1896 test_utils.run_briefly(self.loop) 1897 parent.cancel() 1898 # This should cancel inner1 and inner2 but bot child1 and child2. 1899 test_utils.run_briefly(self.loop) 1900 self.assertIsInstance(parent.exception(), asyncio.CancelledError) 1901 self.assertTrue(inner1.cancelled()) 1902 self.assertTrue(inner2.cancelled()) 1903 child1.set_result(1) 1904 child2.set_result(2) 1905 test_utils.run_briefly(self.loop) 1906 1907 def test_as_completed_invalid_args(self): 1908 fut = self.new_future(self.loop) 1909 1910 # as_completed() expects a list of futures, not a future instance 1911 self.assertRaises(TypeError, self.loop.run_until_complete, 1912 asyncio.as_completed(fut, loop=self.loop)) 1913 coro = coroutine_function() 1914 self.assertRaises(TypeError, self.loop.run_until_complete, 1915 asyncio.as_completed(coro, loop=self.loop)) 1916 coro.close() 1917 1918 def test_wait_invalid_args(self): 1919 fut = self.new_future(self.loop) 1920 1921 # wait() expects a list of futures, not a future instance 1922 self.assertRaises(TypeError, self.loop.run_until_complete, 1923 asyncio.wait(fut)) 1924 coro = coroutine_function() 1925 self.assertRaises(TypeError, self.loop.run_until_complete, 1926 asyncio.wait(coro)) 1927 coro.close() 1928 1929 # wait() expects at least a future 1930 self.assertRaises(ValueError, self.loop.run_until_complete, 1931 asyncio.wait([])) 1932 1933 def test_corowrapper_mocks_generator(self): 1934 1935 def check(): 1936 # A function that asserts various things. 1937 # Called twice, with different debug flag values. 1938 1939 with self.assertWarns(DeprecationWarning): 1940 @asyncio.coroutine 1941 def coro(): 1942 # The actual coroutine. 1943 self.assertTrue(gen.gi_running) 1944 yield from fut 1945 1946 # A completed Future used to run the coroutine. 1947 fut = self.new_future(self.loop) 1948 fut.set_result(None) 1949 1950 # Call the coroutine. 1951 gen = coro() 1952 1953 # Check some properties. 1954 self.assertTrue(asyncio.iscoroutine(gen)) 1955 self.assertIsInstance(gen.gi_frame, types.FrameType) 1956 self.assertFalse(gen.gi_running) 1957 self.assertIsInstance(gen.gi_code, types.CodeType) 1958 1959 # Run it. 1960 self.loop.run_until_complete(gen) 1961 1962 # The frame should have changed. 1963 self.assertIsNone(gen.gi_frame) 1964 1965 # Test with debug flag cleared. 1966 with set_coroutine_debug(False): 1967 check() 1968 1969 # Test with debug flag set. 1970 with set_coroutine_debug(True): 1971 check() 1972 1973 def test_yield_from_corowrapper(self): 1974 with set_coroutine_debug(True): 1975 with self.assertWarns(DeprecationWarning): 1976 @asyncio.coroutine 1977 def t1(): 1978 return (yield from t2()) 1979 1980 with self.assertWarns(DeprecationWarning): 1981 @asyncio.coroutine 1982 def t2(): 1983 f = self.new_future(self.loop) 1984 self.new_task(self.loop, t3(f)) 1985 return (yield from f) 1986 1987 with self.assertWarns(DeprecationWarning): 1988 @asyncio.coroutine 1989 def t3(f): 1990 f.set_result((1, 2, 3)) 1991 1992 task = self.new_task(self.loop, t1()) 1993 val = self.loop.run_until_complete(task) 1994 self.assertEqual(val, (1, 2, 3)) 1995 1996 def test_yield_from_corowrapper_send(self): 1997 def foo(): 1998 a = yield 1999 return a 2000 2001 def call(arg): 2002 cw = asyncio.coroutines.CoroWrapper(foo()) 2003 cw.send(None) 2004 try: 2005 cw.send(arg) 2006 except StopIteration as ex: 2007 return ex.args[0] 2008 else: 2009 raise AssertionError('StopIteration was expected') 2010 2011 self.assertEqual(call((1, 2)), (1, 2)) 2012 self.assertEqual(call('spam'), 'spam') 2013 2014 def test_corowrapper_weakref(self): 2015 wd = weakref.WeakValueDictionary() 2016 def foo(): yield from [] 2017 cw = asyncio.coroutines.CoroWrapper(foo()) 2018 wd['cw'] = cw # Would fail without __weakref__ slot. 2019 cw.gen = None # Suppress warning from __del__. 2020 2021 def test_corowrapper_throw(self): 2022 # Issue 429: CoroWrapper.throw must be compatible with gen.throw 2023 def foo(): 2024 value = None 2025 while True: 2026 try: 2027 value = yield value 2028 except Exception as e: 2029 value = e 2030 2031 exception = Exception("foo") 2032 cw = asyncio.coroutines.CoroWrapper(foo()) 2033 cw.send(None) 2034 self.assertIs(exception, cw.throw(exception)) 2035 2036 cw = asyncio.coroutines.CoroWrapper(foo()) 2037 cw.send(None) 2038 self.assertIs(exception, cw.throw(Exception, exception)) 2039 2040 cw = asyncio.coroutines.CoroWrapper(foo()) 2041 cw.send(None) 2042 exception = cw.throw(Exception, "foo") 2043 self.assertIsInstance(exception, Exception) 2044 self.assertEqual(exception.args, ("foo", )) 2045 2046 cw = asyncio.coroutines.CoroWrapper(foo()) 2047 cw.send(None) 2048 exception = cw.throw(Exception, "foo", None) 2049 self.assertIsInstance(exception, Exception) 2050 self.assertEqual(exception.args, ("foo", )) 2051 2052 def test_all_tasks_deprecated(self): 2053 Task = self.__class__.Task 2054 2055 async def coro(): 2056 with self.assertWarns(DeprecationWarning): 2057 assert Task.all_tasks(self.loop) == {t} 2058 2059 t = self.new_task(self.loop, coro()) 2060 self.loop.run_until_complete(t) 2061 2062 def test_log_destroyed_pending_task(self): 2063 Task = self.__class__.Task 2064 2065 with self.assertWarns(DeprecationWarning): 2066 @asyncio.coroutine 2067 def kill_me(loop): 2068 future = self.new_future(loop) 2069 yield from future 2070 # at this point, the only reference to kill_me() task is 2071 # the Task._wakeup() method in future._callbacks 2072 raise Exception("code never reached") 2073 2074 mock_handler = mock.Mock() 2075 self.loop.set_debug(True) 2076 self.loop.set_exception_handler(mock_handler) 2077 2078 # schedule the task 2079 coro = kill_me(self.loop) 2080 task = asyncio.ensure_future(coro, loop=self.loop) 2081 2082 self.assertEqual(asyncio.all_tasks(loop=self.loop), {task}) 2083 2084 # See http://bugs.python.org/issue29271 for details: 2085 asyncio.set_event_loop(self.loop) 2086 try: 2087 with self.assertWarns(DeprecationWarning): 2088 self.assertEqual(Task.all_tasks(), {task}) 2089 with self.assertWarns(DeprecationWarning): 2090 self.assertEqual(Task.all_tasks(None), {task}) 2091 finally: 2092 asyncio.set_event_loop(None) 2093 2094 # execute the task so it waits for future 2095 self.loop._run_once() 2096 self.assertEqual(len(self.loop._ready), 0) 2097 2098 # remove the future used in kill_me(), and references to the task 2099 del coro.gi_frame.f_locals['future'] 2100 coro = None 2101 source_traceback = task._source_traceback 2102 task = None 2103 2104 # no more reference to kill_me() task: the task is destroyed by the GC 2105 support.gc_collect() 2106 2107 self.assertEqual(asyncio.all_tasks(loop=self.loop), set()) 2108 2109 mock_handler.assert_called_with(self.loop, { 2110 'message': 'Task was destroyed but it is pending!', 2111 'task': mock.ANY, 2112 'source_traceback': source_traceback, 2113 }) 2114 mock_handler.reset_mock() 2115 2116 @mock.patch('asyncio.base_events.logger') 2117 def test_tb_logger_not_called_after_cancel(self, m_log): 2118 loop = asyncio.new_event_loop() 2119 self.set_event_loop(loop) 2120 2121 async def coro(): 2122 raise TypeError 2123 2124 async def runner(): 2125 task = self.new_task(loop, coro()) 2126 await asyncio.sleep(0.05) 2127 task.cancel() 2128 task = None 2129 2130 loop.run_until_complete(runner()) 2131 self.assertFalse(m_log.error.called) 2132 2133 @mock.patch('asyncio.coroutines.logger') 2134 def test_coroutine_never_yielded(self, m_log): 2135 with set_coroutine_debug(True): 2136 with self.assertWarns(DeprecationWarning): 2137 @asyncio.coroutine 2138 def coro_noop(): 2139 pass 2140 2141 tb_filename = __file__ 2142 tb_lineno = sys._getframe().f_lineno + 2 2143 # create a coroutine object but don't use it 2144 coro_noop() 2145 support.gc_collect() 2146 2147 self.assertTrue(m_log.error.called) 2148 message = m_log.error.call_args[0][0] 2149 func_filename, func_lineno = test_utils.get_function_source(coro_noop) 2150 2151 regex = (r'^<CoroWrapper %s\(?\)? .* at %s:%s, .*> ' 2152 r'was never yielded from\n' 2153 r'Coroutine object created at \(most recent call last, truncated to \d+ last lines\):\n' 2154 r'.*\n' 2155 r' File "%s", line %s, in test_coroutine_never_yielded\n' 2156 r' coro_noop\(\)$' 2157 % (re.escape(coro_noop.__qualname__), 2158 re.escape(func_filename), func_lineno, 2159 re.escape(tb_filename), tb_lineno)) 2160 2161 self.assertRegex(message, re.compile(regex, re.DOTALL)) 2162 2163 def test_return_coroutine_from_coroutine(self): 2164 """Return of @asyncio.coroutine()-wrapped function generator object 2165 from @asyncio.coroutine()-wrapped function should have same effect as 2166 returning generator object or Future.""" 2167 def check(): 2168 with self.assertWarns(DeprecationWarning): 2169 @asyncio.coroutine 2170 def outer_coro(): 2171 with self.assertWarns(DeprecationWarning): 2172 @asyncio.coroutine 2173 def inner_coro(): 2174 return 1 2175 2176 return inner_coro() 2177 2178 result = self.loop.run_until_complete(outer_coro()) 2179 self.assertEqual(result, 1) 2180 2181 # Test with debug flag cleared. 2182 with set_coroutine_debug(False): 2183 check() 2184 2185 # Test with debug flag set. 2186 with set_coroutine_debug(True): 2187 check() 2188 2189 def test_task_source_traceback(self): 2190 self.loop.set_debug(True) 2191 2192 task = self.new_task(self.loop, coroutine_function()) 2193 lineno = sys._getframe().f_lineno - 1 2194 self.assertIsInstance(task._source_traceback, list) 2195 self.assertEqual(task._source_traceback[-2][:3], 2196 (__file__, 2197 lineno, 2198 'test_task_source_traceback')) 2199 self.loop.run_until_complete(task) 2200 2201 def _test_cancel_wait_for(self, timeout): 2202 loop = asyncio.new_event_loop() 2203 self.addCleanup(loop.close) 2204 2205 async def blocking_coroutine(): 2206 fut = self.new_future(loop) 2207 # Block: fut result is never set 2208 await fut 2209 2210 task = loop.create_task(blocking_coroutine()) 2211 2212 wait = loop.create_task(asyncio.wait_for(task, timeout)) 2213 loop.call_soon(wait.cancel) 2214 2215 self.assertRaises(asyncio.CancelledError, 2216 loop.run_until_complete, wait) 2217 2218 # Python issue #23219: cancelling the wait must also cancel the task 2219 self.assertTrue(task.cancelled()) 2220 2221 def test_cancel_blocking_wait_for(self): 2222 self._test_cancel_wait_for(None) 2223 2224 def test_cancel_wait_for(self): 2225 self._test_cancel_wait_for(60.0) 2226 2227 def test_cancel_gather_1(self): 2228 """Ensure that a gathering future refuses to be cancelled once all 2229 children are done""" 2230 loop = asyncio.new_event_loop() 2231 self.addCleanup(loop.close) 2232 2233 fut = self.new_future(loop) 2234 # The indirection fut->child_coro is needed since otherwise the 2235 # gathering task is done at the same time as the child future 2236 def child_coro(): 2237 return (yield from fut) 2238 gather_future = asyncio.gather(child_coro(), loop=loop) 2239 gather_task = asyncio.ensure_future(gather_future, loop=loop) 2240 2241 cancel_result = None 2242 def cancelling_callback(_): 2243 nonlocal cancel_result 2244 cancel_result = gather_task.cancel() 2245 fut.add_done_callback(cancelling_callback) 2246 2247 fut.set_result(42) # calls the cancelling_callback after fut is done() 2248 2249 # At this point the task should complete. 2250 loop.run_until_complete(gather_task) 2251 2252 # Python issue #26923: asyncio.gather drops cancellation 2253 self.assertEqual(cancel_result, False) 2254 self.assertFalse(gather_task.cancelled()) 2255 self.assertEqual(gather_task.result(), [42]) 2256 2257 def test_cancel_gather_2(self): 2258 loop = asyncio.new_event_loop() 2259 self.addCleanup(loop.close) 2260 2261 async def test(): 2262 time = 0 2263 while True: 2264 time += 0.05 2265 await asyncio.gather(asyncio.sleep(0.05), 2266 return_exceptions=True, 2267 loop=loop) 2268 if time > 1: 2269 return 2270 2271 async def main(): 2272 qwe = self.new_task(loop, test()) 2273 await asyncio.sleep(0.2) 2274 qwe.cancel() 2275 try: 2276 await qwe 2277 except asyncio.CancelledError: 2278 pass 2279 else: 2280 self.fail('gather did not propagate the cancellation request') 2281 2282 loop.run_until_complete(main()) 2283 2284 def test_exception_traceback(self): 2285 # See http://bugs.python.org/issue28843 2286 2287 async def foo(): 2288 1 / 0 2289 2290 async def main(): 2291 task = self.new_task(self.loop, foo()) 2292 await asyncio.sleep(0) # skip one loop iteration 2293 self.assertIsNotNone(task.exception().__traceback__) 2294 2295 self.loop.run_until_complete(main()) 2296 2297 @mock.patch('asyncio.base_events.logger') 2298 def test_error_in_call_soon(self, m_log): 2299 def call_soon(callback, *args, **kwargs): 2300 raise ValueError 2301 self.loop.call_soon = call_soon 2302 2303 with self.assertWarns(DeprecationWarning): 2304 @asyncio.coroutine 2305 def coro(): 2306 pass 2307 2308 self.assertFalse(m_log.error.called) 2309 2310 with self.assertRaises(ValueError): 2311 gen = coro() 2312 try: 2313 self.new_task(self.loop, gen) 2314 finally: 2315 gen.close() 2316 2317 self.assertTrue(m_log.error.called) 2318 message = m_log.error.call_args[0][0] 2319 self.assertIn('Task was destroyed but it is pending', message) 2320 2321 self.assertEqual(asyncio.all_tasks(self.loop), set()) 2322 2323 def test_create_task_with_noncoroutine(self): 2324 with self.assertRaisesRegex(TypeError, 2325 "a coroutine was expected, got 123"): 2326 self.new_task(self.loop, 123) 2327 2328 # test it for the second time to ensure that caching 2329 # in asyncio.iscoroutine() doesn't break things. 2330 with self.assertRaisesRegex(TypeError, 2331 "a coroutine was expected, got 123"): 2332 self.new_task(self.loop, 123) 2333 2334 def test_create_task_with_oldstyle_coroutine(self): 2335 2336 with self.assertWarns(DeprecationWarning): 2337 @asyncio.coroutine 2338 def coro(): 2339 pass 2340 2341 task = self.new_task(self.loop, coro()) 2342 self.assertIsInstance(task, self.Task) 2343 self.loop.run_until_complete(task) 2344 2345 # test it for the second time to ensure that caching 2346 # in asyncio.iscoroutine() doesn't break things. 2347 task = self.new_task(self.loop, coro()) 2348 self.assertIsInstance(task, self.Task) 2349 self.loop.run_until_complete(task) 2350 2351 def test_create_task_with_async_function(self): 2352 2353 async def coro(): 2354 pass 2355 2356 task = self.new_task(self.loop, coro()) 2357 self.assertIsInstance(task, self.Task) 2358 self.loop.run_until_complete(task) 2359 2360 # test it for the second time to ensure that caching 2361 # in asyncio.iscoroutine() doesn't break things. 2362 task = self.new_task(self.loop, coro()) 2363 self.assertIsInstance(task, self.Task) 2364 self.loop.run_until_complete(task) 2365 2366 def test_create_task_with_asynclike_function(self): 2367 task = self.new_task(self.loop, CoroLikeObject()) 2368 self.assertIsInstance(task, self.Task) 2369 self.assertEqual(self.loop.run_until_complete(task), 42) 2370 2371 # test it for the second time to ensure that caching 2372 # in asyncio.iscoroutine() doesn't break things. 2373 task = self.new_task(self.loop, CoroLikeObject()) 2374 self.assertIsInstance(task, self.Task) 2375 self.assertEqual(self.loop.run_until_complete(task), 42) 2376 2377 def test_bare_create_task(self): 2378 2379 async def inner(): 2380 return 1 2381 2382 async def coro(): 2383 task = asyncio.create_task(inner()) 2384 self.assertIsInstance(task, self.Task) 2385 ret = await task 2386 self.assertEqual(1, ret) 2387 2388 self.loop.run_until_complete(coro()) 2389 2390 def test_bare_create_named_task(self): 2391 2392 async def coro_noop(): 2393 pass 2394 2395 async def coro(): 2396 task = asyncio.create_task(coro_noop(), name='No-op') 2397 self.assertEqual(task.get_name(), 'No-op') 2398 await task 2399 2400 self.loop.run_until_complete(coro()) 2401 2402 def test_context_1(self): 2403 cvar = contextvars.ContextVar('cvar', default='nope') 2404 2405 async def sub(): 2406 await asyncio.sleep(0.01) 2407 self.assertEqual(cvar.get(), 'nope') 2408 cvar.set('something else') 2409 2410 async def main(): 2411 self.assertEqual(cvar.get(), 'nope') 2412 subtask = self.new_task(loop, sub()) 2413 cvar.set('yes') 2414 self.assertEqual(cvar.get(), 'yes') 2415 await subtask 2416 self.assertEqual(cvar.get(), 'yes') 2417 2418 loop = asyncio.new_event_loop() 2419 try: 2420 task = self.new_task(loop, main()) 2421 loop.run_until_complete(task) 2422 finally: 2423 loop.close() 2424 2425 def test_context_2(self): 2426 cvar = contextvars.ContextVar('cvar', default='nope') 2427 2428 async def main(): 2429 def fut_on_done(fut): 2430 # This change must not pollute the context 2431 # of the "main()" task. 2432 cvar.set('something else') 2433 2434 self.assertEqual(cvar.get(), 'nope') 2435 2436 for j in range(2): 2437 fut = self.new_future(loop) 2438 fut.add_done_callback(fut_on_done) 2439 cvar.set(f'yes{j}') 2440 loop.call_soon(fut.set_result, None) 2441 await fut 2442 self.assertEqual(cvar.get(), f'yes{j}') 2443 2444 for i in range(3): 2445 # Test that task passed its context to add_done_callback: 2446 cvar.set(f'yes{i}-{j}') 2447 await asyncio.sleep(0.001) 2448 self.assertEqual(cvar.get(), f'yes{i}-{j}') 2449 2450 loop = asyncio.new_event_loop() 2451 try: 2452 task = self.new_task(loop, main()) 2453 loop.run_until_complete(task) 2454 finally: 2455 loop.close() 2456 2457 self.assertEqual(cvar.get(), 'nope') 2458 2459 def test_context_3(self): 2460 # Run 100 Tasks in parallel, each modifying cvar. 2461 2462 cvar = contextvars.ContextVar('cvar', default=-1) 2463 2464 async def sub(num): 2465 for i in range(10): 2466 cvar.set(num + i) 2467 await asyncio.sleep(random.uniform(0.001, 0.05)) 2468 self.assertEqual(cvar.get(), num + i) 2469 2470 async def main(): 2471 tasks = [] 2472 for i in range(100): 2473 task = loop.create_task(sub(random.randint(0, 10))) 2474 tasks.append(task) 2475 2476 await asyncio.gather(*tasks, loop=loop) 2477 2478 loop = asyncio.new_event_loop() 2479 try: 2480 loop.run_until_complete(main()) 2481 finally: 2482 loop.close() 2483 2484 self.assertEqual(cvar.get(), -1) 2485 2486 def test_get_coro(self): 2487 loop = asyncio.new_event_loop() 2488 coro = coroutine_function() 2489 try: 2490 task = self.new_task(loop, coro) 2491 loop.run_until_complete(task) 2492 self.assertIs(task.get_coro(), coro) 2493 finally: 2494 loop.close() 2495 2496 2497def add_subclass_tests(cls): 2498 BaseTask = cls.Task 2499 BaseFuture = cls.Future 2500 2501 if BaseTask is None or BaseFuture is None: 2502 return cls 2503 2504 class CommonFuture: 2505 def __init__(self, *args, **kwargs): 2506 self.calls = collections.defaultdict(lambda: 0) 2507 super().__init__(*args, **kwargs) 2508 2509 def add_done_callback(self, *args, **kwargs): 2510 self.calls['add_done_callback'] += 1 2511 return super().add_done_callback(*args, **kwargs) 2512 2513 class Task(CommonFuture, BaseTask): 2514 pass 2515 2516 class Future(CommonFuture, BaseFuture): 2517 pass 2518 2519 def test_subclasses_ctask_cfuture(self): 2520 fut = self.Future(loop=self.loop) 2521 2522 async def func(): 2523 self.loop.call_soon(lambda: fut.set_result('spam')) 2524 return await fut 2525 2526 task = self.Task(func(), loop=self.loop) 2527 2528 result = self.loop.run_until_complete(task) 2529 2530 self.assertEqual(result, 'spam') 2531 2532 self.assertEqual( 2533 dict(task.calls), 2534 {'add_done_callback': 1}) 2535 2536 self.assertEqual( 2537 dict(fut.calls), 2538 {'add_done_callback': 1}) 2539 2540 # Add patched Task & Future back to the test case 2541 cls.Task = Task 2542 cls.Future = Future 2543 2544 # Add an extra unit-test 2545 cls.test_subclasses_ctask_cfuture = test_subclasses_ctask_cfuture 2546 2547 # Disable the "test_task_source_traceback" test 2548 # (the test is hardcoded for a particular call stack, which 2549 # is slightly different for Task subclasses) 2550 cls.test_task_source_traceback = None 2551 2552 return cls 2553 2554 2555class SetMethodsTest: 2556 2557 def test_set_result_causes_invalid_state(self): 2558 Future = type(self).Future 2559 self.loop.call_exception_handler = exc_handler = mock.Mock() 2560 2561 async def foo(): 2562 await asyncio.sleep(0.1) 2563 return 10 2564 2565 coro = foo() 2566 task = self.new_task(self.loop, coro) 2567 Future.set_result(task, 'spam') 2568 2569 self.assertEqual( 2570 self.loop.run_until_complete(task), 2571 'spam') 2572 2573 exc_handler.assert_called_once() 2574 exc = exc_handler.call_args[0][0]['exception'] 2575 with self.assertRaisesRegex(asyncio.InvalidStateError, 2576 r'step\(\): already done'): 2577 raise exc 2578 2579 coro.close() 2580 2581 def test_set_exception_causes_invalid_state(self): 2582 class MyExc(Exception): 2583 pass 2584 2585 Future = type(self).Future 2586 self.loop.call_exception_handler = exc_handler = mock.Mock() 2587 2588 async def foo(): 2589 await asyncio.sleep(0.1) 2590 return 10 2591 2592 coro = foo() 2593 task = self.new_task(self.loop, coro) 2594 Future.set_exception(task, MyExc()) 2595 2596 with self.assertRaises(MyExc): 2597 self.loop.run_until_complete(task) 2598 2599 exc_handler.assert_called_once() 2600 exc = exc_handler.call_args[0][0]['exception'] 2601 with self.assertRaisesRegex(asyncio.InvalidStateError, 2602 r'step\(\): already done'): 2603 raise exc 2604 2605 coro.close() 2606 2607 2608@unittest.skipUnless(hasattr(futures, '_CFuture') and 2609 hasattr(tasks, '_CTask'), 2610 'requires the C _asyncio module') 2611class CTask_CFuture_Tests(BaseTaskTests, SetMethodsTest, 2612 test_utils.TestCase): 2613 2614 Task = getattr(tasks, '_CTask', None) 2615 Future = getattr(futures, '_CFuture', None) 2616 2617 @support.refcount_test 2618 def test_refleaks_in_task___init__(self): 2619 gettotalrefcount = support.get_attribute(sys, 'gettotalrefcount') 2620 async def coro(): 2621 pass 2622 task = self.new_task(self.loop, coro()) 2623 self.loop.run_until_complete(task) 2624 refs_before = gettotalrefcount() 2625 for i in range(100): 2626 task.__init__(coro(), loop=self.loop) 2627 self.loop.run_until_complete(task) 2628 self.assertAlmostEqual(gettotalrefcount() - refs_before, 0, delta=10) 2629 2630 def test_del__log_destroy_pending_segfault(self): 2631 async def coro(): 2632 pass 2633 task = self.new_task(self.loop, coro()) 2634 self.loop.run_until_complete(task) 2635 with self.assertRaises(AttributeError): 2636 del task._log_destroy_pending 2637 2638 2639@unittest.skipUnless(hasattr(futures, '_CFuture') and 2640 hasattr(tasks, '_CTask'), 2641 'requires the C _asyncio module') 2642@add_subclass_tests 2643class CTask_CFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2644 2645 Task = getattr(tasks, '_CTask', None) 2646 Future = getattr(futures, '_CFuture', None) 2647 2648 2649@unittest.skipUnless(hasattr(tasks, '_CTask'), 2650 'requires the C _asyncio module') 2651@add_subclass_tests 2652class CTaskSubclass_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2653 2654 Task = getattr(tasks, '_CTask', None) 2655 Future = futures._PyFuture 2656 2657 2658@unittest.skipUnless(hasattr(futures, '_CFuture'), 2659 'requires the C _asyncio module') 2660@add_subclass_tests 2661class PyTask_CFutureSubclass_Tests(BaseTaskTests, test_utils.TestCase): 2662 2663 Future = getattr(futures, '_CFuture', None) 2664 Task = tasks._PyTask 2665 2666 2667@unittest.skipUnless(hasattr(tasks, '_CTask'), 2668 'requires the C _asyncio module') 2669class CTask_PyFuture_Tests(BaseTaskTests, test_utils.TestCase): 2670 2671 Task = getattr(tasks, '_CTask', None) 2672 Future = futures._PyFuture 2673 2674 2675@unittest.skipUnless(hasattr(futures, '_CFuture'), 2676 'requires the C _asyncio module') 2677class PyTask_CFuture_Tests(BaseTaskTests, test_utils.TestCase): 2678 2679 Task = tasks._PyTask 2680 Future = getattr(futures, '_CFuture', None) 2681 2682 2683class PyTask_PyFuture_Tests(BaseTaskTests, SetMethodsTest, 2684 test_utils.TestCase): 2685 2686 Task = tasks._PyTask 2687 Future = futures._PyFuture 2688 2689 2690@add_subclass_tests 2691class PyTask_PyFuture_SubclassTests(BaseTaskTests, test_utils.TestCase): 2692 Task = tasks._PyTask 2693 Future = futures._PyFuture 2694 2695 2696@unittest.skipUnless(hasattr(tasks, '_CTask'), 2697 'requires the C _asyncio module') 2698class CTask_Future_Tests(test_utils.TestCase): 2699 2700 def test_foobar(self): 2701 class Fut(asyncio.Future): 2702 @property 2703 def get_loop(self): 2704 raise AttributeError 2705 2706 async def coro(): 2707 await fut 2708 return 'spam' 2709 2710 self.loop = asyncio.new_event_loop() 2711 try: 2712 fut = Fut(loop=self.loop) 2713 self.loop.call_later(0.1, fut.set_result, 1) 2714 task = self.loop.create_task(coro()) 2715 res = self.loop.run_until_complete(task) 2716 finally: 2717 self.loop.close() 2718 2719 self.assertEqual(res, 'spam') 2720 2721 2722class BaseTaskIntrospectionTests: 2723 _register_task = None 2724 _unregister_task = None 2725 _enter_task = None 2726 _leave_task = None 2727 2728 def test__register_task_1(self): 2729 class TaskLike: 2730 @property 2731 def _loop(self): 2732 return loop 2733 2734 def done(self): 2735 return False 2736 2737 task = TaskLike() 2738 loop = mock.Mock() 2739 2740 self.assertEqual(asyncio.all_tasks(loop), set()) 2741 self._register_task(task) 2742 self.assertEqual(asyncio.all_tasks(loop), {task}) 2743 self._unregister_task(task) 2744 2745 def test__register_task_2(self): 2746 class TaskLike: 2747 def get_loop(self): 2748 return loop 2749 2750 def done(self): 2751 return False 2752 2753 task = TaskLike() 2754 loop = mock.Mock() 2755 2756 self.assertEqual(asyncio.all_tasks(loop), set()) 2757 self._register_task(task) 2758 self.assertEqual(asyncio.all_tasks(loop), {task}) 2759 self._unregister_task(task) 2760 2761 def test__register_task_3(self): 2762 class TaskLike: 2763 def get_loop(self): 2764 return loop 2765 2766 def done(self): 2767 return True 2768 2769 task = TaskLike() 2770 loop = mock.Mock() 2771 2772 self.assertEqual(asyncio.all_tasks(loop), set()) 2773 self._register_task(task) 2774 self.assertEqual(asyncio.all_tasks(loop), set()) 2775 with self.assertWarns(DeprecationWarning): 2776 self.assertEqual(asyncio.Task.all_tasks(loop), {task}) 2777 self._unregister_task(task) 2778 2779 def test__enter_task(self): 2780 task = mock.Mock() 2781 loop = mock.Mock() 2782 self.assertIsNone(asyncio.current_task(loop)) 2783 self._enter_task(loop, task) 2784 self.assertIs(asyncio.current_task(loop), task) 2785 self._leave_task(loop, task) 2786 2787 def test__enter_task_failure(self): 2788 task1 = mock.Mock() 2789 task2 = mock.Mock() 2790 loop = mock.Mock() 2791 self._enter_task(loop, task1) 2792 with self.assertRaises(RuntimeError): 2793 self._enter_task(loop, task2) 2794 self.assertIs(asyncio.current_task(loop), task1) 2795 self._leave_task(loop, task1) 2796 2797 def test__leave_task(self): 2798 task = mock.Mock() 2799 loop = mock.Mock() 2800 self._enter_task(loop, task) 2801 self._leave_task(loop, task) 2802 self.assertIsNone(asyncio.current_task(loop)) 2803 2804 def test__leave_task_failure1(self): 2805 task1 = mock.Mock() 2806 task2 = mock.Mock() 2807 loop = mock.Mock() 2808 self._enter_task(loop, task1) 2809 with self.assertRaises(RuntimeError): 2810 self._leave_task(loop, task2) 2811 self.assertIs(asyncio.current_task(loop), task1) 2812 self._leave_task(loop, task1) 2813 2814 def test__leave_task_failure2(self): 2815 task = mock.Mock() 2816 loop = mock.Mock() 2817 with self.assertRaises(RuntimeError): 2818 self._leave_task(loop, task) 2819 self.assertIsNone(asyncio.current_task(loop)) 2820 2821 def test__unregister_task(self): 2822 task = mock.Mock() 2823 loop = mock.Mock() 2824 task.get_loop = lambda: loop 2825 self._register_task(task) 2826 self._unregister_task(task) 2827 self.assertEqual(asyncio.all_tasks(loop), set()) 2828 2829 def test__unregister_task_not_registered(self): 2830 task = mock.Mock() 2831 loop = mock.Mock() 2832 self._unregister_task(task) 2833 self.assertEqual(asyncio.all_tasks(loop), set()) 2834 2835 2836class PyIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2837 _register_task = staticmethod(tasks._py_register_task) 2838 _unregister_task = staticmethod(tasks._py_unregister_task) 2839 _enter_task = staticmethod(tasks._py_enter_task) 2840 _leave_task = staticmethod(tasks._py_leave_task) 2841 2842 2843@unittest.skipUnless(hasattr(tasks, '_c_register_task'), 2844 'requires the C _asyncio module') 2845class CIntrospectionTests(test_utils.TestCase, BaseTaskIntrospectionTests): 2846 if hasattr(tasks, '_c_register_task'): 2847 _register_task = staticmethod(tasks._c_register_task) 2848 _unregister_task = staticmethod(tasks._c_unregister_task) 2849 _enter_task = staticmethod(tasks._c_enter_task) 2850 _leave_task = staticmethod(tasks._c_leave_task) 2851 else: 2852 _register_task = _unregister_task = _enter_task = _leave_task = None 2853 2854 2855class BaseCurrentLoopTests: 2856 2857 def setUp(self): 2858 super().setUp() 2859 self.loop = asyncio.new_event_loop() 2860 self.set_event_loop(self.loop) 2861 2862 def new_task(self, coro): 2863 raise NotImplementedError 2864 2865 def test_current_task_no_running_loop(self): 2866 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2867 2868 def test_current_task_no_running_loop_implicit(self): 2869 with self.assertRaises(RuntimeError): 2870 asyncio.current_task() 2871 2872 def test_current_task_with_implicit_loop(self): 2873 async def coro(): 2874 self.assertIs(asyncio.current_task(loop=self.loop), task) 2875 2876 self.assertIs(asyncio.current_task(None), task) 2877 self.assertIs(asyncio.current_task(), task) 2878 2879 task = self.new_task(coro()) 2880 self.loop.run_until_complete(task) 2881 self.assertIsNone(asyncio.current_task(loop=self.loop)) 2882 2883 2884class PyCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2885 2886 def new_task(self, coro): 2887 return tasks._PyTask(coro, loop=self.loop) 2888 2889 2890@unittest.skipUnless(hasattr(tasks, '_CTask'), 2891 'requires the C _asyncio module') 2892class CCurrentLoopTests(BaseCurrentLoopTests, test_utils.TestCase): 2893 2894 def new_task(self, coro): 2895 return getattr(tasks, '_CTask')(coro, loop=self.loop) 2896 2897 2898class GenericTaskTests(test_utils.TestCase): 2899 2900 def test_future_subclass(self): 2901 self.assertTrue(issubclass(asyncio.Task, asyncio.Future)) 2902 2903 def test_asyncio_module_compiled(self): 2904 # Because of circular imports it's easy to make _asyncio 2905 # module non-importable. This is a simple test that will 2906 # fail on systems where C modules were successfully compiled 2907 # (hence the test for _functools), but _asyncio somehow didn't. 2908 try: 2909 import _functools 2910 except ImportError: 2911 pass 2912 else: 2913 try: 2914 import _asyncio 2915 except ImportError: 2916 self.fail('_asyncio module is missing') 2917 2918 2919class GatherTestsBase: 2920 2921 def setUp(self): 2922 super().setUp() 2923 self.one_loop = self.new_test_loop() 2924 self.other_loop = self.new_test_loop() 2925 self.set_event_loop(self.one_loop, cleanup=False) 2926 2927 def _run_loop(self, loop): 2928 while loop._ready: 2929 test_utils.run_briefly(loop) 2930 2931 def _check_success(self, **kwargs): 2932 a, b, c = [self.one_loop.create_future() for i in range(3)] 2933 fut = asyncio.gather(*self.wrap_futures(a, b, c), **kwargs) 2934 cb = test_utils.MockCallback() 2935 fut.add_done_callback(cb) 2936 b.set_result(1) 2937 a.set_result(2) 2938 self._run_loop(self.one_loop) 2939 self.assertEqual(cb.called, False) 2940 self.assertFalse(fut.done()) 2941 c.set_result(3) 2942 self._run_loop(self.one_loop) 2943 cb.assert_called_once_with(fut) 2944 self.assertEqual(fut.result(), [2, 1, 3]) 2945 2946 def test_success(self): 2947 self._check_success() 2948 self._check_success(return_exceptions=False) 2949 2950 def test_result_exception_success(self): 2951 self._check_success(return_exceptions=True) 2952 2953 def test_one_exception(self): 2954 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 2955 fut = asyncio.gather(*self.wrap_futures(a, b, c, d, e)) 2956 cb = test_utils.MockCallback() 2957 fut.add_done_callback(cb) 2958 exc = ZeroDivisionError() 2959 a.set_result(1) 2960 b.set_exception(exc) 2961 self._run_loop(self.one_loop) 2962 self.assertTrue(fut.done()) 2963 cb.assert_called_once_with(fut) 2964 self.assertIs(fut.exception(), exc) 2965 # Does nothing 2966 c.set_result(3) 2967 d.cancel() 2968 e.set_exception(RuntimeError()) 2969 e.exception() 2970 2971 def test_return_exceptions(self): 2972 a, b, c, d = [self.one_loop.create_future() for i in range(4)] 2973 fut = asyncio.gather(*self.wrap_futures(a, b, c, d), 2974 return_exceptions=True) 2975 cb = test_utils.MockCallback() 2976 fut.add_done_callback(cb) 2977 exc = ZeroDivisionError() 2978 exc2 = RuntimeError() 2979 b.set_result(1) 2980 c.set_exception(exc) 2981 a.set_result(3) 2982 self._run_loop(self.one_loop) 2983 self.assertFalse(fut.done()) 2984 d.set_exception(exc2) 2985 self._run_loop(self.one_loop) 2986 self.assertTrue(fut.done()) 2987 cb.assert_called_once_with(fut) 2988 self.assertEqual(fut.result(), [3, 1, exc, exc2]) 2989 2990 def test_env_var_debug(self): 2991 code = '\n'.join(( 2992 'import asyncio.coroutines', 2993 'print(asyncio.coroutines._DEBUG)')) 2994 2995 # Test with -E to not fail if the unit test was run with 2996 # PYTHONASYNCIODEBUG set to a non-empty string 2997 sts, stdout, stderr = assert_python_ok('-E', '-c', code) 2998 self.assertEqual(stdout.rstrip(), b'False') 2999 3000 sts, stdout, stderr = assert_python_ok('-c', code, 3001 PYTHONASYNCIODEBUG='', 3002 PYTHONDEVMODE='') 3003 self.assertEqual(stdout.rstrip(), b'False') 3004 3005 sts, stdout, stderr = assert_python_ok('-c', code, 3006 PYTHONASYNCIODEBUG='1', 3007 PYTHONDEVMODE='') 3008 self.assertEqual(stdout.rstrip(), b'True') 3009 3010 sts, stdout, stderr = assert_python_ok('-E', '-c', code, 3011 PYTHONASYNCIODEBUG='1', 3012 PYTHONDEVMODE='') 3013 self.assertEqual(stdout.rstrip(), b'False') 3014 3015 # -X dev 3016 sts, stdout, stderr = assert_python_ok('-E', '-X', 'dev', 3017 '-c', code) 3018 self.assertEqual(stdout.rstrip(), b'True') 3019 3020 3021class FutureGatherTests(GatherTestsBase, test_utils.TestCase): 3022 3023 def wrap_futures(self, *futures): 3024 return futures 3025 3026 def _check_empty_sequence(self, seq_or_iter): 3027 asyncio.set_event_loop(self.one_loop) 3028 self.addCleanup(asyncio.set_event_loop, None) 3029 fut = asyncio.gather(*seq_or_iter) 3030 self.assertIsInstance(fut, asyncio.Future) 3031 self.assertIs(fut._loop, self.one_loop) 3032 self._run_loop(self.one_loop) 3033 self.assertTrue(fut.done()) 3034 self.assertEqual(fut.result(), []) 3035 with self.assertWarns(DeprecationWarning): 3036 fut = asyncio.gather(*seq_or_iter, loop=self.other_loop) 3037 self.assertIs(fut._loop, self.other_loop) 3038 3039 def test_constructor_empty_sequence(self): 3040 self._check_empty_sequence([]) 3041 self._check_empty_sequence(()) 3042 self._check_empty_sequence(set()) 3043 self._check_empty_sequence(iter("")) 3044 3045 def test_constructor_heterogenous_futures(self): 3046 fut1 = self.one_loop.create_future() 3047 fut2 = self.other_loop.create_future() 3048 with self.assertRaises(ValueError): 3049 asyncio.gather(fut1, fut2) 3050 with self.assertRaises(ValueError): 3051 asyncio.gather(fut1, loop=self.other_loop) 3052 3053 def test_constructor_homogenous_futures(self): 3054 children = [self.other_loop.create_future() for i in range(3)] 3055 fut = asyncio.gather(*children) 3056 self.assertIs(fut._loop, self.other_loop) 3057 self._run_loop(self.other_loop) 3058 self.assertFalse(fut.done()) 3059 fut = asyncio.gather(*children, loop=self.other_loop) 3060 self.assertIs(fut._loop, self.other_loop) 3061 self._run_loop(self.other_loop) 3062 self.assertFalse(fut.done()) 3063 3064 def test_one_cancellation(self): 3065 a, b, c, d, e = [self.one_loop.create_future() for i in range(5)] 3066 fut = asyncio.gather(a, b, c, d, e) 3067 cb = test_utils.MockCallback() 3068 fut.add_done_callback(cb) 3069 a.set_result(1) 3070 b.cancel() 3071 self._run_loop(self.one_loop) 3072 self.assertTrue(fut.done()) 3073 cb.assert_called_once_with(fut) 3074 self.assertFalse(fut.cancelled()) 3075 self.assertIsInstance(fut.exception(), asyncio.CancelledError) 3076 # Does nothing 3077 c.set_result(3) 3078 d.cancel() 3079 e.set_exception(RuntimeError()) 3080 e.exception() 3081 3082 def test_result_exception_one_cancellation(self): 3083 a, b, c, d, e, f = [self.one_loop.create_future() 3084 for i in range(6)] 3085 fut = asyncio.gather(a, b, c, d, e, f, return_exceptions=True) 3086 cb = test_utils.MockCallback() 3087 fut.add_done_callback(cb) 3088 a.set_result(1) 3089 zde = ZeroDivisionError() 3090 b.set_exception(zde) 3091 c.cancel() 3092 self._run_loop(self.one_loop) 3093 self.assertFalse(fut.done()) 3094 d.set_result(3) 3095 e.cancel() 3096 rte = RuntimeError() 3097 f.set_exception(rte) 3098 res = self.one_loop.run_until_complete(fut) 3099 self.assertIsInstance(res[2], asyncio.CancelledError) 3100 self.assertIsInstance(res[4], asyncio.CancelledError) 3101 res[2] = res[4] = None 3102 self.assertEqual(res, [1, zde, None, 3, None, rte]) 3103 cb.assert_called_once_with(fut) 3104 3105 3106class CoroutineGatherTests(GatherTestsBase, test_utils.TestCase): 3107 3108 def setUp(self): 3109 super().setUp() 3110 asyncio.set_event_loop(self.one_loop) 3111 3112 def wrap_futures(self, *futures): 3113 coros = [] 3114 for fut in futures: 3115 async def coro(fut=fut): 3116 return await fut 3117 coros.append(coro()) 3118 return coros 3119 3120 def test_constructor_loop_selection(self): 3121 async def coro(): 3122 return 'abc' 3123 gen1 = coro() 3124 gen2 = coro() 3125 fut = asyncio.gather(gen1, gen2) 3126 self.assertIs(fut._loop, self.one_loop) 3127 self.one_loop.run_until_complete(fut) 3128 3129 self.set_event_loop(self.other_loop, cleanup=False) 3130 gen3 = coro() 3131 gen4 = coro() 3132 fut2 = asyncio.gather(gen3, gen4, loop=self.other_loop) 3133 self.assertIs(fut2._loop, self.other_loop) 3134 self.other_loop.run_until_complete(fut2) 3135 3136 def test_duplicate_coroutines(self): 3137 with self.assertWarns(DeprecationWarning): 3138 @asyncio.coroutine 3139 def coro(s): 3140 return s 3141 c = coro('abc') 3142 fut = asyncio.gather(c, c, coro('def'), c, loop=self.one_loop) 3143 self._run_loop(self.one_loop) 3144 self.assertEqual(fut.result(), ['abc', 'abc', 'def', 'abc']) 3145 3146 def test_cancellation_broadcast(self): 3147 # Cancelling outer() cancels all children. 3148 proof = 0 3149 waiter = self.one_loop.create_future() 3150 3151 async def inner(): 3152 nonlocal proof 3153 await waiter 3154 proof += 1 3155 3156 child1 = asyncio.ensure_future(inner(), loop=self.one_loop) 3157 child2 = asyncio.ensure_future(inner(), loop=self.one_loop) 3158 gatherer = None 3159 3160 async def outer(): 3161 nonlocal proof, gatherer 3162 gatherer = asyncio.gather(child1, child2, loop=self.one_loop) 3163 await gatherer 3164 proof += 100 3165 3166 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3167 test_utils.run_briefly(self.one_loop) 3168 self.assertTrue(f.cancel()) 3169 with self.assertRaises(asyncio.CancelledError): 3170 self.one_loop.run_until_complete(f) 3171 self.assertFalse(gatherer.cancel()) 3172 self.assertTrue(waiter.cancelled()) 3173 self.assertTrue(child1.cancelled()) 3174 self.assertTrue(child2.cancelled()) 3175 test_utils.run_briefly(self.one_loop) 3176 self.assertEqual(proof, 0) 3177 3178 def test_exception_marking(self): 3179 # Test for the first line marked "Mark exception retrieved." 3180 3181 async def inner(f): 3182 await f 3183 raise RuntimeError('should not be ignored') 3184 3185 a = self.one_loop.create_future() 3186 b = self.one_loop.create_future() 3187 3188 async def outer(): 3189 await asyncio.gather(inner(a), inner(b), loop=self.one_loop) 3190 3191 f = asyncio.ensure_future(outer(), loop=self.one_loop) 3192 test_utils.run_briefly(self.one_loop) 3193 a.set_result(None) 3194 test_utils.run_briefly(self.one_loop) 3195 b.set_result(None) 3196 test_utils.run_briefly(self.one_loop) 3197 self.assertIsInstance(f.exception(), RuntimeError) 3198 3199 3200class RunCoroutineThreadsafeTests(test_utils.TestCase): 3201 """Test case for asyncio.run_coroutine_threadsafe.""" 3202 3203 def setUp(self): 3204 super().setUp() 3205 self.loop = asyncio.new_event_loop() 3206 self.set_event_loop(self.loop) # Will cleanup properly 3207 3208 async def add(self, a, b, fail=False, cancel=False): 3209 """Wait 0.05 second and return a + b.""" 3210 await asyncio.sleep(0.05) 3211 if fail: 3212 raise RuntimeError("Fail!") 3213 if cancel: 3214 asyncio.current_task(self.loop).cancel() 3215 await asyncio.sleep(0) 3216 return a + b 3217 3218 def target(self, fail=False, cancel=False, timeout=None, 3219 advance_coro=False): 3220 """Run add coroutine in the event loop.""" 3221 coro = self.add(1, 2, fail=fail, cancel=cancel) 3222 future = asyncio.run_coroutine_threadsafe(coro, self.loop) 3223 if advance_coro: 3224 # this is for test_run_coroutine_threadsafe_task_factory_exception; 3225 # otherwise it spills errors and breaks **other** unittests, since 3226 # 'target' is interacting with threads. 3227 3228 # With this call, `coro` will be advanced, so that 3229 # CoroWrapper.__del__ won't do anything when asyncio tests run 3230 # in debug mode. 3231 self.loop.call_soon_threadsafe(coro.send, None) 3232 try: 3233 return future.result(timeout) 3234 finally: 3235 future.done() or future.cancel() 3236 3237 def test_run_coroutine_threadsafe(self): 3238 """Test coroutine submission from a thread to an event loop.""" 3239 future = self.loop.run_in_executor(None, self.target) 3240 result = self.loop.run_until_complete(future) 3241 self.assertEqual(result, 3) 3242 3243 def test_run_coroutine_threadsafe_with_exception(self): 3244 """Test coroutine submission from a thread to an event loop 3245 when an exception is raised.""" 3246 future = self.loop.run_in_executor(None, self.target, True) 3247 with self.assertRaises(RuntimeError) as exc_context: 3248 self.loop.run_until_complete(future) 3249 self.assertIn("Fail!", exc_context.exception.args) 3250 3251 def test_run_coroutine_threadsafe_with_timeout(self): 3252 """Test coroutine submission from a thread to an event loop 3253 when a timeout is raised.""" 3254 callback = lambda: self.target(timeout=0) 3255 future = self.loop.run_in_executor(None, callback) 3256 with self.assertRaises(asyncio.TimeoutError): 3257 self.loop.run_until_complete(future) 3258 test_utils.run_briefly(self.loop) 3259 # Check that there's no pending task (add has been cancelled) 3260 for task in asyncio.all_tasks(self.loop): 3261 self.assertTrue(task.done()) 3262 3263 def test_run_coroutine_threadsafe_task_cancelled(self): 3264 """Test coroutine submission from a tread to an event loop 3265 when the task is cancelled.""" 3266 callback = lambda: self.target(cancel=True) 3267 future = self.loop.run_in_executor(None, callback) 3268 with self.assertRaises(asyncio.CancelledError): 3269 self.loop.run_until_complete(future) 3270 3271 def test_run_coroutine_threadsafe_task_factory_exception(self): 3272 """Test coroutine submission from a tread to an event loop 3273 when the task factory raise an exception.""" 3274 3275 def task_factory(loop, coro): 3276 raise NameError 3277 3278 run = self.loop.run_in_executor( 3279 None, lambda: self.target(advance_coro=True)) 3280 3281 # Set exception handler 3282 callback = test_utils.MockCallback() 3283 self.loop.set_exception_handler(callback) 3284 3285 # Set corrupted task factory 3286 self.loop.set_task_factory(task_factory) 3287 3288 # Run event loop 3289 with self.assertRaises(NameError) as exc_context: 3290 self.loop.run_until_complete(run) 3291 3292 # Check exceptions 3293 self.assertEqual(len(callback.call_args_list), 1) 3294 (loop, context), kwargs = callback.call_args 3295 self.assertEqual(context['exception'], exc_context.exception) 3296 3297 3298class SleepTests(test_utils.TestCase): 3299 def setUp(self): 3300 super().setUp() 3301 self.loop = asyncio.new_event_loop() 3302 self.set_event_loop(self.loop) 3303 3304 def tearDown(self): 3305 self.loop.close() 3306 self.loop = None 3307 super().tearDown() 3308 3309 def test_sleep_zero(self): 3310 result = 0 3311 3312 def inc_result(num): 3313 nonlocal result 3314 result += num 3315 3316 async def coro(): 3317 self.loop.call_soon(inc_result, 1) 3318 self.assertEqual(result, 0) 3319 num = await asyncio.sleep(0, result=10) 3320 self.assertEqual(result, 1) # inc'ed by call_soon 3321 inc_result(num) # num should be 11 3322 3323 self.loop.run_until_complete(coro()) 3324 self.assertEqual(result, 11) 3325 3326 def test_loop_argument_is_deprecated(self): 3327 # Remove test when loop argument is removed in Python 3.10 3328 with self.assertWarns(DeprecationWarning): 3329 self.loop.run_until_complete(asyncio.sleep(0.01, loop=self.loop)) 3330 3331 3332class WaitTests(test_utils.TestCase): 3333 def setUp(self): 3334 super().setUp() 3335 self.loop = asyncio.new_event_loop() 3336 self.set_event_loop(self.loop) 3337 3338 def tearDown(self): 3339 self.loop.close() 3340 self.loop = None 3341 super().tearDown() 3342 3343 def test_loop_argument_is_deprecated_in_wait(self): 3344 # Remove test when loop argument is removed in Python 3.10 3345 with self.assertWarns(DeprecationWarning): 3346 self.loop.run_until_complete( 3347 asyncio.wait([coroutine_function()], loop=self.loop)) 3348 3349 def test_loop_argument_is_deprecated_in_wait_for(self): 3350 # Remove test when loop argument is removed in Python 3.10 3351 with self.assertWarns(DeprecationWarning): 3352 self.loop.run_until_complete( 3353 asyncio.wait_for(coroutine_function(), 0.01, loop=self.loop)) 3354 3355 3356class CompatibilityTests(test_utils.TestCase): 3357 # Tests for checking a bridge between old-styled coroutines 3358 # and async/await syntax 3359 3360 def setUp(self): 3361 super().setUp() 3362 self.loop = asyncio.new_event_loop() 3363 self.set_event_loop(self.loop) 3364 3365 def tearDown(self): 3366 self.loop.close() 3367 self.loop = None 3368 super().tearDown() 3369 3370 def test_yield_from_awaitable(self): 3371 3372 with self.assertWarns(DeprecationWarning): 3373 @asyncio.coroutine 3374 def coro(): 3375 yield from asyncio.sleep(0) 3376 return 'ok' 3377 3378 result = self.loop.run_until_complete(coro()) 3379 self.assertEqual('ok', result) 3380 3381 def test_await_old_style_coro(self): 3382 3383 with self.assertWarns(DeprecationWarning): 3384 @asyncio.coroutine 3385 def coro1(): 3386 return 'ok1' 3387 3388 with self.assertWarns(DeprecationWarning): 3389 @asyncio.coroutine 3390 def coro2(): 3391 yield from asyncio.sleep(0) 3392 return 'ok2' 3393 3394 async def inner(): 3395 return await asyncio.gather(coro1(), coro2(), loop=self.loop) 3396 3397 result = self.loop.run_until_complete(inner()) 3398 self.assertEqual(['ok1', 'ok2'], result) 3399 3400 def test_debug_mode_interop(self): 3401 # https://bugs.python.org/issue32636 3402 code = textwrap.dedent(""" 3403 import asyncio 3404 3405 async def native_coro(): 3406 pass 3407 3408 @asyncio.coroutine 3409 def old_style_coro(): 3410 yield from native_coro() 3411 3412 asyncio.run(old_style_coro()) 3413 """) 3414 3415 assert_python_ok("-Wignore::DeprecationWarning", "-c", code, 3416 PYTHONASYNCIODEBUG="1") 3417 3418 3419if __name__ == '__main__': 3420 unittest.main() 3421