# Run the _testcapi module tests (tests for the Python/C API): by definition,
# these are all functions _testcapi exports whose name begins with 'test_'.
3
4from collections import OrderedDict
5import importlib.machinery
6import importlib.util
7import os
8import pickle
9import random
10import re
11import subprocess
12import sys
13import textwrap
14import threading
15import time
16import unittest
17import weakref
18from test import support
19from test.support import MISSING_C_DOCSTRINGS
20from test.support import import_helper
21from test.support import threading_helper
22from test.support import warnings_helper
23from test.support.script_helper import assert_python_failure, assert_python_ok
24try:
25    import _posixsubprocess
26except ImportError:
27    _posixsubprocess = None
28
29# Skip this test if the _testcapi module isn't available.
30_testcapi = import_helper.import_module('_testcapi')
31
32import _testinternalcapi
33
# Were we compiled --with-pydebug or with #define Py_DEBUG?
# The presence of sys.gettotalrefcount is used as the probe; several tests
# below branch on this flag to pick the expected failure mode.
Py_DEBUG = hasattr(sys, 'gettotalrefcount')
36
37
def decode_stderr(err):
    """Return the bytes *err* decoded as UTF-8 with carriage returns removed.

    Undecodable bytes are substituted via the ``'replace'`` error handler
    instead of raising, and every ``'\\r'`` is dropped so that expected
    output can be written with plain ``'\\n'`` line endings.
    """
    text = err.decode('utf-8', errors='replace')
    return text.replace('\r', '')
40
41
# Plain Python function that InstanceMethod wraps with
# _testcapi.instancemethod.  CAPITest.test_instancemethod compares its
# docstring and attributes against what the wrapper exposes, and relies on
# it returning its first argument unchanged.
def testfunction(self):
    """some doc"""
    return self
45
46
class InstanceMethod:
    # Fixture for CAPITest.test_instancemethod: both attributes are callables
    # wrapped with _testcapi.instancemethod and are invoked through an
    # instance (inst.id(), inst.testfunction()).
    # NOTE: the right-hand ``id`` is the builtin; the assignment only shadows
    # it inside this class namespace.
    id = _testcapi.instancemethod(id)
    testfunction = _testcapi.instancemethod(testfunction)
