# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose names begin with 'test_'
# (names ending in '_code' are excluded, see Test_testcapi).
3
4from collections import OrderedDict
5import os
6import pickle
7import random
8import re
9import subprocess
10import sys
11import textwrap
12import threading
13import time
14import unittest
15from test import support
16from test.support import MISSING_C_DOCSTRINGS
17from test.support.script_helper import assert_python_failure, assert_python_ok
18try:
19    import _posixsubprocess
20except ImportError:
21    _posixsubprocess = None
22
# Skip this test if the _testcapi module isn't available.
_testcapi = support.import_module('_testcapi')

# Were we compiled --with-pydebug or with #define Py_DEBUG?
# Detected through the presence of sys.gettotalrefcount.  The flag selects
# between the debug and non-debug expectations in
# test_return_null_without_error / test_return_result_with_error and gates
# PyMemDefaultTests.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
28
29
def testfunction(self):
    """some doc"""
    # Identity helper: hands back its only argument unchanged.
    # InstanceMethod wraps it with _testcapi.instancemethod, and
    # CAPITest.test_instancemethod reads this docstring back through the
    # wrapper's __doc__.
    return self
33
34
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod: both attributes are callables
    # wrapped with _testcapi.instancemethod.
    # On the right-hand side below, `id` still resolves to the builtin because
    # the class attribute of the same name has not been bound yet.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
