1from contextlib import contextmanager
2import datetime
3import faulthandler
4import os
5import signal
6import subprocess
7import sys
8import sysconfig
9from test import support
10from test.support import script_helper, is_android
11import tempfile
12import unittest
13from textwrap import dedent
14
15try:
16    import _testcapi
17except ImportError:
18    _testcapi = None
19
# Delay, in seconds, handed to faulthandler.dump_traceback_later() by the
# dump_traceback_later() checks.
TIMEOUT = 0.5
MS_WINDOWS = os.name == 'nt'

# Compiler flags and configure arguments of this build; they are searched
# below for the sanitizer options.
_cflags = sysconfig.get_config_var('CFLAGS') or ''
_config_args = sysconfig.get_config_var('CONFIG_ARGS') or ''
UB_SANITIZER = any((
    '-fsanitize=undefined' in _cflags,
    '--with-undefined-behavior-sanitizer' in _config_args,
))
MEMORY_SANITIZER = any((
    '-fsanitize=memory' in _cflags,
    '--with-memory-sanitizer' in _config_args,
))
32
33
def expected_traceback(lineno1, lineno2, header, min_count=1):
    """Build the regex matching a two-frame traceback.

    The pattern is *header* followed by a ``func`` frame at line *lineno1*
    and a ``<module>`` frame at line *lineno2*.

    If *min_count* is greater than 1, the pattern matches *min_count*
    consecutive copies of that traceback and is only anchored at the start;
    otherwise it matches a single copy and is anchored at both ends.
    """
    frames = (
        '  File "<string>", line %s in func\n' % lineno1,
        '  File "<string>", line %s in <module>' % lineno2,
    )
    single = header + ''.join(frames)
    if min_count <= 1:
        return '^' + single + '$'
    return '^' + '\n'.join([single] * min_count)
42
def skip_segfault_on_android(test):
    """Decorator skipping *test* when is_android is true.

    Issue #32138: Raising SIGSEGV on Android may not cause a crash.
    """
    skip_on_android = unittest.skipIf(
        is_android, 'raising SIGSEGV on Android is unreliable')
    return skip_on_android(test)
47
@contextmanager
def temporary_filename():
    """Context manager yielding the path of a fresh temporary file.

    The (empty) file is created with tempfile.mkstemp() instead of only
    reserving a name with the deprecated tempfile.mktemp(), which leaves a
    window for another process to claim the same path.  The file is removed
    with support.unlink() when the context exits.
    """
    fd, filename = tempfile.mkstemp()
    # Only the path is needed: users of this helper (re)open it themselves.
    os.close(fd)
    try:
        yield filename
    finally:
        support.unlink(filename)
