1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import queue as pyqueue
7import time
8import io
9import itertools
10import sys
11import os
12import gc
13import errno
14import signal
15import array
16import socket
17import random
18import logging
19import struct
20import operator
21import test.support
22import test.support.script_helper
23
24
25# Skip tests if _multiprocessing wasn't built.
26_multiprocessing = test.support.import_module('_multiprocessing')
27# Skip tests if sem_open implementation is broken.
28test.support.import_module('multiprocess.synchronize')
29# import threading after _multiprocessing to raise a more relevant error
30# message: "No module named _multiprocessing". _multiprocessing is not compiled
31# without thread support.
32import threading
33
34import multiprocess as multiprocessing
35import multiprocess.dummy
36import multiprocess.connection
37import multiprocess.managers
38import multiprocess.heap
39import multiprocess.pool
40
41from multiprocess import util
42
43try:
44    from multiprocess import reduction
45    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
46except ImportError:
47    HAS_REDUCTION = False
48
49try:
50    from multiprocess.sharedctypes import Value, copy
51    HAS_SHAREDCTYPES = True
52except ImportError:
53    HAS_SHAREDCTYPES = False
54
55try:
56    import msvcrt
57except ImportError:
58    msvcrt = None
59
60#
61#
62#
63
def latin(s):
    """Encode *s* with the Latin-1 ('latin') codec and return the bytes."""
    return bytes(s, 'latin')
66
67#
68# Constants
69#
70
71LOG_LEVEL = util.SUBWARNING
72#LOG_LEVEL = logging.DEBUG
73
74DELTA = 0.1
75CHECK_TIMINGS = False     # making true makes tests take a lot longer
76                          # and can sometimes cause some non-serious
77                          # failures because some calls block a bit
78                          # longer than expected
79if CHECK_TIMINGS:
80    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
81else:
82    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
83
84HAVE_GETVALUE = not getattr(_multiprocessing,
85                            'HAVE_BROKEN_SEM_GETVALUE', False)
86
87WIN32 = (sys.platform == "win32")
88
89from multiprocess.connection import wait
90
def wait_for_handle(handle, timeout):
    """Wait on *handle*; a negative timeout is treated as 'wait forever'."""
    effective_timeout = None if (timeout is not None and timeout < 0.0) else timeout
    return wait([handle], effective_timeout)
95
try:
    MAXFD = os.sysconf("SC_OPEN_MAX")
except (AttributeError, ValueError, OSError):
    # sysconf may be missing (Windows), reject the name, or fail at the
    # OS level; fall back to a conservative POSIX default.  A bare
    # ``except:`` here would also swallow KeyboardInterrupt/SystemExit.
    MAXFD = 256
100
101# To speed up tests when using the forkserver, we can preload these:
102PRELOAD = ['__main__', 'test_multiprocessing_forkserver']
103
104#
105# Some tests require ctypes
106#
107
108try:
109    from ctypes import Structure, c_int, c_double
110except ImportError:
111    Structure = object
112    c_int = c_double = None
113
114
def check_enough_semaphores():
    """Skip the current test when the OS offers too few POSIX semaphores."""
    # minimum number of semaphores available according to POSIX
    nsems_min = 256
    try:
        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
    except (AttributeError, ValueError):
        # sysconf not available or setting not available
        return
    if nsems != -1 and nsems < nsems_min:
        raise unittest.SkipTest("The OS doesn't support enough semaphores "
                                "to run the test (required: %d)." % nsems_min)
128
129
130#
131# Creates a wrapper for a function which records the time it takes to finish
132#
133
class TimingWrapper(object):
    """Callable proxy that records how long each call to *func* takes.

    The duration of the most recent call is available as ``self.elapsed``
    (``None`` until the wrapper has been called at least once).
    """

    def __init__(self, func):
        self.func = func
        self.elapsed = None

    def __call__(self, *args, **kwds):
        # Use a monotonic clock so elapsed-time measurements cannot be
        # skewed by wall-clock adjustments (NTP steps, manual changes).
        t = time.monotonic()
        try:
            return self.func(*args, **kwds)
        finally:
            # Record the duration even if func raised.
            self.elapsed = time.monotonic() - t
146
147#
148# Base class for test cases
149#
150
class BaseTestCase(object):
    """Shared helpers mixed into the concrete TestCase classes."""

    # Backends this test class may be combined with.
    ALLOWED_TYPES = ('processes', 'manager', 'threads')

    def assertTimingAlmostEqual(self, a, b):
        # Timing comparisons are only meaningful when CHECK_TIMINGS is on.
        if not CHECK_TIMINGS:
            return
        self.assertAlmostEqual(a, b, 1)

    def assertReturnsIfImplemented(self, value, func, *args):
        # Tolerate backends that raise NotImplementedError; otherwise the
        # call's result must equal *value*.
        try:
            res = func(*args)
        except NotImplementedError:
            return
        return self.assertEqual(value, res)

    # For the sanity of Windows users, rather than crashing or freezing in
    # multiple ways.
    def __reduce__(self, *args):
        raise NotImplementedError("shouldn't try to pickle a test case")

    __reduce_ex__ = __reduce__
173
174#
175# Return the value of a semaphore
176#
177
def get_value(self):
    """Return the value of a semaphore-like object.

    Tries the several spellings used by the different implementations
    (method, name-mangled attribute, plain attribute); raises
    NotImplementedError when none of them exists.
    """
    try:
        return self.get_value()
    except AttributeError:
        pass
    try:
        return self._Semaphore__value
    except AttributeError:
        pass
    try:
        return self._value
    except AttributeError:
        raise NotImplementedError
189
190#
191# Testcases
192#
193
class _TestProcess(BaseTestCase):
    """Tests of the Process object itself (also run against the thread
    backend where appropriate)."""

    ALLOWED_TYPES = ('processes', 'threads')

    def test_current(self):
        """current_process() must describe the live, non-daemon parent."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        current = self.current_process()
        authkey = current.authkey

        self.assertTrue(current.is_alive())
        self.assertTrue(not current.daemon)
        self.assertIsInstance(authkey, bytes)
        self.assertTrue(len(authkey) > 0)
        self.assertEqual(current.ident, os.getpid())
        self.assertEqual(current.exitcode, None)

    def test_daemon_argument(self):
        """The daemon= constructor argument must override the inherited flag."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # By default uses the current process's daemon flag.
        proc0 = self.Process(target=self._test)
        self.assertEqual(proc0.daemon, self.current_process().daemon)
        proc1 = self.Process(target=self._test, daemon=True)
        self.assertTrue(proc1.daemon)
        proc2 = self.Process(target=self._test, daemon=False)
        self.assertFalse(proc2.daemon)

    @classmethod
    def _test(cls, q, *args, **kwds):
        """Child body: report args, kwargs and identity back through *q*."""
        current = cls.current_process()
        q.put(args)
        q.put(kwds)
        q.put(current.name)
        if cls.TYPE != 'threads':
            q.put(bytes(current.authkey))
            q.put(current.pid)

    def test_process(self):
        """Full lifecycle: construct, start, inspect, join, check exitcode."""
        q = self.Queue(1)
        e = self.Event()
        args = (q, 1, 2)
        kwargs = {'hello':23, 'bye':2.54}
        name = 'SomeProcess'
        p = self.Process(
            target=self._test, args=args, kwargs=kwargs, name=name
            )
        p.daemon = True
        current = self.current_process()

        if self.TYPE != 'threads':
            self.assertEqual(p.authkey, current.authkey)
        self.assertEqual(p.is_alive(), False)
        self.assertEqual(p.daemon, True)
        self.assertNotIn(p, self.active_children())
        self.assertTrue(type(self.active_children()) is list)
        self.assertEqual(p.exitcode, None)

        p.start()

        self.assertEqual(p.exitcode, None)
        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())

        # The child echoes back args[1:] (args[0] is the queue itself).
        self.assertEqual(q.get(), args[1:])
        self.assertEqual(q.get(), kwargs)
        self.assertEqual(q.get(), p.name)
        if self.TYPE != 'threads':
            self.assertEqual(q.get(), current.authkey)
            self.assertEqual(q.get(), p.pid)

        p.join()

        self.assertEqual(p.exitcode, 0)
        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_terminate(cls):
        """Child body: sleep long enough to be terminated by the parent."""
        time.sleep(100)

    def test_terminate(self):
        """terminate() must stop a running child and let join() return."""
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        p = self.Process(target=self._test_terminate)
        p.daemon = True
        p.start()

        self.assertEqual(p.is_alive(), True)
        self.assertIn(p, self.active_children())
        self.assertEqual(p.exitcode, None)

        join = TimingWrapper(p.join)

        # Zero and negative timeouts must return immediately.
        self.assertEqual(join(0), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        self.assertEqual(join(-1), None)
        self.assertTimingAlmostEqual(join.elapsed, 0.0)
        self.assertEqual(p.is_alive(), True)

        # XXX maybe terminating too soon causes the problems on Gentoo...
        time.sleep(1)

        p.terminate()

        if hasattr(signal, 'alarm'):
            # On the Gentoo buildbot waitpid() often seems to block forever.
            # We use alarm() to interrupt it if it blocks for too long.
            def handler(*args):
                raise RuntimeError('join took too long: %s' % p)
            old_handler = signal.signal(signal.SIGALRM, handler)
            try:
                signal.alarm(10)
                self.assertEqual(join(), None)
            finally:
                signal.alarm(0)
                signal.signal(signal.SIGALRM, old_handler)
        else:
            self.assertEqual(join(), None)

        self.assertTimingAlmostEqual(join.elapsed, 0.0)

        self.assertEqual(p.is_alive(), False)
        self.assertNotIn(p, self.active_children())

        p.join()

        # XXX sometimes get p.exitcode == 0 on Windows ...
        #self.assertEqual(p.exitcode, -signal.SIGTERM)

    def test_cpu_count(self):
        """cpu_count() must return a positive int (or raise, treated as 1)."""
        try:
            cpus = multiprocessing.cpu_count()
        except NotImplementedError:
            cpus = 1
        self.assertTrue(type(cpus) is int)
        self.assertTrue(cpus >= 1)

    def test_active_children(self):
        """active_children() must track a child only while it is running."""
        self.assertEqual(type(self.active_children()), list)

        p = self.Process(target=time.sleep, args=(DELTA,))
        self.assertNotIn(p, self.active_children())

        p.daemon = True
        p.start()
        self.assertIn(p, self.active_children())

        p.join()
        self.assertNotIn(p, self.active_children())

    @classmethod
    def _test_recursion(cls, wconn, id):
        """Child body: report *id*, then spawn two children up to depth 2."""
        wconn.send(id)
        if len(id) < 2:
            for i in range(2):
                p = cls.Process(
                    target=cls._test_recursion, args=(wconn, id+[i])
                    )
                p.start()
                p.join()

    @unittest.skipIf(True, "fails with is_dill(obj, child=True)")
    def test_recursion(self):
        """Nested process creation must produce a depth-first id trace."""
        rconn, wconn = self.Pipe(duplex=False)
        self._test_recursion(wconn, [])

        time.sleep(DELTA)
        result = []
        while rconn.poll():
            result.append(rconn.recv())

        # Depth-first order of the spawned tree (indentation mirrors depth).
        expected = [
            [],
              [0],
                [0, 0],
                [0, 1],
              [1],
                [1, 0],
                [1, 1]
            ]
        self.assertEqual(result, expected)

    @classmethod
    def _test_sentinel(cls, event):
        """Child body: block until the parent sets *event* (10s cap)."""
        event.wait(10.0)

    def test_sentinel(self):
        """sentinel must exist only after start() and fire once the child exits."""
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        event = self.Event()
        p = self.Process(target=self._test_sentinel, args=(event,))
        # Accessing sentinel before start() is an error.
        with self.assertRaises(ValueError):
            p.sentinel
        p.start()
        self.addCleanup(p.join)
        sentinel = p.sentinel
        self.assertIsInstance(sentinel, int)
        # Child still alive: the sentinel must not be signalled yet.
        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
        event.set()
        p.join()
        self.assertTrue(wait_for_handle(sentinel, timeout=1))
401
402#
403#
404#
405
406class _UpperCaser(multiprocessing.Process):
407
408    def __init__(self):
409        multiprocessing.Process.__init__(self)
410        self.child_conn, self.parent_conn = multiprocessing.Pipe()
411
412    def run(self):
413        self.parent_conn.close()
414        for s in iter(self.child_conn.recv, None):
415            self.child_conn.send(s.upper())
416        self.child_conn.close()
417
418    def submit(self, s):
419        assert type(s) is str
420        self.parent_conn.send(s)
421        return self.parent_conn.recv()
422
423    def stop(self):
424        self.parent_conn.send(None)
425        self.parent_conn.close()
426        self.child_conn.close()
427
class _TestSubclassingProcess(BaseTestCase):
    """Tests for user-defined Process subclasses and shutdown behaviour."""

    ALLOWED_TYPES = ('processes',)

    def test_subclassing(self):
        """A Process subclass must run its overridden run() method."""
        uppercaser = _UpperCaser()
        uppercaser.daemon = True
        uppercaser.start()
        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
        self.assertEqual(uppercaser.submit('world'), 'WORLD')
        uppercaser.stop()
        uppercaser.join()

    def test_stderr_flush(self):
        # sys.stderr is flushed at process shutdown (issue #13812)
        if self.TYPE == "threads":
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)
        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
        proc.start()
        proc.join()
        with open(testfn, 'r') as f:
            err = f.read()
            # The whole traceback was printed
            self.assertIn("ZeroDivisionError", err)
            self.assertIn("__init__.py", err)
            self.assertIn("1/0 # MARKER", err)

    @classmethod
    def _test_stderr_flush(cls, testfn):
        """Child body: redirect stderr to *testfn* and crash deliberately."""
        sys.stderr = open(testfn, 'w')
        1/0 # MARKER


    @classmethod
    def _test_sys_exit(cls, reason, testfn):
        """Child body: redirect stderr to *testfn* and exit with *reason*."""
        sys.stderr = open(testfn, 'w')
        sys.exit(reason)

    def test_sys_exit(self):
        # See Issue 13854
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        testfn = test.support.TESTFN
        self.addCleanup(test.support.unlink, testfn)

        # Non-int sys.exit() arguments produce exit code 1 and print the
        # argument to stderr.
        for reason, code in (([1, 2, 3], 1), ('ignore this', 1)):
            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, code)

            with open(testfn, 'r') as f:
                self.assertEqual(f.read().rstrip(), str(reason))

        # Int-like sys.exit() arguments become the exit code directly.
        for reason in (True, False, 8):
            p = self.Process(target=sys.exit, args=(reason,))
            p.daemon = True
            p.start()
            p.join(5)
            self.assertEqual(p.exitcode, reason)
493
494#
495#
496#
497
def queue_empty(q):
    """Return True when *q* holds no items.

    Prefers q.empty(); manager proxies without it fall back to qsize().
    """
    empty = getattr(q, 'empty', None)
    if empty is not None:
        return empty()
    return q.qsize() == 0
503
def queue_full(q, maxsize):
    """Return True when *q* is at capacity.

    Prefers q.full(); manager proxies without it compare qsize() against
    the caller-supplied *maxsize*.
    """
    full = getattr(q, 'full', None)
    if full is not None:
        return full()
    return q.qsize() == maxsize
