1import os
2import unittest
3import random
4from test import support
5import _thread as thread
6import time
7import weakref
8
9from test import lock_tests
10
11NUMTASKS = 10
12NUMTRIPS = 3
13POLL_SLEEP = 0.010 # seconds = 10 ms
14
15_print_mutex = thread.allocate_lock()
16
17def verbose_print(arg):
18    """Helper function for printing out debugging output."""
19    if support.verbose:
20        with _print_mutex:
21            print(arg)
22
23
24class BasicThreadTest(unittest.TestCase):
25
26    def setUp(self):
27        self.done_mutex = thread.allocate_lock()
28        self.done_mutex.acquire()
29        self.running_mutex = thread.allocate_lock()
30        self.random_mutex = thread.allocate_lock()
31        self.created = 0
32        self.running = 0
33        self.next_ident = 0
34
35        key = support.threading_setup()
36        self.addCleanup(support.threading_cleanup, *key)
37
38
39class ThreadRunningTests(BasicThreadTest):
40
41    def newtask(self):
42        with self.running_mutex:
43            self.next_ident += 1
44            verbose_print("creating task %s" % self.next_ident)
45            thread.start_new_thread(self.task, (self.next_ident,))
46            self.created += 1
47            self.running += 1
48
49    def task(self, ident):
50        with self.random_mutex:
51            delay = random.random() / 10000.0
52        verbose_print("task %s will run for %sus" % (ident, round(delay*1e6)))
53        time.sleep(delay)
54        verbose_print("task %s done" % ident)
55        with self.running_mutex:
56            self.running -= 1
57            if self.created == NUMTASKS and self.running == 0:
58                self.done_mutex.release()
59
60    def test_starting_threads(self):
61        with support.wait_threads_exit():
62            # Basic test for thread creation.
63            for i in range(NUMTASKS):
64                self.newtask()
65            verbose_print("waiting for tasks to complete...")
66            self.done_mutex.acquire()
67            verbose_print("all tasks done")
68
69    def test_stack_size(self):
70        # Various stack size tests.
71        self.assertEqual(thread.stack_size(), 0, "initial stack size is not 0")
72
73        thread.stack_size(0)
74        self.assertEqual(thread.stack_size(), 0, "stack_size not reset to default")
75
76    @unittest.skipIf(os.name not in ("nt", "posix"), 'test meant for nt and posix')
77    def test_nt_and_posix_stack_size(self):
78        try:
79            thread.stack_size(4096)
80        except ValueError:
81            verbose_print("caught expected ValueError setting "
82                            "stack_size(4096)")
83        except thread.error:
84            self.skipTest("platform does not support changing thread stack "
85                          "size")
86
87        fail_msg = "stack_size(%d) failed - should succeed"
88        for tss in (262144, 0x100000, 0):
89            thread.stack_size(tss)
90            self.assertEqual(thread.stack_size(), tss, fail_msg % tss)
91            verbose_print("successfully set stack_size(%d)" % tss)
92
93        for tss in (262144, 0x100000):
94            verbose_print("trying stack_size = (%d)" % tss)
95            self.next_ident = 0
96            self.created = 0
97            with support.wait_threads_exit():
98                for i in range(NUMTASKS):
99                    self.newtask()
100
101                verbose_print("waiting for all tasks to complete")
102                self.done_mutex.acquire()
103                verbose_print("all tasks done")
104
105        thread.stack_size(0)
106
107    def test__count(self):
108        # Test the _count() function.
109        orig = thread._count()
110        mut = thread.allocate_lock()
111        mut.acquire()
112        started = []
113
114        def task():
115            started.append(None)
116            mut.acquire()
117            mut.release()
118
119        with support.wait_threads_exit():
120            thread.start_new_thread(task, ())
121            while not started:
122                time.sleep(POLL_SLEEP)
123            self.assertEqual(thread._count(), orig + 1)
124            # Allow the task to finish.
125            mut.release()
126            # The only reliable way to be sure that the thread ended from the
127            # interpreter's point of view is to wait for the function object to be
128            # destroyed.
129            done = []
130            wr = weakref.ref(task, lambda _: done.append(None))
131            del task
132            while not done:
133                time.sleep(POLL_SLEEP)
134            self.assertEqual(thread._count(), orig)
135
136    def test_unraisable_exception(self):
137        def task():
138            started.release()
139            raise ValueError("task failed")
140
141        started = thread.allocate_lock()
142        with support.catch_unraisable_exception() as cm:
143            with support.wait_threads_exit():
144                started.acquire()
145                thread.start_new_thread(task, ())
146                started.acquire()
147
148            self.assertEqual(str(cm.unraisable.exc_value), "task failed")
149            self.assertIs(cm.unraisable.object, task)
150            self.assertEqual(cm.unraisable.err_msg,
151                             "Exception ignored in thread started by")
152            self.assertIsNotNone(cm.unraisable.exc_traceback)
153
154
155class Barrier:
156    def __init__(self, num_threads):
157        self.num_threads = num_threads
158        self.waiting = 0
159        self.checkin_mutex  = thread.allocate_lock()
160        self.checkout_mutex = thread.allocate_lock()
161        self.checkout_mutex.acquire()
162
163    def enter(self):
164        self.checkin_mutex.acquire()
165        self.waiting = self.waiting + 1
166        if self.waiting == self.num_threads:
167            self.waiting = self.num_threads - 1
168            self.checkout_mutex.release()
169            return
170        self.checkin_mutex.release()
171
172        self.checkout_mutex.acquire()
173        self.waiting = self.waiting - 1
174        if self.waiting == 0:
175            self.checkin_mutex.release()
176            return
177        self.checkout_mutex.release()
178
179
180class BarrierTest(BasicThreadTest):
181
182    def test_barrier(self):
183        with support.wait_threads_exit():
184            self.bar = Barrier(NUMTASKS)
185            self.running = NUMTASKS
186            for i in range(NUMTASKS):
187                thread.start_new_thread(self.task2, (i,))
188            verbose_print("waiting for tasks to end")
189            self.done_mutex.acquire()
190            verbose_print("tasks done")
191
192    def task2(self, ident):
193        for i in range(NUMTRIPS):
194            if ident == 0:
195                # give it a good chance to enter the next
196                # barrier before the others are all out
197                # of the current one
198                delay = 0
199            else:
200                with self.random_mutex:
201                    delay = random.random() / 10000.0
202            verbose_print("task %s will run for %sus" %
203                          (ident, round(delay * 1e6)))
204            time.sleep(delay)
205            verbose_print("task %s entering %s" % (ident, i))
206            self.bar.enter()
207            verbose_print("task %s leaving barrier" % ident)
208        with self.running_mutex:
209            self.running -= 1
210            # Must release mutex before releasing done, else the main thread can
211            # exit and set mutex to None as part of global teardown; then
212            # mutex.release() raises AttributeError.
213            finished = self.running == 0
214        if finished:
215            self.done_mutex.release()
216
217class LockTests(lock_tests.LockTests):
218    locktype = thread.allocate_lock
219
220
221class TestForkInThread(unittest.TestCase):
222    def setUp(self):
223        self.read_fd, self.write_fd = os.pipe()
224
225    @unittest.skipUnless(hasattr(os, 'fork'), 'need os.fork')
226    @support.reap_threads
227    def test_forkinthread(self):
228        status = "not set"
229
230        def thread1():
231            nonlocal status
232
233            # fork in a thread
234            pid = os.fork()
235            if pid == 0:
236                # child
237                try:
238                    os.close(self.read_fd)
239                    os.write(self.write_fd, b"OK")
240                finally:
241                    os._exit(0)
242            else:
243                # parent
244                os.close(self.write_fd)
245                pid, status = os.waitpid(pid, 0)
246
247        with support.wait_threads_exit():
248            thread.start_new_thread(thread1, ())
249            self.assertEqual(os.read(self.read_fd, 2), b"OK",
250                             "Unable to fork() in thread")
251        self.assertEqual(status, 0)
252
253    def tearDown(self):
254        try:
255            os.close(self.read_fd)
256        except OSError:
257            pass
258
259        try:
260            os.close(self.write_fd)
261        except OSError:
262            pass
263
264
265if __name__ == "__main__":
266    unittest.main()
267