import test.support

# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.skip_if_broken_multiprocessing_synchronize()

from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util


def create_future(state=PENDING, exception=None, result=None):
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)
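# The constants above are futures forced into each lifecycle state via
# create_future(), so the wait()/as_completed() and Future state tests below
# can exercise every state without needing a live executor.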

INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
    return x * y

def capture(*args, **kwargs):
    return args, kwargs

def sleep_and_raise(t):
    time.sleep(t)
    raise Exception('this is an exception')

def sleep_and_print(t, msg):
    time.sleep(t)
    print(msg)
    sys.stdout.flush()

def init(x):
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x

def get_init_status():
    return INITIALIZER_STATUS

def init_fail(log_queue=None):
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class MyObject(object):
    def my_method(self):
        pass


class EventfulGCObj():
    def __init__(self, mgr):
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


def make_dummy_object(_):
    return MyObject()


class BaseTestCase(unittest.TestCase):
    def setUp(self):
        self._thread_key = test.support.threading_setup()

    def tearDown(self):
        test.support.reap_children()
        test.support.threading_cleanup(*self._thread_key)


class ExecutorMixin:
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
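        # Process pool mixins define a "ctx" attribute naming a
        # multiprocessing start method and therefore pass an mp_context;
        # the thread pool mixin has no "ctx" and takes no such argument.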
        if hasattr(self, "ctx"):
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        futures = [self.executor.submit(time.sleep, 0.1)
                   for _ in range(self.worker_count)]
        for f in futures:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolForkMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("requires a Unix system")
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"


class ProcessPoolForkserverMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("requires a Unix system")
        return super().get_context()


def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    def strip_mixin(name):
        if name.endswith(('Mixin', 'Tests')):
            return name[:-5]
        elif name.endswith('Test'):
            return name[:-4]
        else:
            return name

    for exe in executor_mixins:
        name = ("%s%sTest"
                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
        cls = type(name, (mixin,) + (exe,) + bases, {})
        globals()[name] = cls
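
# For example, the create_executor_tests(InitializerMixin) call below
# generates ThreadPoolInitializerTest, ProcessPoolForkInitializerTest,
# ProcessPoolForkserverInitializerTest and ProcessPoolSpawnInitializerTest,
# each with bases (InitializerMixin, <executor mixin>, BaseTestCase).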


class InitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        futures = [self.executor.submit(get_init_status)
                   for _ in range(self.worker_count)]

        for f in futures:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        if self.log_queue is not None:
            yield
            output = []
            try:
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)


class ExecutorShutdownTest:
    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code; check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code; check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def _prime_executor(self):
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with futures.ProcessPoolExecutor(max_workers=5) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = futures.ProcessPoolExecutor(max_workers=5)
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()


create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class WaitTests:

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, future1, future2],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
                 return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
                finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
                [future1, future2, future3],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 future1, future2],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
                 [EXCEPTION_FUTURE, future1],
                 return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 future1,
                 future2],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class AsCompletedTests:
    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]))
        self.assertEqual(set(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 future1, future2]),
                completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)


class ExecutorTest:
    # Executor.shutdown() and context manager usage are tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertWarns(DeprecationWarning):
            future = self.executor.submit(fn=capture, arg=1)
        self.assertEqual(future.result(), ((), {'arg': 1}))
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10))),
                list(map(pow, range(10), range(10))))

        self.assertEqual(
                list(self.executor.map(pow, range(10), range(10), chunksize=3)),
                list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            self.assertIsNone(wr())


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)


class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        futures = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in futures:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_resources_gced_in_workers(self):
        # Ensure that the arguments for a job are correctly gc-ed after the
        # job has finished.
        mgr = get_context(self.ctx).Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # Explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while the manager is still running.
        obj = None
        test.support.gc_collect()

        mgr.shutdown()
        mgr.join()


create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))

def hide_process_stderr():
    import io
    sys.stderr = io.StringIO()


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an exception in the worker process."""
    hide_process_stderr()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    hide_process_stderr()
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error, (UnpicklingError, )
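
# How the classes above inject faults (informal note, nothing here is run):
# pickling an object calls its __reduce__() method, so the *AtPickle variants
# misbehave while the object is being dumped; returning a (callable, args)
# pair from __reduce__() makes pickle call that callable during loading, so
# the *AtUnpickle variants misbehave in whichever process unpickles them.
# For instance, pickle.dumps(CrashAtPickle()) would crash the calling process,
# while pickle.loads(pickle.dumps(CrashAtUnpickle())) crashes the process
# doing the load.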


class ExecutorDeadlockTest:
    TIMEOUT = 15

    @classmethod
    def _sleep_id(cls, x, delay):
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # It should be safe to call executor.shutdown() here, as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def test_crash(self):
        # Extensive testing of deadlocks caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with test.support.captured_stderr():
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that calling shutdown on the pool does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in _fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()


create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class FutureTests(BaseTestCase):
    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with test.support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())


    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')


    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertEqual(f1.result(timeout=5), 42)
        t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertRaises(futures.CancelledError, f1.result, timeout=5)
        t.join()

    def test_exception_with_timeout(self):
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0),
                                   OSError))
        self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None)

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()

        self.assertTrue(isinstance(f1.exception(timeout=5), OSError))
        t.join()

    def test_multiple_set_result(self):
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)


_threads_key = None

def setUpModule():
    global _threads_key
    _threads_key = test.support.threading_setup()


def tearDownModule():
    test.support.threading_cleanup(*_threads_key)
    multiprocessing.util._cleanup_tests()


if __name__ == "__main__":
    unittest.main()