50
51class CAPITest(unittest.TestCase):
52
53    def test_instancemethod(self):
54        inst = InstanceMethod()
55        self.assertEqual(id(inst), inst.id())
56        self.assertTrue(inst.testfunction() is inst)
57        self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__)
58        self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__)
59
60        InstanceMethod.testfunction.attribute = "test"
61        self.assertEqual(testfunction.attribute, "test")
62        self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test")
63
64    def test_no_FatalError_infinite_loop(self):
65        with support.SuppressCrashReport():
66            p = subprocess.Popen([sys.executable, "-c",
67                                  'import _testcapi;'
68                                  '_testcapi.crash_no_current_thread()'],
69                                 stdout=subprocess.PIPE,
70                                 stderr=subprocess.PIPE)
71        (out, err) = p.communicate()
72        self.assertEqual(out, b'')
73        # This used to cause an infinite loop.
74        self.assertTrue(err.rstrip().startswith(
75                         b'Fatal Python error: '
76                         b'PyThreadState_Get: '
77                         b'the function must be called with the GIL held, '
78                         b'but the GIL is released '
79                         b'(the current Python thread state is NULL)'),
80                        err)
81
82    def test_memoryview_from_NULL_pointer(self):
83        self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer)
84
85    def test_exc_info(self):
86        raised_exception = ValueError("5")
87        new_exc = TypeError("TEST")
88        try:
89            raise raised_exception
90        except ValueError as e:
91            tb = e.__traceback__
92            orig_sys_exc_info = sys.exc_info()
93            orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None)
94            new_sys_exc_info = sys.exc_info()
95            new_exc_info = _testcapi.set_exc_info(*orig_exc_info)
96            reset_sys_exc_info = sys.exc_info()
97
98            self.assertEqual(orig_exc_info[1], e)
99
100            self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb))
101            self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info)
102            self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info)
103            self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None))
104            self.assertSequenceEqual(new_sys_exc_info, new_exc_info)
105        else:
106            self.assertTrue(False)
107
108    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
109    def test_seq_bytes_to_charp_array(self):
110        # Issue #15732: crash in _PySequence_BytesToCharpArray()
111        class Z(object):
112            def __len__(self):
113                return 1
114        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
115                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
116        # Issue #15736: overflow in _PySequence_BytesToCharpArray()
117        class Z(object):
118            def __len__(self):
119                return sys.maxsize
120            def __getitem__(self, i):
121                return b'x'
122        self.assertRaises(MemoryError, _posixsubprocess.fork_exec,
123                          1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
124
125    @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.')
126    def test_subprocess_fork_exec(self):
127        class Z(object):
128            def __len__(self):
129                return 1
130
131        # Issue #15738: crash in subprocess_fork_exec()
132        self.assertRaises(TypeError, _posixsubprocess.fork_exec,
133                          Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21)
134
135    @unittest.skipIf(MISSING_C_DOCSTRINGS,
136                     "Signature information for builtins requires docstrings")
137    def test_docstring_signature_parsing(self):
138
139        self.assertEqual(_testcapi.no_docstring.__doc__, None)
140        self.assertEqual(_testcapi.no_docstring.__text_signature__, None)
141
142        self.assertEqual(_testcapi.docstring_empty.__doc__, None)
143        self.assertEqual(_testcapi.docstring_empty.__text_signature__, None)
144
145        self.assertEqual(_testcapi.docstring_no_signature.__doc__,
146            "This docstring has no signature.")
147        self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None)
148
149        self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__,
150            "docstring_with_invalid_signature($module, /, boo)\n"
151            "\n"
152            "This docstring has an invalid signature."
153            )
154        self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None)
155
156        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__,
157            "docstring_with_invalid_signature2($module, /, boo)\n"
158            "\n"
159            "--\n"
160            "\n"
161            "This docstring also has an invalid signature."
162            )
163        self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None)
164
165        self.assertEqual(_testcapi.docstring_with_signature.__doc__,
166            "This docstring has a valid signature.")
167        self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)")
168
169        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None)
170        self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__,
171            "($module, /, sig)")
172
173        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__,
174            "\nThis docstring has a valid signature and some extra newlines.")
175        self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__,
176            "($module, /, parameter)")
177
178    def test_c_type_with_matrix_multiplication(self):
179        M = _testcapi.matmulType
180        m1 = M()
181        m2 = M()
182        self.assertEqual(m1 @ m2, ("matmul", m1, m2))
183        self.assertEqual(m1 @ 42, ("matmul", m1, 42))
184        self.assertEqual(42 @ m1, ("matmul", 42, m1))
185        o = m1
186        o @= m2
187        self.assertEqual(o, ("imatmul", m1, m2))
188        o = m1
189        o @= 42
190        self.assertEqual(o, ("imatmul", m1, 42))
191        o = 42
192        o @= m1
193        self.assertEqual(o, ("matmul", 42, m1))
194
195    def test_c_type_with_ipow(self):
196        # When the __ipow__ method of a type was implemented in C, using the
197        # modulo param would cause segfaults.
198        o = _testcapi.ipowType()
199        self.assertEqual(o.__ipow__(1), (1, None))
200        self.assertEqual(o.__ipow__(2, 2), (2, 2))
201
202    def test_return_null_without_error(self):
203        # Issue #23571: A function must not return NULL without setting an
204        # error
205        if Py_DEBUG:
206            code = textwrap.dedent("""
207                import _testcapi
208                from test import support
209
210                with support.SuppressCrashReport():
211                    _testcapi.return_null_without_error()
212            """)
213            rc, out, err = assert_python_failure('-c', code)
214            err = decode_stderr(err)
215            self.assertRegex(err,
216                r'Fatal Python error: _Py_CheckFunctionResult: '
217                    r'a function returned NULL without setting an exception\n'
218                r'Python runtime state: initialized\n'
219                r'SystemError: <built-in function return_null_without_error> '
220                    r'returned NULL without setting an exception\n'
221                r'\n'
222                r'Current thread.*:\n'
223                r'  File .*", line 6 in <module>\n')
224        else:
225            with self.assertRaises(SystemError) as cm:
226                _testcapi.return_null_without_error()
227            self.assertRegex(str(cm.exception),
228                             'return_null_without_error.* '
229                             'returned NULL without setting an exception')
230
231    def test_return_result_with_error(self):
232        # Issue #23571: A function must not return a result with an error set
233        if Py_DEBUG:
234            code = textwrap.dedent("""
235                import _testcapi
236                from test import support
237
238                with support.SuppressCrashReport():
239                    _testcapi.return_result_with_error()
240            """)
241            rc, out, err = assert_python_failure('-c', code)
242            err = decode_stderr(err)
243            self.assertRegex(err,
244                    r'Fatal Python error: _Py_CheckFunctionResult: '
245                        r'a function returned a result with an exception set\n'
246                    r'Python runtime state: initialized\n'
247                    r'ValueError\n'
248                    r'\n'
249                    r'The above exception was the direct cause '
250                        r'of the following exception:\n'
251                    r'\n'
252                    r'SystemError: <built-in '
253                        r'function return_result_with_error> '
254                        r'returned a result with an exception set\n'
255                    r'\n'
256                    r'Current thread.*:\n'
257                    r'  File .*, line 6 in <module>\n')
258        else:
259            with self.assertRaises(SystemError) as cm:
260                _testcapi.return_result_with_error()
261            self.assertRegex(str(cm.exception),
262                             'return_result_with_error.* '
263                             'returned a result with an exception set')
264
265    def test_getitem_with_error(self):
266        # Test _Py_CheckSlotResult(). Raise an exception and then calls
267        # PyObject_GetItem(): check that the assertion catches the bug.
268        # PyObject_GetItem() must not be called with an exception set.
269        code = textwrap.dedent("""
270            import _testcapi
271            from test import support
272
273            with support.SuppressCrashReport():
274                _testcapi.getitem_with_error({1: 2}, 1)
275        """)
276        rc, out, err = assert_python_failure('-c', code)
277        err = decode_stderr(err)
278        if 'SystemError: ' not in err:
279            self.assertRegex(err,
280                    r'Fatal Python error: _Py_CheckSlotResult: '
281                        r'Slot __getitem__ of type dict succeeded '
282                        r'with an exception set\n'
283                    r'Python runtime state: initialized\n'
284                    r'ValueError: bug\n'
285                    r'\n'
286                    r'Current thread .* \(most recent call first\):\n'
287                    r'  File .*, line 6 in <module>\n'
288                    r'\n'
289                    r'Extension modules: _testcapi \(total: 1\)\n')
290        else:
291            # Python built with NDEBUG macro defined:
292            # test _Py_CheckFunctionResult() instead.
293            self.assertIn('returned a result with an exception set', err)
294
    def test_buildvalue_N(self):
        # Thin wrapper: the checks themselves live in
        # _testcapi.test_buildvalue_N(); any exception it raises fails this
        # test.
        _testcapi.test_buildvalue_N()
