1# test_io.py 2 3from curio import * 4from curio.socket import * 5import io 6import socket as std_socket 7import pytest 8import sys 9 10def test_socket_blocking(kernel, portno): 11 ''' 12 Test of exposing a socket in blocking mode 13 ''' 14 done = Event() 15 16 results = [] 17 18 def sync_client(sock): 19 assert isinstance(sock, std_socket.socket) 20 data = sock.recv(8192) 21 results.append(('client', data)) 22 sock.sendall(data) 23 24 async def handler(client, addr): 25 results.append('handler start') 26 await client.send(b'OK') 27 with client.blocking() as s: 28 await run_in_thread(sync_client, s) 29 results.append('handler done') 30 await done.set() 31 32 async def test_client(address, serv): 33 sock = socket(AF_INET, SOCK_STREAM) 34 await sock.connect(address) 35 await sock.recv(8) 36 await sock.send(b'Message') 37 data = await sock.recv(8192) 38 await sock.close() 39 40 async def main(): 41 serv = await spawn(tcp_server, '', portno, handler) 42 c = await spawn(test_client, ('localhost', portno), serv) 43 await done.wait() 44 await serv.cancel() 45 await c.join() 46 47 48 kernel.run(main()) 49 50 assert results == [ 51 'handler start', 52 ('client', b'Message'), 53 'handler done' 54 ] 55 56@pytest.mark.skipif(sys.platform.startswith("win"), 57 reason="not supported on Windows") 58def test_socketstream_blocking(kernel, portno): 59 ''' 60 Test of exposing a socket stream in blocking mode 61 ''' 62 done = Event() 63 results = [] 64 65 def sync_client(f): 66 assert isinstance(f, io.RawIOBase) 67 data = f.read(8192) 68 results.append(('client', data)) 69 f.write(data) 70 71 async def handler(client, addr): 72 results.append('handler start') 73 await client.send(b'OK') 74 s = client.as_stream() 75 with s.blocking() as f: 76 await run_in_thread(sync_client, f) 77 results.append('handler done') 78 await done.set() 79 80 async def test_client(address, serv): 81 sock = socket(AF_INET, SOCK_STREAM) 82 await sock.connect(address) 83 await sock.recv(8) 84 await sock.send(b'Message') 85 data = await sock.recv(8192) 86 await sock.close() 87 88 async def main(): 89 serv = await spawn(tcp_server, '', portno, handler) 90 c = await spawn(test_client, ('localhost', portno), serv) 91 await done.wait() 92 await serv.cancel() 93 await c.join() 94 95 kernel.run(main()) 96 97 assert results == [ 98 'handler start', 99 ('client', b'Message'), 100 'handler done' 101 ] 102 103@pytest.mark.skipif(sys.platform.startswith("win"), 104 reason="not supported on Windows") 105def test_filestream_blocking(kernel, portno): 106 ''' 107 Test of exposing a socket in blocking mode 108 ''' 109 done = Event() 110 results = [] 111 112 def sync_client(f): 113 assert isinstance(f, io.RawIOBase) 114 data = f.read(8192) 115 results.append(('client', data)) 116 f.write(data) 117 118 async def handler(client, addr): 119 results.append('handler start') 120 await client.send(b'OK') 121 s = client.makefile('rwb', buffering=0) 122 with s.blocking() as f: 123 await run_in_thread(sync_client, f) 124 results.append('handler done') 125 await done.set() 126 127 async def test_client(address, serv): 128 sock = socket(AF_INET, SOCK_STREAM) 129 await sock.connect(address) 130 await sock.recv(8) 131 await sock.send(b'Message') 132 data = await sock.recv(8192) 133 await sock.close() 134 135 async def main(): 136 serv = await spawn(tcp_server, '', portno, handler) 137 c = await spawn(test_client, ('localhost', portno), serv) 138 await done.wait() 139 await serv.cancel() 140 await c.join() 141 142 kernel.run(main()) 143 144 assert results == [ 145 'handler start', 146 ('client', b'Message'), 147 'handler done' 148 ] 149 150@pytest.mark.skipif(sys.platform.startswith("win"), 151 reason="not supported on Windows") 152def test_filestream_bad_blocking(kernel, portno): 153 ''' 154 Test of exposing a socket in blocking mode with buffered data error 155 ''' 156 done = Event() 157 results = [] 158 159 async def handler(client, addr): 160 await client.send(b'OK') 161 s = client.makefile('rwb', buffering=0) 162 line = await s.readline() 163 assert line == b'hello\n' 164 try: 165 with s.blocking() as f: 166 pass 167 except IOError: 168 results.append(True) 169 else: 170 results.append(False) 171 await done.set() 172 173 async def test_client(address, serv): 174 sock = socket(AF_INET, SOCK_STREAM) 175 await sock.connect(address) 176 await sock.recv(8) 177 await sock.send(b'hello\nworld\n') 178 data = await sock.recv(8192) 179 await sock.close() 180 181 async def main(): 182 serv = await spawn(tcp_server, '', portno, handler) 183 c = await spawn(test_client, ('localhost', portno), serv) 184 await done.wait() 185 await serv.cancel() 186 await c.join() 187 188 kernel.run(main()) 189 assert results[0] 190 191@pytest.mark.skipif(sys.platform.startswith("win"), 192 reason="not supported on Windows") 193def test_socketstream_bad_blocking(kernel, portno): 194 ''' 195 Test of exposing a socket in blocking mode with buffered data error 196 ''' 197 done = Event() 198 results = [] 199 200 async def handler(client, addr): 201 await client.send(b'OK') 202 s = client.as_stream() 203 line = await s.readline() 204 assert line == b'hello\n' 205 try: 206 with s.blocking() as f: 207 pass 208 except IOError: 209 results.append(True) 210 else: 211 results.append(False) 212 await done.set() 213 214 async def test_client(address, serv): 215 sock = socket(AF_INET, SOCK_STREAM) 216 await sock.connect(address) 217 await sock.recv(8) 218 await sock.send(b'hello\nworld\n') 219 data = await sock.recv(8192) 220 await sock.close() 221 222 async def main(): 223 serv = await spawn(tcp_server, '', portno, handler) 224 c = await spawn(test_client, ('localhost', portno), serv) 225 await done.wait() 226 await serv.cancel() 227 await c.join() 228 229 kernel.run(main()) 230 assert results[0] 231 232def test_read_partial(kernel, portno): 233 done = Event() 234 results = [] 235 async def handler(client, addr): 236 try: 237 await client.send(b'OK') 238 s = client.as_stream() 239 line = await s.readline() 240 results.append(line) 241 data = await s.read(2) 242 results.append(data) 243 data = await s.read(1000) 244 results.append(data) 245 finally: 246 await done.set() 247 248 async def test_client(address, serv): 249 sock = socket(AF_INET, SOCK_STREAM) 250 await sock.connect(address) 251 await sock.recv(8) 252 await sock.send(b'hello\nworld\n') 253 await sock.close() 254 255 256 async def main(): 257 serv = await spawn(tcp_server, '', portno, handler) 258 c = await spawn(test_client, ('localhost', portno), serv) 259 await done.wait() 260 await serv.cancel() 261 await c.join() 262 263 kernel.run(main()) 264 assert results == [ b'hello\n', b'wo', b'rld\n'] 265 266def test_readall(kernel, portno): 267 done = Event() 268 results = [] 269 270 async def handler(client, addr): 271 results.append('handler start') 272 await client.send(b'OK') 273 async with client.as_stream() as s: 274 data = await s.readall() 275 results.append(data) 276 results.append('handler done') 277 await done.set() 278 279 async def test_client(address, serv): 280 sock = socket(AF_INET, SOCK_STREAM) 281 await sock.connect(address) 282 await sock.recv(8) 283 await sock.send(b'Msg1\n') 284 await sleep(0.1) 285 await sock.send(b'Msg2\n') 286 await sleep(0.1) 287 await sock.send(b'Msg3\n') 288 await sleep(0.1) 289 await sock.close() 290 291 292 async def main(): 293 serv = await spawn(tcp_server, '', portno, handler) 294 c = await spawn(test_client, ('localhost', portno), serv) 295 await done.wait() 296 await serv.cancel() 297 await c.join() 298 299 kernel.run(main()) 300 301 assert results == [ 302 'handler start', 303 b'Msg1\nMsg2\nMsg3\n', 304 'handler done' 305 ] 306 307 308def test_readall_partial(kernel, portno): 309 done = Event() 310 311 async def handler(client, addr): 312 await client.send(b'OK') 313 s = client.as_stream() 314 line = await s.readline() 315 assert line == b'hello\n' 316 data = await s.readall() 317 assert data == b'world\n' 318 await done.set() 319 320 async def test_client(address, serv): 321 sock = socket(AF_INET, SOCK_STREAM) 322 await sock.connect(address) 323 await sock.recv(8) 324 await sock.send(b'hello\nworld\n') 325 await sock.close() 326 327 328 async def main(): 329 serv = await spawn(tcp_server, '', portno, handler) 330 c = await spawn(test_client, ('localhost', portno), serv) 331 await done.wait() 332 await serv.cancel() 333 await c.join() 334 335 kernel.run(main()) 336 337 338def test_readall_timeout(kernel, portno): 339 done = Event() 340 results = [] 341 342 async def handler(client, addr): 343 results.append('handler start') 344 await client.send(b'OK') 345 s = client.as_stream() 346 try: 347 data = await timeout_after(0.5, s.readall()) 348 except TaskTimeout as e: 349 results.append(e.bytes_read) 350 results.append('handler done') 351 await done.set() 352 353 async def test_client(address, serv): 354 sock = socket(AF_INET, SOCK_STREAM) 355 await sock.connect(address) 356 await sock.recv(8) 357 await sock.send(b'Msg1\n') 358 await sleep(1) 359 await sock.send(b'Msg2\n') 360 await sock.close() 361 362 async def main(): 363 serv = await spawn(tcp_server, '', portno, handler) 364 c = await spawn(test_client, ('localhost', portno), serv) 365 await done.wait() 366 await serv.cancel() 367 await c.join() 368 369 kernel.run(main()) 370 371 assert results == [ 372 'handler start', 373 b'Msg1\n', 374 'handler done' 375 ] 376 377 378def test_read_exactly(kernel, portno): 379 done = Event() 380 results = [] 381 382 async def handler(client, addr): 383 results.append('handler start') 384 await client.send(b'OK') 385 s = client.as_stream() 386 for n in range(3): 387 results.append(await s.read_exactly(5)) 388 results.append('handler done') 389 await done.set() 390 391 async def test_client(address, serv): 392 sock = socket(AF_INET, SOCK_STREAM) 393 await sock.connect(address) 394 await sock.recv(8) 395 await sock.send(b'Msg1\nMsg2\nMsg3\n') 396 await sock.close() 397 398 async def main(): 399 serv = await spawn(tcp_server, '', portno, handler) 400 c = await spawn(test_client, ('localhost', portno), serv) 401 await done.wait() 402 await serv.cancel() 403 await c.join() 404 405 kernel.run(main()) 406 407 assert results == [ 408 'handler start', 409 b'Msg1\n', 410 b'Msg2\n', 411 b'Msg3\n', 412 'handler done' 413 ] 414 415 416def test_read_exactly_incomplete(kernel, portno): 417 done = Event() 418 results = [] 419 async def handler(client, addr): 420 await client.send(b'OK') 421 s = client.as_stream() 422 try: 423 await s.read_exactly(100) 424 except EOFError as e: 425 results.append(e.bytes_read) 426 finally: 427 await done.set() 428 429 async def test_client(address, serv): 430 sock = socket(AF_INET, SOCK_STREAM) 431 await sock.connect(address) 432 await sock.recv(8) 433 await sock.send(b'Msg1\nMsg2\nMsg3\n') 434 await sock.close() 435 436 async def main(): 437 serv = await spawn(tcp_server, '', portno, handler) 438 c = await spawn(test_client, ('localhost', portno), serv) 439 await done.wait() 440 await serv.cancel() 441 await c.join() 442 443 kernel.run(main()) 444 445 assert results[0] == b'Msg1\nMsg2\nMsg3\n' 446 447def test_read_exactly_timeout(kernel, portno): 448 done = Event() 449 results = [] 450 451 async def handler(client, addr): 452 results.append('handler start') 453 await client.send(b'OK') 454 s = client.as_stream() 455 try: 456 data = await timeout_after(0.5, s.read_exactly(10)) 457 results.append(data) 458 except TaskTimeout as e: 459 results.append(e.bytes_read) 460 results.append('handler done') 461 await done.set() 462 463 async def test_client(address, serv): 464 sock = socket(AF_INET, SOCK_STREAM) 465 await sock.connect(address) 466 await sock.recv(8) 467 await sock.send(b'Msg1\n') 468 await sleep(1) 469 await sock.send(b'Msg2\n') 470 await sock.close() 471 472 473 async def main(): 474 serv = await spawn(tcp_server, '', portno, handler) 475 c = await spawn(test_client, ('localhost', portno), serv) 476 await done.wait() 477 await serv.cancel() 478 await c.join() 479 480 kernel.run(main()) 481 482 assert results == [ 483 'handler start', 484 b'Msg1\n', 485 'handler done' 486 ] 487 488 489 490def test_readline(kernel, portno): 491 done = Event() 492 results = [] 493 494 async def handler(client, addr): 495 results.append('handler start') 496 await client.send(b'OK') 497 s = client.as_stream() 498 for n in range(3): 499 results.append(await s.readline()) 500 results.append('handler done') 501 await done.set() 502 503 async def test_client(address, serv): 504 sock = socket(AF_INET, SOCK_STREAM) 505 await sock.connect(address) 506 await sock.recv(8) 507 await sock.send(b'Msg1\nMsg2\nMsg3\n') 508 await sock.close() 509 510 async def main(): 511 serv = await spawn(tcp_server, '', portno, handler) 512 c = await spawn(test_client, ('localhost', portno), serv) 513 await done.wait() 514 await serv.cancel() 515 await c.join() 516 517 kernel.run(main()) 518 519 assert results == [ 520 'handler start', 521 b'Msg1\n', 522 b'Msg2\n', 523 b'Msg3\n', 524 'handler done' 525 ] 526 527 528def test_readlines(kernel, portno): 529 done = Event() 530 results = [] 531 532 async def handler(client, addr): 533 await client.send(b'OK') 534 results.append('handler start') 535 s = client.as_stream() 536 results.extend(await s.readlines()) 537 results.append('handler done') 538 await done.set() 539 540 async def test_client(address, serv): 541 sock = socket(AF_INET, SOCK_STREAM) 542 await sock.connect(address) 543 await sock.recv(8) 544 await sock.send(b'Msg1\nMsg2\nMsg3\n') 545 await sock.close() 546 547 async def main(): 548 serv = await spawn(tcp_server, '', portno, handler) 549 c = await spawn(test_client, ('localhost', portno), serv) 550 await done.wait() 551 await serv.cancel() 552 await c.join() 553 554 kernel.run(main()) 555 556 assert results == [ 557 'handler start', 558 b'Msg1\n', 559 b'Msg2\n', 560 b'Msg3\n', 561 'handler done' 562 ] 563 564 565def test_readlines_timeout(kernel, portno): 566 done = Event() 567 results = [] 568 569 async def handler(client, addr): 570 await client.send(b'OK') 571 results.append('handler start') 572 s = client.as_stream() 573 try: 574 await timeout_after(0.5, s.readlines()) 575 except TaskTimeout as e: 576 results.extend(e.lines_read) 577 results.append('handler done') 578 await done.set() 579 580 async def test_client(address, serv): 581 sock = socket(AF_INET, SOCK_STREAM) 582 await sock.connect(address) 583 await sock.recv(8) 584 await sock.send(b'Msg1\nMsg2\n') 585 await sleep(1) 586 await sock.send(b'Msg3\n') 587 await sock.close() 588 589 590 async def main(): 591 serv = await spawn(tcp_server, '', portno, handler) 592 c = await spawn(test_client, ('localhost', portno), serv) 593 await done.wait() 594 await serv.cancel() 595 await c.join() 596 597 kernel.run(main()) 598 599 assert results == [ 600 'handler start', 601 b'Msg1\n', 602 b'Msg2\n', 603 'handler done' 604 ] 605 606 607def test_writelines(kernel, portno): 608 done = Event() 609 results = [] 610 611 async def handler(client, addr): 612 results.append('handler start') 613 await client.send(b'OK') 614 s = client.as_stream() 615 results.append(await s.readall()) 616 results.append('handler done') 617 await done.set() 618 619 async def test_client(address, serv): 620 sock = socket(AF_INET, SOCK_STREAM) 621 await sock.connect(address) 622 await sock.recv(8) 623 s = sock.as_stream() 624 await s.writelines([b'Msg1\n', b'Msg2\n', b'Msg3\n']) 625 await sock.close() 626 627 async def main(): 628 serv = await spawn(tcp_server, '', portno, handler) 629 c = await spawn(test_client, ('localhost', portno), serv) 630 await done.wait() 631 await serv.cancel() 632 await c.join() 633 634 kernel.run(main()) 635 636 assert results == [ 637 'handler start', 638 b'Msg1\nMsg2\nMsg3\n', 639 'handler done' 640 ] 641 642 643def test_writelines_timeout(kernel, portno): 644 done = Event() 645 results = [] 646 async def handler(client, addr): 647 await client.send(b'OK') 648 s = client.as_stream() 649 await sleep(1) 650 results.append(await s.readall()) 651 await done.set() 652 653 def line_generator(): 654 n = 0 655 while True: 656 yield b'Msg%d\n' % n 657 n += 1 658 659 async def test_client(address, serv): 660 sock = socket(AF_INET, SOCK_STREAM) 661 await sock.connect(address) 662 await sock.recv(8) 663 s = sock.as_stream() 664 try: 665 await timeout_after(0.5, s.writelines(line_generator())) 666 except TaskTimeout as e: 667 results.append(e.bytes_written) 668 await sock.close() 669 670 async def main(): 671 serv = await spawn(tcp_server, '', portno, handler) 672 c = await spawn(test_client, ('localhost', portno), serv) 673 await done.wait() 674 await serv.cancel() 675 await c.join() 676 677 kernel.run(main()) 678 679 assert results[0] == len(results[1]) 680 681@pytest.mark.skipif(sys.platform.startswith("win"), 682 reason="fails on windows") 683def test_write_timeout(kernel, portno): 684 done = Event() 685 results = [] 686 async def handler(client, addr): 687 await client.send(b'OK') 688 s = client.as_stream() 689 await sleep(1) 690 results.append(await s.readall()) 691 await done.set() 692 693 async def test_client(address, serv): 694 sock = socket(AF_INET, SOCK_STREAM) 695 await sock.connect(address) 696 await sock.recv(8) 697 s = sock.as_stream() 698 try: 699 msg = b'x'*10000000 # Must be big enough to fill buffers 700 await timeout_after(0.5, s.write(msg)) 701 except TaskTimeout as e: 702 results.append(e.bytes_written) 703 await sock.close() 704 705 async def main(): 706 serv = await spawn(tcp_server, '', portno, handler) 707 c = await spawn(test_client, ('localhost', portno), serv) 708 await done.wait() 709 await serv.cancel() 710 await c.join() 711 712 kernel.run(main()) 713 714 assert results[0] == len(results[1]) 715 716def test_iterline(kernel, portno): 717 results = [] 718 719 async def handler(client, addr): 720 results.append('handler start') 721 await client.send(b'OK') 722 s = client.as_stream() 723 async for line in s: 724 results.append(line) 725 results.append('handler done') 726 727 async def test_client(address, serv): 728 sock = socket(AF_INET, SOCK_STREAM) 729 await sock.connect(address) 730 await sock.recv(8) 731 await sock.send(b'Msg1\nMsg2\nMsg3\n') 732 await sock.close() 733 await sleep(0.1) 734 await serv.cancel() 735 736 async def main(): 737 async with TaskGroup() as g: 738 serv = await g.spawn(tcp_server, '', portno, handler) 739 await g.spawn(test_client, ('localhost', portno), serv) 740 741 kernel.run(main()) 742 743 assert results == [ 744 'handler start', 745 b'Msg1\n', 746 b'Msg2\n', 747 b'Msg3\n', 748 'handler done' 749 ] 750 751 752def test_double_recv(kernel, portno): 753 done = Event() 754 results = [] 755 756 async def bad_handler(client): 757 results.append('bad handler') 758 await sleep(0.1) 759 try: 760 await client.recv(1000) # <- This needs to fail. Task already reading on the socket 761 results.append('why am I here?') 762 except ReadResourceBusy as e: 763 results.append('good handler') 764 765 async def handler(client, addr): 766 results.append('handler start') 767 await spawn(bad_handler, client, daemon=True) 768 await client.send(b'OK') 769 data = await client.recv(1000) 770 results.append(data) 771 results.append('handler done') 772 await done.set() 773 774 async def test_client(address, serv): 775 sock = socket(AF_INET, SOCK_STREAM) 776 await sock.connect(address) 777 await sock.recv(8) 778 await sleep(1) 779 await sock.send(b'Msg') 780 await sock.close() 781 782 async def main(): 783 serv = await spawn(tcp_server, '', portno, handler) 784 client = await spawn(test_client, ('localhost', portno), serv) 785 await done.wait() 786 await serv.cancel() 787 await client.join() 788 789 kernel.run(main()) 790 791 assert results == [ 792 'handler start', 793 'bad handler', 794 'good handler', 795 b'Msg', 796 'handler done' 797 ] 798 799@pytest.mark.skipif(True, 800 reason="flaky") 801def test_sendall_cancel(kernel, portno): 802 done = Event() 803 start = Event() 804 results = {} 805 806 async def handler(client, addr): 807 await start.wait() 808 nrecv = 0 809 while True: 810 data = await client.recv(1000000) 811 if not data: 812 break 813 nrecv += len(data) 814 results['handler'] = nrecv 815 await client.close() 816 await done.set() 817 818 async def test_client(address, serv): 819 sock = socket(AF_INET, SOCK_STREAM) 820 await sock.connect(address) 821 try: 822 await sock.sendall(b'x'*10000000) 823 except CancelledError as e: 824 results['sender'] = e.bytes_sent 825 await sock.close() 826 827 async def main(): 828 serv = await spawn(tcp_server, '', portno, handler) 829 t = await spawn(test_client, ('localhost', portno), serv) 830 await sleep(0.1) 831 await t.cancel() 832 await start.set() 833 await serv.cancel() 834 await done.wait() 835 836 kernel.run(main()) 837 838 assert results['handler'] == results['sender'] 839 840 841def test_stream_bad_context(kernel, portno): 842 done = Event() 843 results = [] 844 async def handler(client, addr): 845 await client.send(b'OK') 846 try: 847 with client.as_stream() as s: 848 data = await s.readall() 849 except AsyncOnlyError: 850 results.append(True) 851 finally: 852 await done.set() 853 854 async def test_client(address, serv): 855 sock = socket(AF_INET, SOCK_STREAM) 856 await sock.connect(address) 857 await sock.recv(8) 858 await sock.send(b'Hello\n') 859 await sock.close() 860 861 862 async def main(): 863 serv = await spawn(tcp_server, '', portno, handler) 864 c = await spawn(test_client, ('localhost', portno), serv) 865 await done.wait() 866 await serv.cancel() 867 await c.join() 868 869 kernel.run(main()) 870 871 assert results == [ True ] 872 873def test_stream_bad_iter(kernel, portno): 874 done = Event() 875 results = [] 876 async def handler(client, addr): 877 await client.send(b'OK') 878 try: 879 async with client.as_stream() as s: 880 for line in s: 881 pass 882 except AsyncOnlyError: 883 results.append(True) 884 finally: 885 await done.set() 886 887 async def test_client(address, serv): 888 sock = socket(AF_INET, SOCK_STREAM) 889 await sock.connect(address) 890 await sock.recv(8) 891 await sock.send(b'Hello\n') 892 await sock.close() 893 894 895 async def main(): 896 serv = await spawn(tcp_server, '', portno, handler) 897 c = await spawn(test_client, ('localhost', portno), serv) 898 await done.wait() 899 await serv.cancel() 900 await c.join() 901 902 kernel.run(main()) 903 904 assert results == [ True ] 905 906def test_io_waiting(kernel): 907 async def handler(sock): 908 result = await sock.accept() 909 910 async def main(): 911 from curio.traps import _io_waiting 912 913 sock = socket(AF_INET, SOCK_STREAM) 914 sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True) 915 sock.bind(('',25000)) 916 sock.listen(1) 917 async with sock: 918 t1 = await spawn(handler, sock) 919 await sleep(0.1) 920 r, w = await _io_waiting(sock) 921 assert t1 == r 922 assert w == None 923 await t1.cancel() 924 925 r,w = await _io_waiting(0) 926 assert (r, w) == (None, None) 927 928 kernel.run(main()) 929 930def test_io_unregister(kernel): 931 # This is purely a code coverage test 932 async def reader(sock): 933 from curio.traps import _read_wait 934 await _read_wait(sock.fileno()) 935 936 async def main(): 937 from curio.traps import _write_wait 938 sock = socket(AF_INET, SOCK_DGRAM) 939 sock.bind(('', 26000)) 940 t = await spawn(reader(sock)) 941 await sleep(0.1) 942 await _write_wait(sock.fileno()) 943 await t.cancel() 944 await sock.close() 945 kernel.run(main()) 946 assert True 947