509
510
class _TestQueue(BaseTestCase):
    """Tests for Queue / JoinableQueue across all backends."""

    @classmethod
    def _test_put(cls, queue, child_can_start, parent_can_continue):
        """Child body: once allowed, drain six items then signal the parent."""
        child_can_start.wait()
        for i in range(6):
            queue.get()
        parent_can_continue.set()

    def test_put(self):
        """put()/put_nowait() must respect maxsize, blocking and timeouts."""
        MAXSIZE = 6
        queue = self.Queue(maxsize=MAXSIZE)
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_put,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        # Exercise every put() calling convention while filling the queue.
        queue.put(1)
        queue.put(2, True)
        queue.put(3, True, None)
        queue.put(4, False)
        queue.put(5, False, None)
        queue.put_nowait(6)

        # the values may be in buffer but not yet in pipe so sleep a bit
        time.sleep(DELTA)

        self.assertEqual(queue_empty(queue), False)
        self.assertEqual(queue_full(queue, MAXSIZE), True)

        put = TimingWrapper(queue.put)
        put_nowait = TimingWrapper(queue.put_nowait)

        # Non-blocking puts on a full queue must fail immediately...
        self.assertRaises(pyqueue.Full, put, 7, False)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, False, None)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put_nowait, 7)
        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)

        # ...while blocking puts must honour their timeout before failing.
        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
        self.assertTimingAlmostEqual(put.elapsed, 0)

        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)

        child_can_start.set()
        parent_can_continue.wait()

        self.assertEqual(queue_empty(queue), True)
        self.assertEqual(queue_full(queue, MAXSIZE), False)

        proc.join()

    @classmethod
    def _test_get(cls, queue, child_can_start, parent_can_continue):
        """Child body: once allowed, put four items then signal the parent."""
        child_can_start.wait()
        #queue.put(1)
        queue.put(2)
        queue.put(3)
        queue.put(4)
        queue.put(5)
        parent_can_continue.set()

    def test_get(self):
        """get()/get_nowait() must respect blocking flags and timeouts."""
        queue = self.Queue()
        child_can_start = self.Event()
        parent_can_continue = self.Event()

        proc = self.Process(
            target=self._test_get,
            args=(queue, child_can_start, parent_can_continue)
            )
        proc.daemon = True
        proc.start()

        self.assertEqual(queue_empty(queue), True)

        child_can_start.set()
        parent_can_continue.wait()

        time.sleep(DELTA)
        self.assertEqual(queue_empty(queue), False)

        # Hangs unexpectedly, remove for now
        #self.assertEqual(queue.get(), 1)
        # Exercise every get() calling convention while draining the queue.
        self.assertEqual(queue.get(True, None), 2)
        self.assertEqual(queue.get(True), 3)
        self.assertEqual(queue.get(timeout=1), 4)
        self.assertEqual(queue.get_nowait(), 5)

        self.assertEqual(queue_empty(queue), True)

        get = TimingWrapper(queue.get)
        get_nowait = TimingWrapper(queue.get_nowait)

        # Non-blocking gets on an empty queue must fail immediately...
        self.assertRaises(pyqueue.Empty, get, False)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, False, None)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get_nowait)
        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)

        # ...while blocking gets must honour their timeout before failing.
        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, 0)

        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)

        proc.join()

    @classmethod
    def _test_fork(cls, queue):
        """Child body: append items 10..19 to the inherited queue."""
        for i in range(10, 20):
            queue.put(i)
        # note that at this point the items may only be buffered, so the
        # process cannot shutdown until the feeder thread has finished
        # pushing items onto the pipe.

    def test_fork(self):
        # Old versions of Queue would fail to create a new feeder
        # thread for a forked process if the original process had its
        # own feeder thread.  This test checks that this no longer
        # happens.

        queue = self.Queue()

        # put items on queue so that main process starts a feeder thread
        for i in range(10):
            queue.put(i)

        # wait to make sure thread starts before we fork a new process
        time.sleep(DELTA)

        # fork process
        p = self.Process(target=self._test_fork, args=(queue,))
        p.daemon = True
        p.start()

        # check that all expected items are in the queue
        for i in range(20):
            self.assertEqual(queue.get(), i)
        self.assertRaises(pyqueue.Empty, queue.get, False)

        p.join()

    def test_qsize(self):
        """qsize() must track puts and gets (skipped where unimplemented)."""
        q = self.Queue()
        try:
            self.assertEqual(q.qsize(), 0)
        except NotImplementedError:
            self.skipTest('qsize method not implemented')
        q.put(1)
        self.assertEqual(q.qsize(), 1)
        q.put(5)
        self.assertEqual(q.qsize(), 2)
        q.get()
        self.assertEqual(q.qsize(), 1)
        q.get()
        self.assertEqual(q.qsize(), 0)

    @classmethod
    def _test_task_done(cls, q):
        """Worker body: consume items until the None sentinel, acking each."""
        for obj in iter(q.get, None):
            time.sleep(DELTA)
            q.task_done()

    def test_task_done(self):
        """JoinableQueue.join() must block until all items are task_done()."""
        queue = self.JoinableQueue()

        workers = [self.Process(target=self._test_task_done, args=(queue,))
                   for i in range(4)]

        for p in workers:
            p.daemon = True
            p.start()

        for i in range(10):
            queue.put(i)

        queue.join()

        # One sentinel per worker shuts them all down.
        for p in workers:
            queue.put(None)

        for p in workers:
            p.join()

    def test_no_import_lock_contention(self):
        """Using a Queue at import time must not deadlock on the import lock."""
        with test.support.temp_cwd():
            module_name = 'imported_by_an_imported_module'
            with open(module_name + '.py', 'w') as f:
                f.write("""if 1:
                    import multiprocess as multiprocessing

                    q = multiprocessing.Queue()
                    q.put('knock knock')
                    q.get(timeout=3)
                    q.close()
                    del q
                """)

            with test.support.DirsOnSysPath(os.getcwd()):
                try:
                    __import__(module_name)
                except pyqueue.Empty:
                    self.fail("Probable regression on import lock contention;"
                              " see Issue #22853")

    def test_timeout(self):
        """A timed get() on an empty queue must wait roughly the timeout."""
        q = multiprocessing.Queue()
        start = time.time()
        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
        delta = time.time() - start
        # Tolerate a delta of 30 ms because of the bad clock resolution on
        # Windows (usually 15.6 ms)
        self.assertGreaterEqual(delta, 0.170)
747
748#
749#
750#
751
class _TestLock(BaseTestCase):
    """Tests for Lock and RLock across all backends."""

    def test_lock(self):
        """A Lock is non-reentrant and errors on over-release."""
        lock = self.Lock()
        self.assertEqual(lock.acquire(), True)
        # Second non-blocking acquire fails: the lock is not reentrant.
        self.assertEqual(lock.acquire(False), False)
        self.assertEqual(lock.release(), None)
        # Releasing an unheld lock raises (exception type varies by backend).
        self.assertRaises((ValueError, threading.ThreadError), lock.release)

    def test_rlock(self):
        """An RLock nests acquire/release and errors on over-release."""
        lock = self.RLock()
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.acquire(), True)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertEqual(lock.release(), None)
        self.assertRaises((AssertionError, RuntimeError), lock.release)

    def test_lock_context(self):
        """A Lock must work as a context manager."""
        with self.Lock():
            pass