297
298    def test_set_nomemory(self):
299        code = """if 1:
300            import _testcapi
301
302            class C(): pass
303
304            # The first loop tests both functions and that remove_mem_hooks()
305            # can be called twice in a row. The second loop checks a call to
306            # set_nomemory() after a call to remove_mem_hooks(). The third
307            # loop checks the start and stop arguments of set_nomemory().
308            for outer_cnt in range(1, 4):
309                start = 10 * outer_cnt
310                for j in range(100):
311                    if j == 0:
312                        if outer_cnt != 3:
313                            _testcapi.set_nomemory(start)
314                        else:
315                            _testcapi.set_nomemory(start, start + 1)
316                    try:
317                        C()
318                    except MemoryError as e:
319                        if outer_cnt != 3:
320                            _testcapi.remove_mem_hooks()
321                        print('MemoryError', outer_cnt, j)
322                        _testcapi.remove_mem_hooks()
323                        break
324        """
325        rc, out, err = assert_python_ok('-c', code)
326        lines = out.splitlines()
327        for i, line in enumerate(lines, 1):
328            self.assertIn(b'MemoryError', out)
329            *_, count = line.split(b' ')
330            count = int(count)
331            self.assertLessEqual(count, i*5)
332            self.assertGreaterEqual(count, i*5-1)
333
334    def test_mapping_keys_values_items(self):
335        class Mapping1(dict):
336            def keys(self):
337                return list(super().keys())
338            def values(self):
339                return list(super().values())
340            def items(self):
341                return list(super().items())
342        class Mapping2(dict):
343            def keys(self):
344                return tuple(super().keys())
345            def values(self):
346                return tuple(super().values())
347            def items(self):
348                return tuple(super().items())
349        dict_obj = {'foo': 1, 'bar': 2, 'spam': 3}
350
351        for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(),
352                        dict_obj, OrderedDict(dict_obj),
353                        Mapping1(dict_obj), Mapping2(dict_obj)]:
354            self.assertListEqual(_testcapi.get_mapping_keys(mapping),
355                                 list(mapping.keys()))
356            self.assertListEqual(_testcapi.get_mapping_values(mapping),
357                                 list(mapping.values()))
358            self.assertListEqual(_testcapi.get_mapping_items(mapping),
359                                 list(mapping.items()))
360
361    def test_mapping_keys_values_items_bad_arg(self):
362        self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None)
363        self.assertRaises(AttributeError, _testcapi.get_mapping_values, None)
364        self.assertRaises(AttributeError, _testcapi.get_mapping_items, None)
365
366        class BadMapping:
367            def keys(self):
368                return None
369            def values(self):
370                return None
371            def items(self):
372                return None
373        bad_mapping = BadMapping()
374        self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping)
375        self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping)
376        self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping)
377
378    @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'),
379                         'need _testcapi.negative_refcount')
380    def test_negative_refcount(self):
381        # bpo-35059: Check that Py_DECREF() reports the correct filename
382        # when calling _Py_NegativeRefcount() to abort Python.
383        code = textwrap.dedent("""
384            import _testcapi
385            from test import support
386
387            with support.SuppressCrashReport():
388                _testcapi.negative_refcount()
389        """)
390        rc, out, err = assert_python_failure('-c', code)
391        self.assertRegex(err,
392                         br'_testcapimodule\.c:[0-9]+: '
393                         br'_Py_NegativeRefcount: Assertion failed: '
394                         br'object has negative ref count')
395
396    def test_trashcan_subclass(self):
397        # bpo-35983: Check that the trashcan mechanism for "list" is NOT
398        # activated when its tp_dealloc is being called by a subclass
399        from _testcapi import MyList
400        L = None
401        for i in range(1000):
402            L = MyList((L,))
403
    @support.requires_resource('cpu')
    def test_trashcan_python_class1(self):
        # Run the shared trashcan check with the builtin list as base class.
        self.do_test_trashcan_python_class(list)
