1import unittest
2from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP
3from pygame import threads, Surface, transform
4from pygame.compat import xrange_
5
6import time
7
8
9class WorkerQueueTypeTest(unittest.TestCase):
10    def test_usage_with_different_functions(self):
11        def f(x):
12            return x + 1
13
14        def f2(x):
15            return x + 2
16
17        wq = WorkerQueue()
18        fr = FuncResult(f)
19        fr2 = FuncResult(f2)
20        wq.do(fr, 1)
21        wq.do(fr2, 1)
22        wq.wait()
23        wq.stop()
24
25        self.assertEqual(fr.result, 2)
26        self.assertEqual(fr2.result, 3)
27
28    def test_do(self):
29        """ Tests function placement on queue and execution after blocking function completion."""
30        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do:
31
32        # puts a function on a queue for running _later_.
33
34        # TODO: This tests needs refactoring to avoid sleep.
35        #       sleep is slow and unreliable (especially on VMs).
36
37        # def sleep_test():
38        #     time.sleep(0.5)
39
40        # def calc_test(x):
41        #     return x + 1
42
43        # worker_queue = WorkerQueue(num_workers=1)
44        # sleep_return = FuncResult(sleep_test)
45        # calc_return = FuncResult(calc_test)
46        # init_time = time.time()
47        # worker_queue.do(sleep_return)
48        # worker_queue.do(calc_return, 1)
49        # worker_queue.wait()
50        # worker_queue.stop()
51        # time_diff = time.time() - init_time
52
53        # self.assertEqual(sleep_return.result, None)
54        # self.assertEqual(calc_return.result, 2)
55        # self.assertGreaterEqual(time_diff, 0.5)
56
57    def test_stop(self):
58        """Ensure stop() stops the worker queue"""
59        wq = WorkerQueue()
60
61        self.assertGreater(len(wq.pool), 0)
62
63        for t in wq.pool:
64            self.assertTrue(t.is_alive())
65
66        for i in xrange_(200):
67            wq.do(lambda x: x + 1, i)
68
69        wq.stop()
70
71        for t in wq.pool:
72            self.assertFalse(t.is_alive())
73
74        self.assertIs(wq.queue.get(), STOP)
75
76    def test_threadloop(self):
77
78        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop:
79
80        # Loops until all of the tasks are finished.
81
82        #Make a worker queue with only one thread
83        wq = WorkerQueue(1)
84
85        #Ocuppy the one worker with the threadloop
86        #wq threads are just threadloop, so this makes an embedded threadloop
87        wq.do(wq.threadloop)
88
89        #Make sure wq can still do work
90        #If wq can still do work, threadloop works
91        l = []
92        wq.do(l.append,1)
93        #Wait won't work because the primary thread is in an infinite loop
94        time.sleep(.5)
95        self.assertEqual(l[0],1)
96
97        #Kill the embedded threadloop by sending stop onto the stack
98        #Threadloop puts STOP back onto the queue when it STOPs so this kills both loops
99        wq.stop()
100
101        #Make sure wq has stopped
102        self.assertFalse(wq.pool[0].is_alive())
103
104    def test_wait(self):
105
106        # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait:
107
108        # waits until all tasks are complete.
109
110        wq = WorkerQueue()
111
112        for i in xrange_(2000):
113            wq.do(lambda x: x + 1, i)
114        wq.wait()
115
116        self.assertRaises(Empty, wq.queue.get_nowait)
117
118        wq.stop()
119
120
121class ThreadsModuleTest(unittest.TestCase):
122    def test_benchmark_workers(self):
123        """Ensure benchmark_workers performance measure functions properly with both default and specified inputs"""
124        "tags:long_running"
125
126        # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers:
127
128        # does a little test to see if workers are at all faster.
129        # Returns the number of workers which works best.
130        # Takes a little bit of time to run, so you should only really call
131        #   it once.
132        # You can pass in benchmark data, and functions if you want.
133        # a_bench_func - f(data)
134        # the_data - data to work on.
135        optimal_workers = threads.benchmark_workers()
136        self.assertIsInstance(optimal_workers, int)
137        self.assertTrue(0 <= optimal_workers < 64)
138
139        # Test passing benchmark data and function explicitly
140        def smooth_scale_bench(data):
141            transform.smoothscale(data, (128, 128))
142
143        surf_data = [Surface((x, x), 0, 32) for x in range(12, 64, 12)]
144        best_num_workers = threads.benchmark_workers(smooth_scale_bench, surf_data)
145        self.assertIsInstance(best_num_workers, int)
146
147    def test_init(self):
148        """Ensure init() sets up the worker queue"""
149        threads.init(8)
150
151        self.assertIsInstance(threads._wq, WorkerQueue)
152
153        threads.quit()
154
155    def test_quit(self):
156        """Ensure quit() cleans up the worker queue"""
157        threads.init(8)
158        threads.quit()
159
160        self.assertIsNone(threads._wq)
161
162    def test_tmap(self):
163        # __doc__ (as of 2008-06-28) for pygame.threads.tmap:
164
165        # like map, but uses a thread pool to execute.
166        # num_workers - the number of worker threads that will be used.  If pool
167        #                 is passed in, then the num_workers arg is ignored.
168        # worker_queue - you can optionally pass in an existing WorkerQueue.
169        # wait - True means that the results are returned when everything is finished.
170        #        False means that we return the [worker_queue, results] right away instead.
171        #        results, is returned as a list of FuncResult instances.
172        # stop_on_error -
173
174        ## test that the outcomes of map and tmap are the same
175        func, data = lambda x: x + 1, xrange_(100)
176
177        tmapped = list(tmap(func, data))
178        mapped = list(map(func, data))
179
180        self.assertEqual(tmapped, mapped)
181
182        ## Test that setting tmap to not stop on errors produces the expected result
183        data2 = xrange_(100)
184        always_excepts = lambda x: 1/0
185
186        tmapped2 = list(tmap(always_excepts, data2, stop_on_error=False))
187
188        # Use list comprehension to check all entries are None as all function
189        # calls made by tmap will have thrown an exception (ZeroDivisionError)
190        # Condense to single bool with `all`, which will return true if all
191        # entries are true
192        self.assertTrue(all([x is None for x in tmapped2]))
193
194
195
196
197
198    def todo_test_tmap__None_func_and_multiple_sequences(self):
199        """Using a None as func and multiple sequences"""
200        self.fail()
201
202        res = tmap(None, [1, 2, 3, 4])
203        res2 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55])
204        res3 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55, 66])
205        res4 = tmap(None, [1, 2, 3, 4, 5], [22, 33, 44, 55])
206
207        self.assertEqual([1, 2, 3, 4], res)
208        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2)
209        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3)
210        self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5, None)], res4)
211
212    def test_tmap__wait(self):
213        r = range(1000)
214        wq, results = tmap(lambda x: x, r, num_workers=5, wait=False)
215        wq.wait()
216        r2 = map(lambda x: x.result, results)
217        self.assertEqual(list(r), list(r2))
218
219    def test_FuncResult(self):
220        """Ensure FuncResult sets its result and exception attributes"""
221        # Results are stored in result attribute
222        fr = FuncResult(lambda x: x + 1)
223        fr(2)
224
225        self.assertEqual(fr.result, 3)
226
227        # Exceptions are store in exception attribute
228        self.assertIsNone(fr.exception, "no exception should be raised")
229
230        exception = ValueError("rast")
231
232        def x(sdf):
233            raise exception
234
235        fr = FuncResult(x)
236        fr(None)
237
238        self.assertIs(fr.exception, exception)
239
240
241################################################################################
242
243if __name__ == "__main__":
244    unittest.main()
245