774
775
class _TestSemaphore(BaseTestCase):
    """Tests for Semaphore and BoundedSemaphore across all backends."""

    def _test_semaphore(self, sem):
        """Exercise a semaphore created with an initial value of 2."""
        self.assertReturnsIfImplemented(2, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.acquire(), True)
        self.assertReturnsIfImplemented(0, get_value, sem)
        # Exhausted: a non-blocking acquire must fail without changing it.
        self.assertEqual(sem.acquire(False), False)
        self.assertReturnsIfImplemented(0, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(1, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(2, get_value, sem)

    def test_semaphore(self):
        """A plain Semaphore may be released beyond its initial value."""
        sem = self.Semaphore(2)
        self._test_semaphore(sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(3, get_value, sem)
        self.assertEqual(sem.release(), None)
        self.assertReturnsIfImplemented(4, get_value, sem)

    def test_bounded_semaphore(self):
        """A BoundedSemaphore supports the common semaphore protocol."""
        sem = self.BoundedSemaphore(2)
        self._test_semaphore(sem)
        # Currently fails on OS/X
        #if HAVE_GETVALUE:
        #    self.assertRaises(ValueError, sem.release)
        #    self.assertReturnsIfImplemented(2, get_value, sem)

    def test_timeout(self):
        """acquire() on a drained semaphore must honour its timeout."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        sem = self.Semaphore(0)
        acquire = TimingWrapper(sem.acquire)

        # Non-blocking forms return immediately regardless of timeout...
        self.assertEqual(acquire(False), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, None), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)

        self.assertEqual(acquire(False, TIMEOUT1), False)
        self.assertTimingAlmostEqual(acquire.elapsed, 0)

        # ...while blocking forms wait roughly the requested timeout.
        self.assertEqual(acquire(True, TIMEOUT2), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)

        self.assertEqual(acquire(timeout=TIMEOUT3), False)
        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
828
829
830class _TestCondition(BaseTestCase):
831
    @classmethod
    def f(cls, cond, sleeping, woken, timeout=None):
        """Child body: signal *sleeping*, wait on *cond* (optionally with a
        timeout), then signal *woken*."""
        cond.acquire()
        sleeping.release()
        cond.wait(timeout)
        woken.release()
        cond.release()
839
    def check_invariant(self, cond):
        """Assert the condition has no outstanding sleepers (processes only)."""
        # this is only supposed to succeed when there are no sleepers
        if self.TYPE == 'processes':
            try:
                sleepers = (cond._sleeping_count.get_value() -
                            cond._woken_count.get_value())
                self.assertEqual(sleepers, 0)
                self.assertEqual(cond._wait_semaphore.get_value(), 0)
            except NotImplementedError:
                pass
850
    def test_notify(self):
        """notify() must wake exactly one waiter per call."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # One process and one thread both wait on the same condition.
        p = self.Process(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
        p.daemon = True
        p.start()

        # wait for both children to start sleeping
        sleeping.acquire()
        sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake up one process/thread
        cond.acquire()
        cond.notify()
        cond.release()

        # check one process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(1, get_value, woken)

        # wake up another
        cond.acquire()
        cond.notify()
        cond.release()

        # check other has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(2, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
        p.join()
893
    def test_notify_all(self):
        """notify_all() must wake every waiter; timed waits must time out."""
        cond = self.Condition()
        sleeping = self.Semaphore(0)
        woken = self.Semaphore(0)

        # start some threads/processes which will timeout
        for i in range(3):
            p = self.Process(target=self.f,
                             args=(cond, sleeping, woken, TIMEOUT1))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f,
                                 args=(cond, sleeping, woken, TIMEOUT1))
            t.daemon = True
            t.start()

        # wait for them all to sleep
        for i in range(6):
            sleeping.acquire()

        # check they have all timed out
        for i in range(6):
            woken.acquire()
        self.assertReturnsIfImplemented(0, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)

        # start some more threads/processes
        for i in range(3):
            p = self.Process(target=self.f, args=(cond, sleeping, woken))
            p.daemon = True
            p.start()

            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
            t.daemon = True
            t.start()

        # wait for them to all sleep
        for i in range(6):
            sleeping.acquire()

        # check no process/thread has woken up
        time.sleep(DELTA)
        self.assertReturnsIfImplemented(0, get_value, woken)

        # wake them all up
        cond.acquire()
        cond.notify_all()
        cond.release()

        # check they have all woken
        for i in range(10):
            try:
                if get_value(woken) == 6:
                    break
            except NotImplementedError:
                break
            time.sleep(DELTA)
        self.assertReturnsIfImplemented(6, get_value, woken)

        # check state is not mucked up
        self.check_invariant(cond)
958
    def test_timeout(self):
        """wait(timeout) with no notifier must return False after ~timeout."""
        cond = self.Condition()
        wait = TimingWrapper(cond.wait)
        cond.acquire()
        res = wait(TIMEOUT1)
        cond.release()
        self.assertEqual(res, False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
967
    @classmethod
    def _test_waitfor_f(cls, cond, state):
        """Child body: reset *state*, notify the parent, then wait until the
        parent has incremented state to 4; exit non-zero on failure."""
        with cond:
            state.value = 0
            cond.notify()
            result = cond.wait_for(lambda : state.value==4)
            if not result or state.value != 4:
                sys.exit(1)
976
    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor(self):
        """wait_for() must return once its predicate becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', -1)

        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
        p.daemon = True
        p.start()

        # Wait for the child to zero the state.
        with cond:
            result = cond.wait_for(lambda : state.value==0)
            self.assertTrue(result)
            self.assertEqual(state.value, 0)

        # Step the state up to 4, notifying the child at each step.
        for i in range(4):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        p.join(5)
        self.assertFalse(p.is_alive())
        self.assertEqual(p.exitcode, 0)
1001
1002    @classmethod
1003    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1004        sem.release()
1005        with cond:
1006            expected = 0.1
1007            dt = time.time()
1008            result = cond.wait_for(lambda : state.value==4, timeout=expected)
1009            dt = time.time() - dt
1010            # borrow logic in assertTimeout() from test/lock_tests.py
1011            if not result and expected * 0.6 < dt < expected * 10.0:
1012                success.value = True
1013
    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
    def test_waitfor_timeout(self):
        """wait_for() should give up after its timeout if the predicate
        never becomes true."""
        # based on test in test/lock_tests.py
        cond = self.Condition()
        state = self.Value('i', 0)
        success = self.Value('i', False)
        sem = self.Semaphore(0)

        p = self.Process(target=self._test_waitfor_timeout_f,
                         args=(cond, state, success, sem))
        p.daemon = True
        p.start()
        # Don't touch the condition until the child signals it is running.
        self.assertTrue(sem.acquire(timeout=10))

        # Only increment 3 times, so state == 4 is never reached.
        for i in range(3):
            time.sleep(0.01)
            with cond:
                state.value += 1
                cond.notify()

        # The child sets success only if its wait_for() timed out in roughly
        # the expected amount of time.
        p.join(5)
        self.assertTrue(success.value)
1037
1038    @classmethod
1039    def _test_wait_result(cls, c, pid):
1040        with c:
1041            c.notify()
1042        time.sleep(1)
1043        if pid is not None:
1044            os.kill(pid, signal.SIGINT)
1045
    def test_wait_result(self):
        """wait() returns True when notified; on POSIX processes an
        interrupting SIGINT raises KeyboardInterrupt out of wait()."""
        # Only real processes on POSIX can exercise the signal path.
        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
            pid = os.getpid()
        else:
            pid = None

        c = self.Condition()
        with c:
            # Nobody is notifying yet: both a zero and a short timeout must
            # report False.
            self.assertFalse(c.wait(0))
            self.assertFalse(c.wait(0.1))

            p = self.Process(target=self._test_wait_result, args=(c, pid))
            p.start()

            self.assertTrue(c.wait(10))
            if pid is not None:
                # The child sends SIGINT while we are blocked in wait().
                self.assertRaises(KeyboardInterrupt, c.wait, 10)

            p.join()
1065
1066
class _TestEvent(BaseTestCase):
    """Tests for Event objects (process, manager and thread flavours)."""

    @classmethod
    def _test_event(cls, event):
        # Child: set the event after a short delay so that the parent's
        # final wait() has to actually block.
        time.sleep(TIMEOUT2)
        event.set()

    def test_event(self):
        event = self.Event()
        wait = TimingWrapper(event.wait)  # records how long each wait() took

        # Removed temporarily, due to API shear, this does not
        # work with threading._Event objects. is_set == isSet
        self.assertEqual(event.is_set(), False)

        # Removed, threading.Event.wait() will return the value of the __flag
        # instead of None. API Shear with the semaphore backed mp.Event
        self.assertEqual(wait(0.0), False)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), False)
        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)

        event.set()

        # See note above on the API differences
        # Once set, wait() returns True immediately regardless of timeout.
        self.assertEqual(event.is_set(), True)
        self.assertEqual(wait(), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        self.assertEqual(wait(TIMEOUT1), True)
        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
        # self.assertEqual(event.is_set(), True)

        event.clear()

        #self.assertEqual(event.is_set(), False)

        # After clear(), wait() blocks again until the child sets the event.
        p = self.Process(target=self._test_event, args=(event,))
        p.daemon = True
        p.start()
        self.assertEqual(wait(), True)
1107
1108#
1109# Tests for Barrier - adapted from tests in test/lock_tests.py
1110#
1111
1112# Many of the tests for threading.Barrier use a list as an atomic
1113# counter: a value is appended to increment the counter, and the
1114# length of the list gives the value.  We use the class DummyList
1115# for the same purpose.
1116
1117class _DummyList(object):
1118
1119    def __init__(self):
1120        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1121        lock = multiprocessing.Lock()
1122        self.__setstate__((wrapper, lock))
1123        self._lengthbuf[0] = 0
1124
1125    def __setstate__(self, state):
1126        (self._wrapper, self._lock) = state
1127        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1128
1129    def __getstate__(self):
1130        return (self._wrapper, self._lock)
1131
1132    def append(self, _):
1133        with self._lock:
1134            self._lengthbuf[0] += 1
1135
1136    def __len__(self):
1137        with self._lock:
1138            return self._lengthbuf[0]
1139
1140def _wait():
1141    # A crude wait/yield function not relying on synchronization primitives.
1142    time.sleep(0.01)
1143
1144
class Bunch(object):
    """
    A bunch of threads/processes running the same function.

    `namespace` supplies the flavour-specific factories (DummyList, Event,
    Process); `started` and `finished` count workers entering and leaving
    `f`, and `_can_exit` optionally holds the workers alive until
    do_finish() is called.
    """
    def __init__(self, namespace, f, args, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.args = args
        self.n = n
        self.started = namespace.DummyList()
        self.finished = namespace.DummyList()
        self._can_exit = namespace.Event()
        if not wait_before_exit:
            # Workers may exit as soon as they are done.
            self._can_exit.set()
        for _ in range(n):
            worker = namespace.Process(target=self.task)
            worker.daemon = True
            worker.start()

    def task(self):
        # Record entry, run the payload, and always record completion,
        # then hold until the owner allows us to exit.
        me = os.getpid()
        self.started.append(me)
        try:
            self.f(*self.args)
        finally:
            self.finished.append(me)
            self._can_exit.wait(30)
            assert self._can_exit.is_set()

    def wait_for_started(self):
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        while len(self.finished) < self.n:
            _wait()

    def do_finish(self):
        # Release workers blocked at the end of task().
        self._can_exit.set()
1188
1189
class AppendTrue(object):
    """Picklable callable that appends True to a stored container.

    Used as a Barrier `action` so the action's execution can be counted.
    """

    def __init__(self, obj):
        self.obj = obj

    def __call__(self):
        self.obj.append(True)
1195
1196
class _TestBarrier(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout

    def setUp(self):
        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        # Abort so any straggler blocked in wait() is released.
        self.barrier.abort()
        self.barrier = None

    def DummyList(self):
        # Return an atomic counter suited to the flavour under test: a plain
        # list for threads, a managed list for managers, and the
        # shared-memory _DummyList for real processes.
        if self.TYPE == 'threads':
            return []
        elif self.TYPE == 'manager':
            return self.manager.list()
        else:
            return _DummyList()

    def run_threads(self, f, args):
        # Run f(*args) in N-1 helper workers plus the current one, then wait
        # for the helpers to finish.
        b = Bunch(self, f, args, self.N-1)
        f(*args)
        b.wait_for_finished()

    @classmethod
    def multipass(cls, barrier, results, n):
        # Body run by every party: cross the barrier 2*n times in lockstep;
        # the two counters verify nobody runs ahead of the pack.
        m = barrier.parties
        assert m == cls.N
        for i in range(n):
            results[0].append(True)
            assert len(results[1]) == i * m
            barrier.wait()
            results[1].append(True)
            assert len(results[0]) == (i + 1) * m
            barrier.wait()
        try:
            assert barrier.n_waiting == 0
        except NotImplementedError:
            # some flavours do not expose n_waiting
            pass
        assert not barrier.broken

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [self.DummyList(), self.DummyList()]
        self.run_threads(self.multipass, (self.barrier, results, passes))

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    @classmethod
    def _test_wait_return_f(cls, barrier, queue):
        # Each party reports the index returned by wait().
        res = barrier.wait()
        queue.put(res)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        queue = self.Queue()
        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
        results = [queue.get() for i in range(self.N)]
        # Exactly one party receives index 0.
        self.assertEqual(results.count(0), 1)

    @classmethod
    def _test_action_f(cls, barrier, results):
        barrier.wait()
        # The action must have run exactly once before anyone was released.
        if len(results) != 1:
            raise RuntimeError

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = self.DummyList()
        barrier = self.Barrier(self.N, action=AppendTrue(results))
        self.run_threads(self._test_action_f, (barrier, results))
        self.assertEqual(len(results), 1)

    @classmethod
    def _test_abort_f(cls, barrier, results1, results2):
        # One designated party raises; its abort() breaks the barrier for
        # everyone else, who record the BrokenBarrierError in results2.
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        self.run_threads(self._test_abort_f,
                         (self.barrier, results1, results2))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    @classmethod
    def _test_reset_f(cls, barrier, results1, results2, results3):
        i = barrier.wait()
        if i == cls.N//2:
            # Wait until the other threads are all in the barrier.
            while barrier.n_waiting < cls.N-1:
                time.sleep(0.001)
            barrier.reset()
        else:
            try:
                barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
        # Now, pass the barrier again
        barrier.wait()
        results3.append(True)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        self.run_threads(self._test_reset_f,
                         (self.barrier, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_abort_and_reset_f(cls, barrier, barrier2,
                                results1, results2, results3):
        # Break the first barrier exactly as in _test_abort_f ...
        try:
            i = barrier.wait()
            if i == cls.N//2:
                raise RuntimeError
            barrier.wait()
            results1.append(True)
        except threading.BrokenBarrierError:
            results2.append(True)
        except RuntimeError:
            barrier.abort()
        # Synchronize and reset the barrier.  Must synchronize first so
        # that everyone has left it when we reset, and after so that no
        # one enters it before the reset.
        if barrier2.wait() == cls.N//2:
            barrier.reset()
        barrier2.wait()
        barrier.wait()
        results3.append(True)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = self.DummyList()
        results2 = self.DummyList()
        results3 = self.DummyList()
        barrier2 = self.Barrier(self.N)

        self.run_threads(self._test_abort_and_reset_f,
                         (self.barrier, barrier2, results1, results2, results3))
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    @classmethod
    def _test_timeout_f(cls, barrier, results):
        i = barrier.wait()
        if i == cls.N//2:
            # One thread is late!
            time.sleep(1.0)
        try:
            barrier.wait(0.5)
        except threading.BrokenBarrierError:
            results.append(True)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        results = self.DummyList()
        self.run_threads(self._test_timeout_f, (self.barrier, results))
        # Every party should have observed the broken barrier.
        self.assertEqual(len(results), self.barrier.parties)

    @classmethod
    def _test_default_timeout_f(cls, barrier, results):
        i = barrier.wait(cls.defaultTimeout)
        if i == cls.N//2:
            # One thread is later than the default timeout
            time.sleep(1.0)
        try:
            barrier.wait()
        except threading.BrokenBarrierError:
            results.append(True)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        barrier = self.Barrier(self.N, timeout=0.5)
        results = self.DummyList()
        self.run_threads(self._test_default_timeout_f, (barrier, results))
        self.assertEqual(len(results), barrier.parties)

    def test_single_thread(self):
        # A one-party barrier never blocks and can be passed repeatedly.
        b = self.Barrier(1)
        b.wait()
        b.wait()

    @classmethod
    def _test_thousand_f(cls, barrier, passes, conn, lock):
        # Cross the barrier `passes` times, reporting the pass index after
        # each crossing; the lock serializes writes to the shared pipe end.
        for i in range(passes):
            barrier.wait()
            with lock:
                conn.send(i)

    def test_thousand(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        passes = 1000
        lock = self.Lock()
        conn, child_conn = self.Pipe(False)
        for j in range(self.N):
            p = self.Process(target=self._test_thousand_f,
                           args=(self.barrier, passes, child_conn, lock))
            p.start()

        # Each pass must yield exactly N messages carrying that pass's index,
        # proving no worker crossed the barrier early.
        # NOTE(review): the started processes are never joined and child_conn
        # is never closed here -- looks leak-prone; confirm intended.
        for i in range(passes):
            for j in range(self.N):
                self.assertEqual(conn.recv(), i)
1441
1442#
1443#
1444#
1445
class _TestValue(BaseTestCase):
    """Tests for shared ctypes Value / RawValue objects."""

    ALLOWED_TYPES = ('processes',)

    # (typecode, initial value, value written by the child process)
    codes_values = [
        ('i', 4343, 24234),
        ('d', 3.625, -4.25),
        ('h', -232, 234),
        ('c', latin('x'), latin('y'))
        ]

    def setUp(self):
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _test(cls, values):
        # Child: overwrite each shared value with the third element of the
        # corresponding (code, initial, new) triple.
        for sv, cv in zip(values, cls.codes_values):
            sv.value = cv[2]


    def test_value(self, raw=False):
        """Writes made by a child process must be visible in the parent."""
        if raw:
            values = [self.RawValue(code, value)
                      for code, value, _ in self.codes_values]
        else:
            values = [self.Value(code, value)
                      for code, value, _ in self.codes_values]

        # Initial values are what we constructed them with.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[1])

        proc = self.Process(target=self._test, args=(values,))
        proc.daemon = True
        proc.start()
        proc.join()

        # After the child ran, each shared value holds the new value.
        for sv, cv in zip(values, self.codes_values):
            self.assertEqual(sv.value, cv[2])

    def test_rawvalue(self):
        self.test_value(raw=True)

    def test_getobj_getlock(self):
        """Check lock-related accessors for each `lock=` variant."""
        # Default: an implicit lock is created.
        val1 = self.Value('i', 5)
        lock1 = val1.get_lock()
        obj1 = val1.get_obj()

        # lock=None behaves like the default.
        val2 = self.Value('i', 5, lock=None)
        lock2 = val2.get_lock()
        obj2 = val2.get_obj()

        # An explicit lock object is used as-is.
        lock = self.Lock()
        val3 = self.Value('i', 5, lock=lock)
        lock3 = val3.get_lock()
        obj3 = val3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw object with no lock accessors.
        arr4 = self.Value('i', 5, lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))

        # A non-lock, non-bool argument is rejected.
        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')

        # RawValue never has lock accessors.
        arr5 = self.RawValue('i', 5)
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1513
1514
class _TestArray(BaseTestCase):
    """Tests for shared ctypes Array / RawArray objects."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def f(cls, seq):
        # In-place prefix sum; run in a child to prove writes are shared.
        for i in range(1, len(seq)):
            seq[i] += seq[i-1]

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array(self, raw=False):
        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
        if raw:
            arr = self.RawArray('i', seq)
        else:
            arr = self.Array('i', seq)

        # Basic sequence protocol: len, indexing, slicing.
        self.assertEqual(len(arr), len(seq))
        self.assertEqual(arr[3], seq[3])
        self.assertEqual(list(arr[2:7]), list(seq[2:7]))

        # Slice assignment accepts an array.array of matching typecode.
        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])

        self.assertEqual(list(arr[:]), seq)

        # Apply the prefix sum locally to `seq` and in a child to `arr`;
        # both must end up identical, proving mutations cross processes.
        self.f(seq)

        p = self.Process(target=self.f, args=(arr,))
        p.daemon = True
        p.start()
        p.join()

        self.assertEqual(list(arr[:]), seq)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_array_from_size(self):
        size = 10
        # Test for zeroing (see issue #11675).
        # The repetition below strengthens the test by increasing the chances
        # of previously allocated non-zero memory being used for the new array
        # on the 2nd and 3rd loops.
        for _ in range(3):
            arr = self.Array('i', size)
            self.assertEqual(len(arr), size)
            self.assertEqual(list(arr), [0] * size)
            arr[:] = range(10)
            self.assertEqual(list(arr), list(range(10)))
            del arr

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_rawarray(self):
        self.test_array(raw=True)

    @unittest.skipIf(c_int is None, "requires _ctypes")
    def test_getobj_getlock_obj(self):
        """Check lock-related accessors for each `lock=` variant."""
        # Default: an implicit lock is created.
        arr1 = self.Array('i', list(range(10)))
        lock1 = arr1.get_lock()
        obj1 = arr1.get_obj()

        # lock=None behaves like the default.
        arr2 = self.Array('i', list(range(10)), lock=None)
        lock2 = arr2.get_lock()
        obj2 = arr2.get_obj()

        # An explicit lock object is used as-is.
        lock = self.Lock()
        arr3 = self.Array('i', list(range(10)), lock=lock)
        lock3 = arr3.get_lock()
        obj3 = arr3.get_obj()
        self.assertEqual(lock, lock3)

        # lock=False yields a raw array with no lock accessors.
        arr4 = self.Array('i', range(10), lock=False)
        self.assertFalse(hasattr(arr4, 'get_lock'))
        self.assertFalse(hasattr(arr4, 'get_obj'))
        # A non-lock, non-bool argument is rejected.
        self.assertRaises(AttributeError,
                          self.Array, 'i', range(10), lock='notalock')

        # RawArray never has lock accessors.
        arr5 = self.RawArray('i', range(10))
        self.assertFalse(hasattr(arr5, 'get_lock'))
        self.assertFalse(hasattr(arr5, 'get_obj'))
1593
1594#
1595#
1596#
1597
class _TestContainers(BaseTestCase):
    """Tests for manager-proxied list, dict and Namespace containers."""

    ALLOWED_TYPES = ('manager',)

    def test_list(self):
        a = self.list(list(range(10)))
        self.assertEqual(a[:], list(range(10)))

        b = self.list()
        self.assertEqual(b[:], [])

        b.extend(list(range(5)))
        self.assertEqual(b[:], list(range(5)))

        self.assertEqual(b[2], 2)
        # Slices past the end are clipped, as for ordinary lists.
        self.assertEqual(b[2:10], [2,3,4])

        b *= 2
        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])

        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])

        self.assertEqual(a[:], list(range(10)))

        # A proxied list may hold other proxies; reading it back yields
        # their contents.
        d = [a, b]
        e = self.list(d)
        self.assertEqual(
            e[:],
            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
            )

        # Mutating `a` after nesting it is reflected when reading `f`.
        f = self.list([a])
        a.append('hello')
        self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']])

    def test_dict(self):
        d = self.dict()
        indices = list(range(65, 70))
        for i in indices:
            d[i] = chr(i)
        # copy/keys/values/items must all reflect the inserted pairs.
        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
        self.assertEqual(sorted(d.keys()), indices)
        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])

    def test_namespace(self):
        n = self.Namespace()
        n.name = 'Bob'
        n.job = 'Builder'
        n._hidden = 'hidden'
        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
        del n.job
        # Underscore-prefixed attributes are excluded from the repr.
        self.assertEqual(str(n), "Namespace(name='Bob')")
        self.assertTrue(hasattr(n, 'name'))
        self.assertTrue(not hasattr(n, 'job'))