407
    @support.requires_resource('cpu')
    def test_trashcan_python_class2(self):
        # Run the shared trashcan check with _testcapi.MyList as base class.
        from _testcapi import MyList
        self.do_test_trashcan_python_class(MyList)
412
413    def do_test_trashcan_python_class(self, base):
414        # Check that the trashcan mechanism works properly for a Python
415        # subclass of a class using the trashcan (this specific test assumes
416        # that the base class "base" behaves like list)
417        class PyList(base):
418            # Count the number of PyList instances to verify that there is
419            # no memory leak
420            num = 0
421            def __init__(self, *args):
422                __class__.num += 1
423                super().__init__(*args)
424            def __del__(self):
425                __class__.num -= 1
426
427        for parity in (0, 1):
428            L = None
429            # We need in the order of 2**20 iterations here such that a
430            # typical 8MB stack would overflow without the trashcan.
431            for i in range(2**20):
432                L = PyList((L,))
433                L.attr = i
434            if parity:
435                # Add one additional nesting layer
436                L = (L,)
437            self.assertGreater(PyList.num, 0)
438            del L
439            self.assertEqual(PyList.num, 0)
440
441    def test_heap_ctype_doc_and_text_signature(self):
442        self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc")
443        self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)")
444
    def test_null_type_doc(self):
        # NullTpDocType is expected to report no documentation at all.
        # NOTE(review): going by the name, the type is created with a NULL
        # tp_doc -- confirm against _testcapimodule.c.
        self.assertEqual(_testcapi.NullTpDocType.__doc__, None)
447
    def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self):
        # Python subclass of the C heap type: __init__ sets value2 itself and
        # then delegates to the base initializer.
        class HeapGcCTypeSubclass(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        subclass_instance = HeapGcCTypeSubclass()
        # Baseline refcount of the type, taken while the instance is alive.
        type_refcnt = sys.getrefcount(HeapGcCTypeSubclass)

        # Test that subclass instance was fully created
        # (value is presumably set by the base type's initializer)
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass))
464
    def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        # A is a Python subclass of the C heap type; B derives from A and
        # switches the instance's class back to A from inside __del__.
        class A(_testcapi.HeapGcCType):
            def __init__(self):
                self.value2 = 20
                super().__init__()

        class B(A):
            def __init__(self):
                super().__init__()

            def __del__(self):
                # Reassign the class during finalization and record both
                # types' refcounts at that moment for the assertions below.
                self.__class__ = A
                A.refcnt_in_del = sys.getrefcount(A)
                B.refcnt_in_del = sys.getrefcount(B)

        subclass_instance = B()
        # Baseline refcounts of both types while the instance is alive.
        type_refcnt = sys.getrefcount(B)
        new_type_refcnt = sys.getrefcount(A)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Dropping the last reference runs B.__del__.
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        self.assertEqual(type_refcnt - 1, B.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(B))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(A))
499
500    def test_heaptype_with_dict(self):
501        inst = _testcapi.HeapCTypeWithDict()
502        inst.foo = 42
503        self.assertEqual(inst.foo, 42)
504        self.assertEqual(inst.dictobj, inst.__dict__)
505        self.assertEqual(inst.dictobj, {"foo": 42})
506
507        inst = _testcapi.HeapCTypeWithDict()
508        self.assertEqual({}, inst.__dict__)
509
510    def test_heaptype_with_negative_dict(self):
511        inst = _testcapi.HeapCTypeWithNegativeDict()
512        inst.foo = 42
513        self.assertEqual(inst.foo, 42)
514        self.assertEqual(inst.dictobj, inst.__dict__)
515        self.assertEqual(inst.dictobj, {"foo": 42})
516
517        inst = _testcapi.HeapCTypeWithNegativeDict()
518        self.assertEqual({}, inst.__dict__)
519
520    def test_heaptype_with_weakref(self):
521        inst = _testcapi.HeapCTypeWithWeakref()
522        ref = weakref.ref(inst)
523        self.assertEqual(ref(), inst)
524        self.assertEqual(inst.weakreflist, ref)
525
526    def test_heaptype_with_buffer(self):
527        inst = _testcapi.HeapCTypeWithBuffer()
528        b = bytes(inst)
529        self.assertEqual(b, b"1234")
530
    def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self):
        # Same check as the Python-subclass variant, but the subclass
        # (_testcapi.HeapCTypeSubclass) is provided by _testcapi.
        subclass_instance = _testcapi.HeapCTypeSubclass()
        # Baseline refcount of the type, taken while the instance is alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # Test that the type reference count is only decremented once
        del subclass_instance
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass))
542
    def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self):
        subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer()
        # Baseline refcounts of the instance's type and of the type the
        # finalizer switches it to, taken while the instance is alive.
        type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)
        new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass)

        # Test that subclass instance was fully created
        self.assertEqual(subclass_instance.value, 10)
        self.assertEqual(subclass_instance.value2, 20)

        # The tp_finalize slot will set __class__ to HeapCTypeSubclass
        del subclass_instance

        # Test that setting __class__ modified the reference counts of the types
        # (refcnt_in_del is read from the type objects; presumably recorded by
        # the finalizer -- confirm against _testcapimodule.c)
        self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del)
        self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del)

        # Test that the original type already has decreased its refcnt
        self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer))

        # Test that subtype_dealloc decref the newly assigned __class__ only once
        self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass))