38
39class CAPITest(unittest.TestCase):
40
41    def test_instancemethod(self):
42        inst = InstanceMethod()
43        self.assertEqual(id(inst), inst.id())
44        self.assertTrue(inst.testfunction() is inst)
45        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
46        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
47
48        InstanceMethod.testfunction.attribute = "test"
49        self.assertEqual(testfunction.attribute, "test")
50        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
51
52    def test_no_FatalError_infinite_loop(self):
53        with support.SuppressCrashReport():
54            p = subprocess.Popen([sys.executable, "-c",
55                                  'import _testcapi;'
56                                  '_testcapi.crash_no_current_thread()'],
57                                 stdout=subprocess.PIPE,
58                                 stderr=subprocess.PIPE)
59        (out, err) = p.communicate()
60        self.assertEqual(out, b'')
61        # This used to cause an infinite loop.
62        self.assertTrue(err.rstrip().startswith(
63                         b'Fatal Python error:'
64                         b' PyThreadState_Get: no current thread'))
65
66    def test_memoryview_from_NULL_pointer(self):
67        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
68
69    def test_exc_info(self):
70        raised_exception = ValueError("5")
71        new_exc = TypeError("TEST")
72        try:
73            raise raised_exception
74        except ValueError as e:
75            tb = e.__traceback__
76            orig_sys_exc_info = sys.exc_info()
77            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
78            new_sys_exc_info = sys.exc_info()
79            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
80            reset_sys_exc_info = sys.exc_info()
81
82            self.assertEqual(orig_exc_info[1], e)
83
84            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
85            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
86            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
87            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
88            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
89        else:
90            self.assertTrue(False)
91
92    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
93    def test_seq_bytes_to_charp_array(self):
94        # Issue #15732: crash in _PySequence_BytesToCharpArray()
95        class Z(object):
96            def __len__(self):
97                return 1
98        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
99                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
100        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
101        class Z(object):
102            def __len__(self):
103                return sys.maxsize
104            def __getitem__(self, i):
105                return b'x'
106        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
107                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
108
109    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
110    def test_subprocess_fork_exec(self):
111        class Z(object):
112            def __len__(self):
113                return 1
114
115        # Issue #15738: crash in subprocess_fork_exec()
116        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
117                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17)
118
119    @unittest.skipIf(MISSING_C_DOCSTRINGS,
120                     "Signature information for builtins requires docstrings")
121    def test_docstring_signature_parsing(self):
122
123        self.assertEqual(_testcapi.no_docstring.__doc__, None)
124        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
125
126        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
127        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
128
129        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
130            "This docstring has no signature.")
131        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
132
133        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
134            "docstring_with_invalid_signature($module, /, boo)\n"
135            "\n"
136            "This docstring has an invalid signature."
137            )
138        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
139
140        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
141            "docstring_with_invalid_signature2($module, /, boo)\n"
142            "\n"
143            "--\n"
144            "\n"
145            "This docstring also has an invalid signature."
146            )
147        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
148
149        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
150            "This docstring has a valid signature.")
151        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
152
153        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
154        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
155            "($module, /, sig)")
156
157        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
158            "\nThis docstring has a valid signature and some extra newlines.")
159        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
160            "($module, /, parameter)")
161
162    def test_c_type_with_matrix_multiplication(self):
163        M = _testcapi.matmulType
164        m1 = M()
165        m2 = M()
166        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
167        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
168        self.assertEqual(42 @ m1, ("matmul", 42, m1))
169        o = m1
170        o @= m2
171        self.assertEqual(o, ("imatmul", m1, m2))
172        o = m1
173        o @= 42
174        self.assertEqual(o, ("imatmul", m1, 42))
175        o = 42
176        o @= m1
177        self.assertEqual(o, ("matmul", 42, m1))
178
179    def test_c_type_with_ipow(self):
180        # When the __ipow__ method of a type was implemented in C, using the
181        # modulo param would cause segfaults.
182        o = _testcapi.ipowType()
183        self.assertEqual(o.__ipow__(1), (1, None))
184        self.assertEqual(o.__ipow__(2, 2), (2, 2))
185
186    def test_return_null_without_error(self):
187        # Issue #23571: A function must not return NULL without setting an
188        # error
189        if Py_DEBUG:
190            code = textwrap.dedent("""
191                import _testcapi
192                from test import support
193
194                with support.SuppressCrashReport():
195                    _testcapi.return_null_without_error()
196            """)
197            rc, out, err = assert_python_failure('-c', code)
198            self.assertRegex(err.replace(b'\r', b''),
199                             br'Fatal Python error: a function returned NULL '
200                                br'without setting an error\n'
201                             br'Python runtime state: initialized\n'
202                             br'SystemError: <built-in function '
203                                 br'return_null_without_error> returned NULL '
204                                 br'without setting an error\n'
205                             br'\n'
206                             br'Current thread.*:\n'
207                             br'  File .*", line 6 in <module>')
208        else:
209            with self.assertRaises(SystemError) as cm:
210                _testcapi.return_null_without_error()
211            self.assertRegex(str(cm.exception),
212                             'return_null_without_error.* '
213                             'returned NULL without setting an error')
214
215    def test_return_result_with_error(self):
216        # Issue #23571: A function must not return a result with an error set
217        if Py_DEBUG:
218            code = textwrap.dedent("""
219                import _testcapi
220                from test import support
221
222                with support.SuppressCrashReport():
223                    _testcapi.return_result_with_error()
224            """)
225            rc, out, err = assert_python_failure('-c', code)
226            self.assertRegex(err.replace(b'\r', b''),
227                             br'Fatal Python error: a function returned a '
228                                br'result with an error set\n'
229                             br'Python runtime state: initialized\n'
230                             br'ValueError\n'
231                             br'\n'
232                             br'The above exception was the direct cause '
233                                br'of the following exception:\n'
234                             br'\n'
235                             br'SystemError: <built-in '
236                                br'function return_result_with_error> '
237                                br'returned a result with an error set\n'
238                             br'\n'
239                             br'Current thread.*:\n'
240                             br'  File .*, line 6 in <module>')
241        else:
242            with self.assertRaises(SystemError) as cm:
243                _testcapi.return_result_with_error()
244            self.assertRegex(str(cm.exception),
245                             'return_result_with_error.* '
246                             'returned a result with an error set')
247
    def test_buildvalue_N(self):
        # Delegates to the check exported as _testcapi.test_buildvalue_N();
        # there are no Python-level assertions, so this test fails only if
        # that call raises.
        _testcapi.test_buildvalue_N()
