1import os
2import unittest
3import random
4from test import support
5from test.support import threading_helper
6import _thread as thread
7import time
8import weakref
9
10from test import lock_tests
11
# Number of worker threads started by each multi-threaded test.
NUMTASKS = 10
# Number of times every barrier worker passes through the barrier.
NUMTRIPS = 3
# Interval between polls in the busy-wait loops, in seconds (10 ms).
POLL_SLEEP = 10e-3

# Serializes verbose_print() so lines from different threads do not interleave.
_print_mutex = thread.allocate_lock()
17
def verbose_print(arg):
    """Print *arg* as debugging output, but only when the test run is verbose.

    The module-wide print mutex is held around the print() call so that
    output produced concurrently by several threads stays line-atomic.
    """
    if not support.verbose:
        return
    with _print_mutex:
        print(arg)
23
24
25class BasicThreadTest(unittest.TestCase):
26
27    def setUp(self):
28        self.done_mutex = thread.allocate_lock()
29        self.done_mutex.acquire()
30        self.running_mutex = thread.allocate_lock()
31        self.random_mutex = thread.allocate_lock()
32        self.created = 0
33        self.running = 0
34        self.next_ident = 0
35
36        key = threading_helper.threading_setup()
37        self.addCleanup(threading_helper.threading_cleanup, *key)
38
39
class ThreadRunningTests(BasicThreadTest):
    """Tests that start real threads through the low-level ``_thread`` API."""

    def newtask(self):
        """Start one more ``task`` thread and account for it.

        The thread is started while ``running_mutex`` is held, so its own
        bookkeeping at the end of ``task`` cannot run before ``created`` and
        ``running`` have been incremented here.
        """
        with self.running_mutex:
            self.next_ident += 1
            verbose_print("creating task %s" % self.next_ident)
            thread.start_new_thread(self.task, (self.next_ident,))
            self.created += 1
            self.running += 1

    def task(self, ident):
        """Thread body: sleep for a short random time, then check out.

        The thread that brings ``running`` back to zero once NUMTASKS threads
        have been created releases ``done_mutex`` to wake the waiting test.
        """
        with self.random_mutex:
            delay = random.random() / 10000.0
        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
        time.sleep(delay)
        verbose_print("task %s done" % ident)
        with self.running_mutex:
            self.running -= 1
            if self.created == NUMTASKS and self.running == 0:
                self.done_mutex.release()

    def test_starting_threads(self):
        """Start NUMTASKS threads and wait until all of them have finished."""
        with threading_helper.wait_threads_exit():
            # Basic test for thread creation.
            for i in range(NUMTASKS):
                self.newtask()
            verbose_print("waiting for tasks to complete...")
            self.done_mutex.acquire()
            verbose_print("all tasks done")

    def test_stack_size(self):
        """Check that the stack size starts at 0 and that stack_size(0) keeps it."""
        # Various stack size tests.
        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")

        thread.stack_size(0)
        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")

    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
    def test_nt_and_posix_stack_size(self):
        """Set several stack sizes and run threads with the non-default ones."""
        try:
            thread.stack_size(4096)
        except ValueError:
            verbose_print("caught expected ValueError setting "
                            "stack_size(4096)")
        except thread.error:
            self.skipTest("platform does not support changing thread stack "
                          "size")

        # Restore the default stack size even when an assertion below fails,
        # so a failure here cannot leak a custom size into later tests.
        self.addCleanup(thread.stack_size, 0)

        fail_msg = "stack_size(%d) failed - should succeed"
        for tss in (262144, 0x100000, 0):
            thread.stack_size(tss)
            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
            verbose_print("successfully set stack_size(%d)" % tss)

        for tss in (262144, 0x100000):
            verbose_print("trying stack_size = (%d)" % tss)
            # The previous loop ended on 0, so the size has to be set again
            # here; otherwise the threads below would all run with the
            # default stack and the sizes would never be exercised.
            thread.stack_size(tss)
            self.next_ident = 0
            self.created = 0
            with threading_helper.wait_threads_exit():
                for i in range(NUMTASKS):
                    self.newtask()

                verbose_print("waiting for all tasks to complete")
                self.done_mutex.acquire()
                verbose_print("all tasks done")

    def test__count(self):
        """Check that _count() goes up by one while a thread runs and back after."""
        # Test the _count() function.
        orig = thread._count()
        mut = thread.allocate_lock()
        mut.acquire()
        started = []

        def task():
            started.append(None)
            # Block until the main thread releases mut.
            mut.acquire()
            mut.release()

        with threading_helper.wait_threads_exit():
            thread.start_new_thread(task, ())
            while not started:
                time.sleep(POLL_SLEEP)
            self.assertEqual(thread._count(), orig + 1)
            # Allow the task to finish.
            mut.release()
            # The only reliable way to be sure that the thread ended from the
            # interpreter's point of view is to wait for the function object to be
            # destroyed.
            done = []
            wr = weakref.ref(task, lambda _: done.append(None))
            del task
            while not done:
                time.sleep(POLL_SLEEP)
                support.gc_collect()  # For PyPy or other GCs.
            self.assertEqual(thread._count(), orig)

    def test_unraisable_exception(self):
        """Check what is reported when a thread function raises an exception."""
        def task():
            started.release()
            raise ValueError("task failed")

        started = thread.allocate_lock()
        with support.catch_unraisable_exception() as cm:
            with threading_helper.wait_threads_exit():
                started.acquire()
                thread.start_new_thread(task, ())
                # Blocks until task() has released the lock, i.e. has started.
                started.acquire()

            self.assertEqual(str(cm.unraisable.exc_value), "task failed")
            self.assertIs(cm.unraisable.object, task)
            self.assertEqual(cm.unraisable.err_msg,
                             "Exception ignored in thread started by")
            self.assertIsNotNone(cm.unraisable.exc_traceback)