564
565    def test_heaptype_with_setattro(self):
566        obj = _testcapi.HeapCTypeSetattr()
567        self.assertEqual(obj.pvalue, 10)
568        obj.value = 12
569        self.assertEqual(obj.pvalue, 12)
570        del obj.value
571        self.assertEqual(obj.pvalue, 0)
572
573    def test_pynumber_tobase(self):
574        from _testcapi import pynumber_tobase
575        self.assertEqual(pynumber_tobase(123, 2), '0b1111011')
576        self.assertEqual(pynumber_tobase(123, 8), '0o173')
577        self.assertEqual(pynumber_tobase(123, 10), '123')
578        self.assertEqual(pynumber_tobase(123, 16), '0x7b')
579        self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011')
580        self.assertEqual(pynumber_tobase(-123, 8), '-0o173')
581        self.assertEqual(pynumber_tobase(-123, 10), '-123')
582        self.assertEqual(pynumber_tobase(-123, 16), '-0x7b')
583        self.assertRaises(TypeError, pynumber_tobase, 123.0, 10)
584        self.assertRaises(TypeError, pynumber_tobase, '123', 10)
585        self.assertRaises(SystemError, pynumber_tobase, 123, 0)
586
587    def check_fatal_error(self, code, expected, not_expected=()):
588        with support.SuppressCrashReport():
589            rc, out, err = assert_python_failure('-sSI', '-c', code)
590
591        err = decode_stderr(err)
592        self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n',
593                      err)
594
595        match = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$',
596                          err, re.MULTILINE)
597        if not match:
598            self.fail(f"Cannot find 'Extension modules:' in {err!r}")
599        modules = set(match.group(1).strip().split(', '))
600        total = int(match.group(2))
601
602        for name in expected:
603            self.assertIn(name, modules)
604        for name in not_expected:
605            self.assertNotIn(name, modules)
606        self.assertEqual(len(modules), total)
607
608    def test_fatal_error(self):
609        # By default, stdlib extension modules are ignored,
610        # but not test modules.
611        expected = ('_testcapi',)
612        not_expected = ('sys',)
613        code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")'
614        self.check_fatal_error(code, expected, not_expected)
615
616        # Mark _testcapi as stdlib module, but not sys
617        expected = ('sys',)
618        not_expected = ('_testcapi',)
619        code = textwrap.dedent('''
620            import _testcapi, sys
621            sys.stdlib_module_names = frozenset({"_testcapi"})
622            _testcapi.fatal_error(b"MESSAGE")
623        ''')
624        self.check_fatal_error(code, expected)
625
626    def test_pyobject_repr_from_null(self):
627        s = _testcapi.pyobject_repr_from_null()
628        self.assertEqual(s, '<NULL>')
629
630    def test_pyobject_str_from_null(self):
631        s = _testcapi.pyobject_str_from_null()
632        self.assertEqual(s, '<NULL>')
633
634    def test_pyobject_bytes_from_null(self):
635        s = _testcapi.pyobject_bytes_from_null()
636        self.assertEqual(s, b'<NULL>')
637
638    def test_Py_CompileString(self):
639        # Check that Py_CompileString respects the coding cookie
640        _compile = _testcapi.Py_CompileString
641        code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n"
642        result = _compile(code)
643        expected = compile(code, "<string>", "exec")
644        self.assertEqual(result.co_consts, expected.co_consts)
645
646
647class TestPendingCalls(unittest.TestCase):
648
649    def pendingcalls_submit(self, l, n):
650        def callback():
651            #this function can be interrupted by thread switching so let's
652            #use an atomic operation
653            l.append(None)
654
655        for i in range(n):
656            time.sleep(random.random()*0.02) #0.01 secs on average
657            #try submitting callback until successful.
658            #rely on regular interrupt to flush queue if we are
659            #unsuccessful.
660            while True:
661                if _testcapi._pending_threadfunc(callback):
662                    break
663
664    def pendingcalls_wait(self, l, n, context = None):
665        #now, stick around until l[0] has grown to 10
666        count = 0
667        while len(l) != n:
668            #this busy loop is where we expect to be interrupted to
669            #run our callbacks.  Note that callbacks are only run on the
670            #main thread
671            if False and support.verbose:
672                print("(%i)"%(len(l),),)
673            for i in range(1000):
674                a = i*i
675            if context and not context.event.is_set():
676                continue
677            count += 1
678            self.assertTrue(count < 10000,
679                "timeout waiting for %i callbacks, got %i"%(n, len(l)))
680        if False and support.verbose:
681            print("(%i)"%(len(l),))
682
683    def test_pendingcalls_threaded(self):
684
685        #do every callback on a separate thread
686        n = 32 #total callbacks
687        threads = []
688        class foo(object):pass
689        context = foo()
690        context.l = []
691        context.n = 2 #submits per thread
692        context.nThreads = n // context.n
693        context.nFinished = 0
694        context.lock = threading.Lock()
695        context.event = threading.Event()
696
697        threads = [threading.Thread(target=self.pendingcalls_thread,
698                                    args=(context,))
699                   for i in range(context.nThreads)]
700        with threading_helper.start_threads(threads):
701            self.pendingcalls_wait(context.l, n, context)
702
703    def pendingcalls_thread(self, context):
704        try:
705            self.pendingcalls_submit(context.l, context.n)
706        finally:
707            with context.lock:
708                context.nFinished += 1
709                nFinished = context.nFinished
710                if False and support.verbose:
711                    print("finished threads: ", nFinished)
712            if nFinished == context.nThreads:
713                context.event.set()
714
715    def test_pendingcalls_non_threaded(self):
716        #again, just using the main thread, likely they will all be dispatched at
717        #once.  It is ok to ask for too many, because we loop until we find a slot.
718        #the loop can be interrupted to dispatch.
719        #there are only 32 dispatch slots, so we go for twice that!
720        l = []
721        n = 64
722        self.pendingcalls_submit(l, n)
723        self.pendingcalls_wait(l, n)
724
725
class SubinterpreterTest(unittest.TestCase):

    def test_subinterps(self):
        # The code run in the subinterpreter pickles id(sys.modules) and
        # id(builtins) into a pipe; neither value may equal the id of the
        # corresponding object in this interpreter.
        import builtins
        read_fd, write_fd = os.pipe()
        template = """if 1:
            import sys, builtins, pickle
            with open({:d}, "wb") as f:
                pickle.dump(id(sys.modules), f)
                pickle.dump(id(builtins), f)
            """
        subinterp_code = template.format(write_fd)
        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(subinterp_code)
            self.assertEqual(status, 0)
            # Values arrive in the order they were dumped.
            for main_object in (sys.modules, builtins):
                self.assertNotEqual(pickle.load(reader), id(main_object))

    def test_subinterps_recent_language_features(self):
        # Run a snippet that uses several newer syntax features inside a
        # subinterpreter and check the values it pickles back.
        read_fd, write_fd = os.pipe()
        template = """if 1:
            import pickle
            with open({:d}, "wb") as f:

                @(lambda x:x)  # Py 3.9
                def noop(x): return x

                a = (b := f'1{{2}}3') + noop('x')  # Py 3.8 (:=) / 3.6 (f'')

                async def foo(arg): return await arg  # Py 3.5

                pickle.dump(dict(a=a, b=b), f)
            """
        subinterp_code = template.format(write_fd)

        with open(read_fd, "rb") as reader:
            status = support.run_in_subinterp(subinterp_code)
            self.assertEqual(status, 0)
            received = pickle.load(reader)
            self.assertEqual(received, {'a': '123x', 'b': '123'})

    def test_mutate_exception(self):
        """
        Exceptions saved in global module state get shared between
        individual module instances. This test sets an attribute on
        binascii.Error inside a subinterpreter and checks that the
        attribute does not show up on binascii.Error in this interpreter.
        """
        import binascii

        mutate_in_subinterp = "import binascii; binascii.Error.foobar = 'foobar'"
        support.run_in_subinterp(mutate_in_subinterp)

        leaked = hasattr(binascii.Error, "foobar")
        self.assertFalse(leaked)

    def test_module_state_shared_in_global(self):
        """
        bpo-44050: Extension module state should be shared between interpreters
        when it doesn't support sub-interpreters.
        """
        read_fd, write_fd = os.pipe()
        # Cleanups run in reverse order of registration.
        for fd in (read_fd, write_fd):
            self.addCleanup(os.close, fd)

        script = textwrap.dedent(f"""
            import importlib.machinery
            import importlib.util
            import os

            fullname = '_test_module_state_shared'
            origin = importlib.util.find_spec('_testmultiphase').origin
            loader = importlib.machinery.ExtensionFileLoader(fullname, origin)
            spec = importlib.util.spec_from_loader(fullname, loader)
            module = importlib.util.module_from_spec(spec)
            attr_id = str(id(module.Error)).encode()

            os.write({write_fd}, attr_id)
            """)

        # First run the script in this interpreter...
        exec(script)
        main_attr_id = os.read(read_fd, 100)

        # ...then run the very same script in a subinterpreter.
        status = support.run_in_subinterp(script)
        self.assertEqual(status, 0)
        subinterp_attr_id = os.read(read_fd, 100)

        # Both runs must have reported the same id for module.Error.
        self.assertEqual(main_attr_id, subinterp_attr_id)