1653
1654#
1655#
1656#
1657
def sqr(x, wait=0.0):
    """Return x squared, optionally sleeping `wait` seconds first."""
    time.sleep(wait)
    return x ** 2
1661
def mul(x, y):
    """Return the product of x and y."""
    product = x * y
    return product
1664
1665class SayWhenError(ValueError): pass
1666
def exception_throwing_generator(total, when):
    """Yield 0 .. total-1, raising SayWhenError upon reaching index `when`."""
    i = 0
    while i < total:
        if i == when:
            raise SayWhenError("Somebody said when")
        yield i
        i += 1
1672
class _TestPool(BaseTestCase):
    """Tests for Pool objects; all tests in the class share one pool."""

    @classmethod
    def setUpClass(cls):
        super().setUpClass()
        # One pool of 4 workers shared by every test in this class.
        cls.pool = cls.Pool(4)

    @classmethod
    def tearDownClass(cls):
        cls.pool.terminate()
        cls.pool.join()
        cls.pool = None
        super().tearDownClass()

    def test_apply(self):
        # apply() with positional and with keyword arguments.
        papply = self.pool.apply
        self.assertEqual(papply(sqr, (5,)), sqr(5))
        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))

    def test_map(self):
        # map() must agree with builtin map, with and without chunksize.
        pmap = self.pool.map
        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
                         list(map(sqr, list(range(100)))))

    def test_starmap(self):
        # starmap() must agree with itertools.starmap.
        psmap = self.pool.starmap
        tuples = list(zip(range(10), range(9,-1, -1)))
        self.assertEqual(psmap(mul, tuples),
                         list(itertools.starmap(mul, tuples)))
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(psmap(mul, tuples, chunksize=20),
                         list(itertools.starmap(mul, tuples)))

    def test_starmap_async(self):
        tuples = list(zip(range(100), range(99,-1, -1)))
        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
                         list(itertools.starmap(mul, tuples)))

    def test_map_async(self):
        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
                         list(map(sqr, list(range(10)))))

    # NOTE(review): the leading underscore keeps this out of test discovery;
    # presumably disabled deliberately -- confirm.
    def _test_map_async_callbacks(self):
        call_args = self.manager.list() if self.TYPE == 'manager' else []
        # Success path: callback receives the result list.
        self.pool.map_async(int, ['1'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(1, len(call_args))
        self.assertEqual([1], call_args[0])
        # Failure path: error_callback receives the exception.
        self.pool.map_async(int, ['a'],
                            callback=call_args.append,
                            error_callback=call_args.append).wait()
        self.assertEqual(2, len(call_args))
        self.assertIsInstance(call_args[1], ValueError)

    # NOTE(review): method name has a typo ("unplicklable"); renaming would
    # change the public test name, so it is only flagged here.
    def test_map_unplicklable(self):
        # Issue #19425 -- failure to pickle should not cause a hang
        if self.TYPE == 'threads':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))
        class A(object):
            def __reduce__(self):
                raise RuntimeError('cannot pickle')
        with self.assertRaises(RuntimeError):
            self.pool.map(sqr, [A()]*10)

    def test_map_chunksize(self):
        # An empty iterable with an explicit chunksize must not hang.
        try:
            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
        except multiprocessing.TimeoutError:
            self.fail("pool.map_async with chunksize stalled on null list")

    def test_async(self):
        # get() blocks for about the worker's sleep time, then delivers.
        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
        get = TimingWrapper(res.get)
        self.assertEqual(get(), 49)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)

    def test_async_timeout(self):
        # The worker sleeps longer than the get() timeout, so get() must
        # raise TimeoutError after roughly TIMEOUT2 seconds.
        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
        get = TimingWrapper(res.get)
        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)

    def test_imap(self):
        # Results arrive in input order, via list() or repeated next().
        it = self.pool.imap(sqr, list(range(10)))
        self.assertEqual(list(it), list(map(sqr, list(range(10)))))

        it = self.pool.imap(sqr, list(range(10)))
        for i in range(10):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
        for i in range(1000):
            self.assertEqual(next(it), i*i)
        self.assertRaises(StopIteration, it.__next__)

    def test_imap_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        # An exception raised by the *input* iterable surfaces via next().
        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
        for i in range(3):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

        # SayWhenError seen at start of problematic chunk's results
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
        for i in range(6):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)
        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
        for i in range(4):
            self.assertEqual(next(it), i*i)
        self.assertRaises(SayWhenError, it.__next__)

    def test_imap_unordered(self):
        # Order is unspecified; compare the sorted results instead.
        it = self.pool.imap_unordered(sqr, list(range(1000)))
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53)
        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))

    def test_imap_unordered_handle_iterable_exception(self):
        if self.TYPE == 'manager':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(10, 3),
                                      1)
        expected_values = list(map(sqr, list(range(10))))
        with self.assertRaises(SayWhenError):
            # imap_unordered makes it difficult to anticipate the SayWhenError
            for i in range(10):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

        it = self.pool.imap_unordered(sqr,
                                      exception_throwing_generator(20, 7),
                                      2)
        expected_values = list(map(sqr, list(range(20))))
        with self.assertRaises(SayWhenError):
            for i in range(20):
                value = next(it)
                self.assertIn(value, expected_values)
                expected_values.remove(value)

    def test_make_pool(self):
        # Worker counts must be strictly positive.
        self.assertRaises(ValueError, multiprocessing.Pool, -1)
        self.assertRaises(ValueError, multiprocessing.Pool, 0)

        p = multiprocessing.Pool(3)
        self.assertEqual(3, len(p._pool))
        p.close()
        p.join()

    def test_terminate(self):
        # terminate() must stop the pool quickly even with work outstanding.
        result = self.pool.map_async(
            time.sleep, [0.1 for i in range(10000)], chunksize=1
            )
        self.pool.terminate()
        join = TimingWrapper(self.pool.join)
        join()
        self.assertLess(join.elapsed, 0.5)

    def test_empty_iterable(self):
        # See Issue 12157
        p = self.Pool(1)

        self.assertEqual(p.map(sqr, []), [])
        self.assertEqual(list(p.imap(sqr, [])), [])
        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
        self.assertEqual(p.map_async(sqr, []).get(), [])

        p.close()
        p.join()

    def test_context(self):
        # Pool works as a context manager and is unusable after exit.
        if self.TYPE == 'processes':
            L = list(range(10))
            expected = [sqr(i) for i in L]
            with multiprocessing.Pool(2) as p:
                r = p.map_async(sqr, L)
                self.assertEqual(r.get(), expected)
            self.assertRaises(ValueError, p.map_async, sqr, L)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    @unittest.skipIf(True, "fails with is_dill(obj, child=True)")
    def test_traceback(self):
        # We want ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        if self.TYPE == 'processes':
            with self.Pool(1) as p:
                try:
                    p.apply(self._test_traceback)
                except Exception as e:
                    exc = e
                else:
                    raise AssertionError('expected RuntimeError')
            self.assertIs(type(exc), RuntimeError)
            self.assertEqual(exc.args, (123,))
            cause = exc.__cause__
            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

            # Re-raising must reproduce the child's traceback on stderr.
            with test.support.captured_stderr() as f1:
                try:
                    raise exc
                except RuntimeError:
                    sys.excepthook(*sys.exc_info())
            self.assertIn('raise RuntimeError(123) # some comment',
                          f1.getvalue())

    @classmethod
    def _test_wrapped_exception(cls):
        raise RuntimeError('foo')

    @unittest.skipIf(True, "fails with is_dill(obj, child=True)")
    def test_wrapped_exception(self):
        # Issue #20980: Should not wrap exception when using thread pool
        with self.Pool(1) as p:
            with self.assertRaises(RuntimeError):
                p.apply(self._test_wrapped_exception)
1901
1902
def raising():
    """Pool-task helper that always raises KeyError('key')."""
    raise KeyError("key")
1905
def unpickleable_result():
    """Pool-task helper returning a value (a lambda) that cannot be pickled."""
    return lambda: 42
1908
class _TestPoolWorkerErrors(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_async_error_callback(self):
        """error_callback must be invoked with the exception raised by
        the task, and get() must re-raise it."""
        pool = multiprocessing.Pool(2)

        captured = [None]

        def on_error(exc):
            captured[0] = exc

        result = pool.apply_async(raising, error_callback=on_error)
        self.assertRaises(KeyError, result.get)
        self.assertTrue(captured[0])
        self.assertIsInstance(captured[0], KeyError)

        pool.close()
        pool.join()

    def _test_unpickleable_result(self):
        # Disabled (leading underscore in this fork): verifies that
        # result-encoding failures don't kill pool workers.
        from multiprocess.pool import MaybeEncodingError
        pool = multiprocessing.Pool(2)

        # Repeat many times so a lost worker would eventually hang us.
        for _ in range(20):
            captured = [None]

            def on_error(exc):
                captured[0] = exc

            result = pool.apply_async(unpickleable_result,
                                      error_callback=on_error)
            self.assertRaises(MaybeEncodingError, result.get)
            wrapped = captured[0]
            self.assertTrue(wrapped)
            self.assertIsInstance(captured[0], MaybeEncodingError)
            self.assertIsNotNone(wrapped.exc)
            self.assertIsNotNone(wrapped.value)

        pool.close()
        pool.join()
1948
class _TestPoolWorkerLifetime(BaseTestCase):
    ALLOWED_TYPES = ('processes', )

    def test_pool_worker_lifetime(self):
        """With maxtasksperchild set, workers must be replaced after
        serving their quota of tasks."""
        pool = multiprocessing.Pool(3, maxtasksperchild=10)
        self.assertEqual(3, len(pool._pool))
        first_pids = [worker.pid for worker in pool._pool]
        # Submit enough tasks that every worker should get recycled.
        async_results = [pool.apply_async(sqr, (i, )) for i in range(100)]
        # Collect the answers; this also waits until all tasks complete.
        for idx, async_res in enumerate(async_results):
            self.assertEqual(async_res.get(), sqr(idx))
        # Refill the pool
        pool._repopulate_pool()
        # Wait until all workers are alive
        # (countdown * DELTA = 5 seconds max startup process time)
        remaining = 50
        while remaining and not all(w.is_alive() for w in pool._pool):
            remaining -= 1
            time.sleep(DELTA)
        last_pids = [worker.pid for worker in pool._pool]
        # Every pid must have been assigned (issue #7805).
        self.assertNotIn(None, first_pids)
        self.assertNotIn(None, last_pids)
        # The recycled workers must have fresh pids.
        self.assertNotEqual(sorted(first_pids), sorted(last_pids))
        pool.close()
        pool.join()

    def test_pool_worker_lifetime_early_close(self):
        """Issue #10332: closing a pool of short-lived workers before all
        tasks finish must not make join() hang."""
        pool = multiprocessing.Pool(3, maxtasksperchild=1)
        async_results = [pool.apply_async(sqr, (i, 0.3)) for i in range(6)]
        pool.close()
        pool.join()
        # All results must still be retrievable after the join.
        for idx, async_res in enumerate(async_results):
            self.assertEqual(async_res.get(), sqr(idx))
1993
1994#
1995# Test of creating a customized manager class
1996#
1997
1998from multiprocess.managers import BaseManager, BaseProxy, RemoteError
1999
class FooBar(object):
    """Target class for the custom-manager tests: one normal method, one
    raising method and one 'private' (underscored) method."""

    def f(self):
        return 'f()'

    def g(self):
        raise ValueError

    def _h(self):
        return '_h()'
2007
def baz():
    """Generator yielding the squares 0, 1, 4, ..., 81 (used to test
    iterator proxies)."""
    for n in range(10):
        yield n * n
2011
class IteratorProxy(BaseProxy):
    # Proxy type exposing only __next__, so a generator held by the
    # manager process can be iterated from the client side.
    _exposed_ = ('__next__',)
    def __iter__(self):
        return self
    def __next__(self):
        # Forward the call to the referent in the manager process.
        return self._callmethod('__next__')
2018
class MyManager(BaseManager):
    # Custom manager used by _TestMyManager; proxy types are registered below.
    pass

# 'Foo' uses default exposure: all public methods (f, g), but not _h.
MyManager.register('Foo', callable=FooBar)
# 'Bar' exposes an explicit list, including "private" _h but excluding g.
MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
# 'baz' returns a generator, so it needs the custom IteratorProxy.
MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2025
2026
class _TestMyManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)

    def test_mymanager(self):
        """Explicit start()/shutdown() lifecycle."""
        manager = MyManager()
        manager.start()
        self.common(manager)
        manager.shutdown()

        # A clean exit yields exitcode 0; had the manager needed to be
        # terminate()d after a timeout we would see -SIGTERM instead.
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context(self):
        """The context-manager protocol starts and shuts the manager down."""
        with MyManager() as manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def test_mymanager_context_prestarted(self):
        """Entering an already-started manager works and still shuts it
        down cleanly on exit."""
        manager = MyManager()
        manager.start()
        with manager:
            self.common(manager)
        self.assertEqual(manager._process.exitcode, 0)

    def common(self, manager):
        """Exercise the three registered proxy types on *manager*."""
        foo = manager.Foo()
        bar = manager.Bar()
        squares = manager.baz()

        def visible(proxy):
            return [m for m in ('f', 'g', '_h') if hasattr(proxy, m)]

        self.assertEqual(visible(foo), ['f', 'g'])
        self.assertEqual(visible(bar), ['f', '_h'])

        # Default exposure: f and g work, _h is blocked.
        self.assertEqual(foo.f(), 'f()')
        self.assertRaises(ValueError, foo.g)
        self.assertEqual(foo._callmethod('f'), 'f()')
        self.assertRaises(RemoteError, foo._callmethod, '_h')

        # Explicit exposure: f and _h work (g was not exposed).
        self.assertEqual(bar.f(), 'f()')
        self.assertEqual(bar._h(), '_h()')
        self.assertEqual(bar._callmethod('f'), 'f()')
        self.assertEqual(bar._callmethod('_h'), '_h()')

        self.assertEqual(list(squares), [n * n for n in range(10)])
2076
2077
2078#
2079# Test of connecting to a remote server and using xmlrpclib for serialization
2080#
2081
# Single module-level queue handed out by the server process.
_queue = pyqueue.Queue()

def get_queue():
    """Return the shared module-level queue (registered with QueueManager)."""
    return _queue
2085
class QueueManager(BaseManager):
    '''manager class used by server process'''
# Server-side registration: hands out the real module-level queue.
QueueManager.register('get_queue', callable=get_queue)
2089
class QueueManager2(BaseManager):
    '''manager class which specifies the same interface as QueueManager'''