250
251    def test_set_nomemory(self):
252        code = """if 1:
253            import _testcapi
254
255            class C(): pass
256
257            # The first loop tests both functions and that remove_mem_hooks()
258            # can be called twice in a row. The second loop checks a call to
259            # set_nomemory() after a call to remove_mem_hooks(). The third
260            # loop checks the start and stop arguments of set_nomemory().
261            for outer_cnt in range(1, 4):
262                start = 10 * outer_cnt
263                for j in range(100):
264                    if j == 0:
265                        if outer_cnt != 3:
266                            _testcapi.set_nomemory(start)
267                        else:
268                            _testcapi.set_nomemory(start, start + 1)
269                    try:
270                        C()
271                    except MemoryError as e:
272                        if outer_cnt != 3:
273                            _testcapi.remove_mem_hooks()
274                        print('MemoryError', outer_cnt, j)
275                        _testcapi.remove_mem_hooks()
276                        break
277        """
278        rc, out, err = assert_python_ok('-c', code)
279        self.assertIn(b'MemoryError 1 10', out)
280        self.assertIn(b'MemoryError 2 20', out)
281        self.assertIn(b'MemoryError 3 30', out)
282
283    def test_mapping_keys_values_items(self):
284        class Mapping1(dict):
285            def keys(self):
286                return list(super().keys())
287            def values(self):
288                return list(super().values())
289            def items(self):
290                return list(super().items())
291        class Mapping2(dict):
292            def keys(self):
293                return tuple(super().keys())
294            def values(self):
295                return tuple(super().values())
296            def items(self):
297                return tuple(super().items())
298        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
299
300        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
301                        dict_obj, OrderedDict(dict_obj),
302                        Mapping1(dict_obj), Mapping2(dict_obj)]:
303            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
304                                 list(mapping.keys()))
305            self.assertListEqual(_testcapi.get_mapping_values(mapping),
306                                 list(mapping.values()))
307            self.assertListEqual(_testcapi.get_mapping_items(mapping),
308                                 list(mapping.items()))
309
310    def test_mapping_keys_values_items_bad_arg(self):
311        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
312        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
313        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
314
315        class BadMapping:
316            def keys(self):
317                return None
318            def values(self):
319                return None
320            def items(self):
321                return None
322        bad_mapping = BadMapping()
323        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
324        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
325        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
326
327    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
328                         'need _testcapi.negative_refcount')
329    def test_negative_refcount(self):
330        # bpo-35059: Check that Py_DECREF() reports the correct filename
331        # when calling _Py_NegativeRefcount() to abort Python.
332        code = textwrap.dedent("""
333            import _testcapi
334            from test import support
335
336            with support.SuppressCrashReport():
337                _testcapi.negative_refcount()
338        """)
339        rc, out, err = assert_python_failure('-c', code)
340        self.assertRegex(err,
341                         br'_testcapimodule\.c:[0-9]+: '
342                         br'_Py_NegativeRefcount: Assertion failed: '
343                         br'object has negative ref count')
344
    def test_trashcan_subclass(self):
        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
        # activated when its tp_dealloc is being called by a subclass
        from _testcapi import MyList
        # Build a chain of 1000 MyList objects, each constructed from a
        # 1-tuple holding the previous one.  There are no assertions: the
        # test passes as long as building and releasing the chain does not
        # crash.
        L = None
        for i in range(1000):
            L = MyList((L,))
352
    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        # Run the shared trashcan check with the builtin list as base class.
        self.do_test_trashcan_python_class(list)
