1#
2# Simple benchmarks for the processing package
3#
4
5import time, sys, multiprocess as processing, threading, Queue, gc
6processing.freezeSupport = processing.freeze_support
7
8if sys.platform == 'win32':
9    _timer = time.clock
10else:
11    _timer = time.time
12
13delta = 1
14
15
16#### TEST_QUEUESPEED
17
18def queuespeed_func(q, c, iterations):
19    a = '0' * 256
20    c.acquire()
21    c.notify()
22    c.release()
23
24    for i in xrange(iterations):
25        q.put(a)
26#    q.putMany((a for i in xrange(iterations))
27
28    q.put('STOP')
29
30def test_queuespeed(Process, q, c):
31    elapsed = 0
32    iterations = 1
33
34    while elapsed < delta:
35        iterations *= 2
36
37        p = Process(target=queuespeed_func, args=(q, c, iterations))
38        c.acquire()
39        p.start()
40        c.wait()
41        c.release()
42
43        result = None
44        t = _timer()
45
46        while result != 'STOP':
47            result = q.get()
48
49        elapsed = _timer() - t
50
51        p.join()
52
53    print iterations, 'objects passed through the queue in', elapsed, 'seconds'
54    print 'average number/sec:', iterations/elapsed
55
56
57#### TEST_PIPESPEED
58
59def pipe_func(c, cond, iterations):
60    a = '0' * 256
61    cond.acquire()
62    cond.notify()
63    cond.release()
64
65    for i in xrange(iterations):
66        c.send(a)
67
68    c.send('STOP')
69
70def test_pipespeed():
71    c, d = processing.Pipe()
72    cond = processing.Condition()
73    elapsed = 0
74    iterations = 1
75
76    while elapsed < delta:
77        iterations *= 2
78
79        p = processing.Process(target=pipe_func, args=(d, cond, iterations))
80        cond.acquire()
81        p.start()
82        cond.wait()
83        cond.release()
84
85        result = None
86        t = _timer()
87
88        while result != 'STOP':
89            result = c.recv()
90
91        elapsed = _timer() - t
92        p.join()
93
94    print iterations, 'objects passed through connection in',elapsed,'seconds'
95    print 'average number/sec:', iterations/elapsed
96
97
98#### TEST_SEQSPEED
99
100def test_seqspeed(seq):
101    elapsed = 0
102    iterations = 1
103
104    while elapsed < delta:
105        iterations *= 2
106
107        t = _timer()
108
109        for i in xrange(iterations):
110            a = seq[5]
111
112        elapsed = _timer()-t
113
114    print iterations, 'iterations in', elapsed, 'seconds'
115    print 'average number/sec:', iterations/elapsed
116
117
118#### TEST_LOCK
119
120def test_lockspeed(l):
121    elapsed = 0
122    iterations = 1
123
124    while elapsed < delta:
125        iterations *= 2
126
127        t = _timer()
128
129        for i in xrange(iterations):
130            l.acquire()
131            l.release()
132
133        elapsed = _timer()-t
134
135    print iterations, 'iterations in', elapsed, 'seconds'
136    print 'average number/sec:', iterations/elapsed
137
138
139#### TEST_CONDITION
140
141def conditionspeed_func(c, N):
142    c.acquire()
143    c.notify()
144
145    for i in xrange(N):
146        c.wait()
147        c.notify()
148
149    c.release()
150
151def test_conditionspeed(Process, c):
152    elapsed = 0
153    iterations = 1
154
155    while elapsed < delta:
156        iterations *= 2
157
158        c.acquire()
159        p = Process(target=conditionspeed_func, args=(c, iterations))
160        p.start()
161
162        c.wait()
163
164        t = _timer()
165
166        for i in xrange(iterations):
167            c.notify()
168            c.wait()
169
170        elapsed = _timer()-t
171
172        c.release()
173        p.join()
174
175    print iterations * 2, 'waits in', elapsed, 'seconds'
176    print 'average number/sec:', iterations * 2 / elapsed
177
178####
179
180def test():
181    manager = processing.Manager()
182
183    gc.disable()
184
185    print '\n\t######## testing Queue.Queue\n'
186    test_queuespeed(threading.Thread, Queue.Queue(),
187                    threading.Condition())
188    print '\n\t######## testing processing.Queue\n'
189    test_queuespeed(processing.Process, processing.Queue(),
190                    processing.Condition())
191    print '\n\t######## testing Queue managed by server process\n'
192    test_queuespeed(processing.Process, manager.Queue(),
193                    manager.Condition())
194    print '\n\t######## testing processing.Pipe\n'
195    test_pipespeed()
196
197    print
198
199    print '\n\t######## testing list\n'
200    test_seqspeed(range(10))
201    print '\n\t######## testing list managed by server process\n'
202    test_seqspeed(manager.list(range(10)))
203    print '\n\t######## testing Array("i", ..., lock=False)\n'
204    test_seqspeed(processing.Array('i', range(10), lock=False))
205    print '\n\t######## testing Array("i", ..., lock=True)\n'
206    test_seqspeed(processing.Array('i', range(10), lock=True))
207
208    print
209
210    print '\n\t######## testing threading.Lock\n'
211    test_lockspeed(threading.Lock())
212    print '\n\t######## testing threading.RLock\n'
213    test_lockspeed(threading.RLock())
214    print '\n\t######## testing processing.Lock\n'
215    test_lockspeed(processing.Lock())
216    print '\n\t######## testing processing.RLock\n'
217    test_lockspeed(processing.RLock())
218    print '\n\t######## testing lock managed by server process\n'
219    test_lockspeed(manager.Lock())
220    print '\n\t######## testing rlock managed by server process\n'
221    test_lockspeed(manager.RLock())
222
223    print
224
225    print '\n\t######## testing threading.Condition\n'
226    test_conditionspeed(threading.Thread, threading.Condition())
227    print '\n\t######## testing processing.Condition\n'
228    test_conditionspeed(processing.Process, processing.Condition())
229    print '\n\t######## testing condition managed by a server process\n'
230    test_conditionspeed(processing.Process, manager.Condition())
231
232    gc.enable()
233
234if __name__ == '__main__':
235    processing.freezeSupport()
236    test()
237