1#
2# A test file for the `processing` package
3#
4
5import time, sys, random
6from queue import Empty
7
8import multiprocess as processing               # may get overwritten
# Expose the legacy camelCase spellings used throughout this file as
# aliases of the snake_case functions provided by the imported module.
for _legacy, _modern in (
        ('currentProcess', 'current_process'),
        ('freezeSupport', 'freeze_support'),
        ('activeChildren', 'active_children'),
        ):
    setattr(processing, _legacy, getattr(processing, _modern))
del _legacy, _modern
12
13
14#### TEST_VALUE
15
def value_func(running, mutex):
    """Sleep for a random time, then report and decrement the shared counter.

    running -- shared object whose ``value`` attribute counts unfinished tasks
    mutex   -- lock guarding both the counter update and the printed output
    """
    random.seed()
    time.sleep(random.random()*4)

    # ``with`` guarantees the lock is released even if the print or the
    # counter update raises, so the polling parent can never be left blocked.
    with mutex:
        print('\n\t\t\t' + str(processing.currentProcess()) + ' has finished')
        running.value -= 1
24
def test_value():
    """Start TASKS workers and poll a shared counter until it reaches zero.

    Each worker runs ``value_func``, which decrements ``running`` when it is
    done.  The parent prints the counter roughly every 0.08 seconds while
    holding the same mutex the workers use for their own output.
    """
    TASKS = 10
    running = processing.Value('i', TASKS)
    mutex = processing.Lock()

    workers = [
        processing.Process(target=value_func, args=(running, mutex))
        for _ in range(TASKS)
        ]
    for worker in workers:
        worker.start()

    while running.value > 0:
        time.sleep(0.08)
        # released automatically, even if writing to stdout fails
        with mutex:
            print(running.value, end=' ')
            sys.stdout.flush()

    print()
    print('No more running processes')

    # The counter hitting zero only means every worker has finished its
    # critical section; join so none of them outlives this test.
    for worker in workers:
        worker.join()
42
43
44#### TEST_QUEUE
45
def queue_func(queue):
    """Put the squares of 0..29 on *queue* at random intervals, then 'STOP'.

    queue -- any object with a ``put`` method; the final 'STOP' string is the
             sentinel the consumer uses to know that no more items follow
    """
    for number in range(30):
        pause = 0.5 * random.random()
        time.sleep(pause)
        queue.put(number * number)
    queue.put('STOP')
51
def test_queue():
    """Consume items produced by a child process until 'STOP' arrives.

    Every received item (including the sentinel) is echoed; whenever no item
    shows up within 0.3 seconds, 'TIMEOUT' is printed and the wait resumes.
    """
    q = processing.Queue()

    producer = processing.Process(target=queue_func, args=(q,))
    producer.start()

    while True:
        try:
            item = q.get(timeout=0.3)
        except Empty:
            print('TIMEOUT')
            continue

        print(item, end=' ')
        sys.stdout.flush()
        if item == 'STOP':
            break

    print()
68
69
70#### TEST_CONDITION
71
def condition_func(cond):
    """Hold *cond* for two seconds, then notify one waiter and release it.

    cond -- condition variable shared with the parent; its string form is
            printed before and after the sleep
    """
    # ``with`` acquires the condition and guarantees it is released again
    # even if a print or notify() raises, so the waiter is never left
    # blocked on a lock nobody will free.
    with cond:
        print('\t' + str(cond))
        time.sleep(2)
        print('\tchild is notifying')
        print('\t' + str(cond))
        cond.notify()
80
def test_condition():
    """Wait on a condition that a child process notifies.

    The condition is printed at every step so its changing state can be
    followed in the output.
    """
    cond = processing.Condition()

    child = processing.Process(target=condition_func, args=(cond,))
    print(cond)

    # acquire twice, showing the state after each acquisition
    for _ in range(2):
        cond.acquire()
        print(cond)

    child.start()

    print('main is waiting')
    cond.wait()
    print('main has woken up')

    # undo both acquisitions, showing the state before each release
    for _ in range(2):
        print(cond)
        cond.release()

    child.join()
    print(cond)
105
106
107#### TEST_SEMAPHORE
108
def semaphore_func(sema, mutex, running):
    """Do a random-length piece of work while holding one semaphore slot.

    sema    -- semaphore limiting how many workers are active at once
    mutex   -- lock guarding the counter updates and the printed output
    running -- shared object whose ``value`` counts currently active workers
    """
    # Context managers release the semaphore slot and the mutex even when
    # the body raises; the bare acquire()/release() pairs would leak them.
    with sema:
        with mutex:
            running.value += 1
            print(running.value, 'tasks are running')

        random.seed()
        time.sleep(random.random()*2)

        with mutex:
            running.value -= 1
            print('%s has finished' % processing.currentProcess())
126
def test_semaphore():
    """Run ten workers that share a semaphore created with a count of 3."""
    sema = processing.Semaphore(3)
    mutex = processing.RLock()
    running = processing.Value('i', 0)

    workers = []
    for _ in range(10):
        worker = processing.Process(
            target=semaphore_func, args=(sema, mutex, running)
            )
        workers.append(worker)

    # start them all first, then wait for each one in turn
    for worker in workers:
        worker.start()
    for worker in workers:
        worker.join()
