1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import queue as pyqueue
7import time
8import io
9import itertools
10import sys
11import os
12import gc
13import errno
14import signal
15import array
16import socket
17import random
18import logging
19import struct
20import operator
21import test.support
22import test.script_helper
23
24
25# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocess.synchronize')
29# import threading after _multiprocessing to raise a more revelant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
33
34import multiprocess as multiprocessing
35import multiprocess.dummy
36import multiprocess.connection
37import multiprocess.managers
38import multiprocess.heap
39import multiprocess.pool
40
41from multiprocess import util
42
43try:
44    from multiprocess import reduction
45    HAS_REDUCTION = True
46except ImportError:
47    HAS_REDUCTION = False
48
49try:
50    from multiprocess.sharedctypes import Value, copy
51    HAS_SHAREDCTYPES = True
52except ImportError:
53    HAS_SHAREDCTYPES = False
54
55try:
56    import msvcrt
57except ImportError:
58    msvcrt = None
59
60#
61#
62#
63
64def latin(s):
65    return s.encode('latin')
66
67#
68# Constants
69#
70
71LOG_LEVEL = util.SUBWARNING
72#LOG_LEVEL = logging.DEBUG
73
74DELTA = 0.1
75CHECK_TIMINGS = False     # making true makes tests take a lot longer
76                          # and can sometimes cause some non-serious
77                          # failures because some calls block a bit
78                          # longer than expected
79if CHECK_TIMINGS:
80    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
81else:
82    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
83
84HAVE_GETVALUE = not getattr(_multiprocessing,
85                            'HAVE_BROKEN_SEM_GETVALUE', False)
86
87WIN32 = (sys.platform == "win32")
88
89from multiprocess.connection import wait
90
91def wait_for_handle(handle, timeout):
92    if timeout is not None and timeout < 0.0:
93        timeout = None
94    return wait([handle], timeout)
95
96try:
97    MAXFD = os.sysconf("SC_OPEN_MAX")
98except:
99    MAXFD = 256
100
101#
102# Some tests require ctypes
103#
104
105try:
106    from ctypes import Structure, c_int, c_double
107except ImportError:
108    Structure = object
109    c_int = c_double = None
110
111
112def check_enough_semaphores():
113    """Check that the system supports enough semaphores to run the test."""
114    # minimum number of semaphores available according to POSIX
115    nsems_min = 256
116    try:
117        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
118    except (AttributeError, ValueError):
119        # sysconf not available or setting not available
120        return
121    if nsems == -1 or nsems >= nsems_min:
122        return
123    raise unittest.SkipTest("The OS doesn't support enough semaphores "
124                            "to run the test (required: %d)." % nsems_min)
125
126
127#
128# Creates a wrapper for a function which records the time it takes to finish
129#
130
131class TimingWrapper(object):
132
133    def __init__(self, func):
134        self.func = func
135        self.elapsed = None
136
137    def __call__(self, *args, **kwds):
138        t = time.time()
139        try:
140            return self.func(*args, **kwds)
141        finally:
142            self.elapsed = time.time() - t
143
144#
145# Base class for test cases
146#
147
148class BaseTestCase(object):
149
150    ALLOWED_TYPES = ('processes', 'manager', 'threads')
151
152    def assertTimingAlmostEqual(self, a, b):
153        if CHECK_TIMINGS:
154            self.assertAlmostEqual(a, b, 1)
155
156    def assertReturnsIfImplemented(self, value, func, *args):
157        try:
158            res = func(*args)
159        except NotImplementedError:
160            pass
161        else:
162            return self.assertEqual(value, res)
163
164    # For the sanity of Windows users, rather than crashing or freezing in
165    # multiple ways.
166    def __reduce__(self, *args):
167        raise NotImplementedError("shouldn't try to pickle a test case")
168
169    __reduce_ex__ = __reduce__
170
171#
172# Return the value of a semaphore
173#
174
175def get_value(self):
176    try:
177        return self.get_value()
178    except AttributeError:
179        try:
180            return self._Semaphore__value
181        except AttributeError:
182            try:
183                return self._value
184            except AttributeError:
185                raise NotImplementedError
186
187#
188# Testcases
189#
190
191class _TestProcess(BaseTestCase):
192
193    ALLOWED_TYPES = ('processes', 'threads')
194
195    def test_current(self):
196        if self.TYPE == 'threads':
197            self.skipTest('test not appropriate for {}'.format(self.TYPE))
198
199        current = self.current_process()
200        authkey = current.authkey
201
202        self.assertTrue(current.is_alive())
203        self.assertTrue(not current.daemon)
204        self.assertIsInstance(authkey, bytes)
205        self.assertTrue(len(authkey) > 0)
206        self.assertEqual(current.ident, os.getpid())
207        self.assertEqual(current.exitcode, None)
208
209    def test_daemon_argument(self):
210        if self.TYPE == "threads":
211            self.skipTest('test not appropriate for {}'.format(self.TYPE))
212
213        # By default uses the current process's daemon flag.
214        proc0 = self.Process(target=self._test)
215        self.assertEqual(proc0.daemon, self.current_process().daemon)
216        proc1 = self.Process(target=self._test, daemon=True)
217        self.assertTrue(proc1.daemon)
218        proc2 = self.Process(target=self._test, daemon=False)
219        self.assertFalse(proc2.daemon)
220
221    @classmethod
222    def _test(cls, q, *args, **kwds):
223        current = cls.current_process()
224        q.put(args)
225        q.put(kwds)
226        q.put(current.name)
227        if cls.TYPE != 'threads':
228            q.put(bytes(current.authkey))
229            q.put(current.pid)
230
231    def test_process(self):
232        q = self.Queue(1)
233        e = self.Event()
234        args = (q, 1, 2)
235        kwargs = {'hello':23, 'bye':2.54}
236        name = 'SomeProcess'
237        p = self.Process(
238            target=self._test, args=args, kwargs=kwargs, name=name
239            )
240        p.daemon = True
241        current = self.current_process()
242
243        if self.TYPE != 'threads':
244            self.assertEqual(p.authkey, current.authkey)
245        self.assertEqual(p.is_alive(), False)
246        self.assertEqual(p.daemon, True)
247        self.assertNotIn(p, self.active_children())
248        self.assertTrue(type(self.active_children()) is list)
249        self.assertEqual(p.exitcode, None)
250
251        p.start()
252
253        self.assertEqual(p.exitcode, None)
254        self.assertEqual(p.is_alive(), True)
255        self.assertIn(p, self.active_children())
256
257        self.assertEqual(q.get(), args[1:])
258        self.assertEqual(q.get(), kwargs)
259        self.assertEqual(q.get(), p.name)
260        if self.TYPE != 'threads':
261            self.assertEqual(q.get(), current.authkey)
262            self.assertEqual(q.get(), p.pid)
263
264        p.join()
265
266        self.assertEqual(p.exitcode, 0)
267        self.assertEqual(p.is_alive(), False)
268        self.assertNotIn(p, self.active_children())
269
270    @classmethod
271    def _test_terminate(cls):
272        time.sleep(100)
273
274    def test_terminate(self):
275        if self.TYPE == 'threads':
276            self.skipTest('test not appropriate for {}'.format(self.TYPE))
277
278        p = self.Process(target=self._test_terminate)
279        p.daemon = True
280        p.start()
281
282        self.assertEqual(p.is_alive(), True)
283        self.assertIn(p, self.active_children())
284        self.assertEqual(p.exitcode, None)
285
286        join = TimingWrapper(p.join)
287
288        self.assertEqual(join(0), None)
289        self.assertTimingAlmostEqual(join.elapsed, 0.0)
290        self.assertEqual(p.is_alive(), True)
291
292        self.assertEqual(join(-1), None)
293        self.assertTimingAlmostEqual(join.elapsed, 0.0)
294        self.assertEqual(p.is_alive(), True)
295
296        p.terminate()
297
298        if hasattr(signal, 'alarm'):
299            def handler(*args):
300                raise RuntimeError('join took too long: %s' % p)
301            old_handler = signal.signal(signal.SIGALRM, handler)
302            try:
303                signal.alarm(10)
304                self.assertEqual(join(), None)
305                signal.alarm(0)
306            finally:
307                signal.signal(signal.SIGALRM, old_handler)
308        else:
309            self.assertEqual(join(), None)
310
311        self.assertTimingAlmostEqual(join.elapsed, 0.0)
312
313        self.assertEqual(p.is_alive(), False)
314        self.assertNotIn(p, self.active_children())
315
316        p.join()
317
318        # XXX sometimes get p.exitcode == 0 on Windows ...
319        #self.assertEqual(p.exitcode, -signal.SIGTERM)
320
321    def test_cpu_count(self):
322        try:
323            cpus = multiprocessing.cpu_count()
324        except NotImplementedError:
325            cpus = 1
326        self.assertTrue(type(cpus) is int)
327        self.assertTrue(cpus >= 1)
328
329    def test_active_children(self):
330        self.assertEqual(type(self.active_children()), list)
331
332        p = self.Process(target=time.sleep, args=(DELTA,))
333        self.assertNotIn(p, self.active_children())
334
335        p.daemon = True
336        p.start()
337        self.assertIn(p, self.active_children())
338
339        p.join()
340        self.assertNotIn(p, self.active_children())
341
342    @classmethod
343    def _test_recursion(cls, wconn, id):
344        from multiprocess import forking
345        wconn.send(id)
346        if len(id) < 2:
347            for i in range(2):
348                p = cls.Process(
349                    target=cls._test_recursion, args=(wconn, id+[i])
350                    )
351                p.start()
352                p.join()
353
354    def test_recursion(self):
355        rconn, wconn = self.Pipe(duplex=False)
356        self._test_recursion(wconn, [])
357
358        time.sleep(DELTA)
359        result = []
360        while rconn.poll():
361            result.append(rconn.recv())
362
363        expected = [
364            [],
365              [0],
366                [0, 0],
367                [0, 1],
368              [1],
369                [1, 0],
370                [1, 1]
371            ]
372        self.assertEqual(result, expected)
373
374    @classmethod
375    def _test_sentinel(cls, event):
376        event.wait(10.0)
377
378    def test_sentinel(self):
379        if self.TYPE == "threads":
380            self.skipTest('test not appropriate for {}'.format(self.TYPE))
381        event = self.Event()
382        p = self.Process(target=self._test_sentinel, args=(event,))
383        with self.assertRaises(ValueError):
384            p.sentinel
385        p.start()
386        self.addCleanup(p.join)
387        sentinel = p.sentinel
388        self.assertIsInstance(sentinel, int)
389        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
390        event.set()
391        p.join()
392        self.assertTrue(wait_for_handle(sentinel, timeout=DELTA))
393
394#
395#
396#
397
398class _UpperCaser(multiprocessing.Process):
399
400    def __init__(self):
401        multiprocessing.Process.__init__(self)
402        self.child_conn, self.parent_conn = multiprocessing.Pipe()
403
404    def run(self):
405        self.parent_conn.close()
406        for s in iter(self.child_conn.recv, None):
407            self.child_conn.send(s.upper())
408        self.child_conn.close()
409
410    def submit(self, s):
411        assert type(s) is str
412        self.parent_conn.send(s)
413        return self.parent_conn.recv()
414
415    def stop(self):
416        self.parent_conn.send(None)
417        self.parent_conn.close()
418        self.child_conn.close()
419
420class _TestSubclassingProcess(BaseTestCase):
421
422    ALLOWED_TYPES = ('processes',)
423
424    def test_subclassing(self):
425        uppercaser = _UpperCaser()
426        uppercaser.daemon = True
427        uppercaser.start()
428        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
429        self.assertEqual(uppercaser.submit('world'), 'WORLD')
430        uppercaser.stop()
431        uppercaser.join()
432
433    def test_stderr_flush(self):
434        # sys.stderr is flushed at process shutdown (issue #13812)
435        if self.TYPE == "threads":
436            self.skipTest('test not appropriate for {}'.format(self.TYPE))
437
438        testfn = test.support.TESTFN
439        self.addCleanup(test.support.unlink, testfn)
440        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
441        proc.start()
442        proc.join()
443        with open(testfn, 'r') as f:
444            err = f.read()
445            # The whole traceback was printed
446            self.assertIn("ZeroDivisionError", err)
447            self.assertIn("__init__.py", err)
448            self.assertIn("1/0 # MARKER", err)
449
450    @classmethod
451    def _test_stderr_flush(cls, testfn):
452        sys.stderr = open(testfn, 'w')
453        1/0 # MARKER
454
455
456    @classmethod
457    def _test_sys_exit(cls, reason, testfn):
458        sys.stderr = open(testfn, 'w')
459        sys.exit(reason)
460
461    def test_sys_exit(self):
462        # See Issue 13854
463        if self.TYPE == 'threads':
464            self.skipTest('test not appropriate for {}'.format(self.TYPE))
465
466        testfn = test.support.TESTFN
467        self.addCleanup(test.support.unlink, testfn)
468
469        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
470            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
471            p.daemon = True
472            p.start()
473            p.join(5)
474            self.assertEqual(p.exitcode, code)
475
476            with open(testfn, 'r') as f:
477                self.assertEqual(f.read().rstrip(), str(reason))
478
479        for reason in (True, False, 8):
480            p = self.Process(target=sys.exit, args=(reason,))
481            p.daemon = True
482            p.start()
483            p.join(5)
484            self.assertEqual(p.exitcode, reason)
485
486#
487#
488#
489
490def queue_empty(q):
491    if hasattr(q, 'empty'):
492        return q.empty()
493    else:
494        return q.qsize() == 0
495
496def queue_full(q, maxsize):
497    if hasattr(q, 'full'):
498        return q.full()
499    else:
500        return q.qsize() == maxsize
501
502
503class _TestQueue(BaseTestCase):
504
505
506    @classmethod
507    def _test_put(cls, queue, child_can_start, parent_can_continue):
508        child_can_start.wait()
509        for i in range(6):
510            queue.get()
511        parent_can_continue.set()
512
513    def test_put(self):
514        MAXSIZE = 6
515        queue = self.Queue(maxsize=MAXSIZE)
516        child_can_start = self.Event()
517        parent_can_continue = self.Event()
518
519        proc = self.Process(
520            target=self._test_put,
521            args=(queue, child_can_start, parent_can_continue)
522            )
523        proc.daemon = True
524        proc.start()
525
526        self.assertEqual(queue_empty(queue), True)
527        self.assertEqual(queue_full(queue, MAXSIZE), False)
528
529        queue.put(1)
530        queue.put(2, True)
531        queue.put(3, True, None)
532        queue.put(4, False)
533        queue.put(5, False, None)
534        queue.put_nowait(6)
535
536        # the values may be in buffer but not yet in pipe so sleep a bit
537        time.sleep(DELTA)
538
539        self.assertEqual(queue_empty(queue), False)
540        self.assertEqual(queue_full(queue, MAXSIZE), True)
541
542        put = TimingWrapper(queue.put)
543        put_nowait = TimingWrapper(queue.put_nowait)
544
545        self.assertRaises(pyqueue.Full, put, 7, False)
546        self.assertTimingAlmostEqual(put.elapsed, 0)
547
548        self.assertRaises(pyqueue.Full, put, 7, False, None)
549        self.assertTimingAlmostEqual(put.elapsed, 0)
550
551        self.assertRaises(pyqueue.Full, put_nowait, 7)
552        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
553
554        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
555        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
556
557        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
558        self.assertTimingAlmostEqual(put.elapsed, 0)
559
560        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
561        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
562
563        child_can_start.set()
564        parent_can_continue.wait()
565
566        self.assertEqual(queue_empty(queue), True)
567        self.assertEqual(queue_full(queue, MAXSIZE), False)
568
569        proc.join()
570
571    @classmethod
572    def _test_get(cls, queue, child_can_start, parent_can_continue):
573        child_can_start.wait()
574        #queue.put(1)
575        queue.put(2)
576        queue.put(3)
577        queue.put(4)
578        queue.put(5)
579        parent_can_continue.set()
580
581    def test_get(self):
582        queue = self.Queue()
583        child_can_start = self.Event()
584        parent_can_continue = self.Event()
585
586        proc = self.Process(
587            target=self._test_get,
588            args=(queue, child_can_start, parent_can_continue)
589            )
590        proc.daemon = True
591        proc.start()
592
593        self.assertEqual(queue_empty(queue), True)
594
595        child_can_start.set()
596        parent_can_continue.wait()
597
598        time.sleep(DELTA)
599        self.assertEqual(queue_empty(queue), False)
600
601        # Hangs unexpectedly, remove for now
602        #self.assertEqual(queue.get(), 1)
603        self.assertEqual(queue.get(True, None), 2)
604        self.assertEqual(queue.get(True), 3)
605        self.assertEqual(queue.get(timeout=1), 4)
606        self.assertEqual(queue.get_nowait(), 5)
607
608        self.assertEqual(queue_empty(queue), True)
609
610        get = TimingWrapper(queue.get)
611        get_nowait = TimingWrapper(queue.get_nowait)
612
613        self.assertRaises(pyqueue.Empty, get, False)
614        self.assertTimingAlmostEqual(get.elapsed, 0)
615
616        self.assertRaises(pyqueue.Empty, get, False, None)
617        self.assertTimingAlmostEqual(get.elapsed, 0)
618
619        self.assertRaises(pyqueue.Empty, get_nowait)
620        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
621
622        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
623        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
624
625        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
626        self.assertTimingAlmostEqual(get.elapsed, 0)
627
628        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
629        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
630
631        proc.join()
632
633    @classmethod
634    def _test_fork(cls, queue):
635        for i in range(10, 20):
636            queue.put(i)
637        # note that at this point the items may only be buffered, so the
638        # process cannot shutdown until the feeder thread has finished
639        # pushing items onto the pipe.
640
641    def test_fork(self):
642        # Old versions of Queue would fail to create a new feeder
643        # thread for a forked process if the original process had its
644        # own feeder thread.  This test checks that this no longer
645        # happens.
646
647        queue = self.Queue()
648
649        # put items on queue so that main process starts a feeder thread
650        for i in range(10):
651            queue.put(i)
652
653        # wait to make sure thread starts before we fork a new process
654        time.sleep(DELTA)
655
656        # fork process
657        p = self.Process(target=self._test_fork, args=(queue,))
658        p.daemon = True
659        p.start()
660
661        # check that all expected items are in the queue
662        for i in range(20):
663            self.assertEqual(queue.get(), i)
664        self.assertRaises(pyqueue.Empty, queue.get, False)
665
666        p.join()
667
668    def test_qsize(self):
669        q = self.Queue()
670        try:
671            self.assertEqual(q.qsize(), 0)
672        except NotImplementedError:
673            self.skipTest('qsize method not implemented')
674        q.put(1)
675        self.assertEqual(q.qsize(), 1)
676        q.put(5)
677        self.assertEqual(q.qsize(), 2)
678        q.get()
679        self.assertEqual(q.qsize(), 1)
680        q.get()
681        self.assertEqual(q.qsize(), 0)
682
683    @classmethod
684    def _test_task_done(cls, q):
685        for obj in iter(q.get, None):
686            time.sleep(DELTA)
687            q.task_done()
688
689    def test_task_done(self):
690        queue = self.JoinableQueue()
691
692        if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'):
693            self.skipTest("requires 'queue.task_done()' method")
694
695        workers = [self.Process(target=self._test_task_done, args=(queue,))
696                   for i in range(4)]
697
698        for p in workers:
699            p.daemon = True
700            p.start()
701
702        for i in range(10):
703            queue.put(i)
704
705        queue.join()
706
707        for p in workers:
708            queue.put(None)
709
710        for p in workers:
711            p.join()
712
713    def test_timeout(self):
714        q = multiprocessing.Queue()
715        start = time.time()
716        self.assertRaises(pyqueue.Empty, q.get, True, 0.2)
717        delta = time.time() - start
718        self.assertGreaterEqual(delta, 0.18)
719
720#
721#
722#
723
724class _TestLock(BaseTestCase):
725
726    def test_lock(self):
727        lock = self.Lock()
728        self.assertEqual(lock.acquire(), True)
729        self.assertEqual(lock.acquire(False), False)
730        self.assertEqual(lock.release(), None)
731        self.assertRaises((ValueError, threading.ThreadError), lock.release)
732
733    def test_rlock(self):
734        lock = self.RLock()
735        self.assertEqual(lock.acquire(), True)
736        self.assertEqual(lock.acquire(), True)
737        self.assertEqual(lock.acquire(), True)
738        self.assertEqual(lock.release(), None)
739        self.assertEqual(lock.release(), None)
740        self.assertEqual(lock.release(), None)
741        self.assertRaises((AssertionError, RuntimeError), lock.release)
742
743    def test_lock_context(self):
744        with self.Lock():
745            pass
746
747
748class _TestSemaphore(BaseTestCase):
749
750    def _test_semaphore(self, sem):
751        self.assertReturnsIfImplemented(2, get_value, sem)
752        self.assertEqual(sem.acquire(), True)
753        self.assertReturnsIfImplemented(1, get_value, sem)
754        self.assertEqual(sem.acquire(), True)
755        self.assertReturnsIfImplemented(0, get_value, sem)
756        self.assertEqual(sem.acquire(False), False)
757        self.assertReturnsIfImplemented(0, get_value, sem)
758        self.assertEqual(sem.release(), None)
759        self.assertReturnsIfImplemented(1, get_value, sem)
760        self.assertEqual(sem.release(), None)
761        self.assertReturnsIfImplemented(2, get_value, sem)
762
763    def test_semaphore(self):
764        sem = self.Semaphore(2)
765        self._test_semaphore(sem)
766        self.assertEqual(sem.release(), None)
767        self.assertReturnsIfImplemented(3, get_value, sem)
768        self.assertEqual(sem.release(), None)
769        self.assertReturnsIfImplemented(4, get_value, sem)
770
771    def test_bounded_semaphore(self):
772        sem = self.BoundedSemaphore(2)
773        self._test_semaphore(sem)
774        # Currently fails on OS/X
775        #if HAVE_GETVALUE:
776        #    self.assertRaises(ValueError, sem.release)
777        #    self.assertReturnsIfImplemented(2, get_value, sem)
778
779    def test_timeout(self):
780        if self.TYPE != 'processes':
781            self.skipTest('test not appropriate for {}'.format(self.TYPE))
782
783        sem = self.Semaphore(0)
784        acquire = TimingWrapper(sem.acquire)
785
786        self.assertEqual(acquire(False), False)
787        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
788
789        self.assertEqual(acquire(False, None), False)
790        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
791
792        self.assertEqual(acquire(False, TIMEOUT1), False)
793        self.assertTimingAlmostEqual(acquire.elapsed, 0)
794
795        self.assertEqual(acquire(True, TIMEOUT2), False)
796        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
797
798        self.assertEqual(acquire(timeout=TIMEOUT3), False)
799        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
800
801
802class _TestCondition(BaseTestCase):
803
804    @classmethod
805    def f(cls, cond, sleeping, woken, timeout=None):
806        cond.acquire()
807        sleeping.release()
808        cond.wait(timeout)
809        woken.release()
810        cond.release()
811
812    def check_invariant(self, cond):
813        # this is only supposed to succeed when there are no sleepers
814        if self.TYPE == 'processes':
815            try:
816                sleepers = (cond._sleeping_count.get_value() -
817                            cond._woken_count.get_value())
818                self.assertEqual(sleepers, 0)
819                self.assertEqual(cond._wait_semaphore.get_value(), 0)
820            except NotImplementedError:
821                pass
822
823    def test_notify(self):
824        cond = self.Condition()
825        sleeping = self.Semaphore(0)
826        woken = self.Semaphore(0)
827
828        p = self.Process(target=self.f, args=(cond, sleeping, woken))
829        p.daemon = True
830        p.start()
831
832        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
833        p.daemon = True
834        p.start()
835
836        # wait for both children to start sleeping
837        sleeping.acquire()
838        sleeping.acquire()
839
840        # check no process/thread has woken up
841        time.sleep(DELTA)
842        self.assertReturnsIfImplemented(0, get_value, woken)
843
844        # wake up one process/thread
845        cond.acquire()
846        cond.notify()
847        cond.release()
848
849        # check one process/thread has woken up
850        time.sleep(DELTA)
851        self.assertReturnsIfImplemented(1, get_value, woken)
852
853        # wake up another
854        cond.acquire()
855        cond.notify()
856        cond.release()
857
858        # check other has woken up
859        time.sleep(DELTA)
860        self.assertReturnsIfImplemented(2, get_value, woken)
861
862        # check state is not mucked up
863        self.check_invariant(cond)
864        p.join()
865
866    def test_notify_all(self):
867        cond = self.Condition()
868        sleeping = self.Semaphore(0)
869        woken = self.Semaphore(0)
870
871        # start some threads/processes which will timeout
872        for i in range(3):
873            p = self.Process(target=self.f,
874                             args=(cond, sleeping, woken, TIMEOUT1))
875            p.daemon = True
876            p.start()
877
878            t = threading.Thread(target=self.f,
879                                 args=(cond, sleeping, woken, TIMEOUT1))
880            t.daemon = True
881            t.start()
882
883        # wait for them all to sleep
884        for i in range(6):
885            sleeping.acquire()
886
887        # check they have all timed out
888        for i in range(6):
889            woken.acquire()
890        self.assertReturnsIfImplemented(0, get_value, woken)
891
892        # check state is not mucked up
893        self.check_invariant(cond)
894
895        # start some more threads/processes
896        for i in range(3):
897            p = self.Process(target=self.f, args=(cond, sleeping, woken))
898            p.daemon = True
899            p.start()
900
901            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
902            t.daemon = True
903            t.start()
904
905        # wait for them to all sleep
906        for i in range(6):
907            sleeping.acquire()
908
909        # check no process/thread has woken up
910        time.sleep(DELTA)
911        self.assertReturnsIfImplemented(0, get_value, woken)
912
913        # wake them all up
914        cond.acquire()
915        cond.notify_all()
916        cond.release()
917
918        # check they have all woken
919        for i in range(10):
920            try:
921                if get_value(woken) == 6:
922                    break
923            except NotImplementedError:
924                break
925            time.sleep(DELTA)
926        self.assertReturnsIfImplemented(6, get_value, woken)
927
928        # check state is not mucked up
929        self.check_invariant(cond)
930
931    def test_timeout(self):
932        cond = self.Condition()
933        wait = TimingWrapper(cond.wait)
934        cond.acquire()
935        res = wait(TIMEOUT1)
936        cond.release()
937        self.assertEqual(res, False)
938        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
939
940    @classmethod
941    def _test_waitfor_f(cls, cond, state):
942        with cond:
943            state.value = 0
944            cond.notify()
945            result = cond.wait_for(lambda : state.value==4)
946            if not result or state.value != 4:
947                sys.exit(1)
948
949    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
950    def test_waitfor(self):
951        # based on test in test/lock_tests.py
952        cond = self.Condition()
953        state = self.Value('i', -1)
954
955        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
956        p.daemon = True
957        p.start()
958
959        with cond:
960            result = cond.wait_for(lambda : state.value==0)
961            self.assertTrue(result)
962            self.assertEqual(state.value, 0)
963
964        for i in range(4):
965            time.sleep(0.01)
966            with cond:
967                state.value += 1
968                cond.notify()
969
970        p.join(5)
971        self.assertFalse(p.is_alive())
972        self.assertEqual(p.exitcode, 0)
973
974    @classmethod
975    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
976        sem.release()
977        with cond:
978            expected = 0.1
979            dt = time.time()
980            result = cond.wait_for(lambda : state.value==4, timeout=expected)
981            dt = time.time() - dt
982            # borrow logic in assertTimeout() from test/lock_tests.py
983            if not result and expected * 0.6 < dt < expected * 10.0:
984                success.value = True
985
986    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
987    def test_waitfor_timeout(self):
988        # based on test in test/lock_tests.py
989        cond = self.Condition()
990        state = self.Value('i', 0)
991        success = self.Value('i', False)
992        sem = self.Semaphore(0)
993
994        p = self.Process(target=self._test_waitfor_timeout_f,
995                         args=(cond, state, success, sem))
996        p.daemon = True
997        p.start()
998        self.assertTrue(sem.acquire(timeout=10))
999
1000        # Only increment 3 times, so state == 4 is never reached.
1001        for i in range(3):
1002            time.sleep(0.01)
1003            with cond:
1004                state.value += 1
1005                cond.notify()
1006
1007        p.join(5)
1008        self.assertTrue(success.value)
1009
1010    @classmethod
1011    def _test_wait_result(cls, c, pid):
1012        with c:
1013            c.notify()
1014        time.sleep(1)
1015        if pid is not None:
1016            os.kill(pid, signal.SIGINT)
1017
1018    def test_wait_result(self):
1019        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1020            pid = os.getpid()
1021        else:
1022            pid = None
1023
1024        c = self.Condition()
1025        with c:
1026            self.assertFalse(c.wait(0))
1027            self.assertFalse(c.wait(0.1))
1028
1029            p = self.Process(target=self._test_wait_result, args=(c, pid))
1030            p.start()
1031
1032            self.assertTrue(c.wait(10))
1033            if pid is not None:
1034                self.assertRaises(KeyboardInterrupt, c.wait, 10)
1035
1036            p.join()
1037
1038
1039class _TestEvent(BaseTestCase):
1040
1041    @classmethod
1042    def _test_event(cls, event):
1043        time.sleep(TIMEOUT2)
1044        event.set()
1045
1046    def test_event(self):
1047        event = self.Event()
1048        wait = TimingWrapper(event.wait)
1049
1050        # Removed temporarily, due to API shear, this does not
1051        # work with threading._Event objects. is_set == isSet
1052        self.assertEqual(event.is_set(), False)
1053
1054        # Removed, threading.Event.wait() will return the value of the __flag
1055        # instead of None. API Shear with the semaphore backed mp.Event
1056        self.assertEqual(wait(0.0), False)
1057        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1058        self.assertEqual(wait(TIMEOUT1), False)
1059        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1060
1061        event.set()
1062
1063        # See note above on the API differences
1064        self.assertEqual(event.is_set(), True)
1065        self.assertEqual(wait(), True)
1066        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1067        self.assertEqual(wait(TIMEOUT1), True)
1068        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1069        # self.assertEqual(event.is_set(), True)
1070
1071        event.clear()
1072
1073        #self.assertEqual(event.is_set(), False)
1074
1075        p = self.Process(target=self._test_event, args=(event,))
1076        p.daemon = True
1077        p.start()
1078        self.assertEqual(wait(), True)
1079
1080#
1081# Tests for Barrier - adapted from tests in test/lock_tests.py
1082#
1083
1084# Many of the tests for threading.Barrier use a list as an atomic
1085# counter: a value is appended to increment the counter, and the
1086# length of the list gives the value.  We use the class DummyList
1087# for the same purpose.
1088
1089class _DummyList(object):
1090
1091    def __init__(self):
1092        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1093        lock = multiprocessing.Lock()
1094        self.__setstate__((wrapper, lock))
1095        self._lengthbuf[0] = 0
1096
1097    def __setstate__(self, state):
1098        (self._wrapper, self._lock) = state
1099        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1100
1101    def __getstate__(self):
1102        return (self._wrapper, self._lock)
1103
1104    def append(self, _):
1105        with self._lock:
1106            self._lengthbuf[0] += 1
1107
1108    def __len__(self):
1109        with self._lock:
1110            return self._lengthbuf[0]
1111
1112def _wait():
1113    # A crude wait/yield function not relying on synchronization primitives.
1114    time.sleep(0.01)
1115
1116
1117class Bunch(object):
1118    """
1119    A bunch of threads.
1120    """
1121    def __init__(self, namespace, f, args, n, wait_before_exit=False):
1122        """
1123        Construct a bunch of `n` threads running the same function `f`.
1124        If `wait_before_exit` is True, the threads won't terminate until
1125        do_finish() is called.
1126        """
1127        self.f = f
1128        self.args = args
1129        self.n = n
1130        self.started = namespace.DummyList()
1131        self.finished = namespace.DummyList()
1132        self._can_exit = namespace.Event()
1133        if not wait_before_exit:
1134            self._can_exit.set()
1135        for i in range(n):
1136            p = namespace.Process(target=self.task)
1137            p.daemon = True
1138            p.start()
1139
1140    def task(self):
1141        pid = os.getpid()
1142        self.started.append(pid)
1143        try:
1144            self.f(*self.args)
1145        finally:
1146            self.finished.append(pid)
1147            self._can_exit.wait(30)
1148            assert self._can_exit.is_set()
1149
1150    def wait_for_started(self):
1151        while len(self.started) < self.n:
1152            _wait()
1153
1154    def wait_for_finished(self):
1155        while len(self.finished) < self.n:
1156            _wait()
1157
1158    def do_finish(self):
1159        self._can_exit.set()
1160
1161
1162class AppendTrue(object):
1163    def __init__(self, obj):
1164        self.obj = obj
1165    def __call__(self):
1166        self.obj.append(True)
1167
1168
1169class _TestBarrier(BaseTestCase):
1170    """
1171    Tests for Barrier objects.
1172    """
1173    N = 5
1174    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1175
1176    def setUp(self):
1177        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1178
1179    def tearDown(self):
1180        self.barrier.abort()
1181        self.barrier = None
1182
1183    def DummyList(self):
1184        if self.TYPE == 'threads':
1185            return []
1186        elif self.TYPE == 'manager':
1187            return self.manager.list()
1188        else:
1189            return _DummyList()
1190
1191    def run_threads(self, f, args):
1192        b = Bunch(self, f, args, self.N-1)
1193        f(*args)
1194        b.wait_for_finished()
1195
1196    @classmethod
1197    def multipass(cls, barrier, results, n):
1198        m = barrier.parties
1199        assert m == cls.N
1200        for i in range(n):
1201            results[0].append(True)
1202            assert len(results[1]) == i * m
1203            barrier.wait()
1204            results[1].append(True)
1205            assert len(results[0]) == (i + 1) * m
1206            barrier.wait()
1207        try:
1208            assert barrier.n_waiting == 0
1209        except NotImplementedError:
1210            pass
1211        assert not barrier.broken
1212
1213    def test_barrier(self, passes=1):
1214        """
1215        Test that a barrier is passed in lockstep
1216        """
1217        results = [self.DummyList(), self.DummyList()]
1218        self.run_threads(self.multipass, (self.barrier, results, passes))
1219
1220    def test_barrier_10(self):
1221        """
1222        Test that a barrier works for 10 consecutive runs
1223        """
1224        return self.test_barrier(10)
1225
1226    @classmethod
1227    def _test_wait_return_f(cls, barrier, queue):
1228        res = barrier.wait()
1229        queue.put(res)
1230
1231    def test_wait_return(self):
1232        """
1233        test the return value from barrier.wait
1234        """
1235        queue = self.Queue()
1236        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1237        results = [queue.get() for i in range(self.N)]
1238        self.assertEqual(results.count(0), 1)
1239
1240    @classmethod
1241    def _test_action_f(cls, barrier, results):
1242        barrier.wait()
1243        if len(results) != 1:
1244            raise RuntimeError
1245
1246    def test_action(self):
1247        """
1248        Test the 'action' callback
1249        """
1250        results = self.DummyList()
1251        barrier = self.Barrier(self.N, action=AppendTrue(results))
1252        self.run_threads(self._test_action_f, (barrier, results))
1253        self.assertEqual(len(results), 1)
1254
1255    @classmethod
1256    def _test_abort_f(cls, barrier, results1, results2):
1257        try:
1258            i = barrier.wait()
1259            if i == cls.N//2:
1260                raise RuntimeError
1261            barrier.wait()
1262            results1.append(True)
1263        except threading.BrokenBarrierError:
1264            results2.append(True)
1265        except RuntimeError:
1266            barrier.abort()
1267
1268    def test_abort(self):
1269        """
1270        Test that an abort will put the barrier in a broken state
1271        """
1272        results1 = self.DummyList()
1273        results2 = self.DummyList()
1274        self.run_threads(self._test_abort_f,
1275                         (self.barrier, results1, results2))
1276        self.assertEqual(len(results1), 0)
1277        self.assertEqual(len(results2), self.N-1)
1278        self.assertTrue(self.barrier.broken)
1279
1280    @classmethod
1281    def _test_reset_f(cls, barrier, results1, results2, results3):
1282        i = barrier.wait()
1283        if i == cls.N//2:
1284            # Wait until the other threads are all in the barrier.
1285            while barrier.n_waiting < cls.N-1:
1286                time.sleep(0.001)
1287            barrier.reset()
1288        else:
1289            try:
1290                barrier.wait()
1291                results1.append(True)
1292            except threading.BrokenBarrierError:
1293                results2.append(True)
1294        # Now, pass the barrier again
1295        barrier.wait()
1296        results3.append(True)
1297
1298    def test_reset(self):
1299        """
1300        Test that a 'reset' on a barrier frees the waiting threads
1301        """
1302        results1 = self.DummyList()
1303        results2 = self.DummyList()
1304        results3 = self.DummyList()
1305        self.run_threads(self._test_reset_f,
1306                         (self.barrier, results1, results2, results3))
1307        self.assertEqual(len(results1), 0)
1308        self.assertEqual(len(results2), self.N-1)
1309        self.assertEqual(len(results3), self.N)
1310
1311    @classmethod
1312    def _test_abort_and_reset_f(cls, barrier, barrier2,
1313                                results1, results2, results3):
1314        try:
1315            i = barrier.wait()
1316            if i == cls.N//2:
1317                raise RuntimeError
1318            barrier.wait()
1319            results1.append(True)
1320        except threading.BrokenBarrierError:
1321            results2.append(True)
1322        except RuntimeError:
1323            barrier.abort()
1324        # Synchronize and reset the barrier.  Must synchronize first so
1325        # that everyone has left it when we reset, and after so that no
1326        # one enters it before the reset.
1327        if barrier2.wait() == cls.N//2:
1328            barrier.reset()
1329        barrier2.wait()
1330        barrier.wait()
1331        results3.append(True)
1332
1333    def test_abort_and_reset(self):
1334        """
1335        Test that a barrier can be reset after being broken.
1336        """
1337        results1 = self.DummyList()
1338        results2 = self.DummyList()
1339        results3 = self.DummyList()
1340        barrier2 = self.Barrier(self.N)
1341
1342        self.run_threads(self._test_abort_and_reset_f,
1343                         (self.barrier, barrier2, results1, results2, results3))
1344        self.assertEqual(len(results1), 0)
1345        self.assertEqual(len(results2), self.N-1)
1346        self.assertEqual(len(results3), self.N)
1347
1348    @classmethod
1349    def _test_timeout_f(cls, barrier, results):
1350        i = barrier.wait()
1351        if i == cls.N//2:
1352            # One thread is late!
1353            time.sleep(1.0)
1354        try:
1355            barrier.wait(0.5)
1356        except threading.BrokenBarrierError:
1357            results.append(True)
1358
1359    def test_timeout(self):
1360        """
1361        Test wait(timeout)
1362        """
1363        results = self.DummyList()
1364        self.run_threads(self._test_timeout_f, (self.barrier, results))
1365        self.assertEqual(len(results), self.barrier.parties)
1366
1367    @classmethod
1368    def _test_default_timeout_f(cls, barrier, results):
1369        i = barrier.wait(cls.defaultTimeout)
1370        if i == cls.N//2:
1371            # One thread is later than the default timeout
1372            time.sleep(1.0)
1373        try:
1374            barrier.wait()
1375        except threading.BrokenBarrierError:
1376            results.append(True)
1377
1378    def test_default_timeout(self):
1379        """
1380        Test the barrier's default timeout
1381        """
1382        barrier = self.Barrier(self.N, timeout=0.5)
1383        results = self.DummyList()
1384        self.run_threads(self._test_default_timeout_f, (barrier, results))
1385        self.assertEqual(len(results), barrier.parties)
1386
1387    def test_single_thread(self):
1388        b = self.Barrier(1)
1389        b.wait()
1390        b.wait()
1391
1392    @classmethod
1393    def _test_thousand_f(cls, barrier, passes, conn, lock):
1394        for i in range(passes):
1395            barrier.wait()
1396            with lock:
1397                conn.send(i)
1398
1399    def test_thousand(self):
1400        if self.TYPE == 'manager':
1401            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1402        passes = 1000
1403        lock = self.Lock()
1404        conn, child_conn = self.Pipe(False)
1405        for j in range(self.N):
1406            p = self.Process(target=self._test_thousand_f,
1407                           args=(self.barrier, passes, child_conn, lock))
1408            p.start()
1409
1410        for i in range(passes):
1411            for j in range(self.N):
1412                self.assertEqual(conn.recv(), i)
1413
1414#
1415#
1416#
1417
1418class _TestValue(BaseTestCase):
1419
1420    ALLOWED_TYPES = ('processes',)
1421
1422    codes_values = [
1423        ('i', 4343, 24234),
1424        ('d', 3.625, -4.25),
1425        ('h', -232, 234),
1426        ('c', latin('x'), latin('y'))
1427        ]
1428
1429    def setUp(self):
1430        if not HAS_SHAREDCTYPES:
1431            self.skipTest("requires multiprocessing.sharedctypes")
1432
1433    @classmethod
1434    def _test(cls, values):
1435        for sv, cv in zip(values, cls.codes_values):
1436            sv.value = cv[2]
1437
1438
1439    def test_value(self, raw=False):
1440        if raw:
1441            values = [self.RawValue(code, value)
1442                      for code, value, _ in self.codes_values]
1443        else:
1444            values = [self.Value(code, value)
1445                      for code, value, _ in self.codes_values]
1446
1447        for sv, cv in zip(values, self.codes_values):
1448            self.assertEqual(sv.value, cv[1])
1449
1450        proc = self.Process(target=self._test, args=(values,))
1451        proc.daemon = True
1452        proc.start()
1453        proc.join()
1454
1455        for sv, cv in zip(values, self.codes_values):
1456            self.assertEqual(sv.value, cv[2])
1457
1458    def test_rawvalue(self):
1459        self.test_value(raw=True)
1460
1461    def test_getobj_getlock(self):
1462        val1 = self.Value('i', 5)
1463        lock1 = val1.get_lock()
1464        obj1 = val1.get_obj()
1465
1466        val2 = self.Value('i', 5, lock=None)
1467        lock2 = val2.get_lock()
1468        obj2 = val2.get_obj()
1469
1470        lock = self.Lock()
1471        val3 = self.Value('i', 5, lock=lock)
1472        lock3 = val3.get_lock()
1473        obj3 = val3.get_obj()
1474        self.assertEqual(lock, lock3)
1475
1476        arr4 = self.Value('i', 5, lock=False)
1477        self.assertFalse(hasattr(arr4, 'get_lock'))
1478        self.assertFalse(hasattr(arr4, 'get_obj'))
1479
1480        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
1481
1482        arr5 = self.RawValue('i', 5)
1483        self.assertFalse(hasattr(arr5, 'get_lock'))
1484        self.assertFalse(hasattr(arr5, 'get_obj'))
1485
1486
1487class _TestArray(BaseTestCase):
1488
1489    ALLOWED_TYPES = ('processes',)
1490
1491    @classmethod
1492    def f(cls, seq):
1493        for i in range(1, len(seq)):
1494            seq[i] += seq[i-1]
1495
1496    @unittest.skipIf(c_int is None, "requires _ctypes")
1497    def test_array(self, raw=False):
1498        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
1499        if raw:
1500            arr = self.RawArray('i', seq)
1501        else:
1502            arr = self.Array('i', seq)
1503
1504        self.assertEqual(len(arr), len(seq))
1505        self.assertEqual(arr[3], seq[3])
1506        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
1507
1508        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
1509
1510        self.assertEqual(list(arr[:]), seq)
1511
1512        self.f(seq)
1513
1514        p = self.Process(target=self.f, args=(arr,))
1515        p.daemon = True
1516        p.start()
1517        p.join()
1518
1519        self.assertEqual(list(arr[:]), seq)
1520
1521    @unittest.skipIf(c_int is None, "requires _ctypes")
1522    def test_array_from_size(self):
1523        size = 10
1524        # Test for zeroing (see issue #11675).
1525        # The repetition below strengthens the test by increasing the chances
1526        # of previously allocated non-zero memory being used for the new array
1527        # on the 2nd and 3rd loops.
1528        for _ in range(3):
1529            arr = self.Array('i', size)
1530            self.assertEqual(len(arr), size)
1531            self.assertEqual(list(arr), [0] * size)
1532            arr[:] = range(10)
1533            self.assertEqual(list(arr), list(range(10)))
1534            del arr
1535
1536    @unittest.skipIf(c_int is None, "requires _ctypes")
1537    def test_rawarray(self):
1538        self.test_array(raw=True)
1539
1540    @unittest.skipIf(c_int is None, "requires _ctypes")
1541    def test_getobj_getlock_obj(self):
1542        arr1 = self.Array('i', list(range(10)))
1543        lock1 = arr1.get_lock()
1544        obj1 = arr1.get_obj()
1545
1546        arr2 = self.Array('i', list(range(10)), lock=None)
1547        lock2 = arr2.get_lock()
1548        obj2 = arr2.get_obj()
1549
1550        lock = self.Lock()
1551        arr3 = self.Array('i', list(range(10)), lock=lock)
1552        lock3 = arr3.get_lock()
1553        obj3 = arr3.get_obj()
1554        self.assertEqual(lock, lock3)
1555
1556        arr4 = self.Array('i', range(10), lock=False)
1557        self.assertFalse(hasattr(arr4, 'get_lock'))
1558        self.assertFalse(hasattr(arr4, 'get_obj'))
1559        self.assertRaises(AttributeError,
1560                          self.Array, 'i', range(10), lock='notalock')
1561
1562        arr5 = self.RawArray('i', range(10))
1563        self.assertFalse(hasattr(arr5, 'get_lock'))
1564        self.assertFalse(hasattr(arr5, 'get_obj'))
1565
1566#
1567#
1568#
1569
1570class _TestContainers(BaseTestCase):
1571
1572    ALLOWED_TYPES = ('manager',)
1573
1574    def test_list(self):
1575        a = self.list(list(range(10)))
1576        self.assertEqual(a[:], list(range(10)))
1577
1578        b = self.list()
1579        self.assertEqual(b[:], [])
1580
1581        b.extend(list(range(5)))
1582        self.assertEqual(b[:], list(range(5)))
1583
1584        self.assertEqual(b[2], 2)
1585        self.assertEqual(b[2:10], [2,3,4])
1586
1587        b *= 2
1588        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
1589
1590        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
1591
1592        self.assertEqual(a[:], list(range(10)))
1593
1594        d = [a, b]
1595        e = self.list(d)
1596        self.assertEqual(
1597            e[:],
1598            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
1599            )
1600
1601        f = self.list([a])
1602        a.append('hello')
1603        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])
1604
1605    def test_dict(self):
1606        d = self.dict()
1607        indices = list(range(65, 70))
1608        for i in indices:
1609            d[i] = chr(i)
1610        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
1611        self.assertEqual(sorted(d.keys()), indices)
1612        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
1613        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
1614
1615    def test_namespace(self):
1616        n = self.Namespace()
1617        n.name = 'Bob'
1618        n.job = 'Builder'
1619        n._hidden = 'hidden'
1620        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
1621        del n.job
1622        self.assertEqual(str(n), "Namespace(name='Bob')")
1623        self.assertTrue(hasattr(n, 'name'))
1624        self.assertTrue(not hasattr(n, 'job'))
1625
1626#
1627#
1628#
1629
1630def sqr(x, wait=0.0):
1631    time.sleep(wait)
1632    return x*x
1633
1634def mul(x, y):
1635    return x*y
1636
1637class _TestPool(BaseTestCase):
1638
1639    @classmethod
1640    def setUpClass(cls):
1641        super().setUpClass()
1642        cls.pool = cls.Pool(4)
1643
1644    @classmethod
1645    def tearDownClass(cls):
1646        cls.pool.terminate()
1647        cls.pool.join()
1648        cls.pool = None
1649        super().tearDownClass()
1650
1651    def test_apply(self):
1652        papply = self.pool.apply
1653        self.assertEqual(papply(sqr, (5,)), sqr(5))
1654        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
1655
1656    def test_map(self):
1657        pmap = self.pool.map
1658        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
1659        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
1660                         list(map(sqr, list(range(100)))))
1661
1662    def test_starmap(self):
1663        psmap = self.pool.starmap
1664        tuples = list(zip(range(10), range(9,-1, -1)))
1665        self.assertEqual(psmap(mul, tuples),
1666                         list(itertools.starmap(mul, tuples)))
1667        tuples = list(zip(range(100), range(99,-1, -1)))
1668        self.assertEqual(psmap(mul, tuples, chunksize=20),
1669                         list(itertools.starmap(mul, tuples)))
1670
1671    def test_starmap_async(self):
1672        tuples = list(zip(range(100), range(99,-1, -1)))
1673        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
1674                         list(itertools.starmap(mul, tuples)))
1675
1676    def test_map_async(self):
1677        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
1678                         list(map(sqr, list(range(10)))))
1679
1680    def _test_map_async_callbacks(self):
1681        call_args = self.manager.list() if self.TYPE == 'manager' else []
1682        self.pool.map_async(int, ['1'],
1683                            callback=call_args.append,
1684                            error_callback=call_args.append).wait()
1685        self.assertEqual(1, len(call_args))
1686        self.assertEqual([1], call_args[0])
1687        self.pool.map_async(int, ['a'],
1688                            callback=call_args.append,
1689                            error_callback=call_args.append).wait()
1690        self.assertEqual(2, len(call_args))
1691        self.assertIsInstance(call_args[1], ValueError)
1692
1693    def test_map_unplicklable(self):
1694        # Issue #19425 -- failure to pickle should not cause a hang
1695        if self.TYPE == 'threads':
1696            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1697        class A(object):
1698            def __reduce__(self):
1699                raise RuntimeError('cannot pickle')
1700        with self.assertRaises(RuntimeError):
1701            self.pool.map(sqr, [A()]*10)
1702
1703    def test_map_chunksize(self):
1704        try:
1705            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
1706        except multiprocessing.TimeoutError:
1707            self.fail("pool.map_async with chunksize stalled on null list")
1708
1709    def test_async(self):
1710        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
1711        get = TimingWrapper(res.get)
1712        self.assertEqual(get(), 49)
1713        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1714
1715    def test_async_timeout(self):
1716        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
1717        get = TimingWrapper(res.get)
1718        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
1719        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
1720
1721    def test_imap(self):
1722        it = self.pool.imap(sqr, list(range(10)))
1723        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
1724
1725        it = self.pool.imap(sqr, list(range(10)))
1726        for i in range(10):
1727            self.assertEqual(next(it), i*i)
1728        self.assertRaises(StopIteration, it.__next__)
1729
1730        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
1731        for i in range(1000):
1732            self.assertEqual(next(it), i*i)
1733        self.assertRaises(StopIteration, it.__next__)
1734
1735    def test_imap_unordered(self):
1736        it = self.pool.imap_unordered(sqr, list(range(1000)))
1737        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1738
1739        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
1740        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
1741
1742    def test_make_pool(self):
1743        self.assertRaises(ValueError, multiprocessing.Pool, -1)
1744        self.assertRaises(ValueError, multiprocessing.Pool, 0)
1745
1746        p = multiprocessing.Pool(3)
1747        self.assertEqual(3, len(p._pool))
1748        p.close()
1749        p.join()
1750
1751    def test_terminate(self):
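        # Queue far more sleeping tasks than can complete, then terminate();
        # join() should return quickly instead of draining the backlog.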
1752        result = self.pool.map_async(
1753            time.sleep, [0.1 for i in range(10000)], chunksize=1
1754            )
1755        self.pool.terminate()
1756        join = TimingWrapper(self.pool.join)
1757        join()
1758        self.assertLess(join.elapsed, 0.5)
1759
1760    def test_empty_iterable(self):
1761        # See Issue 12157
1762        p = self.Pool(1)
1763
1764        self.assertEqual(p.map(sqr, []), [])
1765        self.assertEqual(list(p.imap(sqr, [])), [])
1766        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
1767        self.assertEqual(p.map_async(sqr, []).get(), [])
1768
1769        p.close()
1770        p.join()
1771
1772    def test_context(self):
1773        if self.TYPE == 'processes':
1774            L = list(range(10))
1775            expected = [sqr(i) for i in L]
1776            with multiprocessing.Pool(2) as p:
1777                r = p.map_async(sqr, L)
1778                self.assertEqual(r.get(), expected)
1779            self.assertRaises(ValueError, p.map_async, sqr, L)
1780
1781def raising():
1782    raise KeyError("key")
1783
1784def unpickleable_result():
1785    return lambda: 42
1786
1787class _TestPoolWorkerErrors(BaseTestCase):
1788    ALLOWED_TYPES = ('processes', )
1789
1790    def test_async_error_callback(self):
1791        p = multiprocessing.Pool(2)
1792
1793        scratchpad = [None]
1794        def errback(exc):
1795            scratchpad[0] = exc
1796
1797        res = p.apply_async(raising, error_callback=errback)
1798        self.assertRaises(KeyError, res.get)
1799        self.assertTrue(scratchpad[0])
1800        self.assertIsInstance(scratchpad[0], KeyError)
1801
1802        p.close()
1803        p.join()
1804
1805    def _test_unpickleable_result(self):
1806        from multiprocess.pool import MaybeEncodingError
1807        p = multiprocessing.Pool(2)
1808
1809        # Make sure we don't lose pool processes because of encoding errors.
1810        for iteration in range(20):
1811
1812            scratchpad = [None]
1813            def errback(exc):
1814                scratchpad[0] = exc
1815
1816            res = p.apply_async(unpickleable_result, error_callback=errback)
1817            self.assertRaises(MaybeEncodingError, res.get)
1818            wrapped = scratchpad[0]
1819            self.assertTrue(wrapped)
1820            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
1821            self.assertIsNotNone(wrapped.exc)
1822            self.assertIsNotNone(wrapped.value)
1823
1824        p.close()
1825        p.join()
1826
1827class _TestPoolWorkerLifetime(BaseTestCase):
1828    ALLOWED_TYPES = ('processes', )
1829
1830    def test_pool_worker_lifetime(self):
1831        p = multiprocessing.Pool(3, maxtasksperchild=10)
1832        self.assertEqual(3, len(p._pool))
1833        origworkerpids = [w.pid for w in p._pool]
1834        # Run many tasks so each worker gets replaced (hopefully)
1835        results = []
1836        for i in range(100):
1837            results.append(p.apply_async(sqr, (i, )))
1838        # Fetch the results and verify we got the right answers,
1839        # also ensuring all the tasks have completed.
1840        for (j, res) in enumerate(results):
1841            self.assertEqual(res.get(), sqr(j))
1842        # Refill the pool
1843        p._repopulate_pool()
1844        # Wait until all workers are alive
1845        # (countdown * DELTA = 5 seconds max startup process time)
1846        countdown = 50
1847        while countdown and not all(w.is_alive() for w in p._pool):
1848            countdown -= 1
1849            time.sleep(DELTA)
1850        finalworkerpids = [w.pid for w in p._pool]
1851        # All pids should be assigned.  See issue #7805.
1852        self.assertNotIn(None, origworkerpids)
1853        self.assertNotIn(None, finalworkerpids)
1854        # Finally, check that the worker pids have changed
1855        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
1856        p.close()
1857        p.join()
1858
1859    def test_pool_worker_lifetime_early_close(self):
1860        # Issue #10332: closing a pool whose workers have limited lifetimes
1861        # before all the tasks completed would make join() hang.
1862        p = multiprocessing.Pool(3, maxtasksperchild=1)
1863        results = []
1864        for i in range(6):
1865            results.append(p.apply_async(sqr, (i, 0.3)))
1866        p.close()
1867        p.join()
1868        # check the results
1869        for (j, res) in enumerate(results):
1870            self.assertEqual(res.get(), sqr(j))
1871
1872#
1873# Test of creating a customized manager class
1874#
1875
1876from multiprocess.managers import BaseManager, BaseProxy, RemoteError
1877
1878class FooBar(object):
1879    def f(self):
1880        return 'f()'
1881    def g(self):
1882        raise ValueError
1883    def _h(self):
1884        return '_h()'
1885
1886def baz():
1887    for i in range(10):
1888        yield i*i
1889
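# IteratorProxy forwards __next__() to the referent via _callmethod(), so the
# generator returned by baz() can be iterated from the manager's client side.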
1890class IteratorProxy(BaseProxy):
1891    _exposed_ = ('__next__',)
1892    def __iter__(self):
1893        return self
1894    def __next__(self):
1895        return self._callmethod('__next__')
1896
1897class MyManager(BaseManager):
1898    pass
1899
1900MyManager.register('Foo', callable=FooBar)
1901MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
1902MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
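# 'Foo' exposes FooBar's public methods, 'Bar' explicitly exposes only 'f' and
# '_h', and 'baz' uses IteratorProxy so its proxies support iteration.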
1903
1904
1905class _TestMyManager(BaseTestCase):
1906
1907    ALLOWED_TYPES = ('manager',)
1908
1909    def test_mymanager(self):
1910        manager = MyManager()
1911        manager.start()
1912        self.common(manager)
1913        manager.shutdown()
1914
1915        # If the manager process exited cleanly then the exitcode
1916        # will be zero.  Otherwise (after a short timeout)
1917        # terminate() is used, resulting in an exitcode of -SIGTERM.
1918        self.assertEqual(manager._process.exitcode, 0)
1919
1920    def test_mymanager_context(self):
1921        with MyManager() as manager:
1922            self.common(manager)
1923        self.assertEqual(manager._process.exitcode, 0)
1924
1925    def test_mymanager_context_prestarted(self):
1926        manager = MyManager()
1927        manager.start()
1928        with manager:
1929            self.common(manager)
1930        self.assertEqual(manager._process.exitcode, 0)
1931
1932    def common(self, manager):
1933        foo = manager.Foo()
1934        bar = manager.Bar()
1935        baz = manager.baz()
1936
1937        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
1938        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
1939
1940        self.assertEqual(foo_methods, ['f', 'g'])
1941        self.assertEqual(bar_methods, ['f', '_h'])
1942
1943        self.assertEqual(foo.f(), 'f()')
1944        self.assertRaises(ValueError, foo.g)
1945        self.assertEqual(foo._callmethod('f'), 'f()')
1946        self.assertRaises(RemoteError, foo._callmethod, '_h')
1947
1948        self.assertEqual(bar.f(), 'f()')
1949        self.assertEqual(bar._h(), '_h()')
1950        self.assertEqual(bar._callmethod('f'), 'f()')
1951        self.assertEqual(bar._callmethod('_h'), '_h()')
1952
1953        self.assertEqual(list(baz), [i*i for i in range(10)])
1954
1955
1956#
1957# Test of connecting to a remote server and using xmlrpclib for serialization
1958#
1959
1960_queue = pyqueue.Queue()
1961def get_queue():
1962    return _queue
1963
1964class QueueManager(BaseManager):
1965    '''manager class used by server process'''
1966QueueManager.register('get_queue', callable=get_queue)
1967
1968class QueueManager2(BaseManager):
1969    '''manager class which specifies the same interface as QueueManager'''
1970QueueManager2.register('get_queue')
1971
1972
1973SERIALIZER = 'xmlrpclib'
1974
1975class _TestRemoteManager(BaseTestCase):
1976
1977    ALLOWED_TYPES = ('manager',)
1978
1979    @classmethod
1980    def _putter(cls, address, authkey):
1981        manager = QueueManager2(
1982            address=address, authkey=authkey, serializer=SERIALIZER
1983            )
1984        manager.connect()
1985        queue = manager.get_queue()
1986        queue.put(('hello world', None, True, 2.25))
1987
1988    def test_remote(self):
1989        authkey = os.urandom(32)
1990
1991        manager = QueueManager(
1992            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
1993            )
1994        manager.start()
1995
1996        p = self.Process(target=self._putter, args=(manager.address, authkey))
1997        p.daemon = True
1998        p.start()
1999
2000        manager2 = QueueManager2(
2001            address=manager.address, authkey=authkey, serializer=SERIALIZER
2002            )
2003        manager2.connect()
2004        queue = manager2.get_queue()
2005
2006        # Note that xmlrpclib will deserialize the tuple as a list, not a tuple
2007        self.assertEqual(queue.get(), ['hello world', None, True, 2.25])
2008
2009        # Because we are using xmlrpclib for serialization instead of
2010        # pickle this will cause a serialization error.
2011        self.assertRaises(Exception, queue.put, time.sleep)
2012
2013        # Make queue finalizer run before the server is stopped
2014        del queue
2015        manager.shutdown()
2016
2017class _TestManagerRestart(BaseTestCase):
2018
2019    @classmethod
2020    def _putter(cls, address, authkey):
2021        manager = QueueManager(
2022            address=address, authkey=authkey, serializer=SERIALIZER)
2023        manager.connect()
2024        queue = manager.get_queue()
2025        queue.put('hello world')
2026
2027    def test_rapid_restart(self):
2028        authkey = os.urandom(32)
2029        manager = QueueManager(
2030            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2031        srvr = manager.get_server()
2032        addr = srvr.address
2033        # Close the connection.Listener socket which gets opened as a part
2034        # of manager.get_server(). It's not needed for the test.
2035        srvr.listener.close()
2036        manager.start()
2037
2038        p = self.Process(target=self._putter, args=(manager.address, authkey))
2039        p.daemon = True
2040        p.start()
2041        queue = manager.get_queue()
2042        self.assertEqual(queue.get(), 'hello world')
2043        del queue
2044        manager.shutdown()
2045        manager = QueueManager(
2046            address=addr, authkey=authkey, serializer=SERIALIZER)
2047        try:
2048            manager.start()
2049        except OSError as e:
2050            if e.errno != errno.EADDRINUSE:
2051                raise
2052            # Retry after some time, in case the old socket was lingering
2053            # (sporadic failure on buildbots)
2054            time.sleep(1.0)
2055            manager = QueueManager(
2056                address=addr, authkey=authkey, serializer=SERIALIZER)
2057        manager.shutdown()
2058
2059#
2060#
2061#
2062
2063SENTINEL = latin('')
2064
2065class _TestConnection(BaseTestCase):
2066
2067    ALLOWED_TYPES = ('processes', 'threads')
2068
2069    @classmethod
2070    def _echo(cls, conn):
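        # Echo every received message back until the empty SENTINEL arrives.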
2071        for msg in iter(conn.recv_bytes, SENTINEL):
2072            conn.send_bytes(msg)
2073        conn.close()
2074
2075    def test_connection(self):
2076        conn, child_conn = self.Pipe()
2077
2078        p = self.Process(target=self._echo, args=(child_conn,))
2079        p.daemon = True
2080        p.start()
2081
2082        seq = [1, 2.25, None]
2083        msg = latin('hello world')
2084        longmsg = msg * 10
2085        arr = array.array('i', list(range(4)))
2086
2087        if self.TYPE == 'processes':
2088            self.assertEqual(type(conn.fileno()), int)
2089
2090        self.assertEqual(conn.send(seq), None)
2091        self.assertEqual(conn.recv(), seq)
2092
2093        self.assertEqual(conn.send_bytes(msg), None)
2094        self.assertEqual(conn.recv_bytes(), msg)
2095
2096        if self.TYPE == 'processes':
2097            buffer = array.array('i', [0]*10)
2098            expected = list(arr) + [0] * (10 - len(arr))
2099            self.assertEqual(conn.send_bytes(arr), None)
2100            self.assertEqual(conn.recv_bytes_into(buffer),
2101                             len(arr) * buffer.itemsize)
2102            self.assertEqual(list(buffer), expected)
2103
2104            buffer = array.array('i', [0]*10)
2105            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
2106            self.assertEqual(conn.send_bytes(arr), None)
2107            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
2108                             len(arr) * buffer.itemsize)
2109            self.assertEqual(list(buffer), expected)
2110
2111            buffer = bytearray(latin(' ' * 40))
2112            self.assertEqual(conn.send_bytes(longmsg), None)
2113            try:
2114                res = conn.recv_bytes_into(buffer)
2115            except multiprocessing.BufferTooShort as e:
2116                self.assertEqual(e.args, (longmsg,))
2117            else:
2118                self.fail('expected BufferTooShort, got %s' % res)
2119
2120        poll = TimingWrapper(conn.poll)
2121
2122        self.assertEqual(poll(), False)
2123        self.assertTimingAlmostEqual(poll.elapsed, 0)
2124
2125        self.assertEqual(poll(-1), False)
2126        self.assertTimingAlmostEqual(poll.elapsed, 0)
2127
2128        self.assertEqual(poll(TIMEOUT1), False)
2129        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
2130
2131        conn.send(None)
2132        time.sleep(.1)
2133
2134        self.assertEqual(poll(TIMEOUT1), True)
2135        self.assertTimingAlmostEqual(poll.elapsed, 0)
2136
2137        self.assertEqual(conn.recv(), None)
2138
2139        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16 MiB
2140        conn.send_bytes(really_big_msg)
2141        self.assertEqual(conn.recv_bytes(), really_big_msg)
2142
2143        conn.send_bytes(SENTINEL)                          # tell child to quit
2144        child_conn.close()
2145
2146        if self.TYPE == 'processes':
2147            self.assertEqual(conn.readable, True)
2148            self.assertEqual(conn.writable, True)
2149            self.assertRaises(EOFError, conn.recv)
2150            self.assertRaises(EOFError, conn.recv_bytes)
2151
2152        p.join()
2153
2154    def test_duplex_false(self):
2155        reader, writer = self.Pipe(duplex=False)
2156        self.assertEqual(writer.send(1), None)
2157        self.assertEqual(reader.recv(), 1)
2158        if self.TYPE == 'processes':
2159            self.assertEqual(reader.readable, True)
2160            self.assertEqual(reader.writable, False)
2161            self.assertEqual(writer.readable, False)
2162            self.assertEqual(writer.writable, True)
2163            self.assertRaises(OSError, reader.send, 2)
2164            self.assertRaises(OSError, writer.recv)
2165            self.assertRaises(OSError, writer.poll)
2166
2167    def test_spawn_close(self):
2168        # Check that a pipe connection can be closed by the parent
2169        # process immediately after the child is spawned.  On Windows this
2170        # sometimes failed in older versions because child_conn would be
2171        # closed before the child got a chance to duplicate it.
2173        conn, child_conn = self.Pipe()
2174
2175        p = self.Process(target=self._echo, args=(child_conn,))
2176        p.daemon = True
2177        p.start()
2178        child_conn.close()    # this might complete before child initializes
2179
2180        msg = latin('hello')
2181        conn.send_bytes(msg)
2182        self.assertEqual(conn.recv_bytes(), msg)
2183
2184        conn.send_bytes(SENTINEL)
2185        conn.close()
2186        p.join()
2187
2188    def test_sendbytes(self):
2189        if self.TYPE != 'processes':
2190            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2191
2192        msg = latin('abcdefghijklmnopqrstuvwxyz')
2193        a, b = self.Pipe()
2194
2195        a.send_bytes(msg)
2196        self.assertEqual(b.recv_bytes(), msg)
2197
2198        a.send_bytes(msg, 5)
2199        self.assertEqual(b.recv_bytes(), msg[5:])
2200
2201        a.send_bytes(msg, 7, 8)
2202        self.assertEqual(b.recv_bytes(), msg[7:7+8])
2203
2204        a.send_bytes(msg, 26)
2205        self.assertEqual(b.recv_bytes(), latin(''))
2206
2207        a.send_bytes(msg, 26, 0)
2208        self.assertEqual(b.recv_bytes(), latin(''))
2209
2210        self.assertRaises(ValueError, a.send_bytes, msg, 27)
2211
2212        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
2213
2214        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
2215
2216        self.assertRaises(ValueError, a.send_bytes, msg, -1)
2217
2218        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
2219
2220    @classmethod
2221    def _is_fd_assigned(cls, fd):
2222        try:
2223            os.fstat(fd)
2224        except OSError as e:
2225            if e.errno == errno.EBADF:
2226                return False
2227            raise
2228        else:
2229            return True
2230
2231    @classmethod
2232    def _writefd(cls, conn, data, create_dummy_fds=False):
2233        if create_dummy_fds:
2234            for i in range(0, 256):
2235                if not cls._is_fd_assigned(i):
2236                    os.dup2(conn.fileno(), i)
2237        fd = reduction.recv_handle(conn)
2238        if msvcrt:
2239            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
2240        os.write(fd, data)
2241        os.close(fd)
2242
2243    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2244    def test_fd_transfer(self):
2245        if self.TYPE != 'processes':
2246            self.skipTest("only makes sense with processes")
2247        conn, child_conn = self.Pipe(duplex=True)
2248
2249        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
2250        p.daemon = True
2251        p.start()
2252        self.addCleanup(test.support.unlink, test.support.TESTFN)
2253        with open(test.support.TESTFN, "wb") as f:
2254            fd = f.fileno()
2255            if msvcrt:
2256                fd = msvcrt.get_osfhandle(fd)
2257            reduction.send_handle(conn, fd, p.pid)
2258        p.join()
2259        with open(test.support.TESTFN, "rb") as f:
2260            self.assertEqual(f.read(), b"foo")
2261
2262    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2263    @unittest.skipIf(sys.platform == "win32",
2264                     "test semantics don't make sense on Windows")
2265    @unittest.skipIf(MAXFD <= 256,
2266                     "largest assignable fd number is too small")
2267    @unittest.skipUnless(hasattr(os, "dup2"),
2268                         "test needs os.dup2()")
2269    def test_large_fd_transfer(self):
2270        # With fd > 256 (issue #11657)
2271        if self.TYPE != 'processes':
2272            self.skipTest("only makes sense with processes")
2273        conn, child_conn = self.Pipe(duplex=True)
2274
2275        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
2276        p.daemon = True
2277        p.start()
2278        self.addCleanup(test.support.unlink, test.support.TESTFN)
2279        with open(test.support.TESTFN, "wb") as f:
2280            fd = f.fileno()
2281            for newfd in range(256, MAXFD):
2282                if not self._is_fd_assigned(newfd):
2283                    break
2284            else:
2285                self.fail("could not find an unassigned large file descriptor")
2286            os.dup2(fd, newfd)
2287            try:
2288                reduction.send_handle(conn, newfd, p.pid)
2289            finally:
2290                os.close(newfd)
2291        p.join()
2292        with open(test.support.TESTFN, "rb") as f:
2293            self.assertEqual(f.read(), b"bar")
2294
2295    @classmethod
2296    def _send_data_without_fd(cls, conn):
2297        os.write(conn.fileno(), b"\0")
2298
2299    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2300    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
2301    def test_missing_fd_transfer(self):
2302        # Check that exception is raised when received data is not
2303        # accompanied by a file descriptor in ancillary data.
2304        if self.TYPE != 'processes':
2305            self.skipTest("only makes sense with processes")
2306        conn, child_conn = self.Pipe(duplex=True)
2307
2308        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
2309        p.daemon = True
2310        p.start()
2311        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
2312        p.join()
2313
2314    def test_context(self):
2315        a, b = self.Pipe()
2316
2317        with a, b:
2318            a.send(1729)
2319            self.assertEqual(b.recv(), 1729)
2320            if self.TYPE == 'processes':
2321                self.assertFalse(a.closed)
2322                self.assertFalse(b.closed)
2323
2324        if self.TYPE == 'processes':
2325            self.assertTrue(a.closed)
2326            self.assertTrue(b.closed)
2327            self.assertRaises(OSError, a.recv)
2328            self.assertRaises(OSError, b.recv)
2329
2330class _TestListener(BaseTestCase):
2331
2332    ALLOWED_TYPES = ('processes',)
2333
2334    def test_multiple_bind(self):
2335        for family in self.connection.families:
2336            l = self.connection.Listener(family=family)
2337            self.addCleanup(l.close)
2338            self.assertRaises(OSError, self.connection.Listener,
2339                              l.address, family)
2340
2341    def test_context(self):
2342        with self.connection.Listener() as l:
2343            with self.connection.Client(l.address) as c:
2344                with l.accept() as d:
2345                    c.send(1729)
2346                    self.assertEqual(d.recv(), 1729)
2347
2348        if self.TYPE == 'processes':
2349            self.assertRaises(OSError, l.accept)
2350
2351class _TestListenerClient(BaseTestCase):
2352
2353    ALLOWED_TYPES = ('processes', 'threads')
2354
2355    @classmethod
2356    def _test(cls, address):
2357        conn = cls.connection.Client(address)
2358        conn.send('hello')
2359        conn.close()
2360
2361    def test_listener_client(self):
2362        for family in self.connection.families:
2363            l = self.connection.Listener(family=family)
2364            p = self.Process(target=self._test, args=(l.address,))
2365            p.daemon = True
2366            p.start()
2367            conn = l.accept()
2368            self.assertEqual(conn.recv(), 'hello')
2369            p.join()
2370            l.close()
2371
2372    def test_issue14725(self):
2373        l = self.connection.Listener()
2374        p = self.Process(target=self._test, args=(l.address,))
2375        p.daemon = True
2376        p.start()
2377        time.sleep(1)
2378        # On Windows the client process should by now have connected,
2379        # written data and closed the pipe handle.  This causes
2380        # ConnectNamedPipe() to fail with ERROR_NO_DATA.  See Issue
2381        # 14725.
2382        conn = l.accept()
2383        self.assertEqual(conn.recv(), 'hello')
2384        conn.close()
2385        p.join()
2386        l.close()
2387
2388    def test_issue16955(self):
2389        for fam in self.connection.families:
2390            l = self.connection.Listener(family=fam)
2391            c = self.connection.Client(l.address)
2392            a = l.accept()
2393            a.send_bytes(b"hello")
2394            self.assertTrue(c.poll(1))
2395            a.close()
2396            c.close()
2397            l.close()
2398
2399class _TestPoll(BaseTestCase):
2400
2401    ALLOWED_TYPES = ('processes', 'threads')
2402
2403    def test_empty_string(self):
2404        a, b = self.Pipe()
2405        self.assertEqual(a.poll(), False)
2406        b.send_bytes(b'')
2407        self.assertEqual(a.poll(), True)
2408        self.assertEqual(a.poll(), True)
2409
2410    @classmethod
2411    def _child_strings(cls, conn, strings):
2412        for s in strings:
2413            time.sleep(0.1)
2414            conn.send_bytes(s)
2415        conn.close()
2416
2417    def test_strings(self):
2418        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
2419        a, b = self.Pipe()
2420        p = self.Process(target=self._child_strings, args=(b, strings))
2421        p.start()
2422
2423        for s in strings:
2424            for i in range(200):
2425                if a.poll(0.01):
2426                    break
2427            x = a.recv_bytes()
2428            self.assertEqual(s, x)
2429
2430        p.join()
2431
2432    @classmethod
2433    def _child_boundaries(cls, r):
2434        # Polling may "pull" a message in to the child process, but we
2435        # don't want it to pull only part of a message, as that would
2436        # corrupt the pipe for any other processes which might later
2437        # read from it.
2438        r.poll(5)
2439
2440    def test_boundaries(self):
2441        r, w = self.Pipe(False)
2442        p = self.Process(target=self._child_boundaries, args=(r,))
2443        p.start()
2444        time.sleep(2)
2445        L = [b"first", b"second"]
2446        for obj in L:
2447            w.send_bytes(obj)
2448        w.close()
2449        p.join()
2450        self.assertIn(r.recv_bytes(), L)
2451
2452    @classmethod
2453    def _child_dont_merge(cls, b):
2454        b.send_bytes(b'a')
2455        b.send_bytes(b'b')
2456        b.send_bytes(b'cd')
2457
2458    def test_dont_merge(self):
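        # Messages sent by separate send_bytes() calls must arrive as three
        # distinct messages; polling in between must not merge them together.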
2459        a, b = self.Pipe()
2460        self.assertEqual(a.poll(0.0), False)
2461        self.assertEqual(a.poll(0.1), False)
2462
2463        p = self.Process(target=self._child_dont_merge, args=(b,))
2464        p.start()
2465
2466        self.assertEqual(a.recv_bytes(), b'a')
2467        self.assertEqual(a.poll(1.0), True)
2468        self.assertEqual(a.poll(1.0), True)
2469        self.assertEqual(a.recv_bytes(), b'b')
2470        self.assertEqual(a.poll(1.0), True)
2471        self.assertEqual(a.poll(1.0), True)
2472        self.assertEqual(a.poll(0.0), True)
2473        self.assertEqual(a.recv_bytes(), b'cd')
2474
2475        p.join()
2476
2477#
2478# Test of sending connection and socket objects between processes
2479#
2480
2481@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
2482class _TestPicklingConnections(BaseTestCase):
2483
2484    ALLOWED_TYPES = ('processes',)
2485
2486    @classmethod
2487    def tearDownClass(cls):
2488        from multiprocess.reduction import resource_sharer
2489        resource_sharer.stop(timeout=5)
2490
2491    @classmethod
2492    def _listener(cls, conn, families):
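        # For each family: create a Listener, send its address to the parent,
        # accept one connection and send the accepted Connection object itself
        # back over `conn` (this exercises pickling of connections).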
2493        for fam in families:
2494            l = cls.connection.Listener(family=fam)
2495            conn.send(l.address)
2496            new_conn = l.accept()
2497            conn.send(new_conn)
2498            new_conn.close()
2499            l.close()
2500
2501        l = socket.socket()
2502        l.bind((test.support.HOST, 0))
2503        l.listen(1)
2504        conn.send(l.getsockname())
2505        new_conn, addr = l.accept()
2506        conn.send(new_conn)
2507        new_conn.close()
2508        l.close()
2509
2510        conn.recv()
2511
2512    @classmethod
2513    def _remote(cls, conn):
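        # Receive (address, msg) pairs, connect to each address and send back
        # the upper-cased message; a None sentinel switches to a plain socket.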
2514        for (address, msg) in iter(conn.recv, None):
2515            client = cls.connection.Client(address)
2516            client.send(msg.upper())
2517            client.close()
2518
2519        address, msg = conn.recv()
2520        client = socket.socket()
2521        client.connect(address)
2522        client.sendall(msg.upper())
2523        client.close()
2524
2525        conn.close()
2526
2527    def test_pickling(self):
2528        families = self.connection.families
2529
2530        lconn, lconn0 = self.Pipe()
2531        lp = self.Process(target=self._listener, args=(lconn0, families))
2532        lp.daemon = True
2533        lp.start()
2534        lconn0.close()
2535
2536        rconn, rconn0 = self.Pipe()
2537        rp = self.Process(target=self._remote, args=(rconn0,))
2538        rp.daemon = True
2539        rp.start()
2540        rconn0.close()
2541
2542        for fam in families:
2543            msg = ('This connection uses family %s' % fam).encode('ascii')
2544            address = lconn.recv()
2545            rconn.send((address, msg))
2546            new_conn = lconn.recv()
2547            self.assertEqual(new_conn.recv(), msg.upper())
2548
2549        rconn.send(None)
2550
2551        msg = latin('This connection uses a normal socket')
2552        address = lconn.recv()
2553        rconn.send((address, msg))
2554        new_conn = lconn.recv()
2555        buf = []
2556        while True:
2557            s = new_conn.recv(100)
2558            if not s:
2559                break
2560            buf.append(s)
2561        buf = b''.join(buf)
2562        self.assertEqual(buf, msg.upper())
2563        new_conn.close()
2564
2565        lconn.send(None)
2566
2567        rconn.close()
2568        lconn.close()
2569
2570        lp.join()
2571        rp.join()
2572
2573    @classmethod
2574    def child_access(cls, conn):
2575        w = conn.recv()
2576        w.send('all is well')
2577        w.close()
2578
2579        r = conn.recv()
2580        msg = r.recv()
2581        conn.send(msg*2)
2582
2583        conn.close()
2584
2585    def test_access(self):
2586        # On Windows, if we do not specify a destination pid when
2587        # using DupHandle then we need to be careful to use the
2588        # correct access flags for DuplicateHandle(), or else
2589        # DupHandle.detach() will raise PermissionError.  For example,
2590        # for a read only pipe handle we should use
2591        # access=FILE_GENERIC_READ.  (Unfortunately
2592        # DUPLICATE_SAME_ACCESS does not work.)
2593        conn, child_conn = self.Pipe()
2594        p = self.Process(target=self.child_access, args=(child_conn,))
2595        p.daemon = True
2596        p.start()
2597        child_conn.close()
2598
2599        r, w = self.Pipe(duplex=False)
2600        conn.send(w)
2601        w.close()
2602        self.assertEqual(r.recv(), 'all is well')
2603        r.close()
2604
2605        r, w = self.Pipe(duplex=False)
2606        conn.send(r)
2607        r.close()
2608        w.send('foobar')
2609        w.close()
2610        self.assertEqual(conn.recv(), 'foobar'*2)
2611
2612#
2613#
2614#
2615
2616class _TestHeap(BaseTestCase):
2617
2618    ALLOWED_TYPES = ('processes',)
2619
2620    def test_heap(self):
2621        iterations = 5000
2622        maxblocks = 50
2623        blocks = []
2624
2625        # create and destroy lots of blocks of different sizes
2626        for i in range(iterations):
2627            size = int(random.lognormvariate(0, 1) * 1000)
2628            b = multiprocessing.heap.BufferWrapper(size)
2629            blocks.append(b)
2630            if len(blocks) > maxblocks:
2631                i = random.randrange(maxblocks)
2632                del blocks[i]
2633
2634        # get the heap object
2635        heap = multiprocessing.heap.BufferWrapper._heap
2636
2637        # verify the state of the heap
2638        all = []
2639        occupied = 0
2640        heap._lock.acquire()
2641        self.addCleanup(heap._lock.release)
2642        for L in list(heap._len_to_seq.values()):
2643            for arena, start, stop in L:
2644                all.append((heap._arenas.index(arena), start, stop,
2645                            stop-start, 'free'))
2646        for arena, start, stop in heap._allocated_blocks:
2647            all.append((heap._arenas.index(arena), start, stop,
2648                        stop-start, 'occupied'))
2649            occupied += (stop-start)
2650
2651        all.sort()
2652
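        # Sorted by (arena, start): each block must either open a new arena at
        # offset 0 or begin exactly where the previous block stops.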
2653        for i in range(len(all)-1):
2654            (arena, start, stop) = all[i][:3]
2655            (narena, nstart, nstop) = all[i+1][:3]
2656            self.assertTrue((arena != narena and nstart == 0) or
2657                            (stop == nstart))
2658
2659    def test_free_from_gc(self):
2660        # Check that freeing of blocks by the garbage collector doesn't deadlock
2661        # (issue #12352).
2662        # Make sure the GC is enabled, and set lower collection thresholds to
2663        # make collections more frequent (and increase the probability of
2664        # deadlock).
2665        if not gc.isenabled():
2666            gc.enable()
2667            self.addCleanup(gc.disable)
2668        thresholds = gc.get_threshold()
2669        self.addCleanup(gc.set_threshold, *thresholds)
2670        gc.set_threshold(10)
2671
2672        # perform numerous block allocations, with cyclic references to make
2673        # sure objects are collected asynchronously by the gc
2674        for i in range(5000):
2675            a = multiprocessing.heap.BufferWrapper(1)
2676            b = multiprocessing.heap.BufferWrapper(1)
2677            # circular references
2678            a.buddy = b
2679            b.buddy = a
2680
2681#
2682#
2683#
2684
2685class _Foo(Structure):
2686    _fields_ = [
2687        ('x', c_int),
2688        ('y', c_double)
2689        ]
2690
2691class _TestSharedCTypes(BaseTestCase):
2692
2693    ALLOWED_TYPES = ('processes',)
2694
2695    def setUp(self):
2696        if not HAS_SHAREDCTYPES:
2697            self.skipTest("requires multiprocessing.sharedctypes")
2698
2699    @classmethod
2700    def _double(cls, x, y, foo, arr, string):
2701        x.value *= 2
2702        y.value *= 2
2703        foo.x *= 2
2704        foo.y *= 2
2705        string.value *= 2
2706        for i in range(len(arr)):
2707            arr[i] *= 2
2708
2709    def test_sharedctypes(self, lock=False):
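        # Create shared ctypes objects, let a child process double them in
        # place, and verify the parent observes the modified values.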
2710        x = Value('i', 7, lock=lock)
2711        y = Value(c_double, 1.0/3.0, lock=lock)
2712        foo = Value(_Foo, 3, 2, lock=lock)
2713        arr = self.Array('d', list(range(10)), lock=lock)
2714        string = self.Array('c', 20, lock=lock)
2715        string.value = latin('hello')
2716
2717        p = self.Process(target=self._double, args=(x, y, foo, arr, string))
2718        p.daemon = True
2719        p.start()
2720        p.join()
2721
2722        self.assertEqual(x.value, 14)
2723        self.assertAlmostEqual(y.value, 2.0/3.0)
2724        self.assertEqual(foo.x, 6)
2725        self.assertAlmostEqual(foo.y, 4.0)
2726        for i in range(10):
2727            self.assertAlmostEqual(arr[i], i*2)
2728        self.assertEqual(string.value, latin('hellohello'))
2729
2730    def test_synchronize(self):
2731        self.test_sharedctypes(lock=True)
2732
2733    def test_copy(self):
2734        foo = _Foo(2, 5.0)
2735        bar = copy(foo)
2736        foo.x = 0
2737        foo.y = 0
2738        self.assertEqual(bar.x, 2)
2739        self.assertAlmostEqual(bar.y, 5.0)
2740
2741#
2742#
2743#
2744
2745class _TestFinalize(BaseTestCase):
2746
2747    ALLOWED_TYPES = ('processes',)
2748
2749    @classmethod
2750    def _test_finalize(cls, conn):
2751        class Foo(object):
2752            pass
2753
2754        a = Foo()
2755        util.Finalize(a, conn.send, args=('a',))
2756        del a           # triggers callback for a
2757
2758        b = Foo()
2759        close_b = util.Finalize(b, conn.send, args=('b',))
2760        close_b()       # triggers callback for b
2761        close_b()       # does nothing because callback has already been called
2762        del b           # does nothing because callback has already been called
2763
2764        c = Foo()
2765        util.Finalize(c, conn.send, args=('c',))
2766
2767        d10 = Foo()
2768        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
2769
2770        d01 = Foo()
2771        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
2772        d02 = Foo()
2773        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
2774        d03 = Foo()
2775        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
2776
2777        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
2778
2779        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
2780
2781        # call multiprocessing's cleanup function then exit process without
2782        # garbage collecting locals
2783        util._exit_function()
2784        conn.close()
2785        os._exit(0)
2786
2787    def test_finalize(self):
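        # The child registers finalizers with various exit priorities; higher
        # exitpriority runs first, and within the same priority the most
        # recently registered finalizer runs first (hence d10, d03, d02, d01).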
2788        conn, child_conn = self.Pipe()
2789
2790        p = self.Process(target=self._test_finalize, args=(child_conn,))
2791        p.daemon = True
2792        p.start()
2793        p.join()
2794
2795        result = [obj for obj in iter(conn.recv, 'STOP')]
2796        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2797
2798#
2799# Test that from ... import * works for each module
2800#
2801
2802class _TestImportStar(BaseTestCase):
2803
2804    ALLOWED_TYPES = ('processes',)
2805
2806    def test_import(self):
2807        modules = [
2808            'multiprocess', 'multiprocess.connection',
2809            'multiprocess.heap', 'multiprocess.managers',
2810            'multiprocess.pool', 'multiprocess.process',
2811            'multiprocess.synchronize', 'multiprocess.util'
2812            ]
2813
2814        if HAS_REDUCTION:
2815            modules.append('multiprocess.reduction')
2816
2817        if c_int is not None:
2818            # This module requires _ctypes
2819            modules.append('multiprocess.sharedctypes')
2820
2821        for name in modules:
2822            __import__(name)
2823            mod = sys.modules[name]
2824
2825            for attr in getattr(mod, '__all__', ()):
2826                self.assertTrue(
2827                    hasattr(mod, attr),
2828                    '%r does not have attribute %r' % (mod, attr)
2829                    )
2830
2831#
2832# Quick test that logging works -- does not test logging output
2833#
2834
2835class _TestLogging(BaseTestCase):
2836
2837    ALLOWED_TYPES = ('processes',)
2838
2839    def test_enable_logging(self):
2840        logger = multiprocessing.get_logger()
2841        logger.setLevel(util.SUBWARNING)
2842        self.assertIsNotNone(logger)
2843        logger.debug('this will not be printed')
2844        logger.info('nor will this')
2845        logger.setLevel(LOG_LEVEL)
2846
2847    @classmethod
2848    def _test_level(cls, conn):
2849        logger = multiprocessing.get_logger()
2850        conn.send(logger.getEffectiveLevel())
2851
2852    def test_level(self):
2853        LEVEL1 = 32
2854        LEVEL2 = 37
2855
2856        logger = multiprocessing.get_logger()
2857        root_logger = logging.getLogger()
2858        root_level = root_logger.level
2859
2860        reader, writer = multiprocessing.Pipe(duplex=False)
2861
2862        logger.setLevel(LEVEL1)
2863        p = self.Process(target=self._test_level, args=(writer,))
2864        p.daemon = True
2865        p.start()
2866        self.assertEqual(LEVEL1, reader.recv())
2867
2868        logger.setLevel(logging.NOTSET)
2869        root_logger.setLevel(LEVEL2)
2870        p = self.Process(target=self._test_level, args=(writer,))
2871        p.daemon = True
2872        p.start()
2873        self.assertEqual(LEVEL2, reader.recv())
2874
2875        root_logger.setLevel(root_level)
2876        logger.setLevel(level=LOG_LEVEL)
2877
2878
2879# class _TestLoggingProcessName(BaseTestCase):
2880#
2881#     def handle(self, record):
2882#         assert record.processName == multiprocessing.current_process().name
2883#         self.__handled = True
2884#
2885#     def test_logging(self):
2886#         handler = logging.Handler()
2887#         handler.handle = self.handle
2888#         self.__handled = False
2889#         # Bypass getLogger() and side-effects
2890#         logger = logging.getLoggerClass()(
2891#                 'multiprocessing.test.TestLoggingProcessName')
2892#         logger.addHandler(handler)
2893#         logger.propagate = False
2894#
2895#         logger.warn('foo')
2896#         assert self.__handled
2897
2898#
2899# Check that Process.join() retries if os.waitpid() fails with EINTR
2900#
2901
2902class _TestPollEintr(BaseTestCase):
2903
2904    ALLOWED_TYPES = ('processes',)
2905
2906    @classmethod
2907    def _killer(cls, pid):
2908        time.sleep(0.5)
2909        os.kill(pid, signal.SIGUSR1)
2910
2911    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
2912    def test_poll_eintr(self):
2913        got_signal = [False]
2914        def record(*args):
2915            got_signal[0] = True
2916        pid = os.getpid()
2917        oldhandler = signal.signal(signal.SIGUSR1, record)
2918        try:
2919            killer = self.Process(target=self._killer, args=(pid,))
2920            killer.start()
2921            p = self.Process(target=time.sleep, args=(1,))
2922            p.start()
2923            p.join()
2924            self.assertTrue(got_signal[0])
2925            self.assertEqual(p.exitcode, 0)
2926            killer.join()
2927        finally:
2928            signal.signal(signal.SIGUSR1, oldhandler)
2929
2930#
2931# Test that invalid connection handles are rejected, see issue 3321
2932#
2933
2934class TestInvalidHandle(unittest.TestCase):
2935
2936    @unittest.skipIf(WIN32, "skipped on Windows")
2937    def test_invalid_handles(self):
2938        conn = multiprocessing.connection.Connection(44977608)
2939        try:
2940            self.assertRaises((ValueError, OSError), conn.poll)
2941        finally:
2942            # Hack private attribute _handle to avoid printing an error
2943            # in conn.__del__
2944            conn._handle = None
2945        self.assertRaises((ValueError, OSError),
2946                          multiprocessing.connection.Connection, -1)
2947
2948#
2949# Functions used to create test cases from the base ones in this module
2950#
2951
2952def create_test_cases(Mixin, type):
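    # For each _TestXxx base class in this module that allows `type`, build a
    # concrete TestCase named 'With<Type>TestXxx' combining it with the mixin.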
2953    result = {}
2954    glob = globals()
2955    Type = type.capitalize()
2956    ALL_TYPES = {'processes', 'threads', 'manager'}
2957
2958    for name in list(glob.keys()):
2959        if name.startswith('_Test'):
2960            base = glob[name]
2961            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES)
2962            if type in base.ALLOWED_TYPES:
2963                newname = 'With' + Type + name[1:]
2964                class Temp(base, Mixin, unittest.TestCase):
2965                    pass
2966                result[newname] = Temp
2967                Temp.__name__ = Temp.__qualname__ = newname
2968                Temp.__module__ = Mixin.__module__
2969    return result
2970
2971#
2972# Create test cases
2973#
2974
2975class ProcessesMixin(object):
2976    TYPE = 'processes'
2977    Process = multiprocessing.Process
2978    connection = multiprocessing.connection
2979    current_process = staticmethod(multiprocessing.current_process)
2980    active_children = staticmethod(multiprocessing.active_children)
2981    Pool = staticmethod(multiprocessing.Pool)
2982    Pipe = staticmethod(multiprocessing.Pipe)
2983    Queue = staticmethod(multiprocessing.Queue)
2984    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
2985    Lock = staticmethod(multiprocessing.Lock)
2986    RLock = staticmethod(multiprocessing.RLock)
2987    Semaphore = staticmethod(multiprocessing.Semaphore)
2988    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
2989    Condition = staticmethod(multiprocessing.Condition)
2990    Event = staticmethod(multiprocessing.Event)
2991    Barrier = staticmethod(multiprocessing.Barrier)
2992    Value = staticmethod(multiprocessing.Value)
2993    Array = staticmethod(multiprocessing.Array)
2994    RawValue = staticmethod(multiprocessing.RawValue)
2995    RawArray = staticmethod(multiprocessing.RawArray)
2996
2997testcases_processes = create_test_cases(ProcessesMixin, type='processes')
2998globals().update(testcases_processes)
2999
3000
3001class ManagerMixin(object):
3002    TYPE = 'manager'
3003    Process = multiprocessing.Process
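    # The remaining attributes are properties so that they resolve against the
    # shared manager instance created in setUpClass().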
3004    Queue = property(operator.attrgetter('manager.Queue'))
3005    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
3006    Lock = property(operator.attrgetter('manager.Lock'))
3007    RLock = property(operator.attrgetter('manager.RLock'))
3008    Semaphore = property(operator.attrgetter('manager.Semaphore'))
3009    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
3010    Condition = property(operator.attrgetter('manager.Condition'))
3011    Event = property(operator.attrgetter('manager.Event'))
3012    Barrier = property(operator.attrgetter('manager.Barrier'))
3013    Value = property(operator.attrgetter('manager.Value'))
3014    Array = property(operator.attrgetter('manager.Array'))
3015    list = property(operator.attrgetter('manager.list'))
3016    dict = property(operator.attrgetter('manager.dict'))
3017    Namespace = property(operator.attrgetter('manager.Namespace'))
3018
3019    @classmethod
3020    def Pool(cls, *args, **kwds):
3021        return cls.manager.Pool(*args, **kwds)
3022
3023    @classmethod
3024    def setUpClass(cls):
3025        cls.manager = multiprocessing.Manager()
3026
3027    @classmethod
3028    def tearDownClass(cls):
3029        # only the manager process should be returned by active_children()
3030        # but this can take a bit on slow machines, so wait a few seconds
3031        # if there are other children too (see #17395)
3032        t = 0.01
3033        while len(multiprocessing.active_children()) > 1 and t < 5:
3034            time.sleep(t)
3035            t *= 2
3036        gc.collect()                       # do garbage collection
3037        if cls.manager._number_of_objects() != 0:
3038            # This is not really an error since some tests do not
3039            # ensure that all processes which hold a reference to a
3040            # managed object have been joined.
3041            print('Shared objects which still exist at manager shutdown:')
3042            print(cls.manager._debug_info())
3043        cls.manager.shutdown()
3044        cls.manager.join()
3045        cls.manager = None
3046
3047testcases_manager = create_test_cases(ManagerMixin, type='manager')
3048globals().update(testcases_manager)
3049
3050
3051class ThreadsMixin(object):
3052    TYPE = 'threads'
3053    Process = multiprocessing.dummy.Process
3054    connection = multiprocessing.dummy.connection
3055    current_process = staticmethod(multiprocessing.dummy.current_process)
3056    active_children = staticmethod(multiprocessing.dummy.active_children)
3057    Pool = staticmethod(multiprocessing.Pool)
3058    Pipe = staticmethod(multiprocessing.dummy.Pipe)
3059    Queue = staticmethod(multiprocessing.dummy.Queue)
3060    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
3061    Lock = staticmethod(multiprocessing.dummy.Lock)
3062    RLock = staticmethod(multiprocessing.dummy.RLock)
3063    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
3064    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
3065    Condition = staticmethod(multiprocessing.dummy.Condition)
3066    Event = staticmethod(multiprocessing.dummy.Event)
3067    Barrier = staticmethod(multiprocessing.dummy.Barrier)
3068    Value = staticmethod(multiprocessing.dummy.Value)
3069    Array = staticmethod(multiprocessing.dummy.Array)
3070
3071testcases_threads = create_test_cases(ThreadsMixin, type='threads')
3072globals().update(testcases_threads)
3073
3074
3075class OtherTest(unittest.TestCase):
3076    # TODO: add more tests for deliver/answer challenge.
3077    def test_deliver_challenge_auth_failure(self):
3078        class _FakeConnection(object):
3079            def recv_bytes(self, size):
3080                return b'something bogus'
3081            def send_bytes(self, data):
3082                pass
3083        self.assertRaises(multiprocessing.AuthenticationError,
3084                          multiprocessing.connection.deliver_challenge,
3085                          _FakeConnection(), b'abc')
3086
3087    def test_answer_challenge_auth_failure(self):
3088        class _FakeConnection(object):
3089            def __init__(self):
3090                self.count = 0
3091            def recv_bytes(self, size):
3092                self.count += 1
3093                if self.count == 1:
3094                    return multiprocessing.connection.CHALLENGE
3095                elif self.count == 2:
3096                    return b'something bogus'
3097                return b''
3098            def send_bytes(self, data):
3099                pass
3100        self.assertRaises(multiprocessing.AuthenticationError,
3101                          multiprocessing.connection.answer_challenge,
3102                          _FakeConnection(), b'abc')
3103
3104#
3105# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3106#
3107
3108def initializer(ns):
3109    ns.test += 1
3110
3111class TestInitializers(unittest.TestCase):
3112    def setUp(self):
3113        self.mgr = multiprocessing.Manager()
3114        self.ns = self.mgr.Namespace()
3115        self.ns.test = 0
3116
3117    def tearDown(self):
3118        self.mgr.shutdown()
3119        self.mgr.join()
3120
3121    def test_manager_initializer(self):
3122        m = multiprocessing.managers.SyncManager()
3123        self.assertRaises(TypeError, m.start, 1)
3124        m.start(initializer, (self.ns,))
3125        self.assertEqual(self.ns.test, 1)
3126        m.shutdown()
3127        m.join()
3128
3129    def test_pool_initializer(self):
3130        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
3131        p = multiprocessing.Pool(1, initializer, (self.ns,))
3132        p.close()
3133        p.join()
3134        self.assertEqual(self.ns.test, 1)
3135
3136#
3137# Issue 5155, 5313, 5331: Test process in processes
3138# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
3139#
3140
3141def _this_sub_process(q):
3142    try:
3143        item = q.get(block=False)
3144    except pyqueue.Empty:
3145        pass
3146
3147def _test_process(q):
3148    queue = multiprocessing.Queue()
3149    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
3150    subProc.daemon = True
3151    subProc.start()
3152    subProc.join()
3153
3154def _afunc(x):
3155    return x*x
3156
3157def pool_in_process():
3158    pool = multiprocessing.Pool(processes=4)
3159    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
3160    pool.close()
3161    pool.join()
3162
3163class _file_like(object):
3164    def __init__(self, delegate):
3165        self._delegate = delegate
3166        self._pid = None
3167
3168    @property
3169    def cache(self):
3170        pid = os.getpid()
3171        # No race here: after fork() only the calling thread survives, so each process rebuilds its own cache
3172        if pid != self._pid:
3173            self._pid = pid
3174            self._cache = []
3175        return self._cache
3176
3177    def write(self, data):
3178        self.cache.append(data)
3179
3180    def flush(self):
3181        self._delegate.write(''.join(self.cache))
3182        self._cache = []
3183
3184class TestStdinBadfiledescriptor(unittest.TestCase):
3185
3186    def test_queue_in_process(self):
3187        queue = multiprocessing.Queue()
3188        proc = multiprocessing.Process(target=_test_process, args=(queue,))
3189        proc.start()
3190        proc.join()
3191
3192    def test_pool_in_process(self):
3193        p = multiprocessing.Process(target=pool_in_process)
3194        p.start()
3195        p.join()
3196
3197    def test_flushing(self):
3198        sio = io.StringIO()
3199        flike = _file_like(sio)
3200        flike.write('foo')
3201        proc = multiprocessing.Process(target=lambda: flike.flush())
3202        flike.flush()
3203        self.assertEqual(sio.getvalue(), 'foo')
3204
3205
3206class TestWait(unittest.TestCase):
3207
3208    @classmethod
3209    def _child_test_wait(cls, w, slow):
3210        for i in range(10):
3211            if slow:
3212                time.sleep(random.random()*0.1)
3213            w.send((i, os.getpid()))
3214        w.close()
3215
3216    def test_wait(self, slow=False):
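        # Spawn several writer processes and use connection.wait() to
        # multiplex reads until every pipe has been closed (EOFError).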
3217        from multiprocess.connection import wait
3218        readers = []
3219        procs = []
3220        messages = []
3221
3222        for i in range(4):
3223            r, w = multiprocessing.Pipe(duplex=False)
3224            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
3225            p.daemon = True
3226            p.start()
3227            w.close()
3228            readers.append(r)
3229            procs.append(p)
3230            self.addCleanup(p.join)
3231
3232        while readers:
3233            for r in wait(readers):
3234                try:
3235                    msg = r.recv()
3236                except EOFError:
3237                    readers.remove(r)
3238                    r.close()
3239                else:
3240                    messages.append(msg)
3241
3242        messages.sort()
3243        expected = sorted((i, p.pid) for i in range(10) for p in procs)
3244        self.assertEqual(messages, expected)
3245
3246    @classmethod
3247    def _child_test_wait_socket(cls, address, slow):
3248        s = socket.socket()
3249        s.connect(address)
3250        for i in range(10):
3251            if slow:
3252                time.sleep(random.random()*0.1)
3253            s.sendall(('%s\n' % i).encode('ascii'))
3254        s.close()
3255
3256    def test_wait_socket(self, slow=False):
3257        from multiprocess.connection import wait
3258        l = socket.socket()
3259        l.bind((test.support.HOST, 0))
3260        l.listen(4)
3261        addr = l.getsockname()
3262        readers = []
3263        procs = []
3264        dic = {}
3265
3266        for i in range(4):
3267            p = multiprocessing.Process(target=self._child_test_wait_socket,
3268                                        args=(addr, slow))
3269            p.daemon = True
3270            p.start()
3271            procs.append(p)
3272            self.addCleanup(p.join)
3273
3274        for i in range(4):
3275            r, _ = l.accept()
3276            readers.append(r)
3277            dic[r] = []
3278        l.close()
3279
3280        while readers:
3281            for r in wait(readers):
3282                msg = r.recv(32)
3283                if not msg:
3284                    readers.remove(r)
3285                    r.close()
3286                else:
3287                    dic[r].append(msg)
3288
3289        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
3290        for v in dic.values():
3291            self.assertEqual(b''.join(v), expected)
3292
3293    def test_wait_slow(self):
3294        self.test_wait(True)
3295
3296    def test_wait_socket_slow(self):
3297        self.test_wait_socket(True)
3298
3299    def test_wait_timeout(self):
3300        from multiprocess.connection import wait
3301
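        # With nothing ready to read, wait() should return [] after roughly
        # the requested timeout (the asserts allow between 0.5x and 2x).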
3302        expected = 5
3303        a, b = multiprocessing.Pipe()
3304
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        from multiprocess.connection import wait

        expected = 3
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        self.assertTrue(sem.acquire(timeout=20))

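        # Only the child's sentinel should become ready, roughly `expected`
        # seconds from now, once signal_and_sleep() finishes and the
        # process exits.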
        start = time.time()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.time() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        from multiprocess.connection import wait
        a, b = multiprocessing.Pipe()
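        # A negative timeout must not block: wait() should return promptly
        # with no ready objects.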
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()

#
# Issue 14151: Test invalid family on invalid environment
#

class TestInvalidFamily(unittest.TestCase):

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener(r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        with self.assertRaises(ValueError):
            multiprocessing.connection.Listener('/var/test.pipe')

#
# Issue 12098: check sys.flags of child matches that for parent
#

class TestFlags(unittest.TestCase):
    @classmethod
    def run_in_grandchild(cls, conn):
        conn.send(tuple(sys.flags))

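    # run_in_child() is executed in a separate interpreter started with
    # non-default flags; it spawns a grandchild and prints both flag tuples
    # as JSON so the parent test can compare them.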
    @classmethod
    def run_in_child(cls):
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
        p.start()
        grandchild_flags = r.recv()
        p.join()
        r.close()
        w.close()
        flags = (tuple(sys.flags), grandchild_flags)
        print(json.dumps(flags))

    def _test_flags(self):
        import json, subprocess
        # start child process using unusual flags
        prog = ('from multiprocess.tests import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)

#
# Test interaction with socket timeouts - see Issue #6056
#

class TestTimeouts(unittest.TestCase):
    @classmethod
    def _test_timeout(cls, child, address):
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            socket.setdefaulttimeout(old_timeout)

#
# Test what happens with no "if __name__ == '__main__'"
#

class TestNoForkBomb(unittest.TestCase):
    def _test_noforkbomb(self):
        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
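        # Without an "if __name__ == '__main__'" guard the script must fail
        # with a RuntimeError under spawn (Windows) instead of fork-bombing,
        # while fork-based platforms still run it successfully.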
        if WIN32:
            rc, out, err = test.script_helper.assert_python_failure(name)
            self.assertEqual('', out.decode('ascii'))
            self.assertIn('RuntimeError', err.decode('ascii'))
        else:
            rc, out, err = test.script_helper.assert_python_ok(name)
            self.assertEqual('123', out.decode('ascii').rstrip())
            self.assertEqual('', err.decode('ascii'))

#
# Issue #17555: ForkAwareThreadLock
#

class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            p.join()
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        new_size = r.recv()
        p.join()
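        # With the fix for Issue #17555 the registry must not grow across
        # generations of forked children (it used to roughly double per
        # generation).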
        self.assertLessEqual(new_size, old_size)

#
# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
#

class TestIgnoreEINTR(unittest.TestCase):

    @classmethod
    def _test_ignore(cls, conn):
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
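            # Interrupt the child while it is blocked in recv(); the signal
            # handler runs, the recv() is retried and the value echoed back.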
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        l = multiprocessing.connection.Listener()
        conn.send(l.address)
        a = l.accept()
        a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
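            # Interrupt the child while it is blocked in accept(); the
            # connection attempt below should still succeed.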
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()

#
#
#

def setUpModule():
    if sys.platform.startswith("linux"):
        try:
            lock = multiprocessing.RLock()
        except OSError:
            raise unittest.SkipTest("OSError is raised on RLock creation, "
                                    "see issue 3111!")
    check_enough_semaphores()
    util.get_temp_dir()     # creates temp directory for use by all processes
    multiprocessing.get_logger().setLevel(LOG_LEVEL)


def tearDownModule():
    # pause a bit so we don't get warning about dangling threads/processes
    time.sleep(0.5)


if __name__ == '__main__':
    unittest.main()