1#
2# Unit tests for the multiprocessing package
3#
4
5import unittest
6import unittest.mock
7import queue as pyqueue
8import time
9import io
10import itertools
11import sys
12import os
13import gc
14import errno
15import signal
16import array
17import socket
18import random
19import logging
20import subprocess
21import struct
22import operator
23import pickle
24import weakref
25import warnings
26import test.support
27import test.support.script_helper
28from test import support
29
30
31# Skip tests if _multiprocessing wasn't built.
32_multiprocessing = test.support.import_module('_multiprocessing')
33# Skip tests if sem_open implementation is broken.
34support.skip_if_broken_multiprocessing_synchronize()
35import threading
36
37import multiprocessing.connection
38import multiprocessing.dummy
39import multiprocessing.heap
40import multiprocessing.managers
41import multiprocessing.pool
42import multiprocessing.queues
43
44from multiprocessing import util
45
46try:
47    from multiprocessing import reduction
48    HAS_REDUCTION = reduction.HAVE_SEND_HANDLE
49except ImportError:
50    HAS_REDUCTION = False
51
52try:
53    from multiprocessing.sharedctypes import Value, copy
54    HAS_SHAREDCTYPES = True
55except ImportError:
56    HAS_SHAREDCTYPES = False
57
58try:
59    from multiprocessing import shared_memory
60    HAS_SHMEM = True
61except ImportError:
62    HAS_SHMEM = False
63
64try:
65    import msvcrt
66except ImportError:
67    msvcrt = None
68
69#
70#
71#
72
73# Timeout to wait until a process completes
74TIMEOUT = 60.0 # seconds
75
76def latin(s):
77    return s.encode('latin')
78
79
80def close_queue(queue):
81    if isinstance(queue, multiprocessing.queues.Queue):
82        queue.close()
83        queue.join_thread()
84
85
86def join_process(process):
87    # Since multiprocessing.Process has the same API than threading.Thread
88    # (join() and is_alive(), the support function can be reused
89    support.join_thread(process, timeout=TIMEOUT)
90
91
92if os.name == "posix":
93    from multiprocessing import resource_tracker
94
95    def _resource_unlink(name, rtype):
96        resource_tracker._CLEANUP_FUNCS[rtype](name)
97
98
99#
100# Constants
101#
102
103LOG_LEVEL = util.SUBWARNING
104#LOG_LEVEL = logging.DEBUG
105
106DELTA = 0.1
107CHECK_TIMINGS = False     # making true makes tests take a lot longer
108                          # and can sometimes cause some non-serious
109                          # failures because some calls block a bit
110                          # longer than expected
111if CHECK_TIMINGS:
112    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4
113else:
114    TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1
115
116HAVE_GETVALUE = not getattr(_multiprocessing,
117                            'HAVE_BROKEN_SEM_GETVALUE', False)
118
119WIN32 = (sys.platform == "win32")
120
121from multiprocessing.connection import wait
122
123def wait_for_handle(handle, timeout):
124    if timeout is not None and timeout < 0.0:
125        timeout = None
126    return wait([handle], timeout)
127
128try:
129    MAXFD = os.sysconf("SC_OPEN_MAX")
130except:
131    MAXFD = 256
132
133# To speed up tests when using the forkserver, we can preload these:
134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver']
135
136#
137# Some tests require ctypes
138#
139
140try:
141    from ctypes import Structure, c_int, c_double, c_longlong
142except ImportError:
143    Structure = object
144    c_int = c_double = c_longlong = None
145
146
147def check_enough_semaphores():
148    """Check that the system supports enough semaphores to run the test."""
149    # minimum number of semaphores available according to POSIX
150    nsems_min = 256
151    try:
152        nsems = os.sysconf("SC_SEM_NSEMS_MAX")
153    except (AttributeError, ValueError):
154        # sysconf not available or setting not available
155        return
156    if nsems == -1 or nsems >= nsems_min:
157        return
158    raise unittest.SkipTest("The OS doesn't support enough semaphores "
159                            "to run the test (required: %d)." % nsems_min)
160
161
162#
163# Creates a wrapper for a function which records the time it takes to finish
164#
165
166class TimingWrapper(object):
167
168    def __init__(self, func):
169        self.func = func
170        self.elapsed = None
171
172    def __call__(self, *args, **kwds):
173        t = time.monotonic()
174        try:
175            return self.func(*args, **kwds)
176        finally:
177            self.elapsed = time.monotonic() - t
178
179#
180# Base class for test cases
181#
182
183class BaseTestCase(object):
184
185    ALLOWED_TYPES = ('processes', 'manager', 'threads')
186
187    def assertTimingAlmostEqual(self, a, b):
188        if CHECK_TIMINGS:
189            self.assertAlmostEqual(a, b, 1)
190
191    def assertReturnsIfImplemented(self, value, func, *args):
192        try:
193            res = func(*args)
194        except NotImplementedError:
195            pass
196        else:
197            return self.assertEqual(value, res)
198
199    # For the sanity of Windows users, rather than crashing or freezing in
200    # multiple ways.
201    def __reduce__(self, *args):
202        raise NotImplementedError("shouldn't try to pickle a test case")
203
204    __reduce_ex__ = __reduce__
205
206#
207# Return the value of a semaphore
208#
209
210def get_value(self):
211    try:
212        return self.get_value()
213    except AttributeError:
214        try:
215            return self._Semaphore__value
216        except AttributeError:
217            try:
218                return self._value
219            except AttributeError:
220                raise NotImplementedError
221
222#
223# Testcases
224#
225
226class DummyCallable:
227    def __call__(self, q, c):
228        assert isinstance(c, DummyCallable)
229        q.put(5)
230
231
232class _TestProcess(BaseTestCase):
233
234    ALLOWED_TYPES = ('processes', 'threads')
235
236    def test_current(self):
237        if self.TYPE == 'threads':
238            self.skipTest('test not appropriate for {}'.format(self.TYPE))
239
240        current = self.current_process()
241        authkey = current.authkey
242
243        self.assertTrue(current.is_alive())
244        self.assertTrue(not current.daemon)
245        self.assertIsInstance(authkey, bytes)
246        self.assertTrue(len(authkey) > 0)
247        self.assertEqual(current.ident, os.getpid())
248        self.assertEqual(current.exitcode, None)
249
250    def test_daemon_argument(self):
251        if self.TYPE == "threads":
252            self.skipTest('test not appropriate for {}'.format(self.TYPE))
253
254        # By default uses the current process's daemon flag.
255        proc0 = self.Process(target=self._test)
256        self.assertEqual(proc0.daemon, self.current_process().daemon)
257        proc1 = self.Process(target=self._test, daemon=True)
258        self.assertTrue(proc1.daemon)
259        proc2 = self.Process(target=self._test, daemon=False)
260        self.assertFalse(proc2.daemon)
261
262    @classmethod
263    def _test(cls, q, *args, **kwds):
264        current = cls.current_process()
265        q.put(args)
266        q.put(kwds)
267        q.put(current.name)
268        if cls.TYPE != 'threads':
269            q.put(bytes(current.authkey))
270            q.put(current.pid)
271
272    def test_parent_process_attributes(self):
273        if self.TYPE == "threads":
274            self.skipTest('test not appropriate for {}'.format(self.TYPE))
275
276        self.assertIsNone(self.parent_process())
277
278        rconn, wconn = self.Pipe(duplex=False)
279        p = self.Process(target=self._test_send_parent_process, args=(wconn,))
280        p.start()
281        p.join()
282        parent_pid, parent_name = rconn.recv()
283        self.assertEqual(parent_pid, self.current_process().pid)
284        self.assertEqual(parent_pid, os.getpid())
285        self.assertEqual(parent_name, self.current_process().name)
286
287    @classmethod
288    def _test_send_parent_process(cls, wconn):
289        from multiprocessing.process import parent_process
290        wconn.send([parent_process().pid, parent_process().name])
291
292    def test_parent_process(self):
293        if self.TYPE == "threads":
294            self.skipTest('test not appropriate for {}'.format(self.TYPE))
295
296        # Launch a child process. Make it launch a grandchild process. Kill the
297        # child process and make sure that the grandchild notices the death of
298        # its parent (a.k.a the child process).
299        rconn, wconn = self.Pipe(duplex=False)
300        p = self.Process(
301            target=self._test_create_grandchild_process, args=(wconn, ))
302        p.start()
303
304        if not rconn.poll(timeout=60):
305            raise AssertionError("Could not communicate with child process")
306        parent_process_status = rconn.recv()
307        self.assertEqual(parent_process_status, "alive")
308
309        p.terminate()
310        p.join()
311
312        if not rconn.poll(timeout=60):
313            raise AssertionError("Could not communicate with child process")
314        parent_process_status = rconn.recv()
315        self.assertEqual(parent_process_status, "not alive")
316
317    @classmethod
318    def _test_create_grandchild_process(cls, wconn):
319        p = cls.Process(target=cls._test_report_parent_status, args=(wconn, ))
320        p.start()
321        time.sleep(300)
322
323    @classmethod
324    def _test_report_parent_status(cls, wconn):
325        from multiprocessing.process import parent_process
326        wconn.send("alive" if parent_process().is_alive() else "not alive")
327        parent_process().join(timeout=5)
328        wconn.send("alive" if parent_process().is_alive() else "not alive")
329
330    def test_process(self):
331        q = self.Queue(1)
332        e = self.Event()
333        args = (q, 1, 2)
334        kwargs = {'hello':23, 'bye':2.54}
335        name = 'SomeProcess'
336        p = self.Process(
337            target=self._test, args=args, kwargs=kwargs, name=name
338            )
339        p.daemon = True
340        current = self.current_process()
341
342        if self.TYPE != 'threads':
343            self.assertEqual(p.authkey, current.authkey)
344        self.assertEqual(p.is_alive(), False)
345        self.assertEqual(p.daemon, True)
346        self.assertNotIn(p, self.active_children())
347        self.assertTrue(type(self.active_children()) is list)
348        self.assertEqual(p.exitcode, None)
349
350        p.start()
351
352        self.assertEqual(p.exitcode, None)
353        self.assertEqual(p.is_alive(), True)
354        self.assertIn(p, self.active_children())
355
356        self.assertEqual(q.get(), args[1:])
357        self.assertEqual(q.get(), kwargs)
358        self.assertEqual(q.get(), p.name)
359        if self.TYPE != 'threads':
360            self.assertEqual(q.get(), current.authkey)
361            self.assertEqual(q.get(), p.pid)
362
363        p.join()
364
365        self.assertEqual(p.exitcode, 0)
366        self.assertEqual(p.is_alive(), False)
367        self.assertNotIn(p, self.active_children())
368        close_queue(q)
369
370    @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id")
371    def test_process_mainthread_native_id(self):
372        if self.TYPE == 'threads':
373            self.skipTest('test not appropriate for {}'.format(self.TYPE))
374
375        current_mainthread_native_id = threading.main_thread().native_id
376
377        q = self.Queue(1)
378        p = self.Process(target=self._test_process_mainthread_native_id, args=(q,))
379        p.start()
380
381        child_mainthread_native_id = q.get()
382        p.join()
383        close_queue(q)
384
385        self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id)
386
387    @classmethod
388    def _test_process_mainthread_native_id(cls, q):
389        mainthread_native_id = threading.main_thread().native_id
390        q.put(mainthread_native_id)
391
392    @classmethod
393    def _sleep_some(cls):
394        time.sleep(100)
395
396    @classmethod
397    def _test_sleep(cls, delay):
398        time.sleep(delay)
399
400    def _kill_process(self, meth):
401        if self.TYPE == 'threads':
402            self.skipTest('test not appropriate for {}'.format(self.TYPE))
403
404        p = self.Process(target=self._sleep_some)
405        p.daemon = True
406        p.start()
407
408        self.assertEqual(p.is_alive(), True)
409        self.assertIn(p, self.active_children())
410        self.assertEqual(p.exitcode, None)
411
412        join = TimingWrapper(p.join)
413
414        self.assertEqual(join(0), None)
415        self.assertTimingAlmostEqual(join.elapsed, 0.0)
416        self.assertEqual(p.is_alive(), True)
417
418        self.assertEqual(join(-1), None)
419        self.assertTimingAlmostEqual(join.elapsed, 0.0)
420        self.assertEqual(p.is_alive(), True)
421
422        # XXX maybe terminating too soon causes the problems on Gentoo...
423        time.sleep(1)
424
425        meth(p)
426
427        if hasattr(signal, 'alarm'):
428            # On the Gentoo buildbot waitpid() often seems to block forever.
429            # We use alarm() to interrupt it if it blocks for too long.
430            def handler(*args):
431                raise RuntimeError('join took too long: %s' % p)
432            old_handler = signal.signal(signal.SIGALRM, handler)
433            try:
434                signal.alarm(10)
435                self.assertEqual(join(), None)
436            finally:
437                signal.alarm(0)
438                signal.signal(signal.SIGALRM, old_handler)
439        else:
440            self.assertEqual(join(), None)
441
442        self.assertTimingAlmostEqual(join.elapsed, 0.0)
443
444        self.assertEqual(p.is_alive(), False)
445        self.assertNotIn(p, self.active_children())
446
447        p.join()
448
449        return p.exitcode
450
451    def test_terminate(self):
452        exitcode = self._kill_process(multiprocessing.Process.terminate)
453        if os.name != 'nt':
454            self.assertEqual(exitcode, -signal.SIGTERM)
455
456    def test_kill(self):
457        exitcode = self._kill_process(multiprocessing.Process.kill)
458        if os.name != 'nt':
459            self.assertEqual(exitcode, -signal.SIGKILL)
460
461    def test_cpu_count(self):
462        try:
463            cpus = multiprocessing.cpu_count()
464        except NotImplementedError:
465            cpus = 1
466        self.assertTrue(type(cpus) is int)
467        self.assertTrue(cpus >= 1)
468
469    def test_active_children(self):
470        self.assertEqual(type(self.active_children()), list)
471
472        p = self.Process(target=time.sleep, args=(DELTA,))
473        self.assertNotIn(p, self.active_children())
474
475        p.daemon = True
476        p.start()
477        self.assertIn(p, self.active_children())
478
479        p.join()
480        self.assertNotIn(p, self.active_children())
481
482    @classmethod
483    def _test_recursion(cls, wconn, id):
484        wconn.send(id)
485        if len(id) < 2:
486            for i in range(2):
487                p = cls.Process(
488                    target=cls._test_recursion, args=(wconn, id+[i])
489                    )
490                p.start()
491                p.join()
492
493    def test_recursion(self):
494        rconn, wconn = self.Pipe(duplex=False)
495        self._test_recursion(wconn, [])
496
497        time.sleep(DELTA)
498        result = []
499        while rconn.poll():
500            result.append(rconn.recv())
501
502        expected = [
503            [],
504              [0],
505                [0, 0],
506                [0, 1],
507              [1],
508                [1, 0],
509                [1, 1]
510            ]
511        self.assertEqual(result, expected)
512
513    @classmethod
514    def _test_sentinel(cls, event):
515        event.wait(10.0)
516
517    def test_sentinel(self):
518        if self.TYPE == "threads":
519            self.skipTest('test not appropriate for {}'.format(self.TYPE))
520        event = self.Event()
521        p = self.Process(target=self._test_sentinel, args=(event,))
522        with self.assertRaises(ValueError):
523            p.sentinel
524        p.start()
525        self.addCleanup(p.join)
526        sentinel = p.sentinel
527        self.assertIsInstance(sentinel, int)
528        self.assertFalse(wait_for_handle(sentinel, timeout=0.0))
529        event.set()
530        p.join()
531        self.assertTrue(wait_for_handle(sentinel, timeout=1))
532
533    @classmethod
534    def _test_close(cls, rc=0, q=None):
535        if q is not None:
536            q.get()
537        sys.exit(rc)
538
539    def test_close(self):
540        if self.TYPE == "threads":
541            self.skipTest('test not appropriate for {}'.format(self.TYPE))
542        q = self.Queue()
543        p = self.Process(target=self._test_close, kwargs={'q': q})
544        p.daemon = True
545        p.start()
546        self.assertEqual(p.is_alive(), True)
547        # Child is still alive, cannot close
548        with self.assertRaises(ValueError):
549            p.close()
550
551        q.put(None)
552        p.join()
553        self.assertEqual(p.is_alive(), False)
554        self.assertEqual(p.exitcode, 0)
555        p.close()
556        with self.assertRaises(ValueError):
557            p.is_alive()
558        with self.assertRaises(ValueError):
559            p.join()
560        with self.assertRaises(ValueError):
561            p.terminate()
562        p.close()
563
564        wr = weakref.ref(p)
565        del p
566        gc.collect()
567        self.assertIs(wr(), None)
568
569        close_queue(q)
570
571    def test_many_processes(self):
572        if self.TYPE == 'threads':
573            self.skipTest('test not appropriate for {}'.format(self.TYPE))
574
575        sm = multiprocessing.get_start_method()
576        N = 5 if sm == 'spawn' else 100
577
578        # Try to overwhelm the forkserver loop with events
579        procs = [self.Process(target=self._test_sleep, args=(0.01,))
580                 for i in range(N)]
581        for p in procs:
582            p.start()
583        for p in procs:
584            join_process(p)
585        for p in procs:
586            self.assertEqual(p.exitcode, 0)
587
588        procs = [self.Process(target=self._sleep_some)
589                 for i in range(N)]
590        for p in procs:
591            p.start()
592        time.sleep(0.001)  # let the children start...
593        for p in procs:
594            p.terminate()
595        for p in procs:
596            join_process(p)
597        if os.name != 'nt':
598            exitcodes = [-signal.SIGTERM]
599            if sys.platform == 'darwin':
600                # bpo-31510: On macOS, killing a freshly started process with
601                # SIGTERM sometimes kills the process with SIGKILL.
602                exitcodes.append(-signal.SIGKILL)
603            for p in procs:
604                self.assertIn(p.exitcode, exitcodes)
605
606    def test_lose_target_ref(self):
607        c = DummyCallable()
608        wr = weakref.ref(c)
609        q = self.Queue()
610        p = self.Process(target=c, args=(q, c))
611        del c
612        p.start()
613        p.join()
614        self.assertIs(wr(), None)
615        self.assertEqual(q.get(), 5)
616        close_queue(q)
617
618    @classmethod
619    def _test_child_fd_inflation(self, evt, q):
620        q.put(test.support.fd_count())
621        evt.wait()
622
623    def test_child_fd_inflation(self):
624        # Number of fds in child processes should not grow with the
625        # number of running children.
626        if self.TYPE == 'threads':
627            self.skipTest('test not appropriate for {}'.format(self.TYPE))
628
629        sm = multiprocessing.get_start_method()
630        if sm == 'fork':
631            # The fork method by design inherits all fds from the parent,
632            # trying to go against it is a lost battle
633            self.skipTest('test not appropriate for {}'.format(sm))
634
635        N = 5
636        evt = self.Event()
637        q = self.Queue()
638
639        procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q))
640                 for i in range(N)]
641        for p in procs:
642            p.start()
643
644        try:
645            fd_counts = [q.get() for i in range(N)]
646            self.assertEqual(len(set(fd_counts)), 1, fd_counts)
647
648        finally:
649            evt.set()
650            for p in procs:
651                p.join()
652            close_queue(q)
653
654    @classmethod
655    def _test_wait_for_threads(self, evt):
656        def func1():
657            time.sleep(0.5)
658            evt.set()
659
660        def func2():
661            time.sleep(20)
662            evt.clear()
663
664        threading.Thread(target=func1).start()
665        threading.Thread(target=func2, daemon=True).start()
666
667    def test_wait_for_threads(self):
668        # A child process should wait for non-daemonic threads to end
669        # before exiting
670        if self.TYPE == 'threads':
671            self.skipTest('test not appropriate for {}'.format(self.TYPE))
672
673        evt = self.Event()
674        proc = self.Process(target=self._test_wait_for_threads, args=(evt,))
675        proc.start()
676        proc.join()
677        self.assertTrue(evt.is_set())
678
679    @classmethod
680    def _test_error_on_stdio_flush(self, evt, break_std_streams={}):
681        for stream_name, action in break_std_streams.items():
682            if action == 'close':
683                stream = io.StringIO()
684                stream.close()
685            else:
686                assert action == 'remove'
687                stream = None
688            setattr(sys, stream_name, None)
689        evt.set()
690
691    def test_error_on_stdio_flush_1(self):
692        # Check that Process works with broken standard streams
693        streams = [io.StringIO(), None]
694        streams[0].close()
695        for stream_name in ('stdout', 'stderr'):
696            for stream in streams:
697                old_stream = getattr(sys, stream_name)
698                setattr(sys, stream_name, stream)
699                try:
700                    evt = self.Event()
701                    proc = self.Process(target=self._test_error_on_stdio_flush,
702                                        args=(evt,))
703                    proc.start()
704                    proc.join()
705                    self.assertTrue(evt.is_set())
706                    self.assertEqual(proc.exitcode, 0)
707                finally:
708                    setattr(sys, stream_name, old_stream)
709
710    def test_error_on_stdio_flush_2(self):
711        # Same as test_error_on_stdio_flush_1(), but standard streams are
712        # broken by the child process
713        for stream_name in ('stdout', 'stderr'):
714            for action in ('close', 'remove'):
715                old_stream = getattr(sys, stream_name)
716                try:
717                    evt = self.Event()
718                    proc = self.Process(target=self._test_error_on_stdio_flush,
719                                        args=(evt, {stream_name: action}))
720                    proc.start()
721                    proc.join()
722                    self.assertTrue(evt.is_set())
723                    self.assertEqual(proc.exitcode, 0)
724                finally:
725                    setattr(sys, stream_name, old_stream)
726
727    @classmethod
728    def _sleep_and_set_event(self, evt, delay=0.0):
729        time.sleep(delay)
730        evt.set()
731
732    def check_forkserver_death(self, signum):
733        # bpo-31308: if the forkserver process has died, we should still
734        # be able to create and run new Process instances (the forkserver
735        # is implicitly restarted).
736        if self.TYPE == 'threads':
737            self.skipTest('test not appropriate for {}'.format(self.TYPE))
738        sm = multiprocessing.get_start_method()
739        if sm != 'forkserver':
740            # The fork method by design inherits all fds from the parent,
741            # trying to go against it is a lost battle
742            self.skipTest('test not appropriate for {}'.format(sm))
743
744        from multiprocessing.forkserver import _forkserver
745        _forkserver.ensure_running()
746
747        # First process sleeps 500 ms
748        delay = 0.5
749
750        evt = self.Event()
751        proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay))
752        proc.start()
753
754        pid = _forkserver._forkserver_pid
755        os.kill(pid, signum)
756        # give time to the fork server to die and time to proc to complete
757        time.sleep(delay * 2.0)
758
759        evt2 = self.Event()
760        proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,))
761        proc2.start()
762        proc2.join()
763        self.assertTrue(evt2.is_set())
764        self.assertEqual(proc2.exitcode, 0)
765
766        proc.join()
767        self.assertTrue(evt.is_set())
768        self.assertIn(proc.exitcode, (0, 255))
769
770    def test_forkserver_sigint(self):
771        # Catchable signal
772        self.check_forkserver_death(signal.SIGINT)
773
774    def test_forkserver_sigkill(self):
775        # Uncatchable signal
776        if os.name != 'nt':
777            self.check_forkserver_death(signal.SIGKILL)
778
779
780#
781#
782#
783
784class _UpperCaser(multiprocessing.Process):
785
786    def __init__(self):
787        multiprocessing.Process.__init__(self)
788        self.child_conn, self.parent_conn = multiprocessing.Pipe()
789
790    def run(self):
791        self.parent_conn.close()
792        for s in iter(self.child_conn.recv, None):
793            self.child_conn.send(s.upper())
794        self.child_conn.close()
795
796    def submit(self, s):
797        assert type(s) is str
798        self.parent_conn.send(s)
799        return self.parent_conn.recv()
800
801    def stop(self):
802        self.parent_conn.send(None)
803        self.parent_conn.close()
804        self.child_conn.close()
805
806class _TestSubclassingProcess(BaseTestCase):
807
808    ALLOWED_TYPES = ('processes',)
809
810    def test_subclassing(self):
811        uppercaser = _UpperCaser()
812        uppercaser.daemon = True
813        uppercaser.start()
814        self.assertEqual(uppercaser.submit('hello'), 'HELLO')
815        self.assertEqual(uppercaser.submit('world'), 'WORLD')
816        uppercaser.stop()
817        uppercaser.join()
818
819    def test_stderr_flush(self):
820        # sys.stderr is flushed at process shutdown (issue #13812)
821        if self.TYPE == "threads":
822            self.skipTest('test not appropriate for {}'.format(self.TYPE))
823
824        testfn = test.support.TESTFN
825        self.addCleanup(test.support.unlink, testfn)
826        proc = self.Process(target=self._test_stderr_flush, args=(testfn,))
827        proc.start()
828        proc.join()
829        with open(testfn, 'r') as f:
830            err = f.read()
831            # The whole traceback was printed
832            self.assertIn("ZeroDivisionError", err)
833            self.assertIn("test_multiprocessing.py", err)
834            self.assertIn("1/0 # MARKER", err)
835
836    @classmethod
837    def _test_stderr_flush(cls, testfn):
838        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
839        sys.stderr = open(fd, 'w', closefd=False)
840        1/0 # MARKER
841
842
843    @classmethod
844    def _test_sys_exit(cls, reason, testfn):
845        fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL)
846        sys.stderr = open(fd, 'w', closefd=False)
847        sys.exit(reason)
848
849    def test_sys_exit(self):
850        # See Issue 13854
851        if self.TYPE == 'threads':
852            self.skipTest('test not appropriate for {}'.format(self.TYPE))
853
854        testfn = test.support.TESTFN
855        self.addCleanup(test.support.unlink, testfn)
856
857        for reason in (
858            [1, 2, 3],
859            'ignore this',
860        ):
861            p = self.Process(target=self._test_sys_exit, args=(reason, testfn))
862            p.daemon = True
863            p.start()
864            join_process(p)
865            self.assertEqual(p.exitcode, 1)
866
867            with open(testfn, 'r') as f:
868                content = f.read()
869            self.assertEqual(content.rstrip(), str(reason))
870
871            os.unlink(testfn)
872
873        for reason in (True, False, 8):
874            p = self.Process(target=sys.exit, args=(reason,))
875            p.daemon = True
876            p.start()
877            join_process(p)
878            self.assertEqual(p.exitcode, reason)
879
880#
881#
882#
883
884def queue_empty(q):
885    if hasattr(q, 'empty'):
886        return q.empty()
887    else:
888        return q.qsize() == 0
889
890def queue_full(q, maxsize):
891    if hasattr(q, 'full'):
892        return q.full()
893    else:
894        return q.qsize() == maxsize
895
896
897class _TestQueue(BaseTestCase):
898
899
900    @classmethod
901    def _test_put(cls, queue, child_can_start, parent_can_continue):
902        child_can_start.wait()
903        for i in range(6):
904            queue.get()
905        parent_can_continue.set()
906
907    def test_put(self):
908        MAXSIZE = 6
909        queue = self.Queue(maxsize=MAXSIZE)
910        child_can_start = self.Event()
911        parent_can_continue = self.Event()
912
913        proc = self.Process(
914            target=self._test_put,
915            args=(queue, child_can_start, parent_can_continue)
916            )
917        proc.daemon = True
918        proc.start()
919
920        self.assertEqual(queue_empty(queue), True)
921        self.assertEqual(queue_full(queue, MAXSIZE), False)
922
923        queue.put(1)
924        queue.put(2, True)
925        queue.put(3, True, None)
926        queue.put(4, False)
927        queue.put(5, False, None)
928        queue.put_nowait(6)
929
930        # the values may be in buffer but not yet in pipe so sleep a bit
931        time.sleep(DELTA)
932
933        self.assertEqual(queue_empty(queue), False)
934        self.assertEqual(queue_full(queue, MAXSIZE), True)
935
936        put = TimingWrapper(queue.put)
937        put_nowait = TimingWrapper(queue.put_nowait)
938
939        self.assertRaises(pyqueue.Full, put, 7, False)
940        self.assertTimingAlmostEqual(put.elapsed, 0)
941
942        self.assertRaises(pyqueue.Full, put, 7, False, None)
943        self.assertTimingAlmostEqual(put.elapsed, 0)
944
945        self.assertRaises(pyqueue.Full, put_nowait, 7)
946        self.assertTimingAlmostEqual(put_nowait.elapsed, 0)
947
948        self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1)
949        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1)
950
951        self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2)
952        self.assertTimingAlmostEqual(put.elapsed, 0)
953
954        self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3)
955        self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3)
956
957        child_can_start.set()
958        parent_can_continue.wait()
959
960        self.assertEqual(queue_empty(queue), True)
961        self.assertEqual(queue_full(queue, MAXSIZE), False)
962
963        proc.join()
964        close_queue(queue)
965
966    @classmethod
967    def _test_get(cls, queue, child_can_start, parent_can_continue):
968        child_can_start.wait()
969        #queue.put(1)
970        queue.put(2)
971        queue.put(3)
972        queue.put(4)
973        queue.put(5)
974        parent_can_continue.set()
975
976    def test_get(self):
977        queue = self.Queue()
978        child_can_start = self.Event()
979        parent_can_continue = self.Event()
980
981        proc = self.Process(
982            target=self._test_get,
983            args=(queue, child_can_start, parent_can_continue)
984            )
985        proc.daemon = True
986        proc.start()
987
988        self.assertEqual(queue_empty(queue), True)
989
990        child_can_start.set()
991        parent_can_continue.wait()
992
993        time.sleep(DELTA)
994        self.assertEqual(queue_empty(queue), False)
995
996        # Hangs unexpectedly, remove for now
997        #self.assertEqual(queue.get(), 1)
998        self.assertEqual(queue.get(True, None), 2)
999        self.assertEqual(queue.get(True), 3)
1000        self.assertEqual(queue.get(timeout=1), 4)
1001        self.assertEqual(queue.get_nowait(), 5)
1002
1003        self.assertEqual(queue_empty(queue), True)
1004
1005        get = TimingWrapper(queue.get)
1006        get_nowait = TimingWrapper(queue.get_nowait)
1007
1008        self.assertRaises(pyqueue.Empty, get, False)
1009        self.assertTimingAlmostEqual(get.elapsed, 0)
1010
1011        self.assertRaises(pyqueue.Empty, get, False, None)
1012        self.assertTimingAlmostEqual(get.elapsed, 0)
1013
1014        self.assertRaises(pyqueue.Empty, get_nowait)
1015        self.assertTimingAlmostEqual(get_nowait.elapsed, 0)
1016
1017        self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1)
1018        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
1019
1020        self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2)
1021        self.assertTimingAlmostEqual(get.elapsed, 0)
1022
1023        self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3)
1024        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3)
1025
1026        proc.join()
1027        close_queue(queue)
1028
1029    @classmethod
1030    def _test_fork(cls, queue):
1031        for i in range(10, 20):
1032            queue.put(i)
1033        # note that at this point the items may only be buffered, so the
1034        # process cannot shutdown until the feeder thread has finished
1035        # pushing items onto the pipe.
1036
1037    def test_fork(self):
1038        # Old versions of Queue would fail to create a new feeder
1039        # thread for a forked process if the original process had its
1040        # own feeder thread.  This test checks that this no longer
1041        # happens.
1042
1043        queue = self.Queue()
1044
1045        # put items on queue so that main process starts a feeder thread
1046        for i in range(10):
1047            queue.put(i)
1048
1049        # wait to make sure thread starts before we fork a new process
1050        time.sleep(DELTA)
1051
1052        # fork process
1053        p = self.Process(target=self._test_fork, args=(queue,))
1054        p.daemon = True
1055        p.start()
1056
1057        # check that all expected items are in the queue
1058        for i in range(20):
1059            self.assertEqual(queue.get(), i)
1060        self.assertRaises(pyqueue.Empty, queue.get, False)
1061
1062        p.join()
1063        close_queue(queue)
1064
1065    def test_qsize(self):
1066        q = self.Queue()
1067        try:
1068            self.assertEqual(q.qsize(), 0)
1069        except NotImplementedError:
1070            self.skipTest('qsize method not implemented')
1071        q.put(1)
1072        self.assertEqual(q.qsize(), 1)
1073        q.put(5)
1074        self.assertEqual(q.qsize(), 2)
1075        q.get()
1076        self.assertEqual(q.qsize(), 1)
1077        q.get()
1078        self.assertEqual(q.qsize(), 0)
1079        close_queue(q)
1080
1081    @classmethod
1082    def _test_task_done(cls, q):
1083        for obj in iter(q.get, None):
1084            time.sleep(DELTA)
1085            q.task_done()
1086
1087    def test_task_done(self):
1088        queue = self.JoinableQueue()
1089
1090        workers = [self.Process(target=self._test_task_done, args=(queue,))
1091                   for i in range(4)]
1092
1093        for p in workers:
1094            p.daemon = True
1095            p.start()
1096
1097        for i in range(10):
1098            queue.put(i)
1099
1100        queue.join()
1101
1102        for p in workers:
1103            queue.put(None)
1104
1105        for p in workers:
1106            p.join()
1107        close_queue(queue)
1108
1109    def test_no_import_lock_contention(self):
1110        with test.support.temp_cwd():
1111            module_name = 'imported_by_an_imported_module'
1112            with open(module_name + '.py', 'w') as f:
1113                f.write("""if 1:
1114                    import multiprocessing
1115
1116                    q = multiprocessing.Queue()
1117                    q.put('knock knock')
1118                    q.get(timeout=3)
1119                    q.close()
1120                    del q
1121                """)
1122
1123            with test.support.DirsOnSysPath(os.getcwd()):
1124                try:
1125                    __import__(module_name)
1126                except pyqueue.Empty:
1127                    self.fail("Probable regression on import lock contention;"
1128                              " see Issue #22853")
1129
1130    def test_timeout(self):
1131        q = multiprocessing.Queue()
1132        start = time.monotonic()
1133        self.assertRaises(pyqueue.Empty, q.get, True, 0.200)
1134        delta = time.monotonic() - start
1135        # bpo-30317: Tolerate a delta of 100 ms because of the bad clock
1136        # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once
1137        # failed because the delta was only 135.8 ms.
1138        self.assertGreaterEqual(delta, 0.100)
1139        close_queue(q)
1140
1141    def test_queue_feeder_donot_stop_onexc(self):
1142        # bpo-30414: verify feeder handles exceptions correctly
1143        if self.TYPE != 'processes':
1144            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1145
1146        class NotSerializable(object):
1147            def __reduce__(self):
1148                raise AttributeError
1149        with test.support.captured_stderr():
1150            q = self.Queue()
1151            q.put(NotSerializable())
1152            q.put(True)
1153            self.assertTrue(q.get(timeout=TIMEOUT))
1154            close_queue(q)
1155
1156        with test.support.captured_stderr():
1157            # bpo-33078: verify that the queue size is correctly handled
1158            # on errors.
1159            q = self.Queue(maxsize=1)
1160            q.put(NotSerializable())
1161            q.put(True)
1162            try:
1163                self.assertEqual(q.qsize(), 1)
1164            except NotImplementedError:
1165                # qsize is not available on all platform as it
1166                # relies on sem_getvalue
1167                pass
1168            # bpo-30595: use a timeout of 1 second for slow buildbots
1169            self.assertTrue(q.get(timeout=1.0))
1170            # Check that the size of the queue is correct
1171            self.assertTrue(q.empty())
1172            close_queue(q)
1173
1174    def test_queue_feeder_on_queue_feeder_error(self):
1175        # bpo-30006: verify feeder handles exceptions using the
1176        # _on_queue_feeder_error hook.
1177        if self.TYPE != 'processes':
1178            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1179
1180        class NotSerializable(object):
1181            """Mock unserializable object"""
1182            def __init__(self):
1183                self.reduce_was_called = False
1184                self.on_queue_feeder_error_was_called = False
1185
1186            def __reduce__(self):
1187                self.reduce_was_called = True
1188                raise AttributeError
1189
1190        class SafeQueue(multiprocessing.queues.Queue):
1191            """Queue with overloaded _on_queue_feeder_error hook"""
1192            @staticmethod
1193            def _on_queue_feeder_error(e, obj):
1194                if (isinstance(e, AttributeError) and
1195                        isinstance(obj, NotSerializable)):
1196                    obj.on_queue_feeder_error_was_called = True
1197
1198        not_serializable_obj = NotSerializable()
1199        # The captured_stderr reduces the noise in the test report
1200        with test.support.captured_stderr():
1201            q = SafeQueue(ctx=multiprocessing.get_context())
1202            q.put(not_serializable_obj)
1203
1204            # Verify that q is still functioning correctly
1205            q.put(True)
1206            self.assertTrue(q.get(timeout=1.0))
1207
1208        # Assert that the serialization and the hook have been called correctly
1209        self.assertTrue(not_serializable_obj.reduce_was_called)
1210        self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called)
1211
1212    def test_closed_queue_put_get_exceptions(self):
1213        for q in multiprocessing.Queue(), multiprocessing.JoinableQueue():
1214            q.close()
1215            with self.assertRaisesRegex(ValueError, 'is closed'):
1216                q.put('foo')
1217            with self.assertRaisesRegex(ValueError, 'is closed'):
1218                q.get()
1219#
1220#
1221#
1222
1223class _TestLock(BaseTestCase):
1224
1225    def test_lock(self):
1226        lock = self.Lock()
1227        self.assertEqual(lock.acquire(), True)
1228        self.assertEqual(lock.acquire(False), False)
1229        self.assertEqual(lock.release(), None)
1230        self.assertRaises((ValueError, threading.ThreadError), lock.release)
1231
1232    def test_rlock(self):
1233        lock = self.RLock()
1234        self.assertEqual(lock.acquire(), True)
1235        self.assertEqual(lock.acquire(), True)
1236        self.assertEqual(lock.acquire(), True)
1237        self.assertEqual(lock.release(), None)
1238        self.assertEqual(lock.release(), None)
1239        self.assertEqual(lock.release(), None)
1240        self.assertRaises((AssertionError, RuntimeError), lock.release)
1241
1242    def test_lock_context(self):
1243        with self.Lock():
1244            pass
1245
1246
1247class _TestSemaphore(BaseTestCase):
1248
1249    def _test_semaphore(self, sem):
1250        self.assertReturnsIfImplemented(2, get_value, sem)
1251        self.assertEqual(sem.acquire(), True)
1252        self.assertReturnsIfImplemented(1, get_value, sem)
1253        self.assertEqual(sem.acquire(), True)
1254        self.assertReturnsIfImplemented(0, get_value, sem)
1255        self.assertEqual(sem.acquire(False), False)
1256        self.assertReturnsIfImplemented(0, get_value, sem)
1257        self.assertEqual(sem.release(), None)
1258        self.assertReturnsIfImplemented(1, get_value, sem)
1259        self.assertEqual(sem.release(), None)
1260        self.assertReturnsIfImplemented(2, get_value, sem)
1261
1262    def test_semaphore(self):
1263        sem = self.Semaphore(2)
1264        self._test_semaphore(sem)
1265        self.assertEqual(sem.release(), None)
1266        self.assertReturnsIfImplemented(3, get_value, sem)
1267        self.assertEqual(sem.release(), None)
1268        self.assertReturnsIfImplemented(4, get_value, sem)
1269
1270    def test_bounded_semaphore(self):
1271        sem = self.BoundedSemaphore(2)
1272        self._test_semaphore(sem)
1273        # Currently fails on OS/X
1274        #if HAVE_GETVALUE:
1275        #    self.assertRaises(ValueError, sem.release)
1276        #    self.assertReturnsIfImplemented(2, get_value, sem)
1277
1278    def test_timeout(self):
1279        if self.TYPE != 'processes':
1280            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1281
1282        sem = self.Semaphore(0)
1283        acquire = TimingWrapper(sem.acquire)
1284
1285        self.assertEqual(acquire(False), False)
1286        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1287
1288        self.assertEqual(acquire(False, None), False)
1289        self.assertTimingAlmostEqual(acquire.elapsed, 0.0)
1290
1291        self.assertEqual(acquire(False, TIMEOUT1), False)
1292        self.assertTimingAlmostEqual(acquire.elapsed, 0)
1293
1294        self.assertEqual(acquire(True, TIMEOUT2), False)
1295        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2)
1296
1297        self.assertEqual(acquire(timeout=TIMEOUT3), False)
1298        self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3)
1299
1300
1301class _TestCondition(BaseTestCase):
1302
1303    @classmethod
1304    def f(cls, cond, sleeping, woken, timeout=None):
1305        cond.acquire()
1306        sleeping.release()
1307        cond.wait(timeout)
1308        woken.release()
1309        cond.release()
1310
1311    def assertReachesEventually(self, func, value):
1312        for i in range(10):
1313            try:
1314                if func() == value:
1315                    break
1316            except NotImplementedError:
1317                break
1318            time.sleep(DELTA)
1319        time.sleep(DELTA)
1320        self.assertReturnsIfImplemented(value, func)
1321
1322    def check_invariant(self, cond):
1323        # this is only supposed to succeed when there are no sleepers
1324        if self.TYPE == 'processes':
1325            try:
1326                sleepers = (cond._sleeping_count.get_value() -
1327                            cond._woken_count.get_value())
1328                self.assertEqual(sleepers, 0)
1329                self.assertEqual(cond._wait_semaphore.get_value(), 0)
1330            except NotImplementedError:
1331                pass
1332
1333    def test_notify(self):
1334        cond = self.Condition()
1335        sleeping = self.Semaphore(0)
1336        woken = self.Semaphore(0)
1337
1338        p = self.Process(target=self.f, args=(cond, sleeping, woken))
1339        p.daemon = True
1340        p.start()
1341        self.addCleanup(p.join)
1342
1343        p = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1344        p.daemon = True
1345        p.start()
1346        self.addCleanup(p.join)
1347
1348        # wait for both children to start sleeping
1349        sleeping.acquire()
1350        sleeping.acquire()
1351
1352        # check no process/thread has woken up
1353        time.sleep(DELTA)
1354        self.assertReturnsIfImplemented(0, get_value, woken)
1355
1356        # wake up one process/thread
1357        cond.acquire()
1358        cond.notify()
1359        cond.release()
1360
1361        # check one process/thread has woken up
1362        time.sleep(DELTA)
1363        self.assertReturnsIfImplemented(1, get_value, woken)
1364
1365        # wake up another
1366        cond.acquire()
1367        cond.notify()
1368        cond.release()
1369
1370        # check other has woken up
1371        time.sleep(DELTA)
1372        self.assertReturnsIfImplemented(2, get_value, woken)
1373
1374        # check state is not mucked up
1375        self.check_invariant(cond)
1376        p.join()
1377
1378    def test_notify_all(self):
1379        cond = self.Condition()
1380        sleeping = self.Semaphore(0)
1381        woken = self.Semaphore(0)
1382
1383        # start some threads/processes which will timeout
1384        for i in range(3):
1385            p = self.Process(target=self.f,
1386                             args=(cond, sleeping, woken, TIMEOUT1))
1387            p.daemon = True
1388            p.start()
1389            self.addCleanup(p.join)
1390
1391            t = threading.Thread(target=self.f,
1392                                 args=(cond, sleeping, woken, TIMEOUT1))
1393            t.daemon = True
1394            t.start()
1395            self.addCleanup(t.join)
1396
1397        # wait for them all to sleep
1398        for i in range(6):
1399            sleeping.acquire()
1400
1401        # check they have all timed out
1402        for i in range(6):
1403            woken.acquire()
1404        self.assertReturnsIfImplemented(0, get_value, woken)
1405
1406        # check state is not mucked up
1407        self.check_invariant(cond)
1408
1409        # start some more threads/processes
1410        for i in range(3):
1411            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1412            p.daemon = True
1413            p.start()
1414            self.addCleanup(p.join)
1415
1416            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1417            t.daemon = True
1418            t.start()
1419            self.addCleanup(t.join)
1420
1421        # wait for them to all sleep
1422        for i in range(6):
1423            sleeping.acquire()
1424
1425        # check no process/thread has woken up
1426        time.sleep(DELTA)
1427        self.assertReturnsIfImplemented(0, get_value, woken)
1428
1429        # wake them all up
1430        cond.acquire()
1431        cond.notify_all()
1432        cond.release()
1433
1434        # check they have all woken
1435        self.assertReachesEventually(lambda: get_value(woken), 6)
1436
1437        # check state is not mucked up
1438        self.check_invariant(cond)
1439
1440    def test_notify_n(self):
1441        cond = self.Condition()
1442        sleeping = self.Semaphore(0)
1443        woken = self.Semaphore(0)
1444
1445        # start some threads/processes
1446        for i in range(3):
1447            p = self.Process(target=self.f, args=(cond, sleeping, woken))
1448            p.daemon = True
1449            p.start()
1450            self.addCleanup(p.join)
1451
1452            t = threading.Thread(target=self.f, args=(cond, sleeping, woken))
1453            t.daemon = True
1454            t.start()
1455            self.addCleanup(t.join)
1456
1457        # wait for them to all sleep
1458        for i in range(6):
1459            sleeping.acquire()
1460
1461        # check no process/thread has woken up
1462        time.sleep(DELTA)
1463        self.assertReturnsIfImplemented(0, get_value, woken)
1464
1465        # wake some of them up
1466        cond.acquire()
1467        cond.notify(n=2)
1468        cond.release()
1469
1470        # check 2 have woken
1471        self.assertReachesEventually(lambda: get_value(woken), 2)
1472
1473        # wake the rest of them
1474        cond.acquire()
1475        cond.notify(n=4)
1476        cond.release()
1477
1478        self.assertReachesEventually(lambda: get_value(woken), 6)
1479
1480        # doesn't do anything more
1481        cond.acquire()
1482        cond.notify(n=3)
1483        cond.release()
1484
1485        self.assertReturnsIfImplemented(6, get_value, woken)
1486
1487        # check state is not mucked up
1488        self.check_invariant(cond)
1489
1490    def test_timeout(self):
1491        cond = self.Condition()
1492        wait = TimingWrapper(cond.wait)
1493        cond.acquire()
1494        res = wait(TIMEOUT1)
1495        cond.release()
1496        self.assertEqual(res, False)
1497        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1498
1499    @classmethod
1500    def _test_waitfor_f(cls, cond, state):
1501        with cond:
1502            state.value = 0
1503            cond.notify()
1504            result = cond.wait_for(lambda : state.value==4)
1505            if not result or state.value != 4:
1506                sys.exit(1)
1507
1508    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1509    def test_waitfor(self):
1510        # based on test in test/lock_tests.py
1511        cond = self.Condition()
1512        state = self.Value('i', -1)
1513
1514        p = self.Process(target=self._test_waitfor_f, args=(cond, state))
1515        p.daemon = True
1516        p.start()
1517
1518        with cond:
1519            result = cond.wait_for(lambda : state.value==0)
1520            self.assertTrue(result)
1521            self.assertEqual(state.value, 0)
1522
1523        for i in range(4):
1524            time.sleep(0.01)
1525            with cond:
1526                state.value += 1
1527                cond.notify()
1528
1529        join_process(p)
1530        self.assertEqual(p.exitcode, 0)
1531
1532    @classmethod
1533    def _test_waitfor_timeout_f(cls, cond, state, success, sem):
1534        sem.release()
1535        with cond:
1536            expected = 0.1
1537            dt = time.monotonic()
1538            result = cond.wait_for(lambda : state.value==4, timeout=expected)
1539            dt = time.monotonic() - dt
1540            # borrow logic in assertTimeout() from test/lock_tests.py
1541            if not result and expected * 0.6 < dt < expected * 10.0:
1542                success.value = True
1543
1544    @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes')
1545    def test_waitfor_timeout(self):
1546        # based on test in test/lock_tests.py
1547        cond = self.Condition()
1548        state = self.Value('i', 0)
1549        success = self.Value('i', False)
1550        sem = self.Semaphore(0)
1551
1552        p = self.Process(target=self._test_waitfor_timeout_f,
1553                         args=(cond, state, success, sem))
1554        p.daemon = True
1555        p.start()
1556        self.assertTrue(sem.acquire(timeout=TIMEOUT))
1557
1558        # Only increment 3 times, so state == 4 is never reached.
1559        for i in range(3):
1560            time.sleep(0.01)
1561            with cond:
1562                state.value += 1
1563                cond.notify()
1564
1565        join_process(p)
1566        self.assertTrue(success.value)
1567
1568    @classmethod
1569    def _test_wait_result(cls, c, pid):
1570        with c:
1571            c.notify()
1572        time.sleep(1)
1573        if pid is not None:
1574            os.kill(pid, signal.SIGINT)
1575
1576    def test_wait_result(self):
1577        if isinstance(self, ProcessesMixin) and sys.platform != 'win32':
1578            pid = os.getpid()
1579        else:
1580            pid = None
1581
1582        c = self.Condition()
1583        with c:
1584            self.assertFalse(c.wait(0))
1585            self.assertFalse(c.wait(0.1))
1586
1587            p = self.Process(target=self._test_wait_result, args=(c, pid))
1588            p.start()
1589
1590            self.assertTrue(c.wait(60))
1591            if pid is not None:
1592                self.assertRaises(KeyboardInterrupt, c.wait, 60)
1593
1594            p.join()
1595
1596
1597class _TestEvent(BaseTestCase):
1598
1599    @classmethod
1600    def _test_event(cls, event):
1601        time.sleep(TIMEOUT2)
1602        event.set()
1603
1604    def test_event(self):
1605        event = self.Event()
1606        wait = TimingWrapper(event.wait)
1607
1608        # Removed temporarily, due to API shear, this does not
1609        # work with threading._Event objects. is_set == isSet
1610        self.assertEqual(event.is_set(), False)
1611
1612        # Removed, threading.Event.wait() will return the value of the __flag
1613        # instead of None. API Shear with the semaphore backed mp.Event
1614        self.assertEqual(wait(0.0), False)
1615        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1616        self.assertEqual(wait(TIMEOUT1), False)
1617        self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1)
1618
1619        event.set()
1620
1621        # See note above on the API differences
1622        self.assertEqual(event.is_set(), True)
1623        self.assertEqual(wait(), True)
1624        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1625        self.assertEqual(wait(TIMEOUT1), True)
1626        self.assertTimingAlmostEqual(wait.elapsed, 0.0)
1627        # self.assertEqual(event.is_set(), True)
1628
1629        event.clear()
1630
1631        #self.assertEqual(event.is_set(), False)
1632
1633        p = self.Process(target=self._test_event, args=(event,))
1634        p.daemon = True
1635        p.start()
1636        self.assertEqual(wait(), True)
1637        p.join()
1638
1639#
1640# Tests for Barrier - adapted from tests in test/lock_tests.py
1641#
1642
1643# Many of the tests for threading.Barrier use a list as an atomic
1644# counter: a value is appended to increment the counter, and the
1645# length of the list gives the value.  We use the class DummyList
1646# for the same purpose.
1647
1648class _DummyList(object):
1649
1650    def __init__(self):
1651        wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i'))
1652        lock = multiprocessing.Lock()
1653        self.__setstate__((wrapper, lock))
1654        self._lengthbuf[0] = 0
1655
1656    def __setstate__(self, state):
1657        (self._wrapper, self._lock) = state
1658        self._lengthbuf = self._wrapper.create_memoryview().cast('i')
1659
1660    def __getstate__(self):
1661        return (self._wrapper, self._lock)
1662
1663    def append(self, _):
1664        with self._lock:
1665            self._lengthbuf[0] += 1
1666
1667    def __len__(self):
1668        with self._lock:
1669            return self._lengthbuf[0]
1670
1671def _wait():
1672    # A crude wait/yield function not relying on synchronization primitives.
1673    time.sleep(0.01)
1674
1675
1676class Bunch(object):
1677    """
1678    A bunch of threads.
1679    """
1680    def __init__(self, namespace, f, args, n, wait_before_exit=False):
1681        """
1682        Construct a bunch of `n` threads running the same function `f`.
1683        If `wait_before_exit` is True, the threads won't terminate until
1684        do_finish() is called.
1685        """
1686        self.f = f
1687        self.args = args
1688        self.n = n
1689        self.started = namespace.DummyList()
1690        self.finished = namespace.DummyList()
1691        self._can_exit = namespace.Event()
1692        if not wait_before_exit:
1693            self._can_exit.set()
1694
1695        threads = []
1696        for i in range(n):
1697            p = namespace.Process(target=self.task)
1698            p.daemon = True
1699            p.start()
1700            threads.append(p)
1701
1702        def finalize(threads):
1703            for p in threads:
1704                p.join()
1705
1706        self._finalizer = weakref.finalize(self, finalize, threads)
1707
1708    def task(self):
1709        pid = os.getpid()
1710        self.started.append(pid)
1711        try:
1712            self.f(*self.args)
1713        finally:
1714            self.finished.append(pid)
1715            self._can_exit.wait(30)
1716            assert self._can_exit.is_set()
1717
1718    def wait_for_started(self):
1719        while len(self.started) < self.n:
1720            _wait()
1721
1722    def wait_for_finished(self):
1723        while len(self.finished) < self.n:
1724            _wait()
1725
1726    def do_finish(self):
1727        self._can_exit.set()
1728
1729    def close(self):
1730        self._finalizer()
1731
1732
1733class AppendTrue(object):
1734    def __init__(self, obj):
1735        self.obj = obj
1736    def __call__(self):
1737        self.obj.append(True)
1738
1739
1740class _TestBarrier(BaseTestCase):
1741    """
1742    Tests for Barrier objects.
1743    """
1744    N = 5
1745    defaultTimeout = 30.0  # XXX Slow Windows buildbots need generous timeout
1746
1747    def setUp(self):
1748        self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout)
1749
1750    def tearDown(self):
1751        self.barrier.abort()
1752        self.barrier = None
1753
1754    def DummyList(self):
1755        if self.TYPE == 'threads':
1756            return []
1757        elif self.TYPE == 'manager':
1758            return self.manager.list()
1759        else:
1760            return _DummyList()
1761
1762    def run_threads(self, f, args):
1763        b = Bunch(self, f, args, self.N-1)
1764        try:
1765            f(*args)
1766            b.wait_for_finished()
1767        finally:
1768            b.close()
1769
1770    @classmethod
1771    def multipass(cls, barrier, results, n):
1772        m = barrier.parties
1773        assert m == cls.N
1774        for i in range(n):
1775            results[0].append(True)
1776            assert len(results[1]) == i * m
1777            barrier.wait()
1778            results[1].append(True)
1779            assert len(results[0]) == (i + 1) * m
1780            barrier.wait()
1781        try:
1782            assert barrier.n_waiting == 0
1783        except NotImplementedError:
1784            pass
1785        assert not barrier.broken
1786
1787    def test_barrier(self, passes=1):
1788        """
1789        Test that a barrier is passed in lockstep
1790        """
1791        results = [self.DummyList(), self.DummyList()]
1792        self.run_threads(self.multipass, (self.barrier, results, passes))
1793
1794    def test_barrier_10(self):
1795        """
1796        Test that a barrier works for 10 consecutive runs
1797        """
1798        return self.test_barrier(10)
1799
1800    @classmethod
1801    def _test_wait_return_f(cls, barrier, queue):
1802        res = barrier.wait()
1803        queue.put(res)
1804
1805    def test_wait_return(self):
1806        """
1807        test the return value from barrier.wait
1808        """
1809        queue = self.Queue()
1810        self.run_threads(self._test_wait_return_f, (self.barrier, queue))
1811        results = [queue.get() for i in range(self.N)]
1812        self.assertEqual(results.count(0), 1)
1813        close_queue(queue)
1814
1815    @classmethod
1816    def _test_action_f(cls, barrier, results):
1817        barrier.wait()
1818        if len(results) != 1:
1819            raise RuntimeError
1820
1821    def test_action(self):
1822        """
1823        Test the 'action' callback
1824        """
1825        results = self.DummyList()
1826        barrier = self.Barrier(self.N, action=AppendTrue(results))
1827        self.run_threads(self._test_action_f, (barrier, results))
1828        self.assertEqual(len(results), 1)
1829
1830    @classmethod
1831    def _test_abort_f(cls, barrier, results1, results2):
1832        try:
1833            i = barrier.wait()
1834            if i == cls.N//2:
1835                raise RuntimeError
1836            barrier.wait()
1837            results1.append(True)
1838        except threading.BrokenBarrierError:
1839            results2.append(True)
1840        except RuntimeError:
1841            barrier.abort()
1842
1843    def test_abort(self):
1844        """
1845        Test that an abort will put the barrier in a broken state
1846        """
1847        results1 = self.DummyList()
1848        results2 = self.DummyList()
1849        self.run_threads(self._test_abort_f,
1850                         (self.barrier, results1, results2))
1851        self.assertEqual(len(results1), 0)
1852        self.assertEqual(len(results2), self.N-1)
1853        self.assertTrue(self.barrier.broken)
1854
1855    @classmethod
1856    def _test_reset_f(cls, barrier, results1, results2, results3):
1857        i = barrier.wait()
1858        if i == cls.N//2:
1859            # Wait until the other threads are all in the barrier.
1860            while barrier.n_waiting < cls.N-1:
1861                time.sleep(0.001)
1862            barrier.reset()
1863        else:
1864            try:
1865                barrier.wait()
1866                results1.append(True)
1867            except threading.BrokenBarrierError:
1868                results2.append(True)
1869        # Now, pass the barrier again
1870        barrier.wait()
1871        results3.append(True)
1872
1873    def test_reset(self):
1874        """
1875        Test that a 'reset' on a barrier frees the waiting threads
1876        """
1877        results1 = self.DummyList()
1878        results2 = self.DummyList()
1879        results3 = self.DummyList()
1880        self.run_threads(self._test_reset_f,
1881                         (self.barrier, results1, results2, results3))
1882        self.assertEqual(len(results1), 0)
1883        self.assertEqual(len(results2), self.N-1)
1884        self.assertEqual(len(results3), self.N)
1885
1886    @classmethod
1887    def _test_abort_and_reset_f(cls, barrier, barrier2,
1888                                results1, results2, results3):
1889        try:
1890            i = barrier.wait()
1891            if i == cls.N//2:
1892                raise RuntimeError
1893            barrier.wait()
1894            results1.append(True)
1895        except threading.BrokenBarrierError:
1896            results2.append(True)
1897        except RuntimeError:
1898            barrier.abort()
1899        # Synchronize and reset the barrier.  Must synchronize first so
1900        # that everyone has left it when we reset, and after so that no
1901        # one enters it before the reset.
1902        if barrier2.wait() == cls.N//2:
1903            barrier.reset()
1904        barrier2.wait()
1905        barrier.wait()
1906        results3.append(True)
1907
1908    def test_abort_and_reset(self):
1909        """
1910        Test that a barrier can be reset after being broken.
1911        """
1912        results1 = self.DummyList()
1913        results2 = self.DummyList()
1914        results3 = self.DummyList()
1915        barrier2 = self.Barrier(self.N)
1916
1917        self.run_threads(self._test_abort_and_reset_f,
1918                         (self.barrier, barrier2, results1, results2, results3))
1919        self.assertEqual(len(results1), 0)
1920        self.assertEqual(len(results2), self.N-1)
1921        self.assertEqual(len(results3), self.N)
1922
1923    @classmethod
1924    def _test_timeout_f(cls, barrier, results):
1925        i = barrier.wait()
1926        if i == cls.N//2:
1927            # One thread is late!
1928            time.sleep(1.0)
1929        try:
1930            barrier.wait(0.5)
1931        except threading.BrokenBarrierError:
1932            results.append(True)
1933
1934    def test_timeout(self):
1935        """
1936        Test wait(timeout)
1937        """
1938        results = self.DummyList()
1939        self.run_threads(self._test_timeout_f, (self.barrier, results))
1940        self.assertEqual(len(results), self.barrier.parties)
1941
1942    @classmethod
1943    def _test_default_timeout_f(cls, barrier, results):
1944        i = barrier.wait(cls.defaultTimeout)
1945        if i == cls.N//2:
1946            # One thread is later than the default timeout
1947            time.sleep(1.0)
1948        try:
1949            barrier.wait()
1950        except threading.BrokenBarrierError:
1951            results.append(True)
1952
1953    def test_default_timeout(self):
1954        """
1955        Test the barrier's default timeout
1956        """
1957        barrier = self.Barrier(self.N, timeout=0.5)
1958        results = self.DummyList()
1959        self.run_threads(self._test_default_timeout_f, (barrier, results))
1960        self.assertEqual(len(results), barrier.parties)
1961
1962    def test_single_thread(self):
1963        b = self.Barrier(1)
1964        b.wait()
1965        b.wait()
1966
1967    @classmethod
1968    def _test_thousand_f(cls, barrier, passes, conn, lock):
1969        for i in range(passes):
1970            barrier.wait()
1971            with lock:
1972                conn.send(i)
1973
1974    def test_thousand(self):
1975        if self.TYPE == 'manager':
1976            self.skipTest('test not appropriate for {}'.format(self.TYPE))
1977        passes = 1000
1978        lock = self.Lock()
1979        conn, child_conn = self.Pipe(False)
1980        for j in range(self.N):
1981            p = self.Process(target=self._test_thousand_f,
1982                           args=(self.barrier, passes, child_conn, lock))
1983            p.start()
1984            self.addCleanup(p.join)
1985
1986        for i in range(passes):
1987            for j in range(self.N):
1988                self.assertEqual(conn.recv(), i)
1989
1990#
1991#
1992#
1993
1994class _TestValue(BaseTestCase):
1995
1996    ALLOWED_TYPES = ('processes',)
1997
1998    codes_values = [
1999        ('i', 4343, 24234),
2000        ('d', 3.625, -4.25),
2001        ('h', -232, 234),
2002        ('q', 2 ** 33, 2 ** 34),
2003        ('c', latin('x'), latin('y'))
2004        ]
2005
2006    def setUp(self):
2007        if not HAS_SHAREDCTYPES:
2008            self.skipTest("requires multiprocessing.sharedctypes")
2009
2010    @classmethod
2011    def _test(cls, values):
2012        for sv, cv in zip(values, cls.codes_values):
2013            sv.value = cv[2]
2014
2015
2016    def test_value(self, raw=False):
2017        if raw:
2018            values = [self.RawValue(code, value)
2019                      for code, value, _ in self.codes_values]
2020        else:
2021            values = [self.Value(code, value)
2022                      for code, value, _ in self.codes_values]
2023
2024        for sv, cv in zip(values, self.codes_values):
2025            self.assertEqual(sv.value, cv[1])
2026
2027        proc = self.Process(target=self._test, args=(values,))
2028        proc.daemon = True
2029        proc.start()
2030        proc.join()
2031
2032        for sv, cv in zip(values, self.codes_values):
2033            self.assertEqual(sv.value, cv[2])
2034
2035    def test_rawvalue(self):
2036        self.test_value(raw=True)
2037
2038    def test_getobj_getlock(self):
2039        val1 = self.Value('i', 5)
2040        lock1 = val1.get_lock()
2041        obj1 = val1.get_obj()
2042
2043        val2 = self.Value('i', 5, lock=None)
2044        lock2 = val2.get_lock()
2045        obj2 = val2.get_obj()
2046
2047        lock = self.Lock()
2048        val3 = self.Value('i', 5, lock=lock)
2049        lock3 = val3.get_lock()
2050        obj3 = val3.get_obj()
2051        self.assertEqual(lock, lock3)
2052
2053        arr4 = self.Value('i', 5, lock=False)
2054        self.assertFalse(hasattr(arr4, 'get_lock'))
2055        self.assertFalse(hasattr(arr4, 'get_obj'))
2056
2057        self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue')
2058
2059        arr5 = self.RawValue('i', 5)
2060        self.assertFalse(hasattr(arr5, 'get_lock'))
2061        self.assertFalse(hasattr(arr5, 'get_obj'))
2062
2063
2064class _TestArray(BaseTestCase):
2065
2066    ALLOWED_TYPES = ('processes',)
2067
2068    @classmethod
2069    def f(cls, seq):
2070        for i in range(1, len(seq)):
2071            seq[i] += seq[i-1]
2072
2073    @unittest.skipIf(c_int is None, "requires _ctypes")
2074    def test_array(self, raw=False):
2075        seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831]
2076        if raw:
2077            arr = self.RawArray('i', seq)
2078        else:
2079            arr = self.Array('i', seq)
2080
2081        self.assertEqual(len(arr), len(seq))
2082        self.assertEqual(arr[3], seq[3])
2083        self.assertEqual(list(arr[2:7]), list(seq[2:7]))
2084
2085        arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4])
2086
2087        self.assertEqual(list(arr[:]), seq)
2088
2089        self.f(seq)
2090
2091        p = self.Process(target=self.f, args=(arr,))
2092        p.daemon = True
2093        p.start()
2094        p.join()
2095
2096        self.assertEqual(list(arr[:]), seq)
2097
2098    @unittest.skipIf(c_int is None, "requires _ctypes")
2099    def test_array_from_size(self):
2100        size = 10
2101        # Test for zeroing (see issue #11675).
2102        # The repetition below strengthens the test by increasing the chances
2103        # of previously allocated non-zero memory being used for the new array
2104        # on the 2nd and 3rd loops.
2105        for _ in range(3):
2106            arr = self.Array('i', size)
2107            self.assertEqual(len(arr), size)
2108            self.assertEqual(list(arr), [0] * size)
2109            arr[:] = range(10)
2110            self.assertEqual(list(arr), list(range(10)))
2111            del arr
2112
2113    @unittest.skipIf(c_int is None, "requires _ctypes")
2114    def test_rawarray(self):
2115        self.test_array(raw=True)
2116
2117    @unittest.skipIf(c_int is None, "requires _ctypes")
2118    def test_getobj_getlock_obj(self):
2119        arr1 = self.Array('i', list(range(10)))
2120        lock1 = arr1.get_lock()
2121        obj1 = arr1.get_obj()
2122
2123        arr2 = self.Array('i', list(range(10)), lock=None)
2124        lock2 = arr2.get_lock()
2125        obj2 = arr2.get_obj()
2126
2127        lock = self.Lock()
2128        arr3 = self.Array('i', list(range(10)), lock=lock)
2129        lock3 = arr3.get_lock()
2130        obj3 = arr3.get_obj()
2131        self.assertEqual(lock, lock3)
2132
2133        arr4 = self.Array('i', range(10), lock=False)
2134        self.assertFalse(hasattr(arr4, 'get_lock'))
2135        self.assertFalse(hasattr(arr4, 'get_obj'))
2136        self.assertRaises(AttributeError,
2137                          self.Array, 'i', range(10), lock='notalock')
2138
2139        arr5 = self.RawArray('i', range(10))
2140        self.assertFalse(hasattr(arr5, 'get_lock'))
2141        self.assertFalse(hasattr(arr5, 'get_obj'))
2142
2143#
2144#
2145#
2146
2147class _TestContainers(BaseTestCase):
2148
2149    ALLOWED_TYPES = ('manager',)
2150
2151    def test_list(self):
2152        a = self.list(list(range(10)))
2153        self.assertEqual(a[:], list(range(10)))
2154
2155        b = self.list()
2156        self.assertEqual(b[:], [])
2157
2158        b.extend(list(range(5)))
2159        self.assertEqual(b[:], list(range(5)))
2160
2161        self.assertEqual(b[2], 2)
2162        self.assertEqual(b[2:10], [2,3,4])
2163
2164        b *= 2
2165        self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4])
2166
2167        self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6])
2168
2169        self.assertEqual(a[:], list(range(10)))
2170
2171        d = [a, b]
2172        e = self.list(d)
2173        self.assertEqual(
2174            [element[:] for element in e],
2175            [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]]
2176            )
2177
2178        f = self.list([a])
2179        a.append('hello')
2180        self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello'])
2181
2182    def test_list_iter(self):
2183        a = self.list(list(range(10)))
2184        it = iter(a)
2185        self.assertEqual(list(it), list(range(10)))
2186        self.assertEqual(list(it), [])  # exhausted
2187        # list modified during iteration
2188        it = iter(a)
2189        a[0] = 100
2190        self.assertEqual(next(it), 100)
2191
2192    def test_list_proxy_in_list(self):
2193        a = self.list([self.list(range(3)) for _i in range(3)])
2194        self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3)
2195
2196        a[0][-1] = 55
2197        self.assertEqual(a[0][:], [0, 1, 55])
2198        for i in range(1, 3):
2199            self.assertEqual(a[i][:], [0, 1, 2])
2200
2201        self.assertEqual(a[1].pop(), 2)
2202        self.assertEqual(len(a[1]), 2)
2203        for i in range(0, 3, 2):
2204            self.assertEqual(len(a[i]), 3)
2205
2206        del a
2207
2208        b = self.list()
2209        b.append(b)
2210        del b
2211
2212    def test_dict(self):
2213        d = self.dict()
2214        indices = list(range(65, 70))
2215        for i in indices:
2216            d[i] = chr(i)
2217        self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices))
2218        self.assertEqual(sorted(d.keys()), indices)
2219        self.assertEqual(sorted(d.values()), [chr(i) for i in indices])
2220        self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices])
2221
2222    def test_dict_iter(self):
2223        d = self.dict()
2224        indices = list(range(65, 70))
2225        for i in indices:
2226            d[i] = chr(i)
2227        it = iter(d)
2228        self.assertEqual(list(it), indices)
2229        self.assertEqual(list(it), [])  # exhausted
2230        # dictionary changed size during iteration
2231        it = iter(d)
2232        d.clear()
2233        self.assertRaises(RuntimeError, next, it)
2234
2235    def test_dict_proxy_nested(self):
2236        pets = self.dict(ferrets=2, hamsters=4)
2237        supplies = self.dict(water=10, feed=3)
2238        d = self.dict(pets=pets, supplies=supplies)
2239
2240        self.assertEqual(supplies['water'], 10)
2241        self.assertEqual(d['supplies']['water'], 10)
2242
2243        d['supplies']['blankets'] = 5
2244        self.assertEqual(supplies['blankets'], 5)
2245        self.assertEqual(d['supplies']['blankets'], 5)
2246
2247        d['supplies']['water'] = 7
2248        self.assertEqual(supplies['water'], 7)
2249        self.assertEqual(d['supplies']['water'], 7)
2250
2251        del pets
2252        del supplies
2253        self.assertEqual(d['pets']['ferrets'], 2)
2254        d['supplies']['blankets'] = 11
2255        self.assertEqual(d['supplies']['blankets'], 11)
2256
2257        pets = d['pets']
2258        supplies = d['supplies']
2259        supplies['water'] = 7
2260        self.assertEqual(supplies['water'], 7)
2261        self.assertEqual(d['supplies']['water'], 7)
2262
2263        d.clear()
2264        self.assertEqual(len(d), 0)
2265        self.assertEqual(supplies['water'], 7)
2266        self.assertEqual(pets['hamsters'], 4)
2267
2268        l = self.list([pets, supplies])
2269        l[0]['marmots'] = 1
2270        self.assertEqual(pets['marmots'], 1)
2271        self.assertEqual(l[0]['marmots'], 1)
2272
2273        del pets
2274        del supplies
2275        self.assertEqual(l[0]['marmots'], 1)
2276
2277        outer = self.list([[88, 99], l])
2278        self.assertIsInstance(outer[0], list)  # Not a ListProxy
2279        self.assertEqual(outer[-1][-1]['feed'], 3)
2280
2281    def test_namespace(self):
2282        n = self.Namespace()
2283        n.name = 'Bob'
2284        n.job = 'Builder'
2285        n._hidden = 'hidden'
2286        self.assertEqual((n.name, n.job), ('Bob', 'Builder'))
2287        del n.job
2288        self.assertEqual(str(n), "Namespace(name='Bob')")
2289        self.assertTrue(hasattr(n, 'name'))
2290        self.assertTrue(not hasattr(n, 'job'))
2291
2292#
2293#
2294#
2295
2296def sqr(x, wait=0.0):
2297    time.sleep(wait)
2298    return x*x
2299
2300def mul(x, y):
2301    return x*y
2302
2303def raise_large_valuerror(wait):
2304    time.sleep(wait)
2305    raise ValueError("x" * 1024**2)
2306
2307def identity(x):
2308    return x
2309
2310class CountedObject(object):
2311    n_instances = 0
2312
2313    def __new__(cls):
2314        cls.n_instances += 1
2315        return object.__new__(cls)
2316
2317    def __del__(self):
2318        type(self).n_instances -= 1
2319
2320class SayWhenError(ValueError): pass
2321
2322def exception_throwing_generator(total, when):
2323    if when == -1:
2324        raise SayWhenError("Somebody said when")
2325    for i in range(total):
2326        if i == when:
2327            raise SayWhenError("Somebody said when")
2328        yield i
2329
2330
2331class _TestPool(BaseTestCase):
2332
2333    @classmethod
2334    def setUpClass(cls):
2335        super().setUpClass()
2336        cls.pool = cls.Pool(4)
2337
2338    @classmethod
2339    def tearDownClass(cls):
2340        cls.pool.terminate()
2341        cls.pool.join()
2342        cls.pool = None
2343        super().tearDownClass()
2344
2345    def test_apply(self):
2346        papply = self.pool.apply
2347        self.assertEqual(papply(sqr, (5,)), sqr(5))
2348        self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3))
2349
2350    def test_map(self):
2351        pmap = self.pool.map
2352        self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10)))))
2353        self.assertEqual(pmap(sqr, list(range(100)), chunksize=20),
2354                         list(map(sqr, list(range(100)))))
2355
2356    def test_starmap(self):
2357        psmap = self.pool.starmap
2358        tuples = list(zip(range(10), range(9,-1, -1)))
2359        self.assertEqual(psmap(mul, tuples),
2360                         list(itertools.starmap(mul, tuples)))
2361        tuples = list(zip(range(100), range(99,-1, -1)))
2362        self.assertEqual(psmap(mul, tuples, chunksize=20),
2363                         list(itertools.starmap(mul, tuples)))
2364
2365    def test_starmap_async(self):
2366        tuples = list(zip(range(100), range(99,-1, -1)))
2367        self.assertEqual(self.pool.starmap_async(mul, tuples).get(),
2368                         list(itertools.starmap(mul, tuples)))
2369
2370    def test_map_async(self):
2371        self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(),
2372                         list(map(sqr, list(range(10)))))
2373
2374    def test_map_async_callbacks(self):
2375        call_args = self.manager.list() if self.TYPE == 'manager' else []
2376        self.pool.map_async(int, ['1'],
2377                            callback=call_args.append,
2378                            error_callback=call_args.append).wait()
2379        self.assertEqual(1, len(call_args))
2380        self.assertEqual([1], call_args[0])
2381        self.pool.map_async(int, ['a'],
2382                            callback=call_args.append,
2383                            error_callback=call_args.append).wait()
2384        self.assertEqual(2, len(call_args))
2385        self.assertIsInstance(call_args[1], ValueError)
2386
2387    def test_map_unplicklable(self):
2388        # Issue #19425 -- failure to pickle should not cause a hang
2389        if self.TYPE == 'threads':
2390            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2391        class A(object):
2392            def __reduce__(self):
2393                raise RuntimeError('cannot pickle')
2394        with self.assertRaises(RuntimeError):
2395            self.pool.map(sqr, [A()]*10)
2396
2397    def test_map_chunksize(self):
2398        try:
2399            self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1)
2400        except multiprocessing.TimeoutError:
2401            self.fail("pool.map_async with chunksize stalled on null list")
2402
2403    def test_map_handle_iterable_exception(self):
2404        if self.TYPE == 'manager':
2405            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2406
2407        # SayWhenError seen at the very first of the iterable
2408        with self.assertRaises(SayWhenError):
2409            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2410        # again, make sure it's reentrant
2411        with self.assertRaises(SayWhenError):
2412            self.pool.map(sqr, exception_throwing_generator(1, -1), 1)
2413
2414        with self.assertRaises(SayWhenError):
2415            self.pool.map(sqr, exception_throwing_generator(10, 3), 1)
2416
2417        class SpecialIterable:
2418            def __iter__(self):
2419                return self
2420            def __next__(self):
2421                raise SayWhenError
2422            def __len__(self):
2423                return 1
2424        with self.assertRaises(SayWhenError):
2425            self.pool.map(sqr, SpecialIterable(), 1)
2426        with self.assertRaises(SayWhenError):
2427            self.pool.map(sqr, SpecialIterable(), 1)
2428
2429    def test_async(self):
2430        res = self.pool.apply_async(sqr, (7, TIMEOUT1,))
2431        get = TimingWrapper(res.get)
2432        self.assertEqual(get(), 49)
2433        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1)
2434
2435    def test_async_timeout(self):
2436        res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0))
2437        get = TimingWrapper(res.get)
2438        self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2)
2439        self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2)
2440
2441    def test_imap(self):
2442        it = self.pool.imap(sqr, list(range(10)))
2443        self.assertEqual(list(it), list(map(sqr, list(range(10)))))
2444
2445        it = self.pool.imap(sqr, list(range(10)))
2446        for i in range(10):
2447            self.assertEqual(next(it), i*i)
2448        self.assertRaises(StopIteration, it.__next__)
2449
2450        it = self.pool.imap(sqr, list(range(1000)), chunksize=100)
2451        for i in range(1000):
2452            self.assertEqual(next(it), i*i)
2453        self.assertRaises(StopIteration, it.__next__)
2454
2455    def test_imap_handle_iterable_exception(self):
2456        if self.TYPE == 'manager':
2457            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2458
2459        # SayWhenError seen at the very first of the iterable
2460        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2461        self.assertRaises(SayWhenError, it.__next__)
2462        # again, make sure it's reentrant
2463        it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1)
2464        self.assertRaises(SayWhenError, it.__next__)
2465
2466        it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1)
2467        for i in range(3):
2468            self.assertEqual(next(it), i*i)
2469        self.assertRaises(SayWhenError, it.__next__)
2470
2471        # SayWhenError seen at start of problematic chunk's results
2472        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2)
2473        for i in range(6):
2474            self.assertEqual(next(it), i*i)
2475        self.assertRaises(SayWhenError, it.__next__)
2476        it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4)
2477        for i in range(4):
2478            self.assertEqual(next(it), i*i)
2479        self.assertRaises(SayWhenError, it.__next__)
2480
2481    def test_imap_unordered(self):
2482        it = self.pool.imap_unordered(sqr, list(range(10)))
2483        self.assertEqual(sorted(it), list(map(sqr, list(range(10)))))
2484
2485        it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100)
2486        self.assertEqual(sorted(it), list(map(sqr, list(range(1000)))))
2487
2488    def test_imap_unordered_handle_iterable_exception(self):
2489        if self.TYPE == 'manager':
2490            self.skipTest('test not appropriate for {}'.format(self.TYPE))
2491
2492        # SayWhenError seen at the very first of the iterable
2493        it = self.pool.imap_unordered(sqr,
2494                                      exception_throwing_generator(1, -1),
2495                                      1)
2496        self.assertRaises(SayWhenError, it.__next__)
2497        # again, make sure it's reentrant
2498        it = self.pool.imap_unordered(sqr,
2499                                      exception_throwing_generator(1, -1),
2500                                      1)
2501        self.assertRaises(SayWhenError, it.__next__)
2502
2503        it = self.pool.imap_unordered(sqr,
2504                                      exception_throwing_generator(10, 3),
2505                                      1)
2506        expected_values = list(map(sqr, list(range(10))))
2507        with self.assertRaises(SayWhenError):
2508            # imap_unordered makes it difficult to anticipate the SayWhenError
2509            for i in range(10):
2510                value = next(it)
2511                self.assertIn(value, expected_values)
2512                expected_values.remove(value)
2513
2514        it = self.pool.imap_unordered(sqr,
2515                                      exception_throwing_generator(20, 7),
2516                                      2)
2517        expected_values = list(map(sqr, list(range(20))))
2518        with self.assertRaises(SayWhenError):
2519            for i in range(20):
2520                value = next(it)
2521                self.assertIn(value, expected_values)
2522                expected_values.remove(value)
2523
2524    def test_make_pool(self):
2525        expected_error = (RemoteError if self.TYPE == 'manager'
2526                          else ValueError)
2527
2528        self.assertRaises(expected_error, self.Pool, -1)
2529        self.assertRaises(expected_error, self.Pool, 0)
2530
2531        if self.TYPE != 'manager':
2532            p = self.Pool(3)
2533            try:
2534                self.assertEqual(3, len(p._pool))
2535            finally:
2536                p.close()
2537                p.join()
2538
2539    def test_terminate(self):
2540        result = self.pool.map_async(
2541            time.sleep, [0.1 for i in range(10000)], chunksize=1
2542            )
2543        self.pool.terminate()
2544        join = TimingWrapper(self.pool.join)
2545        join()
2546        # Sanity check the pool didn't wait for all tasks to finish
2547        self.assertLess(join.elapsed, 2.0)
2548
2549    def test_empty_iterable(self):
2550        # See Issue 12157
2551        p = self.Pool(1)
2552
2553        self.assertEqual(p.map(sqr, []), [])
2554        self.assertEqual(list(p.imap(sqr, [])), [])
2555        self.assertEqual(list(p.imap_unordered(sqr, [])), [])
2556        self.assertEqual(p.map_async(sqr, []).get(), [])
2557
2558        p.close()
2559        p.join()
2560
2561    def test_context(self):
2562        if self.TYPE == 'processes':
2563            L = list(range(10))
2564            expected = [sqr(i) for i in L]
2565            with self.Pool(2) as p:
2566                r = p.map_async(sqr, L)
2567                self.assertEqual(r.get(), expected)
2568            p.join()
2569            self.assertRaises(ValueError, p.map_async, sqr, L)
2570
2571    @classmethod
2572    def _test_traceback(cls):
2573        raise RuntimeError(123) # some comment
2574
2575    def test_traceback(self):
2576        # We want ensure that the traceback from the child process is
2577        # contained in the traceback raised in the main process.
2578        if self.TYPE == 'processes':
2579            with self.Pool(1) as p:
2580                try:
2581                    p.apply(self._test_traceback)
2582                except Exception as e:
2583                    exc = e
2584                else:
2585                    self.fail('expected RuntimeError')
2586            p.join()
2587            self.assertIs(type(exc), RuntimeError)
2588            self.assertEqual(exc.args, (123,))
2589            cause = exc.__cause__
2590            self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback)
2591            self.assertIn('raise RuntimeError(123) # some comment', cause.tb)
2592
2593            with test.support.captured_stderr() as f1:
2594                try:
2595                    raise exc
2596                except RuntimeError:
2597                    sys.excepthook(*sys.exc_info())
2598            self.assertIn('raise RuntimeError(123) # some comment',
2599                          f1.getvalue())
2600            # _helper_reraises_exception should not make the error
2601            # a remote exception
2602            with self.Pool(1) as p:
2603                try:
2604                    p.map(sqr, exception_throwing_generator(1, -1), 1)
2605                except Exception as e:
2606                    exc = e
2607                else:
2608                    self.fail('expected SayWhenError')
2609                self.assertIs(type(exc), SayWhenError)
2610                self.assertIs(exc.__cause__, None)
2611            p.join()
2612
2613    @classmethod
2614    def _test_wrapped_exception(cls):
2615        raise RuntimeError('foo')
2616
2617    def test_wrapped_exception(self):
2618        # Issue #20980: Should not wrap exception when using thread pool
2619        with self.Pool(1) as p:
2620            with self.assertRaises(RuntimeError):
2621                p.apply(self._test_wrapped_exception)
2622        p.join()
2623
2624    def test_map_no_failfast(self):
2625        # Issue #23992: the fail-fast behaviour when an exception is raised
2626        # during map() would make Pool.join() deadlock, because a worker
2627        # process would fill the result queue (after the result handler thread
2628        # terminated, hence not draining it anymore).
2629
2630        t_start = time.monotonic()
2631
2632        with self.assertRaises(ValueError):
2633            with self.Pool(2) as p:
2634                try:
2635                    p.map(raise_large_valuerror, [0, 1])
2636                finally:
2637                    time.sleep(0.5)
2638                    p.close()
2639                    p.join()
2640
2641        # check that we indeed waited for all jobs
2642        self.assertGreater(time.monotonic() - t_start, 0.9)
2643
2644    def test_release_task_refs(self):
2645        # Issue #29861: task arguments and results should not be kept
2646        # alive after we are done with them.
2647        objs = [CountedObject() for i in range(10)]
2648        refs = [weakref.ref(o) for o in objs]
2649        self.pool.map(identity, objs)
2650
2651        del objs
2652        time.sleep(DELTA)  # let threaded cleanup code run
2653        self.assertEqual(set(wr() for wr in refs), {None})
2654        # With a process pool, copies of the objects are returned, check
2655        # they were released too.
2656        self.assertEqual(CountedObject.n_instances, 0)
2657
2658    def test_enter(self):
2659        if self.TYPE == 'manager':
2660            self.skipTest("test not applicable to manager")
2661
2662        pool = self.Pool(1)
2663        with pool:
2664            pass
2665            # call pool.terminate()
2666        # pool is no longer running
2667
2668        with self.assertRaises(ValueError):
2669            # bpo-35477: pool.__enter__() fails if the pool is not running
2670            with pool:
2671                pass
2672        pool.join()
2673
2674    def test_resource_warning(self):
2675        if self.TYPE == 'manager':
2676            self.skipTest("test not applicable to manager")
2677
2678        pool = self.Pool(1)
2679        pool.terminate()
2680        pool.join()
2681
2682        # force state to RUN to emit ResourceWarning in __del__()
2683        pool._state = multiprocessing.pool.RUN
2684
2685        with support.check_warnings(('unclosed running multiprocessing pool',
2686                                     ResourceWarning)):
2687            pool = None
2688            support.gc_collect()
2689
2690def raising():
2691    raise KeyError("key")
2692
2693def unpickleable_result():
2694    return lambda: 42
2695
2696class _TestPoolWorkerErrors(BaseTestCase):
2697    ALLOWED_TYPES = ('processes', )
2698
2699    def test_async_error_callback(self):
2700        p = multiprocessing.Pool(2)
2701
2702        scratchpad = [None]
2703        def errback(exc):
2704            scratchpad[0] = exc
2705
2706        res = p.apply_async(raising, error_callback=errback)
2707        self.assertRaises(KeyError, res.get)
2708        self.assertTrue(scratchpad[0])
2709        self.assertIsInstance(scratchpad[0], KeyError)
2710
2711        p.close()
2712        p.join()
2713
2714    def test_unpickleable_result(self):
2715        from multiprocessing.pool import MaybeEncodingError
2716        p = multiprocessing.Pool(2)
2717
2718        # Make sure we don't lose pool processes because of encoding errors.
2719        for iteration in range(20):
2720
2721            scratchpad = [None]
2722            def errback(exc):
2723                scratchpad[0] = exc
2724
2725            res = p.apply_async(unpickleable_result, error_callback=errback)
2726            self.assertRaises(MaybeEncodingError, res.get)
2727            wrapped = scratchpad[0]
2728            self.assertTrue(wrapped)
2729            self.assertIsInstance(scratchpad[0], MaybeEncodingError)
2730            self.assertIsNotNone(wrapped.exc)
2731            self.assertIsNotNone(wrapped.value)
2732
2733        p.close()
2734        p.join()
2735
2736class _TestPoolWorkerLifetime(BaseTestCase):
2737    ALLOWED_TYPES = ('processes', )
2738
2739    def test_pool_worker_lifetime(self):
2740        p = multiprocessing.Pool(3, maxtasksperchild=10)
2741        self.assertEqual(3, len(p._pool))
2742        origworkerpids = [w.pid for w in p._pool]
2743        # Run many tasks so each worker gets replaced (hopefully)
2744        results = []
2745        for i in range(100):
2746            results.append(p.apply_async(sqr, (i, )))
2747        # Fetch the results and verify we got the right answers,
2748        # also ensuring all the tasks have completed.
2749        for (j, res) in enumerate(results):
2750            self.assertEqual(res.get(), sqr(j))
2751        # Refill the pool
2752        p._repopulate_pool()
2753        # Wait until all workers are alive
2754        # (countdown * DELTA = 5 seconds max startup process time)
2755        countdown = 50
2756        while countdown and not all(w.is_alive() for w in p._pool):
2757            countdown -= 1
2758            time.sleep(DELTA)
2759        finalworkerpids = [w.pid for w in p._pool]
2760        # All pids should be assigned.  See issue #7805.
2761        self.assertNotIn(None, origworkerpids)
2762        self.assertNotIn(None, finalworkerpids)
2763        # Finally, check that the worker pids have changed
2764        self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids))
2765        p.close()
2766        p.join()
2767
2768    def test_pool_worker_lifetime_early_close(self):
2769        # Issue #10332: closing a pool whose workers have limited lifetimes
2770        # before all the tasks completed would make join() hang.
2771        p = multiprocessing.Pool(3, maxtasksperchild=1)
2772        results = []
2773        for i in range(6):
2774            results.append(p.apply_async(sqr, (i, 0.3)))
2775        p.close()
2776        p.join()
2777        # check the results
2778        for (j, res) in enumerate(results):
2779            self.assertEqual(res.get(), sqr(j))
2780
2781    def test_worker_finalization_via_atexit_handler_of_multiprocessing(self):
2782        # tests cases against bpo-38744 and bpo-39360
2783        cmd = '''if 1:
2784            from multiprocessing import Pool
2785            problem = None
2786            class A:
2787                def __init__(self):
2788                    self.pool = Pool(processes=1)
2789            def test():
2790                global problem
2791                problem = A()
2792                problem.pool.map(float, tuple(range(10)))
2793            if __name__ == "__main__":
2794                test()
2795        '''
2796        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
2797        self.assertEqual(rc, 0)
2798
2799#
2800# Test of creating a customized manager class
2801#
2802
2803from multiprocessing.managers import BaseManager, BaseProxy, RemoteError
2804
2805class FooBar(object):
2806    def f(self):
2807        return 'f()'
2808    def g(self):
2809        raise ValueError
2810    def _h(self):
2811        return '_h()'
2812
2813def baz():
2814    for i in range(10):
2815        yield i*i
2816
2817class IteratorProxy(BaseProxy):
2818    _exposed_ = ('__next__',)
2819    def __iter__(self):
2820        return self
2821    def __next__(self):
2822        return self._callmethod('__next__')
2823
2824class MyManager(BaseManager):
2825    pass
2826
2827MyManager.register('Foo', callable=FooBar)
2828MyManager.register('Bar', callable=FooBar, exposed=('f', '_h'))
2829MyManager.register('baz', callable=baz, proxytype=IteratorProxy)
2830
2831
2832class _TestMyManager(BaseTestCase):
2833
2834    ALLOWED_TYPES = ('manager',)
2835
2836    def test_mymanager(self):
2837        manager = MyManager()
2838        manager.start()
2839        self.common(manager)
2840        manager.shutdown()
2841
2842        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2843        # to the manager process if it takes longer than 1 second to stop,
2844        # which happens on slow buildbots.
2845        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2846
2847    def test_mymanager_context(self):
2848        with MyManager() as manager:
2849            self.common(manager)
2850        # bpo-30356: BaseManager._finalize_manager() sends SIGTERM
2851        # to the manager process if it takes longer than 1 second to stop,
2852        # which happens on slow buildbots.
2853        self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM))
2854
2855    def test_mymanager_context_prestarted(self):
2856        manager = MyManager()
2857        manager.start()
2858        with manager:
2859            self.common(manager)
2860        self.assertEqual(manager._process.exitcode, 0)
2861
2862    def common(self, manager):
2863        foo = manager.Foo()
2864        bar = manager.Bar()
2865        baz = manager.baz()
2866
2867        foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)]
2868        bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)]
2869
2870        self.assertEqual(foo_methods, ['f', 'g'])
2871        self.assertEqual(bar_methods, ['f', '_h'])
2872
2873        self.assertEqual(foo.f(), 'f()')
2874        self.assertRaises(ValueError, foo.g)
2875        self.assertEqual(foo._callmethod('f'), 'f()')
2876        self.assertRaises(RemoteError, foo._callmethod, '_h')
2877
2878        self.assertEqual(bar.f(), 'f()')
2879        self.assertEqual(bar._h(), '_h()')
2880        self.assertEqual(bar._callmethod('f'), 'f()')
2881        self.assertEqual(bar._callmethod('_h'), '_h()')
2882
2883        self.assertEqual(list(baz), [i*i for i in range(10)])
2884
2885
2886#
2887# Test of connecting to a remote server and using xmlrpclib for serialization
2888#
2889
2890_queue = pyqueue.Queue()
2891def get_queue():
2892    return _queue
2893
2894class QueueManager(BaseManager):
2895    '''manager class used by server process'''
2896QueueManager.register('get_queue', callable=get_queue)
2897
2898class QueueManager2(BaseManager):
2899    '''manager class which specifies the same interface as QueueManager'''
2900QueueManager2.register('get_queue')
2901
2902
2903SERIALIZER = 'xmlrpclib'
2904
2905class _TestRemoteManager(BaseTestCase):
2906
2907    ALLOWED_TYPES = ('manager',)
2908    values = ['hello world', None, True, 2.25,
2909              'hall\xe5 v\xe4rlden',
2910              '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442',
2911              b'hall\xe5 v\xe4rlden',
2912             ]
2913    result = values[:]
2914
2915    @classmethod
2916    def _putter(cls, address, authkey):
2917        manager = QueueManager2(
2918            address=address, authkey=authkey, serializer=SERIALIZER
2919            )
2920        manager.connect()
2921        queue = manager.get_queue()
2922        # Note that xmlrpclib will deserialize object as a list not a tuple
2923        queue.put(tuple(cls.values))
2924
2925    def test_remote(self):
2926        authkey = os.urandom(32)
2927
2928        manager = QueueManager(
2929            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER
2930            )
2931        manager.start()
2932        self.addCleanup(manager.shutdown)
2933
2934        p = self.Process(target=self._putter, args=(manager.address, authkey))
2935        p.daemon = True
2936        p.start()
2937
2938        manager2 = QueueManager2(
2939            address=manager.address, authkey=authkey, serializer=SERIALIZER
2940            )
2941        manager2.connect()
2942        queue = manager2.get_queue()
2943
2944        self.assertEqual(queue.get(), self.result)
2945
2946        # Because we are using xmlrpclib for serialization instead of
2947        # pickle this will cause a serialization error.
2948        self.assertRaises(Exception, queue.put, time.sleep)
2949
2950        # Make queue finalizer run before the server is stopped
2951        del queue
2952
2953class _TestManagerRestart(BaseTestCase):
2954
2955    @classmethod
2956    def _putter(cls, address, authkey):
2957        manager = QueueManager(
2958            address=address, authkey=authkey, serializer=SERIALIZER)
2959        manager.connect()
2960        queue = manager.get_queue()
2961        queue.put('hello world')
2962
2963    def test_rapid_restart(self):
2964        authkey = os.urandom(32)
2965        manager = QueueManager(
2966            address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER)
2967        try:
2968            srvr = manager.get_server()
2969            addr = srvr.address
2970            # Close the connection.Listener socket which gets opened as a part
2971            # of manager.get_server(). It's not needed for the test.
2972            srvr.listener.close()
2973            manager.start()
2974
2975            p = self.Process(target=self._putter, args=(manager.address, authkey))
2976            p.start()
2977            p.join()
2978            queue = manager.get_queue()
2979            self.assertEqual(queue.get(), 'hello world')
2980            del queue
2981        finally:
2982            if hasattr(manager, "shutdown"):
2983                manager.shutdown()
2984
2985        manager = QueueManager(
2986            address=addr, authkey=authkey, serializer=SERIALIZER)
2987        try:
2988            manager.start()
2989            self.addCleanup(manager.shutdown)
2990        except OSError as e:
2991            if e.errno != errno.EADDRINUSE:
2992                raise
2993            # Retry after some time, in case the old socket was lingering
2994            # (sporadic failure on buildbots)
2995            time.sleep(1.0)
2996            manager = QueueManager(
2997                address=addr, authkey=authkey, serializer=SERIALIZER)
2998            if hasattr(manager, "shutdown"):
2999                self.addCleanup(manager.shutdown)
3000
3001#
3002#
3003#
3004
3005SENTINEL = latin('')
3006
3007class _TestConnection(BaseTestCase):
3008
3009    ALLOWED_TYPES = ('processes', 'threads')
3010
3011    @classmethod
3012    def _echo(cls, conn):
3013        for msg in iter(conn.recv_bytes, SENTINEL):
3014            conn.send_bytes(msg)
3015        conn.close()
3016
3017    def test_connection(self):
3018        conn, child_conn = self.Pipe()
3019
3020        p = self.Process(target=self._echo, args=(child_conn,))
3021        p.daemon = True
3022        p.start()
3023
3024        seq = [1, 2.25, None]
3025        msg = latin('hello world')
3026        longmsg = msg * 10
3027        arr = array.array('i', list(range(4)))
3028
3029        if self.TYPE == 'processes':
3030            self.assertEqual(type(conn.fileno()), int)
3031
3032        self.assertEqual(conn.send(seq), None)
3033        self.assertEqual(conn.recv(), seq)
3034
3035        self.assertEqual(conn.send_bytes(msg), None)
3036        self.assertEqual(conn.recv_bytes(), msg)
3037
3038        if self.TYPE == 'processes':
3039            buffer = array.array('i', [0]*10)
3040            expected = list(arr) + [0] * (10 - len(arr))
3041            self.assertEqual(conn.send_bytes(arr), None)
3042            self.assertEqual(conn.recv_bytes_into(buffer),
3043                             len(arr) * buffer.itemsize)
3044            self.assertEqual(list(buffer), expected)
3045
3046            buffer = array.array('i', [0]*10)
3047            expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr))
3048            self.assertEqual(conn.send_bytes(arr), None)
3049            self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize),
3050                             len(arr) * buffer.itemsize)
3051            self.assertEqual(list(buffer), expected)
3052
3053            buffer = bytearray(latin(' ' * 40))
3054            self.assertEqual(conn.send_bytes(longmsg), None)
3055            try:
3056                res = conn.recv_bytes_into(buffer)
3057            except multiprocessing.BufferTooShort as e:
3058                self.assertEqual(e.args, (longmsg,))
3059            else:
3060                self.fail('expected BufferTooShort, got %s' % res)
3061
3062        poll = TimingWrapper(conn.poll)
3063
3064        self.assertEqual(poll(), False)
3065        self.assertTimingAlmostEqual(poll.elapsed, 0)
3066
3067        self.assertEqual(poll(-1), False)
3068        self.assertTimingAlmostEqual(poll.elapsed, 0)
3069
3070        self.assertEqual(poll(TIMEOUT1), False)
3071        self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1)
3072
3073        conn.send(None)
3074        time.sleep(.1)
3075
3076        self.assertEqual(poll(TIMEOUT1), True)
3077        self.assertTimingAlmostEqual(poll.elapsed, 0)
3078
3079        self.assertEqual(conn.recv(), None)
3080
3081        really_big_msg = latin('X') * (1024 * 1024 * 16)   # 16Mb
3082        conn.send_bytes(really_big_msg)
3083        self.assertEqual(conn.recv_bytes(), really_big_msg)
3084
3085        conn.send_bytes(SENTINEL)                          # tell child to quit
3086        child_conn.close()
3087
3088        if self.TYPE == 'processes':
3089            self.assertEqual(conn.readable, True)
3090            self.assertEqual(conn.writable, True)
3091            self.assertRaises(EOFError, conn.recv)
3092            self.assertRaises(EOFError, conn.recv_bytes)
3093
3094        p.join()
3095
3096    def test_duplex_false(self):
3097        reader, writer = self.Pipe(duplex=False)
3098        self.assertEqual(writer.send(1), None)
3099        self.assertEqual(reader.recv(), 1)
3100        if self.TYPE == 'processes':
3101            self.assertEqual(reader.readable, True)
3102            self.assertEqual(reader.writable, False)
3103            self.assertEqual(writer.readable, False)
3104            self.assertEqual(writer.writable, True)
3105            self.assertRaises(OSError, reader.send, 2)
3106            self.assertRaises(OSError, writer.recv)
3107            self.assertRaises(OSError, writer.poll)
3108
3109    def test_spawn_close(self):
3110        # We test that a pipe connection can be closed by parent
3111        # process immediately after child is spawned.  On Windows this
3112        # would have sometimes failed on old versions because
3113        # child_conn would be closed before the child got a chance to
3114        # duplicate it.
3115        conn, child_conn = self.Pipe()
3116
3117        p = self.Process(target=self._echo, args=(child_conn,))
3118        p.daemon = True
3119        p.start()
3120        child_conn.close()    # this might complete before child initializes
3121
3122        msg = latin('hello')
3123        conn.send_bytes(msg)
3124        self.assertEqual(conn.recv_bytes(), msg)
3125
3126        conn.send_bytes(SENTINEL)
3127        conn.close()
3128        p.join()
3129
3130    def test_sendbytes(self):
3131        if self.TYPE != 'processes':
3132            self.skipTest('test not appropriate for {}'.format(self.TYPE))
3133
3134        msg = latin('abcdefghijklmnopqrstuvwxyz')
3135        a, b = self.Pipe()
3136
3137        a.send_bytes(msg)
3138        self.assertEqual(b.recv_bytes(), msg)
3139
3140        a.send_bytes(msg, 5)
3141        self.assertEqual(b.recv_bytes(), msg[5:])
3142
3143        a.send_bytes(msg, 7, 8)
3144        self.assertEqual(b.recv_bytes(), msg[7:7+8])
3145
3146        a.send_bytes(msg, 26)
3147        self.assertEqual(b.recv_bytes(), latin(''))
3148
3149        a.send_bytes(msg, 26, 0)
3150        self.assertEqual(b.recv_bytes(), latin(''))
3151
3152        self.assertRaises(ValueError, a.send_bytes, msg, 27)
3153
3154        self.assertRaises(ValueError, a.send_bytes, msg, 22, 5)
3155
3156        self.assertRaises(ValueError, a.send_bytes, msg, 26, 1)
3157
3158        self.assertRaises(ValueError, a.send_bytes, msg, -1)
3159
3160        self.assertRaises(ValueError, a.send_bytes, msg, 4, -1)
3161
3162    @classmethod
3163    def _is_fd_assigned(cls, fd):
3164        try:
3165            os.fstat(fd)
3166        except OSError as e:
3167            if e.errno == errno.EBADF:
3168                return False
3169            raise
3170        else:
3171            return True
3172
3173    @classmethod
3174    def _writefd(cls, conn, data, create_dummy_fds=False):
3175        if create_dummy_fds:
3176            for i in range(0, 256):
3177                if not cls._is_fd_assigned(i):
3178                    os.dup2(conn.fileno(), i)
3179        fd = reduction.recv_handle(conn)
3180        if msvcrt:
3181            fd = msvcrt.open_osfhandle(fd, os.O_WRONLY)
3182        os.write(fd, data)
3183        os.close(fd)
3184
3185    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3186    def test_fd_transfer(self):
3187        if self.TYPE != 'processes':
3188            self.skipTest("only makes sense with processes")
3189        conn, child_conn = self.Pipe(duplex=True)
3190
3191        p = self.Process(target=self._writefd, args=(child_conn, b"foo"))
3192        p.daemon = True
3193        p.start()
3194        self.addCleanup(test.support.unlink, test.support.TESTFN)
3195        with open(test.support.TESTFN, "wb") as f:
3196            fd = f.fileno()
3197            if msvcrt:
3198                fd = msvcrt.get_osfhandle(fd)
3199            reduction.send_handle(conn, fd, p.pid)
3200        p.join()
3201        with open(test.support.TESTFN, "rb") as f:
3202            self.assertEqual(f.read(), b"foo")
3203
3204    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3205    @unittest.skipIf(sys.platform == "win32",
3206                     "test semantics don't make sense on Windows")
3207    @unittest.skipIf(MAXFD <= 256,
3208                     "largest assignable fd number is too small")
3209    @unittest.skipUnless(hasattr(os, "dup2"),
3210                         "test needs os.dup2()")
3211    def test_large_fd_transfer(self):
3212        # With fd > 256 (issue #11657)
3213        if self.TYPE != 'processes':
3214            self.skipTest("only makes sense with processes")
3215        conn, child_conn = self.Pipe(duplex=True)
3216
3217        p = self.Process(target=self._writefd, args=(child_conn, b"bar", True))
3218        p.daemon = True
3219        p.start()
3220        self.addCleanup(test.support.unlink, test.support.TESTFN)
3221        with open(test.support.TESTFN, "wb") as f:
3222            fd = f.fileno()
3223            for newfd in range(256, MAXFD):
3224                if not self._is_fd_assigned(newfd):
3225                    break
3226            else:
3227                self.fail("could not find an unassigned large file descriptor")
3228            os.dup2(fd, newfd)
3229            try:
3230                reduction.send_handle(conn, newfd, p.pid)
3231            finally:
3232                os.close(newfd)
3233        p.join()
3234        with open(test.support.TESTFN, "rb") as f:
3235            self.assertEqual(f.read(), b"bar")
3236
3237    @classmethod
3238    def _send_data_without_fd(self, conn):
3239        os.write(conn.fileno(), b"\0")
3240
3241    @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3242    @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows")
3243    def test_missing_fd_transfer(self):
3244        # Check that exception is raised when received data is not
3245        # accompanied by a file descriptor in ancillary data.
3246        if self.TYPE != 'processes':
3247            self.skipTest("only makes sense with processes")
3248        conn, child_conn = self.Pipe(duplex=True)
3249
3250        p = self.Process(target=self._send_data_without_fd, args=(child_conn,))
3251        p.daemon = True
3252        p.start()
3253        self.assertRaises(RuntimeError, reduction.recv_handle, conn)
3254        p.join()
3255
3256    def test_context(self):
3257        a, b = self.Pipe()
3258
3259        with a, b:
3260            a.send(1729)
3261            self.assertEqual(b.recv(), 1729)
3262            if self.TYPE == 'processes':
3263                self.assertFalse(a.closed)
3264                self.assertFalse(b.closed)
3265
3266        if self.TYPE == 'processes':
3267            self.assertTrue(a.closed)
3268            self.assertTrue(b.closed)
3269            self.assertRaises(OSError, a.recv)
3270            self.assertRaises(OSError, b.recv)
3271
3272class _TestListener(BaseTestCase):
3273
3274    ALLOWED_TYPES = ('processes',)
3275
3276    def test_multiple_bind(self):
3277        for family in self.connection.families:
3278            l = self.connection.Listener(family=family)
3279            self.addCleanup(l.close)
3280            self.assertRaises(OSError, self.connection.Listener,
3281                              l.address, family)
3282
3283    def test_context(self):
3284        with self.connection.Listener() as l:
3285            with self.connection.Client(l.address) as c:
3286                with l.accept() as d:
3287                    c.send(1729)
3288                    self.assertEqual(d.recv(), 1729)
3289
3290        if self.TYPE == 'processes':
3291            self.assertRaises(OSError, l.accept)
3292
3293    @unittest.skipUnless(util.abstract_sockets_supported,
3294                         "test needs abstract socket support")
3295    def test_abstract_socket(self):
3296        with self.connection.Listener("\0something") as listener:
3297            with self.connection.Client(listener.address) as client:
3298                with listener.accept() as d:
3299                    client.send(1729)
3300                    self.assertEqual(d.recv(), 1729)
3301
3302        if self.TYPE == 'processes':
3303            self.assertRaises(OSError, listener.accept)
3304
3305
3306class _TestListenerClient(BaseTestCase):
3307
3308    ALLOWED_TYPES = ('processes', 'threads')
3309
3310    @classmethod
3311    def _test(cls, address):
3312        conn = cls.connection.Client(address)
3313        conn.send('hello')
3314        conn.close()
3315
3316    def test_listener_client(self):
3317        for family in self.connection.families:
3318            l = self.connection.Listener(family=family)
3319            p = self.Process(target=self._test, args=(l.address,))
3320            p.daemon = True
3321            p.start()
3322            conn = l.accept()
3323            self.assertEqual(conn.recv(), 'hello')
3324            p.join()
3325            l.close()
3326
3327    def test_issue14725(self):
3328        l = self.connection.Listener()
3329        p = self.Process(target=self._test, args=(l.address,))
3330        p.daemon = True
3331        p.start()
3332        time.sleep(1)
3333        # On Windows the client process should by now have connected,
3334        # written data and closed the pipe handle by now.  This causes
3335        # ConnectNamdedPipe() to fail with ERROR_NO_DATA.  See Issue
3336        # 14725.
3337        conn = l.accept()
3338        self.assertEqual(conn.recv(), 'hello')
3339        conn.close()
3340        p.join()
3341        l.close()
3342
3343    def test_issue16955(self):
3344        for fam in self.connection.families:
3345            l = self.connection.Listener(family=fam)
3346            c = self.connection.Client(l.address)
3347            a = l.accept()
3348            a.send_bytes(b"hello")
3349            self.assertTrue(c.poll(1))
3350            a.close()
3351            c.close()
3352            l.close()
3353
3354class _TestPoll(BaseTestCase):
3355
3356    ALLOWED_TYPES = ('processes', 'threads')
3357
3358    def test_empty_string(self):
3359        a, b = self.Pipe()
3360        self.assertEqual(a.poll(), False)
3361        b.send_bytes(b'')
3362        self.assertEqual(a.poll(), True)
3363        self.assertEqual(a.poll(), True)
3364
3365    @classmethod
3366    def _child_strings(cls, conn, strings):
3367        for s in strings:
3368            time.sleep(0.1)
3369            conn.send_bytes(s)
3370        conn.close()
3371
3372    def test_strings(self):
3373        strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop')
3374        a, b = self.Pipe()
3375        p = self.Process(target=self._child_strings, args=(b, strings))
3376        p.start()
3377
3378        for s in strings:
3379            for i in range(200):
3380                if a.poll(0.01):
3381                    break
3382            x = a.recv_bytes()
3383            self.assertEqual(s, x)
3384
3385        p.join()
3386
3387    @classmethod
3388    def _child_boundaries(cls, r):
3389        # Polling may "pull" a message in to the child process, but we
3390        # don't want it to pull only part of a message, as that would
3391        # corrupt the pipe for any other processes which might later
3392        # read from it.
3393        r.poll(5)
3394
3395    def test_boundaries(self):
3396        r, w = self.Pipe(False)
3397        p = self.Process(target=self._child_boundaries, args=(r,))
3398        p.start()
3399        time.sleep(2)
3400        L = [b"first", b"second"]
3401        for obj in L:
3402            w.send_bytes(obj)
3403        w.close()
3404        p.join()
3405        self.assertIn(r.recv_bytes(), L)
3406
3407    @classmethod
3408    def _child_dont_merge(cls, b):
3409        b.send_bytes(b'a')
3410        b.send_bytes(b'b')
3411        b.send_bytes(b'cd')
3412
3413    def test_dont_merge(self):
3414        a, b = self.Pipe()
3415        self.assertEqual(a.poll(0.0), False)
3416        self.assertEqual(a.poll(0.1), False)
3417
3418        p = self.Process(target=self._child_dont_merge, args=(b,))
3419        p.start()
3420
3421        self.assertEqual(a.recv_bytes(), b'a')
3422        self.assertEqual(a.poll(1.0), True)
3423        self.assertEqual(a.poll(1.0), True)
3424        self.assertEqual(a.recv_bytes(), b'b')
3425        self.assertEqual(a.poll(1.0), True)
3426        self.assertEqual(a.poll(1.0), True)
3427        self.assertEqual(a.poll(0.0), True)
3428        self.assertEqual(a.recv_bytes(), b'cd')
3429
3430        p.join()
3431
3432#
3433# Test of sending connection and socket objects between processes
3434#
3435
3436@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction")
3437class _TestPicklingConnections(BaseTestCase):
3438
3439    ALLOWED_TYPES = ('processes',)
3440
3441    @classmethod
3442    def tearDownClass(cls):
3443        from multiprocessing import resource_sharer
3444        resource_sharer.stop(timeout=TIMEOUT)
3445
3446    @classmethod
3447    def _listener(cls, conn, families):
3448        for fam in families:
3449            l = cls.connection.Listener(family=fam)
3450            conn.send(l.address)
3451            new_conn = l.accept()
3452            conn.send(new_conn)
3453            new_conn.close()
3454            l.close()
3455
3456        l = socket.create_server((test.support.HOST, 0))
3457        conn.send(l.getsockname())
3458        new_conn, addr = l.accept()
3459        conn.send(new_conn)
3460        new_conn.close()
3461        l.close()
3462
3463        conn.recv()
3464
3465    @classmethod
3466    def _remote(cls, conn):
3467        for (address, msg) in iter(conn.recv, None):
3468            client = cls.connection.Client(address)
3469            client.send(msg.upper())
3470            client.close()
3471
3472        address, msg = conn.recv()
3473        client = socket.socket()
3474        client.connect(address)
3475        client.sendall(msg.upper())
3476        client.close()
3477
3478        conn.close()
3479
3480    def test_pickling(self):
3481        families = self.connection.families
3482
3483        lconn, lconn0 = self.Pipe()
3484        lp = self.Process(target=self._listener, args=(lconn0, families))
3485        lp.daemon = True
3486        lp.start()
3487        lconn0.close()
3488
3489        rconn, rconn0 = self.Pipe()
3490        rp = self.Process(target=self._remote, args=(rconn0,))
3491        rp.daemon = True
3492        rp.start()
3493        rconn0.close()
3494
3495        for fam in families:
3496            msg = ('This connection uses family %s' % fam).encode('ascii')
3497            address = lconn.recv()
3498            rconn.send((address, msg))
3499            new_conn = lconn.recv()
3500            self.assertEqual(new_conn.recv(), msg.upper())
3501
3502        rconn.send(None)
3503
3504        msg = latin('This connection uses a normal socket')
3505        address = lconn.recv()
3506        rconn.send((address, msg))
3507        new_conn = lconn.recv()
3508        buf = []
3509        while True:
3510            s = new_conn.recv(100)
3511            if not s:
3512                break
3513            buf.append(s)
3514        buf = b''.join(buf)
3515        self.assertEqual(buf, msg.upper())
3516        new_conn.close()
3517
3518        lconn.send(None)
3519
3520        rconn.close()
3521        lconn.close()
3522
3523        lp.join()
3524        rp.join()
3525
3526    @classmethod
3527    def child_access(cls, conn):
3528        w = conn.recv()
3529        w.send('all is well')
3530        w.close()
3531
3532        r = conn.recv()
3533        msg = r.recv()
3534        conn.send(msg*2)
3535
3536        conn.close()
3537
3538    def test_access(self):
3539        # On Windows, if we do not specify a destination pid when
3540        # using DupHandle then we need to be careful to use the
3541        # correct access flags for DuplicateHandle(), or else
3542        # DupHandle.detach() will raise PermissionError.  For example,
3543        # for a read only pipe handle we should use
3544        # access=FILE_GENERIC_READ.  (Unfortunately
3545        # DUPLICATE_SAME_ACCESS does not work.)
3546        conn, child_conn = self.Pipe()
3547        p = self.Process(target=self.child_access, args=(child_conn,))
3548        p.daemon = True
3549        p.start()
3550        child_conn.close()
3551
3552        r, w = self.Pipe(duplex=False)
3553        conn.send(w)
3554        w.close()
3555        self.assertEqual(r.recv(), 'all is well')
3556        r.close()
3557
3558        r, w = self.Pipe(duplex=False)
3559        conn.send(r)
3560        r.close()
3561        w.send('foobar')
3562        w.close()
3563        self.assertEqual(conn.recv(), 'foobar'*2)
3564
3565        p.join()
3566
3567#
3568#
3569#
3570
3571class _TestHeap(BaseTestCase):
3572
3573    ALLOWED_TYPES = ('processes',)
3574
3575    def setUp(self):
3576        super().setUp()
3577        # Make pristine heap for these tests
3578        self.old_heap = multiprocessing.heap.BufferWrapper._heap
3579        multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap()
3580
3581    def tearDown(self):
3582        multiprocessing.heap.BufferWrapper._heap = self.old_heap
3583        super().tearDown()
3584
3585    def test_heap(self):
3586        iterations = 5000
3587        maxblocks = 50
3588        blocks = []
3589
3590        # get the heap object
3591        heap = multiprocessing.heap.BufferWrapper._heap
3592        heap._DISCARD_FREE_SPACE_LARGER_THAN = 0
3593
3594        # create and destroy lots of blocks of different sizes
3595        for i in range(iterations):
3596            size = int(random.lognormvariate(0, 1) * 1000)
3597            b = multiprocessing.heap.BufferWrapper(size)
3598            blocks.append(b)
3599            if len(blocks) > maxblocks:
3600                i = random.randrange(maxblocks)
3601                del blocks[i]
3602            del b
3603
3604        # verify the state of the heap
3605        with heap._lock:
3606            all = []
3607            free = 0
3608            occupied = 0
3609            for L in list(heap._len_to_seq.values()):
3610                # count all free blocks in arenas
3611                for arena, start, stop in L:
3612                    all.append((heap._arenas.index(arena), start, stop,
3613                                stop-start, 'free'))
3614                    free += (stop-start)
3615            for arena, arena_blocks in heap._allocated_blocks.items():
3616                # count all allocated blocks in arenas
3617                for start, stop in arena_blocks:
3618                    all.append((heap._arenas.index(arena), start, stop,
3619                                stop-start, 'occupied'))
3620                    occupied += (stop-start)
3621
3622            self.assertEqual(free + occupied,
3623                             sum(arena.size for arena in heap._arenas))
3624
3625            all.sort()
3626
3627            for i in range(len(all)-1):
3628                (arena, start, stop) = all[i][:3]
3629                (narena, nstart, nstop) = all[i+1][:3]
3630                if arena != narena:
3631                    # Two different arenas
3632                    self.assertEqual(stop, heap._arenas[arena].size)  # last block
3633                    self.assertEqual(nstart, 0)         # first block
3634                else:
3635                    # Same arena: two adjacent blocks
3636                    self.assertEqual(stop, nstart)
3637
3638        # test free'ing all blocks
3639        random.shuffle(blocks)
3640        while blocks:
3641            blocks.pop()
3642
3643        self.assertEqual(heap._n_frees, heap._n_mallocs)
3644        self.assertEqual(len(heap._pending_free_blocks), 0)
3645        self.assertEqual(len(heap._arenas), 0)
3646        self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks)
3647        self.assertEqual(len(heap._len_to_seq), 0)
3648
3649    def test_free_from_gc(self):
3650        # Check that freeing of blocks by the garbage collector doesn't deadlock
3651        # (issue #12352).
3652        # Make sure the GC is enabled, and set lower collection thresholds to
3653        # make collections more frequent (and increase the probability of
3654        # deadlock).
3655        if not gc.isenabled():
3656            gc.enable()
3657            self.addCleanup(gc.disable)
3658        thresholds = gc.get_threshold()
3659        self.addCleanup(gc.set_threshold, *thresholds)
3660        gc.set_threshold(10)
3661
3662        # perform numerous block allocations, with cyclic references to make
3663        # sure objects are collected asynchronously by the gc
3664        for i in range(5000):
3665            a = multiprocessing.heap.BufferWrapper(1)
3666            b = multiprocessing.heap.BufferWrapper(1)
3667            # circular references
3668            a.buddy = b
3669            b.buddy = a
3670
3671#
3672#
3673#
3674
3675class _Foo(Structure):
3676    _fields_ = [
3677        ('x', c_int),
3678        ('y', c_double),
3679        ('z', c_longlong,)
3680        ]
3681
3682class _TestSharedCTypes(BaseTestCase):
3683
3684    ALLOWED_TYPES = ('processes',)
3685
3686    def setUp(self):
3687        if not HAS_SHAREDCTYPES:
3688            self.skipTest("requires multiprocessing.sharedctypes")
3689
3690    @classmethod
3691    def _double(cls, x, y, z, foo, arr, string):
3692        x.value *= 2
3693        y.value *= 2
3694        z.value *= 2
3695        foo.x *= 2
3696        foo.y *= 2
3697        string.value *= 2
3698        for i in range(len(arr)):
3699            arr[i] *= 2
3700
3701    def test_sharedctypes(self, lock=False):
3702        x = Value('i', 7, lock=lock)
3703        y = Value(c_double, 1.0/3.0, lock=lock)
3704        z = Value(c_longlong, 2 ** 33, lock=lock)
3705        foo = Value(_Foo, 3, 2, lock=lock)
3706        arr = self.Array('d', list(range(10)), lock=lock)
3707        string = self.Array('c', 20, lock=lock)
3708        string.value = latin('hello')
3709
3710        p = self.Process(target=self._double, args=(x, y, z, foo, arr, string))
3711        p.daemon = True
3712        p.start()
3713        p.join()
3714
3715        self.assertEqual(x.value, 14)
3716        self.assertAlmostEqual(y.value, 2.0/3.0)
3717        self.assertEqual(z.value, 2 ** 34)
3718        self.assertEqual(foo.x, 6)
3719        self.assertAlmostEqual(foo.y, 4.0)
3720        for i in range(10):
3721            self.assertAlmostEqual(arr[i], i*2)
3722        self.assertEqual(string.value, latin('hellohello'))
3723
3724    def test_synchronize(self):
3725        self.test_sharedctypes(lock=True)
3726
3727    def test_copy(self):
3728        foo = _Foo(2, 5.0, 2 ** 33)
3729        bar = copy(foo)
3730        foo.x = 0
3731        foo.y = 0
3732        foo.z = 0
3733        self.assertEqual(bar.x, 2)
3734        self.assertAlmostEqual(bar.y, 5.0)
3735        self.assertEqual(bar.z, 2 ** 33)
3736
3737
3738@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory")
3739class _TestSharedMemory(BaseTestCase):
3740
3741    ALLOWED_TYPES = ('processes',)
3742
3743    @staticmethod
3744    def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data):
3745        if isinstance(shmem_name_or_obj, str):
3746            local_sms = shared_memory.SharedMemory(shmem_name_or_obj)
3747        else:
3748            local_sms = shmem_name_or_obj
3749        local_sms.buf[:len(binary_data)] = binary_data
3750        local_sms.close()
3751
3752    def test_shared_memory_basics(self):
3753        sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512)
3754        self.addCleanup(sms.unlink)
3755
3756        # Verify attributes are readable.
3757        self.assertEqual(sms.name, 'test01_tsmb')
3758        self.assertGreaterEqual(sms.size, 512)
3759        self.assertGreaterEqual(len(sms.buf), sms.size)
3760
3761        # Modify contents of shared memory segment through memoryview.
3762        sms.buf[0] = 42
3763        self.assertEqual(sms.buf[0], 42)
3764
3765        # Attach to existing shared memory segment.
3766        also_sms = shared_memory.SharedMemory('test01_tsmb')
3767        self.assertEqual(also_sms.buf[0], 42)
3768        also_sms.close()
3769
3770        # Attach to existing shared memory segment but specify a new size.
3771        same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size)
3772        self.assertLess(same_sms.size, 20*sms.size)  # Size was ignored.
3773        same_sms.close()
3774
3775        if shared_memory._USE_POSIX:
3776            # Posix Shared Memory can only be unlinked once.  Here we
3777            # test an implementation detail that is not observed across
3778            # all supported platforms (since WindowsNamedSharedMemory
3779            # manages unlinking on its own and unlink() does nothing).
3780            # True release of shared memory segment does not necessarily
3781            # happen until process exits, depending on the OS platform.
3782            with self.assertRaises(FileNotFoundError):
3783                sms_uno = shared_memory.SharedMemory(
3784                    'test01_dblunlink',
3785                    create=True,
3786                    size=5000
3787                )
3788
3789                try:
3790                    self.assertGreaterEqual(sms_uno.size, 5000)
3791
3792                    sms_duo = shared_memory.SharedMemory('test01_dblunlink')
3793                    sms_duo.unlink()  # First shm_unlink() call.
3794                    sms_duo.close()
3795                    sms_uno.close()
3796
3797                finally:
3798                    sms_uno.unlink()  # A second shm_unlink() call is bad.
3799
3800        with self.assertRaises(FileExistsError):
3801            # Attempting to create a new shared memory segment with a
3802            # name that is already in use triggers an exception.
3803            there_can_only_be_one_sms = shared_memory.SharedMemory(
3804                'test01_tsmb',
3805                create=True,
3806                size=512
3807            )
3808
3809        if shared_memory._USE_POSIX:
3810            # Requesting creation of a shared memory segment with the option
3811            # to attach to an existing segment, if that name is currently in
3812            # use, should not trigger an exception.
3813            # Note:  Using a smaller size could possibly cause truncation of
3814            # the existing segment but is OS platform dependent.  In the
3815            # case of MacOS/darwin, requesting a smaller size is disallowed.
3816            class OptionalAttachSharedMemory(shared_memory.SharedMemory):
3817                _flags = os.O_CREAT | os.O_RDWR
3818            ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb')
3819            self.assertEqual(ok_if_exists_sms.size, sms.size)
3820            ok_if_exists_sms.close()
3821
3822        # Attempting to attach to an existing shared memory segment when
3823        # no segment exists with the supplied name triggers an exception.
3824        with self.assertRaises(FileNotFoundError):
3825            nonexisting_sms = shared_memory.SharedMemory('test01_notthere')
3826            nonexisting_sms.unlink()  # Error should occur on prior line.
3827
3828        sms.close()
3829
3830        # Test creating a shared memory segment with negative size
3831        with self.assertRaises(ValueError):
3832            sms_invalid = shared_memory.SharedMemory(create=True, size=-1)
3833
3834        # Test creating a shared memory segment with size 0
3835        with self.assertRaises(ValueError):
3836            sms_invalid = shared_memory.SharedMemory(create=True, size=0)
3837
3838        # Test creating a shared memory segment without size argument
3839        with self.assertRaises(ValueError):
3840            sms_invalid = shared_memory.SharedMemory(create=True)
3841
3842    def test_shared_memory_across_processes(self):
3843        # bpo-40135: don't define shared memory block's name in case of
3844        # the failure when we run multiprocessing tests in parallel.
3845        sms = shared_memory.SharedMemory(create=True, size=512)
3846        self.addCleanup(sms.unlink)
3847
3848        # Verify remote attachment to existing block by name is working.
3849        p = self.Process(
3850            target=self._attach_existing_shmem_then_write,
3851            args=(sms.name, b'howdy')
3852        )
3853        p.daemon = True
3854        p.start()
3855        p.join()
3856        self.assertEqual(bytes(sms.buf[:5]), b'howdy')
3857
3858        # Verify pickling of SharedMemory instance also works.
3859        p = self.Process(
3860            target=self._attach_existing_shmem_then_write,
3861            args=(sms, b'HELLO')
3862        )
3863        p.daemon = True
3864        p.start()
3865        p.join()
3866        self.assertEqual(bytes(sms.buf[:5]), b'HELLO')
3867
3868        sms.close()
3869
3870    @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms")
3871    def test_shared_memory_SharedMemoryServer_ignores_sigint(self):
3872        # bpo-36368: protect SharedMemoryManager server process from
3873        # KeyboardInterrupt signals.
3874        smm = multiprocessing.managers.SharedMemoryManager()
3875        smm.start()
3876
3877        # make sure the manager works properly at the beginning
3878        sl = smm.ShareableList(range(10))
3879
3880        # the manager's server should ignore KeyboardInterrupt signals, and
3881        # maintain its connection with the current process, and success when
3882        # asked to deliver memory segments.
3883        os.kill(smm._process.pid, signal.SIGINT)
3884
3885        sl2 = smm.ShareableList(range(10))
3886
3887        # test that the custom signal handler registered in the Manager does
3888        # not affect signal handling in the parent process.
3889        with self.assertRaises(KeyboardInterrupt):
3890            os.kill(os.getpid(), signal.SIGINT)
3891
3892        smm.shutdown()
3893
3894    @unittest.skipIf(os.name != "posix", "resource_tracker is posix only")
3895    def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self):
3896        # bpo-36867: test that a SharedMemoryManager uses the
3897        # same resource_tracker process as its parent.
3898        cmd = '''if 1:
3899            from multiprocessing.managers import SharedMemoryManager
3900
3901
3902            smm = SharedMemoryManager()
3903            smm.start()
3904            sl = smm.ShareableList(range(10))
3905            smm.shutdown()
3906        '''
3907        rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd)
3908
3909        # Before bpo-36867 was fixed, a SharedMemoryManager not using the same
3910        # resource_tracker process as its parent would make the parent's
3911        # tracker complain about sl being leaked even though smm.shutdown()
3912        # properly released sl.
3913        self.assertFalse(err)
3914
3915    def test_shared_memory_SharedMemoryManager_basics(self):
3916        smm1 = multiprocessing.managers.SharedMemoryManager()
3917        with self.assertRaises(ValueError):
3918            smm1.SharedMemory(size=9)  # Fails if SharedMemoryServer not started
3919        smm1.start()
3920        lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ]
3921        lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ]
3922        doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name)
3923        self.assertEqual(len(doppleganger_list0), 5)
3924        doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name)
3925        self.assertGreaterEqual(len(doppleganger_shm0.buf), 32)
3926        held_name = lom[0].name
3927        smm1.shutdown()
3928        if sys.platform != "win32":
3929            # Calls to unlink() have no effect on Windows platform; shared
3930            # memory will only be released once final process exits.
3931            with self.assertRaises(FileNotFoundError):
3932                # No longer there to be attached to again.
3933                absent_shm = shared_memory.SharedMemory(name=held_name)
3934
3935        with multiprocessing.managers.SharedMemoryManager() as smm2:
3936            sl = smm2.ShareableList("howdy")
3937            shm = smm2.SharedMemory(size=128)
3938            held_name = sl.shm.name
3939        if sys.platform != "win32":
3940            with self.assertRaises(FileNotFoundError):
3941                # No longer there to be attached to again.
3942                absent_sl = shared_memory.ShareableList(name=held_name)
3943
3944
3945    def test_shared_memory_ShareableList_basics(self):
3946        sl = shared_memory.ShareableList(
3947            ['howdy', b'HoWdY', -273.154, 100, None, True, 42]
3948        )
3949        self.addCleanup(sl.shm.unlink)
3950
3951        # Verify attributes are readable.
3952        self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q')
3953
3954        # Exercise len().
3955        self.assertEqual(len(sl), 7)
3956
3957        # Exercise index().
3958        with warnings.catch_warnings():
3959            # Suppress BytesWarning when comparing against b'HoWdY'.
3960            warnings.simplefilter('ignore')
3961            with self.assertRaises(ValueError):
3962                sl.index('100')
3963            self.assertEqual(sl.index(100), 3)
3964
3965        # Exercise retrieving individual values.
3966        self.assertEqual(sl[0], 'howdy')
3967        self.assertEqual(sl[-2], True)
3968
3969        # Exercise iterability.
3970        self.assertEqual(
3971            tuple(sl),
3972            ('howdy', b'HoWdY', -273.154, 100, None, True, 42)
3973        )
3974
3975        # Exercise modifying individual values.
3976        sl[3] = 42
3977        self.assertEqual(sl[3], 42)
3978        sl[4] = 'some'  # Change type at a given position.
3979        self.assertEqual(sl[4], 'some')
3980        self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q')
3981        with self.assertRaisesRegex(ValueError,
3982                                    "exceeds available storage"):
3983            sl[4] = 'far too many'
3984        self.assertEqual(sl[4], 'some')
3985        sl[0] = 'encodés'  # Exactly 8 bytes of UTF-8 data
3986        self.assertEqual(sl[0], 'encodés')
3987        self.assertEqual(sl[1], b'HoWdY')  # no spillage
3988        with self.assertRaisesRegex(ValueError,
3989                                    "exceeds available storage"):
3990            sl[0] = 'encodées'  # Exactly 9 bytes of UTF-8 data
3991        self.assertEqual(sl[1], b'HoWdY')
3992        with self.assertRaisesRegex(ValueError,
3993                                    "exceeds available storage"):
3994            sl[1] = b'123456789'
3995        self.assertEqual(sl[1], b'HoWdY')
3996
3997        # Exercise count().
3998        with warnings.catch_warnings():
3999            # Suppress BytesWarning when comparing against b'HoWdY'.
4000            warnings.simplefilter('ignore')
4001            self.assertEqual(sl.count(42), 2)
4002            self.assertEqual(sl.count(b'HoWdY'), 1)
4003            self.assertEqual(sl.count(b'adios'), 0)
4004
4005        # Exercise creating a duplicate.
4006        sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate')
4007        try:
4008            self.assertNotEqual(sl.shm.name, sl_copy.shm.name)
4009            self.assertEqual('test03_duplicate', sl_copy.shm.name)
4010            self.assertEqual(list(sl), list(sl_copy))
4011            self.assertEqual(sl.format, sl_copy.format)
4012            sl_copy[-1] = 77
4013            self.assertEqual(sl_copy[-1], 77)
4014            self.assertNotEqual(sl[-1], 77)
4015            sl_copy.shm.close()
4016        finally:
4017            sl_copy.shm.unlink()
4018
4019        # Obtain a second handle on the same ShareableList.
4020        sl_tethered = shared_memory.ShareableList(name=sl.shm.name)
4021        self.assertEqual(sl.shm.name, sl_tethered.shm.name)
4022        sl_tethered[-1] = 880
4023        self.assertEqual(sl[-1], 880)
4024        sl_tethered.shm.close()
4025
4026        sl.shm.close()
4027
4028        # Exercise creating an empty ShareableList.
4029        empty_sl = shared_memory.ShareableList()
4030        try:
4031            self.assertEqual(len(empty_sl), 0)
4032            self.assertEqual(empty_sl.format, '')
4033            self.assertEqual(empty_sl.count('any'), 0)
4034            with self.assertRaises(ValueError):
4035                empty_sl.index(None)
4036            empty_sl.shm.close()
4037        finally:
4038            empty_sl.shm.unlink()
4039
4040    def test_shared_memory_ShareableList_pickling(self):
4041        sl = shared_memory.ShareableList(range(10))
4042        self.addCleanup(sl.shm.unlink)
4043
4044        serialized_sl = pickle.dumps(sl)
4045        deserialized_sl = pickle.loads(serialized_sl)
4046        self.assertTrue(
4047            isinstance(deserialized_sl, shared_memory.ShareableList)
4048        )
4049        self.assertTrue(deserialized_sl[-1], 9)
4050        self.assertFalse(sl is deserialized_sl)
4051        deserialized_sl[4] = "changed"
4052        self.assertEqual(sl[4], "changed")
4053
4054        # Verify data is not being put into the pickled representation.
4055        name = 'a' * len(sl.shm.name)
4056        larger_sl = shared_memory.ShareableList(range(400))
4057        self.addCleanup(larger_sl.shm.unlink)
4058        serialized_larger_sl = pickle.dumps(larger_sl)
4059        self.assertTrue(len(serialized_sl) == len(serialized_larger_sl))
4060        larger_sl.shm.close()
4061
4062        deserialized_sl.shm.close()
4063        sl.shm.close()
4064
4065    def test_shared_memory_cleaned_after_process_termination(self):
4066        cmd = '''if 1:
4067            import os, time, sys
4068            from multiprocessing import shared_memory
4069
4070            # Create a shared_memory segment, and send the segment name
4071            sm = shared_memory.SharedMemory(create=True, size=10)
4072            sys.stdout.write(sm.name + '\\n')
4073            sys.stdout.flush()
4074            time.sleep(100)
4075        '''
4076        with subprocess.Popen([sys.executable, '-E', '-c', cmd],
4077                              stdout=subprocess.PIPE,
4078                              stderr=subprocess.PIPE) as p:
4079            name = p.stdout.readline().strip().decode()
4080
4081            # killing abruptly processes holding reference to a shared memory
4082            # segment should not leak the given memory segment.
4083            p.terminate()
4084            p.wait()
4085
4086            deadline = time.monotonic() + 60
4087            t = 0.1
4088            while time.monotonic() < deadline:
4089                time.sleep(t)
4090                t = min(t*2, 5)
4091                try:
4092                    smm = shared_memory.SharedMemory(name, create=False)
4093                except FileNotFoundError:
4094                    break
4095            else:
4096                raise AssertionError("A SharedMemory segment was leaked after"
4097                                     " a process was abruptly terminated.")
4098
4099            if os.name == 'posix':
4100                # A warning was emitted by the subprocess' own
4101                # resource_tracker (on Windows, shared memory segments
4102                # are released automatically by the OS).
4103                err = p.stderr.read().decode()
4104                self.assertIn(
4105                    "resource_tracker: There appear to be 1 leaked "
4106                    "shared_memory objects to clean up at shutdown", err)
4107
4108#
4109#
4110#
4111
4112class _TestFinalize(BaseTestCase):
4113
4114    ALLOWED_TYPES = ('processes',)
4115
4116    def setUp(self):
4117        self.registry_backup = util._finalizer_registry.copy()
4118        util._finalizer_registry.clear()
4119
4120    def tearDown(self):
4121        self.assertFalse(util._finalizer_registry)
4122        util._finalizer_registry.update(self.registry_backup)
4123
4124    @classmethod
4125    def _test_finalize(cls, conn):
4126        class Foo(object):
4127            pass
4128
4129        a = Foo()
4130        util.Finalize(a, conn.send, args=('a',))
4131        del a           # triggers callback for a
4132
4133        b = Foo()
4134        close_b = util.Finalize(b, conn.send, args=('b',))
4135        close_b()       # triggers callback for b
4136        close_b()       # does nothing because callback has already been called
4137        del b           # does nothing because callback has already been called
4138
4139        c = Foo()
4140        util.Finalize(c, conn.send, args=('c',))
4141
4142        d10 = Foo()
4143        util.Finalize(d10, conn.send, args=('d10',), exitpriority=1)
4144
4145        d01 = Foo()
4146        util.Finalize(d01, conn.send, args=('d01',), exitpriority=0)
4147        d02 = Foo()
4148        util.Finalize(d02, conn.send, args=('d02',), exitpriority=0)
4149        d03 = Foo()
4150        util.Finalize(d03, conn.send, args=('d03',), exitpriority=0)
4151
4152        util.Finalize(None, conn.send, args=('e',), exitpriority=-10)
4153
4154        util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100)
4155
4156        # call multiprocessing's cleanup function then exit process without
4157        # garbage collecting locals
4158        util._exit_function()
4159        conn.close()
4160        os._exit(0)
4161
4162    def test_finalize(self):
4163        conn, child_conn = self.Pipe()
4164
4165        p = self.Process(target=self._test_finalize, args=(child_conn,))
4166        p.daemon = True
4167        p.start()
4168        p.join()
4169
4170        result = [obj for obj in iter(conn.recv, 'STOP')]
4171        self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e'])
4172
4173    def test_thread_safety(self):
4174        # bpo-24484: _run_finalizers() should be thread-safe
4175        def cb():
4176            pass
4177
4178        class Foo(object):
4179            def __init__(self):
4180                self.ref = self  # create reference cycle
4181                # insert finalizer at random key
4182                util.Finalize(self, cb, exitpriority=random.randint(1, 100))
4183
4184        finish = False
4185        exc = None
4186
4187        def run_finalizers():
4188            nonlocal exc
4189            while not finish:
4190                time.sleep(random.random() * 1e-1)
4191                try:
4192                    # A GC run will eventually happen during this,
4193                    # collecting stale Foo's and mutating the registry
4194                    util._run_finalizers()
4195                except Exception as e:
4196                    exc = e
4197
4198        def make_finalizers():
4199            nonlocal exc
4200            d = {}
4201            while not finish:
4202                try:
4203                    # Old Foo's get gradually replaced and later
4204                    # collected by the GC (because of the cyclic ref)
4205                    d[random.getrandbits(5)] = {Foo() for i in range(10)}
4206                except Exception as e:
4207                    exc = e
4208                    d.clear()
4209
4210        old_interval = sys.getswitchinterval()
4211        old_threshold = gc.get_threshold()
4212        try:
4213            sys.setswitchinterval(1e-6)
4214            gc.set_threshold(5, 5, 5)
4215            threads = [threading.Thread(target=run_finalizers),
4216                       threading.Thread(target=make_finalizers)]
4217            with test.support.start_threads(threads):
4218                time.sleep(4.0)  # Wait a bit to trigger race condition
4219                finish = True
4220            if exc is not None:
4221                raise exc
4222        finally:
4223            sys.setswitchinterval(old_interval)
4224            gc.set_threshold(*old_threshold)
4225            gc.collect()  # Collect remaining Foo's
4226
4227
4228#
4229# Test that from ... import * works for each module
4230#
4231
4232class _TestImportStar(unittest.TestCase):
4233
4234    def get_module_names(self):
4235        import glob
4236        folder = os.path.dirname(multiprocessing.__file__)
4237        pattern = os.path.join(glob.escape(folder), '*.py')
4238        files = glob.glob(pattern)
4239        modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files]
4240        modules = ['multiprocessing.' + m for m in modules]
4241        modules.remove('multiprocessing.__init__')
4242        modules.append('multiprocessing')
4243        return modules
4244
4245    def test_import(self):
4246        modules = self.get_module_names()
4247        if sys.platform == 'win32':
4248            modules.remove('multiprocessing.popen_fork')
4249            modules.remove('multiprocessing.popen_forkserver')
4250            modules.remove('multiprocessing.popen_spawn_posix')
4251        else:
4252            modules.remove('multiprocessing.popen_spawn_win32')
4253            if not HAS_REDUCTION:
4254                modules.remove('multiprocessing.popen_forkserver')
4255
4256        if c_int is None:
4257            # This module requires _ctypes
4258            modules.remove('multiprocessing.sharedctypes')
4259
4260        for name in modules:
4261            __import__(name)
4262            mod = sys.modules[name]
4263            self.assertTrue(hasattr(mod, '__all__'), name)
4264
4265            for attr in mod.__all__:
4266                self.assertTrue(
4267                    hasattr(mod, attr),
4268                    '%r does not have attribute %r' % (mod, attr)
4269                    )
4270
4271#
4272# Quick test that logging works -- does not test logging output
4273#
4274
4275class _TestLogging(BaseTestCase):
4276
4277    ALLOWED_TYPES = ('processes',)
4278
4279    def test_enable_logging(self):
4280        logger = multiprocessing.get_logger()
4281        logger.setLevel(util.SUBWARNING)
4282        self.assertTrue(logger is not None)
4283        logger.debug('this will not be printed')
4284        logger.info('nor will this')
4285        logger.setLevel(LOG_LEVEL)
4286
4287    @classmethod
4288    def _test_level(cls, conn):
4289        logger = multiprocessing.get_logger()
4290        conn.send(logger.getEffectiveLevel())
4291
4292    def test_level(self):
4293        LEVEL1 = 32
4294        LEVEL2 = 37
4295
4296        logger = multiprocessing.get_logger()
4297        root_logger = logging.getLogger()
4298        root_level = root_logger.level
4299
4300        reader, writer = multiprocessing.Pipe(duplex=False)
4301
4302        logger.setLevel(LEVEL1)
4303        p = self.Process(target=self._test_level, args=(writer,))
4304        p.start()
4305        self.assertEqual(LEVEL1, reader.recv())
4306        p.join()
4307        p.close()
4308
4309        logger.setLevel(logging.NOTSET)
4310        root_logger.setLevel(LEVEL2)
4311        p = self.Process(target=self._test_level, args=(writer,))
4312        p.start()
4313        self.assertEqual(LEVEL2, reader.recv())
4314        p.join()
4315        p.close()
4316
4317        root_logger.setLevel(root_level)
4318        logger.setLevel(level=LOG_LEVEL)
4319
4320
4321# class _TestLoggingProcessName(BaseTestCase):
4322#
4323#     def handle(self, record):
4324#         assert record.processName == multiprocessing.current_process().name
4325#         self.__handled = True
4326#
4327#     def test_logging(self):
4328#         handler = logging.Handler()
4329#         handler.handle = self.handle
4330#         self.__handled = False
4331#         # Bypass getLogger() and side-effects
4332#         logger = logging.getLoggerClass()(
4333#                 'multiprocessing.test.TestLoggingProcessName')
4334#         logger.addHandler(handler)
4335#         logger.propagate = False
4336#
4337#         logger.warn('foo')
4338#         assert self.__handled
4339
4340#
4341# Check that Process.join() retries if os.waitpid() fails with EINTR
4342#
4343
4344class _TestPollEintr(BaseTestCase):
4345
4346    ALLOWED_TYPES = ('processes',)
4347
4348    @classmethod
4349    def _killer(cls, pid):
4350        time.sleep(0.1)
4351        os.kill(pid, signal.SIGUSR1)
4352
4353    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4354    def test_poll_eintr(self):
4355        got_signal = [False]
4356        def record(*args):
4357            got_signal[0] = True
4358        pid = os.getpid()
4359        oldhandler = signal.signal(signal.SIGUSR1, record)
4360        try:
4361            killer = self.Process(target=self._killer, args=(pid,))
4362            killer.start()
4363            try:
4364                p = self.Process(target=time.sleep, args=(2,))
4365                p.start()
4366                p.join()
4367            finally:
4368                killer.join()
4369            self.assertTrue(got_signal[0])
4370            self.assertEqual(p.exitcode, 0)
4371        finally:
4372            signal.signal(signal.SIGUSR1, oldhandler)
4373
4374#
4375# Test to verify handle verification, see issue 3321
4376#
4377
4378class TestInvalidHandle(unittest.TestCase):
4379
4380    @unittest.skipIf(WIN32, "skipped on Windows")
4381    def test_invalid_handles(self):
4382        conn = multiprocessing.connection.Connection(44977608)
4383        # check that poll() doesn't crash
4384        try:
4385            conn.poll()
4386        except (ValueError, OSError):
4387            pass
4388        finally:
4389            # Hack private attribute _handle to avoid printing an error
4390            # in conn.__del__
4391            conn._handle = None
4392        self.assertRaises((ValueError, OSError),
4393                          multiprocessing.connection.Connection, -1)
4394
4395
4396
4397class OtherTest(unittest.TestCase):
4398    # TODO: add more tests for deliver/answer challenge.
4399    def test_deliver_challenge_auth_failure(self):
4400        class _FakeConnection(object):
4401            def recv_bytes(self, size):
4402                return b'something bogus'
4403            def send_bytes(self, data):
4404                pass
4405        self.assertRaises(multiprocessing.AuthenticationError,
4406                          multiprocessing.connection.deliver_challenge,
4407                          _FakeConnection(), b'abc')
4408
4409    def test_answer_challenge_auth_failure(self):
4410        class _FakeConnection(object):
4411            def __init__(self):
4412                self.count = 0
4413            def recv_bytes(self, size):
4414                self.count += 1
4415                if self.count == 1:
4416                    return multiprocessing.connection.CHALLENGE
4417                elif self.count == 2:
4418                    return b'something bogus'
4419                return b''
4420            def send_bytes(self, data):
4421                pass
4422        self.assertRaises(multiprocessing.AuthenticationError,
4423                          multiprocessing.connection.answer_challenge,
4424                          _FakeConnection(), b'abc')
4425
4426#
4427# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585
4428#
4429
4430def initializer(ns):
4431    ns.test += 1
4432
4433class TestInitializers(unittest.TestCase):
4434    def setUp(self):
4435        self.mgr = multiprocessing.Manager()
4436        self.ns = self.mgr.Namespace()
4437        self.ns.test = 0
4438
4439    def tearDown(self):
4440        self.mgr.shutdown()
4441        self.mgr.join()
4442
4443    def test_manager_initializer(self):
4444        m = multiprocessing.managers.SyncManager()
4445        self.assertRaises(TypeError, m.start, 1)
4446        m.start(initializer, (self.ns,))
4447        self.assertEqual(self.ns.test, 1)
4448        m.shutdown()
4449        m.join()
4450
4451    def test_pool_initializer(self):
4452        self.assertRaises(TypeError, multiprocessing.Pool, initializer=1)
4453        p = multiprocessing.Pool(1, initializer, (self.ns,))
4454        p.close()
4455        p.join()
4456        self.assertEqual(self.ns.test, 1)
4457
4458#
4459# Issue 5155, 5313, 5331: Test process in processes
4460# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior
4461#
4462
4463def _this_sub_process(q):
4464    try:
4465        item = q.get(block=False)
4466    except pyqueue.Empty:
4467        pass
4468
4469def _test_process():
4470    queue = multiprocessing.Queue()
4471    subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,))
4472    subProc.daemon = True
4473    subProc.start()
4474    subProc.join()
4475
4476def _afunc(x):
4477    return x*x
4478
4479def pool_in_process():
4480    pool = multiprocessing.Pool(processes=4)
4481    x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7])
4482    pool.close()
4483    pool.join()
4484
4485class _file_like(object):
4486    def __init__(self, delegate):
4487        self._delegate = delegate
4488        self._pid = None
4489
4490    @property
4491    def cache(self):
4492        pid = os.getpid()
4493        # There are no race conditions since fork keeps only the running thread
4494        if pid != self._pid:
4495            self._pid = pid
4496            self._cache = []
4497        return self._cache
4498
4499    def write(self, data):
4500        self.cache.append(data)
4501
4502    def flush(self):
4503        self._delegate.write(''.join(self.cache))
4504        self._cache = []
4505
4506class TestStdinBadfiledescriptor(unittest.TestCase):
4507
4508    def test_queue_in_process(self):
4509        proc = multiprocessing.Process(target=_test_process)
4510        proc.start()
4511        proc.join()
4512
4513    def test_pool_in_process(self):
4514        p = multiprocessing.Process(target=pool_in_process)
4515        p.start()
4516        p.join()
4517
4518    def test_flushing(self):
4519        sio = io.StringIO()
4520        flike = _file_like(sio)
4521        flike.write('foo')
4522        proc = multiprocessing.Process(target=lambda: flike.flush())
4523        flike.flush()
4524        assert sio.getvalue() == 'foo'
4525
4526
4527class TestWait(unittest.TestCase):
4528
4529    @classmethod
4530    def _child_test_wait(cls, w, slow):
4531        for i in range(10):
4532            if slow:
4533                time.sleep(random.random()*0.1)
4534            w.send((i, os.getpid()))
4535        w.close()
4536
4537    def test_wait(self, slow=False):
4538        from multiprocessing.connection import wait
4539        readers = []
4540        procs = []
4541        messages = []
4542
4543        for i in range(4):
4544            r, w = multiprocessing.Pipe(duplex=False)
4545            p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow))
4546            p.daemon = True
4547            p.start()
4548            w.close()
4549            readers.append(r)
4550            procs.append(p)
4551            self.addCleanup(p.join)
4552
4553        while readers:
4554            for r in wait(readers):
4555                try:
4556                    msg = r.recv()
4557                except EOFError:
4558                    readers.remove(r)
4559                    r.close()
4560                else:
4561                    messages.append(msg)
4562
4563        messages.sort()
4564        expected = sorted((i, p.pid) for i in range(10) for p in procs)
4565        self.assertEqual(messages, expected)
4566
4567    @classmethod
4568    def _child_test_wait_socket(cls, address, slow):
4569        s = socket.socket()
4570        s.connect(address)
4571        for i in range(10):
4572            if slow:
4573                time.sleep(random.random()*0.1)
4574            s.sendall(('%s\n' % i).encode('ascii'))
4575        s.close()
4576
4577    def test_wait_socket(self, slow=False):
4578        from multiprocessing.connection import wait
4579        l = socket.create_server((test.support.HOST, 0))
4580        addr = l.getsockname()
4581        readers = []
4582        procs = []
4583        dic = {}
4584
4585        for i in range(4):
4586            p = multiprocessing.Process(target=self._child_test_wait_socket,
4587                                        args=(addr, slow))
4588            p.daemon = True
4589            p.start()
4590            procs.append(p)
4591            self.addCleanup(p.join)
4592
4593        for i in range(4):
4594            r, _ = l.accept()
4595            readers.append(r)
4596            dic[r] = []
4597        l.close()
4598
4599        while readers:
4600            for r in wait(readers):
4601                msg = r.recv(32)
4602                if not msg:
4603                    readers.remove(r)
4604                    r.close()
4605                else:
4606                    dic[r].append(msg)
4607
4608        expected = ''.join('%s\n' % i for i in range(10)).encode('ascii')
4609        for v in dic.values():
4610            self.assertEqual(b''.join(v), expected)
4611
4612    def test_wait_slow(self):
4613        self.test_wait(True)
4614
4615    def test_wait_socket_slow(self):
4616        self.test_wait_socket(True)
4617
4618    def test_wait_timeout(self):
4619        from multiprocessing.connection import wait
4620
4621        expected = 5
4622        a, b = multiprocessing.Pipe()
4623
4624        start = time.monotonic()
4625        res = wait([a, b], expected)
4626        delta = time.monotonic() - start
4627
4628        self.assertEqual(res, [])
4629        self.assertLess(delta, expected * 2)
4630        self.assertGreater(delta, expected * 0.5)
4631
4632        b.send(None)
4633
4634        start = time.monotonic()
4635        res = wait([a, b], 20)
4636        delta = time.monotonic() - start
4637
4638        self.assertEqual(res, [a])
4639        self.assertLess(delta, 0.4)
4640
4641    @classmethod
4642    def signal_and_sleep(cls, sem, period):
4643        sem.release()
4644        time.sleep(period)
4645
4646    def test_wait_integer(self):
4647        from multiprocessing.connection import wait
4648
4649        expected = 3
4650        sorted_ = lambda l: sorted(l, key=lambda x: id(x))
4651        sem = multiprocessing.Semaphore(0)
4652        a, b = multiprocessing.Pipe()
4653        p = multiprocessing.Process(target=self.signal_and_sleep,
4654                                    args=(sem, expected))
4655
4656        p.start()
4657        self.assertIsInstance(p.sentinel, int)
4658        self.assertTrue(sem.acquire(timeout=20))
4659
4660        start = time.monotonic()
4661        res = wait([a, p.sentinel, b], expected + 20)
4662        delta = time.monotonic() - start
4663
4664        self.assertEqual(res, [p.sentinel])
4665        self.assertLess(delta, expected + 2)
4666        self.assertGreater(delta, expected - 2)
4667
4668        a.send(None)
4669
4670        start = time.monotonic()
4671        res = wait([a, p.sentinel, b], 20)
4672        delta = time.monotonic() - start
4673
4674        self.assertEqual(sorted_(res), sorted_([p.sentinel, b]))
4675        self.assertLess(delta, 0.4)
4676
4677        b.send(None)
4678
4679        start = time.monotonic()
4680        res = wait([a, p.sentinel, b], 20)
4681        delta = time.monotonic() - start
4682
4683        self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b]))
4684        self.assertLess(delta, 0.4)
4685
4686        p.terminate()
4687        p.join()
4688
4689    def test_neg_timeout(self):
4690        from multiprocessing.connection import wait
4691        a, b = multiprocessing.Pipe()
4692        t = time.monotonic()
4693        res = wait([a], timeout=-1)
4694        t = time.monotonic() - t
4695        self.assertEqual(res, [])
4696        self.assertLess(t, 1)
4697        a.close()
4698        b.close()
4699
4700#
4701# Issue 14151: Test invalid family on invalid environment
4702#
4703
4704class TestInvalidFamily(unittest.TestCase):
4705
4706    @unittest.skipIf(WIN32, "skipped on Windows")
4707    def test_invalid_family(self):
4708        with self.assertRaises(ValueError):
4709            multiprocessing.connection.Listener(r'\\.\test')
4710
4711    @unittest.skipUnless(WIN32, "skipped on non-Windows platforms")
4712    def test_invalid_family_win32(self):
4713        with self.assertRaises(ValueError):
4714            multiprocessing.connection.Listener('/var/test.pipe')
4715
4716#
4717# Issue 12098: check sys.flags of child matches that for parent
4718#
4719
4720class TestFlags(unittest.TestCase):
4721    @classmethod
4722    def run_in_grandchild(cls, conn):
4723        conn.send(tuple(sys.flags))
4724
4725    @classmethod
4726    def run_in_child(cls):
4727        import json
4728        r, w = multiprocessing.Pipe(duplex=False)
4729        p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,))
4730        p.start()
4731        grandchild_flags = r.recv()
4732        p.join()
4733        r.close()
4734        w.close()
4735        flags = (tuple(sys.flags), grandchild_flags)
4736        print(json.dumps(flags))
4737
4738    def test_flags(self):
4739        import json
4740        # start child process using unusual flags
4741        prog = ('from test._test_multiprocessing import TestFlags; ' +
4742                'TestFlags.run_in_child()')
4743        data = subprocess.check_output(
4744            [sys.executable, '-E', '-S', '-O', '-c', prog])
4745        child_flags, grandchild_flags = json.loads(data.decode('ascii'))
4746        self.assertEqual(child_flags, grandchild_flags)
4747
4748#
4749# Test interaction with socket timeouts - see Issue #6056
4750#
4751
4752class TestTimeouts(unittest.TestCase):
4753    @classmethod
4754    def _test_timeout(cls, child, address):
4755        time.sleep(1)
4756        child.send(123)
4757        child.close()
4758        conn = multiprocessing.connection.Client(address)
4759        conn.send(456)
4760        conn.close()
4761
4762    def test_timeout(self):
4763        old_timeout = socket.getdefaulttimeout()
4764        try:
4765            socket.setdefaulttimeout(0.1)
4766            parent, child = multiprocessing.Pipe(duplex=True)
4767            l = multiprocessing.connection.Listener(family='AF_INET')
4768            p = multiprocessing.Process(target=self._test_timeout,
4769                                        args=(child, l.address))
4770            p.start()
4771            child.close()
4772            self.assertEqual(parent.recv(), 123)
4773            parent.close()
4774            conn = l.accept()
4775            self.assertEqual(conn.recv(), 456)
4776            conn.close()
4777            l.close()
4778            join_process(p)
4779        finally:
4780            socket.setdefaulttimeout(old_timeout)
4781
4782#
4783# Test what happens with no "if __name__ == '__main__'"
4784#
4785
4786class TestNoForkBomb(unittest.TestCase):
4787    def test_noforkbomb(self):
4788        sm = multiprocessing.get_start_method()
4789        name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py')
4790        if sm != 'fork':
4791            rc, out, err = test.support.script_helper.assert_python_failure(name, sm)
4792            self.assertEqual(out, b'')
4793            self.assertIn(b'RuntimeError', err)
4794        else:
4795            rc, out, err = test.support.script_helper.assert_python_ok(name, sm)
4796            self.assertEqual(out.rstrip(), b'123')
4797            self.assertEqual(err, b'')
4798
4799#
4800# Issue #17555: ForkAwareThreadLock
4801#
4802
4803class TestForkAwareThreadLock(unittest.TestCase):
4804    # We recursively start processes.  Issue #17555 meant that the
4805    # after fork registry would get duplicate entries for the same
4806    # lock.  The size of the registry at generation n was ~2**n.
4807
4808    @classmethod
4809    def child(cls, n, conn):
4810        if n > 1:
4811            p = multiprocessing.Process(target=cls.child, args=(n-1, conn))
4812            p.start()
4813            conn.close()
4814            join_process(p)
4815        else:
4816            conn.send(len(util._afterfork_registry))
4817        conn.close()
4818
4819    def test_lock(self):
4820        r, w = multiprocessing.Pipe(False)
4821        l = util.ForkAwareThreadLock()
4822        old_size = len(util._afterfork_registry)
4823        p = multiprocessing.Process(target=self.child, args=(5, w))
4824        p.start()
4825        w.close()
4826        new_size = r.recv()
4827        join_process(p)
4828        self.assertLessEqual(new_size, old_size)
4829
4830#
4831# Check that non-forked child processes do not inherit unneeded fds/handles
4832#
4833
4834class TestCloseFds(unittest.TestCase):
4835
4836    def get_high_socket_fd(self):
4837        if WIN32:
4838            # The child process will not have any socket handles, so
4839            # calling socket.fromfd() should produce WSAENOTSOCK even
4840            # if there is a handle of the same number.
4841            return socket.socket().detach()
4842        else:
4843            # We want to produce a socket with an fd high enough that a
4844            # freshly created child process will not have any fds as high.
4845            fd = socket.socket().detach()
4846            to_close = []
4847            while fd < 50:
4848                to_close.append(fd)
4849                fd = os.dup(fd)
4850            for x in to_close:
4851                os.close(x)
4852            return fd
4853
4854    def close(self, fd):
4855        if WIN32:
4856            socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close()
4857        else:
4858            os.close(fd)
4859
4860    @classmethod
4861    def _test_closefds(cls, conn, fd):
4862        try:
4863            s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM)
4864        except Exception as e:
4865            conn.send(e)
4866        else:
4867            s.close()
4868            conn.send(None)
4869
4870    def test_closefd(self):
4871        if not HAS_REDUCTION:
4872            raise unittest.SkipTest('requires fd pickling')
4873
4874        reader, writer = multiprocessing.Pipe()
4875        fd = self.get_high_socket_fd()
4876        try:
4877            p = multiprocessing.Process(target=self._test_closefds,
4878                                        args=(writer, fd))
4879            p.start()
4880            writer.close()
4881            e = reader.recv()
4882            join_process(p)
4883        finally:
4884            self.close(fd)
4885            writer.close()
4886            reader.close()
4887
4888        if multiprocessing.get_start_method() == 'fork':
4889            self.assertIs(e, None)
4890        else:
4891            WSAENOTSOCK = 10038
4892            self.assertIsInstance(e, OSError)
4893            self.assertTrue(e.errno == errno.EBADF or
4894                            e.winerror == WSAENOTSOCK, e)
4895
4896#
4897# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc
4898#
4899
4900class TestIgnoreEINTR(unittest.TestCase):
4901
4902    # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block
4903    CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE)
4904
4905    @classmethod
4906    def _test_ignore(cls, conn):
4907        def handler(signum, frame):
4908            pass
4909        signal.signal(signal.SIGUSR1, handler)
4910        conn.send('ready')
4911        x = conn.recv()
4912        conn.send(x)
4913        conn.send_bytes(b'x' * cls.CONN_MAX_SIZE)
4914
4915    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4916    def test_ignore(self):
4917        conn, child_conn = multiprocessing.Pipe()
4918        try:
4919            p = multiprocessing.Process(target=self._test_ignore,
4920                                        args=(child_conn,))
4921            p.daemon = True
4922            p.start()
4923            child_conn.close()
4924            self.assertEqual(conn.recv(), 'ready')
4925            time.sleep(0.1)
4926            os.kill(p.pid, signal.SIGUSR1)
4927            time.sleep(0.1)
4928            conn.send(1234)
4929            self.assertEqual(conn.recv(), 1234)
4930            time.sleep(0.1)
4931            os.kill(p.pid, signal.SIGUSR1)
4932            self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE)
4933            time.sleep(0.1)
4934            p.join()
4935        finally:
4936            conn.close()
4937
4938    @classmethod
4939    def _test_ignore_listener(cls, conn):
4940        def handler(signum, frame):
4941            pass
4942        signal.signal(signal.SIGUSR1, handler)
4943        with multiprocessing.connection.Listener() as l:
4944            conn.send(l.address)
4945            a = l.accept()
4946            a.send('welcome')
4947
4948    @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1')
4949    def test_ignore_listener(self):
4950        conn, child_conn = multiprocessing.Pipe()
4951        try:
4952            p = multiprocessing.Process(target=self._test_ignore_listener,
4953                                        args=(child_conn,))
4954            p.daemon = True
4955            p.start()
4956            child_conn.close()
4957            address = conn.recv()
4958            time.sleep(0.1)
4959            os.kill(p.pid, signal.SIGUSR1)
4960            time.sleep(0.1)
4961            client = multiprocessing.connection.Client(address)
4962            self.assertEqual(client.recv(), 'welcome')
4963            p.join()
4964        finally:
4965            conn.close()
4966
4967class TestStartMethod(unittest.TestCase):
4968    @classmethod
4969    def _check_context(cls, conn):
4970        conn.send(multiprocessing.get_start_method())
4971
4972    def check_context(self, ctx):
4973        r, w = ctx.Pipe(duplex=False)
4974        p = ctx.Process(target=self._check_context, args=(w,))
4975        p.start()
4976        w.close()
4977        child_method = r.recv()
4978        r.close()
4979        p.join()
4980        self.assertEqual(child_method, ctx.get_start_method())
4981
4982    def test_context(self):
4983        for method in ('fork', 'spawn', 'forkserver'):
4984            try:
4985                ctx = multiprocessing.get_context(method)
4986            except ValueError:
4987                continue
4988            self.assertEqual(ctx.get_start_method(), method)
4989            self.assertIs(ctx.get_context(), ctx)
4990            self.assertRaises(ValueError, ctx.set_start_method, 'spawn')
4991            self.assertRaises(ValueError, ctx.set_start_method, None)
4992            self.check_context(ctx)
4993
4994    def test_set_get(self):
4995        multiprocessing.set_forkserver_preload(PRELOAD)
4996        count = 0
4997        old_method = multiprocessing.get_start_method()
4998        try:
4999            for method in ('fork', 'spawn', 'forkserver'):
5000                try:
5001                    multiprocessing.set_start_method(method, force=True)
5002                except ValueError:
5003                    continue
5004                self.assertEqual(multiprocessing.get_start_method(), method)
5005                ctx = multiprocessing.get_context()
5006                self.assertEqual(ctx.get_start_method(), method)
5007                self.assertTrue(type(ctx).__name__.lower().startswith(method))
5008                self.assertTrue(
5009                    ctx.Process.__name__.lower().startswith(method))
5010                self.check_context(multiprocessing)
5011                count += 1
5012        finally:
5013            multiprocessing.set_start_method(old_method, force=True)
5014        self.assertGreaterEqual(count, 1)
5015
5016    def test_get_all(self):
5017        methods = multiprocessing.get_all_start_methods()
5018        if sys.platform == 'win32':
5019            self.assertEqual(methods, ['spawn'])
5020        else:
5021            self.assertTrue(methods == ['fork', 'spawn'] or
5022                            methods == ['spawn', 'fork'] or
5023                            methods == ['fork', 'spawn', 'forkserver'] or
5024                            methods == ['spawn', 'fork', 'forkserver'])
5025
5026    def test_preload_resources(self):
5027        if multiprocessing.get_start_method() != 'forkserver':
5028            self.skipTest("test only relevant for 'forkserver' method")
5029        name = os.path.join(os.path.dirname(__file__), 'mp_preload.py')
5030        rc, out, err = test.support.script_helper.assert_python_ok(name)
5031        out = out.decode()
5032        err = err.decode()
5033        if out.rstrip() != 'ok' or err != '':
5034            print(out)
5035            print(err)
5036            self.fail("failed spawning forkserver or grandchild")
5037
5038
5039@unittest.skipIf(sys.platform == "win32",
5040                 "test semantics don't make sense on Windows")
5041class TestResourceTracker(unittest.TestCase):
5042
5043    def test_resource_tracker(self):
5044        #
5045        # Check that killing process does not leak named semaphores
5046        #
5047        cmd = '''if 1:
5048            import time, os, tempfile
5049            import multiprocessing as mp
5050            from multiprocessing import resource_tracker
5051            from multiprocessing.shared_memory import SharedMemory
5052
5053            mp.set_start_method("spawn")
5054            rand = tempfile._RandomNameSequence()
5055
5056
5057            def create_and_register_resource(rtype):
5058                if rtype == "semaphore":
5059                    lock = mp.Lock()
5060                    return lock, lock._semlock.name
5061                elif rtype == "shared_memory":
5062                    sm = SharedMemory(create=True, size=10)
5063                    return sm, sm._name
5064                else:
5065                    raise ValueError(
5066                        "Resource type {{}} not understood".format(rtype))
5067
5068
5069            resource1, rname1 = create_and_register_resource("{rtype}")
5070            resource2, rname2 = create_and_register_resource("{rtype}")
5071
5072            os.write({w}, rname1.encode("ascii") + b"\\n")
5073            os.write({w}, rname2.encode("ascii") + b"\\n")
5074
5075            time.sleep(10)
5076        '''
5077        for rtype in resource_tracker._CLEANUP_FUNCS:
5078            with self.subTest(rtype=rtype):
5079                if rtype == "noop":
5080                    # Artefact resource type used by the resource_tracker
5081                    continue
5082                r, w = os.pipe()
5083                p = subprocess.Popen([sys.executable,
5084                                     '-E', '-c', cmd.format(w=w, rtype=rtype)],
5085                                     pass_fds=[w],
5086                                     stderr=subprocess.PIPE)
5087                os.close(w)
5088                with open(r, 'rb', closefd=True) as f:
5089                    name1 = f.readline().rstrip().decode('ascii')
5090                    name2 = f.readline().rstrip().decode('ascii')
5091                _resource_unlink(name1, rtype)
5092                p.terminate()
5093                p.wait()
5094
5095                deadline = time.monotonic() + 60
5096                while time.monotonic() < deadline:
5097                    time.sleep(.5)
5098                    try:
5099                        _resource_unlink(name2, rtype)
5100                    except OSError as e:
5101                        # docs say it should be ENOENT, but OSX seems to give
5102                        # EINVAL
5103                        self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL))
5104                        break
5105                else:
5106                    raise AssertionError(
5107                        f"A {rtype} resource was leaked after a process was "
5108                        f"abruptly terminated.")
5109                err = p.stderr.read().decode('utf-8')
5110                p.stderr.close()
5111                expected = ('resource_tracker: There appear to be 2 leaked {} '
5112                            'objects'.format(
5113                            rtype))
5114                self.assertRegex(err, expected)
5115                self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1)
5116
5117    def check_resource_tracker_death(self, signum, should_die):
5118        # bpo-31310: if the semaphore tracker process has died, it should
5119        # be restarted implicitly.
5120        from multiprocessing.resource_tracker import _resource_tracker
5121        pid = _resource_tracker._pid
5122        if pid is not None:
5123            os.kill(pid, signal.SIGKILL)
5124            os.waitpid(pid, 0)
5125        with warnings.catch_warnings():
5126            warnings.simplefilter("ignore")
5127            _resource_tracker.ensure_running()
5128        pid = _resource_tracker._pid
5129
5130        os.kill(pid, signum)
5131        time.sleep(1.0)  # give it time to die
5132
5133        ctx = multiprocessing.get_context("spawn")
5134        with warnings.catch_warnings(record=True) as all_warn:
5135            warnings.simplefilter("always")
5136            sem = ctx.Semaphore()
5137            sem.acquire()
5138            sem.release()
5139            wr = weakref.ref(sem)
5140            # ensure `sem` gets collected, which triggers communication with
5141            # the semaphore tracker
5142            del sem
5143            gc.collect()
5144            self.assertIsNone(wr())
5145            if should_die:
5146                self.assertEqual(len(all_warn), 1)
5147                the_warn = all_warn[0]
5148                self.assertTrue(issubclass(the_warn.category, UserWarning))
5149                self.assertTrue("resource_tracker: process died"
5150                                in str(the_warn.message))
5151            else:
5152                self.assertEqual(len(all_warn), 0)
5153
5154    def test_resource_tracker_sigint(self):
5155        # Catchable signal (ignored by semaphore tracker)
5156        self.check_resource_tracker_death(signal.SIGINT, False)
5157
5158    def test_resource_tracker_sigterm(self):
5159        # Catchable signal (ignored by semaphore tracker)
5160        self.check_resource_tracker_death(signal.SIGTERM, False)
5161
5162    def test_resource_tracker_sigkill(self):
5163        # Uncatchable signal.
5164        self.check_resource_tracker_death(signal.SIGKILL, True)
5165
5166    @staticmethod
5167    def _is_resource_tracker_reused(conn, pid):
5168        from multiprocessing.resource_tracker import _resource_tracker
5169        _resource_tracker.ensure_running()
5170        # The pid should be None in the child process, expect for the fork
5171        # context. It should not be a new value.
5172        reused = _resource_tracker._pid in (None, pid)
5173        reused &= _resource_tracker._check_alive()
5174        conn.send(reused)
5175
5176    def test_resource_tracker_reused(self):
5177        from multiprocessing.resource_tracker import _resource_tracker
5178        _resource_tracker.ensure_running()
5179        pid = _resource_tracker._pid
5180
5181        r, w = multiprocessing.Pipe(duplex=False)
5182        p = multiprocessing.Process(target=self._is_resource_tracker_reused,
5183                                    args=(w, pid))
5184        p.start()
5185        is_resource_tracker_reused = r.recv()
5186
5187        # Clean up
5188        p.join()
5189        w.close()
5190        r.close()
5191
5192        self.assertTrue(is_resource_tracker_reused)
5193
5194
5195class TestSimpleQueue(unittest.TestCase):
5196
5197    @classmethod
5198    def _test_empty(cls, queue, child_can_start, parent_can_continue):
5199        child_can_start.wait()
5200        # issue 30301, could fail under spawn and forkserver
5201        try:
5202            queue.put(queue.empty())
5203            queue.put(queue.empty())
5204        finally:
5205            parent_can_continue.set()
5206
5207    def test_empty(self):
5208        queue = multiprocessing.SimpleQueue()
5209        child_can_start = multiprocessing.Event()
5210        parent_can_continue = multiprocessing.Event()
5211
5212        proc = multiprocessing.Process(
5213            target=self._test_empty,
5214            args=(queue, child_can_start, parent_can_continue)
5215        )
5216        proc.daemon = True
5217        proc.start()
5218
5219        self.assertTrue(queue.empty())
5220
5221        child_can_start.set()
5222        parent_can_continue.wait()
5223
5224        self.assertFalse(queue.empty())
5225        self.assertEqual(queue.get(), True)
5226        self.assertEqual(queue.get(), False)
5227        self.assertTrue(queue.empty())
5228
5229        proc.join()
5230
5231
5232class TestPoolNotLeakOnFailure(unittest.TestCase):
5233
5234    def test_release_unused_processes(self):
5235        # Issue #19675: During pool creation, if we can't create a process,
5236        # don't leak already created ones.
5237        will_fail_in = 3
5238        forked_processes = []
5239
5240        class FailingForkProcess:
5241            def __init__(self, **kwargs):
5242                self.name = 'Fake Process'
5243                self.exitcode = None
5244                self.state = None
5245                forked_processes.append(self)
5246
5247            def start(self):
5248                nonlocal will_fail_in
5249                if will_fail_in <= 0:
5250                    raise OSError("Manually induced OSError")
5251                will_fail_in -= 1
5252                self.state = 'started'
5253
5254            def terminate(self):
5255                self.state = 'stopping'
5256
5257            def join(self):
5258                if self.state == 'stopping':
5259                    self.state = 'stopped'
5260
5261            def is_alive(self):
5262                return self.state == 'started' or self.state == 'stopping'
5263
5264        with self.assertRaisesRegex(OSError, 'Manually induced OSError'):
5265            p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock(
5266                Process=FailingForkProcess))
5267            p.close()
5268            p.join()
5269        self.assertFalse(
5270            any(process.is_alive() for process in forked_processes))
5271
5272
5273class TestSyncManagerTypes(unittest.TestCase):
5274    """Test all the types which can be shared between a parent and a
5275    child process by using a manager which acts as an intermediary
5276    between them.
5277
5278    In the following unit-tests the base type is created in the parent
5279    process, the @classmethod represents the worker process and the
5280    shared object is readable and editable between the two.
5281
5282    # The child.
5283    @classmethod
5284    def _test_list(cls, obj):
5285        assert obj[0] == 5
5286        assert obj.append(6)
5287
5288    # The parent.
5289    def test_list(self):
5290        o = self.manager.list()
5291        o.append(5)
5292        self.run_worker(self._test_list, o)
5293        assert o[1] == 6
5294    """
5295    manager_class = multiprocessing.managers.SyncManager
5296
5297    def setUp(self):
5298        self.manager = self.manager_class()
5299        self.manager.start()
5300        self.proc = None
5301
5302    def tearDown(self):
5303        if self.proc is not None and self.proc.is_alive():
5304            self.proc.terminate()
5305            self.proc.join()
5306        self.manager.shutdown()
5307        self.manager = None
5308        self.proc = None
5309
5310    @classmethod
5311    def setUpClass(cls):
5312        support.reap_children()
5313
5314    tearDownClass = setUpClass
5315
5316    def wait_proc_exit(self):
5317        # Only the manager process should be returned by active_children()
5318        # but this can take a bit on slow machines, so wait a few seconds
5319        # if there are other children too (see #17395).
5320        join_process(self.proc)
5321        start_time = time.monotonic()
5322        t = 0.01
5323        while len(multiprocessing.active_children()) > 1:
5324            time.sleep(t)
5325            t *= 2
5326            dt = time.monotonic() - start_time
5327            if dt >= 5.0:
5328                test.support.environment_altered = True
5329                support.print_warning(f"multiprocessing.Manager still has "
5330                                      f"{multiprocessing.active_children()} "
5331                                      f"active children after {dt} seconds")
5332                break
5333
5334    def run_worker(self, worker, obj):
5335        self.proc = multiprocessing.Process(target=worker, args=(obj, ))
5336        self.proc.daemon = True
5337        self.proc.start()
5338        self.wait_proc_exit()
5339        self.assertEqual(self.proc.exitcode, 0)
5340
5341    @classmethod
5342    def _test_event(cls, obj):
5343        assert obj.is_set()
5344        obj.wait()
5345        obj.clear()
5346        obj.wait(0.001)
5347
5348    def test_event(self):
5349        o = self.manager.Event()
5350        o.set()
5351        self.run_worker(self._test_event, o)
5352        assert not o.is_set()
5353        o.wait(0.001)
5354
5355    @classmethod
5356    def _test_lock(cls, obj):
5357        obj.acquire()
5358
5359    def test_lock(self, lname="Lock"):
5360        o = getattr(self.manager, lname)()
5361        self.run_worker(self._test_lock, o)
5362        o.release()
5363        self.assertRaises(RuntimeError, o.release)  # already released
5364
5365    @classmethod
5366    def _test_rlock(cls, obj):
5367        obj.acquire()
5368        obj.release()
5369
5370    def test_rlock(self, lname="Lock"):
5371        o = getattr(self.manager, lname)()
5372        self.run_worker(self._test_rlock, o)
5373
5374    @classmethod
5375    def _test_semaphore(cls, obj):
5376        obj.acquire()
5377
5378    def test_semaphore(self, sname="Semaphore"):
5379        o = getattr(self.manager, sname)()
5380        self.run_worker(self._test_semaphore, o)
5381        o.release()
5382
5383    def test_bounded_semaphore(self):
5384        self.test_semaphore(sname="BoundedSemaphore")
5385
5386    @classmethod
5387    def _test_condition(cls, obj):
5388        obj.acquire()
5389        obj.release()
5390
5391    def test_condition(self):
5392        o = self.manager.Condition()
5393        self.run_worker(self._test_condition, o)
5394
5395    @classmethod
5396    def _test_barrier(cls, obj):
5397        assert obj.parties == 5
5398        obj.reset()
5399
5400    def test_barrier(self):
5401        o = self.manager.Barrier(5)
5402        self.run_worker(self._test_barrier, o)
5403
5404    @classmethod
5405    def _test_pool(cls, obj):
5406        # TODO: fix https://bugs.python.org/issue35919
5407        with obj:
5408            pass
5409
5410    def test_pool(self):
5411        o = self.manager.Pool(processes=4)
5412        self.run_worker(self._test_pool, o)
5413
5414    @classmethod
5415    def _test_queue(cls, obj):
5416        assert obj.qsize() == 2
5417        assert obj.full()
5418        assert not obj.empty()
5419        assert obj.get() == 5
5420        assert not obj.empty()
5421        assert obj.get() == 6
5422        assert obj.empty()
5423
5424    def test_queue(self, qname="Queue"):
5425        o = getattr(self.manager, qname)(2)
5426        o.put(5)
5427        o.put(6)
5428        self.run_worker(self._test_queue, o)
5429        assert o.empty()
5430        assert not o.full()
5431
5432    def test_joinable_queue(self):
5433        self.test_queue("JoinableQueue")
5434
5435    @classmethod
5436    def _test_list(cls, obj):
5437        assert obj[0] == 5
5438        assert obj.count(5) == 1
5439        assert obj.index(5) == 0
5440        obj.sort()
5441        obj.reverse()
5442        for x in obj:
5443            pass
5444        assert len(obj) == 1
5445        assert obj.pop(0) == 5
5446
5447    def test_list(self):
5448        o = self.manager.list()
5449        o.append(5)
5450        self.run_worker(self._test_list, o)
5451        assert not o
5452        self.assertEqual(len(o), 0)
5453
5454    @classmethod
5455    def _test_dict(cls, obj):
5456        assert len(obj) == 1
5457        assert obj['foo'] == 5
5458        assert obj.get('foo') == 5
5459        assert list(obj.items()) == [('foo', 5)]
5460        assert list(obj.keys()) == ['foo']
5461        assert list(obj.values()) == [5]
5462        assert obj.copy() == {'foo': 5}
5463        assert obj.popitem() == ('foo', 5)
5464
5465    def test_dict(self):
5466        o = self.manager.dict()
5467        o['foo'] = 5
5468        self.run_worker(self._test_dict, o)
5469        assert not o
5470        self.assertEqual(len(o), 0)
5471
5472    @classmethod
5473    def _test_value(cls, obj):
5474        assert obj.value == 1
5475        assert obj.get() == 1
5476        obj.set(2)
5477
5478    def test_value(self):
5479        o = self.manager.Value('i', 1)
5480        self.run_worker(self._test_value, o)
5481        self.assertEqual(o.value, 2)
5482        self.assertEqual(o.get(), 2)
5483
5484    @classmethod
5485    def _test_array(cls, obj):
5486        assert obj[0] == 0
5487        assert obj[1] == 1
5488        assert len(obj) == 2
5489        assert list(obj) == [0, 1]
5490
5491    def test_array(self):
5492        o = self.manager.Array('i', [0, 1])
5493        self.run_worker(self._test_array, o)
5494
5495    @classmethod
5496    def _test_namespace(cls, obj):
5497        assert obj.x == 0
5498        assert obj.y == 1
5499
5500    def test_namespace(self):
5501        o = self.manager.Namespace()
5502        o.x = 0
5503        o.y = 1
5504        self.run_worker(self._test_namespace, o)
5505
5506
5507class MiscTestCase(unittest.TestCase):
5508    def test__all__(self):
5509        # Just make sure names in blacklist are excluded
5510        support.check__all__(self, multiprocessing, extra=multiprocessing.__all__,
5511                             blacklist=['SUBDEBUG', 'SUBWARNING'])
5512#
5513# Mixins
5514#
5515
5516class BaseMixin(object):
5517    @classmethod
5518    def setUpClass(cls):
5519        cls.dangling = (multiprocessing.process._dangling.copy(),
5520                        threading._dangling.copy())
5521
5522    @classmethod
5523    def tearDownClass(cls):
5524        # bpo-26762: Some multiprocessing objects like Pool create reference
5525        # cycles. Trigger a garbage collection to break these cycles.
5526        test.support.gc_collect()
5527
5528        processes = set(multiprocessing.process._dangling) - set(cls.dangling[0])
5529        if processes:
5530            test.support.environment_altered = True
5531            support.print_warning(f'Dangling processes: {processes}')
5532        processes = None
5533
5534        threads = set(threading._dangling) - set(cls.dangling[1])
5535        if threads:
5536            test.support.environment_altered = True
5537            support.print_warning(f'Dangling threads: {threads}')
5538        threads = None
5539
5540
5541class ProcessesMixin(BaseMixin):
5542    TYPE = 'processes'
5543    Process = multiprocessing.Process
5544    connection = multiprocessing.connection
5545    current_process = staticmethod(multiprocessing.current_process)
5546    parent_process = staticmethod(multiprocessing.parent_process)
5547    active_children = staticmethod(multiprocessing.active_children)
5548    Pool = staticmethod(multiprocessing.Pool)
5549    Pipe = staticmethod(multiprocessing.Pipe)
5550    Queue = staticmethod(multiprocessing.Queue)
5551    JoinableQueue = staticmethod(multiprocessing.JoinableQueue)
5552    Lock = staticmethod(multiprocessing.Lock)
5553    RLock = staticmethod(multiprocessing.RLock)
5554    Semaphore = staticmethod(multiprocessing.Semaphore)
5555    BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore)
5556    Condition = staticmethod(multiprocessing.Condition)
5557    Event = staticmethod(multiprocessing.Event)
5558    Barrier = staticmethod(multiprocessing.Barrier)
5559    Value = staticmethod(multiprocessing.Value)
5560    Array = staticmethod(multiprocessing.Array)
5561    RawValue = staticmethod(multiprocessing.RawValue)
5562    RawArray = staticmethod(multiprocessing.RawArray)
5563
5564
5565class ManagerMixin(BaseMixin):
5566    TYPE = 'manager'
5567    Process = multiprocessing.Process
5568    Queue = property(operator.attrgetter('manager.Queue'))
5569    JoinableQueue = property(operator.attrgetter('manager.JoinableQueue'))
5570    Lock = property(operator.attrgetter('manager.Lock'))
5571    RLock = property(operator.attrgetter('manager.RLock'))
5572    Semaphore = property(operator.attrgetter('manager.Semaphore'))
5573    BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore'))
5574    Condition = property(operator.attrgetter('manager.Condition'))
5575    Event = property(operator.attrgetter('manager.Event'))
5576    Barrier = property(operator.attrgetter('manager.Barrier'))
5577    Value = property(operator.attrgetter('manager.Value'))
5578    Array = property(operator.attrgetter('manager.Array'))
5579    list = property(operator.attrgetter('manager.list'))
5580    dict = property(operator.attrgetter('manager.dict'))
5581    Namespace = property(operator.attrgetter('manager.Namespace'))
5582
5583    @classmethod
5584    def Pool(cls, *args, **kwds):
5585        return cls.manager.Pool(*args, **kwds)
5586
5587    @classmethod
5588    def setUpClass(cls):
5589        super().setUpClass()
5590        cls.manager = multiprocessing.Manager()
5591
5592    @classmethod
5593    def tearDownClass(cls):
5594        # only the manager process should be returned by active_children()
5595        # but this can take a bit on slow machines, so wait a few seconds
5596        # if there are other children too (see #17395)
5597        start_time = time.monotonic()
5598        t = 0.01
5599        while len(multiprocessing.active_children()) > 1:
5600            time.sleep(t)
5601            t *= 2
5602            dt = time.monotonic() - start_time
5603            if dt >= 5.0:
5604                test.support.environment_altered = True
5605                support.print_warning(f"multiprocessing.Manager still has "
5606                                      f"{multiprocessing.active_children()} "
5607                                      f"active children after {dt} seconds")
5608                break
5609
5610        gc.collect()                       # do garbage collection
5611        if cls.manager._number_of_objects() != 0:
5612            # This is not really an error since some tests do not
5613            # ensure that all processes which hold a reference to a
5614            # managed object have been joined.
5615            test.support.environment_altered = True
5616            support.print_warning('Shared objects which still exist '
5617                                  'at manager shutdown:')
5618            support.print_warning(cls.manager._debug_info())
5619        cls.manager.shutdown()
5620        cls.manager.join()
5621        cls.manager = None
5622
5623        super().tearDownClass()
5624
5625
5626class ThreadsMixin(BaseMixin):
5627    TYPE = 'threads'
5628    Process = multiprocessing.dummy.Process
5629    connection = multiprocessing.dummy.connection
5630    current_process = staticmethod(multiprocessing.dummy.current_process)
5631    active_children = staticmethod(multiprocessing.dummy.active_children)
5632    Pool = staticmethod(multiprocessing.dummy.Pool)
5633    Pipe = staticmethod(multiprocessing.dummy.Pipe)
5634    Queue = staticmethod(multiprocessing.dummy.Queue)
5635    JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue)
5636    Lock = staticmethod(multiprocessing.dummy.Lock)
5637    RLock = staticmethod(multiprocessing.dummy.RLock)
5638    Semaphore = staticmethod(multiprocessing.dummy.Semaphore)
5639    BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore)
5640    Condition = staticmethod(multiprocessing.dummy.Condition)
5641    Event = staticmethod(multiprocessing.dummy.Event)
5642    Barrier = staticmethod(multiprocessing.dummy.Barrier)
5643    Value = staticmethod(multiprocessing.dummy.Value)
5644    Array = staticmethod(multiprocessing.dummy.Array)
5645
5646#
5647# Functions used to create test cases from the base ones in this module
5648#
5649
5650def install_tests_in_module_dict(remote_globs, start_method):
5651    __module__ = remote_globs['__name__']
5652    local_globs = globals()
5653    ALL_TYPES = {'processes', 'threads', 'manager'}
5654
5655    for name, base in local_globs.items():
5656        if not isinstance(base, type):
5657            continue
5658        if issubclass(base, BaseTestCase):
5659            if base is BaseTestCase:
5660                continue
5661            assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES
5662            for type_ in base.ALLOWED_TYPES:
5663                newname = 'With' + type_.capitalize() + name[1:]
5664                Mixin = local_globs[type_.capitalize() + 'Mixin']
5665                class Temp(base, Mixin, unittest.TestCase):
5666                    pass
5667                Temp.__name__ = Temp.__qualname__ = newname
5668                Temp.__module__ = __module__
5669                remote_globs[newname] = Temp
5670        elif issubclass(base, unittest.TestCase):
5671            class Temp(base, object):
5672                pass
5673            Temp.__name__ = Temp.__qualname__ = name
5674            Temp.__module__ = __module__
5675            remote_globs[name] = Temp
5676
5677    dangling = [None, None]
5678    old_start_method = [None]
5679
5680    def setUpModule():
5681        multiprocessing.set_forkserver_preload(PRELOAD)
5682        multiprocessing.process._cleanup()
5683        dangling[0] = multiprocessing.process._dangling.copy()
5684        dangling[1] = threading._dangling.copy()
5685        old_start_method[0] = multiprocessing.get_start_method(allow_none=True)
5686        try:
5687            multiprocessing.set_start_method(start_method, force=True)
5688        except ValueError:
5689            raise unittest.SkipTest(start_method +
5690                                    ' start method not supported')
5691
5692        if sys.platform.startswith("linux"):
5693            try:
5694                lock = multiprocessing.RLock()
5695            except OSError:
5696                raise unittest.SkipTest("OSError raises on RLock creation, "
5697                                        "see issue 3111!")
5698        check_enough_semaphores()
5699        util.get_temp_dir()     # creates temp directory
5700        multiprocessing.get_logger().setLevel(LOG_LEVEL)
5701
5702    def tearDownModule():
5703        need_sleep = False
5704
5705        # bpo-26762: Some multiprocessing objects like Pool create reference
5706        # cycles. Trigger a garbage collection to break these cycles.
5707        test.support.gc_collect()
5708
5709        multiprocessing.set_start_method(old_start_method[0], force=True)
5710        # pause a bit so we don't get warning about dangling threads/processes
5711        processes = set(multiprocessing.process._dangling) - set(dangling[0])
5712        if processes:
5713            need_sleep = True
5714            test.support.environment_altered = True
5715            support.print_warning(f'Dangling processes: {processes}')
5716        processes = None
5717
5718        threads = set(threading._dangling) - set(dangling[1])
5719        if threads:
5720            need_sleep = True
5721            test.support.environment_altered = True
5722            support.print_warning(f'Dangling threads: {threads}')
5723        threads = None
5724
5725        # Sleep 500 ms to give time to child processes to complete.
5726        if need_sleep:
5727            time.sleep(0.5)
5728
5729        multiprocessing.util._cleanup_tests()
5730
5731    remote_globs['setUpModule'] = setUpModule
5732    remote_globs['tearDownModule'] = tearDownModule
5733