1# test_kernel.py
2
3import time
4import pytest
5from curio import *
6kernel_clock = clock
7from curio import traps
8
9def test_hello(kernel):
10
11    async def hello():
12        return 'hello'
13
14    result = kernel.run(hello)
15    assert result == 'hello'
16
17def test_raise(kernel):
18    class Error(Exception):
19        pass
20
21    async def boom():
22        raise Error()
23
24    try:
25        kernel.run(boom)
26        assert False, 'boom() did not raise'
27    except Error as e:
28        pass
29    except:
30        assert False, 'boom() raised wrong error'
31
32def test_sleep(kernel):
33    start = end = 0
34    async def main():
35        nonlocal start, end
36        start = time.time()
37        await sleep(0.5)
38        end = time.time()
39
40    kernel.run(main)
41    elapsed = end - start
42    assert elapsed > 0.5
43
44def test_clock(kernel):
45    async def main():
46        start = await clock()
47        await sleep(0.1)
48        end = await clock()
49        assert (end - start) >= 0.1
50    kernel.run(main)
51
52def test_sleep_cancel(kernel):
53    cancelled = False
54
55    async def sleeper():
56        nonlocal cancelled
57        try:
58            await sleep(1)
59            assert False
60        except CancelledError:
61            cancelled = True
62
63    async def main():
64        task = await spawn(sleeper)
65        await sleep(0.1)
66        await task.cancel()
67
68    kernel.run(main)
69    assert cancelled
70
71def test_sleep_timeout(kernel):
72    cancelled = True
73
74    async def sleeper():
75        nonlocal cancelled
76        try:
77            await timeout_after(0.1, sleep, 1)
78            assert False
79        except TaskTimeout:
80            cancelled = True
81
82    async def main():
83        task = await spawn(sleeper)
84        await task.join()
85
86    kernel.run(main)
87    assert cancelled
88
89def test_sleep_ignore_timeout(kernel):
90    async def sleeper():
91        cancelled = False
92
93        if await ignore_after(0.1, sleep(1)) is None:
94            cancelled = True
95        assert cancelled
96
97        cancelled = False
98        async with ignore_after(0.1) as s:
99            await sleep(1)
100        if s.result is None:
101            cancelled = True
102
103        assert cancelled
104
105    async def main():
106        task = await spawn(sleeper)
107        await task.join()
108
109    kernel.run(main)
110
111def test_sleep_notimeout(kernel):
112    async def sleeper():
113        try:
114            await timeout_after(0.5, sleep(0.1))
115            assert True
116        except TaskTimeout:
117            assert False
118        await sleep(0.5)
119        assert True
120
121    async def main():
122        task = await spawn(sleeper)
123        await task.join()
124
125    kernel.run(main)
126
127def test_task_join(kernel):
128    async def child():
129        return 37
130
131    async def main():
132        task = await spawn(child)
133        r = await task.join()
134        assert r == 37
135
136    kernel.run(main)
137
138def test_task_join_error(kernel):
139    async def child():
140        int('bad')
141
142    async def main():
143        task = await spawn(child)
144        try:
145            r = await task.join()
146            assert False
147        except TaskError as e:
148            assert isinstance(e.__cause__, ValueError)
149
150    kernel.run(main)
151
152def test_task_cancel(kernel):
153    cancelled = False
154    async def child():
155        nonlocal cancelled
156        try:
157            await sleep(0.5)
158            assert False
159        except CancelledError:
160            cancelled = True
161
162    async def main():
163        task = await spawn(child)
164        await task.cancel()
165        assert cancelled
166
167    kernel.run(main)
168
169def test_task_cancel_poll(kernel):
170    results = []
171
172    async def child():
173        async with disable_cancellation():
174            await sleep(0.1)
175            results.append('success')
176            if await check_cancellation():
177                results.append('cancelled')
178            else:
179                assert False
180
181    async def main():
182        task = await spawn(child)
183        await task.cancel()
184        results.append('done')
185
186    kernel.run(main)
187    assert results == ['success', 'cancelled', 'done']
188
189def test_task_cancel_not_blocking(kernel):
190    async def child(e1, e2):
191        await e1.set()
192        try:
193            await sleep(1000)
194        except CancelledError:
195            await e2.wait()
196            raise
197
198    async def main():
199        e1 = Event()
200        e2 = Event()
201        task = await spawn(child, e1, e2)
202        await e1.wait()
203        await task.cancel(blocking=False)
204        await e2.set()
205        try:
206            await task.join()
207        except TaskError as e:
208            assert isinstance(e.__cause__, CancelledError)
209
210    kernel.run(main)
211
212
213def test_task_cancel_join(kernel):
214    child_evt = Event()
215    async def child():
216        await child_evt.wait()
217        assert False
218
219    async def main():
220        task = await spawn(child)
221        await sleep(0)
222        await task.cancel()
223
224        # Try joining with a cancelled task. Should raise a TaskError
225        try:
226            await task.join()
227        except TaskError as e:
228            assert isinstance(e.__cause__, CancelledError)
229        else:
230            assert False
231        assert True
232
233    kernel.run(main)
234
235def test_task_cancel_join_wait(kernel):
236    evt = Event()
237
238    async def child():
239        await evt.wait()
240
241    async def canceller(task):
242        await task.cancel()
243
244    async def main():
245        task1 = await spawn(child)
246        task2 = await spawn(canceller, task1)
247        await task2.join()
248        assert not evt.is_set()
249        try:
250            await task1.join()     # Should raise TaskError... with CancelledError as cause
251            assert False
252        except TaskError as e:
253            assert isinstance(e.__cause__, CancelledError)
254        else:
255            assert False
256
257    kernel.run(main)
258
259def test_task_child_cancel(kernel):
260    results = []
261
262    async def child():
263        results.append('start')
264        try:
265            await sleep(0.5)
266            results.append('end')
267        except CancelledError:
268            results.append('child cancelled')
269
270    async def parent():
271        try:
272            child_task = await spawn(child)
273            await sleep(0.5)
274            results.append('end parent')
275        except CancelledError:
276            await child_task.cancel()
277            results.append('parent cancelled')
278
279    async def grandparent():
280        try:
281            parent_task = await spawn(parent)
282            await sleep(0.5)
283            results.append('end grandparent')
284        except CancelledError:
285            await parent_task.cancel()
286            results.append('grandparent cancelled')
287
288    async def main():
289        task = await spawn(grandparent)
290        await sleep(0.1)
291        results.append('cancel start')
292        await sleep(0.1)
293        results.append('cancelling')
294        await task.cancel()
295        results.append('done')
296
297    kernel.run(main)
298
299    assert results == [
300        'start',
301        'cancel start',
302        'cancelling',
303        'child cancelled',
304        'parent cancelled',
305        'grandparent cancelled',
306        'done',
307    ]
308
309
310def test_task_ready_cancel(kernel):
311    # This tests a tricky corner case of a task cancelling another task that's also
312    # on the ready queue.
313    results = []
314
315    async def child():
316        try:
317            results.append('child sleep')
318            await sleep(1.0)
319            results.append('child slept')
320            await sleep(1.0)
321            results.append('should not see this')
322        except CancelledError:
323            results.append('child cancelled')
324
325    async def parent():
326        task = await spawn(child)
327        results.append('parent sleep')
328        await sleep(0.5)
329        results.append('cancel start')
330        await task.cancel()
331        results.append('cancel done')
332
333    async def main():
334        task = await spawn(parent)
335        await sleep(0.1)
336        time.sleep(1)      # Forced block of the event loop. Both tasks should awake when we come back
337        await sleep(0.1)
338
339    kernel.run(main)
340
341    assert results == [
342        'parent sleep',
343        'child sleep',
344        'cancel start',
345        'child slept',
346        'child cancelled',
347        'cancel done'
348    ]
349
350
351def test_double_cancel(kernel):
352    results = []
353
354    async def sleeper():
355        results.append('start')
356        try:
357            await sleep(1)
358            results.append('not here')
359        except CancelledError:
360            results.append('cancel request')
361            await sleep(1)
362            results.append('cancelled')
363
364    async def main():
365        task = await spawn(sleeper)
366        await sleep(0.5)
367        try:
368            await timeout_after(1, task.cancel())
369        except TaskTimeout:
370            results.append('retry')
371            await task.cancel()    # This second cancel should not abort any operation in sleeper
372            results.append('done cancel')
373
374    kernel.run(main)
375    assert results == [
376        'start',
377        'cancel request',
378        'retry',
379        'cancelled',
380        'done cancel'
381    ]
382
383
384def test_nested_timeout(kernel):
385    results = []
386
387    async def coro1():
388        results.append('coro1 start')
389        await sleep(1)
390        results.append('coro1 done')
391
392    async def coro2():
393        results.append('coro2 start')
394        await sleep(1)
395        results.append('coro2 done')
396
397    # Parent should cause a timeout before the child.
398    # Results in a TimeoutCancellationError instead of a normal TaskTimeout
399    async def child():
400        try:
401            await timeout_after(5, coro1())
402            results.append('coro1 success')
403        except TaskTimeout:
404            results.append('coro1 timeout')
405        except TimeoutCancellationError:
406            results.append('coro1 timeout cancel')
407
408        await coro2()
409        results.append('coro2 success')
410
411    async def parent():
412        try:
413            await timeout_after(0.75, child())
414        except TaskTimeout:
415            results.append('parent timeout')
416
417    kernel.run(parent)
418    assert results == [
419        'coro1 start',
420        'coro1 timeout cancel',
421        'coro2 start',
422        'parent timeout'
423    ]
424
425
426def test_nested_context_timeout(kernel):
427    results = []
428
429    async def coro1():
430        results.append('coro1 start')
431        await sleep(1)
432        results.append('coro1 done')
433
434    async def coro2():
435        results.append('coro2 start')
436        await sleep(1)
437        results.append('coro2 done')
438
439    # Parent should cause a timeout before the child.
440    # Results in a TimeoutCancellationError instead of a normal TaskTimeout
441    async def child():
442        try:
443            async with timeout_after(5):
444                await coro1()
445            results.append('coro1 success')
446        except TaskTimeout:
447            results.append('coro1 timeout')
448        except TimeoutCancellationError:
449            results.append('coro1 timeout cancel')
450
451        await coro2()
452        results.append('coro2 success')
453
454    async def parent():
455        try:
456            async with timeout_after(0.75):
457                await child()
458        except TaskTimeout:
459            results.append('parent timeout')
460
461    kernel.run(parent)
462    assert results == [
463        'coro1 start',
464        'coro1 timeout cancel',
465        'coro2 start',
466        'parent timeout'
467    ]
468
469def test_nested_context_timeout2(kernel):
470    async def coro1():
471        try:
472            async with timeout_after(10):
473                await sleep(5)
474        except CancelledError as e:
475            assert isinstance(e, TimeoutCancellationError)
476            raise
477        else:
478            assert False
479
480    async def coro2():
481        try:
482            async with timeout_after(15):
483                await coro1()
484        except CancelledError as e:
485            assert isinstance(e, TimeoutCancellationError)
486            raise
487        else:
488            assert False
489
490    async def parent():
491        try:
492            async with timeout_after(0.1):
493                await coro2()
494        except BaseException as e:
495            assert isinstance(e, TaskTimeout)
496        else:
497            assert False
498
499    kernel.run(parent)
500
501def test_nested_context_timeout3(kernel):
502    async def coro1():
503        try:
504            await timeout_after(10, sleep, 5)
505        except CancelledError as e:
506            assert isinstance(e, TimeoutCancellationError)
507            raise
508        else:
509            assert False
510
511    async def coro2():
512        try:
513            await timeout_after(15, coro1)
514        except CancelledError as e:
515            assert isinstance(e, TimeoutCancellationError)
516            raise
517        else:
518            assert False
519
520    async def parent():
521        try:
522            await timeout_after(0.1, coro2)
523        except BaseException as e:
524            assert isinstance(e, TaskTimeout)
525        else:
526            assert False
527
528    kernel.run(parent)
529
530def test_nested_timeout_uncaught(kernel):
531    results = []
532
533    async def coro1():
534        results.append('coro1 start')
535        await sleep(5)
536        results.append('coro1 done')
537
538    async def child():
539        # This will cause a TaskTimeout, but it's uncaught
540        await timeout_after(1, coro1())
541
542    async def parent():
543        try:
544            await timeout_after(10, child())
545        except TaskTimeout:
546            results.append('parent timeout')
547        except UncaughtTimeoutError:
548            results.append('uncaught timeout')
549
550    kernel.run(parent)
551    assert results == [
552        'coro1 start',
553        'uncaught timeout'
554    ]
555
556
557def test_nested_context_timeout_uncaught(kernel):
558    results = []
559
560    async def coro1():
561        results.append('coro1 start')
562        await sleep(5)
563        results.append('coro1 done')
564
565    async def child():
566        # This will cause a TaskTimeout, but it's uncaught
567        async with timeout_after(1):
568            await coro1()
569
570    async def parent():
571        try:
572            async with timeout_after(10):
573                await child()
574        except TaskTimeout:
575            results.append('parent timeout')
576        except UncaughtTimeoutError:
577            results.append('uncaught timeout')
578
579    kernel.run(parent)
580    assert results == [
581        'coro1 start',
582        'uncaught timeout'
583    ]
584
585
586def test_nested_timeout_none(kernel):
587    results = []
588
589    async def coro1():
590        results.append('coro1 start')
591        await sleep(2)
592        results.append('coro1 done')
593
594    async def coro2():
595        results.append('coro2 start')
596        await sleep(1)
597        results.append('coro2 done')
598
599    async def child():
600        await timeout_after(None, coro1())
601        results.append('coro1 success')
602        await coro2()
603        results.append('coro2 success')
604
605    async def parent():
606        try:
607            await timeout_after(1, child())
608        except TaskTimeout:
609            results.append('parent timeout')
610
611    kernel.run(parent)
612    assert results == [
613        'coro1 start',
614#        'coro1 done',
615#        'coro1 success',
616#        'coro2 start',
617        'parent timeout'
618    ]
619
620
621def test_task_run_error(kernel):
622    results = []
623
624    async def main():
625        int('bad')
626
627    try:
628        kernel.run(main)
629        assert False, "Exception not raised"
630    except ValueError as e:
631        pass
632    except:
633        assert False, "Wrong exception raised"
634
635def test_sleep_0_starvation(kernel):
636    # This task should not block other tasks from running, and should be
637    # cancellable. We used to have a bug where neither were true...
638    async def loop_forever():
639        while True:
640            print("Sleeping 0")
641            await sleep(0)
642
643    async def io1(sock):
644        await sock.recv(1)
645        await sock.send(b"x")
646        await sock.recv(1)
647
648    async def io2(sock):
649        await sock.send(b"x")
650        await sock.recv(1)
651        await sock.send(b"x")
652
653    async def main():
654        loop_task = await spawn(loop_forever)
655        await sleep(0)
656        import curio.socket
657        sock1, sock2 = curio.socket.socketpair()
658        io1_task = await spawn(io1, sock1)
659        io2_task = await spawn(io2, sock2)
660        await io1_task.join()
661        await io2_task.join()
662        await loop_task.cancel()
663
664    kernel.run(main)
665
666
667def test_ping_pong_starvation(kernel):
668    # It used to be that two of these tasks could starve out other tasks
669    async def pingpong(inq, outq):
670        while True:
671            await outq.put(await inq.get())
672
673    async def i_will_survive():
674        for _ in range(10):
675            await sleep(0)
676        return "i survived!"
677
678    async def main():
679        q1 = Queue()
680        q2 = Queue()
681        await q1.put("something")
682        pp1 = await spawn(pingpong, q1, q2)
683        pp2 = await spawn(pingpong, q2, q1)
684        iws = await spawn(i_will_survive)
685
686        assert (await iws.join()) == "i survived!"
687        await pp1.cancel()
688        await pp2.cancel()
689
690    kernel.run(main)
691
692def test_task_cancel_timeout(kernel):
693    # Test that cancellation also cancels timeouts
694    results = []
695
696    async def coro():
697        try:
698            await sleep(5)
699        except CancelledError:
700            results.append('cancelled')
701            await sleep(1)
702            results.append('done cancel')
703            raise
704
705    async def child():
706        results.append('child')
707        try:
708            async with timeout_after(1):
709                 await coro()
710        except TaskTimeout:
711            results.append('timeout')
712
713    async def main():
714        task = await spawn(child)
715        await sleep(0.5)
716        await task.cancel()
717
718    kernel.run(main)
719    assert results == [ 'child', 'cancelled', 'done cancel' ]
720
721def test_reentrant_kernel(kernel):
722    async def child():
723        pass
724
725    async def main():
726        with pytest.raises(RuntimeError):
727            kernel.run(child)
728
729    kernel.run(main)
730
731from curio.traps import *
732
733def test_pending_cancellation(kernel):
734    async def main():
735        self = await _get_current()
736        self.cancel_pending = CancelledError()
737
738        with pytest.raises(CancelledError):
739            await _read_wait(None)
740
741        self.cancel_pending = CancelledError()
742        with pytest.raises(CancelledError):
743            await _future_wait(None)
744
745        self.cancel_pending = CancelledError()
746        with pytest.raises(CancelledError):
747            await _scheduler_wait(None, None)
748
749        self.cancel_pending = TaskTimeout()
750        try:
751            await _unset_timeout(None)
752            assert True
753        except TaskTimeout:
754            assert False
755
756    kernel.run(main)
757
758from functools import partial
759
760def test_single_stepping(kernel):
761    value = 0
762    async def child():
763        nonlocal value
764        await sleep(0)
765        value = 1
766        await sleep(0.1)
767        value = 2
768
769    task = kernel.run(partial(spawn, child, daemon=True))
770    while value < 1:
771        kernel.run()
772    assert True
773    time.sleep(0.2)
774    kernel.run()
775    assert value == 2
776
777def test_io_registration(kernel):
778    # Tests some tricky corner cases of the kernel that are difficult
779    # to get to under normal socket usage
780    import socket
781    s1, s2 = socket.socketpair()
782    s1.setblocking(False)
783    s2.setblocking(False)
784
785    # Fill the send buffer
786    while True:
787        try:
788            s1.send(b'x'*100000)
789        except BlockingIOError:
790            break
791
792    async def reader1():
793        await traps._read_wait(s1.fileno())
794        data = s1.recv(100)
795        assert data == b'hello'
796
797    async def writer1():
798        await traps._write_wait(s1.fileno())
799        assert False
800
801    async def writer2():
802        with pytest.raises(WriteResourceBusy):
803            await traps._write_wait(s1.fileno())
804
805    async def main():
806        t0 = await spawn(reader1)
807        t1 = await spawn(writer1)
808        t2 = await spawn(writer2)
809        await t2.join()
810        await t1.cancel()
811        s2.send(b'hello')
812        await t0.join()
813        s1.close()
814        s2.close()
815
816    kernel.run(main)
817
818from functools import partial
819
820def test_coro_partial(kernel):
821    async def func(x, y, z):
822        assert x == 1
823        assert y == 2
824        assert z == 3
825        return True
826
827    async def main():
828        assert await func(1, 2, 3)
829        assert await ignore_after(1, func(1,2,3))
830        assert await ignore_after(1, func, 1, 2, 3)
831        assert await ignore_after(1, partial(func, 1, 2), 3)
832        assert await ignore_after(1, partial(func, z=3), 1, 2)
833
834        # Try spawns
835        t = await spawn(func(1,2,3))
836        assert await t.join()
837
838        t = await spawn(func, 1, 2, 3)
839        assert await t.join()
840
841        t = await spawn(partial(func, 1, 2), 3)
842        assert await t.join()
843
844        t = await spawn(partial(func, z=3), 1, 2)
845        assert await t.join()
846
847    kernel.run(main)
848
849def test_custom_cancel(kernel):
850    class CustomCancelled(CancelledError):
851        pass
852
853    evt = Event()
854    async def child():
855        try:
856            await evt.wait()
857        except CustomCancelled:
858            assert True
859        except:
860            assert False
861        else:
862            assert False
863
864    async def main():
865        t = await spawn(child)
866        await t.cancel(exc=CustomCancelled)
867
868    kernel.run(main)
869
870def test_timeout_badness(kernel):
871    import time
872    async def main():
873        async with timeout_after(0.1):
874            time.sleep(0.2)   # Timeout will take too long. Should issue a warning.
875
876        # Execution should make it here.  There were no blocking operations to cancel.
877        # It makes no sense to issue a cancellation on the next operation because we're
878        # Likely out of the timeout block
879        assert True
880
881    kernel.run(main)
882
883def test_kernel_no_shutdown():
884    # Code coverage test
885    k = Kernel()
886    del k
887
888    with Kernel() as k:
889        pass
890
891    with pytest.raises(RuntimeError):
892        k.run()
893
894
895def test_kernel_exit():
896    # Code coverage test
897    async def main():
898        raise SystemExit()
899
900    with pytest.raises(SystemExit):
901         with Kernel() as k:
902             k.run(main)
903
904
905def test_kernel_badtrap():
906    # Code coverage test
907    async def main():
908        from curio.traps import _kernel_trap
909        await _kernel_trap('bogus', 1)
910
911    with pytest.raises(KeyError):
912         with Kernel() as k:
913             k.run(main)
914
915def test_kernel_multischedule(kernel):
916    async def sleeper():
917        try:
918            async with timeout_after(0.5):
919                await sleep(0.25)
920        except TaskTimeout:
921            assert False
922
923        await sleep(0.1)    # Should not crash!
924        return True
925
926    async def main():
927        import time
928        t = await spawn(sleeper)
929        await sleep(0.1)
930        time.sleep(1)     # Force time clock to elapse past both the sleep and outer timeout
931        r = await t.join()
932        assert r
933
934    kernel.run(main)
935
936def test_kernel_debug():
937    from curio.debug import schedtrace, traptrace
938    async def hello():
939        await sleep(0)
940
941    with Kernel(debug=[schedtrace,traptrace]) as k:
942         k.run(hello)
943
944    with Kernel(debug=True) as k:
945         k.run(hello)
946
947    with Kernel(debug=schedtrace) as k:
948         k.run(hello)
949
950    with Kernel(debug=schedtrace(filter='none')) as k:
951        k.run(hello)
952
953
954
955
956
957