1import outcome
2import pytest
3import sys
4import os
5import signal
6import threading
7import contextlib
8import time
9
10from async_generator import (
11    async_generator,
12    yield_,
13    isasyncgenfunction,
14    asynccontextmanager,
15)
16
17from ... import _core
18from ...testing import wait_all_tasks_blocked
19from ..._util import signal_raise, is_main_thread
20from ..._timeouts import sleep
21from .tutil import slow
22
23
24def ki_self():
25    signal_raise(signal.SIGINT)
26
27
28def test_ki_self():
29    with pytest.raises(KeyboardInterrupt):
30        ki_self()
31
32
33async def test_ki_enabled():
34    # Regular tasks aren't KI-protected
35    assert not _core.currently_ki_protected()
36
37    # Low-level call-soon callbacks are KI-protected
38    token = _core.current_trio_token()
39    record = []
40
41    def check():
42        record.append(_core.currently_ki_protected())
43
44    token.run_sync_soon(check)
45    await wait_all_tasks_blocked()
46    assert record == [True]
47
48    @_core.enable_ki_protection
49    def protected():
50        assert _core.currently_ki_protected()
51        unprotected()
52
53    @_core.disable_ki_protection
54    def unprotected():
55        assert not _core.currently_ki_protected()
56
57    protected()
58
59    @_core.enable_ki_protection
60    async def aprotected():
61        assert _core.currently_ki_protected()
62        await aunprotected()
63
64    @_core.disable_ki_protection
65    async def aunprotected():
66        assert not _core.currently_ki_protected()
67
68    await aprotected()
69
70    # make sure that the decorator here overrides the automatic manipulation
71    # that start_soon() does:
72    async with _core.open_nursery() as nursery:
73        nursery.start_soon(aprotected)
74        nursery.start_soon(aunprotected)
75
76    @_core.enable_ki_protection
77    def gen_protected():
78        assert _core.currently_ki_protected()
79        yield
80
81    for _ in gen_protected():
82        pass
83
84    @_core.disable_ki_protection
85    def gen_unprotected():
86        assert not _core.currently_ki_protected()
87        yield
88
89    for _ in gen_unprotected():
90        pass
91
92
93# This used to be broken due to
94#
95#   https://bugs.python.org/issue29590
96#
97# Specifically, after a coroutine is resumed with .throw(), then the stack
98# makes it look like the immediate caller is the function that called
99# .throw(), not the actual caller. So child() here would have a caller deep in
100# the guts of the run loop, and always be protected, even when it shouldn't
101# have been. (Solution: we don't use .throw() anymore.)
102async def test_ki_enabled_after_yield_briefly():
103    @_core.enable_ki_protection
104    async def protected():
105        await child(True)
106
107    @_core.disable_ki_protection
108    async def unprotected():
109        await child(False)
110
111    async def child(expected):
112        import traceback
113
114        traceback.print_stack()
115        assert _core.currently_ki_protected() == expected
116        await _core.checkpoint()
117        traceback.print_stack()
118        assert _core.currently_ki_protected() == expected
119
120    await protected()
121    await unprotected()
122
123
124# This also used to be broken due to
125#   https://bugs.python.org/issue29590
126async def test_generator_based_context_manager_throw():
127    @contextlib.contextmanager
128    @_core.enable_ki_protection
129    def protected_manager():
130        assert _core.currently_ki_protected()
131        try:
132            yield
133        finally:
134            assert _core.currently_ki_protected()
135
136    with protected_manager():
137        assert not _core.currently_ki_protected()
138
139    with pytest.raises(KeyError):
140        # This is the one that used to fail
141        with protected_manager():
142            raise KeyError
143
144
145async def test_agen_protection():
146    @_core.enable_ki_protection
147    @async_generator
148    async def agen_protected1():
149        assert _core.currently_ki_protected()
150        try:
151            await yield_()
152        finally:
153            assert _core.currently_ki_protected()
154
155    @_core.disable_ki_protection
156    @async_generator
157    async def agen_unprotected1():
158        assert not _core.currently_ki_protected()
159        try:
160            await yield_()
161        finally:
162            assert not _core.currently_ki_protected()
163
164    # Swap the order of the decorators:
165    @async_generator
166    @_core.enable_ki_protection
167    async def agen_protected2():
168        assert _core.currently_ki_protected()
169        try:
170            await yield_()
171        finally:
172            assert _core.currently_ki_protected()
173
174    @async_generator
175    @_core.disable_ki_protection
176    async def agen_unprotected2():
177        assert not _core.currently_ki_protected()
178        try:
179            await yield_()
180        finally:
181            assert not _core.currently_ki_protected()
182
183    # Native async generators
184    @_core.enable_ki_protection
185    async def agen_protected3():
186        assert _core.currently_ki_protected()
187        try:
188            yield
189        finally:
190            assert _core.currently_ki_protected()
191
192    @_core.disable_ki_protection
193    async def agen_unprotected3():
194        assert not _core.currently_ki_protected()
195        try:
196            yield
197        finally:
198            assert not _core.currently_ki_protected()
199
200    for agen_fn in [
201        agen_protected1,
202        agen_protected2,
203        agen_protected3,
204        agen_unprotected1,
205        agen_unprotected2,
206        agen_unprotected3,
207    ]:
208        async for _ in agen_fn():  # noqa
209            assert not _core.currently_ki_protected()
210
211        # asynccontextmanager insists that the function passed must itself be an
212        # async gen function, not a wrapper around one
213        if isasyncgenfunction(agen_fn):
214            async with asynccontextmanager(agen_fn)():
215                assert not _core.currently_ki_protected()
216
217            # Another case that's tricky due to:
218            #   https://bugs.python.org/issue29590
219            with pytest.raises(KeyError):
220                async with asynccontextmanager(agen_fn)():
221                    raise KeyError
222
223
224# Test the case where there's no magic local anywhere in the call stack
225def test_ki_disabled_out_of_context():
226    assert _core.currently_ki_protected()
227
228
229def test_ki_disabled_in_del():
230    def nestedfunction():
231        return _core.currently_ki_protected()
232
233    def __del__():
234        assert _core.currently_ki_protected()
235        assert nestedfunction()
236
237    @_core.disable_ki_protection
238    def outerfunction():
239        assert not _core.currently_ki_protected()
240        assert not nestedfunction()
241        __del__()
242
243    __del__()
244    outerfunction()
245    assert nestedfunction()
246
247
248def test_ki_protection_works():
249    async def sleeper(name, record):
250        try:
251            while True:
252                await _core.checkpoint()
253        except _core.Cancelled:
254            record.add(name + " ok")
255
256    async def raiser(name, record):
257        try:
258            # os.kill runs signal handlers before returning, so we don't need
259            # to worry that the handler will be delayed
260            print("killing, protection =", _core.currently_ki_protected())
261            ki_self()
262        except KeyboardInterrupt:
263            print("raised!")
264            # Make sure we aren't getting cancelled as well as siginted
265            await _core.checkpoint()
266            record.add(name + " raise ok")
267            raise
268        else:
269            print("didn't raise!")
270            # If we didn't raise (b/c protected), then we *should* get
271            # cancelled at the next opportunity
272            try:
273                await _core.wait_task_rescheduled(lambda _: _core.Abort.SUCCEEDED)
274            except _core.Cancelled:
275                record.add(name + " cancel ok")
276
277    # simulated control-C during raiser, which is *unprotected*
278    print("check 1")
279    record = set()
280
281    async def check_unprotected_kill():
282        async with _core.open_nursery() as nursery:
283            nursery.start_soon(sleeper, "s1", record)
284            nursery.start_soon(sleeper, "s2", record)
285            nursery.start_soon(raiser, "r1", record)
286
287    with pytest.raises(KeyboardInterrupt):
288        _core.run(check_unprotected_kill)
289    assert record == {"s1 ok", "s2 ok", "r1 raise ok"}
290
291    # simulated control-C during raiser, which is *protected*, so the KI gets
292    # delivered to the main task instead
293    print("check 2")
294    record = set()
295
296    async def check_protected_kill():
297        async with _core.open_nursery() as nursery:
298            nursery.start_soon(sleeper, "s1", record)
299            nursery.start_soon(sleeper, "s2", record)
300            nursery.start_soon(_core.enable_ki_protection(raiser), "r1", record)
301            # __aexit__ blocks, and then receives the KI
302
303    with pytest.raises(KeyboardInterrupt):
304        _core.run(check_protected_kill)
305    assert record == {"s1 ok", "s2 ok", "r1 cancel ok"}
306
307    # kill at last moment still raises (run_sync_soon until it raises an
308    # error, then kill)
309    print("check 3")
310
311    async def check_kill_during_shutdown():
312        token = _core.current_trio_token()
313
314        def kill_during_shutdown():
315            assert _core.currently_ki_protected()
316            try:
317                token.run_sync_soon(kill_during_shutdown)
318            except _core.RunFinishedError:
319                # it's too late for regular handling! handle this!
320                print("kill! kill!")
321                ki_self()
322
323        token.run_sync_soon(kill_during_shutdown)
324
325    with pytest.raises(KeyboardInterrupt):
326        _core.run(check_kill_during_shutdown)
327
328    # KI arrives very early, before main is even spawned
329    print("check 4")
330
331    class InstrumentOfDeath:
332        def before_run(self):
333            ki_self()
334
335    async def main():
336        await _core.checkpoint()
337
338    with pytest.raises(KeyboardInterrupt):
339        _core.run(main, instruments=[InstrumentOfDeath()])
340
341    # checkpoint_if_cancelled notices pending KI
342    print("check 5")
343
344    @_core.enable_ki_protection
345    async def main():
346        assert _core.currently_ki_protected()
347        ki_self()
348        with pytest.raises(KeyboardInterrupt):
349            await _core.checkpoint_if_cancelled()
350
351    _core.run(main)
352
353    # KI arrives while main task is not abortable, b/c already scheduled
354    print("check 6")
355
356    @_core.enable_ki_protection
357    async def main():
358        assert _core.currently_ki_protected()
359        ki_self()
360        await _core.cancel_shielded_checkpoint()
361        await _core.cancel_shielded_checkpoint()
362        await _core.cancel_shielded_checkpoint()
363        with pytest.raises(KeyboardInterrupt):
364            await _core.checkpoint()
365
366    _core.run(main)
367
368    # KI arrives while main task is not abortable, b/c refuses to be aborted
369    print("check 7")
370
371    @_core.enable_ki_protection
372    async def main():
373        assert _core.currently_ki_protected()
374        ki_self()
375        task = _core.current_task()
376
377        def abort(_):
378            _core.reschedule(task, outcome.Value(1))
379            return _core.Abort.FAILED
380
381        assert await _core.wait_task_rescheduled(abort) == 1
382        with pytest.raises(KeyboardInterrupt):
383            await _core.checkpoint()
384
385    _core.run(main)
386
387    # KI delivered via slow abort
388    print("check 8")
389
390    @_core.enable_ki_protection
391    async def main():
392        assert _core.currently_ki_protected()
393        ki_self()
394        task = _core.current_task()
395
396        def abort(raise_cancel):
397            result = outcome.capture(raise_cancel)
398            _core.reschedule(task, result)
399            return _core.Abort.FAILED
400
401        with pytest.raises(KeyboardInterrupt):
402            assert await _core.wait_task_rescheduled(abort)
403        await _core.checkpoint()
404
405    _core.run(main)
406
407    # KI arrives just before main task exits, so the run_sync_soon machinery
408    # is still functioning and will accept the callback to deliver the KI, but
409    # by the time the callback is actually run, main has exited and can't be
410    # aborted.
411    print("check 9")
412
413    @_core.enable_ki_protection
414    async def main():
415        ki_self()
416
417    with pytest.raises(KeyboardInterrupt):
418        _core.run(main)
419
420    print("check 10")
421    # KI in unprotected code, with
422    # restrict_keyboard_interrupt_to_checkpoints=True
423    record = []
424
425    async def main():
426        # We're not KI protected...
427        assert not _core.currently_ki_protected()
428        ki_self()
429        # ...but even after the KI, we keep running uninterrupted...
430        record.append("ok")
431        # ...until we hit a checkpoint:
432        with pytest.raises(KeyboardInterrupt):
433            await sleep(10)
434
435    _core.run(main, restrict_keyboard_interrupt_to_checkpoints=True)
436    assert record == ["ok"]
437    record = []
438    # Exact same code raises KI early if we leave off the argument, doesn't
439    # even reach the record.append call:
440    with pytest.raises(KeyboardInterrupt):
441        _core.run(main)
442    assert record == []
443
444    # KI arrives while main task is inside a cancelled cancellation scope
445    # the KeyboardInterrupt should take priority
446    print("check 11")
447
448    @_core.enable_ki_protection
449    async def main():
450        assert _core.currently_ki_protected()
451        with _core.CancelScope() as cancel_scope:
452            cancel_scope.cancel()
453            with pytest.raises(_core.Cancelled):
454                await _core.checkpoint()
455            ki_self()
456            with pytest.raises(KeyboardInterrupt):
457                await _core.checkpoint()
458            with pytest.raises(_core.Cancelled):
459                await _core.checkpoint()
460
461    _core.run(main)
462
463
464def test_ki_is_good_neighbor():
465    # in the unlikely event someone overwrites our signal handler, we leave
466    # the overwritten one be
467    try:
468        orig = signal.getsignal(signal.SIGINT)
469
470        def my_handler(signum, frame):  # pragma: no cover
471            pass
472
473        async def main():
474            signal.signal(signal.SIGINT, my_handler)
475
476        _core.run(main)
477
478        assert signal.getsignal(signal.SIGINT) is my_handler
479    finally:
480        signal.signal(signal.SIGINT, orig)
481
482
483# Regression test for #461
484def test_ki_with_broken_threads():
485    thread = threading.main_thread()
486
487    # scary!
488    original = threading._active[thread.ident]
489
490    # put this in a try finally so we don't have a chance of cascading a
491    # breakage down to everything else
492    try:
493        del threading._active[thread.ident]
494
495        @_core.enable_ki_protection
496        async def inner():
497            assert signal.getsignal(signal.SIGINT) != signal.default_int_handler
498
499        _core.run(inner)
500    finally:
501        threading._active[thread.ident] = original
502
503
504# For details on why this test is non-trivial, see:
505#   https://github.com/python-trio/trio/issues/42
506#   https://github.com/python-trio/trio/issues/109
507@slow
508def test_ki_wakes_us_up():
509    assert is_main_thread()
510
511    # This test is flaky due to a race condition on Windows; see:
512    #   https://github.com/python-trio/trio/issues/119
513    #   https://bugs.python.org/issue30038
514    # I think the only fix is to wait for fixed CPython to be released, so in
515    # the mean time, on affected versions we send two signals (equivalent to
516    # hitting control-C twice). This works because the problem is that the C
517    # level signal handler does
518    #
519    #   write-to-fd -> set-flags
520    #
521    # and we need
522    #
523    #   set-flags -> write-to-fd
524    #
525    # so running the C level signal handler twice does
526    #
527    #   write-to-fd -> set-flags -> write-to-fd -> set-flags
528    #
529    # which contains the desired sequence.
530    #
531    # Affected version of CPython include 3.6.1 and earlier.
532    # It's fixed in 3.6.2 and 3.7+
533    #
534    # PyPy was never affected.
535    #
536    # The problem technically can occur on Unix as well, if a signal is
537    # delivered to a non-main thread, though we haven't observed this in
538    # practice.
539    #
540    # There's also this theoretical problem, but hopefully it won't actually
541    # bite us in practice:
542    #   https://bugs.python.org/issue31119
543    #   https://bitbucket.org/pypy/pypy/issues/2623
544    import platform
545
546    buggy_wakeup_fd = (
547        sys.version_info < (3, 6, 2) and platform.python_implementation() == "CPython"
548    )
549
550    # lock is only needed to avoid an annoying race condition where the
551    # *second* ki_self() call arrives *after* the first one woke us up and its
552    # KeyboardInterrupt was caught, and then generates a second
553    # KeyboardInterrupt that aborts the test run. The kill_soon thread holds
554    # the lock while doing the calls to ki_self, which means that it holds it
555    # while the C-level signal handler is running. Then in the main thread,
556    # when we're woken up we know that ki_self() has been run at least once;
557    # if we then take the lock it guaranteeds that ki_self() has been run
558    # twice, so if a second KeyboardInterrupt is going to arrive it should
559    # arrive by the time we've acquired the lock. This lets us force it to
560    # happen inside the pytest.raises block.
561    #
562    # It will be very nice when the buggy_wakeup_fd bug is fixed.
563    lock = threading.Lock()
564
565    def kill_soon():
566        # We want the signal to be raised after the main thread has entered
567        # the IO manager blocking primitive. There really is no way to
568        # deterministically interlock with that, so we have to use sleep and
569        # hope it's long enough.
570        time.sleep(1.1)
571        with lock:
572            print("thread doing ki_self()")
573            ki_self()
574            if buggy_wakeup_fd:
575                print("buggy_wakeup_fd =", buggy_wakeup_fd)
576                ki_self()
577
578    async def main():
579        thread = threading.Thread(target=kill_soon)
580        print("Starting thread")
581        thread.start()
582        try:
583            with pytest.raises(KeyboardInterrupt):
584                # To limit the damage on CI if this does get broken (as
585                # compared to sleep_forever())
586                print("Going to sleep")
587                try:
588                    await sleep(20)
589                    print("Woke without raising?!")  # pragma: no cover
590                # The only purpose of this finally: block is to soak up the
591                # second KeyboardInterrupt that might arrive on
592                # buggy_wakeup_fd platforms. So it might get aborted at any
593                # moment randomly on some runs, so pragma: no cover avoids
594                # coverage flapping:
595                finally:  # pragma: no cover
596                    print("waiting for lock")
597                    with lock:
598                        print("got lock")
599                    # And then we want to force a PyErr_CheckSignals. Which is
600                    # not so easy on Windows. Weird kluge: builtin_repr calls
601                    # PyObject_Repr, which does an unconditional
602                    # PyErr_CheckSignals for some reason.
603                    print(repr(None))
604                    # And finally, it's possible that the signal was delivered
605                    # but at a moment when we had KI protection enabled, so we
606                    # need to execute a checkpoint to ensure it's delivered
607                    # before we exit main().
608                    await _core.checkpoint()
609        finally:
610            print("joining thread", sys.exc_info())
611            thread.join()
612
613    start = time.perf_counter()
614    try:
615        _core.run(main)
616    finally:
617        end = time.perf_counter()
618        print("duration", end - start)
619        print("sys.exc_info", sys.exc_info())
620    assert 1.0 <= (end - start) < 2
621