1# Run the _testcapi module tests (tests for the Python/C API): by defn, 2# these are all functions _testcapi exports whose name begins with 'test_'. 3 4from collections import OrderedDict 5import os 6import pickle 7import random 8import re 9import subprocess 10import sys 11import textwrap 12import threading 13import time 14import unittest 15from test import support 16from test.support import MISSING_C_DOCSTRINGS 17from test.support.script_helper import assert_python_failure, assert_python_ok 18try: 19 import _posixsubprocess 20except ImportError: 21 _posixsubprocess = None 22 23# Skip this test if the _testcapi module isn't available. 24_testcapi = support.import_module('_testcapi') 25 26# Were we compiled --with-pydebug or with #define Py_DEBUG? 27Py_DEBUG = hasattr(sys, 'gettotalrefcount') 28 29 30def testfunction(self): 31 """some doc""" 32 return self 33 34 35class InstanceMethod: 36 id = _testcapi.instancemethod(id) 37 testfunction = _testcapi.instancemethod(testfunction) 38 39class CAPITest(unittest.TestCase): 40 41 def test_instancemethod(self): 42 inst = InstanceMethod() 43 self.assertEqual(id(inst), inst.id()) 44 self.assertTrue(inst.testfunction() is inst) 45 self.assertEqual(inst.testfunction.__doc__, testfunction.__doc__) 46 self.assertEqual(InstanceMethod.testfunction.__doc__, testfunction.__doc__) 47 48 InstanceMethod.testfunction.attribute = "test" 49 self.assertEqual(testfunction.attribute, "test") 50 self.assertRaises(AttributeError, setattr, inst.testfunction, "attribute", "test") 51 52 def test_no_FatalError_infinite_loop(self): 53 with support.SuppressCrashReport(): 54 p = subprocess.Popen([sys.executable, "-c", 55 'import _testcapi;' 56 '_testcapi.crash_no_current_thread()'], 57 stdout=subprocess.PIPE, 58 stderr=subprocess.PIPE) 59 (out, err) = p.communicate() 60 self.assertEqual(out, b'') 61 # This used to cause an infinite loop. 62 self.assertTrue(err.rstrip().startswith( 63 b'Fatal Python error:' 64 b' PyThreadState_Get: no current thread')) 65 66 def test_memoryview_from_NULL_pointer(self): 67 self.assertRaises(ValueError, _testcapi.make_memoryview_from_NULL_pointer) 68 69 def test_exc_info(self): 70 raised_exception = ValueError("5") 71 new_exc = TypeError("TEST") 72 try: 73 raise raised_exception 74 except ValueError as e: 75 tb = e.__traceback__ 76 orig_sys_exc_info = sys.exc_info() 77 orig_exc_info = _testcapi.set_exc_info(new_exc.__class__, new_exc, None) 78 new_sys_exc_info = sys.exc_info() 79 new_exc_info = _testcapi.set_exc_info(*orig_exc_info) 80 reset_sys_exc_info = sys.exc_info() 81 82 self.assertEqual(orig_exc_info[1], e) 83 84 self.assertSequenceEqual(orig_exc_info, (raised_exception.__class__, raised_exception, tb)) 85 self.assertSequenceEqual(orig_sys_exc_info, orig_exc_info) 86 self.assertSequenceEqual(reset_sys_exc_info, orig_exc_info) 87 self.assertSequenceEqual(new_exc_info, (new_exc.__class__, new_exc, None)) 88 self.assertSequenceEqual(new_sys_exc_info, new_exc_info) 89 else: 90 self.assertTrue(False) 91 92 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 93 def test_seq_bytes_to_charp_array(self): 94 # Issue #15732: crash in _PySequence_BytesToCharpArray() 95 class Z(object): 96 def __len__(self): 97 return 1 98 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 99 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 100 # Issue #15736: overflow in _PySequence_BytesToCharpArray() 101 class Z(object): 102 def __len__(self): 103 return sys.maxsize 104 def __getitem__(self, i): 105 return b'x' 106 self.assertRaises(MemoryError, _posixsubprocess.fork_exec, 107 1,Z(),3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 108 109 @unittest.skipUnless(_posixsubprocess, '_posixsubprocess required for this test.') 110 def test_subprocess_fork_exec(self): 111 class Z(object): 112 def __len__(self): 113 return 1 114 115 # Issue #15738: crash in subprocess_fork_exec() 116 self.assertRaises(TypeError, _posixsubprocess.fork_exec, 117 Z(),[b'1'],3,(1, 2),5,6,7,8,9,10,11,12,13,14,15,16,17) 118 119 @unittest.skipIf(MISSING_C_DOCSTRINGS, 120 "Signature information for builtins requires docstrings") 121 def test_docstring_signature_parsing(self): 122 123 self.assertEqual(_testcapi.no_docstring.__doc__, None) 124 self.assertEqual(_testcapi.no_docstring.__text_signature__, None) 125 126 self.assertEqual(_testcapi.docstring_empty.__doc__, None) 127 self.assertEqual(_testcapi.docstring_empty.__text_signature__, None) 128 129 self.assertEqual(_testcapi.docstring_no_signature.__doc__, 130 "This docstring has no signature.") 131 self.assertEqual(_testcapi.docstring_no_signature.__text_signature__, None) 132 133 self.assertEqual(_testcapi.docstring_with_invalid_signature.__doc__, 134 "docstring_with_invalid_signature($module, /, boo)\n" 135 "\n" 136 "This docstring has an invalid signature." 137 ) 138 self.assertEqual(_testcapi.docstring_with_invalid_signature.__text_signature__, None) 139 140 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__doc__, 141 "docstring_with_invalid_signature2($module, /, boo)\n" 142 "\n" 143 "--\n" 144 "\n" 145 "This docstring also has an invalid signature." 146 ) 147 self.assertEqual(_testcapi.docstring_with_invalid_signature2.__text_signature__, None) 148 149 self.assertEqual(_testcapi.docstring_with_signature.__doc__, 150 "This docstring has a valid signature.") 151 self.assertEqual(_testcapi.docstring_with_signature.__text_signature__, "($module, /, sig)") 152 153 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__doc__, None) 154 self.assertEqual(_testcapi.docstring_with_signature_but_no_doc.__text_signature__, 155 "($module, /, sig)") 156 157 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__doc__, 158 "\nThis docstring has a valid signature and some extra newlines.") 159 self.assertEqual(_testcapi.docstring_with_signature_and_extra_newlines.__text_signature__, 160 "($module, /, parameter)") 161 162 def test_c_type_with_matrix_multiplication(self): 163 M = _testcapi.matmulType 164 m1 = M() 165 m2 = M() 166 self.assertEqual(m1 @ m2, ("matmul", m1, m2)) 167 self.assertEqual(m1 @ 42, ("matmul", m1, 42)) 168 self.assertEqual(42 @ m1, ("matmul", 42, m1)) 169 o = m1 170 o @= m2 171 self.assertEqual(o, ("imatmul", m1, m2)) 172 o = m1 173 o @= 42 174 self.assertEqual(o, ("imatmul", m1, 42)) 175 o = 42 176 o @= m1 177 self.assertEqual(o, ("matmul", 42, m1)) 178 179 def test_c_type_with_ipow(self): 180 # When the __ipow__ method of a type was implemented in C, using the 181 # modulo param would cause segfaults. 182 o = _testcapi.ipowType() 183 self.assertEqual(o.__ipow__(1), (1, None)) 184 self.assertEqual(o.__ipow__(2, 2), (2, 2)) 185 186 def test_return_null_without_error(self): 187 # Issue #23571: A function must not return NULL without setting an 188 # error 189 if Py_DEBUG: 190 code = textwrap.dedent(""" 191 import _testcapi 192 from test import support 193 194 with support.SuppressCrashReport(): 195 _testcapi.return_null_without_error() 196 """) 197 rc, out, err = assert_python_failure('-c', code) 198 self.assertRegex(err.replace(b'\r', b''), 199 br'Fatal Python error: a function returned NULL ' 200 br'without setting an error\n' 201 br'Python runtime state: initialized\n' 202 br'SystemError: <built-in function ' 203 br'return_null_without_error> returned NULL ' 204 br'without setting an error\n' 205 br'\n' 206 br'Current thread.*:\n' 207 br' File .*", line 6 in <module>') 208 else: 209 with self.assertRaises(SystemError) as cm: 210 _testcapi.return_null_without_error() 211 self.assertRegex(str(cm.exception), 212 'return_null_without_error.* ' 213 'returned NULL without setting an error') 214 215 def test_return_result_with_error(self): 216 # Issue #23571: A function must not return a result with an error set 217 if Py_DEBUG: 218 code = textwrap.dedent(""" 219 import _testcapi 220 from test import support 221 222 with support.SuppressCrashReport(): 223 _testcapi.return_result_with_error() 224 """) 225 rc, out, err = assert_python_failure('-c', code) 226 self.assertRegex(err.replace(b'\r', b''), 227 br'Fatal Python error: a function returned a ' 228 br'result with an error set\n' 229 br'Python runtime state: initialized\n' 230 br'ValueError\n' 231 br'\n' 232 br'The above exception was the direct cause ' 233 br'of the following exception:\n' 234 br'\n' 235 br'SystemError: <built-in ' 236 br'function return_result_with_error> ' 237 br'returned a result with an error set\n' 238 br'\n' 239 br'Current thread.*:\n' 240 br' File .*, line 6 in <module>') 241 else: 242 with self.assertRaises(SystemError) as cm: 243 _testcapi.return_result_with_error() 244 self.assertRegex(str(cm.exception), 245 'return_result_with_error.* ' 246 'returned a result with an error set') 247 248 def test_buildvalue_N(self): 249 _testcapi.test_buildvalue_N() 250 251 def test_set_nomemory(self): 252 code = """if 1: 253 import _testcapi 254 255 class C(): pass 256 257 # The first loop tests both functions and that remove_mem_hooks() 258 # can be called twice in a row. The second loop checks a call to 259 # set_nomemory() after a call to remove_mem_hooks(). The third 260 # loop checks the start and stop arguments of set_nomemory(). 261 for outer_cnt in range(1, 4): 262 start = 10 * outer_cnt 263 for j in range(100): 264 if j == 0: 265 if outer_cnt != 3: 266 _testcapi.set_nomemory(start) 267 else: 268 _testcapi.set_nomemory(start, start + 1) 269 try: 270 C() 271 except MemoryError as e: 272 if outer_cnt != 3: 273 _testcapi.remove_mem_hooks() 274 print('MemoryError', outer_cnt, j) 275 _testcapi.remove_mem_hooks() 276 break 277 """ 278 rc, out, err = assert_python_ok('-c', code) 279 self.assertIn(b'MemoryError 1 10', out) 280 self.assertIn(b'MemoryError 2 20', out) 281 self.assertIn(b'MemoryError 3 30', out) 282 283 def test_mapping_keys_values_items(self): 284 class Mapping1(dict): 285 def keys(self): 286 return list(super().keys()) 287 def values(self): 288 return list(super().values()) 289 def items(self): 290 return list(super().items()) 291 class Mapping2(dict): 292 def keys(self): 293 return tuple(super().keys()) 294 def values(self): 295 return tuple(super().values()) 296 def items(self): 297 return tuple(super().items()) 298 dict_obj = {'foo': 1, 'bar': 2, 'spam': 3} 299 300 for mapping in [{}, OrderedDict(), Mapping1(), Mapping2(), 301 dict_obj, OrderedDict(dict_obj), 302 Mapping1(dict_obj), Mapping2(dict_obj)]: 303 self.assertListEqual(_testcapi.get_mapping_keys(mapping), 304 list(mapping.keys())) 305 self.assertListEqual(_testcapi.get_mapping_values(mapping), 306 list(mapping.values())) 307 self.assertListEqual(_testcapi.get_mapping_items(mapping), 308 list(mapping.items())) 309 310 def test_mapping_keys_values_items_bad_arg(self): 311 self.assertRaises(AttributeError, _testcapi.get_mapping_keys, None) 312 self.assertRaises(AttributeError, _testcapi.get_mapping_values, None) 313 self.assertRaises(AttributeError, _testcapi.get_mapping_items, None) 314 315 class BadMapping: 316 def keys(self): 317 return None 318 def values(self): 319 return None 320 def items(self): 321 return None 322 bad_mapping = BadMapping() 323 self.assertRaises(TypeError, _testcapi.get_mapping_keys, bad_mapping) 324 self.assertRaises(TypeError, _testcapi.get_mapping_values, bad_mapping) 325 self.assertRaises(TypeError, _testcapi.get_mapping_items, bad_mapping) 326 327 @unittest.skipUnless(hasattr(_testcapi, 'negative_refcount'), 328 'need _testcapi.negative_refcount') 329 def test_negative_refcount(self): 330 # bpo-35059: Check that Py_DECREF() reports the correct filename 331 # when calling _Py_NegativeRefcount() to abort Python. 332 code = textwrap.dedent(""" 333 import _testcapi 334 from test import support 335 336 with support.SuppressCrashReport(): 337 _testcapi.negative_refcount() 338 """) 339 rc, out, err = assert_python_failure('-c', code) 340 self.assertRegex(err, 341 br'_testcapimodule\.c:[0-9]+: ' 342 br'_Py_NegativeRefcount: Assertion failed: ' 343 br'object has negative ref count') 344 345 def test_trashcan_subclass(self): 346 # bpo-35983: Check that the trashcan mechanism for "list" is NOT 347 # activated when its tp_dealloc is being called by a subclass 348 from _testcapi import MyList 349 L = None 350 for i in range(1000): 351 L = MyList((L,)) 352 353 @support.requires_resource('cpu') 354 def test_trashcan_python_class1(self): 355 self.do_test_trashcan_python_class(list) 356 357 @support.requires_resource('cpu') 358 def test_trashcan_python_class2(self): 359 from _testcapi import MyList 360 self.do_test_trashcan_python_class(MyList) 361 362 def do_test_trashcan_python_class(self, base): 363 # Check that the trashcan mechanism works properly for a Python 364 # subclass of a class using the trashcan (this specific test assumes 365 # that the base class "base" behaves like list) 366 class PyList(base): 367 # Count the number of PyList instances to verify that there is 368 # no memory leak 369 num = 0 370 def __init__(self, *args): 371 __class__.num += 1 372 super().__init__(*args) 373 def __del__(self): 374 __class__.num -= 1 375 376 for parity in (0, 1): 377 L = None 378 # We need in the order of 2**20 iterations here such that a 379 # typical 8MB stack would overflow without the trashcan. 380 for i in range(2**20): 381 L = PyList((L,)) 382 L.attr = i 383 if parity: 384 # Add one additional nesting layer 385 L = (L,) 386 self.assertGreater(PyList.num, 0) 387 del L 388 self.assertEqual(PyList.num, 0) 389 390 def test_subclass_of_heap_gc_ctype_with_tpdealloc_decrefs_once(self): 391 class HeapGcCTypeSubclass(_testcapi.HeapGcCType): 392 def __init__(self): 393 self.value2 = 20 394 super().__init__() 395 396 subclass_instance = HeapGcCTypeSubclass() 397 type_refcnt = sys.getrefcount(HeapGcCTypeSubclass) 398 399 # Test that subclass instance was fully created 400 self.assertEqual(subclass_instance.value, 10) 401 self.assertEqual(subclass_instance.value2, 20) 402 403 # Test that the type reference count is only decremented once 404 del subclass_instance 405 self.assertEqual(type_refcnt - 1, sys.getrefcount(HeapGcCTypeSubclass)) 406 407 def test_subclass_of_heap_gc_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 408 class A(_testcapi.HeapGcCType): 409 def __init__(self): 410 self.value2 = 20 411 super().__init__() 412 413 class B(A): 414 def __init__(self): 415 super().__init__() 416 417 def __del__(self): 418 self.__class__ = A 419 A.refcnt_in_del = sys.getrefcount(A) 420 B.refcnt_in_del = sys.getrefcount(B) 421 422 subclass_instance = B() 423 type_refcnt = sys.getrefcount(B) 424 new_type_refcnt = sys.getrefcount(A) 425 426 # Test that subclass instance was fully created 427 self.assertEqual(subclass_instance.value, 10) 428 self.assertEqual(subclass_instance.value2, 20) 429 430 del subclass_instance 431 432 # Test that setting __class__ modified the reference counts of the types 433 self.assertEqual(type_refcnt - 1, B.refcnt_in_del) 434 self.assertEqual(new_type_refcnt + 1, A.refcnt_in_del) 435 436 # Test that the original type already has decreased its refcnt 437 self.assertEqual(type_refcnt - 1, sys.getrefcount(B)) 438 439 # Test that subtype_dealloc decref the newly assigned __class__ only once 440 self.assertEqual(new_type_refcnt, sys.getrefcount(A)) 441 442 def test_c_subclass_of_heap_ctype_with_tpdealloc_decrefs_once(self): 443 subclass_instance = _testcapi.HeapCTypeSubclass() 444 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 445 446 # Test that subclass instance was fully created 447 self.assertEqual(subclass_instance.value, 10) 448 self.assertEqual(subclass_instance.value2, 20) 449 450 # Test that the type reference count is only decremented once 451 del subclass_instance 452 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 453 454 def test_c_subclass_of_heap_ctype_with_del_modifying_dunder_class_only_decrefs_once(self): 455 subclass_instance = _testcapi.HeapCTypeSubclassWithFinalizer() 456 type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer) 457 new_type_refcnt = sys.getrefcount(_testcapi.HeapCTypeSubclass) 458 459 # Test that subclass instance was fully created 460 self.assertEqual(subclass_instance.value, 10) 461 self.assertEqual(subclass_instance.value2, 20) 462 463 # The tp_finalize slot will set __class__ to HeapCTypeSubclass 464 del subclass_instance 465 466 # Test that setting __class__ modified the reference counts of the types 467 self.assertEqual(type_refcnt - 1, _testcapi.HeapCTypeSubclassWithFinalizer.refcnt_in_del) 468 self.assertEqual(new_type_refcnt + 1, _testcapi.HeapCTypeSubclass.refcnt_in_del) 469 470 # Test that the original type already has decreased its refcnt 471 self.assertEqual(type_refcnt - 1, sys.getrefcount(_testcapi.HeapCTypeSubclassWithFinalizer)) 472 473 # Test that subtype_dealloc decref the newly assigned __class__ only once 474 self.assertEqual(new_type_refcnt, sys.getrefcount(_testcapi.HeapCTypeSubclass)) 475 476 def test_heaptype_with_setattro(self): 477 obj = _testcapi.HeapCTypeSetattr() 478 self.assertEqual(obj.pvalue, 10) 479 obj.value = 12 480 self.assertEqual(obj.pvalue, 12) 481 del obj.value 482 self.assertEqual(obj.pvalue, 0) 483 484 def test_pynumber_tobase(self): 485 from _testcapi import pynumber_tobase 486 self.assertEqual(pynumber_tobase(123, 2), '0b1111011') 487 self.assertEqual(pynumber_tobase(123, 8), '0o173') 488 self.assertEqual(pynumber_tobase(123, 10), '123') 489 self.assertEqual(pynumber_tobase(123, 16), '0x7b') 490 self.assertEqual(pynumber_tobase(-123, 2), '-0b1111011') 491 self.assertEqual(pynumber_tobase(-123, 8), '-0o173') 492 self.assertEqual(pynumber_tobase(-123, 10), '-123') 493 self.assertEqual(pynumber_tobase(-123, 16), '-0x7b') 494 self.assertRaises(TypeError, pynumber_tobase, 123.0, 10) 495 self.assertRaises(TypeError, pynumber_tobase, '123', 10) 496 self.assertRaises(SystemError, pynumber_tobase, 123, 0) 497 498 499class TestPendingCalls(unittest.TestCase): 500 501 def pendingcalls_submit(self, l, n): 502 def callback(): 503 #this function can be interrupted by thread switching so let's 504 #use an atomic operation 505 l.append(None) 506 507 for i in range(n): 508 time.sleep(random.random()*0.02) #0.01 secs on average 509 #try submitting callback until successful. 510 #rely on regular interrupt to flush queue if we are 511 #unsuccessful. 512 while True: 513 if _testcapi._pending_threadfunc(callback): 514 break; 515 516 def pendingcalls_wait(self, l, n, context = None): 517 #now, stick around until l[0] has grown to 10 518 count = 0; 519 while len(l) != n: 520 #this busy loop is where we expect to be interrupted to 521 #run our callbacks. Note that callbacks are only run on the 522 #main thread 523 if False and support.verbose: 524 print("(%i)"%(len(l),),) 525 for i in range(1000): 526 a = i*i 527 if context and not context.event.is_set(): 528 continue 529 count += 1 530 self.assertTrue(count < 10000, 531 "timeout waiting for %i callbacks, got %i"%(n, len(l))) 532 if False and support.verbose: 533 print("(%i)"%(len(l),)) 534 535 def test_pendingcalls_threaded(self): 536 537 #do every callback on a separate thread 538 n = 32 #total callbacks 539 threads = [] 540 class foo(object):pass 541 context = foo() 542 context.l = [] 543 context.n = 2 #submits per thread 544 context.nThreads = n // context.n 545 context.nFinished = 0 546 context.lock = threading.Lock() 547 context.event = threading.Event() 548 549 threads = [threading.Thread(target=self.pendingcalls_thread, 550 args=(context,)) 551 for i in range(context.nThreads)] 552 with support.start_threads(threads): 553 self.pendingcalls_wait(context.l, n, context) 554 555 def pendingcalls_thread(self, context): 556 try: 557 self.pendingcalls_submit(context.l, context.n) 558 finally: 559 with context.lock: 560 context.nFinished += 1 561 nFinished = context.nFinished 562 if False and support.verbose: 563 print("finished threads: ", nFinished) 564 if nFinished == context.nThreads: 565 context.event.set() 566 567 def test_pendingcalls_non_threaded(self): 568 #again, just using the main thread, likely they will all be dispatched at 569 #once. It is ok to ask for too many, because we loop until we find a slot. 570 #the loop can be interrupted to dispatch. 571 #there are only 32 dispatch slots, so we go for twice that! 572 l = [] 573 n = 64 574 self.pendingcalls_submit(l, n) 575 self.pendingcalls_wait(l, n) 576 577 578class SubinterpreterTest(unittest.TestCase): 579 580 def test_subinterps(self): 581 import builtins 582 r, w = os.pipe() 583 code = """if 1: 584 import sys, builtins, pickle 585 with open({:d}, "wb") as f: 586 pickle.dump(id(sys.modules), f) 587 pickle.dump(id(builtins), f) 588 """.format(w) 589 with open(r, "rb") as f: 590 ret = support.run_in_subinterp(code) 591 self.assertEqual(ret, 0) 592 self.assertNotEqual(pickle.load(f), id(sys.modules)) 593 self.assertNotEqual(pickle.load(f), id(builtins)) 594 595 def test_subinterps_recent_language_features(self): 596 r, w = os.pipe() 597 code = """if 1: 598 import pickle 599 with open({:d}, "wb") as f: 600 601 def noop(x): return x 602 603 a = (b := f'1{{2}}3') + noop('x') # Py 3.8 (:=) / 3.6 (f'') 604 605 async def foo(arg): return await arg # Py 3.5 606 607 pickle.dump(dict(a=a, b=b), f) 608 """.format(w) 609 610 with open(r, "rb") as f: 611 ret = support.run_in_subinterp(code) 612 self.assertEqual(ret, 0) 613 self.assertEqual(pickle.load(f), {'a': '123x', 'b': '123'}) 614 615 def test_mutate_exception(self): 616 """ 617 Exceptions saved in global module state get shared between 618 individual module instances. This test checks whether or not 619 a change in one interpreter's module gets reflected into the 620 other ones. 621 """ 622 import binascii 623 624 support.run_in_subinterp("import binascii; binascii.Error.foobar = 'foobar'") 625 626 self.assertFalse(hasattr(binascii.Error, "foobar")) 627 628 629class TestThreadState(unittest.TestCase): 630 631 @support.reap_threads 632 def test_thread_state(self): 633 # some extra thread-state tests driven via _testcapi 634 def target(): 635 idents = [] 636 637 def callback(): 638 idents.append(threading.get_ident()) 639 640 _testcapi._test_thread_state(callback) 641 a = b = callback 642 time.sleep(1) 643 # Check our main thread is in the list exactly 3 times. 644 self.assertEqual(idents.count(threading.get_ident()), 3, 645 "Couldn't find main thread correctly in the list") 646 647 target() 648 t = threading.Thread(target=target) 649 t.start() 650 t.join() 651 652 653class Test_testcapi(unittest.TestCase): 654 locals().update((name, getattr(_testcapi, name)) 655 for name in dir(_testcapi) 656 if name.startswith('test_') and not name.endswith('_code')) 657 658 659class PyMemDebugTests(unittest.TestCase): 660 PYTHONMALLOC = 'debug' 661 # '0x04c06e0' or '04C06E0' 662 PTR_REGEX = r'(?:0x)?[0-9a-fA-F]+' 663 664 def check(self, code): 665 with support.SuppressCrashReport(): 666 out = assert_python_failure('-c', code, 667 PYTHONMALLOC=self.PYTHONMALLOC) 668 stderr = out.err 669 return stderr.decode('ascii', 'replace') 670 671 def test_buffer_overflow(self): 672 out = self.check('import _testcapi; _testcapi.pymem_buffer_overflow()') 673 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 674 r" 16 bytes originally requested\n" 675 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 676 r" The [0-9] pad bytes at tail={ptr} are not all FORBIDDENBYTE \(0x[0-9a-f]{{2}}\):\n" 677 r" at tail\+0: 0x78 \*\*\* OUCH\n" 678 r" at tail\+1: 0xfd\n" 679 r" at tail\+2: 0xfd\n" 680 r" .*\n" 681 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 682 r" Data at p: cd cd cd .*\n" 683 r"\n" 684 r"Enable tracemalloc to get the memory block allocation traceback\n" 685 r"\n" 686 r"Fatal Python error: bad trailing pad byte") 687 regex = regex.format(ptr=self.PTR_REGEX) 688 regex = re.compile(regex, flags=re.DOTALL) 689 self.assertRegex(out, regex) 690 691 def test_api_misuse(self): 692 out = self.check('import _testcapi; _testcapi.pymem_api_misuse()') 693 regex = (r"Debug memory block at address p={ptr}: API 'm'\n" 694 r" 16 bytes originally requested\n" 695 r" The [0-9] pad bytes at p-[0-9] are FORBIDDENBYTE, as expected.\n" 696 r" The [0-9] pad bytes at tail={ptr} are FORBIDDENBYTE, as expected.\n" 697 r"( The block was made by call #[0-9]+ to debug malloc/realloc.\n)?" 698 r" Data at p: cd cd cd .*\n" 699 r"\n" 700 r"Enable tracemalloc to get the memory block allocation traceback\n" 701 r"\n" 702 r"Fatal Python error: bad ID: Allocated using API 'm', verified using API 'r'\n") 703 regex = regex.format(ptr=self.PTR_REGEX) 704 self.assertRegex(out, regex) 705 706 def check_malloc_without_gil(self, code): 707 out = self.check(code) 708 expected = ('Fatal Python error: Python memory allocator called ' 709 'without holding the GIL') 710 self.assertIn(expected, out) 711 712 def test_pymem_malloc_without_gil(self): 713 # Debug hooks must raise an error if PyMem_Malloc() is called 714 # without holding the GIL 715 code = 'import _testcapi; _testcapi.pymem_malloc_without_gil()' 716 self.check_malloc_without_gil(code) 717 718 def test_pyobject_malloc_without_gil(self): 719 # Debug hooks must raise an error if PyObject_Malloc() is called 720 # without holding the GIL 721 code = 'import _testcapi; _testcapi.pyobject_malloc_without_gil()' 722 self.check_malloc_without_gil(code) 723 724 def check_pyobject_is_freed(self, func_name): 725 code = textwrap.dedent(f''' 726 import gc, os, sys, _testcapi 727 # Disable the GC to avoid crash on GC collection 728 gc.disable() 729 try: 730 _testcapi.{func_name}() 731 # Exit immediately to avoid a crash while deallocating 732 # the invalid object 733 os._exit(0) 734 except _testcapi.error: 735 os._exit(1) 736 ''') 737 assert_python_ok('-c', code, PYTHONMALLOC=self.PYTHONMALLOC) 738 739 def test_pyobject_null_is_freed(self): 740 self.check_pyobject_is_freed('check_pyobject_null_is_freed') 741 742 def test_pyobject_uninitialized_is_freed(self): 743 self.check_pyobject_is_freed('check_pyobject_uninitialized_is_freed') 744 745 def test_pyobject_forbidden_bytes_is_freed(self): 746 self.check_pyobject_is_freed('check_pyobject_forbidden_bytes_is_freed') 747 748 def test_pyobject_freed_is_freed(self): 749 self.check_pyobject_is_freed('check_pyobject_freed_is_freed') 750 751 752class PyMemMallocDebugTests(PyMemDebugTests): 753 PYTHONMALLOC = 'malloc_debug' 754 755 756@unittest.skipUnless(support.with_pymalloc(), 'need pymalloc') 757class PyMemPymallocDebugTests(PyMemDebugTests): 758 PYTHONMALLOC = 'pymalloc_debug' 759 760 761@unittest.skipUnless(Py_DEBUG, 'need Py_DEBUG') 762class PyMemDefaultTests(PyMemDebugTests): 763 # test default allocator of Python compiled in debug mode 764 PYTHONMALLOC = '' 765 766 767if __name__ == "__main__": 768 unittest.main() 769