# Client-side registration: no callable, the object lives on the server.
QueueManager2.register('get_queue')
2093
2094
# Serializer used by the remote-manager tests; xmlrpclib cannot serialize
# arbitrary objects, which test_remote deliberately relies on.
SERIALIZER = 'xmlrpclib'
2096
class _TestRemoteManager(BaseTestCase):

    ALLOWED_TYPES = ('manager',)
    # Assorted text/bytes payloads, including non-ASCII ones.
    values = ['hello world', None, True, 2.25,
              'hall\xe5 v\xe4rlden',
              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
              b'hall\xe5 v\xe4rlden',
             ]
    result = values[:]

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process helper: connect to the server and enqueue values."""
        client = QueueManager2(
            address=address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        # Note that xmlrpclib will deserialize the tuple as a list.
        client.get_queue().put(tuple(cls.values))

    def test_remote(self):
        """Round-trip assorted values through a remote queue manager using
        the xmlrpclib serializer."""
        authkey = os.urandom(32)

        server = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey,
            serializer=SERIALIZER
            )
        server.start()

        putter = self.Process(target=self._putter,
                              args=(server.address, authkey))
        putter.daemon = True
        putter.start()

        client = QueueManager2(
            address=server.address, authkey=authkey, serializer=SERIALIZER
            )
        client.connect()
        queue = client.get_queue()

        self.assertEqual(queue.get(), self.result)

        # xmlrpclib cannot serialize arbitrary callables, so this must fail.
        self.assertRaises(Exception, queue.put, time.sleep)

        # Drop the proxy so its finalizer runs before the server stops.
        del queue
        server.shutdown()
2144
class _TestManagerRestart(BaseTestCase):

    @classmethod
    def _putter(cls, address, authkey):
        """Child-process helper: connect to the manager and enqueue one item."""
        manager = QueueManager(
            address=address, authkey=authkey, serializer=SERIALIZER)
        manager.connect()
        queue = manager.get_queue()
        queue.put('hello world')

    def test_rapid_restart(self):
        """A manager must be restartable on the same address immediately
        after shutdown (the old listening socket must not linger)."""
        authkey = os.urandom(32)
        manager = QueueManager(
            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
        srvr = manager.get_server()
        addr = srvr.address
        # Close the connection.Listener socket which gets opened as a part
        # of manager.get_server(). It's not needed for the test.
        srvr.listener.close()
        manager.start()

        p = self.Process(target=self._putter, args=(manager.address, authkey))
        p.daemon = True
        p.start()
        queue = manager.get_queue()
        self.assertEqual(queue.get(), 'hello world')
        del queue
        manager.shutdown()

        manager = QueueManager(
            address=addr, authkey=authkey, serializer=SERIALIZER)
        try:
            manager.start()
        except OSError as e:
            if e.errno != errno.EADDRINUSE:
                raise
            # Retry after some time, in case the old socket was lingering
            # (sporadic failure on buildbots)
            time.sleep(1.0)
            manager = QueueManager(
                address=addr, authkey=authkey, serializer=SERIALIZER)
        # BUG FIX: on the retry path above the recreated manager was never
        # start()ed, and BaseManager only gains its 'shutdown' attribute
        # inside start() (it is a util.Finalize installed there), so the
        # unconditional manager.shutdown() raised AttributeError.  Only
        # shut down a manager that actually started.
        if hasattr(manager, "shutdown"):
            manager.shutdown()
2186
2187#
2188#
2189#
2190
# Empty byte string used as an in-band end-of-stream marker by _echo.
SENTINEL = latin('')
2192
class _TestConnection(BaseTestCase):
    # Exercises Connection objects: send/recv, raw byte transfer,
    # polling, fd/handle transfer and the context-manager protocol.

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _echo(cls, conn):
        """Child helper: echo each bytes message back until SENTINEL."""
        for msg in iter(conn.recv_bytes, SENTINEL):
            conn.send_bytes(msg)
        conn.close()

    def test_connection(self):
        """Exercise send/recv, send_bytes/recv_bytes(_into) and poll()."""
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()

        seq = [1, 2.25, None]
        msg = latin('hello world')
        longmsg = msg * 10
        arr = array.array('i', list(range(4)))

        if self.TYPE == 'processes':
            self.assertEqual(type(conn.fileno()), int)

        # Round-trip a picklable object and raw bytes through the echo child.
        self.assertEqual(conn.send(seq), None)
        self.assertEqual(conn.recv(), seq)

        self.assertEqual(conn.send_bytes(msg), None)
        self.assertEqual(conn.recv_bytes(), msg)

        if self.TYPE == 'processes':
            # recv_bytes_into with no offset fills the front of the buffer.
            buffer = array.array('i', [0]*10)
            expected = list(arr) + [0] * (10 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # recv_bytes_into at a byte offset of three items.
            buffer = array.array('i', [0]*10)
            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
            self.assertEqual(conn.send_bytes(arr), None)
            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
                             len(arr) * buffer.itemsize)
            self.assertEqual(list(buffer), expected)

            # A too-small buffer raises BufferTooShort carrying the whole
            # message as its argument.
            buffer = bytearray(latin(' ' * 40))
            self.assertEqual(conn.send_bytes(longmsg), None)
            try:
                res = conn.recv_bytes_into(buffer)
            except multiprocessing.BufferTooShort as e:
                self.assertEqual(e.args, (longmsg,))
            else:
                self.fail('expected BufferTooShort, got %s' % res)

        # poll() timing: with no data it returns False immediately for
        # poll()/poll(-1) and after the full timeout for poll(TIMEOUT1).
        poll = TimingWrapper(conn.poll)

        self.assertEqual(poll(), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(-1), False)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(poll(TIMEOUT1), False)
        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)

        conn.send(None)
        time.sleep(.1)

        # With data pending, poll returns True without waiting.
        self.assertEqual(poll(TIMEOUT1), True)
        self.assertTimingAlmostEqual(poll.elapsed, 0)

        self.assertEqual(conn.recv(), None)

        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
        conn.send_bytes(really_big_msg)
        self.assertEqual(conn.recv_bytes(), really_big_msg)

        conn.send_bytes(SENTINEL)                          # tell child to quit
        child_conn.close()

        if self.TYPE == 'processes':
            self.assertEqual(conn.readable, True)
            self.assertEqual(conn.writable, True)
            # The child has exited and closed its end, so reads hit EOF.
            self.assertRaises(EOFError, conn.recv)
            self.assertRaises(EOFError, conn.recv_bytes)

        p.join()

    def test_duplex_false(self):
        """A non-duplex Pipe yields a read-only and a write-only end."""
        reader, writer = self.Pipe(duplex=False)
        self.assertEqual(writer.send(1), None)
        self.assertEqual(reader.recv(), 1)
        if self.TYPE == 'processes':
            self.assertEqual(reader.readable, True)
            self.assertEqual(reader.writable, False)
            self.assertEqual(writer.readable, False)
            self.assertEqual(writer.writable, True)
            # Using an end in the wrong direction raises OSError.
            self.assertRaises(OSError, reader.send, 2)
            self.assertRaises(OSError, writer.recv)
            self.assertRaises(OSError, writer.poll)

    def test_spawn_close(self):
        # We test that a pipe connection can be closed by parent
        # process immediately after child is spawned.  On Windows this
        # would have sometimes failed on old versions because
        # child_conn would be closed before the child got a chance to
        # duplicate it.
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._echo, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()    # this might complete before child initializes

        msg = latin('hello')
        conn.send_bytes(msg)
        self.assertEqual(conn.recv_bytes(), msg)

        conn.send_bytes(SENTINEL)
        conn.close()
        p.join()

    def test_sendbytes(self):
        """send_bytes offset/size arguments: slicing and input validation."""
        if self.TYPE != 'processes':
            self.skipTest('test not appropriate for {}'.format(self.TYPE))

        msg = latin('abcdefghijklmnopqrstuvwxyz')
        a, b = self.Pipe()

        a.send_bytes(msg)
        self.assertEqual(b.recv_bytes(), msg)

        a.send_bytes(msg, 5)
        self.assertEqual(b.recv_bytes(), msg[5:])

        a.send_bytes(msg, 7, 8)
        self.assertEqual(b.recv_bytes(), msg[7:7+8])

        a.send_bytes(msg, 26)
        self.assertEqual(b.recv_bytes(), latin(''))

        a.send_bytes(msg, 26, 0)
        self.assertEqual(b.recv_bytes(), latin(''))

        # Out-of-range or negative offsets/sizes must be rejected.
        self.assertRaises(ValueError, a.send_bytes, msg, 27)

        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)

        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)

        self.assertRaises(ValueError, a.send_bytes, msg, -1)

        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)

    @classmethod
    def _is_fd_assigned(cls, fd):
        """Return True if *fd* refers to an open file descriptor."""
        try:
            os.fstat(fd)
        except OSError as e:
            if e.errno == errno.EBADF:
                return False
            raise
        else:
            return True

    @classmethod
    def _writefd(cls, conn, data, create_dummy_fds=False):
        """Child helper: receive a handle over *conn* and write *data* to it.

        With create_dummy_fds, first occupy every fd below 256 so the
        received descriptor lands above 256 (see test_large_fd_transfer).
        """
        if create_dummy_fds:
            for i in range(0, 256):
                if not cls._is_fd_assigned(i):
                    os.dup2(conn.fileno(), i)
        fd = reduction.recv_handle(conn)
        if msvcrt:
            # On Windows a handle was received; convert it back to a C fd.
            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
        os.write(fd, data)
        os.close(fd)

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    def test_fd_transfer(self):
        """A file descriptor sent to a child process is usable there."""
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            if msvcrt:
                fd = msvcrt.get_osfhandle(fd)
            reduction.send_handle(conn, fd, p.pid)
        p.join()
        # The child must have written through the transferred descriptor.
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"foo")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32",
                     "test semantics don't make sense on Windows")
    @unittest.skipIf(MAXFD <= 256,
                     "largest assignable fd number is too small")
    @unittest.skipUnless(hasattr(os, "dup2"),
                         "test needs os.dup2()")
    def test_large_fd_transfer(self):
        # With fd > 256 (issue #11657)
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
        p.daemon = True
        p.start()
        self.addCleanup(test.support.unlink, test.support.TESTFN)
        with open(test.support.TESTFN, "wb") as f:
            fd = f.fileno()
            # Find a free descriptor number above 256 to duplicate onto.
            for newfd in range(256, MAXFD):
                if not self._is_fd_assigned(newfd):
                    break
            else:
                self.fail("could not find an unassigned large file descriptor")
            os.dup2(fd, newfd)
            try:
                reduction.send_handle(conn, newfd, p.pid)
            finally:
                os.close(newfd)
        p.join()
        with open(test.support.TESTFN, "rb") as f:
            self.assertEqual(f.read(), b"bar")

    @classmethod
    def _send_data_without_fd(self, conn):
        # Child helper: write a byte with no ancillary fd attached.
        # NOTE(review): despite @classmethod the first parameter is named
        # 'self' (it receives the class) — harmless but inconsistent with
        # the other classmethods here.
        os.write(conn.fileno(), b"\0")

    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
    def test_missing_fd_transfer(self):
        # Check that exception is raised when received data is not
        # accompanied by a file descriptor in ancillary data.
        if self.TYPE != 'processes':
            self.skipTest("only makes sense with processes")
        conn, child_conn = self.Pipe(duplex=True)

        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
        p.daemon = True
        p.start()
        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
        p.join()

    def test_context(self):
        """Connections support the context-manager protocol and are closed
        on exit."""
        a, b = self.Pipe()

        with a, b:
            a.send(1729)
            self.assertEqual(b.recv(), 1729)
            if self.TYPE == 'processes':
                self.assertFalse(a.closed)
                self.assertFalse(b.closed)

        if self.TYPE == 'processes':
            self.assertTrue(a.closed)
            self.assertTrue(b.closed)
            self.assertRaises(OSError, a.recv)
            self.assertRaises(OSError, b.recv)
2457
class _TestListener(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_multiple_bind(self):
        """Binding a second Listener to an already-bound address fails."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            self.addCleanup(listener.close)
            self.assertRaises(OSError, self.connection.Listener,
                              listener.address, family)

    def test_context(self):
        """Listener, Client and the accepted connection all support
        the context-manager protocol."""
        with self.connection.Listener() as listener:
            with self.connection.Client(listener.address) as client:
                with listener.accept() as server_side:
                    client.send(1729)
                    self.assertEqual(server_side.recv(), 1729)

        if self.TYPE == 'processes':
            # Leaving the 'with' block must have closed the listener.
            self.assertRaises(OSError, listener.accept)
2478
class _TestListenerClient(BaseTestCase):

    ALLOWED_TYPES = ('processes', 'threads')

    @classmethod
    def _test(cls, address):
        """Child helper: connect to *address*, send a greeting, disconnect."""
        conn = cls.connection.Client(address)
        conn.send('hello')
        conn.close()

    def test_listener_client(self):
        """Basic accept/connect handshake for every supported family."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            child = self.Process(target=self._test, args=(listener.address,))
            child.daemon = True
            child.start()
            conn = listener.accept()
            self.assertEqual(conn.recv(), 'hello')
            child.join()
            listener.close()

    def test_issue14725(self):
        """accept() must work even if the client has already connected,
        written and closed its end (ConnectNamedPipe may fail with
        ERROR_NO_DATA on Windows; see issue #14725)."""
        listener = self.connection.Listener()
        child = self.Process(target=self._test, args=(listener.address,))
        child.daemon = True
        child.start()
        # Give the client ample time to connect, send and disconnect.
        time.sleep(1)
        conn = listener.accept()
        self.assertEqual(conn.recv(), 'hello')
        conn.close()
        child.join()
        listener.close()

    def test_issue16955(self):
        """poll() must see data that was sent before the first read
        (issue #16955)."""
        for family in self.connection.families:
            listener = self.connection.Listener(family=family)
            client = self.connection.Client(listener.address)
            accepted = listener.accept()
            accepted.send_bytes(b"hello")
            self.assertTrue(client.poll(1))
            accepted.close()
            client.close()
            listener.close()
2526
class _TestPoll(BaseTestCase):
    # Tests for Connection.poll(): visibility of pending data, message
    # boundaries and non-merging of adjacent messages.

    ALLOWED_TYPES = ('processes', 'threads')

    def test_empty_string(self):
        """poll() must report an empty bytes message as available data."""
        a, b = self.Pipe()
        self.assertEqual(a.poll(), False)
        b.send_bytes(b'')
        self.assertEqual(a.poll(), True)
        self.assertEqual(a.poll(), True)

    @classmethod
    def _child_strings(cls, conn, strings):
        """Child helper: send each message with a short pause in between."""
        for s in strings:
            time.sleep(0.1)
            conn.send_bytes(s)
        conn.close()

    def test_strings(self):
        """Messages (including empty ones) must arrive whole and in order."""
        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
        a, b = self.Pipe()
        p = self.Process(target=self._child_strings, args=(b, strings))
        p.start()

        for s in strings:
            # Poll in small increments (up to ~2s total) until data arrives.
            for i in range(200):
                if a.poll(0.01):
                    break
            x = a.recv_bytes()
            self.assertEqual(s, x)

        p.join()

    @classmethod
    def _child_boundaries(cls, r):
        # Polling may "pull" a message in to the child process, but we
        # don't want it to pull only part of a message, as that would
        # corrupt the pipe for any other processes which might later
        # read from it.
        r.poll(5)

    def test_boundaries(self):
        """A child's poll() must not consume or split the parent's
        messages."""
        r, w = self.Pipe(False)
        p = self.Process(target=self._child_boundaries, args=(r,))
        p.start()
        time.sleep(2)
        L = [b"first", b"second"]
        for obj in L:
            w.send_bytes(obj)
        w.close()
        p.join()
        # Whichever message is left in the pipe must still be intact.
        self.assertIn(r.recv_bytes(), L)

    @classmethod
    def _child_dont_merge(cls, b):
        """Child helper: send three separate messages back-to-back."""
        b.send_bytes(b'a')
        b.send_bytes(b'b')
        b.send_bytes(b'cd')

    def test_dont_merge(self):
        """Adjacent messages must not be merged, and repeated polls must
        keep reporting the pending data."""
        a, b = self.Pipe()
        self.assertEqual(a.poll(0.0), False)
        self.assertEqual(a.poll(0.1), False)

        p = self.Process(target=self._child_dont_merge, args=(b,))
        p.start()

        self.assertEqual(a.recv_bytes(), b'a')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.recv_bytes(), b'b')
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(1.0), True)
        self.assertEqual(a.poll(0.0), True)
        self.assertEqual(a.recv_bytes(), b'cd')

        p.join()