356
    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        # Run the shared trashcan check with _testcapi.MyList as base class.
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)
361
    def do_test_trashcan_python_class(self, base):
        # Check that the trashcan mechanism works properly for a Python
        # subclass of a class using the trashcan (this specific test assumes
        # that the base class "base" behaves like list)
        class PyList(base):
            # Count the number of PyList instances to verify that there is
            # no memory leak
            num = 0
            # __class__ inside these methods is the implicit reference to
            # the class being defined, i.e. PyList itself.
            def __init__(self, *args):
                __class__.num += 1
                super().__init__(*args)
            def __del__(self):
                __class__.num -= 1

        # Two passes: the chain is released directly (parity 0) or while
        # wrapped in one extra tuple (parity 1).
        for parity in (0, 1):
            L = None
            # We need in the order of 2**20 iterations here such that a
            # typical 8MB stack would overflow without the trashcan.
            for i in range(2**20):
                L = PyList((L,))
                L.attr = i
            if parity:
                # Add one additional nesting layer
                L = (L,)
            self.assertGreater(PyList.num, 0)
            # Dropping the only outside reference must release the whole
            # chain, bringing the instance counter back to zero.
            del L
            self.assertEqual(PyList.num, 0)
389
    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        # NOTE: the sys.getrefcount() values compared here depend on exactly
        # which references exist at each point, so the statement order below
        # is significant.
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        # Baseline taken while the instance is still alive.
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
406
    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        # NOTE: the sys.getrefcount() values compared here depend on exactly
        # which references exist at each point, so the statement order below
        # is significant.
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                # Re-class the dying instance from B to A and record both
                # type refcounts as seen from inside the finalizer.
                self.__class__ = A
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        # Baselines taken while the instance is still alive.
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Dropping the last reference runs B.__del__ above.
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))
441
    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        # Same check as the Python-subclass variant, but using the
        # _testcapi.HeapCTypeSubclass type.  The refcount comparison depends
        # on the exact statement order.
        subclass_instance = _testcapi.HeapCTypeSubclass()
        # Baseline taken while the instance is still alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
453
    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        # Counterpart of the Python-level __del__ test, using the
        # _testcapi.HeapCTypeSubclassWithFinalizer type.  The refcount
        # comparisons depend on the exact statement order.
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        # Baselines taken while the instance is still alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        # (the refcnt_in_del attributes are read from the two types after
        # the instance has been released)
        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
