1# test_kernel.py 2 3import time 4import pytest 5from curio import * 6kernel_clock = clock 7from curio import traps 8 9def test_hello(kernel): 10 11 async def hello(): 12 return 'hello' 13 14 result = kernel.run(hello) 15 assert result == 'hello' 16 17def test_raise(kernel): 18 class Error(Exception): 19 pass 20 21 async def boom(): 22 raise Error() 23 24 try: 25 kernel.run(boom) 26 assert False, 'boom() did not raise' 27 except Error as e: 28 pass 29 except: 30 assert False, 'boom() raised wrong error' 31 32def test_sleep(kernel): 33 start = end = 0 34 async def main(): 35 nonlocal start, end 36 start = time.time() 37 await sleep(0.5) 38 end = time.time() 39 40 kernel.run(main) 41 elapsed = end - start 42 assert elapsed > 0.5 43 44def test_clock(kernel): 45 async def main(): 46 start = await clock() 47 await sleep(0.1) 48 end = await clock() 49 assert (end - start) >= 0.1 50 kernel.run(main) 51 52def test_sleep_cancel(kernel): 53 cancelled = False 54 55 async def sleeper(): 56 nonlocal cancelled 57 try: 58 await sleep(1) 59 assert False 60 except CancelledError: 61 cancelled = True 62 63 async def main(): 64 task = await spawn(sleeper) 65 await sleep(0.1) 66 await task.cancel() 67 68 kernel.run(main) 69 assert cancelled 70 71def test_sleep_timeout(kernel): 72 cancelled = True 73 74 async def sleeper(): 75 nonlocal cancelled 76 try: 77 await timeout_after(0.1, sleep, 1) 78 assert False 79 except TaskTimeout: 80 cancelled = True 81 82 async def main(): 83 task = await spawn(sleeper) 84 await task.join() 85 86 kernel.run(main) 87 assert cancelled 88 89def test_sleep_ignore_timeout(kernel): 90 async def sleeper(): 91 cancelled = False 92 93 if await ignore_after(0.1, sleep(1)) is None: 94 cancelled = True 95 assert cancelled 96 97 cancelled = False 98 async with ignore_after(0.1) as s: 99 await sleep(1) 100 if s.result is None: 101 cancelled = True 102 103 assert cancelled 104 105 async def main(): 106 task = await spawn(sleeper) 107 await task.join() 108 109 kernel.run(main) 110 111def test_sleep_notimeout(kernel): 112 async def sleeper(): 113 try: 114 await timeout_after(0.5, sleep(0.1)) 115 assert True 116 except TaskTimeout: 117 assert False 118 await sleep(0.5) 119 assert True 120 121 async def main(): 122 task = await spawn(sleeper) 123 await task.join() 124 125 kernel.run(main) 126 127def test_task_join(kernel): 128 async def child(): 129 return 37 130 131 async def main(): 132 task = await spawn(child) 133 r = await task.join() 134 assert r == 37 135 136 kernel.run(main) 137 138def test_task_join_error(kernel): 139 async def child(): 140 int('bad') 141 142 async def main(): 143 task = await spawn(child) 144 try: 145 r = await task.join() 146 assert False 147 except TaskError as e: 148 assert isinstance(e.__cause__, ValueError) 149 150 kernel.run(main) 151 152def test_task_cancel(kernel): 153 cancelled = False 154 async def child(): 155 nonlocal cancelled 156 try: 157 await sleep(0.5) 158 assert False 159 except CancelledError: 160 cancelled = True 161 162 async def main(): 163 task = await spawn(child) 164 await task.cancel() 165 assert cancelled 166 167 kernel.run(main) 168 169def test_task_cancel_poll(kernel): 170 results = [] 171 172 async def child(): 173 async with disable_cancellation(): 174 await sleep(0.1) 175 results.append('success') 176 if await check_cancellation(): 177 results.append('cancelled') 178 else: 179 assert False 180 181 async def main(): 182 task = await spawn(child) 183 await task.cancel() 184 results.append('done') 185 186 kernel.run(main) 187 assert results == ['success', 'cancelled', 'done'] 188 189def test_task_cancel_not_blocking(kernel): 190 async def child(e1, e2): 191 await e1.set() 192 try: 193 await sleep(1000) 194 except CancelledError: 195 await e2.wait() 196 raise 197 198 async def main(): 199 e1 = Event() 200 e2 = Event() 201 task = await spawn(child, e1, e2) 202 await e1.wait() 203 await task.cancel(blocking=False) 204 await e2.set() 205 try: 206 await task.join() 207 except TaskError as e: 208 assert isinstance(e.__cause__, CancelledError) 209 210 kernel.run(main) 211 212 213def test_task_cancel_join(kernel): 214 child_evt = Event() 215 async def child(): 216 await child_evt.wait() 217 assert False 218 219 async def main(): 220 task = await spawn(child) 221 await sleep(0) 222 await task.cancel() 223 224 # Try joining with a cancelled task. Should raise a TaskError 225 try: 226 await task.join() 227 except TaskError as e: 228 assert isinstance(e.__cause__, CancelledError) 229 else: 230 assert False 231 assert True 232 233 kernel.run(main) 234 235def test_task_cancel_join_wait(kernel): 236 evt = Event() 237 238 async def child(): 239 await evt.wait() 240 241 async def canceller(task): 242 await task.cancel() 243 244 async def main(): 245 task1 = await spawn(child) 246 task2 = await spawn(canceller, task1) 247 await task2.join() 248 assert not evt.is_set() 249 try: 250 await task1.join() # Should raise TaskError... with CancelledError as cause 251 assert False 252 except TaskError as e: 253 assert isinstance(e.__cause__, CancelledError) 254 else: 255 assert False 256 257 kernel.run(main) 258 259def test_task_child_cancel(kernel): 260 results = [] 261 262 async def child(): 263 results.append('start') 264 try: 265 await sleep(0.5) 266 results.append('end') 267 except CancelledError: 268 results.append('child cancelled') 269 270 async def parent(): 271 try: 272 child_task = await spawn(child) 273 await sleep(0.5) 274 results.append('end parent') 275 except CancelledError: 276 await child_task.cancel() 277 results.append('parent cancelled') 278 279 async def grandparent(): 280 try: 281 parent_task = await spawn(parent) 282 await sleep(0.5) 283 results.append('end grandparent') 284 except CancelledError: 285 await parent_task.cancel() 286 results.append('grandparent cancelled') 287 288 async def main(): 289 task = await spawn(grandparent) 290 await sleep(0.1) 291 results.append('cancel start') 292 await sleep(0.1) 293 results.append('cancelling') 294 await task.cancel() 295 results.append('done') 296 297 kernel.run(main) 298 299 assert results == [ 300 'start', 301 'cancel start', 302 'cancelling', 303 'child cancelled', 304 'parent cancelled', 305 'grandparent cancelled', 306 'done', 307 ] 308 309 310def test_task_ready_cancel(kernel): 311 # This tests a tricky corner case of a task cancelling another task that's also 312 # on the ready queue. 313 results = [] 314 315 async def child(): 316 try: 317 results.append('child sleep') 318 await sleep(1.0) 319 results.append('child slept') 320 await sleep(1.0) 321 results.append('should not see this') 322 except CancelledError: 323 results.append('child cancelled') 324 325 async def parent(): 326 task = await spawn(child) 327 results.append('parent sleep') 328 await sleep(0.5) 329 results.append('cancel start') 330 await task.cancel() 331 results.append('cancel done') 332 333 async def main(): 334 task = await spawn(parent) 335 await sleep(0.1) 336 time.sleep(1) # Forced block of the event loop. Both tasks should awake when we come back 337 await sleep(0.1) 338 339 kernel.run(main) 340 341 assert results == [ 342 'parent sleep', 343 'child sleep', 344 'cancel start', 345 'child slept', 346 'child cancelled', 347 'cancel done' 348 ] 349 350 351def test_double_cancel(kernel): 352 results = [] 353 354 async def sleeper(): 355 results.append('start') 356 try: 357 await sleep(1) 358 results.append('not here') 359 except CancelledError: 360 results.append('cancel request') 361 await sleep(1) 362 results.append('cancelled') 363 364 async def main(): 365 task = await spawn(sleeper) 366 await sleep(0.5) 367 try: 368 await timeout_after(1, task.cancel()) 369 except TaskTimeout: 370 results.append('retry') 371 await task.cancel() # This second cancel should not abort any operation in sleeper 372 results.append('done cancel') 373 374 kernel.run(main) 375 assert results == [ 376 'start', 377 'cancel request', 378 'retry', 379 'cancelled', 380 'done cancel' 381 ] 382 383 384def test_nested_timeout(kernel): 385 results = [] 386 387 async def coro1(): 388 results.append('coro1 start') 389 await sleep(1) 390 results.append('coro1 done') 391 392 async def coro2(): 393 results.append('coro2 start') 394 await sleep(1) 395 results.append('coro2 done') 396 397 # Parent should cause a timeout before the child. 398 # Results in a TimeoutCancellationError instead of a normal TaskTimeout 399 async def child(): 400 try: 401 await timeout_after(5, coro1()) 402 results.append('coro1 success') 403 except TaskTimeout: 404 results.append('coro1 timeout') 405 except TimeoutCancellationError: 406 results.append('coro1 timeout cancel') 407 408 await coro2() 409 results.append('coro2 success') 410 411 async def parent(): 412 try: 413 await timeout_after(0.75, child()) 414 except TaskTimeout: 415 results.append('parent timeout') 416 417 kernel.run(parent) 418 assert results == [ 419 'coro1 start', 420 'coro1 timeout cancel', 421 'coro2 start', 422 'parent timeout' 423 ] 424 425 426def test_nested_context_timeout(kernel): 427 results = [] 428 429 async def coro1(): 430 results.append('coro1 start') 431 await sleep(1) 432 results.append('coro1 done') 433 434 async def coro2(): 435 results.append('coro2 start') 436 await sleep(1) 437 results.append('coro2 done') 438 439 # Parent should cause a timeout before the child. 440 # Results in a TimeoutCancellationError instead of a normal TaskTimeout 441 async def child(): 442 try: 443 async with timeout_after(5): 444 await coro1() 445 results.append('coro1 success') 446 except TaskTimeout: 447 results.append('coro1 timeout') 448 except TimeoutCancellationError: 449 results.append('coro1 timeout cancel') 450 451 await coro2() 452 results.append('coro2 success') 453 454 async def parent(): 455 try: 456 async with timeout_after(0.75): 457 await child() 458 except TaskTimeout: 459 results.append('parent timeout') 460 461 kernel.run(parent) 462 assert results == [ 463 'coro1 start', 464 'coro1 timeout cancel', 465 'coro2 start', 466 'parent timeout' 467 ] 468 469def test_nested_context_timeout2(kernel): 470 async def coro1(): 471 try: 472 async with timeout_after(10): 473 await sleep(5) 474 except CancelledError as e: 475 assert isinstance(e, TimeoutCancellationError) 476 raise 477 else: 478 assert False 479 480 async def coro2(): 481 try: 482 async with timeout_after(15): 483 await coro1() 484 except CancelledError as e: 485 assert isinstance(e, TimeoutCancellationError) 486 raise 487 else: 488 assert False 489 490 async def parent(): 491 try: 492 async with timeout_after(0.1): 493 await coro2() 494 except BaseException as e: 495 assert isinstance(e, TaskTimeout) 496 else: 497 assert False 498 499 kernel.run(parent) 500 501def test_nested_context_timeout3(kernel): 502 async def coro1(): 503 try: 504 await timeout_after(10, sleep, 5) 505 except CancelledError as e: 506 assert isinstance(e, TimeoutCancellationError) 507 raise 508 else: 509 assert False 510 511 async def coro2(): 512 try: 513 await timeout_after(15, coro1) 514 except CancelledError as e: 515 assert isinstance(e, TimeoutCancellationError) 516 raise 517 else: 518 assert False 519 520 async def parent(): 521 try: 522 await timeout_after(0.1, coro2) 523 except BaseException as e: 524 assert isinstance(e, TaskTimeout) 525 else: 526 assert False 527 528 kernel.run(parent) 529 530def test_nested_timeout_uncaught(kernel): 531 results = [] 532 533 async def coro1(): 534 results.append('coro1 start') 535 await sleep(5) 536 results.append('coro1 done') 537 538 async def child(): 539 # This will cause a TaskTimeout, but it's uncaught 540 await timeout_after(1, coro1()) 541 542 async def parent(): 543 try: 544 await timeout_after(10, child()) 545 except TaskTimeout: 546 results.append('parent timeout') 547 except UncaughtTimeoutError: 548 results.append('uncaught timeout') 549 550 kernel.run(parent) 551 assert results == [ 552 'coro1 start', 553 'uncaught timeout' 554 ] 555 556 557def test_nested_context_timeout_uncaught(kernel): 558 results = [] 559 560 async def coro1(): 561 results.append('coro1 start') 562 await sleep(5) 563 results.append('coro1 done') 564 565 async def child(): 566 # This will cause a TaskTimeout, but it's uncaught 567 async with timeout_after(1): 568 await coro1() 569 570 async def parent(): 571 try: 572 async with timeout_after(10): 573 await child() 574 except TaskTimeout: 575 results.append('parent timeout') 576 except UncaughtTimeoutError: 577 results.append('uncaught timeout') 578 579 kernel.run(parent) 580 assert results == [ 581 'coro1 start', 582 'uncaught timeout' 583 ] 584 585 586def test_nested_timeout_none(kernel): 587 results = [] 588 589 async def coro1(): 590 results.append('coro1 start') 591 await sleep(2) 592 results.append('coro1 done') 593 594 async def coro2(): 595 results.append('coro2 start') 596 await sleep(1) 597 results.append('coro2 done') 598 599 async def child(): 600 await timeout_after(None, coro1()) 601 results.append('coro1 success') 602 await coro2() 603 results.append('coro2 success') 604 605 async def parent(): 606 try: 607 await timeout_after(1, child()) 608 except TaskTimeout: 609 results.append('parent timeout') 610 611 kernel.run(parent) 612 assert results == [ 613 'coro1 start', 614# 'coro1 done', 615# 'coro1 success', 616# 'coro2 start', 617 'parent timeout' 618 ] 619 620 621def test_task_run_error(kernel): 622 results = [] 623 624 async def main(): 625 int('bad') 626 627 try: 628 kernel.run(main) 629 assert False, "Exception not raised" 630 except ValueError as e: 631 pass 632 except: 633 assert False, "Wrong exception raised" 634 635def test_sleep_0_starvation(kernel): 636 # This task should not block other tasks from running, and should be 637 # cancellable. We used to have a bug where neither were true... 638 async def loop_forever(): 639 while True: 640 print("Sleeping 0") 641 await sleep(0) 642 643 async def io1(sock): 644 await sock.recv(1) 645 await sock.send(b"x") 646 await sock.recv(1) 647 648 async def io2(sock): 649 await sock.send(b"x") 650 await sock.recv(1) 651 await sock.send(b"x") 652 653 async def main(): 654 loop_task = await spawn(loop_forever) 655 await sleep(0) 656 import curio.socket 657 sock1, sock2 = curio.socket.socketpair() 658 io1_task = await spawn(io1, sock1) 659 io2_task = await spawn(io2, sock2) 660 await io1_task.join() 661 await io2_task.join() 662 await loop_task.cancel() 663 664 kernel.run(main) 665 666 667def test_ping_pong_starvation(kernel): 668 # It used to be that two of these tasks could starve out other tasks 669 async def pingpong(inq, outq): 670 while True: 671 await outq.put(await inq.get()) 672 673 async def i_will_survive(): 674 for _ in range(10): 675 await sleep(0) 676 return "i survived!" 677 678 async def main(): 679 q1 = Queue() 680 q2 = Queue() 681 await q1.put("something") 682 pp1 = await spawn(pingpong, q1, q2) 683 pp2 = await spawn(pingpong, q2, q1) 684 iws = await spawn(i_will_survive) 685 686 assert (await iws.join()) == "i survived!" 687 await pp1.cancel() 688 await pp2.cancel() 689 690 kernel.run(main) 691 692def test_task_cancel_timeout(kernel): 693 # Test that cancellation also cancels timeouts 694 results = [] 695 696 async def coro(): 697 try: 698 await sleep(5) 699 except CancelledError: 700 results.append('cancelled') 701 await sleep(1) 702 results.append('done cancel') 703 raise 704 705 async def child(): 706 results.append('child') 707 try: 708 async with timeout_after(1): 709 await coro() 710 except TaskTimeout: 711 results.append('timeout') 712 713 async def main(): 714 task = await spawn(child) 715 await sleep(0.5) 716 await task.cancel() 717 718 kernel.run(main) 719 assert results == [ 'child', 'cancelled', 'done cancel' ] 720 721def test_reentrant_kernel(kernel): 722 async def child(): 723 pass 724 725 async def main(): 726 with pytest.raises(RuntimeError): 727 kernel.run(child) 728 729 kernel.run(main) 730 731from curio.traps import * 732 733def test_pending_cancellation(kernel): 734 async def main(): 735 self = await _get_current() 736 self.cancel_pending = CancelledError() 737 738 with pytest.raises(CancelledError): 739 await _read_wait(None) 740 741 self.cancel_pending = CancelledError() 742 with pytest.raises(CancelledError): 743 await _future_wait(None) 744 745 self.cancel_pending = CancelledError() 746 with pytest.raises(CancelledError): 747 await _scheduler_wait(None, None) 748 749 self.cancel_pending = TaskTimeout() 750 try: 751 await _unset_timeout(None) 752 assert True 753 except TaskTimeout: 754 assert False 755 756 kernel.run(main) 757 758from functools import partial 759 760def test_single_stepping(kernel): 761 value = 0 762 async def child(): 763 nonlocal value 764 await sleep(0) 765 value = 1 766 await sleep(0.1) 767 value = 2 768 769 task = kernel.run(partial(spawn, child, daemon=True)) 770 while value < 1: 771 kernel.run() 772 assert True 773 time.sleep(0.2) 774 kernel.run() 775 assert value == 2 776 777def test_io_registration(kernel): 778 # Tests some tricky corner cases of the kernel that are difficult 779 # to get to under normal socket usage 780 import socket 781 s1, s2 = socket.socketpair() 782 s1.setblocking(False) 783 s2.setblocking(False) 784 785 # Fill the send buffer 786 while True: 787 try: 788 s1.send(b'x'*100000) 789 except BlockingIOError: 790 break 791 792 async def reader1(): 793 await traps._read_wait(s1.fileno()) 794 data = s1.recv(100) 795 assert data == b'hello' 796 797 async def writer1(): 798 await traps._write_wait(s1.fileno()) 799 assert False 800 801 async def writer2(): 802 with pytest.raises(WriteResourceBusy): 803 await traps._write_wait(s1.fileno()) 804 805 async def main(): 806 t0 = await spawn(reader1) 807 t1 = await spawn(writer1) 808 t2 = await spawn(writer2) 809 await t2.join() 810 await t1.cancel() 811 s2.send(b'hello') 812 await t0.join() 813 s1.close() 814 s2.close() 815 816 kernel.run(main) 817 818from functools import partial 819 820def test_coro_partial(kernel): 821 async def func(x, y, z): 822 assert x == 1 823 assert y == 2 824 assert z == 3 825 return True 826 827 async def main(): 828 assert await func(1, 2, 3) 829 assert await ignore_after(1, func(1,2,3)) 830 assert await ignore_after(1, func, 1, 2, 3) 831 assert await ignore_after(1, partial(func, 1, 2), 3) 832 assert await ignore_after(1, partial(func, z=3), 1, 2) 833 834 # Try spawns 835 t = await spawn(func(1,2,3)) 836 assert await t.join() 837 838 t = await spawn(func, 1, 2, 3) 839 assert await t.join() 840 841 t = await spawn(partial(func, 1, 2), 3) 842 assert await t.join() 843 844 t = await spawn(partial(func, z=3), 1, 2) 845 assert await t.join() 846 847 kernel.run(main) 848 849def test_custom_cancel(kernel): 850 class CustomCancelled(CancelledError): 851 pass 852 853 evt = Event() 854 async def child(): 855 try: 856 await evt.wait() 857 except CustomCancelled: 858 assert True 859 except: 860 assert False 861 else: 862 assert False 863 864 async def main(): 865 t = await spawn(child) 866 await t.cancel(exc=CustomCancelled) 867 868 kernel.run(main) 869 870def test_timeout_badness(kernel): 871 import time 872 async def main(): 873 async with timeout_after(0.1): 874 time.sleep(0.2) # Timeout will take too long. Should issue a warning. 875 876 # Execution should make it here. There were no blocking operations to cancel. 877 # It makes no sense to issue a cancellation on the next operation because we're 878 # Likely out of the timeout block 879 assert True 880 881 kernel.run(main) 882 883def test_kernel_no_shutdown(): 884 # Code coverage test 885 k = Kernel() 886 del k 887 888 with Kernel() as k: 889 pass 890 891 with pytest.raises(RuntimeError): 892 k.run() 893 894 895def test_kernel_exit(): 896 # Code coverage test 897 async def main(): 898 raise SystemExit() 899 900 with pytest.raises(SystemExit): 901 with Kernel() as k: 902 k.run(main) 903 904 905def test_kernel_badtrap(): 906 # Code coverage test 907 async def main(): 908 from curio.traps import _kernel_trap 909 await _kernel_trap('bogus', 1) 910 911 with pytest.raises(KeyError): 912 with Kernel() as k: 913 k.run(main) 914 915def test_kernel_multischedule(kernel): 916 async def sleeper(): 917 try: 918 async with timeout_after(0.5): 919 await sleep(0.25) 920 except TaskTimeout: 921 assert False 922 923 await sleep(0.1) # Should not crash! 924 return True 925 926 async def main(): 927 import time 928 t = await spawn(sleeper) 929 await sleep(0.1) 930 time.sleep(1) # Force time clock to elapse past both the sleep and outer timeout 931 r = await t.join() 932 assert r 933 934 kernel.run(main) 935 936def test_kernel_debug(): 937 from curio.debug import schedtrace, traptrace 938 async def hello(): 939 await sleep(0) 940 941 with Kernel(debug=[schedtrace,traptrace]) as k: 942 k.run(hello) 943 944 with Kernel(debug=True) as k: 945 k.run(hello) 946 947 with Kernel(debug=schedtrace) as k: 948 k.run(hello) 949 950 with Kernel(debug=schedtrace(filter='none')) as k: 951 k.run(hello) 952 953 954 955 956 957