142
143
144#### TEST_JOIN_TIMEOUT
145
def join_timeout_func():
    """Announce, sleep for 5.5 seconds, then announce termination."""
    nap_seconds = 5.5
    print('\tchild sleeping')
    time.sleep(nap_seconds)
    print('\n\tchild terminating')
150
def test_join_timeout():
    """Join a slow child in one-second slices, printing a dot per timeout."""
    child = processing.Process(target=join_timeout_func)
    child.start()

    print('waiting for process to finish')

    child.join(timeout=1)
    while child.is_alive():
        # the join timed out: show progress and wait another second
        print('.', end=' ')
        sys.stdout.flush()
        child.join(timeout=1)
163
164
165#### TEST_EVENT
166
def event_func(event):
    """Block on *event*, reporting before the wait and after waking up."""
    whoami = processing.currentProcess
    print('\t%r is waiting' % whoami())
    event.wait()
    print('\t%r has woken up' % whoami())
171
def test_event():
    """Start five waiters on one event and set it after two seconds."""
    event = processing.Event()

    waiters = []
    for _ in range(5):
        waiters.append(processing.Process(target=event_func, args=(event,)))

    for waiter in waiters:
        waiter.start()

    print('main is sleeping')
    time.sleep(2)

    print('main is setting event')
    event.set()

    for waiter in waiters:
        waiter.join()
189
190
191#### TEST_SHAREDVALUES
192
def sharedvalues_func(values, arrays, shared_values, shared_arrays):
    """Check that shared values and arrays hold their expected contents.

    values        -- sequence of entries whose item [1] is the expected value
    arrays        -- sequence of entries whose item [1] is the expected
                     sequence of elements
    shared_values -- objects with a ``value`` attribute, one per ``values``
                     entry
    shared_arrays -- sliceable objects, one per ``arrays`` entry

    Raises AssertionError on any length or content mismatch; prints
    'Tests passed' otherwise.
    """
    # Pair every expectation with its own shared object.  The array loop
    # used to run over range(len(values)), which skipped arrays whenever
    # there were more arrays than values.
    assert len(values) == len(shared_values)
    assert len(arrays) == len(shared_arrays)

    for spec, shared in zip(values, shared_values):
        assert spec[1] == shared.value

    for spec, shared in zip(arrays, shared_arrays):
        assert list(spec[1]) == list(shared[:])

    print('Tests passed')
205
def test_sharedvalues():
    """Verify in a child process that shared values and arrays round-trip.

    Each (typecode, initial) pair below is turned into a shared Value or
    Array; the child compares them against the plain initial data, and its
    exit code must be zero.
    """
    values = [
        ('i', 10),
        ('h', -2),
        ('d', 1.25)
        ]
    arrays = [
        ('i', range(100)),
        ('d', [0.25 * i for i in range(100)]),
        ('H', range(1000))
        ]

    shared_values = [
        processing.Value(typecode, initial) for typecode, initial in values
        ]
    shared_arrays = [
        processing.Array(typecode, initial) for typecode, initial in arrays
        ]

    checker = processing.Process(
        target=sharedvalues_func,
        args=(values, arrays, shared_values, shared_arrays)
        )
    checker.start()
    checker.join()

    assert checker.exitcode == 0
229
230
231####
232
def test(namespace=processing):
    """Run every test in this file against *namespace*.

    namespace -- object providing the API the tests use (Process, Lock,
                 Value, ...); it replaces the module-level ``processing``
                 name for the duration of the run and afterwards

    Raises ValueError if the namespace has a ``_debugInfo`` hook and it
    returns something truthy after the tests.
    """
    global processing

    processing = namespace

    suite = (
        test_value, test_queue, test_condition,
        test_semaphore, test_join_timeout, test_event,
        test_sharedvalues,
        )
    for case in suite:
        print('\n\t######## %s\n' % case.__name__)
        case()

    processing.activeChildren()                 # cleanup any old processes

    if not hasattr(processing, '_debugInfo'):
        return
    leftovers = processing._debugInfo()
    if leftovers:
        print(leftovers)
        raise ValueError('there should be no positive refcounts left')
251
252
if __name__ == '__main__':
    # Script entry point: choose the backend named by the optional first
    # command-line argument (default: real processes) and run all tests.
    processing.freezeSupport()

    # Validate the argument count explicitly; an ``assert`` would be
    # stripped when Python runs with -O.
    if len(sys.argv) > 2:
        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
        raise SystemExit(2)

    mode = sys.argv[1] if len(sys.argv) == 2 else 'processes'

    if mode == 'processes':
        print(' Using processes '.center(79, '-'))
        namespace = processing
    elif mode == 'manager':
        print(' Using processes and a manager '.center(79, '-'))
        namespace = processing.Manager()
        # the manager supplies the shared objects; process handling still
        # comes from the module itself
        namespace.Process = processing.Process
        namespace.currentProcess = processing.currentProcess
        namespace.activeChildren = processing.activeChildren
    elif mode == 'threads':
        print(' Using threads '.center(79, '-'))
        # ``processing`` is only a local alias of ``multiprocess`` and cannot
        # be used in an import statement; import the real package name.
        import multiprocess.dummy as namespace
        # the tests call the camelCase spellings, so alias them here as well
        namespace.currentProcess = namespace.current_process
        namespace.activeChildren = namespace.active_children
    else:
        print('Usage:\n\t%s [processes | manager | threads]' % sys.argv[0])
        raise SystemExit(2)

    test(namespace)
275