#
# A test file for the `processing` package
#

"""Exercise the synchronization primitives of the `processing` API.

Each ``test_*`` function starts one or more workers through the
module-level name ``processing`` and prints progress to stdout.  That
name is bound to ``multiprocess`` at import time and is rebound by
``test()`` to whichever namespace is under test (the package itself, a
manager, or the thread-backed ``dummy`` module).
"""

import random
import sys
import time
from queue import Empty

import multiprocess as processing       # may get overwritten
processing.currentProcess = processing.current_process
processing.freezeSupport = processing.freeze_support
processing.activeChildren = processing.active_children


#### TEST_VALUE

def value_func(running, mutex):
    """Sleep up to four seconds, then decrement ``running`` under ``mutex``."""
    random.seed()
    time.sleep(random.random()*4)

    # `with` guarantees the lock is released even if print() raises.
    with mutex:
        print('\n\t\t\t' + str(processing.currentProcess()) + ' has finished')
        running.value -= 1


def test_value():
    """Start ten workers and poll a shared counter until it reaches zero."""
    TASKS = 10
    running = processing.Value('i', TASKS)
    mutex = processing.Lock()

    workers = [
        processing.Process(target=value_func, args=(running, mutex))
        for _ in range(TASKS)
    ]
    for worker in workers:
        worker.start()

    while running.value > 0:
        time.sleep(0.08)
        with mutex:
            print(running.value, end=' ')
            sys.stdout.flush()

    # The counter hits zero before the workers have necessarily exited.
    for worker in workers:
        worker.join()

    print()
    print('No more running processes')


#### TEST_QUEUE

def queue_func(queue):
    """Put the squares of 0..29 on ``queue``, then the sentinel ``'STOP'``."""
    for i in range(30):
        time.sleep(0.5 * random.random())
        queue.put(i*i)
    queue.put('STOP')


def test_queue():
    """Print items from a producer until ``'STOP'``, reporting timeouts."""
    q = processing.Queue()

    p = processing.Process(target=queue_func, args=(q,))
    p.start()

    o = None
    while o != 'STOP':
        try:
            o = q.get(timeout=0.3)
        except Empty:
            print('TIMEOUT')
        else:
            print(o, end=' ')
            sys.stdout.flush()

    print()

    # Everything the producer sent has been consumed, so it can be reaped.
    p.join()


#### TEST_CONDITION

def condition_func(cond):
    """Acquire ``cond``, wait two seconds, then notify one waiter."""
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()


def test_condition():
    """Wait on a condition that is held twice and notified by a child.

    The condition is printed at each step so its state can be inspected.
    """
    cond = processing.Condition()

    p = processing.Process(target=condition_func, args=(cond,))
    print(cond)

    # The condition is deliberately acquired twice (nested) before waiting.
    with cond:
        print(cond)
        with cond:
            print(cond)

            # Start the child while still holding the condition, so it
            # cannot notify before the wait() below has begun.
            p.start()

            print('main is waiting')
            cond.wait()
            print('main has woken up')

            print(cond)
        print(cond)

    p.join()
    print(cond)


#### TEST_SEMAPHORE

def semaphore_func(sema, mutex, running):
    """Hold ``sema`` while sleeping; track the running count under ``mutex``."""
    with sema:
        with mutex:
            running.value += 1
            print(running.value, 'tasks are running')

        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % processing.currentProcess())


def test_semaphore():
    """Run ten workers that share a semaphore created with a count of 3."""
    sema = processing.Semaphore(3)
    mutex = processing.RLock()
    running = processing.Value('i', 0)

    processes = [
        processing.Process(target=semaphore_func,
                           args=(sema, mutex, running))
        for _ in range(10)
    ]

    for p in processes:
        p.start()

    for p in processes:
        p.join()


#### TEST_JOIN_TIMEOUT

def join_timeout_func():
    """Sleep for 5.5 seconds, announcing start and end."""
    print('\tchild sleeping')
    time.sleep(5.5)
    print('\n\tchild terminating')