475
476    def test_heaptype_with_setattro(self):
477        obj = _testcapi.HeapCTypeSetattr()
478        self.assertEqual(obj.pvalue, 10)
479        obj.value = 12
480        self.assertEqual(obj.pvalue, 12)
481        del obj.value
482        self.assertEqual(obj.pvalue, 0)
483
484    def test_pynumber_tobase(self):
485        from _testcapi import pynumber_tobase
486        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
487        self.assertEqual(pynumber_tobase(123, 8), '0o173')
488        self.assertEqual(pynumber_tobase(123, 10), '123')
489        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
490        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
491        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
492        self.assertEqual(pynumber_tobase(-123, 10), '-123')
493        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
494        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
495        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
496        self.assertRaises(SystemError, pynumber_tobase, 123, 0)
497
498
class TestPendingCalls(unittest.TestCase):
    # Tests for pending calls submitted through
    # _testcapi._pending_threadfunc().

    def pendingcalls_submit(self, l, n):
        # Submit n callbacks; each one appends None to l when it runs.
        def callback():
            # This function can be interrupted by thread switching, so it
            # sticks to a single atomic operation.
            l.append(None)

        for _ in range(n):
            time.sleep(random.random()*0.02) #0.01 secs on average
            # Keep retrying until the submission is accepted; rely on the
            # regular interrupt to flush the queue if we are unsuccessful.
            while not _testcapi._pending_threadfunc(callback):
                pass

    def pendingcalls_wait(self, l, n, context = None):
        # Spin until l holds exactly n entries.  When a context is given,
        # iterations are only counted against the limit once its event is
        # set.
        attempts = 0
        while len(l) != n:
            # This busy loop is where we expect to be interrupted to run our
            # callbacks.  Note that callbacks are only run on the main
            # thread.
            for i in range(1000):
                a = i*i
            if context and not context.event.is_set():
                continue
            attempts += 1
            self.assertTrue(attempts < 10000,
                "timeout waiting for %i callbacks, got %i"%(n, len(l)))

    def test_pendingcalls_threaded(self):
        # Submit the callbacks from several threads, two per thread.
        n = 32 #total callbacks

        class SharedState(object):
            pass

        context = SharedState()
        context.l = []
        context.n = 2 #submits per thread
        context.nThreads = n // context.n
        context.nFinished = 0
        context.lock = threading.Lock()
        context.event = threading.Event()

        workers = []
        for _ in range(context.nThreads):
            workers.append(threading.Thread(target=self.pendingcalls_thread,
                                            args=(context,)))
        with support.start_threads(workers):
            self.pendingcalls_wait(context.l, n, context)

    def pendingcalls_thread(self, context):
        # Thread body: submit this thread's share of callbacks, then record
        # completion; the thread that finishes last sets the event.
        try:
            self.pendingcalls_submit(context.l, context.n)
        finally:
            with context.lock:
                context.nFinished += 1
                finished_so_far = context.nFinished
            if finished_so_far == context.nThreads:
                context.event.set()

    def test_pendingcalls_non_threaded(self):
        # Again, just using the main thread; likely they will all be
        # dispatched at once.  It is ok to ask for too many, because we loop
        # until we find a slot, and the loop can be interrupted to dispatch.
        # There are only 32 dispatch slots, so we go for twice that!
        received = []
        total = 64
        self.pendingcalls_submit(received, total)
        self.pendingcalls_wait(received, total)