55
56class FaultHandlerTests(unittest.TestCase):
57    def get_output(self, code, filename=None, fd=None):
58        """
59        Run the specified code in Python (in a new child process) and read the
60        output from the standard error or from a file (if filename is set).
61        Return the output lines as a list.
62
63        Strip the reference count from the standard error for Python debug
64        build, and replace "Current thread 0x00007f8d8fbd9700" by "Current
65        thread XXX".
66        """
67        code = dedent(code).strip()
68        pass_fds = []
69        if fd is not None:
70            pass_fds.append(fd)
71        with support.SuppressCrashReport():
72            process = script_helper.spawn_python('-c', code, pass_fds=pass_fds)
73            with process:
74                stdout, stderr = process.communicate()
75                exitcode = process.wait()
76        output = support.strip_python_stderr(stdout)
77        output = output.decode('ascii', 'backslashreplace')
78        if filename:
79            self.assertEqual(output, '')
80            with open(filename, "rb") as fp:
81                output = fp.read()
82            output = output.decode('ascii', 'backslashreplace')
83        elif fd is not None:
84            self.assertEqual(output, '')
85            os.lseek(fd, os.SEEK_SET, 0)
86            with open(fd, "rb", closefd=False) as fp:
87                output = fp.read()
88            output = output.decode('ascii', 'backslashreplace')
89        return output.splitlines(), exitcode
90
91    def check_error(self, code, line_number, fatal_error, *,
92                    filename=None, all_threads=True, other_regex=None,
93                    fd=None, know_current_thread=True,
94                    py_fatal_error=False):
95        """
96        Check that the fault handler for fatal errors is enabled and check the
97        traceback from the child process output.
98
99        Raise an error if the output doesn't match the expected format.
100        """
101        if all_threads:
102            if know_current_thread:
103                header = 'Current thread 0x[0-9a-f]+'
104            else:
105                header = 'Thread 0x[0-9a-f]+'
106        else:
107            header = 'Stack'
108        regex = r"""
109            (?m)^{fatal_error}
110
111            {header} \(most recent call first\):
112              File "<string>", line {lineno} in <module>
113            """
114        if py_fatal_error:
115            fatal_error += "\nPython runtime state: initialized"
116        regex = dedent(regex).format(
117            lineno=line_number,
118            fatal_error=fatal_error,
119            header=header).strip()
120        if other_regex:
121            regex += '|' + other_regex
122        output, exitcode = self.get_output(code, filename=filename, fd=fd)
123        output = '\n'.join(output)
124        self.assertRegex(output, regex)
125        self.assertNotEqual(exitcode, 0)
126
127    def check_fatal_error(self, code, line_number, name_regex, **kw):
128        fatal_error = 'Fatal Python error: %s' % name_regex
129        self.check_error(code, line_number, fatal_error, **kw)
130
131    def check_windows_exception(self, code, line_number, name_regex, **kw):
132        fatal_error = 'Windows fatal exception: %s' % name_regex
133        self.check_error(code, line_number, fatal_error, **kw)
134
135    @unittest.skipIf(sys.platform.startswith('aix'),
136                     "the first page of memory is a mapped read-only on AIX")
137    def test_read_null(self):
138        if not MS_WINDOWS:
139            self.check_fatal_error("""
140                import faulthandler
141                faulthandler.enable()
142                faulthandler._read_null()
143                """,
144                3,
145                # Issue #12700: Read NULL raises SIGILL on Mac OS X Lion
146                '(?:Segmentation fault'
147                    '|Bus error'
148                    '|Illegal instruction)')
149        else:
150            self.check_windows_exception("""
151                import faulthandler
152                faulthandler.enable()
153                faulthandler._read_null()
154                """,
155                3,
156                'access violation')
157
158    @skip_segfault_on_android
159    def test_sigsegv(self):
160        self.check_fatal_error("""
161            import faulthandler
162            faulthandler.enable()
163            faulthandler._sigsegv()
164            """,
165            3,
166            'Segmentation fault')
167
168    def test_fatal_error_c_thread(self):
169        self.check_fatal_error("""
170            import faulthandler
171            faulthandler.enable()
172            faulthandler._fatal_error_c_thread()
173            """,
174            3,
175            'in new thread',
176            know_current_thread=False,
177            py_fatal_error=True)
178
179    def test_sigabrt(self):
180        self.check_fatal_error("""
181            import faulthandler
182            faulthandler.enable()
183            faulthandler._sigabrt()
184            """,
185            3,
186            'Aborted')
187
188    @unittest.skipIf(sys.platform == 'win32',
189                     "SIGFPE cannot be caught on Windows")
190    def test_sigfpe(self):
191        self.check_fatal_error("""
192            import faulthandler
193            faulthandler.enable()
194            faulthandler._sigfpe()
195            """,
196            3,
197            'Floating point exception')
198
199    @unittest.skipIf(_testcapi is None, 'need _testcapi')
200    @unittest.skipUnless(hasattr(signal, 'SIGBUS'), 'need signal.SIGBUS')
201    @skip_segfault_on_android
202    def test_sigbus(self):
203        self.check_fatal_error("""
204            import faulthandler
205            import signal
206
207            faulthandler.enable()
208            signal.raise_signal(signal.SIGBUS)
209            """,
210            5,
211            'Bus error')
212
213    @unittest.skipIf(_testcapi is None, 'need _testcapi')
214    @unittest.skipUnless(hasattr(signal, 'SIGILL'), 'need signal.SIGILL')
215    @skip_segfault_on_android
216    def test_sigill(self):
217        self.check_fatal_error("""
218            import faulthandler
219            import signal
220
221            faulthandler.enable()
222            signal.raise_signal(signal.SIGILL)
223            """,
224            5,
225            'Illegal instruction')
226
227    def test_fatal_error(self):
228        self.check_fatal_error("""
229            import faulthandler
230            faulthandler._fatal_error(b'xyz')
231            """,
232            2,
233            'xyz',
234            py_fatal_error=True)
235
236    def test_fatal_error_without_gil(self):
237        self.check_fatal_error("""
238            import faulthandler
239            faulthandler._fatal_error(b'xyz', True)
240            """,
241            2,
242            'xyz',
243            py_fatal_error=True)
244
245    @unittest.skipIf(sys.platform.startswith('openbsd'),
246                     "Issue #12868: sigaltstack() doesn't work on "
247                     "OpenBSD if Python is compiled with pthread")
248    @unittest.skipIf(not hasattr(faulthandler, '_stack_overflow'),
249                     'need faulthandler._stack_overflow()')
250    def test_stack_overflow(self):
251        self.check_fatal_error("""
252            import faulthandler
253            faulthandler.enable()
254            faulthandler._stack_overflow()
255            """,
256            3,
257            '(?:Segmentation fault|Bus error)',
258            other_regex='unable to raise a stack overflow')
259
260    @skip_segfault_on_android
261    def test_gil_released(self):
262        self.check_fatal_error("""
263            import faulthandler
264            faulthandler.enable()
265            faulthandler._sigsegv(True)
266            """,
267            3,
268            'Segmentation fault')
269
270    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
271                     "sanitizer builds change crashing process output.")
272    @skip_segfault_on_android
273    def test_enable_file(self):
274        with temporary_filename() as filename:
275            self.check_fatal_error("""
276                import faulthandler
277                output = open({filename}, 'wb')
278                faulthandler.enable(output)
279                faulthandler._sigsegv()
280                """.format(filename=repr(filename)),
281                4,
282                'Segmentation fault',
283                filename=filename)
284
285    @unittest.skipIf(sys.platform == "win32",
286                     "subprocess doesn't support pass_fds on Windows")
287    @unittest.skipIf(UB_SANITIZER or MEMORY_SANITIZER,
288                     "sanitizer builds change crashing process output.")
289    @skip_segfault_on_android
290    def test_enable_fd(self):
291        with tempfile.TemporaryFile('wb+') as fp:
292            fd = fp.fileno()
293            self.check_fatal_error("""
294                import faulthandler
295                import sys
296                faulthandler.enable(%s)
297                faulthandler._sigsegv()
298                """ % fd,
299                4,
300                'Segmentation fault',
301                fd=fd)
302
303    @skip_segfault_on_android
304    def test_enable_single_thread(self):
305        self.check_fatal_error("""
306            import faulthandler
307            faulthandler.enable(all_threads=False)
308            faulthandler._sigsegv()
309            """,
310            3,
311            'Segmentation fault',
312            all_threads=False)
313
314    @skip_segfault_on_android
315    def test_disable(self):
316        code = """
317            import faulthandler
318            faulthandler.enable()
319            faulthandler.disable()
320            faulthandler._sigsegv()
321            """
322        not_expected = 'Fatal Python error'
323        stderr, exitcode = self.get_output(code)
324        stderr = '\n'.join(stderr)
325        self.assertTrue(not_expected not in stderr,
326                     "%r is present in %r" % (not_expected, stderr))
327        self.assertNotEqual(exitcode, 0)
328
329    def test_is_enabled(self):
330        orig_stderr = sys.stderr
331        try:
332            # regrtest may replace sys.stderr by io.StringIO object, but
333            # faulthandler.enable() requires that sys.stderr has a fileno()
334            # method
335            sys.stderr = sys.__stderr__
336
337            was_enabled = faulthandler.is_enabled()
338            try:
339                faulthandler.enable()
340                self.assertTrue(faulthandler.is_enabled())
341                faulthandler.disable()
342                self.assertFalse(faulthandler.is_enabled())
343            finally:
344                if was_enabled:
345                    faulthandler.enable()
346                else:
347                    faulthandler.disable()
348        finally:
349            sys.stderr = orig_stderr
350
351    def test_disabled_by_default(self):
352        # By default, the module should be disabled
353        code = "import faulthandler; print(faulthandler.is_enabled())"
354        args = (sys.executable, "-E", "-c", code)
355        # don't use assert_python_ok() because it always enables faulthandler
356        output = subprocess.check_output(args)
357        self.assertEqual(output.rstrip(), b"False")
358
359    def test_sys_xoptions(self):
360        # Test python -X faulthandler
361        code = "import faulthandler; print(faulthandler.is_enabled())"
362        args = filter(None, (sys.executable,
363                             "-E" if sys.flags.ignore_environment else "",
364                             "-X", "faulthandler", "-c", code))
365        env = os.environ.copy()
366        env.pop("PYTHONFAULTHANDLER", None)
367        # don't use assert_python_ok() because it always enables faulthandler
368        output = subprocess.check_output(args, env=env)
369        self.assertEqual(output.rstrip(), b"True")
370
371    def test_env_var(self):
372        # empty env var
373        code = "import faulthandler; print(faulthandler.is_enabled())"
374        args = (sys.executable, "-c", code)
375        env = dict(os.environ)
376        env['PYTHONFAULTHANDLER'] = ''
377        env['PYTHONDEVMODE'] = ''
378        # don't use assert_python_ok() because it always enables faulthandler
379        output = subprocess.check_output(args, env=env)
380        self.assertEqual(output.rstrip(), b"False")
381
382        # non-empty env var
383        env = dict(os.environ)
384        env['PYTHONFAULTHANDLER'] = '1'
385        env['PYTHONDEVMODE'] = ''
386        output = subprocess.check_output(args, env=env)
387        self.assertEqual(output.rstrip(), b"True")
388
389    def check_dump_traceback(self, *, filename=None, fd=None):
390        """
391        Explicitly call dump_traceback() function and check its output.
392        Raise an error if the output doesn't match the expected format.
393        """
394        code = """
395            import faulthandler
396
397            filename = {filename!r}
398            fd = {fd}
399
400            def funcB():
401                if filename:
402                    with open(filename, "wb") as fp:
403                        faulthandler.dump_traceback(fp, all_threads=False)
404                elif fd is not None:
405                    faulthandler.dump_traceback(fd,
406                                                all_threads=False)
407                else:
408                    faulthandler.dump_traceback(all_threads=False)
409
410            def funcA():
411                funcB()
412
413            funcA()
414            """
415        code = code.format(
416            filename=filename,
417            fd=fd,
418        )
419        if filename:
420            lineno = 9
421        elif fd is not None:
422            lineno = 11
423        else:
424            lineno = 14
425        expected = [
426            'Stack (most recent call first):',
427            '  File "<string>", line %s in funcB' % lineno,
428            '  File "<string>", line 17 in funcA',
429            '  File "<string>", line 19 in <module>'
430        ]
431        trace, exitcode = self.get_output(code, filename, fd)
432        self.assertEqual(trace, expected)
433        self.assertEqual(exitcode, 0)
434
435    def test_dump_traceback(self):
436        self.check_dump_traceback()
437
438    def test_dump_traceback_file(self):
439        with temporary_filename() as filename:
440            self.check_dump_traceback(filename=filename)
441
442    @unittest.skipIf(sys.platform == "win32",
443                     "subprocess doesn't support pass_fds on Windows")
444    def test_dump_traceback_fd(self):
445        with tempfile.TemporaryFile('wb+') as fp:
446            self.check_dump_traceback(fd=fp.fileno())
447
448    def test_truncate(self):
449        maxlen = 500
450        func_name = 'x' * (maxlen + 50)
451        truncated = 'x' * maxlen + '...'
452        code = """
453            import faulthandler
454
455            def {func_name}():
456                faulthandler.dump_traceback(all_threads=False)
457
458            {func_name}()
459            """
460        code = code.format(
461            func_name=func_name,
462        )
463        expected = [
464            'Stack (most recent call first):',
465            '  File "<string>", line 4 in %s' % truncated,
466            '  File "<string>", line 6 in <module>'
467        ]
468        trace, exitcode = self.get_output(code)
469        self.assertEqual(trace, expected)
470        self.assertEqual(exitcode, 0)
471
472    def check_dump_traceback_threads(self, filename):
473        """
474        Call explicitly dump_traceback(all_threads=True) and check the output.
475        Raise an error if the output doesn't match the expected format.
476        """
477        code = """
478            import faulthandler
479            from threading import Thread, Event
480            import time
481
482            def dump():
483                if {filename}:
484                    with open({filename}, "wb") as fp:
485                        faulthandler.dump_traceback(fp, all_threads=True)
486                else:
487                    faulthandler.dump_traceback(all_threads=True)
488
489            class Waiter(Thread):
490                # avoid blocking if the main thread raises an exception.
491                daemon = True
492
493                def __init__(self):
494                    Thread.__init__(self)
495                    self.running = Event()
496                    self.stop = Event()
497
498                def run(self):
499                    self.running.set()
500                    self.stop.wait()
501
502            waiter = Waiter()
503            waiter.start()
504            waiter.running.wait()
505            dump()
506            waiter.stop.set()
507            waiter.join()
508            """
509        code = code.format(filename=repr(filename))
510        output, exitcode = self.get_output(code, filename)
511        output = '\n'.join(output)
512        if filename:
513            lineno = 8
514        else:
515            lineno = 10
516        regex = r"""
517            ^Thread 0x[0-9a-f]+ \(most recent call first\):
518            (?:  File ".*threading.py", line [0-9]+ in [_a-z]+
519            ){{1,3}}  File "<string>", line 23 in run
520              File ".*threading.py", line [0-9]+ in _bootstrap_inner
521              File ".*threading.py", line [0-9]+ in _bootstrap
522
523            Current thread 0x[0-9a-f]+ \(most recent call first\):
524              File "<string>", line {lineno} in dump
525              File "<string>", line 28 in <module>$
526            """
527        regex = dedent(regex.format(lineno=lineno)).strip()
528        self.assertRegex(output, regex)
529        self.assertEqual(exitcode, 0)
530
531    def test_dump_traceback_threads(self):
532        self.check_dump_traceback_threads(None)
533
534    def test_dump_traceback_threads_file(self):
535        with temporary_filename() as filename:
536            self.check_dump_traceback_threads(filename)
537
538    @unittest.skipIf(not hasattr(faulthandler, 'dump_traceback_later'),
539                     'need faulthandler.dump_traceback_later()')
540    def check_dump_traceback_later(self, repeat=False, cancel=False, loops=1,
541                                   *, filename=None, fd=None):
542        """
543        Check how many times the traceback is written in timeout x 2.5 seconds,
544        or timeout x 3.5 seconds if cancel is True: 1, 2 or 3 times depending
545        on repeat and cancel options.
546
547        Raise an error if the output doesn't match the expect format.
548        """
549        timeout_str = str(datetime.timedelta(seconds=TIMEOUT))
550        code = """
551            import faulthandler
552            import time
553            import sys
554
555            timeout = {timeout}
556            repeat = {repeat}
557            cancel = {cancel}
558            loops = {loops}
559            filename = {filename!r}
560            fd = {fd}
561
562            def func(timeout, repeat, cancel, file, loops):
563                for loop in range(loops):
564                    faulthandler.dump_traceback_later(timeout, repeat=repeat, file=file)
565                    if cancel:
566                        faulthandler.cancel_dump_traceback_later()
567                    time.sleep(timeout * 5)
568                    faulthandler.cancel_dump_traceback_later()
569
570            if filename:
571                file = open(filename, "wb")
572            elif fd is not None:
573                file = sys.stderr.fileno()
574            else:
575                file = None
576            func(timeout, repeat, cancel, file, loops)
577            if filename:
578                file.close()
579            """
580        code = code.format(
581            timeout=TIMEOUT,
582            repeat=repeat,
583            cancel=cancel,
584            loops=loops,
585            filename=filename,
586            fd=fd,
587        )
588        trace, exitcode = self.get_output(code, filename)
589        trace = '\n'.join(trace)
590
591        if not cancel:
592            count = loops
593            if repeat:
594                count *= 2
595            header = r'Timeout \(%s\)!\nThread 0x[0-9a-f]+ \(most recent call first\):\n' % timeout_str
596            regex = expected_traceback(17, 26, header, min_count=count)
597            self.assertRegex(trace, regex)
598        else:
599            self.assertEqual(trace, '')
600        self.assertEqual(exitcode, 0)
601
602    def test_dump_traceback_later(self):
603        self.check_dump_traceback_later()
604
605    def test_dump_traceback_later_repeat(self):
606        self.check_dump_traceback_later(repeat=True)
607
608    def test_dump_traceback_later_cancel(self):
609        self.check_dump_traceback_later(cancel=True)
610
611    def test_dump_traceback_later_file(self):
612        with temporary_filename() as filename:
613            self.check_dump_traceback_later(filename=filename)
614
615    @unittest.skipIf(sys.platform == "win32",
616                     "subprocess doesn't support pass_fds on Windows")
617    def test_dump_traceback_later_fd(self):
618        with tempfile.TemporaryFile('wb+') as fp:
619            self.check_dump_traceback_later(fd=fp.fileno())
620
621    def test_dump_traceback_later_twice(self):
622        self.check_dump_traceback_later(loops=2)
623
624    @unittest.skipIf(not hasattr(faulthandler, "register"),
625                     "need faulthandler.register")
626    def check_register(self, filename=False, all_threads=False,
627                       unregister=False, chain=False, fd=None):
628        """
629        Register a handler displaying the traceback on a user signal. Raise the
630        signal and check the written traceback.
631
632        If chain is True, check that the previous signal handler is called.
633
634        Raise an error if the output doesn't match the expected format.
635        """
636        signum = signal.SIGUSR1
637        code = """
638            import faulthandler
639            import os
640            import signal
641            import sys
642
643            all_threads = {all_threads}
644            signum = {signum}
645            unregister = {unregister}
646            chain = {chain}
647            filename = {filename!r}
648            fd = {fd}
649
650            def func(signum):
651                os.kill(os.getpid(), signum)
652
653            def handler(signum, frame):
654                handler.called = True
655            handler.called = False
656
657            if filename:
658                file = open(filename, "wb")
659            elif fd is not None:
660                file = sys.stderr.fileno()
661            else:
662                file = None
663            if chain:
664                signal.signal(signum, handler)
665            faulthandler.register(signum, file=file,
666                                  all_threads=all_threads, chain={chain})
667            if unregister:
668                faulthandler.unregister(signum)
669            func(signum)
670            if chain and not handler.called:
671                if file is not None:
672                    output = file
673                else:
674                    output = sys.stderr
675                print("Error: signal handler not called!", file=output)
676                exitcode = 1
677            else:
678                exitcode = 0
679            if filename:
680                file.close()
681            sys.exit(exitcode)
682            """
683        code = code.format(
684            all_threads=all_threads,
685            signum=signum,
686            unregister=unregister,
687            chain=chain,
688            filename=filename,
689            fd=fd,
690        )
691        trace, exitcode = self.get_output(code, filename)
692        trace = '\n'.join(trace)
693        if not unregister:
694            if all_threads:
695                regex = r'Current thread 0x[0-9a-f]+ \(most recent call first\):\n'
696            else:
697                regex = r'Stack \(most recent call first\):\n'
698            regex = expected_traceback(14, 32, regex)
699            self.assertRegex(trace, regex)
700        else:
701            self.assertEqual(trace, '')
702        if unregister:
703            self.assertNotEqual(exitcode, 0)
704        else:
705            self.assertEqual(exitcode, 0)
706
707    def test_register(self):
708        self.check_register()
709
710    def test_unregister(self):
711        self.check_register(unregister=True)
712
713    def test_register_file(self):
714        with temporary_filename() as filename:
715            self.check_register(filename=filename)
716
717    @unittest.skipIf(sys.platform == "win32",
718                     "subprocess doesn't support pass_fds on Windows")
719    def test_register_fd(self):
720        with tempfile.TemporaryFile('wb+') as fp:
721            self.check_register(fd=fp.fileno())
722
723    def test_register_threads(self):
724        self.check_register(all_threads=True)
725
726    def test_register_chain(self):
727        self.check_register(chain=True)
728
729    @contextmanager
730    def check_stderr_none(self):
731        stderr = sys.stderr
732        try:
733            sys.stderr = None
734            with self.assertRaises(RuntimeError) as cm:
735                yield
736            self.assertEqual(str(cm.exception), "sys.stderr is None")
737        finally:
738            sys.stderr = stderr
739
740    def test_stderr_None(self):
741        # Issue #21497: provide a helpful error if sys.stderr is None,
742        # instead of just an attribute error: "None has no attribute fileno".
743        with self.check_stderr_none():
744            faulthandler.enable()
745        with self.check_stderr_none():
746            faulthandler.dump_traceback()
747        if hasattr(faulthandler, 'dump_traceback_later'):
748            with self.check_stderr_none():
749                faulthandler.dump_traceback_later(1e-3)
750        if hasattr(faulthandler, "register"):
751            with self.check_stderr_none():
752                faulthandler.register(signal.SIGUSR1)
753
754    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
755    def test_raise_exception(self):
756        for exc, name in (
757            ('EXCEPTION_ACCESS_VIOLATION', 'access violation'),
758            ('EXCEPTION_INT_DIVIDE_BY_ZERO', 'int divide by zero'),
759            ('EXCEPTION_STACK_OVERFLOW', 'stack overflow'),
760        ):
761            self.check_windows_exception(f"""
762                import faulthandler
763                faulthandler.enable()
764                faulthandler._raise_exception(faulthandler._{exc})
765                """,
766                3,
767                name)
768
769    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
770    def test_ignore_exception(self):
771        for exc_code in (
772            0xE06D7363,   # MSC exception ("Emsc")
773            0xE0434352,   # COM Callable Runtime exception ("ECCR")
774        ):
775            code = f"""
776                    import faulthandler
777                    faulthandler.enable()
778                    faulthandler._raise_exception({exc_code})
779                    """
780            code = dedent(code)
781            output, exitcode = self.get_output(code)
782            self.assertEqual(output, [])
783            self.assertEqual(exitcode, exc_code)
784
785    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
786    def test_raise_nonfatal_exception(self):
787        # These exceptions are not strictly errors. Letting
788        # faulthandler display the traceback when they are
789        # raised is likely to result in noise. However, they
790        # may still terminate the process if there is no
791        # handler installed for them (which there typically
792        # is, e.g. for debug messages).
793        for exc in (
794            0x00000000,
795            0x34567890,
796            0x40000000,
797            0x40001000,
798            0x70000000,
799            0x7FFFFFFF,
800        ):
801            output, exitcode = self.get_output(f"""
802                import faulthandler
803                faulthandler.enable()
804                faulthandler._raise_exception(0x{exc:x})
805                """
806            )
807            self.assertEqual(output, [])
808            # On Windows older than 7 SP1, the actual exception code has
809            # bit 29 cleared.
810            self.assertIn(exitcode,
811                          (exc, exc & ~0x10000000))
812
813    @unittest.skipUnless(MS_WINDOWS, 'specific to Windows')
814    def test_disable_windows_exc_handler(self):
815        code = dedent("""
816            import faulthandler
817            faulthandler.enable()
818            faulthandler.disable()
819            code = faulthandler._EXCEPTION_ACCESS_VIOLATION
820            faulthandler._raise_exception(code)
821        """)
822        output, exitcode = self.get_output(code)
823        self.assertEqual(output, [])
824        self.assertEqual(exitcode, 0xC0000005)
825
826
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
829