def test_join_timeout():
    """Join a child with a one second timeout until it is no longer alive."""
    p = processing.Process(target=join_timeout_func)
    p.start()

    print('waiting for process to finish')

    while True:
        p.join(timeout=1)
        if not p.is_alive():
            break
        print('.', end=' ')
        sys.stdout.flush()


#### TEST_EVENT

def event_func(event):
    """Block until ``event`` is set, printing before and after."""
    print('\t%r is waiting' % processing.currentProcess())
    event.wait()
    print('\t%r has woken up' % processing.currentProcess())


def test_event():
    """Start five workers waiting on one event and set it after two seconds."""
    event = processing.Event()

    processes = [processing.Process(target=event_func, args=(event,))
                 for _ in range(5)]

    for p in processes:
        p.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for p in processes:
        p.join()


#### TEST_SHAREDVALUES

def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that the shared objects hold the data they were built from.

    ``values`` and ``arrays`` are sequences of ``(typecode, data)`` pairs;
    ``shared_values`` and ``shared_arrays`` are the matching shared objects.
    Raises ``AssertionError`` on the first mismatch.  The check is an
    explicit raise rather than an ``assert`` statement so that it is still
    performed when Python runs with ``-O``.
    """
    for (_, expected), shared in zip(values, shared_values):
        if expected != shared.value:
            raise AssertionError(
                'shared value %r != %r' % (shared.value, expected))

    for (_, expected), shared in zip(arrays, shared_arrays):
        if list(expected) != list(shared[:]):
            raise AssertionError('shared array differs from its source')

    print('Tests passed')


def test_sharedvalues():
    """Build shared values and arrays and verify them in a child."""
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25),
    ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000)),
    ]

    shared_values = [processing.Value(code, v) for code, v in values]
    shared_arrays = [processing.Array(code, a) for code, a in arrays]

    p = processing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
    )
    p.start()
    p.join()

    # The child raises on a mismatch, which gives a non-zero exit code.
    if p.exitcode != 0:
        raise AssertionError('child exited with code %r' % (p.exitcode,))


####

def test(namespace=processing):
    """Run every test with the global ``processing`` bound to ``namespace``.

    ``namespace`` must provide the factories used by the tests (``Process``,
    ``Value``, ``Array``, ``Lock``, ``RLock``, ``Semaphore``, ``Condition``,
    ``Event``, ``Queue``) plus ``currentProcess`` and ``activeChildren``.
    Raises ``ValueError`` if the namespace has a ``_debugInfo`` hook and it
    returns something truthy after the tests have run.
    """
    global processing

    processing = namespace

    for func in [test_value, test_queue, test_condition,
                 test_semaphore, test_join_timeout, test_event,
                 test_sharedvalues]:

        print('\n\t######## %s\n' % func.__name__)
        func()

    processing.activeChildren()     # cleanup any old processes
    if hasattr(processing, '_debugInfo'):
        info = processing._debugInfo()
        if info:
            print(info)
            raise ValueError('there should be no positive refcounts left')


def main(argv=None):
    """Pick the namespace named on the command line and run the tests.

    ``argv`` defaults to ``sys.argv``.  With no argument, or ``processes``,
    the package itself is used; ``manager`` and ``threads`` select a manager
    or the thread-backed ``dummy`` module.  Anything else prints a usage
    message and exits with status 2.
    """
    if argv is None:
        argv = sys.argv

    processing.freezeSupport()

    mode = argv[1] if len(argv) == 2 else None

    if len(argv) <= 1 or mode == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = processing
    elif mode == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = processing.Manager()
        namespace.Process = processing.Process
        namespace.currentProcess = processing.currentProcess
        namespace.activeChildren = processing.activeChildren
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        # `processing` is only an alias in this file, not an importable
        # module, so the real package name has to be used here.
        import multiprocess.dummy as namespace
        namespace.currentProcess = namespace.current_process
        namespace.activeChildren = namespace.active_children
    else:
        # Also reached when more than one argument was given.
        print('Usage:\n\t%s [processes | manager | threads]' % argv[0])
        raise SystemExit(2)

    test(namespace)


if __name__ == '__main__':
    main()