155
156
class Barrier:
    """Reusable barrier for a fixed number of threads, built from two locks.

    ``checkin_mutex`` guards arrivals and ``checkout_mutex`` guards
    departures.  The last thread to arrive keeps ``checkin_mutex`` held and
    opens ``checkout_mutex``; the waiting threads then leave one at a time,
    and the last one to leave reopens ``checkin_mutex``.  This keeps threads
    from entering the next round before the current one has fully drained.
    """

    def __init__(self, num_threads):
        """Create a barrier for *num_threads* threads.

        Raises ValueError if *num_threads* is less than 1, since such a
        barrier could never be passed.
        """
        if num_threads < 1:
            raise ValueError("num_threads must be at least 1")
        self.num_threads = num_threads
        # Number of threads currently checked in and not yet checked out.
        self.waiting = 0
        self.checkin_mutex = thread.allocate_lock()
        self.checkout_mutex = thread.allocate_lock()
        # Departures stay closed until the last thread has arrived.
        self.checkout_mutex.acquire()

    def enter(self):
        """Block until ``num_threads`` threads have called enter()."""
        self.checkin_mutex.acquire()
        self.waiting += 1
        if self.waiting == self.num_threads:
            # Last arrival: it leaves right away, so only the others remain.
            self.waiting = self.num_threads - 1
            if self.waiting == 0:
                # Single-party barrier: nobody is waiting to check out, so
                # reopen check-in directly.  Releasing checkout_mutex here
                # would leave both locks inverted and deadlock the next call.
                self.checkin_mutex.release()
            else:
                # Open departures; checkin_mutex deliberately stays held
                # until the last waiter has left.
                self.checkout_mutex.release()
            return
        self.checkin_mutex.release()

        self.checkout_mutex.acquire()
        self.waiting -= 1
        if self.waiting == 0:
            # Last departure: reopen check-in and leave checkout_mutex held,
            # which restores the initial state for the next round.
            self.checkin_mutex.release()
            return
        # Let the next waiter check out.
        self.checkout_mutex.release()
180
181
class BarrierTest(BasicThreadTest):
    """Sends NUMTASKS threads through a shared Barrier NUMTRIPS times each."""

    def test_barrier(self):
        """Start the worker threads and wait for the last one to signal."""
        with threading_helper.wait_threads_exit():
            self.bar = Barrier(NUMTASKS)
            self.running = NUMTASKS
            for ident in range(NUMTASKS):
                thread.start_new_thread(self.task2, (ident,))
            verbose_print("waiting for tasks to end")
            self.done_mutex.acquire()
            verbose_print("tasks done")

    def task2(self, ident):
        """Worker body: pass the barrier NUMTRIPS times, then check out."""
        for trip in range(NUMTRIPS):
            # Task 0 never sleeps, which gives it a good chance to enter the
            # next barrier before the others are all out of the current one.
            delay = 0
            if ident != 0:
                with self.random_mutex:
                    delay = random.random() / 10000.0
            micros = round(delay * 1e6)
            verbose_print("task %s will run for %sus" % (ident, micros))
            time.sleep(delay)
            verbose_print("task %s entering %s" % (ident, trip))
            self.bar.enter()
            verbose_print("task %s leaving barrier" % ident)

        with self.running_mutex:
            self.running -= 1
            # Only record the outcome here.  running_mutex must be released
            # before done_mutex is, else the main thread can exit and set the
            # mutex to None as part of global teardown, and releasing it
            # would then raise AttributeError.
            is_last = self.running == 0
        if is_last:
            self.done_mutex.release()
218
class LockTests(lock_tests.LockTests):
    """Run the inherited ``lock_tests.LockTests`` cases against ``_thread`` locks."""
    # Presumably the factory the inherited tests call to create the lock
    # under test -- confirm against test.lock_tests.
    locktype = thread.allocate_lock
221
222
class TestForkInThread(unittest.TestCase):
    """Checks that a child created by os.fork() in a non-main thread works."""

    def setUp(self):
        """Open the pipe the forked child uses to report back."""
        pipe_ends = os.pipe()
        self.read_fd, self.write_fd = pipe_ends

    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
    @threading_helper.reap_threads
    def test_forkinthread(self):
        """Fork inside a thread and expect the child to write b"OK"."""
        pid = None

        def fork_thread(read_fd, write_fd):
            nonlocal pid

            # fork in a thread
            pid = os.fork()
            if not pid:
                # child process: report through the pipe, then exit at once,
                # whatever happens, without returning into the test runner.
                try:
                    os.close(read_fd)
                    os.write(write_fd, b"OK")
                finally:
                    os._exit(0)
            # parent process: nothing more to do in this thread.

        with threading_helper.wait_threads_exit():
            thread.start_new_thread(fork_thread, (self.read_fd, self.write_fd))
            self.assertEqual(os.read(self.read_fd, 2), b"OK")
            os.close(self.write_fd)

        self.assertIsNotNone(pid)
        support.wait_process(pid, exitcode=0)

    def tearDown(self):
        """Close both pipe ends, ignoring any that are already closed."""
        for fd in (self.read_fd, self.write_fd):
            try:
                os.close(fd)
            except OSError:
                pass
266
267
268if __name__ == "__main__":
269    unittest.main()
270