1""" 2Tests for the threading module. 3""" 4 5import test.support 6from test.support import (verbose, import_module, cpython_only, 7 requires_type_collecting) 8from test.support.script_helper import assert_python_ok, assert_python_failure 9 10import random 11import sys 12import _thread 13import threading 14import time 15import unittest 16import weakref 17import os 18import subprocess 19import signal 20 21from test import lock_tests 22from test import support 23 24 25# Between fork() and exec(), only async-safe functions are allowed (issues 26# #12316 and #11870), and fork() from a worker thread is known to trigger 27# problems with some operating systems (issue #3863): skip problematic tests 28# on platforms known to behave badly. 29platforms_to_skip = ('netbsd5', 'hp-ux11') 30 31 32# A trivial mutable counter. 33class Counter(object): 34 def __init__(self): 35 self.value = 0 36 def inc(self): 37 self.value += 1 38 def dec(self): 39 self.value -= 1 40 def get(self): 41 return self.value 42 43class TestThread(threading.Thread): 44 def __init__(self, name, testcase, sema, mutex, nrunning): 45 threading.Thread.__init__(self, name=name) 46 self.testcase = testcase 47 self.sema = sema 48 self.mutex = mutex 49 self.nrunning = nrunning 50 51 def run(self): 52 delay = random.random() / 10000.0 53 if verbose: 54 print('task %s will run for %.1f usec' % 55 (self.name, delay * 1e6)) 56 57 with self.sema: 58 with self.mutex: 59 self.nrunning.inc() 60 if verbose: 61 print(self.nrunning.get(), 'tasks are running') 62 self.testcase.assertLessEqual(self.nrunning.get(), 3) 63 64 time.sleep(delay) 65 if verbose: 66 print('task', self.name, 'done') 67 68 with self.mutex: 69 self.nrunning.dec() 70 self.testcase.assertGreaterEqual(self.nrunning.get(), 0) 71 if verbose: 72 print('%s is finished. %d tasks are running' % 73 (self.name, self.nrunning.get())) 74 75 76class BaseTestCase(unittest.TestCase): 77 def setUp(self): 78 self._threads = test.support.threading_setup() 79 80 def tearDown(self): 81 test.support.threading_cleanup(*self._threads) 82 test.support.reap_children() 83 84 85class ThreadTests(BaseTestCase): 86 87 # Create a bunch of threads, let each do some work, wait until all are 88 # done. 89 def test_various_ops(self): 90 # This takes about n/3 seconds to run (about n/3 clumps of tasks, 91 # times about 1 second per clump). 92 NUMTASKS = 10 93 94 # no more than 3 of the 10 can run at once 95 sema = threading.BoundedSemaphore(value=3) 96 mutex = threading.RLock() 97 numrunning = Counter() 98 99 threads = [] 100 101 for i in range(NUMTASKS): 102 t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning) 103 threads.append(t) 104 self.assertIsNone(t.ident) 105 self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$') 106 t.start() 107 108 if verbose: 109 print('waiting for all tasks to complete') 110 for t in threads: 111 t.join() 112 self.assertFalse(t.is_alive()) 113 self.assertNotEqual(t.ident, 0) 114 self.assertIsNotNone(t.ident) 115 self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$') 116 if verbose: 117 print('all tasks done') 118 self.assertEqual(numrunning.get(), 0) 119 120 def test_ident_of_no_threading_threads(self): 121 # The ident still must work for the main thread and dummy threads. 122 self.assertIsNotNone(threading.currentThread().ident) 123 def f(): 124 ident.append(threading.currentThread().ident) 125 done.set() 126 done = threading.Event() 127 ident = [] 128 with support.wait_threads_exit(): 129 tid = _thread.start_new_thread(f, ()) 130 done.wait() 131 self.assertEqual(ident[0], tid) 132 # Kill the "immortal" _DummyThread 133 del threading._active[ident[0]] 134 135 # run with a small(ish) thread stack size (256 KiB) 136 def test_various_ops_small_stack(self): 137 if verbose: 138 print('with 256 KiB thread stack size...') 139 try: 140 threading.stack_size(262144) 141 except _thread.error: 142 raise unittest.SkipTest( 143 'platform does not support changing thread stack size') 144 self.test_various_ops() 145 threading.stack_size(0) 146 147 # run with a large thread stack size (1 MiB) 148 def test_various_ops_large_stack(self): 149 if verbose: 150 print('with 1 MiB thread stack size...') 151 try: 152 threading.stack_size(0x100000) 153 except _thread.error: 154 raise unittest.SkipTest( 155 'platform does not support changing thread stack size') 156 self.test_various_ops() 157 threading.stack_size(0) 158 159 def test_foreign_thread(self): 160 # Check that a "foreign" thread can use the threading module. 161 def f(mutex): 162 # Calling current_thread() forces an entry for the foreign 163 # thread to get made in the threading._active map. 164 threading.current_thread() 165 mutex.release() 166 167 mutex = threading.Lock() 168 mutex.acquire() 169 with support.wait_threads_exit(): 170 tid = _thread.start_new_thread(f, (mutex,)) 171 # Wait for the thread to finish. 172 mutex.acquire() 173 self.assertIn(tid, threading._active) 174 self.assertIsInstance(threading._active[tid], threading._DummyThread) 175 #Issue 29376 176 self.assertTrue(threading._active[tid].is_alive()) 177 self.assertRegex(repr(threading._active[tid]), '_DummyThread') 178 del threading._active[tid] 179 180 # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently) 181 # exposed at the Python level. This test relies on ctypes to get at it. 182 def test_PyThreadState_SetAsyncExc(self): 183 ctypes = import_module("ctypes") 184 185 set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc 186 set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object) 187 188 class AsyncExc(Exception): 189 pass 190 191 exception = ctypes.py_object(AsyncExc) 192 193 # First check it works when setting the exception from the same thread. 194 tid = threading.get_ident() 195 self.assertIsInstance(tid, int) 196 self.assertGreater(tid, 0) 197 198 try: 199 result = set_async_exc(tid, exception) 200 # The exception is async, so we might have to keep the VM busy until 201 # it notices. 202 while True: 203 pass 204 except AsyncExc: 205 pass 206 else: 207 # This code is unreachable but it reflects the intent. If we wanted 208 # to be smarter the above loop wouldn't be infinite. 209 self.fail("AsyncExc not raised") 210 try: 211 self.assertEqual(result, 1) # one thread state modified 212 except UnboundLocalError: 213 # The exception was raised too quickly for us to get the result. 214 pass 215 216 # `worker_started` is set by the thread when it's inside a try/except 217 # block waiting to catch the asynchronously set AsyncExc exception. 218 # `worker_saw_exception` is set by the thread upon catching that 219 # exception. 220 worker_started = threading.Event() 221 worker_saw_exception = threading.Event() 222 223 class Worker(threading.Thread): 224 def run(self): 225 self.id = threading.get_ident() 226 self.finished = False 227 228 try: 229 while True: 230 worker_started.set() 231 time.sleep(0.1) 232 except AsyncExc: 233 self.finished = True 234 worker_saw_exception.set() 235 236 t = Worker() 237 t.daemon = True # so if this fails, we don't hang Python at shutdown 238 t.start() 239 if verbose: 240 print(" started worker thread") 241 242 # Try a thread id that doesn't make sense. 243 if verbose: 244 print(" trying nonsensical thread id") 245 result = set_async_exc(-1, exception) 246 self.assertEqual(result, 0) # no thread states modified 247 248 # Now raise an exception in the worker thread. 249 if verbose: 250 print(" waiting for worker thread to get started") 251 ret = worker_started.wait() 252 self.assertTrue(ret) 253 if verbose: 254 print(" verifying worker hasn't exited") 255 self.assertFalse(t.finished) 256 if verbose: 257 print(" attempting to raise asynch exception in worker") 258 result = set_async_exc(t.id, exception) 259 self.assertEqual(result, 1) # one thread state modified 260 if verbose: 261 print(" waiting for worker to say it caught the exception") 262 worker_saw_exception.wait(timeout=10) 263 self.assertTrue(t.finished) 264 if verbose: 265 print(" all OK -- joining worker") 266 if t.finished: 267 t.join() 268 # else the thread is still running, and we have no way to kill it 269 270 def test_limbo_cleanup(self): 271 # Issue 7481: Failure to start thread should cleanup the limbo map. 272 def fail_new_thread(*args): 273 raise threading.ThreadError() 274 _start_new_thread = threading._start_new_thread 275 threading._start_new_thread = fail_new_thread 276 try: 277 t = threading.Thread(target=lambda: None) 278 self.assertRaises(threading.ThreadError, t.start) 279 self.assertFalse( 280 t in threading._limbo, 281 "Failed to cleanup _limbo map on failure of Thread.start().") 282 finally: 283 threading._start_new_thread = _start_new_thread 284 285 def test_finalize_runnning_thread(self): 286 # Issue 1402: the PyGILState_Ensure / _Release functions may be called 287 # very late on python exit: on deallocation of a running thread for 288 # example. 289 import_module("ctypes") 290 291 rc, out, err = assert_python_failure("-c", """if 1: 292 import ctypes, sys, time, _thread 293 294 # This lock is used as a simple event variable. 295 ready = _thread.allocate_lock() 296 ready.acquire() 297 298 # Module globals are cleared before __del__ is run 299 # So we save the functions in class dict 300 class C: 301 ensure = ctypes.pythonapi.PyGILState_Ensure 302 release = ctypes.pythonapi.PyGILState_Release 303 def __del__(self): 304 state = self.ensure() 305 self.release(state) 306 307 def waitingThread(): 308 x = C() 309 ready.release() 310 time.sleep(100) 311 312 _thread.start_new_thread(waitingThread, ()) 313 ready.acquire() # Be sure the other thread is waiting. 314 sys.exit(42) 315 """) 316 self.assertEqual(rc, 42) 317 318 def test_finalize_with_trace(self): 319 # Issue1733757 320 # Avoid a deadlock when sys.settrace steps into threading._shutdown 321 assert_python_ok("-c", """if 1: 322 import sys, threading 323 324 # A deadlock-killer, to prevent the 325 # testsuite to hang forever 326 def killer(): 327 import os, time 328 time.sleep(2) 329 print('program blocked; aborting') 330 os._exit(2) 331 t = threading.Thread(target=killer) 332 t.daemon = True 333 t.start() 334 335 # This is the trace function 336 def func(frame, event, arg): 337 threading.current_thread() 338 return func 339 340 sys.settrace(func) 341 """) 342 343 def test_join_nondaemon_on_shutdown(self): 344 # Issue 1722344 345 # Raising SystemExit skipped threading._shutdown 346 rc, out, err = assert_python_ok("-c", """if 1: 347 import threading 348 from time import sleep 349 350 def child(): 351 sleep(1) 352 # As a non-daemon thread we SHOULD wake up and nothing 353 # should be torn down yet 354 print("Woke up, sleep function is:", sleep) 355 356 threading.Thread(target=child).start() 357 raise SystemExit 358 """) 359 self.assertEqual(out.strip(), 360 b"Woke up, sleep function is: <built-in function sleep>") 361 self.assertEqual(err, b"") 362 363 def test_enumerate_after_join(self): 364 # Try hard to trigger #1703448: a thread is still returned in 365 # threading.enumerate() after it has been join()ed. 366 enum = threading.enumerate 367 old_interval = sys.getswitchinterval() 368 try: 369 for i in range(1, 100): 370 sys.setswitchinterval(i * 0.0002) 371 t = threading.Thread(target=lambda: None) 372 t.start() 373 t.join() 374 l = enum() 375 self.assertNotIn(t, l, 376 "#1703448 triggered after %d trials: %s" % (i, l)) 377 finally: 378 sys.setswitchinterval(old_interval) 379 380 def test_no_refcycle_through_target(self): 381 class RunSelfFunction(object): 382 def __init__(self, should_raise): 383 # The links in this refcycle from Thread back to self 384 # should be cleaned up when the thread completes. 385 self.should_raise = should_raise 386 self.thread = threading.Thread(target=self._run, 387 args=(self,), 388 kwargs={'yet_another':self}) 389 self.thread.start() 390 391 def _run(self, other_ref, yet_another): 392 if self.should_raise: 393 raise SystemExit 394 395 cyclic_object = RunSelfFunction(should_raise=False) 396 weak_cyclic_object = weakref.ref(cyclic_object) 397 cyclic_object.thread.join() 398 del cyclic_object 399 self.assertIsNone(weak_cyclic_object(), 400 msg=('%d references still around' % 401 sys.getrefcount(weak_cyclic_object()))) 402 403 raising_cyclic_object = RunSelfFunction(should_raise=True) 404 weak_raising_cyclic_object = weakref.ref(raising_cyclic_object) 405 raising_cyclic_object.thread.join() 406 del raising_cyclic_object 407 self.assertIsNone(weak_raising_cyclic_object(), 408 msg=('%d references still around' % 409 sys.getrefcount(weak_raising_cyclic_object()))) 410 411 def test_old_threading_api(self): 412 # Just a quick sanity check to make sure the old method names are 413 # still present 414 t = threading.Thread() 415 t.isDaemon() 416 t.setDaemon(True) 417 t.getName() 418 t.setName("name") 419 with self.assertWarnsRegex(PendingDeprecationWarning, 'use is_alive()'): 420 t.isAlive() 421 e = threading.Event() 422 e.isSet() 423 threading.activeCount() 424 425 def test_repr_daemon(self): 426 t = threading.Thread() 427 self.assertNotIn('daemon', repr(t)) 428 t.daemon = True 429 self.assertIn('daemon', repr(t)) 430 431 def test_daemon_param(self): 432 t = threading.Thread() 433 self.assertFalse(t.daemon) 434 t = threading.Thread(daemon=False) 435 self.assertFalse(t.daemon) 436 t = threading.Thread(daemon=True) 437 self.assertTrue(t.daemon) 438 439 @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()') 440 def test_dummy_thread_after_fork(self): 441 # Issue #14308: a dummy thread in the active list doesn't mess up 442 # the after-fork mechanism. 443 code = """if 1: 444 import _thread, threading, os, time 445 446 def background_thread(evt): 447 # Creates and registers the _DummyThread instance 448 threading.current_thread() 449 evt.set() 450 time.sleep(10) 451 452 evt = threading.Event() 453 _thread.start_new_thread(background_thread, (evt,)) 454 evt.wait() 455 assert threading.active_count() == 2, threading.active_count() 456 if os.fork() == 0: 457 assert threading.active_count() == 1, threading.active_count() 458 os._exit(0) 459 else: 460 os.wait() 461 """ 462 _, out, err = assert_python_ok("-c", code) 463 self.assertEqual(out, b'') 464 self.assertEqual(err, b'') 465 466 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 467 def test_is_alive_after_fork(self): 468 # Try hard to trigger #18418: is_alive() could sometimes be True on 469 # threads that vanished after a fork. 470 old_interval = sys.getswitchinterval() 471 self.addCleanup(sys.setswitchinterval, old_interval) 472 473 # Make the bug more likely to manifest. 474 test.support.setswitchinterval(1e-6) 475 476 for i in range(20): 477 t = threading.Thread(target=lambda: None) 478 t.start() 479 pid = os.fork() 480 if pid == 0: 481 os._exit(11 if t.is_alive() else 10) 482 else: 483 t.join() 484 485 pid, status = os.waitpid(pid, 0) 486 self.assertTrue(os.WIFEXITED(status)) 487 self.assertEqual(10, os.WEXITSTATUS(status)) 488 489 def test_main_thread(self): 490 main = threading.main_thread() 491 self.assertEqual(main.name, 'MainThread') 492 self.assertEqual(main.ident, threading.current_thread().ident) 493 self.assertEqual(main.ident, threading.get_ident()) 494 495 def f(): 496 self.assertNotEqual(threading.main_thread().ident, 497 threading.current_thread().ident) 498 th = threading.Thread(target=f) 499 th.start() 500 th.join() 501 502 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 503 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 504 def test_main_thread_after_fork(self): 505 code = """if 1: 506 import os, threading 507 508 pid = os.fork() 509 if pid == 0: 510 main = threading.main_thread() 511 print(main.name) 512 print(main.ident == threading.current_thread().ident) 513 print(main.ident == threading.get_ident()) 514 else: 515 os.waitpid(pid, 0) 516 """ 517 _, out, err = assert_python_ok("-c", code) 518 data = out.decode().replace('\r', '') 519 self.assertEqual(err, b"") 520 self.assertEqual(data, "MainThread\nTrue\nTrue\n") 521 522 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 523 @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()") 524 @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()") 525 def test_main_thread_after_fork_from_nonmain_thread(self): 526 code = """if 1: 527 import os, threading, sys 528 529 def f(): 530 pid = os.fork() 531 if pid == 0: 532 main = threading.main_thread() 533 print(main.name) 534 print(main.ident == threading.current_thread().ident) 535 print(main.ident == threading.get_ident()) 536 # stdout is fully buffered because not a tty, 537 # we have to flush before exit. 538 sys.stdout.flush() 539 else: 540 os.waitpid(pid, 0) 541 542 th = threading.Thread(target=f) 543 th.start() 544 th.join() 545 """ 546 _, out, err = assert_python_ok("-c", code) 547 data = out.decode().replace('\r', '') 548 self.assertEqual(err, b"") 549 self.assertEqual(data, "Thread-1\nTrue\nTrue\n") 550 551 @requires_type_collecting 552 def test_main_thread_during_shutdown(self): 553 # bpo-31516: current_thread() should still point to the main thread 554 # at shutdown 555 code = """if 1: 556 import gc, threading 557 558 main_thread = threading.current_thread() 559 assert main_thread is threading.main_thread() # sanity check 560 561 class RefCycle: 562 def __init__(self): 563 self.cycle = self 564 565 def __del__(self): 566 print("GC:", 567 threading.current_thread() is main_thread, 568 threading.main_thread() is main_thread, 569 threading.enumerate() == [main_thread]) 570 571 RefCycle() 572 gc.collect() # sanity check 573 x = RefCycle() 574 """ 575 _, out, err = assert_python_ok("-c", code) 576 data = out.decode() 577 self.assertEqual(err, b"") 578 self.assertEqual(data.splitlines(), 579 ["GC: True True True"] * 2) 580 581 def test_finalization_shutdown(self): 582 # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait 583 # until Python thread states of all non-daemon threads get deleted. 584 # 585 # Test similar to SubinterpThreadingTests.test_threads_join_2(), but 586 # test the finalization of the main interpreter. 587 code = """if 1: 588 import os 589 import threading 590 import time 591 import random 592 593 def random_sleep(): 594 seconds = random.random() * 0.010 595 time.sleep(seconds) 596 597 class Sleeper: 598 def __del__(self): 599 random_sleep() 600 601 tls = threading.local() 602 603 def f(): 604 # Sleep a bit so that the thread is still running when 605 # Py_Finalize() is called. 606 random_sleep() 607 tls.x = Sleeper() 608 random_sleep() 609 610 threading.Thread(target=f).start() 611 random_sleep() 612 """ 613 rc, out, err = assert_python_ok("-c", code) 614 self.assertEqual(err, b"") 615 616 def test_tstate_lock(self): 617 # Test an implementation detail of Thread objects. 618 started = _thread.allocate_lock() 619 finish = _thread.allocate_lock() 620 started.acquire() 621 finish.acquire() 622 def f(): 623 started.release() 624 finish.acquire() 625 time.sleep(0.01) 626 # The tstate lock is None until the thread is started 627 t = threading.Thread(target=f) 628 self.assertIs(t._tstate_lock, None) 629 t.start() 630 started.acquire() 631 self.assertTrue(t.is_alive()) 632 # The tstate lock can't be acquired when the thread is running 633 # (or suspended). 634 tstate_lock = t._tstate_lock 635 self.assertFalse(tstate_lock.acquire(timeout=0), False) 636 finish.release() 637 # When the thread ends, the state_lock can be successfully 638 # acquired. 639 self.assertTrue(tstate_lock.acquire(timeout=5), False) 640 # But is_alive() is still True: we hold _tstate_lock now, which 641 # prevents is_alive() from knowing the thread's end-of-life C code 642 # is done. 643 self.assertTrue(t.is_alive()) 644 # Let is_alive() find out the C code is done. 645 tstate_lock.release() 646 self.assertFalse(t.is_alive()) 647 # And verify the thread disposed of _tstate_lock. 648 self.assertIsNone(t._tstate_lock) 649 t.join() 650 651 def test_repr_stopped(self): 652 # Verify that "stopped" shows up in repr(Thread) appropriately. 653 started = _thread.allocate_lock() 654 finish = _thread.allocate_lock() 655 started.acquire() 656 finish.acquire() 657 def f(): 658 started.release() 659 finish.acquire() 660 t = threading.Thread(target=f) 661 t.start() 662 started.acquire() 663 self.assertIn("started", repr(t)) 664 finish.release() 665 # "stopped" should appear in the repr in a reasonable amount of time. 666 # Implementation detail: as of this writing, that's trivially true 667 # if .join() is called, and almost trivially true if .is_alive() is 668 # called. The detail we're testing here is that "stopped" shows up 669 # "all on its own". 670 LOOKING_FOR = "stopped" 671 for i in range(500): 672 if LOOKING_FOR in repr(t): 673 break 674 time.sleep(0.01) 675 self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds 676 t.join() 677 678 def test_BoundedSemaphore_limit(self): 679 # BoundedSemaphore should raise ValueError if released too often. 680 for limit in range(1, 10): 681 bs = threading.BoundedSemaphore(limit) 682 threads = [threading.Thread(target=bs.acquire) 683 for _ in range(limit)] 684 for t in threads: 685 t.start() 686 for t in threads: 687 t.join() 688 threads = [threading.Thread(target=bs.release) 689 for _ in range(limit)] 690 for t in threads: 691 t.start() 692 for t in threads: 693 t.join() 694 self.assertRaises(ValueError, bs.release) 695 696 @cpython_only 697 def test_frame_tstate_tracing(self): 698 # Issue #14432: Crash when a generator is created in a C thread that is 699 # destroyed while the generator is still used. The issue was that a 700 # generator contains a frame, and the frame kept a reference to the 701 # Python state of the destroyed C thread. The crash occurs when a trace 702 # function is setup. 703 704 def noop_trace(frame, event, arg): 705 # no operation 706 return noop_trace 707 708 def generator(): 709 while 1: 710 yield "generator" 711 712 def callback(): 713 if callback.gen is None: 714 callback.gen = generator() 715 return next(callback.gen) 716 callback.gen = None 717 718 old_trace = sys.gettrace() 719 sys.settrace(noop_trace) 720 try: 721 # Install a trace function 722 threading.settrace(noop_trace) 723 724 # Create a generator in a C thread which exits after the call 725 import _testcapi 726 _testcapi.call_in_temporary_c_thread(callback) 727 728 # Call the generator in a different Python thread, check that the 729 # generator didn't keep a reference to the destroyed thread state 730 for test in range(3): 731 # The trace function is still called here 732 callback() 733 finally: 734 sys.settrace(old_trace) 735 736 @cpython_only 737 def test_shutdown_locks(self): 738 for daemon in (False, True): 739 with self.subTest(daemon=daemon): 740 event = threading.Event() 741 thread = threading.Thread(target=event.wait, daemon=daemon) 742 743 # Thread.start() must add lock to _shutdown_locks, 744 # but only for non-daemon thread 745 thread.start() 746 tstate_lock = thread._tstate_lock 747 if not daemon: 748 self.assertIn(tstate_lock, threading._shutdown_locks) 749 else: 750 self.assertNotIn(tstate_lock, threading._shutdown_locks) 751 752 # unblock the thread and join it 753 event.set() 754 thread.join() 755 756 # Thread._stop() must remove tstate_lock from _shutdown_locks. 757 # Daemon threads must never add it to _shutdown_locks. 758 self.assertNotIn(tstate_lock, threading._shutdown_locks) 759 760 761class ThreadJoinOnShutdown(BaseTestCase): 762 763 def _run_and_join(self, script): 764 script = """if 1: 765 import sys, os, time, threading 766 767 # a thread, which waits for the main program to terminate 768 def joiningfunc(mainthread): 769 mainthread.join() 770 print('end of thread') 771 # stdout is fully buffered because not a tty, we have to flush 772 # before exit. 773 sys.stdout.flush() 774 \n""" + script 775 776 rc, out, err = assert_python_ok("-c", script) 777 data = out.decode().replace('\r', '') 778 self.assertEqual(data, "end of main\nend of thread\n") 779 780 def test_1_join_on_shutdown(self): 781 # The usual case: on exit, wait for a non-daemon thread 782 script = """if 1: 783 import os 784 t = threading.Thread(target=joiningfunc, 785 args=(threading.current_thread(),)) 786 t.start() 787 time.sleep(0.1) 788 print('end of main') 789 """ 790 self._run_and_join(script) 791 792 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 793 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 794 def test_2_join_in_forked_process(self): 795 # Like the test above, but from a forked interpreter 796 script = """if 1: 797 childpid = os.fork() 798 if childpid != 0: 799 os.waitpid(childpid, 0) 800 sys.exit(0) 801 802 t = threading.Thread(target=joiningfunc, 803 args=(threading.current_thread(),)) 804 t.start() 805 print('end of main') 806 """ 807 self._run_and_join(script) 808 809 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 810 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 811 def test_3_join_in_forked_from_thread(self): 812 # Like the test above, but fork() was called from a worker thread 813 # In the forked process, the main Thread object must be marked as stopped. 814 815 script = """if 1: 816 main_thread = threading.current_thread() 817 def worker(): 818 childpid = os.fork() 819 if childpid != 0: 820 os.waitpid(childpid, 0) 821 sys.exit(0) 822 823 t = threading.Thread(target=joiningfunc, 824 args=(main_thread,)) 825 print('end of main') 826 t.start() 827 t.join() # Should not block: main_thread is already stopped 828 829 w = threading.Thread(target=worker) 830 w.start() 831 """ 832 self._run_and_join(script) 833 834 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 835 def test_4_daemon_threads(self): 836 # Check that a daemon thread cannot crash the interpreter on shutdown 837 # by manipulating internal structures that are being disposed of in 838 # the main thread. 839 script = """if True: 840 import os 841 import random 842 import sys 843 import time 844 import threading 845 846 thread_has_run = set() 847 848 def random_io(): 849 '''Loop for a while sleeping random tiny amounts and doing some I/O.''' 850 while True: 851 in_f = open(os.__file__, 'rb') 852 stuff = in_f.read(200) 853 null_f = open(os.devnull, 'wb') 854 null_f.write(stuff) 855 time.sleep(random.random() / 1995) 856 null_f.close() 857 in_f.close() 858 thread_has_run.add(threading.current_thread()) 859 860 def main(): 861 count = 0 862 for _ in range(40): 863 new_thread = threading.Thread(target=random_io) 864 new_thread.daemon = True 865 new_thread.start() 866 count += 1 867 while len(thread_has_run) < count: 868 time.sleep(0.001) 869 # Trigger process shutdown 870 sys.exit(0) 871 872 main() 873 """ 874 rc, out, err = assert_python_ok('-c', script) 875 self.assertFalse(err) 876 877 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 878 @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug") 879 def test_reinit_tls_after_fork(self): 880 # Issue #13817: fork() would deadlock in a multithreaded program with 881 # the ad-hoc TLS implementation. 882 883 def do_fork_and_wait(): 884 # just fork a child process and wait it 885 pid = os.fork() 886 if pid > 0: 887 os.waitpid(pid, 0) 888 else: 889 os._exit(0) 890 891 # start a bunch of threads that will fork() child processes 892 threads = [] 893 for i in range(16): 894 t = threading.Thread(target=do_fork_and_wait) 895 threads.append(t) 896 t.start() 897 898 for t in threads: 899 t.join() 900 901 @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()") 902 def test_clear_threads_states_after_fork(self): 903 # Issue #17094: check that threads states are cleared after fork() 904 905 # start a bunch of threads 906 threads = [] 907 for i in range(16): 908 t = threading.Thread(target=lambda : time.sleep(0.3)) 909 threads.append(t) 910 t.start() 911 912 pid = os.fork() 913 if pid == 0: 914 # check that threads states have been cleared 915 if len(sys._current_frames()) == 1: 916 os._exit(0) 917 else: 918 os._exit(1) 919 else: 920 _, status = os.waitpid(pid, 0) 921 self.assertEqual(0, status) 922 923 for t in threads: 924 t.join() 925 926 927class SubinterpThreadingTests(BaseTestCase): 928 929 def test_threads_join(self): 930 # Non-daemon threads should be joined at subinterpreter shutdown 931 # (issue #18808) 932 r, w = os.pipe() 933 self.addCleanup(os.close, r) 934 self.addCleanup(os.close, w) 935 code = r"""if 1: 936 import os 937 import random 938 import threading 939 import time 940 941 def random_sleep(): 942 seconds = random.random() * 0.010 943 time.sleep(seconds) 944 945 def f(): 946 # Sleep a bit so that the thread is still running when 947 # Py_EndInterpreter is called. 948 random_sleep() 949 os.write(%d, b"x") 950 951 threading.Thread(target=f).start() 952 random_sleep() 953 """ % (w,) 954 ret = test.support.run_in_subinterp(code) 955 self.assertEqual(ret, 0) 956 # The thread was joined properly. 957 self.assertEqual(os.read(r, 1), b"x") 958 959 def test_threads_join_2(self): 960 # Same as above, but a delay gets introduced after the thread's 961 # Python code returned but before the thread state is deleted. 962 # To achieve this, we register a thread-local object which sleeps 963 # a bit when deallocated. 964 r, w = os.pipe() 965 self.addCleanup(os.close, r) 966 self.addCleanup(os.close, w) 967 code = r"""if 1: 968 import os 969 import random 970 import threading 971 import time 972 973 def random_sleep(): 974 seconds = random.random() * 0.010 975 time.sleep(seconds) 976 977 class Sleeper: 978 def __del__(self): 979 random_sleep() 980 981 tls = threading.local() 982 983 def f(): 984 # Sleep a bit so that the thread is still running when 985 # Py_EndInterpreter is called. 986 random_sleep() 987 tls.x = Sleeper() 988 os.write(%d, b"x") 989 990 threading.Thread(target=f).start() 991 random_sleep() 992 """ % (w,) 993 ret = test.support.run_in_subinterp(code) 994 self.assertEqual(ret, 0) 995 # The thread was joined properly. 996 self.assertEqual(os.read(r, 1), b"x") 997 998 @cpython_only 999 def test_daemon_threads_fatal_error(self): 1000 subinterp_code = r"""if 1: 1001 import os 1002 import threading 1003 import time 1004 1005 def f(): 1006 # Make sure the daemon thread is still running when 1007 # Py_EndInterpreter is called. 1008 time.sleep(10) 1009 threading.Thread(target=f, daemon=True).start() 1010 """ 1011 script = r"""if 1: 1012 import _testcapi 1013 1014 _testcapi.run_in_subinterp(%r) 1015 """ % (subinterp_code,) 1016 with test.support.SuppressCrashReport(): 1017 rc, out, err = assert_python_failure("-c", script) 1018 self.assertIn("Fatal Python error: Py_EndInterpreter: " 1019 "not the last thread", err.decode()) 1020 1021 1022class ThreadingExceptionTests(BaseTestCase): 1023 # A RuntimeError should be raised if Thread.start() is called 1024 # multiple times. 1025 def test_start_thread_again(self): 1026 thread = threading.Thread() 1027 thread.start() 1028 self.assertRaises(RuntimeError, thread.start) 1029 thread.join() 1030 1031 def test_joining_current_thread(self): 1032 current_thread = threading.current_thread() 1033 self.assertRaises(RuntimeError, current_thread.join); 1034 1035 def test_joining_inactive_thread(self): 1036 thread = threading.Thread() 1037 self.assertRaises(RuntimeError, thread.join) 1038 1039 def test_daemonize_active_thread(self): 1040 thread = threading.Thread() 1041 thread.start() 1042 self.assertRaises(RuntimeError, setattr, thread, "daemon", True) 1043 thread.join() 1044 1045 def test_releasing_unacquired_lock(self): 1046 lock = threading.Lock() 1047 self.assertRaises(RuntimeError, lock.release) 1048 1049 @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(), 1050 'test macosx problem') 1051 def test_recursion_limit(self): 1052 # Issue 9670 1053 # test that excessive recursion within a non-main thread causes 1054 # an exception rather than crashing the interpreter on platforms 1055 # like Mac OS X or FreeBSD which have small default stack sizes 1056 # for threads 1057 script = """if True: 1058 import threading 1059 1060 def recurse(): 1061 return recurse() 1062 1063 def outer(): 1064 try: 1065 recurse() 1066 except RecursionError: 1067 pass 1068 1069 w = threading.Thread(target=outer) 1070 w.start() 1071 w.join() 1072 print('end of main thread') 1073 """ 1074 expected_output = "end of main thread\n" 1075 p = subprocess.Popen([sys.executable, "-c", script], 1076 stdout=subprocess.PIPE, stderr=subprocess.PIPE) 1077 stdout, stderr = p.communicate() 1078 data = stdout.decode().replace('\r', '') 1079 self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode()) 1080 self.assertEqual(data, expected_output) 1081 1082 def test_print_exception(self): 1083 script = r"""if True: 1084 import threading 1085 import time 1086 1087 running = False 1088 def run(): 1089 global running 1090 running = True 1091 while running: 1092 time.sleep(0.01) 1093 1/0 1094 t = threading.Thread(target=run) 1095 t.start() 1096 while not running: 1097 time.sleep(0.01) 1098 running = False 1099 t.join() 1100 """ 1101 rc, out, err = assert_python_ok("-c", script) 1102 self.assertEqual(out, b'') 1103 err = err.decode() 1104 self.assertIn("Exception in thread", err) 1105 self.assertIn("Traceback (most recent call last):", err) 1106 self.assertIn("ZeroDivisionError", err) 1107 self.assertNotIn("Unhandled exception", err) 1108 1109 @requires_type_collecting 1110 def test_print_exception_stderr_is_none_1(self): 1111 script = r"""if True: 1112 import sys 1113 import threading 1114 import time 1115 1116 running = False 1117 def run(): 1118 global running 1119 running = True 1120 while running: 1121 time.sleep(0.01) 1122 1/0 1123 t = threading.Thread(target=run) 1124 t.start() 1125 while not running: 1126 time.sleep(0.01) 1127 sys.stderr = None 1128 running = False 1129 t.join() 1130 """ 1131 rc, out, err = assert_python_ok("-c", script) 1132 self.assertEqual(out, b'') 1133 err = err.decode() 1134 self.assertIn("Exception in thread", err) 1135 self.assertIn("Traceback (most recent call last):", err) 1136 self.assertIn("ZeroDivisionError", err) 1137 self.assertNotIn("Unhandled exception", err) 1138 1139 def test_print_exception_stderr_is_none_2(self): 1140 script = r"""if True: 1141 import sys 1142 import threading 1143 import time 1144 1145 running = False 1146 def run(): 1147 global running 1148 running = True 1149 while running: 1150 time.sleep(0.01) 1151 1/0 1152 sys.stderr = None 1153 t = threading.Thread(target=run) 1154 t.start() 1155 while not running: 1156 time.sleep(0.01) 1157 running = False 1158 t.join() 1159 """ 1160 rc, out, err = assert_python_ok("-c", script) 1161 self.assertEqual(out, b'') 1162 self.assertNotIn("Unhandled exception", err.decode()) 1163 1164 def test_bare_raise_in_brand_new_thread(self): 1165 def bare_raise(): 1166 raise 1167 1168 class Issue27558(threading.Thread): 1169 exc = None 1170 1171 def run(self): 1172 try: 1173 bare_raise() 1174 except Exception as exc: 1175 self.exc = exc 1176 1177 thread = Issue27558() 1178 thread.start() 1179 thread.join() 1180 self.assertIsNotNone(thread.exc) 1181 self.assertIsInstance(thread.exc, RuntimeError) 1182 # explicitly break the reference cycle to not leak a dangling thread 1183 thread.exc = None 1184 1185class TimerTests(BaseTestCase): 1186 1187 def setUp(self): 1188 BaseTestCase.setUp(self) 1189 self.callback_args = [] 1190 self.callback_event = threading.Event() 1191 1192 def test_init_immutable_default_args(self): 1193 # Issue 17435: constructor defaults were mutable objects, they could be 1194 # mutated via the object attributes and affect other Timer objects. 1195 timer1 = threading.Timer(0.01, self._callback_spy) 1196 timer1.start() 1197 self.callback_event.wait() 1198 timer1.args.append("blah") 1199 timer1.kwargs["foo"] = "bar" 1200 self.callback_event.clear() 1201 timer2 = threading.Timer(0.01, self._callback_spy) 1202 timer2.start() 1203 self.callback_event.wait() 1204 self.assertEqual(len(self.callback_args), 2) 1205 self.assertEqual(self.callback_args, [((), {}), ((), {})]) 1206 timer1.join() 1207 timer2.join() 1208 1209 def _callback_spy(self, *args, **kwargs): 1210 self.callback_args.append((args[:], kwargs.copy())) 1211 self.callback_event.set() 1212 1213class LockTests(lock_tests.LockTests): 1214 locktype = staticmethod(threading.Lock) 1215 1216class PyRLockTests(lock_tests.RLockTests): 1217 locktype = staticmethod(threading._PyRLock) 1218 1219@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C') 1220class CRLockTests(lock_tests.RLockTests): 1221 locktype = staticmethod(threading._CRLock) 1222 1223class EventTests(lock_tests.EventTests): 1224 eventtype = staticmethod(threading.Event) 1225 1226class ConditionAsRLockTests(lock_tests.RLockTests): 1227 # Condition uses an RLock by default and exports its API. 1228 locktype = staticmethod(threading.Condition) 1229 1230class ConditionTests(lock_tests.ConditionTests): 1231 condtype = staticmethod(threading.Condition) 1232 1233class SemaphoreTests(lock_tests.SemaphoreTests): 1234 semtype = staticmethod(threading.Semaphore) 1235 1236class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests): 1237 semtype = staticmethod(threading.BoundedSemaphore) 1238 1239class BarrierTests(lock_tests.BarrierTests): 1240 barriertype = staticmethod(threading.Barrier) 1241 1242 1243class MiscTestCase(unittest.TestCase): 1244 def test__all__(self): 1245 extra = {"ThreadError"} 1246 blacklist = {'currentThread', 'activeCount'} 1247 support.check__all__(self, threading, ('threading', '_thread'), 1248 extra=extra, blacklist=blacklist) 1249 1250 1251class InterruptMainTests(unittest.TestCase): 1252 def test_interrupt_main_subthread(self): 1253 # Calling start_new_thread with a function that executes interrupt_main 1254 # should raise KeyboardInterrupt upon completion. 1255 def call_interrupt(): 1256 _thread.interrupt_main() 1257 t = threading.Thread(target=call_interrupt) 1258 with self.assertRaises(KeyboardInterrupt): 1259 t.start() 1260 t.join() 1261 t.join() 1262 1263 def test_interrupt_main_mainthread(self): 1264 # Make sure that if interrupt_main is called in main thread that 1265 # KeyboardInterrupt is raised instantly. 1266 with self.assertRaises(KeyboardInterrupt): 1267 _thread.interrupt_main() 1268 1269 def test_interrupt_main_noerror(self): 1270 handler = signal.getsignal(signal.SIGINT) 1271 try: 1272 # No exception should arise. 1273 signal.signal(signal.SIGINT, signal.SIG_IGN) 1274 _thread.interrupt_main() 1275 1276 signal.signal(signal.SIGINT, signal.SIG_DFL) 1277 _thread.interrupt_main() 1278 finally: 1279 # Restore original handler 1280 signal.signal(signal.SIGINT, handler) 1281 1282 1283if __name__ == "__main__": 1284 unittest.main() 1285