807
808
class TestThreadState(unittest.TestCase):

    @threading_helper.reap_threads
    def test_thread_state(self):
        # Extra thread-state checks driven through _testcapi.  The same
        # check runs twice: first in the calling thread, then in a newly
        # started thread.
        def target():
            seen_idents = []

            def callback():
                # Record which thread the callback was invoked on.
                seen_idents.append(threading.get_ident())

            _testcapi._test_thread_state(callback)
            # NOTE(review): two extra local references to the callback;
            # presumably they keep it alive during the sleep below -- confirm.
            ref_one = ref_two = callback
            time.sleep(1)
            # Check our main thread is in the list exactly 3 times.
            own_hits = seen_idents.count(threading.get_ident())
            self.assertEqual(own_hits, 3,
                             "Couldn't find main thread correctly in the list")

        target()
        worker = threading.Thread(target=target)
        worker.start()
        worker.join()
831
832
class Test_testcapi(unittest.TestCase):
    # Copy every attribute of _testcapi whose name starts with 'test_' and
    # does not end with '_code' into this class's namespace, under the same
    # name.  locals() here is the namespace of the class body being
    # executed, so each copied object becomes a class attribute.
    locals().update((name, getattr(_testcapi, name))
                    for name in dir(_testcapi)
                    if name.startswith('test_') and not name.endswith('_code'))

    # Suppress warning from PyUnicode_FromUnicode().
    # This definition comes after the bulk copy above, so it replaces any
    # 'test_widechar' entry that was copied in from _testcapi with a wrapper
    # that calls the same function with DeprecationWarning ignored.
    @warnings_helper.ignore_warnings(category=DeprecationWarning)
    def test_widechar(self):
        _testcapi.test_widechar()