2604
2605#
2606# Test of sending connection and socket objects between processes
2607#
2608
@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
class _TestPicklingConnections(BaseTestCase):
    # Verifies that Connection and socket objects can themselves be sent
    # between processes (via the resource sharer / handle duplication).

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def tearDownClass(cls):
        # Stop the background resource sharer so it does not outlive the
        # test class.
        from multiprocess import resource_sharer
        resource_sharer.stop(timeout=5)

    @classmethod
    def _listener(cls, conn, families):
        """Child helper: for each family, accept one connection and send
        the accepted Connection object itself back over *conn*; finally
        repeat the exercise once with a plain socket."""
        for fam in families:
            l = cls.connection.Listener(family=fam)
            conn.send(l.address)
            new_conn = l.accept()
            conn.send(new_conn)
            new_conn.close()
            l.close()

        l = socket.socket()
        l.bind((test.support.HOST, 0))
        l.listen()
        conn.send(l.getsockname())
        new_conn, addr = l.accept()
        conn.send(new_conn)
        new_conn.close()
        l.close()

        # Block until the parent signals that it is done.
        conn.recv()

    @classmethod
    def _remote(cls, conn):
        """Child helper: connect to each received address and send the
        upper-cased message; a None sentinel switches to one final round
        over a plain socket."""
        for (address, msg) in iter(conn.recv, None):
            client = cls.connection.Client(address)
            client.send(msg.upper())
            client.close()

        address, msg = conn.recv()
        client = socket.socket()
        client.connect(address)
        client.sendall(msg.upper())
        client.close()

        conn.close()

    def test_pickling(self):
        """Connections accepted in one child can be shipped to the parent
        and read there; same for a plain socket."""
        families = self.connection.families

        lconn, lconn0 = self.Pipe()
        lp = self.Process(target=self._listener, args=(lconn0, families))
        lp.daemon = True
        lp.start()
        lconn0.close()

        rconn, rconn0 = self.Pipe()
        rp = self.Process(target=self._remote, args=(rconn0,))
        rp.daemon = True
        rp.start()
        rconn0.close()

        for fam in families:
            msg = ('This connection uses family %s' % fam).encode('ascii')
            address = lconn.recv()
            rconn.send((address, msg))
            new_conn = lconn.recv()
            self.assertEqual(new_conn.recv(), msg.upper())

        rconn.send(None)

        # Final round: the listener child hands us a raw accepted socket.
        msg = latin('This connection uses a normal socket')
        address = lconn.recv()
        rconn.send((address, msg))
        new_conn = lconn.recv()
        buf = []
        while True:
            s = new_conn.recv(100)
            if not s:
                break
            buf.append(s)
        buf = b''.join(buf)
        self.assertEqual(buf, msg.upper())
        new_conn.close()

        lconn.send(None)

        rconn.close()
        lconn.close()

        lp.join()
        rp.join()

    @classmethod
    def child_access(cls, conn):
        """Child helper: use a received write end, then a received read
        end, sending the data back doubled."""
        w = conn.recv()
        w.send('all is well')
        w.close()

        r = conn.recv()
        msg = r.recv()
        conn.send(msg*2)

        conn.close()

    def test_access(self):
        # On Windows, if we do not specify a destination pid when
        # using DupHandle then we need to be careful to use the
        # correct access flags for DuplicateHandle(), or else
        # DupHandle.detach() will raise PermissionError.  For example,
        # for a read only pipe handle we should use
        # access=FILE_GENERIC_READ.  (Unfortunately
        # DUPLICATE_SAME_ACCESS does not work.)
        conn, child_conn = self.Pipe()
        p = self.Process(target=self.child_access, args=(child_conn,))
        p.daemon = True
        p.start()
        child_conn.close()

        # Send the write-only end to the child and read its greeting.
        r, w = self.Pipe(duplex=False)
        conn.send(w)
        w.close()
        self.assertEqual(r.recv(), 'all is well')
        r.close()

        # Send the read-only end; the child echoes what we write, doubled.
        r, w = self.Pipe(duplex=False)
        conn.send(r)
        r.close()
        w.send('foobar')
        w.close()
        self.assertEqual(conn.recv(), 'foobar'*2)
2739
2740#
2741#
2742#
2743
class _TestHeap(BaseTestCase):

    ALLOWED_TYPES = ('processes',)

    def test_heap(self):
        """Stress the shared-memory block allocator, then check that the
        heap's free and occupied extents tile the arenas consistently."""
        iterations = 5000
        maxblocks = 50
        blocks = []

        # Allocate many blocks of random (log-normal) sizes, randomly
        # dropping one whenever more than maxblocks are held.
        for _ in range(iterations):
            size = int(random.lognormvariate(0, 1) * 1000)
            blocks.append(multiprocessing.heap.BufferWrapper(size))
            if len(blocks) > maxblocks:
                del blocks[random.randrange(maxblocks)]

        # Grab the shared heap object behind BufferWrapper.
        heap = multiprocessing.heap.BufferWrapper._heap

        # Snapshot every free and occupied extent while holding the lock.
        heap._lock.acquire()
        self.addCleanup(heap._lock.release)
        all = []
        occupied = 0
        for seq in list(heap._len_to_seq.values()):
            for arena, start, stop in seq:
                all.append((heap._arenas.index(arena), start, stop,
                            stop-start, 'free'))
        for arena, start, stop in heap._allocated_blocks:
            all.append((heap._arenas.index(arena), start, stop,
                        stop-start, 'occupied'))
            occupied += (stop-start)

        all.sort()

        # Consecutive extents must tile each arena exactly: either the
        # next extent opens a new arena at offset 0, or it begins where
        # the previous one stopped.
        for prev, nxt in zip(all, all[1:]):
            (arena, start, stop) = prev[:3]
            (narena, nstart, nstop) = nxt[:3]
            self.assertTrue((arena != narena and nstart == 0) or
                            (stop == nstart))

    def test_free_from_gc(self):
        """Freeing blocks from the garbage collector must not deadlock
        (issue #12352)."""
        # Make sure the GC is on and collecting frequently, so block
        # finalizers run asynchronously while new blocks are allocated.
        if not gc.isenabled():
            gc.enable()
            self.addCleanup(gc.disable)
        thresholds = gc.get_threshold()
        self.addCleanup(gc.set_threshold, *thresholds)
        gc.set_threshold(10)

        # Allocate many tiny blocks tied together in reference cycles so
        # that only the GC can reclaim them.
        for _ in range(5000):
            first = multiprocessing.heap.BufferWrapper(1)
            second = multiprocessing.heap.BufferWrapper(1)
            first.buddy = second
            second.buddy = first
2808
2809#
2810#
2811#
2812
class _Foo(Structure):
    # Small ctypes structure used by the sharedctypes tests below.
    _fields_ = [
        ('x', c_int),
        ('y', c_double)
        ]
2818
class _TestSharedCTypes(BaseTestCase):
    """Shared-memory ctypes values mutated in a child must be visible here."""

    ALLOWED_TYPES = ('processes',)

    def setUp(self):
        # Without the _ctypes extension there is nothing to test.
        if not HAS_SHAREDCTYPES:
            self.skipTest("requires multiprocessing.sharedctypes")

    @classmethod
    def _double(cls, x, y, foo, arr, string):
        # Runs in the child process: double every shared value in place.
        x.value *= 2
        y.value *= 2
        foo.x *= 2
        foo.y *= 2
        string.value *= 2
        for idx in range(len(arr)):
            arr[idx] = arr[idx] * 2

    def test_sharedctypes(self, lock=False):
        x = Value('i', 7, lock=lock)
        y = Value(c_double, 1.0/3.0, lock=lock)
        foo = Value(_Foo, 3, 2, lock=lock)
        arr = self.Array('d', list(range(10)), lock=lock)
        string = self.Array('c', 20, lock=lock)
        string.value = latin('hello')

        child = self.Process(target=self._double, args=(x, y, foo, arr, string))
        child.daemon = True
        child.start()
        child.join()

        # Everything the child doubled must be visible to the parent.
        self.assertEqual(x.value, 14)
        self.assertAlmostEqual(y.value, 2.0/3.0)
        self.assertEqual(foo.x, 6)
        self.assertAlmostEqual(foo.y, 4.0)
        for idx in range(10):
            self.assertAlmostEqual(arr[idx], idx*2)
        self.assertEqual(string.value, latin('hellohello'))

    def test_synchronize(self):
        # Same exercise, but through the lock-synchronized wrappers.
        self.test_sharedctypes(lock=True)

    def test_copy(self):
        original = _Foo(2, 5.0)
        duplicate = copy(original)
        original.x = 0
        original.y = 0
        # The copy must be fully independent of the original.
        self.assertEqual(duplicate.x, 2)
        self.assertAlmostEqual(duplicate.y, 5.0)
2868
2869#
2870#
2871#
2872
class _TestFinalize(BaseTestCase):
    """Tests for util.Finalize callbacks and their exit-priority ordering."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _test_finalize(cls, conn):
        class Foo(object):
            pass

        a = Foo()
        util.Finalize(a, conn.send, args=('a',))
        del a           # triggers callback for a

        b = Foo()
        close_b = util.Finalize(b, conn.send, args=('b',))
        close_b()       # triggers callback for b
        close_b()       # does nothing because callback has already been called
        del b           # does nothing because callback has already been called

        # 'c' has no exitpriority, so _exit_function() will not run it.
        c = Foo()
        util.Finalize(c, conn.send, args=('c',))

        d10 = Foo()
        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)

        # three finalizers at the same priority: expected to run in
        # reverse order of registration (d03, d02, d01)
        d01 = Foo()
        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
        d02 = Foo()
        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
        d03 = Foo()
        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)

        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)

        # lowest priority runs last; the parent uses it as a sentinel
        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)

        # call multiprocessing's cleanup function then exit process without
        # garbage collecting locals
        util._exit_function()
        conn.close()
        os._exit(0)

    def test_finalize(self):
        conn, child_conn = self.Pipe()

        p = self.Process(target=self._test_finalize, args=(child_conn,))
        p.daemon = True
        p.start()
        p.join()

        # Expected order: explicit triggers first ('a', 'b'), then the
        # exit-time finalizers from highest to lowest priority; 'c' never
        # appears because it had no exitpriority.
        result = [obj for obj in iter(conn.recv, 'STOP')]
        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
2925
2926#
2927# Test that from ... import * works for each module
2928#
2929
2930class _TestImportStar(unittest.TestCase):
2931
2932    def get_module_names(self):
2933        import glob
2934        folder = os.path.dirname(multiprocessing.__file__)
2935        pattern = os.path.join(folder, '*.py')
2936        files = glob.glob(pattern)
2937        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
2938        modules = ['multiprocess.' + m for m in modules]
2939        modules.remove('multiprocess.__init__')
2940        modules.append('multiprocess')
2941        return modules
2942
2943    def test_import(self):
2944        modules = self.get_module_names()
2945        if sys.platform == 'win32':
2946            modules.remove('multiprocess.popen_fork')
2947            modules.remove('multiprocess.popen_forkserver')
2948            modules.remove('multiprocess.popen_spawn_posix')
2949        else:
2950            modules.remove('multiprocess.popen_spawn_win32')
2951            if not HAS_REDUCTION:
2952                modules.remove('multiprocess.popen_forkserver')
2953
2954        if c_int is None:
2955            # This module requires _ctypes
2956            modules.remove('multiprocess.sharedctypes')
2957
2958        for name in modules:
2959            __import__(name)
2960            mod = sys.modules[name]
2961            self.assertTrue(hasattr(mod, '__all__'), name)
2962
2963            for attr in mod.__all__:
2964                self.assertTrue(
2965                    hasattr(mod, attr),
2966                    '%r does not have attribute %r' % (mod, attr)
2967                    )
2968
2969#
2970# Quick test that logging works -- does not test logging output
2971#
2972
class _TestLogging(BaseTestCase):
    """Smoke tests for multiprocessing's logger (no output is checked)."""

    ALLOWED_TYPES = ('processes',)

    def test_enable_logging(self):
        log = multiprocessing.get_logger()
        log.setLevel(util.SUBWARNING)
        self.assertTrue(log is not None)
        # neither message should be emitted at this level
        log.debug('this will not be printed')
        log.info('nor will this')
        log.setLevel(LOG_LEVEL)

    @classmethod
    def _test_level(cls, conn):
        # Child side: report our effective logging level to the parent.
        conn.send(multiprocessing.get_logger().getEffectiveLevel())

    def test_level(self):
        LEVEL1 = 32
        LEVEL2 = 37

        log = multiprocessing.get_logger()
        root = logging.getLogger()
        saved_root_level = root.level

        reader, writer = multiprocessing.Pipe(duplex=False)

        # A level set directly on the mp logger is inherited by the child.
        log.setLevel(LEVEL1)
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        self.assertEqual(LEVEL1, reader.recv())

        # With the mp logger at NOTSET, the root logger's level wins.
        log.setLevel(logging.NOTSET)
        root.setLevel(LEVEL2)
        child = self.Process(target=self._test_level, args=(writer,))
        child.daemon = True
        child.start()
        self.assertEqual(LEVEL2, reader.recv())

        # restore global logging state
        root.setLevel(saved_root_level)
        log.setLevel(level=LOG_LEVEL)
3015
3016
3017# class _TestLoggingProcessName(BaseTestCase):
3018#
3019#     def handle(self, record):
3020#         assert record.processName == multiprocessing.current_process().name
3021#         self.__handled = True
3022#
3023#     def test_logging(self):
3024#         handler = logging.Handler()
3025#         handler.handle = self.handle
3026#         self.__handled = False
3027#         # Bypass getLogger() and side-effects
3028#         logger = logging.getLoggerClass()(
3029#                 'multiprocessing.test.TestLoggingProcessName')
3030#         logger.addHandler(handler)
3031#         logger.propagate = False
3032#
3033#         logger.warn('foo')
3034#         assert self.__handled
3035
3036#
3037# Check that Process.join() retries if os.waitpid() fails with EINTR
3038#
3039
class _TestPollEintr(BaseTestCase):
    """Process.join() must retry when os.waitpid() is interrupted (EINTR)."""

    ALLOWED_TYPES = ('processes',)

    @classmethod
    def _killer(cls, pid):
        # Give the parent time to block inside join(), then interrupt it.
        time.sleep(0.1)
        os.kill(pid, signal.SIGUSR1)

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_poll_eintr(self):
        received = [False]

        def on_signal(*args):
            received[0] = True

        my_pid = os.getpid()
        saved_handler = signal.signal(signal.SIGUSR1, on_signal)
        try:
            killer = self.Process(target=self._killer, args=(my_pid,))
            killer.start()
            try:
                sleeper = self.Process(target=time.sleep, args=(2,))
                sleeper.start()
                sleeper.join()
            finally:
                killer.join()
            # join() must have survived the signal and reaped the child.
            self.assertTrue(received[0])
            self.assertEqual(sleeper.exitcode, 0)
        finally:
            signal.signal(signal.SIGUSR1, saved_handler)
