1import unittest 2from pygame.threads import FuncResult, tmap, WorkerQueue, Empty, STOP 3from pygame import threads, Surface, transform 4from pygame.compat import xrange_ 5 6import time 7 8 9class WorkerQueueTypeTest(unittest.TestCase): 10 def test_usage_with_different_functions(self): 11 def f(x): 12 return x + 1 13 14 def f2(x): 15 return x + 2 16 17 wq = WorkerQueue() 18 fr = FuncResult(f) 19 fr2 = FuncResult(f2) 20 wq.do(fr, 1) 21 wq.do(fr2, 1) 22 wq.wait() 23 wq.stop() 24 25 self.assertEqual(fr.result, 2) 26 self.assertEqual(fr2.result, 3) 27 28 def test_do(self): 29 """ Tests function placement on queue and execution after blocking function completion.""" 30 # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.do: 31 32 # puts a function on a queue for running _later_. 33 34 # TODO: This tests needs refactoring to avoid sleep. 35 # sleep is slow and unreliable (especially on VMs). 36 37 # def sleep_test(): 38 # time.sleep(0.5) 39 40 # def calc_test(x): 41 # return x + 1 42 43 # worker_queue = WorkerQueue(num_workers=1) 44 # sleep_return = FuncResult(sleep_test) 45 # calc_return = FuncResult(calc_test) 46 # init_time = time.time() 47 # worker_queue.do(sleep_return) 48 # worker_queue.do(calc_return, 1) 49 # worker_queue.wait() 50 # worker_queue.stop() 51 # time_diff = time.time() - init_time 52 53 # self.assertEqual(sleep_return.result, None) 54 # self.assertEqual(calc_return.result, 2) 55 # self.assertGreaterEqual(time_diff, 0.5) 56 57 def test_stop(self): 58 """Ensure stop() stops the worker queue""" 59 wq = WorkerQueue() 60 61 self.assertGreater(len(wq.pool), 0) 62 63 for t in wq.pool: 64 self.assertTrue(t.is_alive()) 65 66 for i in xrange_(200): 67 wq.do(lambda x: x + 1, i) 68 69 wq.stop() 70 71 for t in wq.pool: 72 self.assertFalse(t.is_alive()) 73 74 self.assertIs(wq.queue.get(), STOP) 75 76 def test_threadloop(self): 77 78 # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.threadloop: 79 80 # Loops until all of the tasks are finished. 81 82 #Make a worker queue with only one thread 83 wq = WorkerQueue(1) 84 85 #Ocuppy the one worker with the threadloop 86 #wq threads are just threadloop, so this makes an embedded threadloop 87 wq.do(wq.threadloop) 88 89 #Make sure wq can still do work 90 #If wq can still do work, threadloop works 91 l = [] 92 wq.do(l.append,1) 93 #Wait won't work because the primary thread is in an infinite loop 94 time.sleep(.5) 95 self.assertEqual(l[0],1) 96 97 #Kill the embedded threadloop by sending stop onto the stack 98 #Threadloop puts STOP back onto the queue when it STOPs so this kills both loops 99 wq.stop() 100 101 #Make sure wq has stopped 102 self.assertFalse(wq.pool[0].is_alive()) 103 104 def test_wait(self): 105 106 # __doc__ (as of 2008-06-28) for pygame.threads.WorkerQueue.wait: 107 108 # waits until all tasks are complete. 109 110 wq = WorkerQueue() 111 112 for i in xrange_(2000): 113 wq.do(lambda x: x + 1, i) 114 wq.wait() 115 116 self.assertRaises(Empty, wq.queue.get_nowait) 117 118 wq.stop() 119 120 121class ThreadsModuleTest(unittest.TestCase): 122 def test_benchmark_workers(self): 123 """Ensure benchmark_workers performance measure functions properly with both default and specified inputs""" 124 "tags:long_running" 125 126 # __doc__ (as of 2008-06-28) for pygame.threads.benchmark_workers: 127 128 # does a little test to see if workers are at all faster. 129 # Returns the number of workers which works best. 130 # Takes a little bit of time to run, so you should only really call 131 # it once. 132 # You can pass in benchmark data, and functions if you want. 133 # a_bench_func - f(data) 134 # the_data - data to work on. 135 optimal_workers = threads.benchmark_workers() 136 self.assertIsInstance(optimal_workers, int) 137 self.assertTrue(0 <= optimal_workers < 64) 138 139 # Test passing benchmark data and function explicitly 140 def smooth_scale_bench(data): 141 transform.smoothscale(data, (128, 128)) 142 143 surf_data = [Surface((x, x), 0, 32) for x in range(12, 64, 12)] 144 best_num_workers = threads.benchmark_workers(smooth_scale_bench, surf_data) 145 self.assertIsInstance(best_num_workers, int) 146 147 def test_init(self): 148 """Ensure init() sets up the worker queue""" 149 threads.init(8) 150 151 self.assertIsInstance(threads._wq, WorkerQueue) 152 153 threads.quit() 154 155 def test_quit(self): 156 """Ensure quit() cleans up the worker queue""" 157 threads.init(8) 158 threads.quit() 159 160 self.assertIsNone(threads._wq) 161 162 def test_tmap(self): 163 # __doc__ (as of 2008-06-28) for pygame.threads.tmap: 164 165 # like map, but uses a thread pool to execute. 166 # num_workers - the number of worker threads that will be used. If pool 167 # is passed in, then the num_workers arg is ignored. 168 # worker_queue - you can optionally pass in an existing WorkerQueue. 169 # wait - True means that the results are returned when everything is finished. 170 # False means that we return the [worker_queue, results] right away instead. 171 # results, is returned as a list of FuncResult instances. 172 # stop_on_error - 173 174 ## test that the outcomes of map and tmap are the same 175 func, data = lambda x: x + 1, xrange_(100) 176 177 tmapped = list(tmap(func, data)) 178 mapped = list(map(func, data)) 179 180 self.assertEqual(tmapped, mapped) 181 182 ## Test that setting tmap to not stop on errors produces the expected result 183 data2 = xrange_(100) 184 always_excepts = lambda x: 1/0 185 186 tmapped2 = list(tmap(always_excepts, data2, stop_on_error=False)) 187 188 # Use list comprehension to check all entries are None as all function 189 # calls made by tmap will have thrown an exception (ZeroDivisionError) 190 # Condense to single bool with `all`, which will return true if all 191 # entries are true 192 self.assertTrue(all([x is None for x in tmapped2])) 193 194 195 196 197 198 def todo_test_tmap__None_func_and_multiple_sequences(self): 199 """Using a None as func and multiple sequences""" 200 self.fail() 201 202 res = tmap(None, [1, 2, 3, 4]) 203 res2 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55]) 204 res3 = tmap(None, [1, 2, 3, 4], [22, 33, 44, 55, 66]) 205 res4 = tmap(None, [1, 2, 3, 4, 5], [22, 33, 44, 55]) 206 207 self.assertEqual([1, 2, 3, 4], res) 208 self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55)], res2) 209 self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (None, 66)], res3) 210 self.assertEqual([(1, 22), (2, 33), (3, 44), (4, 55), (5, None)], res4) 211 212 def test_tmap__wait(self): 213 r = range(1000) 214 wq, results = tmap(lambda x: x, r, num_workers=5, wait=False) 215 wq.wait() 216 r2 = map(lambda x: x.result, results) 217 self.assertEqual(list(r), list(r2)) 218 219 def test_FuncResult(self): 220 """Ensure FuncResult sets its result and exception attributes""" 221 # Results are stored in result attribute 222 fr = FuncResult(lambda x: x + 1) 223 fr(2) 224 225 self.assertEqual(fr.result, 3) 226 227 # Exceptions are store in exception attribute 228 self.assertIsNone(fr.exception, "no exception should be raised") 229 230 exception = ValueError("rast") 231 232 def x(sdf): 233 raise exception 234 235 fr = FuncResult(x) 236 fr(None) 237 238 self.assertIs(fr.exception, exception) 239 240 241################################################################################ 242 243if __name__ == "__main__": 244 unittest.main() 245