842
843
class Test_testinternalcapi(unittest.TestCase):
    # Copy every attribute of _testinternalcapi whose name starts with
    # 'test_' into this class's namespace, under the same name.  locals()
    # here is the namespace of the class body being executed, so each copied
    # object becomes a class attribute.
    locals().update((name, getattr(_testinternalcapi, name))
                    for name in dir(_testinternalcapi)
                    if name.startswith('test_'))
848
849
class PyMemDebugTests(unittest.TestCase):
    # Value passed as the PYTHONMALLOC keyword to assert_python_failure()
    # and assert_python_ok(); subclasses override it.
    PYTHONMALLOC = 'debug'
    # '0x04c06e0' or '04C06E0'
    PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+'

    def check(self, code):
        # Run `code` with "-c" via assert_python_failure() and return the
        # captured stderr decoded as ASCII (undecodable bytes are replaced).
        with support.SuppressCrashReport():
            result = assert_python_failure(
                '-c', code, PYTHONMALLOC=self.PYTHONMALLOC)
        return result.err.decode('ascii', 'replace')

    def test_buffer_overflow(self):
        output = self.check(
            'import _testcapi; _testcapi.pymem_buffer_overflow()')
        pattern = (r"Debug memory block at address p={ptr}: API 'm'\n"
                   r"    16 bytes originally requested\n"
                   r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                   r"    The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n"
                   r"        at tail\+0: 0x78 \*\*\* OUCH\n"
                   r"        at tail\+1: 0xfd\n"
                   r"        at tail\+2: 0xfd\n"
                   r"        .*\n"
                   r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                   r"    Data at p: cd cd cd .*\n"
                   r"\n"
                   r"Enable tracemalloc to get the memory block allocation traceback\n"
                   r"\n"
                   r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte"
                   ).format(ptr=self.PTR_REGEX)
        # DOTALL lets the ".*" parts of the pattern run across newlines.
        self.assertRegex(output, re.compile(pattern, flags=re.DOTALL))

    def test_api_misuse(self):
        output = self.check('import _testcapi; _testcapi.pymem_api_misuse()')
        pattern = (r"Debug memory block at address p={ptr}: API 'm'\n"
                   r"    16 bytes originally requested\n"
                   r"    The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n"
                   r"    The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n"
                   r"(    The block was made by call #[0-9]+ to debug malloc/realloc.\n)?"
                   r"    Data at p: cd cd cd .*\n"
                   r"\n"
                   r"Enable tracemalloc to get the memory block allocation traceback\n"
                   r"\n"
                   r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n"
                   ).format(ptr=self.PTR_REGEX)
        self.assertRegex(output, pattern)

    def check_malloc_without_gil(self, code):
        # Shared assertion for the two "without GIL" tests below: stderr
        # must contain the expected fatal-error message.
        expected = ('Fatal Python error: _PyMem_DebugMalloc: '
                    'Python memory allocator called without holding the GIL')
        output = self.check(code)
        self.assertIn(expected, output)

    def test_pymem_malloc_without_gil(self):
        # Debug hooks must raise an error if PyMem_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pymem_malloc_without_gil()')

    def test_pyobject_malloc_without_gil(self):
        # Debug hooks must raise an error if PyObject_Malloc() is called
        # without holding the GIL
        self.check_malloc_without_gil(
            'import _testcapi; _testcapi.pyobject_malloc_without_gil()')

    def check_pyobject_is_freed(self, func_name):
        # Call _testcapi.<func_name>() in a child interpreter.  The child
        # exits with status 0 when the call returns and with status 1 when
        # it raises _testcapi.error; assert_python_ok() checks the outcome.
        script = f'''
            import gc, os, sys, _testcapi
            # Disable the GC to avoid crash on GC collection
            gc.disable()
            try:
                _testcapi.{func_name}()
                # Exit immediately to avoid a crash while deallocating
                # the invalid object
                os._exit(0)
            except _testcapi.error:
                os._exit(1)
        '''
        assert_python_ok('-c', textwrap.dedent(script),
                         PYTHONMALLOC=self.PYTHONMALLOC)

    def test_pyobject_null_is_freed(self):
        self.check_pyobject_is_freed(func_name='check_pyobject_null_is_freed')

    def test_pyobject_uninitialized_is_freed(self):
        self.check_pyobject_is_freed(
            func_name='check_pyobject_uninitialized_is_freed')

    def test_pyobject_forbidden_bytes_is_freed(self):
        self.check_pyobject_is_freed(
            func_name='check_pyobject_forbidden_bytes_is_freed')

    def test_pyobject_freed_is_freed(self):
        self.check_pyobject_is_freed(func_name='check_pyobject_freed_is_freed')
