1# test_io.py
2
3from curio import *
4from curio.socket import *
5import io
6import socket as std_socket
7import pytest
8import sys
9
10def test_socket_blocking(kernel, portno):
11    '''
12    Test of exposing a socket in blocking mode
13    '''
14    done = Event()
15
16    results = []
17
18    def sync_client(sock):
19        assert isinstance(sock, std_socket.socket)
20        data = sock.recv(8192)
21        results.append(('client', data))
22        sock.sendall(data)
23
24    async def handler(client, addr):
25        results.append('handler start')
26        await client.send(b'OK')
27        with client.blocking() as s:
28            await run_in_thread(sync_client, s)
29        results.append('handler done')
30        await done.set()
31
32    async def test_client(address, serv):
33        sock = socket(AF_INET, SOCK_STREAM)
34        await sock.connect(address)
35        await sock.recv(8)
36        await sock.send(b'Message')
37        data = await sock.recv(8192)
38        await sock.close()
39
40    async def main():
41        serv = await spawn(tcp_server, '', portno, handler)
42        c = await spawn(test_client, ('localhost', portno), serv)
43        await done.wait()
44        await serv.cancel()
45        await c.join()
46
47
48    kernel.run(main())
49
50    assert results == [
51        'handler start',
52        ('client', b'Message'),
53        'handler done'
54    ]
55
56@pytest.mark.skipif(sys.platform.startswith("win"),
57                    reason="not supported on Windows")
58def test_socketstream_blocking(kernel, portno):
59    '''
60    Test of exposing a socket stream in blocking mode
61    '''
62    done = Event()
63    results = []
64
65    def sync_client(f):
66        assert isinstance(f, io.RawIOBase)
67        data = f.read(8192)
68        results.append(('client', data))
69        f.write(data)
70
71    async def handler(client, addr):
72        results.append('handler start')
73        await client.send(b'OK')
74        s = client.as_stream()
75        with s.blocking() as f:
76            await run_in_thread(sync_client, f)
77        results.append('handler done')
78        await done.set()
79
80    async def test_client(address, serv):
81        sock = socket(AF_INET, SOCK_STREAM)
82        await sock.connect(address)
83        await sock.recv(8)
84        await sock.send(b'Message')
85        data = await sock.recv(8192)
86        await sock.close()
87
88    async def main():
89        serv = await spawn(tcp_server, '', portno, handler)
90        c = await spawn(test_client, ('localhost', portno), serv)
91        await done.wait()
92        await serv.cancel()
93        await c.join()
94
95    kernel.run(main())
96
97    assert results == [
98        'handler start',
99        ('client', b'Message'),
100        'handler done'
101    ]
102
103@pytest.mark.skipif(sys.platform.startswith("win"),
104                    reason="not supported on Windows")
105def test_filestream_blocking(kernel, portno):
106    '''
107    Test of exposing a socket in blocking mode
108    '''
109    done = Event()
110    results = []
111
112    def sync_client(f):
113        assert isinstance(f, io.RawIOBase)
114        data = f.read(8192)
115        results.append(('client', data))
116        f.write(data)
117
118    async def handler(client, addr):
119        results.append('handler start')
120        await client.send(b'OK')
121        s = client.makefile('rwb', buffering=0)
122        with s.blocking() as f:
123            await run_in_thread(sync_client, f)
124        results.append('handler done')
125        await done.set()
126
127    async def test_client(address, serv):
128        sock = socket(AF_INET, SOCK_STREAM)
129        await sock.connect(address)
130        await sock.recv(8)
131        await sock.send(b'Message')
132        data = await sock.recv(8192)
133        await sock.close()
134
135    async def main():
136        serv = await spawn(tcp_server, '', portno, handler)
137        c = await spawn(test_client, ('localhost', portno), serv)
138        await done.wait()
139        await serv.cancel()
140        await c.join()
141
142    kernel.run(main())
143
144    assert results == [
145        'handler start',
146        ('client', b'Message'),
147        'handler done'
148    ]
149
150@pytest.mark.skipif(sys.platform.startswith("win"),
151                    reason="not supported on Windows")
152def test_filestream_bad_blocking(kernel, portno):
153    '''
154    Test of exposing a socket in blocking mode with buffered data error
155    '''
156    done = Event()
157    results = []
158
159    async def handler(client, addr):
160        await client.send(b'OK')
161        s = client.makefile('rwb', buffering=0)
162        line = await s.readline()
163        assert line == b'hello\n'
164        try:
165            with s.blocking() as f:
166                pass
167        except IOError:
168            results.append(True)
169        else:
170            results.append(False)
171        await done.set()
172
173    async def test_client(address, serv):
174        sock = socket(AF_INET, SOCK_STREAM)
175        await sock.connect(address)
176        await sock.recv(8)
177        await sock.send(b'hello\nworld\n')
178        data = await sock.recv(8192)
179        await sock.close()
180
181    async def main():
182        serv = await spawn(tcp_server, '', portno, handler)
183        c = await spawn(test_client, ('localhost', portno), serv)
184        await done.wait()
185        await serv.cancel()
186        await c.join()
187
188    kernel.run(main())
189    assert results[0]
190
191@pytest.mark.skipif(sys.platform.startswith("win"),
192                    reason="not supported on Windows")
193def test_socketstream_bad_blocking(kernel, portno):
194    '''
195    Test of exposing a socket in blocking mode with buffered data error
196    '''
197    done = Event()
198    results = []
199
200    async def handler(client, addr):
201        await client.send(b'OK')
202        s = client.as_stream()
203        line = await s.readline()
204        assert line == b'hello\n'
205        try:
206            with s.blocking() as f:
207                pass
208        except IOError:
209            results.append(True)
210        else:
211            results.append(False)
212        await done.set()
213
214    async def test_client(address, serv):
215        sock = socket(AF_INET, SOCK_STREAM)
216        await sock.connect(address)
217        await sock.recv(8)
218        await sock.send(b'hello\nworld\n')
219        data = await sock.recv(8192)
220        await sock.close()
221
222    async def main():
223        serv = await spawn(tcp_server, '', portno, handler)
224        c = await spawn(test_client, ('localhost', portno), serv)
225        await done.wait()
226        await serv.cancel()
227        await c.join()
228
229    kernel.run(main())
230    assert results[0]
231
232def test_read_partial(kernel, portno):
233    done = Event()
234    results = []
235    async def handler(client, addr):
236        try:
237            await client.send(b'OK')
238            s = client.as_stream()
239            line = await s.readline()
240            results.append(line)
241            data = await s.read(2)
242            results.append(data)
243            data = await s.read(1000)
244            results.append(data)
245        finally:
246            await done.set()
247
248    async def test_client(address, serv):
249        sock = socket(AF_INET, SOCK_STREAM)
250        await sock.connect(address)
251        await sock.recv(8)
252        await sock.send(b'hello\nworld\n')
253        await sock.close()
254
255
256    async def main():
257        serv = await spawn(tcp_server, '', portno, handler)
258        c = await spawn(test_client, ('localhost', portno), serv)
259        await done.wait()
260        await serv.cancel()
261        await c.join()
262
263    kernel.run(main())
264    assert results == [ b'hello\n', b'wo', b'rld\n']
265
266def test_readall(kernel, portno):
267    done = Event()
268    results = []
269
270    async def handler(client, addr):
271        results.append('handler start')
272        await client.send(b'OK')
273        async with client.as_stream() as s:
274            data = await s.readall()
275            results.append(data)
276            results.append('handler done')
277            await done.set()
278
279    async def test_client(address, serv):
280        sock = socket(AF_INET, SOCK_STREAM)
281        await sock.connect(address)
282        await sock.recv(8)
283        await sock.send(b'Msg1\n')
284        await sleep(0.1)
285        await sock.send(b'Msg2\n')
286        await sleep(0.1)
287        await sock.send(b'Msg3\n')
288        await sleep(0.1)
289        await sock.close()
290
291
292    async def main():
293        serv = await spawn(tcp_server, '', portno, handler)
294        c = await spawn(test_client, ('localhost', portno), serv)
295        await done.wait()
296        await serv.cancel()
297        await c.join()
298
299    kernel.run(main())
300
301    assert results == [
302        'handler start',
303        b'Msg1\nMsg2\nMsg3\n',
304        'handler done'
305    ]
306
307
308def test_readall_partial(kernel, portno):
309    done = Event()
310
311    async def handler(client, addr):
312        await client.send(b'OK')
313        s = client.as_stream()
314        line = await s.readline()
315        assert line == b'hello\n'
316        data = await s.readall()
317        assert data == b'world\n'
318        await done.set()
319
320    async def test_client(address, serv):
321        sock = socket(AF_INET, SOCK_STREAM)
322        await sock.connect(address)
323        await sock.recv(8)
324        await sock.send(b'hello\nworld\n')
325        await sock.close()
326
327
328    async def main():
329        serv = await spawn(tcp_server, '', portno, handler)
330        c = await spawn(test_client, ('localhost', portno), serv)
331        await done.wait()
332        await serv.cancel()
333        await c.join()
334
335    kernel.run(main())
336
337
338def test_readall_timeout(kernel, portno):
339    done = Event()
340    results = []
341
342    async def handler(client, addr):
343        results.append('handler start')
344        await client.send(b'OK')
345        s = client.as_stream()
346        try:
347            data = await timeout_after(0.5, s.readall())
348        except TaskTimeout as e:
349            results.append(e.bytes_read)
350        results.append('handler done')
351        await done.set()
352
353    async def test_client(address, serv):
354        sock = socket(AF_INET, SOCK_STREAM)
355        await sock.connect(address)
356        await sock.recv(8)
357        await sock.send(b'Msg1\n')
358        await sleep(1)
359        await sock.send(b'Msg2\n')
360        await sock.close()
361
362    async def main():
363        serv = await spawn(tcp_server, '', portno, handler)
364        c = await spawn(test_client, ('localhost', portno), serv)
365        await done.wait()
366        await serv.cancel()
367        await c.join()
368
369    kernel.run(main())
370
371    assert results == [
372        'handler start',
373        b'Msg1\n',
374        'handler done'
375    ]
376
377
378def test_read_exactly(kernel, portno):
379    done = Event()
380    results = []
381
382    async def handler(client, addr):
383        results.append('handler start')
384        await client.send(b'OK')
385        s = client.as_stream()
386        for n in range(3):
387            results.append(await s.read_exactly(5))
388        results.append('handler done')
389        await done.set()
390
391    async def test_client(address, serv):
392        sock = socket(AF_INET, SOCK_STREAM)
393        await sock.connect(address)
394        await sock.recv(8)
395        await sock.send(b'Msg1\nMsg2\nMsg3\n')
396        await sock.close()
397
398    async def main():
399        serv = await spawn(tcp_server, '', portno, handler)
400        c = await spawn(test_client, ('localhost', portno), serv)
401        await done.wait()
402        await serv.cancel()
403        await c.join()
404
405    kernel.run(main())
406
407    assert results == [
408        'handler start',
409        b'Msg1\n',
410        b'Msg2\n',
411        b'Msg3\n',
412        'handler done'
413    ]
414
415
416def test_read_exactly_incomplete(kernel, portno):
417    done = Event()
418    results = []
419    async def handler(client, addr):
420        await client.send(b'OK')
421        s = client.as_stream()
422        try:
423            await s.read_exactly(100)
424        except EOFError as e:
425            results.append(e.bytes_read)
426        finally:
427            await done.set()
428
429    async def test_client(address, serv):
430        sock = socket(AF_INET, SOCK_STREAM)
431        await sock.connect(address)
432        await sock.recv(8)
433        await sock.send(b'Msg1\nMsg2\nMsg3\n')
434        await sock.close()
435
436    async def main():
437        serv = await spawn(tcp_server, '', portno, handler)
438        c = await spawn(test_client, ('localhost', portno), serv)
439        await done.wait()
440        await serv.cancel()
441        await c.join()
442
443    kernel.run(main())
444
445    assert results[0] ==  b'Msg1\nMsg2\nMsg3\n'
446
447def test_read_exactly_timeout(kernel, portno):
448    done = Event()
449    results = []
450
451    async def handler(client, addr):
452        results.append('handler start')
453        await client.send(b'OK')
454        s = client.as_stream()
455        try:
456            data = await timeout_after(0.5, s.read_exactly(10))
457            results.append(data)
458        except TaskTimeout as e:
459            results.append(e.bytes_read)
460        results.append('handler done')
461        await done.set()
462
463    async def test_client(address, serv):
464        sock = socket(AF_INET, SOCK_STREAM)
465        await sock.connect(address)
466        await sock.recv(8)
467        await sock.send(b'Msg1\n')
468        await sleep(1)
469        await sock.send(b'Msg2\n')
470        await sock.close()
471
472
473    async def main():
474        serv = await spawn(tcp_server, '', portno, handler)
475        c = await spawn(test_client, ('localhost', portno), serv)
476        await done.wait()
477        await serv.cancel()
478        await c.join()
479
480    kernel.run(main())
481
482    assert results == [
483        'handler start',
484        b'Msg1\n',
485        'handler done'
486    ]
487
488
489
490def test_readline(kernel, portno):
491    done = Event()
492    results = []
493
494    async def handler(client, addr):
495        results.append('handler start')
496        await client.send(b'OK')
497        s = client.as_stream()
498        for n in range(3):
499            results.append(await s.readline())
500        results.append('handler done')
501        await done.set()
502
503    async def test_client(address, serv):
504        sock = socket(AF_INET, SOCK_STREAM)
505        await sock.connect(address)
506        await sock.recv(8)
507        await sock.send(b'Msg1\nMsg2\nMsg3\n')
508        await sock.close()
509
510    async def main():
511        serv = await spawn(tcp_server, '', portno, handler)
512        c = await spawn(test_client, ('localhost', portno), serv)
513        await done.wait()
514        await serv.cancel()
515        await c.join()
516
517    kernel.run(main())
518
519    assert results == [
520        'handler start',
521        b'Msg1\n',
522        b'Msg2\n',
523        b'Msg3\n',
524        'handler done'
525    ]
526
527
528def test_readlines(kernel, portno):
529    done = Event()
530    results = []
531
532    async def handler(client, addr):
533        await client.send(b'OK')
534        results.append('handler start')
535        s = client.as_stream()
536        results.extend(await s.readlines())
537        results.append('handler done')
538        await done.set()
539
540    async def test_client(address, serv):
541        sock = socket(AF_INET, SOCK_STREAM)
542        await sock.connect(address)
543        await sock.recv(8)
544        await sock.send(b'Msg1\nMsg2\nMsg3\n')
545        await sock.close()
546
547    async def main():
548        serv = await spawn(tcp_server, '', portno, handler)
549        c = await spawn(test_client, ('localhost', portno), serv)
550        await done.wait()
551        await serv.cancel()
552        await c.join()
553
554    kernel.run(main())
555
556    assert results == [
557        'handler start',
558        b'Msg1\n',
559        b'Msg2\n',
560        b'Msg3\n',
561        'handler done'
562    ]
563
564
565def test_readlines_timeout(kernel, portno):
566    done = Event()
567    results = []
568
569    async def handler(client, addr):
570        await client.send(b'OK')
571        results.append('handler start')
572        s = client.as_stream()
573        try:
574            await timeout_after(0.5, s.readlines())
575        except TaskTimeout as e:
576            results.extend(e.lines_read)
577        results.append('handler done')
578        await done.set()
579
580    async def test_client(address, serv):
581        sock = socket(AF_INET, SOCK_STREAM)
582        await sock.connect(address)
583        await sock.recv(8)
584        await sock.send(b'Msg1\nMsg2\n')
585        await sleep(1)
586        await sock.send(b'Msg3\n')
587        await sock.close()
588
589
590    async def main():
591        serv = await spawn(tcp_server, '', portno, handler)
592        c = await spawn(test_client, ('localhost', portno), serv)
593        await done.wait()
594        await serv.cancel()
595        await c.join()
596
597    kernel.run(main())
598
599    assert results == [
600        'handler start',
601        b'Msg1\n',
602        b'Msg2\n',
603        'handler done'
604    ]
605
606
607def test_writelines(kernel, portno):
608    done = Event()
609    results = []
610
611    async def handler(client, addr):
612        results.append('handler start')
613        await client.send(b'OK')
614        s = client.as_stream()
615        results.append(await s.readall())
616        results.append('handler done')
617        await done.set()
618
619    async def test_client(address, serv):
620        sock = socket(AF_INET, SOCK_STREAM)
621        await sock.connect(address)
622        await sock.recv(8)
623        s = sock.as_stream()
624        await s.writelines([b'Msg1\n', b'Msg2\n', b'Msg3\n'])
625        await sock.close()
626
627    async def main():
628        serv = await spawn(tcp_server, '', portno, handler)
629        c = await spawn(test_client, ('localhost', portno), serv)
630        await done.wait()
631        await serv.cancel()
632        await c.join()
633
634    kernel.run(main())
635
636    assert results == [
637        'handler start',
638        b'Msg1\nMsg2\nMsg3\n',
639        'handler done'
640    ]
641
642
643def test_writelines_timeout(kernel, portno):
644    done = Event()
645    results = []
646    async def handler(client, addr):
647        await client.send(b'OK')
648        s = client.as_stream()
649        await sleep(1)
650        results.append(await s.readall())
651        await done.set()
652
653    def line_generator():
654        n = 0
655        while True:
656            yield b'Msg%d\n' % n
657            n += 1
658
659    async def test_client(address, serv):
660        sock = socket(AF_INET, SOCK_STREAM)
661        await sock.connect(address)
662        await sock.recv(8)
663        s = sock.as_stream()
664        try:
665            await timeout_after(0.5, s.writelines(line_generator()))
666        except TaskTimeout as e:
667            results.append(e.bytes_written)
668        await sock.close()
669
670    async def main():
671        serv = await spawn(tcp_server, '', portno, handler)
672        c = await spawn(test_client, ('localhost', portno), serv)
673        await done.wait()
674        await serv.cancel()
675        await c.join()
676
677    kernel.run(main())
678
679    assert results[0] == len(results[1])
680
681@pytest.mark.skipif(sys.platform.startswith("win"),
682                    reason="fails on windows")
683def test_write_timeout(kernel, portno):
684    done = Event()
685    results = []
686    async def handler(client, addr):
687        await client.send(b'OK')
688        s = client.as_stream()
689        await sleep(1)
690        results.append(await s.readall())
691        await done.set()
692
693    async def test_client(address, serv):
694        sock = socket(AF_INET, SOCK_STREAM)
695        await sock.connect(address)
696        await sock.recv(8)
697        s = sock.as_stream()
698        try:
699            msg = b'x'*10000000  # Must be big enough to fill buffers
700            await timeout_after(0.5, s.write(msg))
701        except TaskTimeout as e:
702            results.append(e.bytes_written)
703        await sock.close()
704
705    async def main():
706        serv = await spawn(tcp_server, '', portno, handler)
707        c = await spawn(test_client, ('localhost', portno), serv)
708        await done.wait()
709        await serv.cancel()
710        await c.join()
711
712    kernel.run(main())
713
714    assert results[0] == len(results[1])
715
716def test_iterline(kernel, portno):
717    results = []
718
719    async def handler(client, addr):
720        results.append('handler start')
721        await client.send(b'OK')
722        s = client.as_stream()
723        async for line in s:
724            results.append(line)
725        results.append('handler done')
726
727    async def test_client(address, serv):
728        sock = socket(AF_INET, SOCK_STREAM)
729        await sock.connect(address)
730        await sock.recv(8)
731        await sock.send(b'Msg1\nMsg2\nMsg3\n')
732        await sock.close()
733        await sleep(0.1)
734        await serv.cancel()
735
736    async def main():
737        async with TaskGroup() as g:
738            serv = await g.spawn(tcp_server, '', portno, handler)
739            await g.spawn(test_client, ('localhost', portno), serv)
740
741    kernel.run(main())
742
743    assert results == [
744        'handler start',
745        b'Msg1\n',
746        b'Msg2\n',
747        b'Msg3\n',
748        'handler done'
749    ]
750
751
752def test_double_recv(kernel, portno):
753    done = Event()
754    results = []
755
756    async def bad_handler(client):
757        results.append('bad handler')
758        await sleep(0.1)
759        try:
760            await client.recv(1000)   # <- This needs to fail. Task already reading on the socket
761            results.append('why am I here?')
762        except ReadResourceBusy as e:
763            results.append('good handler')
764
765    async def handler(client, addr):
766        results.append('handler start')
767        await spawn(bad_handler, client, daemon=True)
768        await client.send(b'OK')
769        data = await client.recv(1000)
770        results.append(data)
771        results.append('handler done')
772        await done.set()
773
774    async def test_client(address, serv):
775        sock = socket(AF_INET, SOCK_STREAM)
776        await sock.connect(address)
777        await sock.recv(8)
778        await sleep(1)
779        await sock.send(b'Msg')
780        await sock.close()
781
782    async def main():
783        serv = await spawn(tcp_server, '', portno, handler)
784        client = await spawn(test_client, ('localhost', portno), serv)
785        await done.wait()
786        await serv.cancel()
787        await client.join()
788
789    kernel.run(main())
790
791    assert results == [
792        'handler start',
793        'bad handler',
794        'good handler',
795        b'Msg',
796        'handler done'
797        ]
798
799@pytest.mark.skipif(True,
800                    reason="flaky")
801def test_sendall_cancel(kernel, portno):
802    done = Event()
803    start = Event()
804    results = {}
805
806    async def handler(client, addr):
807        await start.wait()
808        nrecv = 0
809        while True:
810            data = await client.recv(1000000)
811            if not data:
812                break
813            nrecv += len(data)
814        results['handler'] = nrecv
815        await client.close()
816        await done.set()
817
818    async def test_client(address, serv):
819        sock = socket(AF_INET, SOCK_STREAM)
820        await sock.connect(address)
821        try:
822            await sock.sendall(b'x'*10000000)
823        except CancelledError as e:
824            results['sender'] = e.bytes_sent
825        await sock.close()
826
827    async def main():
828        serv = await spawn(tcp_server, '', portno, handler)
829        t = await spawn(test_client, ('localhost', portno), serv)
830        await sleep(0.1)
831        await t.cancel()
832        await start.set()
833        await serv.cancel()
834        await done.wait()
835
836    kernel.run(main())
837
838    assert results['handler'] == results['sender']
839
840
841def test_stream_bad_context(kernel, portno):
842    done = Event()
843    results = []
844    async def handler(client, addr):
845        await client.send(b'OK')
846        try:
847            with client.as_stream() as s:
848                data = await s.readall()
849        except AsyncOnlyError:
850            results.append(True)
851        finally:
852            await done.set()
853
854    async def test_client(address, serv):
855        sock = socket(AF_INET, SOCK_STREAM)
856        await sock.connect(address)
857        await sock.recv(8)
858        await sock.send(b'Hello\n')
859        await sock.close()
860
861
862    async def main():
863        serv = await spawn(tcp_server, '', portno, handler)
864        c = await spawn(test_client, ('localhost', portno), serv)
865        await done.wait()
866        await serv.cancel()
867        await c.join()
868
869    kernel.run(main())
870
871    assert results == [ True ]
872
873def test_stream_bad_iter(kernel, portno):
874    done = Event()
875    results = []
876    async def handler(client, addr):
877        await client.send(b'OK')
878        try:
879            async with client.as_stream() as s:
880                for line in s:
881                    pass
882        except AsyncOnlyError:
883            results.append(True)
884        finally:
885            await done.set()
886
887    async def test_client(address, serv):
888        sock = socket(AF_INET, SOCK_STREAM)
889        await sock.connect(address)
890        await sock.recv(8)
891        await sock.send(b'Hello\n')
892        await sock.close()
893
894
895    async def main():
896        serv = await spawn(tcp_server, '', portno, handler)
897        c = await spawn(test_client, ('localhost', portno), serv)
898        await done.wait()
899        await serv.cancel()
900        await c.join()
901
902    kernel.run(main())
903
904    assert results == [ True ]
905
906def test_io_waiting(kernel):
907    async def handler(sock):
908        result = await sock.accept()
909
910    async def main():
911        from curio.traps import _io_waiting
912
913        sock = socket(AF_INET, SOCK_STREAM)
914        sock.setsockopt(SOL_SOCKET, SO_REUSEADDR, True)
915        sock.bind(('',25000))
916        sock.listen(1)
917        async with sock:
918            t1 = await spawn(handler, sock)
919            await sleep(0.1)
920            r, w = await _io_waiting(sock)
921            assert t1 == r
922            assert w == None
923            await t1.cancel()
924
925        r,w = await _io_waiting(0)
926        assert (r, w) == (None, None)
927
928    kernel.run(main())
929
930def test_io_unregister(kernel):
931    # This is purely a code coverage test
932    async def reader(sock):
933        from curio.traps import _read_wait
934        await _read_wait(sock.fileno())
935
936    async def main():
937        from curio.traps import _write_wait
938        sock = socket(AF_INET, SOCK_DGRAM)
939        sock.bind(('', 26000))
940        t = await spawn(reader(sock))
941        await sleep(0.1)
942        await _write_wait(sock.fileno())
943        await t.cancel()
944        await sock.close()
945    kernel.run(main())
946    assert True
947