1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4from collections import OrderedDict 5import importlib.machinery 6import importlib.util 7import os 8import pickle 9import random 10import re 11import subprocess 12import sys 13import textwrap 14import threading 15import time 16import unittest 17import weakref 18from test import support 19from test.support import MISSING_C_DOCSTRINGS 20from test.support import import_helper 21from test.support import threading_helper 22from test.support import warnings_helper 23from test.support.script_helper import assert_python_failure, assert_python_ok 24try: 25 import _posixsubprocess 26except ImportError: 27 _posixsubprocess = None 28 29# Skip this test if the _testcapi module isn't available. 30_testcapi = import_helper.import_module('_testcapi') 31 32import _testinternalcapi 33 34# Were we compiled --with-pydebug or with #define Py_DEBUG? 35Py_DEBUG = hasattr(sys, 'gettotalrefcount') 36 37 38def decode_stderr(err): 39 return err.decode('utf-8', 'replace').replace('\r', '') 40 41 42def testfunction(self): 43 """some doc""" 44 return self 45 46 47class InstanceMethod: 48 id = _testcapi.instancemethod(id) 49 testfunction = _testcapi.instancemethod(testfunction) 50 51class CAPITest(unittest.TestCase): 52 53 def test_instancemethod(self): 54 inst = InstanceMethod() 55 self.assertEqual(id(inst), inst.id()) 56 self.assertTrue(inst.testfunction() is inst) 57 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 58 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 59 60 InstanceMethod.testfunction.attribute = "test" 61 self.assertEqual(testfunction.attribute, "test") 62 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 63 64 def test_no_FatalError_infinite_loop(self): 65 with support.SuppressCrashReport(): 66 p = subprocess.Popen([sys.executable, "-c", 67 'import _testcapi;' 68 '_testcapi.crash_no_current_thread()'], 69 stdout=subprocess.PIPE, 70 stderr=subprocess.PIPE) 71 (out, err) = p.communicate() 72 self.assertEqual(out, b'') 73 # This used to cause an infinite loop. 74 self.assertTrue(err.rstrip().startswith( 75 b'Fatal Python error: ' 76 b'PyThreadState_Get: ' 77 b'the function must be called with the GIL held, ' 78 b'but the GIL is released ' 79 b'(the current Python thread state is NULL)'), 80 err) 81 82 def test_memoryview_from_NULL_pointer(self): 83 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 84 85 def test_exc_info(self): 86 raised_exception = ValueError("5") 87 new_exc = TypeError("TEST") 88 try: 89 raise raised_exception 90 except ValueError as e: 91 tb = e.__traceback__ 92 orig_sys_exc_info = sys.exc_info() 93 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None) 94 new_sys_exc_info = sys.exc_info() 95 new_exc_info = _testcapi.set_exc_info(*orig_exc_info) 96 reset_sys_exc_info = sys.exc_info() 97 98 self.assertEqual(orig_exc_info[1], e) 99 100 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb)) 101 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info) 102 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info) 103 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None)) 104 self.assertSequenceEqual(new_sys_exc_info, new_exc_info) 105 else: 106 self.assertTrue(False) 107 108 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 109 def test_seq_bytes_to_charp_array(self): 110 # Issue #15732: crash in _PySequence_BytesToCharpArray() 111 class Z(object): 112 def __len__(self): 113 return 1 114 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 115 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21) 116 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 117 class Z(object): 118 def __len__(self): 119 return sys.maxsize 120 def __getitem__(self, i): 121 return b'x' 122 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 123 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21) 124 125 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 126 def test_subprocess_fork_exec(self): 127 class Z(object): 128 def __len__(self): 129 return 1 130 131 # Issue #15738: crash in subprocess_fork_exec() 132 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 133 Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17,18,19,20,21) 134 135 @unittest.skipIf(MISSING_C_DOCSTRINGS, 136 "Signature information for builtins requires docstrings") 137 def test_docstring_signature_parsing(self): 138 139 self.assertEqual(_testcapi.no_docstring.__doc__, None) 140 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 141 142 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 143 self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 144 145 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 146 "This docstring has no signature.") 147 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 148 149 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 150 "docstring_with_invalid_signature($module, /, boo)\n" 151 "\n" 152 "This docstring has an invalid signature." 153 ) 154 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 155 156 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 157 "docstring_with_invalid_signature2($module, /, boo)\n" 158 "\n" 159 "--\n" 160 "\n" 161 "This docstring also has an invalid signature." 162 ) 163 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 164 165 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 166 "This docstring has a valid signature.") 167 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 168 169 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 170 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 171 "($module, /, sig)") 172 173 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 174 "\nThis docstring has a valid signature and some extra newlines.") 175 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 176 "($module, /, parameter)") 177 178 def test_c_type_with_matrix_multiplication(self): 179 M = _testcapi.matmulType 180 m1 = M() 181 m2 = M() 182 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 183 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 184 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 185 o = m1 186 o @= m2 187 self.assertEqual(o, ("imatmul", m1, m2)) 188 o = m1 189 o @= 42 190 self.assertEqual(o, ("imatmul", m1, 42)) 191 o = 42 192 o @= m1 193 self.assertEqual(o, ("matmul", 42, m1)) 194 195 def test_c_type_with_ipow(self): 196 # When the __ipow__ method of a type was implemented in C, using the 197 # modulo param would cause segfaults. 198 o = _testcapi.ipowType() 199 self.assertEqual(o.__ipow__(1), (1, None)) 200 self.assertEqual(o.__ipow__(2, 2), (2, 2)) 201 202 def test_return_null_without_error(self): 203 # Issue #23571: A function must not return NULL without setting an 204 # error 205 if Py_DEBUG: 206 code = textwrap.dedent(""" 207 import _testcapi 208 from test import support 209 210 with support.SuppressCrashReport(): 211 _testcapi.return_null_without_error() 212 """) 213 rc, out, err = assert_python_failure('-c', code) 214 err = decode_stderr(err) 215 self.assertRegex(err, 216 r'Fatal Python error: _Py_CheckFunctionResult: ' 217 r'a function returned NULL without setting an exception\n' 218 r'Python runtime state: initialized\n' 219 r'SystemError: <built-in function return_null_without_error> ' 220 r'returned NULL without setting an exception\n' 221 r'\n' 222 r'Current thread.*:\n' 223 r' File .*", line 6 in <module>\n') 224 else: 225 with self.assertRaises(SystemError) as cm: 226 _testcapi.return_null_without_error() 227 self.assertRegex(str(cm.exception), 228 'return_null_without_error.* ' 229 'returned NULL without setting an exception') 230 231 def test_return_result_with_error(self): 232 # Issue #23571: A function must not return a result with an error set 233 if Py_DEBUG: 234 code = textwrap.dedent(""" 235 import _testcapi 236 from test import support 237 238 with support.SuppressCrashReport(): 239 _testcapi.return_result_with_error() 240 """) 241 rc, out, err = assert_python_failure('-c', code) 242 err = decode_stderr(err) 243 self.assertRegex(err, 244 r'Fatal Python error: _Py_CheckFunctionResult: ' 245 r'a function returned a result with an exception set\n' 246 r'Python runtime state: initialized\n' 247 r'ValueError\n' 248 r'\n' 249 r'The above exception was the direct cause ' 250 r'of the following exception:\n' 251 r'\n' 252 r'SystemError: <built-in ' 253 r'function return_result_with_error> ' 254 r'returned a result with an exception set\n' 255 r'\n' 256 r'Current thread.*:\n' 257 r' File .*, line 6 in <module>\n') 258 else: 259 with self.assertRaises(SystemError) as cm: 260 _testcapi.return_result_with_error() 261 self.assertRegex(str(cm.exception), 262 'return_result_with_error.* ' 263 'returned a result with an exception set') 264 265 def test_getitem_with_error(self): 266 # Test _Py_CheckSlotResult(). Raise an exception and then calls 267 # PyObject_GetItem(): check that the assertion catches the bug. 268 # PyObject_GetItem() must not be called with an exception set. 269 code = textwrap.dedent(""" 270 import _testcapi 271 from test import support 272 273 with support.SuppressCrashReport(): 274 _testcapi.getitem_with_error({1: 2}, 1) 275 """) 276 rc, out, err = assert_python_failure('-c', code) 277 err = decode_stderr(err) 278 if 'SystemError: ' not in err: 279 self.assertRegex(err, 280 r'Fatal Python error: _Py_CheckSlotResult: ' 281 r'Slot __getitem__ of type dict succeeded ' 282 r'with an exception set\n' 283 r'Python runtime state: initialized\n' 284 r'ValueError: bug\n' 285 r'\n' 286 r'Current thread .* \(most recent call first\):\n' 287 r' File .*, line 6 in <module>\n' 288 r'\n' 289 r'Extension modules: _testcapi \(total: 1\)\n') 290 else: 291 # Python built with NDEBUG macro defined: 292 # test _Py_CheckFunctionResult() instead. 293 self.assertIn('returned a result with an exception set', err) 294 295 def test_buildvalue_N(self): 296 _testcapi.test_buildvalue_N() 297 298 def test_set_nomemory(self): 299 code = """if 1: 300 import _testcapi 301 302 class C(): pass 303 304 # The first loop tests both functions and that remove_mem_hooks() 305 # can be called twice in a row. The second loop checks a call to 306 # set_nomemory() after a call to remove_mem_hooks(). The third 307 # loop checks the start and stop arguments of set_nomemory(). 308 for outer_cnt in range(1, 4): 309 start = 10 * outer_cnt 310 for j in range(100): 311 if j == 0: 312 if outer_cnt != 3: 313 _testcapi.set_nomemory(start) 314 else: 315 _testcapi.set_nomemory(start, start + 1) 316 try: 317 C() 318 except MemoryError as e: 319 if outer_cnt != 3: 320 _testcapi.remove_mem_hooks() 321 print('MemoryError', outer_cnt, j) 322 _testcapi.remove_mem_hooks() 323 break 324 """ 325 rc, out, err = assert_python_ok('-c', code) 326 lines = out.splitlines() 327 for i, line in enumerate(lines, 1): 328 self.assertIn(b'MemoryError', out) 329 *_, count = line.split(b' ') 330 count = int(count) 331 self.assertLessEqual(count, i*5) 332 self.assertGreaterEqual(count, i*5-1) 333 334 def test_mapping_keys_values_items(self): 335 class Mapping1(dict): 336 def keys(self): 337 return list(super().keys()) 338 def values(self): 339 return list(super().values()) 340 def items(self): 341 return list(super().items()) 342 class Mapping2(dict): 343 def keys(self): 344 return tuple(super().keys()) 345 def values(self): 346 return tuple(super().values()) 347 def items(self): 348 return tuple(super().items()) 349 dict_obj = {'foo': 1, 'bar': 2, 'spam': 3} 350 351 for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(), 352 dict_obj, OrderedDict(dict_obj), 353 Mapping1(dict_obj), Mapping2(dict_obj)]: 354 self.assertListEqual(_testcapi.get_mapping_keys(mapping), 355 list(mapping.keys())) 356 self.assertListEqual(_testcapi.get_mapping_values(mapping), 357 list(mapping.values())) 358 self.assertListEqual(_testcapi.get_mapping_items(mapping), 359 list(mapping.items())) 360 361 def test_mapping_keys_values_items_bad_arg(self): 362 self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None) 363 self.assertRaises(AttributeError, _testcapi.get_mapping_values, None) 364 self.assertRaises(AttributeError, _testcapi.get_mapping_items, None) 365 366 class BadMapping: 367 def keys(self): 368 return None 369 def values(self): 370 return None 371 def items(self): 372 return None 373 bad_mapping = BadMapping() 374 self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping) 375 self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping) 376 self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping) 377 378 @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'), 379 'need _testcapi.negative_refcount') 380 def test_negative_refcount(self): 381 # bpo-35059: Check that Py_DECREF() reports the correct filename 382 # when calling _Py_NegativeRefcount() to abort Python. 383 code = textwrap.dedent(""" 384 import _testcapi 385 from test import support 386 387 with support.SuppressCrashReport(): 388 _testcapi.negative_refcount() 389 """) 390 rc, out, err = assert_python_failure('-c', code) 391 self.assertRegex(err, 392 br'_testcapimodule\.c:[0-9]+: ' 393 br'_Py_NegativeRefcount: Assertion failed: ' 394 br'object has negative ref count') 395 396 def test_trashcan_subclass(self): 397 # bpo-35983: Check that the trashcan mechanism for "list" is NOT 398 # activated when its tp_dealloc is being called by a subclass 399 from _testcapi import MyList 400 L = None 401 for i in range(1000): 402 L = MyList((L,)) 403 404 @support.requires_resource('cpu') 405 def test_trashcan_python_class1(self): 406 self.do_test_trashcan_python_class(list) 407 408 @support.requires_resource('cpu') 409 def test_trashcan_python_class2(self): 410 from _testcapi import MyList 411 self.do_test_trashcan_python_class(MyList) 412 413 def do_test_trashcan_python_class(self, base): 414 # Check that the trashcan mechanism works properly for a Python 415 # subclass of a class using the trashcan (this specific test assumes 416 # that the base class "base" behaves like list) 417 class PyList(base): 418 # Count the number of PyList instances to verify that there is 419 # no memory leak 420 num = 0 421 def __init__(self, *args): 422 __class__.num += 1 423 super().__init__(*args) 424 def __del__(self): 425 __class__.num -= 1 426 427 for parity in (0, 1): 428 L = None 429 # We need in the order of 2**20 iterations here such that a 430 # typical 8MB stack would overflow without the trashcan. 431 for i in range(2**20): 432 L = PyList((L,)) 433 L.attr = i 434 if parity: 435 # Add one additional nesting layer 436 L = (L,) 437 self.assertGreater(PyList.num, 0) 438 del L 439 self.assertEqual(PyList.num, 0) 440 441 def test_heap_ctype_doc_and_text_signature(self): 442 self.assertEqual(_testcapi.HeapDocCType.__doc__, "somedoc") 443 self.assertEqual(_testcapi.HeapDocCType.__text_signature__, "(arg1, arg2)") 444 445 def test_null_type_doc(self): 446 self.assertEqual(_testcapi.NullTpDocType.__doc__, None) 447 448 def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self): 449 class HeapGcCTypeSubclass(_testcapi.HeapGcCType): 450 def __init__(self): 451 self.value2 = 20 452 super().__init__() 453 454 subclass_instance = HeapGcCTypeSubclass() 455 type_refcnt = sys.getrefcount(HeapGcCTypeSubclass) 456 457 # Test that subclass instance was fully created 458 self.assertEqual(subclass_instance.value, 10) 459 self.assertEqual(subclass_instance.value2, 20) 460 461 # Test that the type reference count is only decremented once 462 del subclass_instance 463 self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass)) 464 465 def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 466 class A(_testcapi.HeapGcCType): 467 def __init__(self): 468 self.value2 = 20 469 super().__init__() 470 471 class B(A): 472 def __init__(self): 473 super().__init__() 474 475 def __del__(self): 476 self.__class__ = A 477 A.refcnt_in_del = sys.getrefcount(A) 478 B.refcnt_in_del = sys.getrefcount(B) 479 480 subclass_instance = B() 481 type_refcnt = sys.getrefcount(B) 482 new_type_refcnt = sys.getrefcount(A) 483 484 # Test that subclass instance was fully created 485 self.assertEqual(subclass_instance.value, 10) 486 self.assertEqual(subclass_instance.value2, 20) 487 488 del subclass_instance 489 490 # Test that setting __class__ modified the reference counts of the types 491 self.assertEqual(type_refcnt - 1, B.refcnt_in_del) 492 self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del) 493 494 # Test that the original type already has decreased its refcnt 495 self.assertEqual(type_refcnt - 1, sys.getrefcount(B)) 496 497 # Test that subtype_dealloc decref the newly assigned __class__ only once 498 self.assertEqual(new_type_refcnt, sys.getrefcount(A)) 499 500 def test_heaptype_with_dict(self): 501 inst = _testcapi.HeapCTypeWithDict() 502 inst.foo = 42 503 self.assertEqual(inst.foo, 42) 504 self.assertEqual(inst.dictobj, inst.__dict__) 505 self.assertEqual(inst.dictobj, {"foo": 42}) 506 507 inst = _testcapi.HeapCTypeWithDict() 508 self.assertEqual({}, inst.__dict__) 509 510 def test_heaptype_with_negative_dict(self): 511 inst = _testcapi.HeapCTypeWithNegativeDict() 512 inst.foo = 42 513 self.assertEqual(inst.foo, 42) 514 self.assertEqual(inst.dictobj, inst.__dict__) 515 self.assertEqual(inst.dictobj, {"foo": 42}) 516 517 inst = _testcapi.HeapCTypeWithNegativeDict() 518 self.assertEqual({}, inst.__dict__) 519 520 def test_heaptype_with_weakref(self): 521 inst = _testcapi.HeapCTypeWithWeakref() 522 ref = weakref.ref(inst) 523 self.assertEqual(ref(), inst) 524 self.assertEqual(inst.weakreflist, ref) 525 526 def test_heaptype_with_buffer(self): 527 inst = _testcapi.HeapCTypeWithBuffer() 528 b = bytes(inst) 529 self.assertEqual(b, b"1234") 530 531 def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self): 532 subclass_instance = _testcapi.HeapCTypeSubclass() 533 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 534 535 # Test that subclass instance was fully created 536 self.assertEqual(subclass_instance.value, 10) 537 self.assertEqual(subclass_instance.value2, 20) 538 539 # Test that the type reference count is only decremented once 540 del subclass_instance 541 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 542 543 def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 544 subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer() 545 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer) 546 new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 547 548 # Test that subclass instance was fully created 549 self.assertEqual(subclass_instance.value, 10) 550 self.assertEqual(subclass_instance.value2, 20) 551 552 # The tp_finalize slot will set __class__ to HeapCTypeSubclass 553 del subclass_instance 554 555 # Test that setting __class__ modified the reference counts of the types 556 self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 557 self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del) 558 559 # Test that the original type already has decreased its refcnt 560 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)) 561 562 # Test that subtype_dealloc decref the newly assigned __class__ only once 563 self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 564 565 def test_heaptype_with_setattro(self): 566 obj = _testcapi.HeapCTypeSetattr() 567 self.assertEqual(obj.pvalue, 10) 568 obj.value = 12 569 self.assertEqual(obj.pvalue, 12) 570 del obj.value 571 self.assertEqual(obj.pvalue, 0) 572 573 def test_pynumber_tobase(self): 574 from _testcapi import pynumber_tobase 575 self.assertEqual(pynumber_tobase(123, 2), '0b1111011') 576 self.assertEqual(pynumber_tobase(123, 8), '0o173') 577 self.assertEqual(pynumber_tobase(123, 10), '123') 578 self.assertEqual(pynumber_tobase(123, 16), '0x7b') 579 self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011') 580 self.assertEqual(pynumber_tobase(-123, 8), '-0o173') 581 self.assertEqual(pynumber_tobase(-123, 10), '-123') 582 self.assertEqual(pynumber_tobase(-123, 16), '-0x7b') 583 self.assertRaises(TypeError, pynumber_tobase, 123.0, 10) 584 self.assertRaises(TypeError, pynumber_tobase, '123', 10) 585 self.assertRaises(SystemError, pynumber_tobase, 123, 0) 586 587 def check_fatal_error(self, code, expected, not_expected=()): 588 with support.SuppressCrashReport(): 589 rc, out, err = assert_python_failure('-sSI', '-c', code) 590 591 err = decode_stderr(err) 592 self.assertIn('Fatal Python error: test_fatal_error: MESSAGE\n', 593 err) 594 595 match = re.search(r'^Extension modules:(.*) \(total: ([0-9]+)\)$', 596 err, re.MULTILINE) 597 if not match: 598 self.fail(f"Cannot find 'Extension modules:' in {err!r}") 599 modules = set(match.group(1).strip().split(', ')) 600 total = int(match.group(2)) 601 602 for name in expected: 603 self.assertIn(name, modules) 604 for name in not_expected: 605 self.assertNotIn(name, modules) 606 self.assertEqual(len(modules), total) 607 608 def test_fatal_error(self): 609 # By default, stdlib extension modules are ignored, 610 # but not test modules. 611 expected = ('_testcapi',) 612 not_expected = ('sys',) 613 code = 'import _testcapi, sys; _testcapi.fatal_error(b"MESSAGE")' 614 self.check_fatal_error(code, expected, not_expected) 615 616 # Mark _testcapi as stdlib module, but not sys 617 expected = ('sys',) 618 not_expected = ('_testcapi',) 619 code = textwrap.dedent(''' 620 import _testcapi, sys 621 sys.stdlib_module_names = frozenset({"_testcapi"}) 622 _testcapi.fatal_error(b"MESSAGE") 623 ''') 624 self.check_fatal_error(code, expected) 625 626 def test_pyobject_repr_from_null(self): 627 s = _testcapi.pyobject_repr_from_null() 628 self.assertEqual(s, '<NULL>') 629 630 def test_pyobject_str_from_null(self): 631 s = _testcapi.pyobject_str_from_null() 632 self.assertEqual(s, '<NULL>') 633 634 def test_pyobject_bytes_from_null(self): 635 s = _testcapi.pyobject_bytes_from_null() 636 self.assertEqual(s, b'<NULL>') 637 638 def test_Py_CompileString(self): 639 # Check that Py_CompileString respects the coding cookie 640 _compile = _testcapi.Py_CompileString 641 code = b"# -*- coding: latin1 -*-\nprint('\xc2\xa4')\n" 642 result = _compile(code) 643 expected = compile(code, "<string>", "exec") 644 self.assertEqual(result.co_consts, expected.co_consts) 645 646 647class TestPendingCalls(unittest.TestCase): 648 649 def pendingcalls_submit(self, l, n): 650 def callback(): 651 #this function can be interrupted by thread switching so let's 652 #use an atomic operation 653 l.append(None) 654 655 for i in range(n): 656 time.sleep(random.random()*0.02) #0.01 secs on average 657 #try submitting callback until successful. 658 #rely on regular interrupt to flush queue if we are 659 #unsuccessful. 660 while True: 661 if _testcapi._pending_threadfunc(callback): 662 break 663 664 def pendingcalls_wait(self, l, n, context = None): 665 #now, stick around until l[0] has grown to 10 666 count = 0 667 while len(l) != n: 668 #this busy loop is where we expect to be interrupted to 669 #run our callbacks. Note that callbacks are only run on the 670 #main thread 671 if False and support.verbose: 672 print("(%i)"%(len(l),),) 673 for i in range(1000): 674 a = i*i 675 if context and not context.event.is_set(): 676 continue 677 count += 1 678 self.assertTrue(count < 10000, 679 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 680 if False and support.verbose: 681 print("(%i)"%(len(l),)) 682 683 def test_pendingcalls_threaded(self): 684 685 #do every callback on a separate thread 686 n = 32 #total callbacks 687 threads = [] 688 class foo(object):pass 689 context = foo() 690 context.l = [] 691 context.n = 2 #submits per thread 692 context.nThreads = n // context.n 693 context.nFinished = 0 694 context.lock = threading.Lock() 695 context.event = threading.Event() 696 697 threads = [threading.Thread(target=self.pendingcalls_thread, 698 args=(context,)) 699 for i in range(context.nThreads)] 700 with threading_helper.start_threads(threads): 701 self.pendingcalls_wait(context.l, n, context) 702 703 def pendingcalls_thread(self, context): 704 try: 705 self.pendingcalls_submit(context.l, context.n) 706 finally: 707 with context.lock: 708 context.nFinished += 1 709 nFinished = context.nFinished 710 if False and support.verbose: 711 print("finished threads: ", nFinished) 712 if nFinished == context.nThreads: 713 context.event.set() 714 715 def test_pendingcalls_non_threaded(self): 716 #again, just using the main thread, likely they will all be dispatched at 717 #once. It is ok to ask for too many, because we loop until we find a slot. 718 #the loop can be interrupted to dispatch. 719 #there are only 32 dispatch slots, so we go for twice that! 720 l = [] 721 n = 64 722 self.pendingcalls_submit(l, n) 723 self.pendingcalls_wait(l, n) 724 725 726class SubinterpreterTest(unittest.TestCase): 727 728 def test_subinterps(self): 729 import builtins 730 r, w = os.pipe() 731 code = """if 1: 732 import sys, builtins, pickle 733 with open({:d}, "wb") as f: 734 pickle.dump(id(sys.modules), f) 735 pickle.dump(id(builtins), f) 736 """.format(w) 737 with open(r, "rb") as f: 738 ret = support.run_in_subinterp(code) 739 self.assertEqual(ret, 0) 740 self.assertNotEqual(pickle.load(f), id(sys.modules)) 741 self.assertNotEqual(pickle.load(f), id(builtins)) 742 743 def test_subinterps_recent_language_features(self): 744 r, w = os.pipe() 745 code = """if 1: 746 import pickle 747 with open({:d}, "wb") as f: 748 749 @(lambda x:x) # Py 3.9 750 def noop(x): return x 751 752 a = (b := f'1{{2}}3') + noop('x') # Py 3.8 (:=) / 3.6 (f'') 753 754 async def foo(arg): return await arg # Py 3.5 755 756 pickle.dump(dict(a=a, b=b), f) 757 """.format(w) 758 759 with open(r, "rb") as f: 760 ret = support.run_in_subinterp(code) 761 self.assertEqual(ret, 0) 762 self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'}) 763 764 def test_mutate_exception(self): 765 """ 766 Exceptions saved in global module state get shared between 767 individual module instances. This test checks whether or not 768 a change in one interpreter's module gets reflected into the 769 other ones. 770 """ 771 import binascii 772 773 support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'") 774 775 self.assertFalse(hasattr(binascii.Error, "foobar")) 776 777 def test_module_state_shared_in_global(self): 778 """ 779 bpo-44050: Extension module state should be shared between interpreters 780 when it doesn't support sub-interpreters. 781 """ 782 r, w = os.pipe() 783 self.addCleanup(os.close, r) 784 self.addCleanup(os.close, w) 785 786 script = textwrap.dedent(f""" 787 import importlib.machinery 788 import importlib.util 789 import os 790 791 fullname = '_test_module_state_shared' 792 origin = importlib.util.find_spec('_testmultiphase').origin 793 loader = importlib.machinery.ExtensionFileLoader(fullname, origin) 794 spec = importlib.util.spec_from_loader(fullname, loader) 795 module = importlib.util.module_from_spec(spec) 796 attr_id = str(id(module.Error)).encode() 797 798 os.write({w}, attr_id) 799 """) 800 exec(script) 801 main_attr_id = os.read(r, 100) 802 803 ret = support.run_in_subinterp(script) 804 self.assertEqual(ret, 0) 805 subinterp_attr_id = os.read(r, 100) 806 self.assertEqual(main_attr_id, subinterp_attr_id) 807 808 809class TestThreadState(unittest.TestCase): 810 811 @threading_helper.reap_threads 812 def test_thread_state(self): 813 # some extra thread-state tests driven via _testcapi 814 def target(): 815 idents = [] 816 817 def callback(): 818 idents.append(threading.get_ident()) 819 820 _testcapi._test_thread_state(callback) 821 a = b = callback 822 time.sleep(1) 823 # Check our main thread is in the list exactly 3 times. 824 self.assertEqual(idents.count(threading.get_ident()), 3, 825 "Couldn't find main thread correctly in the list") 826 827 target() 828 t = threading.Thread(target=target) 829 t.start() 830 t.join() 831 832 833class Test_testcapi(unittest.TestCase): 834 locals().update((name, getattr(_testcapi, name)) 835 for name in dir(_testcapi) 836 if name.startswith('test_') and not name.endswith('_code')) 837 838 # Suppress warning from PyUnicode_FromUnicode(). 839 @warnings_helper.ignore_warnings(category=DeprecationWarning) 840 def test_widechar(self): 841 _testcapi.test_widechar() 842 843 844class Test_testinternalcapi(unittest.TestCase): 845 locals().update((name, getattr(_testinternalcapi, name)) 846 for name in dir(_testinternalcapi) 847 if name.startswith('test_')) 848 849 850class PyMemDebugTests(unittest.TestCase): 851 PYTHONMALLOC = 'debug' 852 # '0x04c06e0' or '04C06E0' 853 PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+' 854 855 def check(self, code): 856 with support.SuppressCrashReport(): 857 out = assert_python_failure('-c', code, 858 PYTHONMALLOC=self.PYTHONMALLOC) 859 stderr = out.err 860 return stderr.decode('ascii', 'replace') 861 862 def test_buffer_overflow(self): 863 out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()') 864 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 865 r" 16 bytes originally requested\n" 866 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 867 r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n" 868 r" at tail\+0: 0x78 \*\*\* OUCH\n" 869 r" at tail\+1: 0xfd\n" 870 r" at tail\+2: 0xfd\n" 871 r" .*\n" 872 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 873 r" Data at p: cd cd cd .*\n" 874 r"\n" 875 r"Enable tracemalloc to get the memory block allocation traceback\n" 876 r"\n" 877 r"Fatal Python error: _PyMem_DebugRawFree: bad trailing pad byte") 878 regex = regex.format(ptr=self.PTR_REGEX) 879 regex = re.compile(regex, flags=re.DOTALL) 880 self.assertRegex(out, regex) 881 882 def test_api_misuse(self): 883 out = self.check('import _testcapi; _testcapi.pymem_api_misuse()') 884 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 885 r" 16 bytes originally requested\n" 886 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 887 r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n" 888 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 889 r" Data at p: cd cd cd .*\n" 890 r"\n" 891 r"Enable tracemalloc to get the memory block allocation traceback\n" 892 r"\n" 893 r"Fatal Python error: _PyMem_DebugRawFree: bad ID: Allocated using API 'm', verified using API 'r'\n") 894 regex = regex.format(ptr=self.PTR_REGEX) 895 self.assertRegex(out, regex) 896 897 def check_malloc_without_gil(self, code): 898 out = self.check(code) 899 expected = ('Fatal Python error: _PyMem_DebugMalloc: ' 900 'Python memory allocator called without holding the GIL') 901 self.assertIn(expected, out) 902 903 def test_pymem_malloc_without_gil(self): 904 # Debug hooks must raise an error if PyMem_Malloc() is called 905 # without holding the GIL 906 code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()' 907 self.check_malloc_without_gil(code) 908 909 def test_pyobject_malloc_without_gil(self): 910 # Debug hooks must raise an error if PyObject_Malloc() is called 911 # without holding the GIL 912 code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()' 913 self.check_malloc_without_gil(code) 914 915 def check_pyobject_is_freed(self, func_name): 916 code = textwrap.dedent(f''' 917 import gc, os, sys, _testcapi 918 # Disable the GC to avoid crash on GC collection 919 gc.disable() 920 try: 921 _testcapi.{func_name}() 922 # Exit immediately to avoid a crash while deallocating 923 # the invalid object 924 os._exit(0) 925 except _testcapi.error: 926 os._exit(1) 927 ''') 928 assert_python_ok('-c', code, PYTHONMALLOC=self.PYTHONMALLOC) 929 930 def test_pyobject_null_is_freed(self): 931 self.check_pyobject_is_freed('check_pyobject_null_is_freed') 932 933 def test_pyobject_uninitialized_is_freed(self): 934 self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed') 935 936 def test_pyobject_forbidden_bytes_is_freed(self): 937 self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed') 938 939 def test_pyobject_freed_is_freed(self): 940 self.check_pyobject_is_freed('check_pyobject_freed_is_freed') 941 942 943class PyMemMallocDebugTests(PyMemDebugTests): 944 PYTHONMALLOC = 'malloc_debug' 945 946 947@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc') 948class PyMemPymallocDebugTests(PyMemDebugTests): 949 PYTHONMALLOC = 'pymalloc_debug' 950 951 952@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG') 953class PyMemDefaultTests(PyMemDebugTests): 954 # test default allocator of Python compiled in debug mode 955 PYTHONMALLOC = '' 956 957 958class Test_ModuleStateAccess(unittest.TestCase): 959 """Test access to module start (PEP 573)""" 960 961 # The C part of the tests lives in _testmultiphase, in a module called 962 # _testmultiphase_meth_state_access. 963 # This module has multi-phase initialization, unlike _testcapi. 964 965 def setUp(self): 966 fullname = '_testmultiphase_meth_state_access' # XXX 967 origin = importlib.util.find_spec('_testmultiphase').origin 968 loader = importlib.machinery.ExtensionFileLoader(fullname, origin) 969 spec = importlib.util.spec_from_loader(fullname, loader) 970 module = importlib.util.module_from_spec(spec) 971 loader.exec_module(module) 972 self.module = module 973 974 def test_subclass_get_module(self): 975 """PyType_GetModule for defining_class""" 976 class StateAccessType_Subclass(self.module.StateAccessType): 977 pass 978 979 instance = StateAccessType_Subclass() 980 self.assertIs(instance.get_defining_module(), self.module) 981 982 def test_subclass_get_module_with_super(self): 983 class StateAccessType_Subclass(self.module.StateAccessType): 984 def get_defining_module(self): 985 return super().get_defining_module() 986 987 instance = StateAccessType_Subclass() 988 self.assertIs(instance.get_defining_module(), self.module) 989 990 def test_state_access(self): 991 """Checks methods defined with and without argument clinic 992 993 This tests a no-arg method (get_count) and a method with 994 both a positional and keyword argument. 995 """ 996 997 a = self.module.StateAccessType() 998 b = self.module.StateAccessType() 999 1000 methods = { 1001 'clinic': a.increment_count_clinic, 1002 'noclinic': a.increment_count_noclinic, 1003 } 1004 1005 for name, increment_count in methods.items(): 1006 with self.subTest(name): 1007 self.assertEqual(a.get_count(), b.get_count()) 1008 self.assertEqual(a.get_count(), 0) 1009 1010 increment_count() 1011 self.assertEqual(a.get_count(), b.get_count()) 1012 self.assertEqual(a.get_count(), 1) 1013 1014 increment_count(3) 1015 self.assertEqual(a.get_count(), b.get_count()) 1016 self.assertEqual(a.get_count(), 4) 1017 1018 increment_count(-2, twice=True) 1019 self.assertEqual(a.get_count(), b.get_count()) 1020 self.assertEqual(a.get_count(), 0) 1021 1022 with self.assertRaises(TypeError): 1023 increment_count(thrice=3) 1024 1025 with self.assertRaises(TypeError): 1026 increment_count(1, 2, 3) 1027 1028 1029if __name__ == "__main__": 1030 unittest.main() 1031