3069
3070#
3071# Test to verify handle verification, see issue 3321
3072#
3073
class TestInvalidHandle(unittest.TestCase):
    """Connections built on bogus handles must fail cleanly (issue 3321)."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_handles(self):
        conn = multiprocessing.connection.Connection(44977608)
        try:
            # poll() on a wild descriptor may raise, but must not crash.
            conn.poll()
        except (ValueError, OSError):
            pass
        finally:
            # Hack private attribute _handle to avoid printing an error
            # in conn.__del__
            conn._handle = None
        # Negative handles must be rejected at construction time.
        self.assertRaises((ValueError, OSError),
                          multiprocessing.connection.Connection, -1)
3090
3091
3092
class OtherTest(unittest.TestCase):
    """Failure paths of the connection authentication handshake."""
    # TODO: add more tests for deliver/answer challenge.

    def test_deliver_challenge_auth_failure(self):
        class BogusConnection(object):
            # Pretend the peer answered the challenge with a wrong digest.
            def recv_bytes(self, size):
                return b'something bogus'
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.deliver_challenge,
                          BogusConnection(), b'abc')

    def test_answer_challenge_auth_failure(self):
        class BogusConnection(object):
            # First reply looks like a challenge, the second is garbage.
            def __init__(self):
                self.count = 0
            def recv_bytes(self, size):
                self.count += 1
                if self.count == 1:
                    return multiprocessing.connection.CHALLENGE
                if self.count == 2:
                    return b'something bogus'
                return b''
            def send_bytes(self, data):
                pass
        self.assertRaises(multiprocessing.AuthenticationError,
                          multiprocessing.connection.answer_challenge,
                          BogusConnection(), b'abc')
3121
3122#
3123# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
3124#
3125
def initializer(ns):
    """Increment the shared namespace counter (worker initializer hook)."""
    ns.test = ns.test + 1
3128
class TestInitializers(unittest.TestCase):
    """Manager.start()/Pool() initializer callbacks run once (issue 5585)."""

    def setUp(self):
        self.mgr = multiprocessing.Manager()
        self.ns = self.mgr.Namespace()
        self.ns.test = 0

    def tearDown(self):
        self.mgr.shutdown()
        self.mgr.join()

    def test_manager_initializer(self):
        manager = multiprocessing.managers.SyncManager()
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, manager.start, 1)
        manager.start(initializer, (self.ns,))
        self.assertEqual(self.ns.test, 1)
        manager.shutdown()
        manager.join()

    def test_pool_initializer(self):
        # A non-callable initializer must be rejected up front.
        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
        pool = multiprocessing.Pool(1, initializer, (self.ns,))
        pool.close()
        pool.join()
        self.assertEqual(self.ns.test, 1)
3153
3154#
3155# Issue 5155, 5313, 5331: Test process in processes
# Verifies os.close(sys.stdin.fileno()) vs. sys.stdin.close() behavior
3157#
3158
3159def _this_sub_process(q):
3160    try:
3161        item = q.get(block=False)
3162    except pyqueue.Empty:
3163        pass
3164
def _test_process(q):
    """Start and join a daemonic child that drains a freshly made queue."""
    inner_queue = multiprocessing.Queue()
    worker = multiprocessing.Process(target=_this_sub_process,
                                     args=(inner_queue,))
    worker.daemon = True
    worker.start()
    worker.join()
3171
3172def _afunc(x):
3173    return x*x
3174
def pool_in_process():
    """Exercise Pool.map from inside a child process, then shut down cleanly."""
    workers = multiprocessing.Pool(processes=4)
    workers.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
    workers.close()
    workers.join()
3180
3181class _file_like(object):
3182    def __init__(self, delegate):
3183        self._delegate = delegate
3184        self._pid = None
3185
3186    @property
3187    def cache(self):
3188        pid = os.getpid()
3189        # There are no race conditions since fork keeps only the running thread
3190        if pid != self._pid:
3191            self._pid = pid
3192            self._cache = []
3193        return self._cache
3194
3195    def write(self, data):
3196        self.cache.append(data)
3197
3198    def flush(self):
3199        self._delegate.write(''.join(self.cache))
3200        self._cache = []
3201
class TestStdinBadfiledescriptor(unittest.TestCase):
    """Issues 5155/5313/5331: queues, pools and flushing inside subprocesses."""

    def test_queue_in_process(self):
        # A child must be able to create and use its own queue/subprocess.
        queue = multiprocessing.Queue()
        proc = multiprocessing.Process(target=_test_process, args=(queue,))
        proc.start()
        proc.join()

    def test_pool_in_process(self):
        # Creating and using a Pool inside a child process must work.
        p = multiprocessing.Process(target=pool_in_process)
        p.start()
        p.join()

    def test_flushing(self):
        sio = io.StringIO()
        flike = _file_like(sio)
        flike.write('foo')
        # NOTE(review): this Process is created but never started; it looks
        # vestigial -- confirm before removing.
        proc = multiprocessing.Process(target=lambda: flike.flush())
        flike.flush()
        # Fixed: use a real unittest assertion instead of a bare `assert`,
        # which is stripped under `python -O` and gives no failure message.
        self.assertEqual(sio.getvalue(), 'foo')
3222
3223
class TestWait(unittest.TestCase):
    """Tests for multiprocess.connection.wait()."""

    @classmethod
    def _child_test_wait(cls, w, slow):
        # Child: send ten (index, pid) messages, optionally with random
        # delays so messages interleave across children.
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            w.send((i, os.getpid()))
        w.close()

    def test_wait(self, slow=False):
        from multiprocess.connection import wait
        readers = []
        procs = []
        messages = []

        # start four writer children, keeping only the read ends here
        for i in range(4):
            r, w = multiprocessing.Pipe(duplex=False)
            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
            p.daemon = True
            p.start()
            # close our copy of the write end so EOF is seen when the
            # child closes its own
            w.close()
            readers.append(r)
            procs.append(p)
            self.addCleanup(p.join)

        # drain every reader; EOFError means that child has finished
        while readers:
            for r in wait(readers):
                try:
                    msg = r.recv()
                except EOFError:
                    readers.remove(r)
                    r.close()
                else:
                    messages.append(msg)

        # every child must have delivered all ten of its messages
        messages.sort()
        expected = sorted((i, p.pid) for i in range(10) for p in procs)
        self.assertEqual(messages, expected)

    @classmethod
    def _child_test_wait_socket(cls, address, slow):
        # Child: connect to the parent's listener and send ten lines.
        s = socket.socket()
        s.connect(address)
        for i in range(10):
            if slow:
                time.sleep(random.random()*0.1)
            s.sendall(('%s\n' % i).encode('ascii'))
        s.close()

    def test_wait_socket(self, slow=False):
        from multiprocess.connection import wait
        l = socket.socket()
        # bind to an ephemeral port on the loopback host
        l.bind((test.support.HOST, 0))
        l.listen()
        addr = l.getsockname()
        readers = []
        procs = []
        dic = {}

        for i in range(4):
            p = multiprocessing.Process(target=self._child_test_wait_socket,
                                        args=(addr, slow))
            p.daemon = True
            p.start()
            procs.append(p)
            self.addCleanup(p.join)

        # accept one connection per child, then stop listening
        for i in range(4):
            r, _ = l.accept()
            readers.append(r)
            dic[r] = []
        l.close()

        # collect chunks per socket until each peer closes (empty recv)
        while readers:
            for r in wait(readers):
                msg = r.recv(32)
                if not msg:
                    readers.remove(r)
                    r.close()
                else:
                    dic[r].append(msg)

        # reassembled stream from each child must be the full ten lines
        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
        for v in dic.values():
            self.assertEqual(b''.join(v), expected)

    def test_wait_slow(self):
        self.test_wait(True)

    def test_wait_socket_slow(self):
        self.test_wait_socket(True)

    def test_wait_timeout(self):
        from multiprocess.connection import wait

        expected = 5
        a, b = multiprocessing.Pipe()

        # nothing is ready: wait() should block for about `expected` secs
        start = time.time()
        res = wait([a, b], expected)
        delta = time.time() - start

        self.assertEqual(res, [])
        # generous bounds to tolerate slow/loaded machines
        self.assertLess(delta, expected * 2)
        self.assertGreater(delta, expected * 0.5)

        b.send(None)

        # with data pending, wait() should return almost immediately
        start = time.time()
        res = wait([a, b], 20)
        delta = time.time() - start

        self.assertEqual(res, [a])
        self.assertLess(delta, 0.4)

    @classmethod
    def signal_and_sleep(cls, sem, period):
        # Child: signal readiness, then stay alive for `period` seconds.
        sem.release()
        time.sleep(period)

    def test_wait_integer(self):
        # wait() also accepts process sentinels (plain integer fds/handles)
        from multiprocess.connection import wait

        expected = 3
        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
        sem = multiprocessing.Semaphore(0)
        a, b = multiprocessing.Pipe()
        p = multiprocessing.Process(target=self.signal_and_sleep,
                                    args=(sem, expected))

        p.start()
        self.assertIsInstance(p.sentinel, int)
        # don't start timing until the child is definitely running
        self.assertTrue(sem.acquire(timeout=20))

        # only the sentinel becomes ready, when the child exits
        start = time.time()
        res = wait([a, p.sentinel, b], expected + 20)
        delta = time.time() - start

        self.assertEqual(res, [p.sentinel])
        self.assertLess(delta, expected + 2)
        self.assertGreater(delta, expected - 2)

        a.send(None)

        # now b has data pending as well as the sentinel being ready
        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
        self.assertLess(delta, 0.4)

        b.send(None)

        # all three objects are now ready
        start = time.time()
        res = wait([a, p.sentinel, b], 20)
        delta = time.time() - start

        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
        self.assertLess(delta, 0.4)

        p.terminate()
        p.join()

    def test_neg_timeout(self):
        # a negative timeout must behave like timeout=0 (return at once)
        from multiprocess.connection import wait
        a, b = multiprocessing.Pipe()
        t = time.time()
        res = wait([a], timeout=-1)
        t = time.time() - t
        self.assertEqual(res, [])
        self.assertLess(t, 1)
        a.close()
        b.close()
3398
3399#
3400# Issue 14151: Test invalid family on invalid environment
3401#
3402
class TestInvalidFamily(unittest.TestCase):
    """Issue 14151: Listener must reject addresses of the wrong family."""

    @unittest.skipIf(WIN32, "skipped on Windows")
    def test_invalid_family(self):
        # A Windows named-pipe path is not a valid address on POSIX.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, r'\\.\test')

    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
    def test_invalid_family_win32(self):
        # A POSIX filesystem path is not a valid pipe address on Windows.
        self.assertRaises(ValueError,
                          multiprocessing.connection.Listener, '/var/test.pipe')
3414
3415#
3416# Issue 12098: check sys.flags of child matches that for parent
3417#
3418
class TestFlags(unittest.TestCase):
    """Issue 12098: a grandchild's sys.flags must match its parent's."""

    @classmethod
    def run_in_grandchild(cls, conn):
        # Innermost process: report our interpreter flags to the caller.
        conn.send(tuple(sys.flags))

    @classmethod
    def run_in_child(cls):
        import json
        r, w = multiprocessing.Pipe(duplex=False)
        grandchild = multiprocessing.Process(target=cls.run_in_grandchild,
                                             args=(w,))
        grandchild.start()
        grandchild_flags = r.recv()
        grandchild.join()
        r.close()
        w.close()
        # emit both flag tuples on stdout for the top-level test to compare
        print(json.dumps((tuple(sys.flags), grandchild_flags)))

    def _test_flags(self):
        # Disabled test (leading underscore): launch a child with unusual
        # interpreter flags and check the grandchild inherits the same ones.
        import json, subprocess
        prog = ('from multiprocess.tests import TestFlags; ' +
                'TestFlags.run_in_child()')
        data = subprocess.check_output(
            [sys.executable, '-E', '-S', '-O', '-c', prog])
        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
        self.assertEqual(child_flags, grandchild_flags)
3446
3447#
3448# Test interaction with socket timeouts - see Issue #6056
3449#
3450
class TestTimeouts(unittest.TestCase):
    """A global socket default timeout must not break Listener/Client
    connections (issue #6056)."""

    @classmethod
    def _test_timeout(cls, child, address):
        # Child: pause so the parent blocks in recv(), answer over the
        # pipe, then connect back to the parent's listener.
        time.sleep(1)
        child.send(123)
        child.close()
        conn = multiprocessing.connection.Client(address)
        conn.send(456)
        conn.close()

    def test_timeout(self):
        old_timeout = socket.getdefaulttimeout()
        try:
            # install a short global default timeout for all new sockets
            socket.setdefaulttimeout(0.1)
            parent, child = multiprocessing.Pipe(duplex=True)
            l = multiprocessing.connection.Listener(family='AF_INET')
            p = multiprocessing.Process(target=self._test_timeout,
                                        args=(child, l.address))
            p.start()
            child.close()
            # both the pipe and the accepted connection must still work
            self.assertEqual(parent.recv(), 123)
            parent.close()
            conn = l.accept()
            self.assertEqual(conn.recv(), 456)
            conn.close()
            l.close()
            p.join(10)
        finally:
            # always restore the process-wide default timeout
            socket.setdefaulttimeout(old_timeout)
3480
3481#
3482# Test what happens with no "if __name__ == '__main__'"
3483#
3484
class TestNoForkBomb(unittest.TestCase):
    """Behaviour of mp_fork_bomb.py, a script with no __main__ guard.

    Disabled test (leading underscore), kept for reference.
    """

    def _test_noforkbomb(self):
        start_method = multiprocessing.get_start_method()
        script = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
        if start_method == 'fork':
            # fork does not re-import __main__, so the script succeeds
            rc, out, err = test.support.script_helper.assert_python_ok(
                script, start_method)
            self.assertEqual(out.rstrip(), b'123')
            self.assertEqual(err, b'')
        else:
            # spawn/forkserver re-import __main__ and must fail cleanly
            # with a RuntimeError rather than forking endlessly
            rc, out, err = test.support.script_helper.assert_python_failure(
                script, start_method)
            self.assertEqual(out, b'')
            self.assertIn(b'RuntimeError', err)
3497
3498#
3499# Issue #17555: ForkAwareThreadLock
3500#
3501
class TestForkAwareThreadLock(unittest.TestCase):
    # We recursively start processes.  Issue #17555 meant that the
    # after fork registry would get duplicate entries for the same
    # lock.  The size of the registry at generation n was ~2**n.

    @classmethod
    def child(cls, n, conn):
        # Recurse n generations deep; the innermost child reports the
        # size of the after-fork registry back through the pipe.
        if n > 1:
            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
            p.start()
            conn.close()
            p.join(timeout=5)
        else:
            conn.send(len(util._afterfork_registry))
        conn.close()

    def test_lock(self):
        r, w = multiprocessing.Pipe(False)
        l = util.ForkAwareThreadLock()
        old_size = len(util._afterfork_registry)
        p = multiprocessing.Process(target=self.child, args=(5, w))
        p.start()
        w.close()
        new_size = r.recv()
        p.join(timeout=5)
        # five generations later, the registry must not have grown
        self.assertLessEqual(new_size, old_size)
3528
3529#
3530# Check that non-forked child processes do not inherit unneeded fds/handles
3531#
3532
class TestCloseFds(unittest.TestCase):
    """Non-forked child processes must not inherit unneeded fds/handles."""

    def get_high_socket_fd(self):
        if WIN32:
            # The child process will not have any socket handles, so
            # calling socket.fromfd() should produce WSAENOTSOCK even
            # if there is a handle of the same number.
            return socket.socket().detach()
        else:
            # We want to produce a socket with an fd high enough that a
            # freshly created child process will not have any fds as high.
            fd = socket.socket().detach()
            to_close = []
            while fd < 50:
                to_close.append(fd)
                fd = os.dup(fd)
            for x in to_close:
                os.close(x)
            return fd

    def close(self, fd):
        # Close `fd` in a platform-appropriate way.
        if WIN32:
            socket.socket(fileno=fd).close()
        else:
            os.close(fd)

    @classmethod
    def _test_closefds(cls, conn, fd):
        # Child: report whether `fd` is still a usable socket (None on
        # success, the exception object otherwise).
        try:
            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
        except Exception as e:
            conn.send(e)
        else:
            s.close()
            conn.send(None)

    def test_closefd(self):
        if not HAS_REDUCTION:
            raise unittest.SkipTest('requires fd pickling')

        reader, writer = multiprocessing.Pipe()
        fd = self.get_high_socket_fd()
        try:
            p = multiprocessing.Process(target=self._test_closefds,
                                        args=(writer, fd))
            p.start()
            writer.close()
            e = reader.recv()
            p.join(timeout=5)
        finally:
            self.close(fd)
            writer.close()
            reader.close()

        if multiprocessing.get_start_method() == 'fork':
            # a forked child shares all fds, so the socket must still work
            self.assertIs(e, None)
        else:
            # otherwise the fd must be closed in the child: EBADF on
            # POSIX, WSAENOTSOCK on Windows
            WSAENOTSOCK = 10038
            self.assertIsInstance(e, OSError)
            self.assertTrue(e.errno == errno.EBADF or
                            e.winerror == WSAENOTSOCK, e)
3594
3595#
3596# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
3597#
3598
class TestIgnoreEINTR(unittest.TestCase):
    """Issue #17097: EINTR should be ignored by recv(), send(), accept() etc."""

    @classmethod
    def _test_ignore(cls, conn):
        # Child: install a no-op handler so SIGUSR1 interrupts (but does
        # not kill) blocking calls, then echo data back to the parent.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        conn.send('ready')
        x = conn.recv()
        conn.send(x)
        conn.send_bytes(b'x'*(1024*1024))   # sending 1 MB should block

    # This test is deliberately disabled (leading underscore, so unittest
    # does not collect it).  Fixed: it was previously also named
    # `_test_ignore`, which silently replaced the classmethod helper of
    # the same name above in the class dict and would have made the test
    # call itself had it ever been re-enabled.  Renamed to remove the
    # collision; re-enable by renaming to `test_ignore`.
    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def _disabled_test_ignore(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            self.assertEqual(conn.recv(), 'ready')
            time.sleep(0.1)
            # interrupt the child while it blocks in recv()
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            conn.send(1234)
            self.assertEqual(conn.recv(), 1234)
            time.sleep(0.1)
            # interrupt the child while it blocks in the 1 MB send
            os.kill(p.pid, signal.SIGUSR1)
            self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024))
            time.sleep(0.1)
            p.join()
        finally:
            conn.close()

    @classmethod
    def _test_ignore_listener(cls, conn):
        # Child: accept() must survive being interrupted by SIGUSR1.
        def handler(signum, frame):
            pass
        signal.signal(signal.SIGUSR1, handler)
        with multiprocessing.connection.Listener() as l:
            conn.send(l.address)
            a = l.accept()
            a.send('welcome')

    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
    def test_ignore_listener(self):
        conn, child_conn = multiprocessing.Pipe()
        try:
            p = multiprocessing.Process(target=self._test_ignore_listener,
                                        args=(child_conn,))
            p.daemon = True
            p.start()
            child_conn.close()
            address = conn.recv()
            time.sleep(0.1)
            # signal the child while it is blocked in accept()
            os.kill(p.pid, signal.SIGUSR1)
            time.sleep(0.1)
            client = multiprocessing.connection.Client(address)
            self.assertEqual(client.recv(), 'welcome')
            p.join()
        finally:
            conn.close()