576
577
class SubinterpreterTest(unittest.TestCase):
    # Tests that run code through support.run_in_subinterp().  Results come
    # back to the main interpreter as pickles written to an os.pipe().

    def test_subinterps(self):
        # The subinterpreter must see its own sys.modules and builtins
        # objects, i.e. their ids differ from the main interpreter's.
        import builtins
        read_fd, write_fd = os.pipe()
        script = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """.format(write_fd)
        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(script)
            self.assertEqual(status, 0)
            # Values arrive in the order they were dumped.
            sub_modules_id = pickle.load(reader)
            self.assertNotEqual(sub_modules_id, id(sys.modules))
            sub_builtins_id = pickle.load(reader)
            self.assertNotEqual(sub_builtins_id, id(builtins))

    def test_subinterps_recent_language_features(self):
        # Code using :=, f-strings and async def must run in a
        # subinterpreter and produce the expected values.
        read_fd, write_fd = os.pipe()
        script = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """.format(write_fd)

        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(script)
            self.assertEqual(status, 0)
            payload = pickle.load(reader)
            self.assertEqual(payload, {'a': '123x', 'b': '123'})

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test checks whether or not
        a change in one interpreter's module gets reflected into the
        other ones.
        """
        import binascii

        sub_code = "import binascii; binascii.Error.foobar = 'foobar'"
        support.run_in_subinterp(sub_code)

        # The attribute set inside the subinterpreter must not be visible on
        # this interpreter's binascii.Error.
        self.assertFalse(hasattr(binascii.Error, "foobar"))
627
628
class TestThreadState(unittest.TestCase):

    @support.reap_threads
    def test_thread_state(self):
        # some extra thread-state tests driven via _testcapi
        def target():
            # Thread identifiers recorded by callback(), one per invocation.
            idents = []

            def callback():
                idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # Bind two more names to callback.
            # NOTE(review): the purpose is not evident from this file;
            # presumably it keeps extra references alive -- confirm.
            a = b = callback
            # NOTE(review): presumably gives outstanding callback
            # invocations time to complete -- confirm.
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            self.assertEqual(idents.count(threading.get_ident()), 3,
                             "Couldn't find main thread correctly in the list")

        # Run the scenario once on the current thread and once on a freshly
        # started thread.
        target()
        t = threading.Thread(target=target)
        t.start()
        t.join()
651
652
class Test_testcapi(unittest.TestCase):
    # Copy every _testcapi attribute whose name starts with 'test_' (and
    # does not end with '_code') into the class namespace, so that each one
    # becomes a test method of this class.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))
657
658
class PyMemDebugTests(unittest.TestCase):
    # Checks of the memory debug hooks.  Each test runs a snippet in a child
    # interpreter with PYTHONMALLOC set to the class attribute below;
    # subclasses override it to cover other allocator settings.
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run code in a child interpreter that is expected to fail and
        # return its stderr as text.
        with support.SuppressCrashReport():
            result = assert_python_failure('-c', code,
                                           PYTHONMALLOC=self.PYTHONMALLOC)
        return result.err.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        # Writing past the end of a block must be reported as a bad trailing
        # pad byte, together with a dump of the block.
        report = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()')
        template = (r"Debug memory block at address p={ptr}: API 'm'\n"
                    r"    16 bytes originally requested\n"
                    r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                    r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                    r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                    r"        at tail\+1: 0xfd\n"
                    r"        at tail\+2: 0xfd\n"
                    r"        .*\n"
                    r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                    r"    Data at p: cd cd cd .*\n"
                    r"\n"
                    r"Enable tracemalloc to get the memory block allocation traceback\n"
                    r"\n"
                    r"Fatal Python error: bad trailing pad byte")
        # DOTALL lets the ".*" placeholders span several report lines.
        pattern = re.compile(template.format(ptr=self.PTR_REGEX),
                             flags=re.DOTALL)
        self.assertRegex(report, pattern)

    def test_api_misuse(self):
        # A block allocated with API 'm' but verified with API 'r' must be
        # reported as a bad ID.
        report = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        template = (r"Debug memory block at address p={ptr}: API 'm'\n"
                    r"    16 bytes originally requested\n"
                    r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                    r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                    r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                    r"    Data at p: cd cd cd .*\n"
                    r"\n"
                    r"Enable tracemalloc to get the memory block allocation traceback\n"
                    r"\n"
                    r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n")
        self.assertRegex(report, template.format(ptr=self.PTR_REGEX))

    def check_malloc_without_gil(self, code):
        # Shared assertion: the child's stderr must contain the "called
        # without holding the GIL" fatal error.
        report = self.check(code)
        message = ('Fatal Python error: Python memory allocator called '
                   'without holding the GIL')
        self.assertIn(message, report)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pymem_malloc_without_gil()')

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pyobject_malloc_without_gil()')

    def check_pyobject_is_freed(self, func_name):
        # Call _testcapi.<func_name>() in a child interpreter.  The child
        # exits with status 0 when the call returns and 1 when it raises
        # _testcapi.error; assert_python_ok() then checks that status.
        script = textwrap.dedent(f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        ''')
        assert_python_ok('-c', script, PYTHONMALLOC=self.PYTHONMALLOC)

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed('check_pyobject_freed_is_freed')
750
751
class PyMemMallocDebugTests(PyMemDebugTests):
    # Re-runs the inherited tests with PYTHONMALLOC='malloc_debug' passed to
    # the child interpreter.
    PYTHONMALLOC = 'malloc_debug'
754
755
@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    # Re-runs the inherited tests with PYTHONMALLOC='pymalloc_debug' passed
    # to the child interpreter; skipped unless support.with_pymalloc() is
    # true.
    PYTHONMALLOC = 'pymalloc_debug'
759
760
@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    # (an empty PYTHONMALLOC value is passed to the child interpreter)
    PYTHONMALLOC = ''
765
766
# Allow running this test file directly as a script.
if __name__ == "__main__":
    unittest.main()
769