941
942
class PyMemMallocDebugTests(PyMemDebugTests):
    # Inherits every test from PyMemDebugTests unchanged.  Only this value
    # differs; the base class's helpers forward it as the PYTHONMALLOC
    # keyword to assert_python_failure() / assert_python_ok().
    PYTHONMALLOC = 'malloc_debug'
945
946
# Skipped with the reason 'need pymalloc' unless support.with_pymalloc()
# returns a true value.
@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc')
class PyMemPymallocDebugTests(PyMemDebugTests):
    # Inherits every test from PyMemDebugTests unchanged.  Only this value
    # differs; the base class's helpers forward it as the PYTHONMALLOC
    # keyword to assert_python_failure() / assert_python_ok().
    PYTHONMALLOC = 'pymalloc_debug'
950
951
# Skipped with the reason 'need Py_DEBUG' unless the module-level Py_DEBUG
# flag is true (it is set from hasattr(sys, 'gettotalrefcount')).
@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG')
class PyMemDefaultTests(PyMemDebugTests):
    # test default allocator of Python compiled in debug mode
    # The empty string is what the inherited helpers forward as the
    # PYTHONMALLOC keyword, i.e. no allocator is named explicitly.
    PYTHONMALLOC = ''
956
957
class Test_ModuleStateAccess(unittest.TestCase):
    """Test access to module state (PEP 573)"""

    # The C part of the tests lives in _testmultiphase, in a module called
    # _testmultiphase_meth_state_access.
    # This module has multi-phase initialization, unlike _testcapi.

    def setUp(self):
        # Locate the _testmultiphase binary, then load the
        # '_testmultiphase_meth_state_access' module out of that same file
        # and keep it on the test instance.
        fullname = '_testmultiphase_meth_state_access'  # XXX
        binary_path = importlib.util.find_spec('_testmultiphase').origin
        ext_loader = importlib.machinery.ExtensionFileLoader(
            fullname, binary_path)
        module = importlib.util.module_from_spec(
            importlib.util.spec_from_loader(fullname, ext_loader))
        ext_loader.exec_module(module)
        self.module = module

    def test_subclass_get_module(self):
        """PyType_GetModule for defining_class"""
        base = self.module.StateAccessType

        class StateAccessType_Subclass(base):
            pass

        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_subclass_get_module_with_super(self):
        # Same check as above, but the subclass overrides the method and
        # reaches the inherited implementation through super().
        base = self.module.StateAccessType

        class StateAccessType_Subclass(base):
            def get_defining_module(self):
                return super().get_defining_module()

        obj = StateAccessType_Subclass()
        self.assertIs(obj.get_defining_module(), self.module)

    def test_state_access(self):
        """Checks methods defined with and without argument clinic

        This tests a no-arg method (get_count) and a method with
        both a positional and keyword argument.
        """

        a = self.module.StateAccessType()
        b = self.module.StateAccessType()

        def assert_count(expected):
            # Both instances must report the same count, equal to `expected`.
            self.assertEqual(a.get_count(), b.get_count())
            self.assertEqual(a.get_count(), expected)

        variants = (
            ('clinic', a.increment_count_clinic),
            ('noclinic', a.increment_count_noclinic),
        )

        for name, increment_count in variants:
            with self.subTest(name):
                assert_count(0)

                increment_count()
                assert_count(1)

                increment_count(3)
                assert_count(4)

                increment_count(-2, twice=True)
                assert_count(0)

                # Unknown keyword argument.
                with self.assertRaises(TypeError):
                    increment_count(thrice=3)

                # Too many positional arguments.
                with self.assertRaises(TypeError):
                    increment_count(1, 2, 3)
1027
1028
# When this file is executed as a script (rather than imported), hand
# control to unittest's command-line entry point.
if __name__ == "__main__":
    unittest.main()
1031