1"""
2Tests for the threading module.
3"""
4
5import test.support
6from test.support import (verbose, import_module, cpython_only,
7                          requires_type_collecting)
8from test.support.script_helper import assert_python_ok, assert_python_failure
9
10import random
11import sys
12import _thread
13import threading
14import time
15import unittest
16import weakref
17import os
18import subprocess
19import signal
20
21from test import lock_tests
22from test import support
23
24
25# Between fork() and exec(), only async-safe functions are allowed (issues
26# #12316 and #11870), and fork() from a worker thread is known to trigger
27# problems with some operating systems (issue #3863): skip problematic tests
28# on platforms known to behave badly.
29platforms_to_skip = ('netbsd5', 'hp-ux11')
30
31
32# A trivial mutable counter.
33class Counter(object):
34    def __init__(self):
35        self.value = 0
36    def inc(self):
37        self.value += 1
38    def dec(self):
39        self.value -= 1
40    def get(self):
41        return self.value
42
43class TestThread(threading.Thread):
44    def __init__(self, name, testcase, sema, mutex, nrunning):
45        threading.Thread.__init__(self, name=name)
46        self.testcase = testcase
47        self.sema = sema
48        self.mutex = mutex
49        self.nrunning = nrunning
50
51    def run(self):
52        delay = random.random() / 10000.0
53        if verbose:
54            print('task %s will run for %.1f usec' %
55                  (self.name, delay * 1e6))
56
57        with self.sema:
58            with self.mutex:
59                self.nrunning.inc()
60                if verbose:
61                    print(self.nrunning.get(), 'tasks are running')
62                self.testcase.assertLessEqual(self.nrunning.get(), 3)
63
64            time.sleep(delay)
65            if verbose:
66                print('task', self.name, 'done')
67
68            with self.mutex:
69                self.nrunning.dec()
70                self.testcase.assertGreaterEqual(self.nrunning.get(), 0)
71                if verbose:
72                    print('%s is finished. %d tasks are running' %
73                          (self.name, self.nrunning.get()))
74
75
76class BaseTestCase(unittest.TestCase):
77    def setUp(self):
78        self._threads = test.support.threading_setup()
79
80    def tearDown(self):
81        test.support.threading_cleanup(*self._threads)
82        test.support.reap_children()
83
84
85class ThreadTests(BaseTestCase):
86
87    # Create a bunch of threads, let each do some work, wait until all are
88    # done.
89    def test_various_ops(self):
90        # This takes about n/3 seconds to run (about n/3 clumps of tasks,
91        # times about 1 second per clump).
92        NUMTASKS = 10
93
94        # no more than 3 of the 10 can run at once
95        sema = threading.BoundedSemaphore(value=3)
96        mutex = threading.RLock()
97        numrunning = Counter()
98
99        threads = []
100
101        for i in range(NUMTASKS):
102            t = TestThread("<thread %d>"%i, self, sema, mutex, numrunning)
103            threads.append(t)
104            self.assertIsNone(t.ident)
105            self.assertRegex(repr(t), r'^<TestThread\(.*, initial\)>$')
106            t.start()
107
108        if verbose:
109            print('waiting for all tasks to complete')
110        for t in threads:
111            t.join()
112            self.assertFalse(t.is_alive())
113            self.assertNotEqual(t.ident, 0)
114            self.assertIsNotNone(t.ident)
115            self.assertRegex(repr(t), r'^<TestThread\(.*, stopped -?\d+\)>$')
116        if verbose:
117            print('all tasks done')
118        self.assertEqual(numrunning.get(), 0)
119
120    def test_ident_of_no_threading_threads(self):
121        # The ident still must work for the main thread and dummy threads.
122        self.assertIsNotNone(threading.currentThread().ident)
123        def f():
124            ident.append(threading.currentThread().ident)
125            done.set()
126        done = threading.Event()
127        ident = []
128        with support.wait_threads_exit():
129            tid = _thread.start_new_thread(f, ())
130            done.wait()
131            self.assertEqual(ident[0], tid)
132        # Kill the "immortal" _DummyThread
133        del threading._active[ident[0]]
134
135    # run with a small(ish) thread stack size (256 KiB)
136    def test_various_ops_small_stack(self):
137        if verbose:
138            print('with 256 KiB thread stack size...')
139        try:
140            threading.stack_size(262144)
141        except _thread.error:
142            raise unittest.SkipTest(
143                'platform does not support changing thread stack size')
144        self.test_various_ops()
145        threading.stack_size(0)
146
147    # run with a large thread stack size (1 MiB)
148    def test_various_ops_large_stack(self):
149        if verbose:
150            print('with 1 MiB thread stack size...')
151        try:
152            threading.stack_size(0x100000)
153        except _thread.error:
154            raise unittest.SkipTest(
155                'platform does not support changing thread stack size')
156        self.test_various_ops()
157        threading.stack_size(0)
158
159    def test_foreign_thread(self):
160        # Check that a "foreign" thread can use the threading module.
161        def f(mutex):
162            # Calling current_thread() forces an entry for the foreign
163            # thread to get made in the threading._active map.
164            threading.current_thread()
165            mutex.release()
166
167        mutex = threading.Lock()
168        mutex.acquire()
169        with support.wait_threads_exit():
170            tid = _thread.start_new_thread(f, (mutex,))
171            # Wait for the thread to finish.
172            mutex.acquire()
173        self.assertIn(tid, threading._active)
174        self.assertIsInstance(threading._active[tid], threading._DummyThread)
175        #Issue 29376
176        self.assertTrue(threading._active[tid].is_alive())
177        self.assertRegex(repr(threading._active[tid]), '_DummyThread')
178        del threading._active[tid]
179
180    # PyThreadState_SetAsyncExc() is a CPython-only gimmick, not (currently)
181    # exposed at the Python level.  This test relies on ctypes to get at it.
182    def test_PyThreadState_SetAsyncExc(self):
183        ctypes = import_module("ctypes")
184
185        set_async_exc = ctypes.pythonapi.PyThreadState_SetAsyncExc
186        set_async_exc.argtypes = (ctypes.c_ulong, ctypes.py_object)
187
188        class AsyncExc(Exception):
189            pass
190
191        exception = ctypes.py_object(AsyncExc)
192
193        # First check it works when setting the exception from the same thread.
194        tid = threading.get_ident()
195        self.assertIsInstance(tid, int)
196        self.assertGreater(tid, 0)
197
198        try:
199            result = set_async_exc(tid, exception)
200            # The exception is async, so we might have to keep the VM busy until
201            # it notices.
202            while True:
203                pass
204        except AsyncExc:
205            pass
206        else:
207            # This code is unreachable but it reflects the intent. If we wanted
208            # to be smarter the above loop wouldn't be infinite.
209            self.fail("AsyncExc not raised")
210        try:
211            self.assertEqual(result, 1) # one thread state modified
212        except UnboundLocalError:
213            # The exception was raised too quickly for us to get the result.
214            pass
215
216        # `worker_started` is set by the thread when it's inside a try/except
217        # block waiting to catch the asynchronously set AsyncExc exception.
218        # `worker_saw_exception` is set by the thread upon catching that
219        # exception.
220        worker_started = threading.Event()
221        worker_saw_exception = threading.Event()
222
223        class Worker(threading.Thread):
224            def run(self):
225                self.id = threading.get_ident()
226                self.finished = False
227
228                try:
229                    while True:
230                        worker_started.set()
231                        time.sleep(0.1)
232                except AsyncExc:
233                    self.finished = True
234                    worker_saw_exception.set()
235
236        t = Worker()
237        t.daemon = True # so if this fails, we don't hang Python at shutdown
238        t.start()
239        if verbose:
240            print("    started worker thread")
241
242        # Try a thread id that doesn't make sense.
243        if verbose:
244            print("    trying nonsensical thread id")
245        result = set_async_exc(-1, exception)
246        self.assertEqual(result, 0)  # no thread states modified
247
248        # Now raise an exception in the worker thread.
249        if verbose:
250            print("    waiting for worker thread to get started")
251        ret = worker_started.wait()
252        self.assertTrue(ret)
253        if verbose:
254            print("    verifying worker hasn't exited")
255        self.assertFalse(t.finished)
256        if verbose:
257            print("    attempting to raise asynch exception in worker")
258        result = set_async_exc(t.id, exception)
259        self.assertEqual(result, 1) # one thread state modified
260        if verbose:
261            print("    waiting for worker to say it caught the exception")
262        worker_saw_exception.wait(timeout=10)
263        self.assertTrue(t.finished)
264        if verbose:
265            print("    all OK -- joining worker")
266        if t.finished:
267            t.join()
268        # else the thread is still running, and we have no way to kill it
269
270    def test_limbo_cleanup(self):
271        # Issue 7481: Failure to start thread should cleanup the limbo map.
272        def fail_new_thread(*args):
273            raise threading.ThreadError()
274        _start_new_thread = threading._start_new_thread
275        threading._start_new_thread = fail_new_thread
276        try:
277            t = threading.Thread(target=lambda: None)
278            self.assertRaises(threading.ThreadError, t.start)
279            self.assertFalse(
280                t in threading._limbo,
281                "Failed to cleanup _limbo map on failure of Thread.start().")
282        finally:
283            threading._start_new_thread = _start_new_thread
284
285    def test_finalize_runnning_thread(self):
286        # Issue 1402: the PyGILState_Ensure / _Release functions may be called
287        # very late on python exit: on deallocation of a running thread for
288        # example.
289        import_module("ctypes")
290
291        rc, out, err = assert_python_failure("-c", """if 1:
292            import ctypes, sys, time, _thread
293
294            # This lock is used as a simple event variable.
295            ready = _thread.allocate_lock()
296            ready.acquire()
297
298            # Module globals are cleared before __del__ is run
299            # So we save the functions in class dict
300            class C:
301                ensure = ctypes.pythonapi.PyGILState_Ensure
302                release = ctypes.pythonapi.PyGILState_Release
303                def __del__(self):
304                    state = self.ensure()
305                    self.release(state)
306
307            def waitingThread():
308                x = C()
309                ready.release()
310                time.sleep(100)
311
312            _thread.start_new_thread(waitingThread, ())
313            ready.acquire()  # Be sure the other thread is waiting.
314            sys.exit(42)
315            """)
316        self.assertEqual(rc, 42)
317
318    def test_finalize_with_trace(self):
319        # Issue1733757
320        # Avoid a deadlock when sys.settrace steps into threading._shutdown
321        assert_python_ok("-c", """if 1:
322            import sys, threading
323
324            # A deadlock-killer, to prevent the
325            # testsuite to hang forever
326            def killer():
327                import os, time
328                time.sleep(2)
329                print('program blocked; aborting')
330                os._exit(2)
331            t = threading.Thread(target=killer)
332            t.daemon = True
333            t.start()
334
335            # This is the trace function
336            def func(frame, event, arg):
337                threading.current_thread()
338                return func
339
340            sys.settrace(func)
341            """)
342
343    def test_join_nondaemon_on_shutdown(self):
344        # Issue 1722344
345        # Raising SystemExit skipped threading._shutdown
346        rc, out, err = assert_python_ok("-c", """if 1:
347                import threading
348                from time import sleep
349
350                def child():
351                    sleep(1)
352                    # As a non-daemon thread we SHOULD wake up and nothing
353                    # should be torn down yet
354                    print("Woke up, sleep function is:", sleep)
355
356                threading.Thread(target=child).start()
357                raise SystemExit
358            """)
359        self.assertEqual(out.strip(),
360            b"Woke up, sleep function is: <built-in function sleep>")
361        self.assertEqual(err, b"")
362
363    def test_enumerate_after_join(self):
364        # Try hard to trigger #1703448: a thread is still returned in
365        # threading.enumerate() after it has been join()ed.
366        enum = threading.enumerate
367        old_interval = sys.getswitchinterval()
368        try:
369            for i in range(1, 100):
370                sys.setswitchinterval(i * 0.0002)
371                t = threading.Thread(target=lambda: None)
372                t.start()
373                t.join()
374                l = enum()
375                self.assertNotIn(t, l,
376                    "#1703448 triggered after %d trials: %s" % (i, l))
377        finally:
378            sys.setswitchinterval(old_interval)
379
380    def test_no_refcycle_through_target(self):
381        class RunSelfFunction(object):
382            def __init__(self, should_raise):
383                # The links in this refcycle from Thread back to self
384                # should be cleaned up when the thread completes.
385                self.should_raise = should_raise
386                self.thread = threading.Thread(target=self._run,
387                                               args=(self,),
388                                               kwargs={'yet_another':self})
389                self.thread.start()
390
391            def _run(self, other_ref, yet_another):
392                if self.should_raise:
393                    raise SystemExit
394
395        cyclic_object = RunSelfFunction(should_raise=False)
396        weak_cyclic_object = weakref.ref(cyclic_object)
397        cyclic_object.thread.join()
398        del cyclic_object
399        self.assertIsNone(weak_cyclic_object(),
400                         msg=('%d references still around' %
401                              sys.getrefcount(weak_cyclic_object())))
402
403        raising_cyclic_object = RunSelfFunction(should_raise=True)
404        weak_raising_cyclic_object = weakref.ref(raising_cyclic_object)
405        raising_cyclic_object.thread.join()
406        del raising_cyclic_object
407        self.assertIsNone(weak_raising_cyclic_object(),
408                         msg=('%d references still around' %
409                              sys.getrefcount(weak_raising_cyclic_object())))
410
411    def test_old_threading_api(self):
412        # Just a quick sanity check to make sure the old method names are
413        # still present
414        t = threading.Thread()
415        t.isDaemon()
416        t.setDaemon(True)
417        t.getName()
418        t.setName("name")
419        with self.assertWarnsRegex(PendingDeprecationWarning, 'use is_alive()'):
420            t.isAlive()
421        e = threading.Event()
422        e.isSet()
423        threading.activeCount()
424
425    def test_repr_daemon(self):
426        t = threading.Thread()
427        self.assertNotIn('daemon', repr(t))
428        t.daemon = True
429        self.assertIn('daemon', repr(t))
430
431    def test_daemon_param(self):
432        t = threading.Thread()
433        self.assertFalse(t.daemon)
434        t = threading.Thread(daemon=False)
435        self.assertFalse(t.daemon)
436        t = threading.Thread(daemon=True)
437        self.assertTrue(t.daemon)
438
439    @unittest.skipUnless(hasattr(os, 'fork'), 'test needs fork()')
440    def test_dummy_thread_after_fork(self):
441        # Issue #14308: a dummy thread in the active list doesn't mess up
442        # the after-fork mechanism.
443        code = """if 1:
444            import _thread, threading, os, time
445
446            def background_thread(evt):
447                # Creates and registers the _DummyThread instance
448                threading.current_thread()
449                evt.set()
450                time.sleep(10)
451
452            evt = threading.Event()
453            _thread.start_new_thread(background_thread, (evt,))
454            evt.wait()
455            assert threading.active_count() == 2, threading.active_count()
456            if os.fork() == 0:
457                assert threading.active_count() == 1, threading.active_count()
458                os._exit(0)
459            else:
460                os.wait()
461        """
462        _, out, err = assert_python_ok("-c", code)
463        self.assertEqual(out, b'')
464        self.assertEqual(err, b'')
465
466    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
467    def test_is_alive_after_fork(self):
468        # Try hard to trigger #18418: is_alive() could sometimes be True on
469        # threads that vanished after a fork.
470        old_interval = sys.getswitchinterval()
471        self.addCleanup(sys.setswitchinterval, old_interval)
472
473        # Make the bug more likely to manifest.
474        test.support.setswitchinterval(1e-6)
475
476        for i in range(20):
477            t = threading.Thread(target=lambda: None)
478            t.start()
479            pid = os.fork()
480            if pid == 0:
481                os._exit(11 if t.is_alive() else 10)
482            else:
483                t.join()
484
485                pid, status = os.waitpid(pid, 0)
486                self.assertTrue(os.WIFEXITED(status))
487                self.assertEqual(10, os.WEXITSTATUS(status))
488
489    def test_main_thread(self):
490        main = threading.main_thread()
491        self.assertEqual(main.name, 'MainThread')
492        self.assertEqual(main.ident, threading.current_thread().ident)
493        self.assertEqual(main.ident, threading.get_ident())
494
495        def f():
496            self.assertNotEqual(threading.main_thread().ident,
497                                threading.current_thread().ident)
498        th = threading.Thread(target=f)
499        th.start()
500        th.join()
501
502    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
503    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
504    def test_main_thread_after_fork(self):
505        code = """if 1:
506            import os, threading
507
508            pid = os.fork()
509            if pid == 0:
510                main = threading.main_thread()
511                print(main.name)
512                print(main.ident == threading.current_thread().ident)
513                print(main.ident == threading.get_ident())
514            else:
515                os.waitpid(pid, 0)
516        """
517        _, out, err = assert_python_ok("-c", code)
518        data = out.decode().replace('\r', '')
519        self.assertEqual(err, b"")
520        self.assertEqual(data, "MainThread\nTrue\nTrue\n")
521
522    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
523    @unittest.skipUnless(hasattr(os, 'fork'), "test needs os.fork()")
524    @unittest.skipUnless(hasattr(os, 'waitpid'), "test needs os.waitpid()")
525    def test_main_thread_after_fork_from_nonmain_thread(self):
526        code = """if 1:
527            import os, threading, sys
528
529            def f():
530                pid = os.fork()
531                if pid == 0:
532                    main = threading.main_thread()
533                    print(main.name)
534                    print(main.ident == threading.current_thread().ident)
535                    print(main.ident == threading.get_ident())
536                    # stdout is fully buffered because not a tty,
537                    # we have to flush before exit.
538                    sys.stdout.flush()
539                else:
540                    os.waitpid(pid, 0)
541
542            th = threading.Thread(target=f)
543            th.start()
544            th.join()
545        """
546        _, out, err = assert_python_ok("-c", code)
547        data = out.decode().replace('\r', '')
548        self.assertEqual(err, b"")
549        self.assertEqual(data, "Thread-1\nTrue\nTrue\n")
550
551    @requires_type_collecting
552    def test_main_thread_during_shutdown(self):
553        # bpo-31516: current_thread() should still point to the main thread
554        # at shutdown
555        code = """if 1:
556            import gc, threading
557
558            main_thread = threading.current_thread()
559            assert main_thread is threading.main_thread()  # sanity check
560
561            class RefCycle:
562                def __init__(self):
563                    self.cycle = self
564
565                def __del__(self):
566                    print("GC:",
567                          threading.current_thread() is main_thread,
568                          threading.main_thread() is main_thread,
569                          threading.enumerate() == [main_thread])
570
571            RefCycle()
572            gc.collect()  # sanity check
573            x = RefCycle()
574        """
575        _, out, err = assert_python_ok("-c", code)
576        data = out.decode()
577        self.assertEqual(err, b"")
578        self.assertEqual(data.splitlines(),
579                         ["GC: True True True"] * 2)
580
581    def test_finalization_shutdown(self):
582        # bpo-36402: Py_Finalize() calls threading._shutdown() which must wait
583        # until Python thread states of all non-daemon threads get deleted.
584        #
585        # Test similar to SubinterpThreadingTests.test_threads_join_2(), but
586        # test the finalization of the main interpreter.
587        code = """if 1:
588            import os
589            import threading
590            import time
591            import random
592
593            def random_sleep():
594                seconds = random.random() * 0.010
595                time.sleep(seconds)
596
597            class Sleeper:
598                def __del__(self):
599                    random_sleep()
600
601            tls = threading.local()
602
603            def f():
604                # Sleep a bit so that the thread is still running when
605                # Py_Finalize() is called.
606                random_sleep()
607                tls.x = Sleeper()
608                random_sleep()
609
610            threading.Thread(target=f).start()
611            random_sleep()
612        """
613        rc, out, err = assert_python_ok("-c", code)
614        self.assertEqual(err, b"")
615
616    def test_tstate_lock(self):
617        # Test an implementation detail of Thread objects.
618        started = _thread.allocate_lock()
619        finish = _thread.allocate_lock()
620        started.acquire()
621        finish.acquire()
622        def f():
623            started.release()
624            finish.acquire()
625            time.sleep(0.01)
626        # The tstate lock is None until the thread is started
627        t = threading.Thread(target=f)
628        self.assertIs(t._tstate_lock, None)
629        t.start()
630        started.acquire()
631        self.assertTrue(t.is_alive())
632        # The tstate lock can't be acquired when the thread is running
633        # (or suspended).
634        tstate_lock = t._tstate_lock
635        self.assertFalse(tstate_lock.acquire(timeout=0), False)
636        finish.release()
637        # When the thread ends, the state_lock can be successfully
638        # acquired.
639        self.assertTrue(tstate_lock.acquire(timeout=5), False)
640        # But is_alive() is still True:  we hold _tstate_lock now, which
641        # prevents is_alive() from knowing the thread's end-of-life C code
642        # is done.
643        self.assertTrue(t.is_alive())
644        # Let is_alive() find out the C code is done.
645        tstate_lock.release()
646        self.assertFalse(t.is_alive())
647        # And verify the thread disposed of _tstate_lock.
648        self.assertIsNone(t._tstate_lock)
649        t.join()
650
651    def test_repr_stopped(self):
652        # Verify that "stopped" shows up in repr(Thread) appropriately.
653        started = _thread.allocate_lock()
654        finish = _thread.allocate_lock()
655        started.acquire()
656        finish.acquire()
657        def f():
658            started.release()
659            finish.acquire()
660        t = threading.Thread(target=f)
661        t.start()
662        started.acquire()
663        self.assertIn("started", repr(t))
664        finish.release()
665        # "stopped" should appear in the repr in a reasonable amount of time.
666        # Implementation detail:  as of this writing, that's trivially true
667        # if .join() is called, and almost trivially true if .is_alive() is
668        # called.  The detail we're testing here is that "stopped" shows up
669        # "all on its own".
670        LOOKING_FOR = "stopped"
671        for i in range(500):
672            if LOOKING_FOR in repr(t):
673                break
674            time.sleep(0.01)
675        self.assertIn(LOOKING_FOR, repr(t)) # we waited at least 5 seconds
676        t.join()
677
678    def test_BoundedSemaphore_limit(self):
679        # BoundedSemaphore should raise ValueError if released too often.
680        for limit in range(1, 10):
681            bs = threading.BoundedSemaphore(limit)
682            threads = [threading.Thread(target=bs.acquire)
683                       for _ in range(limit)]
684            for t in threads:
685                t.start()
686            for t in threads:
687                t.join()
688            threads = [threading.Thread(target=bs.release)
689                       for _ in range(limit)]
690            for t in threads:
691                t.start()
692            for t in threads:
693                t.join()
694            self.assertRaises(ValueError, bs.release)
695
696    @cpython_only
697    def test_frame_tstate_tracing(self):
698        # Issue #14432: Crash when a generator is created in a C thread that is
699        # destroyed while the generator is still used. The issue was that a
700        # generator contains a frame, and the frame kept a reference to the
701        # Python state of the destroyed C thread. The crash occurs when a trace
702        # function is setup.
703
704        def noop_trace(frame, event, arg):
705            # no operation
706            return noop_trace
707
708        def generator():
709            while 1:
710                yield "generator"
711
712        def callback():
713            if callback.gen is None:
714                callback.gen = generator()
715            return next(callback.gen)
716        callback.gen = None
717
718        old_trace = sys.gettrace()
719        sys.settrace(noop_trace)
720        try:
721            # Install a trace function
722            threading.settrace(noop_trace)
723
724            # Create a generator in a C thread which exits after the call
725            import _testcapi
726            _testcapi.call_in_temporary_c_thread(callback)
727
728            # Call the generator in a different Python thread, check that the
729            # generator didn't keep a reference to the destroyed thread state
730            for test in range(3):
731                # The trace function is still called here
732                callback()
733        finally:
734            sys.settrace(old_trace)
735
736    @cpython_only
737    def test_shutdown_locks(self):
738        for daemon in (False, True):
739            with self.subTest(daemon=daemon):
740                event = threading.Event()
741                thread = threading.Thread(target=event.wait, daemon=daemon)
742
743                # Thread.start() must add lock to _shutdown_locks,
744                # but only for non-daemon thread
745                thread.start()
746                tstate_lock = thread._tstate_lock
747                if not daemon:
748                    self.assertIn(tstate_lock, threading._shutdown_locks)
749                else:
750                    self.assertNotIn(tstate_lock, threading._shutdown_locks)
751
752                # unblock the thread and join it
753                event.set()
754                thread.join()
755
756                # Thread._stop() must remove tstate_lock from _shutdown_locks.
757                # Daemon threads must never add it to _shutdown_locks.
758                self.assertNotIn(tstate_lock, threading._shutdown_locks)
759
760
761class ThreadJoinOnShutdown(BaseTestCase):
762
763    def _run_and_join(self, script):
764        script = """if 1:
765            import sys, os, time, threading
766
767            # a thread, which waits for the main program to terminate
768            def joiningfunc(mainthread):
769                mainthread.join()
770                print('end of thread')
771                # stdout is fully buffered because not a tty, we have to flush
772                # before exit.
773                sys.stdout.flush()
774        \n""" + script
775
776        rc, out, err = assert_python_ok("-c", script)
777        data = out.decode().replace('\r', '')
778        self.assertEqual(data, "end of main\nend of thread\n")
779
780    def test_1_join_on_shutdown(self):
781        # The usual case: on exit, wait for a non-daemon thread
782        script = """if 1:
783            import os
784            t = threading.Thread(target=joiningfunc,
785                                 args=(threading.current_thread(),))
786            t.start()
787            time.sleep(0.1)
788            print('end of main')
789            """
790        self._run_and_join(script)
791
792    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
793    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
794    def test_2_join_in_forked_process(self):
795        # Like the test above, but from a forked interpreter
796        script = """if 1:
797            childpid = os.fork()
798            if childpid != 0:
799                os.waitpid(childpid, 0)
800                sys.exit(0)
801
802            t = threading.Thread(target=joiningfunc,
803                                 args=(threading.current_thread(),))
804            t.start()
805            print('end of main')
806            """
807        self._run_and_join(script)
808
809    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
810    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
811    def test_3_join_in_forked_from_thread(self):
812        # Like the test above, but fork() was called from a worker thread
813        # In the forked process, the main Thread object must be marked as stopped.
814
815        script = """if 1:
816            main_thread = threading.current_thread()
817            def worker():
818                childpid = os.fork()
819                if childpid != 0:
820                    os.waitpid(childpid, 0)
821                    sys.exit(0)
822
823                t = threading.Thread(target=joiningfunc,
824                                     args=(main_thread,))
825                print('end of main')
826                t.start()
827                t.join() # Should not block: main_thread is already stopped
828
829            w = threading.Thread(target=worker)
830            w.start()
831            """
832        self._run_and_join(script)
833
834    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
835    def test_4_daemon_threads(self):
836        # Check that a daemon thread cannot crash the interpreter on shutdown
837        # by manipulating internal structures that are being disposed of in
838        # the main thread.
839        script = """if True:
840            import os
841            import random
842            import sys
843            import time
844            import threading
845
846            thread_has_run = set()
847
848            def random_io():
849                '''Loop for a while sleeping random tiny amounts and doing some I/O.'''
850                while True:
851                    in_f = open(os.__file__, 'rb')
852                    stuff = in_f.read(200)
853                    null_f = open(os.devnull, 'wb')
854                    null_f.write(stuff)
855                    time.sleep(random.random() / 1995)
856                    null_f.close()
857                    in_f.close()
858                    thread_has_run.add(threading.current_thread())
859
860            def main():
861                count = 0
862                for _ in range(40):
863                    new_thread = threading.Thread(target=random_io)
864                    new_thread.daemon = True
865                    new_thread.start()
866                    count += 1
867                while len(thread_has_run) < count:
868                    time.sleep(0.001)
869                # Trigger process shutdown
870                sys.exit(0)
871
872            main()
873            """
874        rc, out, err = assert_python_ok('-c', script)
875        self.assertFalse(err)
876
877    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
878    @unittest.skipIf(sys.platform in platforms_to_skip, "due to known OS bug")
879    def test_reinit_tls_after_fork(self):
880        # Issue #13817: fork() would deadlock in a multithreaded program with
881        # the ad-hoc TLS implementation.
882
883        def do_fork_and_wait():
884            # just fork a child process and wait it
885            pid = os.fork()
886            if pid > 0:
887                os.waitpid(pid, 0)
888            else:
889                os._exit(0)
890
891        # start a bunch of threads that will fork() child processes
892        threads = []
893        for i in range(16):
894            t = threading.Thread(target=do_fork_and_wait)
895            threads.append(t)
896            t.start()
897
898        for t in threads:
899            t.join()
900
901    @unittest.skipUnless(hasattr(os, 'fork'), "needs os.fork()")
902    def test_clear_threads_states_after_fork(self):
903        # Issue #17094: check that threads states are cleared after fork()
904
905        # start a bunch of threads
906        threads = []
907        for i in range(16):
908            t = threading.Thread(target=lambda : time.sleep(0.3))
909            threads.append(t)
910            t.start()
911
912        pid = os.fork()
913        if pid == 0:
914            # check that threads states have been cleared
915            if len(sys._current_frames()) == 1:
916                os._exit(0)
917            else:
918                os._exit(1)
919        else:
920            _, status = os.waitpid(pid, 0)
921            self.assertEqual(0, status)
922
923        for t in threads:
924            t.join()
925
926
927class SubinterpThreadingTests(BaseTestCase):
928
929    def test_threads_join(self):
930        # Non-daemon threads should be joined at subinterpreter shutdown
931        # (issue #18808)
932        r, w = os.pipe()
933        self.addCleanup(os.close, r)
934        self.addCleanup(os.close, w)
935        code = r"""if 1:
936            import os
937            import random
938            import threading
939            import time
940
941            def random_sleep():
942                seconds = random.random() * 0.010
943                time.sleep(seconds)
944
945            def f():
946                # Sleep a bit so that the thread is still running when
947                # Py_EndInterpreter is called.
948                random_sleep()
949                os.write(%d, b"x")
950
951            threading.Thread(target=f).start()
952            random_sleep()
953            """ % (w,)
954        ret = test.support.run_in_subinterp(code)
955        self.assertEqual(ret, 0)
956        # The thread was joined properly.
957        self.assertEqual(os.read(r, 1), b"x")
958
959    def test_threads_join_2(self):
960        # Same as above, but a delay gets introduced after the thread's
961        # Python code returned but before the thread state is deleted.
962        # To achieve this, we register a thread-local object which sleeps
963        # a bit when deallocated.
964        r, w = os.pipe()
965        self.addCleanup(os.close, r)
966        self.addCleanup(os.close, w)
967        code = r"""if 1:
968            import os
969            import random
970            import threading
971            import time
972
973            def random_sleep():
974                seconds = random.random() * 0.010
975                time.sleep(seconds)
976
977            class Sleeper:
978                def __del__(self):
979                    random_sleep()
980
981            tls = threading.local()
982
983            def f():
984                # Sleep a bit so that the thread is still running when
985                # Py_EndInterpreter is called.
986                random_sleep()
987                tls.x = Sleeper()
988                os.write(%d, b"x")
989
990            threading.Thread(target=f).start()
991            random_sleep()
992            """ % (w,)
993        ret = test.support.run_in_subinterp(code)
994        self.assertEqual(ret, 0)
995        # The thread was joined properly.
996        self.assertEqual(os.read(r, 1), b"x")
997
998    @cpython_only
999    def test_daemon_threads_fatal_error(self):
1000        subinterp_code = r"""if 1:
1001            import os
1002            import threading
1003            import time
1004
1005            def f():
1006                # Make sure the daemon thread is still running when
1007                # Py_EndInterpreter is called.
1008                time.sleep(10)
1009            threading.Thread(target=f, daemon=True).start()
1010            """
1011        script = r"""if 1:
1012            import _testcapi
1013
1014            _testcapi.run_in_subinterp(%r)
1015            """ % (subinterp_code,)
1016        with test.support.SuppressCrashReport():
1017            rc, out, err = assert_python_failure("-c", script)
1018        self.assertIn("Fatal Python error: Py_EndInterpreter: "
1019                      "not the last thread", err.decode())
1020
1021
1022class ThreadingExceptionTests(BaseTestCase):
1023    # A RuntimeError should be raised if Thread.start() is called
1024    # multiple times.
1025    def test_start_thread_again(self):
1026        thread = threading.Thread()
1027        thread.start()
1028        self.assertRaises(RuntimeError, thread.start)
1029        thread.join()
1030
1031    def test_joining_current_thread(self):
1032        current_thread = threading.current_thread()
1033        self.assertRaises(RuntimeError, current_thread.join);
1034
1035    def test_joining_inactive_thread(self):
1036        thread = threading.Thread()
1037        self.assertRaises(RuntimeError, thread.join)
1038
1039    def test_daemonize_active_thread(self):
1040        thread = threading.Thread()
1041        thread.start()
1042        self.assertRaises(RuntimeError, setattr, thread, "daemon", True)
1043        thread.join()
1044
1045    def test_releasing_unacquired_lock(self):
1046        lock = threading.Lock()
1047        self.assertRaises(RuntimeError, lock.release)
1048
1049    @unittest.skipUnless(sys.platform == 'darwin' and test.support.python_is_optimized(),
1050                         'test macosx problem')
1051    def test_recursion_limit(self):
1052        # Issue 9670
1053        # test that excessive recursion within a non-main thread causes
1054        # an exception rather than crashing the interpreter on platforms
1055        # like Mac OS X or FreeBSD which have small default stack sizes
1056        # for threads
1057        script = """if True:
1058            import threading
1059
1060            def recurse():
1061                return recurse()
1062
1063            def outer():
1064                try:
1065                    recurse()
1066                except RecursionError:
1067                    pass
1068
1069            w = threading.Thread(target=outer)
1070            w.start()
1071            w.join()
1072            print('end of main thread')
1073            """
1074        expected_output = "end of main thread\n"
1075        p = subprocess.Popen([sys.executable, "-c", script],
1076                             stdout=subprocess.PIPE, stderr=subprocess.PIPE)
1077        stdout, stderr = p.communicate()
1078        data = stdout.decode().replace('\r', '')
1079        self.assertEqual(p.returncode, 0, "Unexpected error: " + stderr.decode())
1080        self.assertEqual(data, expected_output)
1081
1082    def test_print_exception(self):
1083        script = r"""if True:
1084            import threading
1085            import time
1086
1087            running = False
1088            def run():
1089                global running
1090                running = True
1091                while running:
1092                    time.sleep(0.01)
1093                1/0
1094            t = threading.Thread(target=run)
1095            t.start()
1096            while not running:
1097                time.sleep(0.01)
1098            running = False
1099            t.join()
1100            """
1101        rc, out, err = assert_python_ok("-c", script)
1102        self.assertEqual(out, b'')
1103        err = err.decode()
1104        self.assertIn("Exception in thread", err)
1105        self.assertIn("Traceback (most recent call last):", err)
1106        self.assertIn("ZeroDivisionError", err)
1107        self.assertNotIn("Unhandled exception", err)
1108
1109    @requires_type_collecting
1110    def test_print_exception_stderr_is_none_1(self):
1111        script = r"""if True:
1112            import sys
1113            import threading
1114            import time
1115
1116            running = False
1117            def run():
1118                global running
1119                running = True
1120                while running:
1121                    time.sleep(0.01)
1122                1/0
1123            t = threading.Thread(target=run)
1124            t.start()
1125            while not running:
1126                time.sleep(0.01)
1127            sys.stderr = None
1128            running = False
1129            t.join()
1130            """
1131        rc, out, err = assert_python_ok("-c", script)
1132        self.assertEqual(out, b'')
1133        err = err.decode()
1134        self.assertIn("Exception in thread", err)
1135        self.assertIn("Traceback (most recent call last):", err)
1136        self.assertIn("ZeroDivisionError", err)
1137        self.assertNotIn("Unhandled exception", err)
1138
1139    def test_print_exception_stderr_is_none_2(self):
1140        script = r"""if True:
1141            import sys
1142            import threading
1143            import time
1144
1145            running = False
1146            def run():
1147                global running
1148                running = True
1149                while running:
1150                    time.sleep(0.01)
1151                1/0
1152            sys.stderr = None
1153            t = threading.Thread(target=run)
1154            t.start()
1155            while not running:
1156                time.sleep(0.01)
1157            running = False
1158            t.join()
1159            """
1160        rc, out, err = assert_python_ok("-c", script)
1161        self.assertEqual(out, b'')
1162        self.assertNotIn("Unhandled exception", err.decode())
1163
1164    def test_bare_raise_in_brand_new_thread(self):
1165        def bare_raise():
1166            raise
1167
1168        class Issue27558(threading.Thread):
1169            exc = None
1170
1171            def run(self):
1172                try:
1173                    bare_raise()
1174                except Exception as exc:
1175                    self.exc = exc
1176
1177        thread = Issue27558()
1178        thread.start()
1179        thread.join()
1180        self.assertIsNotNone(thread.exc)
1181        self.assertIsInstance(thread.exc, RuntimeError)
1182        # explicitly break the reference cycle to not leak a dangling thread
1183        thread.exc = None
1184
1185class TimerTests(BaseTestCase):
1186
1187    def setUp(self):
1188        BaseTestCase.setUp(self)
1189        self.callback_args = []
1190        self.callback_event = threading.Event()
1191
1192    def test_init_immutable_default_args(self):
1193        # Issue 17435: constructor defaults were mutable objects, they could be
1194        # mutated via the object attributes and affect other Timer objects.
1195        timer1 = threading.Timer(0.01, self._callback_spy)
1196        timer1.start()
1197        self.callback_event.wait()
1198        timer1.args.append("blah")
1199        timer1.kwargs["foo"] = "bar"
1200        self.callback_event.clear()
1201        timer2 = threading.Timer(0.01, self._callback_spy)
1202        timer2.start()
1203        self.callback_event.wait()
1204        self.assertEqual(len(self.callback_args), 2)
1205        self.assertEqual(self.callback_args, [((), {}), ((), {})])
1206        timer1.join()
1207        timer2.join()
1208
1209    def _callback_spy(self, *args, **kwargs):
1210        self.callback_args.append((args[:], kwargs.copy()))
1211        self.callback_event.set()
1212
1213class LockTests(lock_tests.LockTests):
1214    locktype = staticmethod(threading.Lock)
1215
1216class PyRLockTests(lock_tests.RLockTests):
1217    locktype = staticmethod(threading._PyRLock)
1218
1219@unittest.skipIf(threading._CRLock is None, 'RLock not implemented in C')
1220class CRLockTests(lock_tests.RLockTests):
1221    locktype = staticmethod(threading._CRLock)
1222
1223class EventTests(lock_tests.EventTests):
1224    eventtype = staticmethod(threading.Event)
1225
1226class ConditionAsRLockTests(lock_tests.RLockTests):
1227    # Condition uses an RLock by default and exports its API.
1228    locktype = staticmethod(threading.Condition)
1229
1230class ConditionTests(lock_tests.ConditionTests):
1231    condtype = staticmethod(threading.Condition)
1232
1233class SemaphoreTests(lock_tests.SemaphoreTests):
1234    semtype = staticmethod(threading.Semaphore)
1235
1236class BoundedSemaphoreTests(lock_tests.BoundedSemaphoreTests):
1237    semtype = staticmethod(threading.BoundedSemaphore)
1238
1239class BarrierTests(lock_tests.BarrierTests):
1240    barriertype = staticmethod(threading.Barrier)
1241
1242
1243class MiscTestCase(unittest.TestCase):
1244    def test__all__(self):
1245        extra = {"ThreadError"}
1246        blacklist = {'currentThread', 'activeCount'}
1247        support.check__all__(self, threading, ('threading', '_thread'),
1248                             extra=extra, blacklist=blacklist)
1249
1250
1251class InterruptMainTests(unittest.TestCase):
1252    def test_interrupt_main_subthread(self):
1253        # Calling start_new_thread with a function that executes interrupt_main
1254        # should raise KeyboardInterrupt upon completion.
1255        def call_interrupt():
1256            _thread.interrupt_main()
1257        t = threading.Thread(target=call_interrupt)
1258        with self.assertRaises(KeyboardInterrupt):
1259            t.start()
1260            t.join()
1261        t.join()
1262
1263    def test_interrupt_main_mainthread(self):
1264        # Make sure that if interrupt_main is called in main thread that
1265        # KeyboardInterrupt is raised instantly.
1266        with self.assertRaises(KeyboardInterrupt):
1267            _thread.interrupt_main()
1268
1269    def test_interrupt_main_noerror(self):
1270        handler = signal.getsignal(signal.SIGINT)
1271        try:
1272            # No exception should arise.
1273            signal.signal(signal.SIGINT, signal.SIG_IGN)
1274            _thread.interrupt_main()
1275
1276            signal.signal(signal.SIGINT, signal.SIG_DFL)
1277            _thread.interrupt_main()
1278        finally:
1279            # Restore original handler
1280            signal.signal(signal.SIGINT, handler)
1281
1282
1283if __name__ == "__main__":
1284    unittest.main()
1285