3662
class TestStartMethod(unittest.TestCase):
    """Tests for get/set_start_method() and get_context()."""

    @classmethod
    def _check_context(cls, conn):
        # Child: report which start method created this process.
        conn.send(multiprocessing.get_start_method())

    def check_context(self, ctx):
        # Spawn a child under `ctx` and verify it sees ctx's start method.
        r, w = ctx.Pipe(duplex=False)
        p = ctx.Process(target=self._check_context, args=(w,))
        p.start()
        w.close()
        child_method = r.recv()
        r.close()
        p.join()
        self.assertEqual(child_method, ctx.get_start_method())

    def test_context(self):
        for method in ('fork', 'spawn', 'forkserver'):
            try:
                ctx = multiprocessing.get_context(method)
            except ValueError:
                # start method not supported on this platform
                continue
            self.assertEqual(ctx.get_start_method(), method)
            self.assertIs(ctx.get_context(), ctx)
            # contexts are fixed: their start method cannot be changed
            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
            self.assertRaises(ValueError, ctx.set_start_method, None)
            self.check_context(ctx)

    def test_set_get(self):
        # PRELOAD is a module-level constant (defined outside this chunk)
        multiprocessing.set_forkserver_preload(PRELOAD)
        count = 0
        old_method = multiprocessing.get_start_method()
        try:
            for method in ('fork', 'spawn', 'forkserver'):
                try:
                    multiprocessing.set_start_method(method, force=True)
                except ValueError:
                    # start method not supported on this platform
                    continue
                self.assertEqual(multiprocessing.get_start_method(), method)
                ctx = multiprocessing.get_context()
                self.assertEqual(ctx.get_start_method(), method)
                self.assertTrue(type(ctx).__name__.lower().startswith(method))
                self.assertTrue(
                    ctx.Process.__name__.lower().startswith(method))
                self.check_context(multiprocessing)
                count += 1
        finally:
            # restore the global start method for later tests
            multiprocessing.set_start_method(old_method, force=True)
        # at least one start method must be usable everywhere
        self.assertGreaterEqual(count, 1)

    def test_get_all(self):
        methods = multiprocessing.get_all_start_methods()
        if sys.platform == 'win32':
            self.assertEqual(methods, ['spawn'])
        else:
            # 'forkserver' availability depends on the platform
            self.assertTrue(methods == ['fork', 'spawn'] or
                            methods == ['fork', 'spawn', 'forkserver'])
3719
3720#
3721# Check that killing process does not leak named semaphores
3722#
3723
@unittest.skipIf(sys.platform == "win32",
                 "test semantics don't make sense on Windows")
class TestSemaphoreTracker(unittest.TestCase):
    """Check that killing a process does not leak named semaphores."""

    def test_semaphore_tracker(self):
        import subprocess
        # Child script: create two locks, report their semaphore names
        # over the inherited pipe fd, then block so the process can be
        # killed while both semaphores still exist.
        cmd = '''if 1:
            import multiprocess as mp, time, os
            mp.set_start_method("spawn")
            lock1 = mp.Lock()
            lock2 = mp.Lock()
            os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n")
            os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n")
            time.sleep(10)
        '''
        r, w = os.pipe()
        p = subprocess.Popen([sys.executable,
                             '-c', cmd % (w, w)],
                             pass_fds=[w],
                             stderr=subprocess.PIPE)
        os.close(w)
        with open(r, 'rb', closefd=True) as f:
            name1 = f.readline().rstrip().decode('ascii')
            name2 = f.readline().rstrip().decode('ascii')
        # Unlink the first semaphore ourselves; the tracker must clean up
        # the second after the child is terminated.
        _multiprocessing.sem_unlink(name1)
        p.terminate()
        p.wait()
        time.sleep(2.0)
        # If the tracker did its job, the name is already gone.
        with self.assertRaises(OSError) as ctx:
            _multiprocessing.sem_unlink(name2)
        # docs say it should be ENOENT, but OSX seems to give EINVAL
        self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL))
        err = p.stderr.read().decode('utf-8')
        p.stderr.close()
        expected = 'semaphore_tracker: There appear to be 2 leaked semaphores'
        self.assertRegex(err, expected)
        # Raw string: '\[' is a regex escape, not a string escape.  The
        # original non-raw literal was an invalid escape sequence that
        # triggers DeprecationWarning (SyntaxWarning on newer Pythons).
        self.assertRegex(err, r'semaphore_tracker: %r: \[Errno' % name1)
3760
3761#
3762# Mixins
3763#
3764
class ProcessesMixin(object):
    """Mixin giving test cases the real process-based API.

    Each name below is the corresponding top-level factory from the
    multiprocessing package, so mixed-in tests exercise genuine OS
    processes.  Callables are wrapped in staticmethod() so that class
    attribute lookup does not turn them into bound methods.
    """
    TYPE = 'processes'
    Process = multiprocessing.Process
    connection = multiprocessing.connection
    current_process = staticmethod(multiprocessing.current_process)
    active_children = staticmethod(multiprocessing.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.Pipe)
    Queue = staticmethod(multiprocessing.Queue)
    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
    Lock = staticmethod(multiprocessing.Lock)
    RLock = staticmethod(multiprocessing.RLock)
    Semaphore = staticmethod(multiprocessing.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.Condition)
    Event = staticmethod(multiprocessing.Event)
    Barrier = staticmethod(multiprocessing.Barrier)
    Value = staticmethod(multiprocessing.Value)
    Array = staticmethod(multiprocessing.Array)
    RawValue = staticmethod(multiprocessing.RawValue)
    RawArray = staticmethod(multiprocessing.RawArray)
3786
3787
class ManagerMixin(object):
    """Mixin giving test cases primitives proxied through a Manager.

    A single manager process is created per test class in setUpClass()
    and shut down (with leak diagnostics) in tearDownClass().
    """
    TYPE = 'manager'
    Process = multiprocessing.Process

    # Every primitive is looked up on cls.manager lazily, at attribute
    # access time, so it only works once setUpClass() has run.
    Queue = property(operator.attrgetter('manager.Queue'))
    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
    Lock = property(operator.attrgetter('manager.Lock'))
    RLock = property(operator.attrgetter('manager.RLock'))
    Semaphore = property(operator.attrgetter('manager.Semaphore'))
    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
    Condition = property(operator.attrgetter('manager.Condition'))
    Event = property(operator.attrgetter('manager.Event'))
    Barrier = property(operator.attrgetter('manager.Barrier'))
    Value = property(operator.attrgetter('manager.Value'))
    Array = property(operator.attrgetter('manager.Array'))
    list = property(operator.attrgetter('manager.list'))
    dict = property(operator.attrgetter('manager.dict'))
    Namespace = property(operator.attrgetter('manager.Namespace'))

    @classmethod
    def Pool(cls, *args, **kwargs):
        # A classmethod rather than a property so it can take arguments;
        # it simply delegates to the shared manager.
        return cls.manager.Pool(*args, **kwargs)

    @classmethod
    def setUpClass(cls):
        cls.manager = multiprocessing.Manager()

    @classmethod
    def tearDownClass(cls):
        # Normally only the manager process should remain in
        # active_children(), but slow machines may need a few seconds to
        # reap other children (see issue #17395), so back off
        # exponentially, waiting up to roughly 5 seconds in total.
        delay = 0.01
        while len(multiprocessing.active_children()) > 1 and delay < 5:
            time.sleep(delay)
            delay *= 2
        gc.collect()                       # drop unreferenced proxies
        if cls.manager._number_of_objects() != 0:
            # Not necessarily an error: some tests never join every
            # process that holds a reference to a managed object.
            print('Shared objects which still exist at manager shutdown:')
            print(cls.manager._debug_info())
        cls.manager.shutdown()
        cls.manager.join()
        cls.manager = None
3833
3834
class ThreadsMixin(object):
    """Mixin giving test cases the thread-backed dummy API.

    Uses multiprocessing.dummy, which implements the multiprocessing
    interface on top of threads, so mixed-in tests run without spawning
    processes.  Note that Pool is still the real multiprocessing.Pool.
    Callables are wrapped in staticmethod() so that class attribute
    lookup does not turn them into bound methods.
    """
    TYPE = 'threads'
    Process = multiprocessing.dummy.Process
    connection = multiprocessing.dummy.connection
    current_process = staticmethod(multiprocessing.dummy.current_process)
    active_children = staticmethod(multiprocessing.dummy.active_children)
    Pool = staticmethod(multiprocessing.Pool)
    Pipe = staticmethod(multiprocessing.dummy.Pipe)
    Queue = staticmethod(multiprocessing.dummy.Queue)
    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
    Lock = staticmethod(multiprocessing.dummy.Lock)
    RLock = staticmethod(multiprocessing.dummy.RLock)
    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
    Condition = staticmethod(multiprocessing.dummy.Condition)
    Event = staticmethod(multiprocessing.dummy.Event)
    Barrier = staticmethod(multiprocessing.dummy.Barrier)
    Value = staticmethod(multiprocessing.dummy.Value)
    Array = staticmethod(multiprocessing.dummy.Array)
3854
3855#
3856# Functions used to create test cases from the base ones in this module
3857#
3858
def install_tests_in_module_dict(remote_globs, start_method):
    """Clone this module's test classes into *remote_globs*.

    For every BaseTestCase subclass defined here, one concrete
    unittest.TestCase per entry in its ALLOWED_TYPES is created by
    mixing in the matching *Mixin class, and installed in the target
    module under a 'With<Type>...' name.  setUpModule/tearDownModule
    hooks are also installed: they force *start_method* for the whole
    module run and report dangling processes/threads afterwards.
    """
    __module__ = remote_globs['__name__']
    local_globs = globals()
    ALL_TYPES = {'processes', 'threads', 'manager'}

    for name, base in local_globs.items():
        if not isinstance(base, type):
            continue
        if issubclass(base, BaseTestCase):
            if base is BaseTestCase:
                continue
            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
            for type_ in base.ALLOWED_TYPES:
                # e.g. _TestQueue + 'processes' -> 'WithProcessesTestQueue'
                # (name[1:] drops the leading underscore of the base class).
                newname = 'With' + type_.capitalize() + name[1:]
                Mixin = local_globs[type_.capitalize() + 'Mixin']
                class Temp(base, Mixin, unittest.TestCase):
                    pass
                Temp.__name__ = Temp.__qualname__ = newname
                Temp.__module__ = __module__
                remote_globs[newname] = Temp
        elif issubclass(base, unittest.TestCase):
            # Plain TestCases are re-exported under their own name, as a
            # fresh subclass so the target module owns its own copy.
            class Temp(base, object):
                pass
            Temp.__name__ = Temp.__qualname__ = name
            Temp.__module__ = __module__
            remote_globs[name] = Temp

    # Mutable cells shared with the module-level fixtures defined below.
    dangling = [None, None]   # snapshots of dangling processes/threads
    old_start_method = [None] # start method to restore at teardown

    def setUpModule():
        # Force the requested start method (or skip the whole module if
        # it is unsupported on this platform).
        multiprocessing.set_forkserver_preload(PRELOAD)
        multiprocessing.process._cleanup()
        dangling[0] = multiprocessing.process._dangling.copy()
        dangling[1] = threading._dangling.copy()
        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
        try:
            multiprocessing.set_start_method(start_method, force=True)
        except ValueError:
            raise unittest.SkipTest(start_method +
                                    ' start method not supported')

        if sys.platform.startswith("linux"):
            try:
                lock = multiprocessing.RLock()
            except OSError:
                raise unittest.SkipTest("OSError raises on RLock creation, "
                                        "see issue 3111!")
        check_enough_semaphores()
        util.get_temp_dir()     # creates temp directory
        multiprocessing.get_logger().setLevel(LOG_LEVEL)

    def tearDownModule():
        # Restore the original start method and report any processes or
        # threads that tests left behind.
        multiprocessing.set_start_method(old_start_method[0], force=True)
        # pause a bit so we don't get warning about dangling threads/processes
        time.sleep(0.5)
        multiprocessing.process._cleanup()
        gc.collect()
        tmp = set(multiprocessing.process._dangling) - set(dangling[0])
        if tmp:
            print('Dangling processes:', tmp, file=sys.stderr)
        del tmp
        tmp = set(threading._dangling) - set(dangling[1])
        if tmp:
            print('Dangling threads:', tmp, file=sys.stderr)

    remote_globs['setUpModule'] = setUpModule
    remote_globs['tearDownModule'] = tearDownModule
3927