1# 2# Simple benchmarks for the processing package 3# 4 5import time, sys, multiprocess as processing, threading, Queue, gc 6processing.freezeSupport = processing.freeze_support 7 8if sys.platform == 'win32': 9 _timer = time.clock 10else: 11 _timer = time.time 12 13delta = 1 14 15 16#### TEST_QUEUESPEED 17 18def queuespeed_func(q, c, iterations): 19 a = '0' * 256 20 c.acquire() 21 c.notify() 22 c.release() 23 24 for i in xrange(iterations): 25 q.put(a) 26# q.putMany((a for i in xrange(iterations)) 27 28 q.put('STOP') 29 30def test_queuespeed(Process, q, c): 31 elapsed = 0 32 iterations = 1 33 34 while elapsed < delta: 35 iterations *= 2 36 37 p = Process(target=queuespeed_func, args=(q, c, iterations)) 38 c.acquire() 39 p.start() 40 c.wait() 41 c.release() 42 43 result = None 44 t = _timer() 45 46 while result != 'STOP': 47 result = q.get() 48 49 elapsed = _timer() - t 50 51 p.join() 52 53 print iterations, 'objects passed through the queue in', elapsed, 'seconds' 54 print 'average number/sec:', iterations/elapsed 55 56 57#### TEST_PIPESPEED 58 59def pipe_func(c, cond, iterations): 60 a = '0' * 256 61 cond.acquire() 62 cond.notify() 63 cond.release() 64 65 for i in xrange(iterations): 66 c.send(a) 67 68 c.send('STOP') 69 70def test_pipespeed(): 71 c, d = processing.Pipe() 72 cond = processing.Condition() 73 elapsed = 0 74 iterations = 1 75 76 while elapsed < delta: 77 iterations *= 2 78 79 p = processing.Process(target=pipe_func, args=(d, cond, iterations)) 80 cond.acquire() 81 p.start() 82 cond.wait() 83 cond.release() 84 85 result = None 86 t = _timer() 87 88 while result != 'STOP': 89 result = c.recv() 90 91 elapsed = _timer() - t 92 p.join() 93 94 print iterations, 'objects passed through connection in',elapsed,'seconds' 95 print 'average number/sec:', iterations/elapsed 96 97 98#### TEST_SEQSPEED 99 100def test_seqspeed(seq): 101 elapsed = 0 102 iterations = 1 103 104 while elapsed < delta: 105 iterations *= 2 106 107 t = _timer() 108 109 for i in xrange(iterations): 110 a = seq[5] 111 112 elapsed = _timer()-t 113 114 print iterations, 'iterations in', elapsed, 'seconds' 115 print 'average number/sec:', iterations/elapsed 116 117 118#### TEST_LOCK 119 120def test_lockspeed(l): 121 elapsed = 0 122 iterations = 1 123 124 while elapsed < delta: 125 iterations *= 2 126 127 t = _timer() 128 129 for i in xrange(iterations): 130 l.acquire() 131 l.release() 132 133 elapsed = _timer()-t 134 135 print iterations, 'iterations in', elapsed, 'seconds' 136 print 'average number/sec:', iterations/elapsed 137 138 139#### TEST_CONDITION 140 141def conditionspeed_func(c, N): 142 c.acquire() 143 c.notify() 144 145 for i in xrange(N): 146 c.wait() 147 c.notify() 148 149 c.release() 150 151def test_conditionspeed(Process, c): 152 elapsed = 0 153 iterations = 1 154 155 while elapsed < delta: 156 iterations *= 2 157 158 c.acquire() 159 p = Process(target=conditionspeed_func, args=(c, iterations)) 160 p.start() 161 162 c.wait() 163 164 t = _timer() 165 166 for i in xrange(iterations): 167 c.notify() 168 c.wait() 169 170 elapsed = _timer()-t 171 172 c.release() 173 p.join() 174 175 print iterations * 2, 'waits in', elapsed, 'seconds' 176 print 'average number/sec:', iterations * 2 / elapsed 177 178#### 179 180def test(): 181 manager = processing.Manager() 182 183 gc.disable() 184 185 print '\n\t######## testing Queue.Queue\n' 186 test_queuespeed(threading.Thread, Queue.Queue(), 187 threading.Condition()) 188 print '\n\t######## testing processing.Queue\n' 189 test_queuespeed(processing.Process, processing.Queue(), 190 processing.Condition()) 191 print '\n\t######## testing Queue managed by server process\n' 192 test_queuespeed(processing.Process, manager.Queue(), 193 manager.Condition()) 194 print '\n\t######## testing processing.Pipe\n' 195 test_pipespeed() 196 197 print 198 199 print '\n\t######## testing list\n' 200 test_seqspeed(range(10)) 201 print '\n\t######## testing list managed by server process\n' 202 test_seqspeed(manager.list(range(10))) 203 print '\n\t######## testing Array("i", ..., lock=False)\n' 204 test_seqspeed(processing.Array('i', range(10), lock=False)) 205 print '\n\t######## testing Array("i", ..., lock=True)\n' 206 test_seqspeed(processing.Array('i', range(10), lock=True)) 207 208 print 209 210 print '\n\t######## testing threading.Lock\n' 211 test_lockspeed(threading.Lock()) 212 print '\n\t######## testing threading.RLock\n' 213 test_lockspeed(threading.RLock()) 214 print '\n\t######## testing processing.Lock\n' 215 test_lockspeed(processing.Lock()) 216 print '\n\t######## testing processing.RLock\n' 217 test_lockspeed(processing.RLock()) 218 print '\n\t######## testing lock managed by server process\n' 219 test_lockspeed(manager.Lock()) 220 print '\n\t######## testing rlock managed by server process\n' 221 test_lockspeed(manager.RLock()) 222 223 print 224 225 print '\n\t######## testing threading.Condition\n' 226 test_conditionspeed(threading.Thread, threading.Condition()) 227 print '\n\t######## testing processing.Condition\n' 228 test_conditionspeed(processing.Process, processing.Condition()) 229 print '\n\t######## testing condition managed by a server process\n' 230 test_conditionspeed(processing.Process, manager.Condition()) 231 232 gc.enable() 233 234if __name__ == '__main__': 235 processing.freezeSupport() 236 test() 237