1from test import support
2from test.support import import_helper
3from test.support import threading_helper
4
5# Skip tests if _multiprocessing wasn't built.
6import_helper.import_module('_multiprocessing')
7
8from test.support import hashlib_helper
9from test.support.script_helper import assert_python_ok
10
11import contextlib
12import itertools
13import logging
14from logging.handlers import QueueHandler
15import os
16import queue
17import sys
18import threading
19import time
20import unittest
21import weakref
22from pickle import PicklingError
23
24from concurrent import futures
25from concurrent.futures._base import (
26    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
27    BrokenExecutor)
28from concurrent.futures.process import BrokenProcessPool, _check_system_limits
29from multiprocessing import get_context
30
31import multiprocessing.process
32import multiprocessing.util
33
34
def create_future(state=PENDING, exception=None, result=None):
    """Return a new ``Future`` whose internal fields are set directly.

    The private ``_state``, ``_exception`` and ``_result`` attributes are
    assigned as given; no executor is involved.
    """
    future = Future()
    for attr, value in (('_state', state),
                        ('_exception', exception),
                        ('_result', result)):
        setattr(future, attr, value)
    return future
41
42
# Module-level futures, one per state, built with create_future() rather
# than by submitting work to an executor.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Status string overwritten by init() and read back by get_init_status().
INITIALIZER_STATUS = 'uninitialized'
51
52
def mul(x, y):
    """Return the product ``x * y``."""
    product = x * y
    return product
55
def capture(*args, **kwargs):
    """Return the received positional and keyword arguments as a pair."""
    received = (args, kwargs)
    return received
58
def sleep_and_raise(t):
    """Sleep for *t* seconds, then raise a plain ``Exception``."""
    time.sleep(t)
    error = Exception('this is an exception')
    raise error
62
def sleep_and_print(t, msg):
    """Sleep for *t* seconds, then print *msg* to stdout and flush it."""
    time.sleep(t)
    out = sys.stdout
    print(msg, file=out)
    out.flush()
67
def init(x):
    """Store *x* in the module-level ``INITIALIZER_STATUS``."""
    globals()['INITIALIZER_STATUS'] = x
71
def get_init_status():
    """Return the current value of the module-level ``INITIALIZER_STATUS``."""
    status = INITIALIZER_STATUS
    return status
74
def init_fail(log_queue=None):
    """Initializer that always fails with ``ValueError``.

    When *log_queue* is given, the 'concurrent.futures' logger is first
    pointed at a ``QueueHandler`` on that queue, set to CRITICAL, and has
    propagation turned off.
    """
    redirect_logging = log_queue is not None
    if redirect_logging:
        futures_logger = logging.getLogger('concurrent.futures')
        futures_logger.addHandler(QueueHandler(log_queue))
        futures_logger.setLevel('CRITICAL')
        futures_logger.propagate = False
    # let some futures be scheduled
    time.sleep(0.1)
    raise ValueError('error in initializer')
83
84
class MyObject(object):
    """Minimal class with a single no-op method."""

    def my_method(self):
        """Do nothing and return ``None``."""
        return None
88
89
class EventfulGCObj():
    """Object that sets an event when it is finalized.

    The event is obtained from ``mgr.Event()`` at construction time.
    """

    def __init__(self, mgr):
        created = mgr.Event()
        self.event = created

    def __del__(self):
        # Signal whoever holds the event that this object was finalized.
        event = self.event
        event.set()
96
97
def make_dummy_object(_):
    """Return a fresh ``MyObject``; the argument is ignored."""
    dummy = MyObject()
    return dummy
100
101
102class BaseTestCase(unittest.TestCase):
103    def setUp(self):
104        self._thread_key = threading_helper.threading_setup()
105
106    def tearDown(self):
107        support.reap_children()
108        threading_helper.threading_cleanup(*self._thread_key)
109
110
class ExecutorMixin:
    """Mixin that creates ``self.executor`` in setUp and shuts it down in tearDown.

    Concrete classes provide ``executor_type``. When the class also defines
    ``ctx``, the result of ``get_context()`` is passed as ``mp_context``.
    """
    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        # Only classes that define ``ctx`` pass an mp_context.
        context_kwargs = {}
        if hasattr(self, "ctx"):
            context_kwargs["mp_context"] = self.get_context()
        self.executor = self.executor_type(
            max_workers=self.worker_count,
            **context_kwargs,
            **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        elapsed = time.monotonic() - self.t1
        if support.verbose:
            print("%.2fs" % elapsed, end=' ')
        self.assertLess(elapsed, 300,
                        "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        """Return the multiprocessing context named by ``self.ctx``."""
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        warmups = []
        for _ in range(self.worker_count):
            warmups.append(self.executor.submit(time.sleep, 0.1))
        for warmup in warmups:
            warmup.result()
151
152
class ThreadPoolMixin(ExecutorMixin):
    """ExecutorMixin variant that uses ``ThreadPoolExecutor``.

    No ``ctx`` attribute is defined here, so ExecutorMixin.setUp() creates
    the executor without an ``mp_context`` argument.
    """
    executor_type = futures.ThreadPoolExecutor
155
156
class ProcessPoolForkMixin(ExecutorMixin):
    """ExecutorMixin variant using ``ProcessPoolExecutor`` with "fork"."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        # Skip when _check_system_limits() raises NotImplementedError.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        # Skip on Windows before asking for the context.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
169
170
class ProcessPoolSpawnMixin(ExecutorMixin):
    """ExecutorMixin variant using ``ProcessPoolExecutor`` with "spawn"."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"

    def get_context(self):
        # Skip when _check_system_limits() raises NotImplementedError.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        context = super().get_context()
        return context
181
182
class ProcessPoolForkserverMixin(ExecutorMixin):
    """ExecutorMixin variant using ``ProcessPoolExecutor`` with "forkserver"."""
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        # Skip when _check_system_limits() raises NotImplementedError.
        try:
            _check_system_limits()
        except NotImplementedError:
            self.skipTest("ProcessPoolExecutor unavailable on this system")
        # Skip on Windows before asking for the context.
        on_windows = sys.platform == "win32"
        if on_windows:
            self.skipTest("require unix system")
        return super().get_context()
195
196
def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Create one test class per executor mixin and publish it in globals().

    Each class is ``type(name, (mixin, exe) + bases, {})``. The name joins the
    executor mixin's name and *mixin*'s name, each with a trailing 'Mixin',
    'Tests' or 'Test' removed, followed by 'Test'.
    """
    def strip_mixin(name):
        # Drop the first matching suffix; other names pass through unchanged.
        for suffix in ('Mixin', 'Tests', 'Test'):
            if name.endswith(suffix):
                return name[:-len(suffix)]
        return name

    module_namespace = globals()
    for exe in executor_mixins:
        executor_part = strip_mixin(exe.__name__)
        mixin_part = strip_mixin(mixin.__name__)
        name = "%s%sTest" % (executor_part, mixin_part)
        module_namespace[name] = type(name, (mixin, exe) + bases, {})
215
216
class InitializerMixin(ExecutorMixin):
    """Check that the executor's ``initializer`` runs before submitted calls."""
    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        # Reset the status, then have every worker set it through init().
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = {'initializer': init,
                                'initargs': ('initialized',)}
        super().setUp()

    def test_initializer(self):
        # One status query per worker; each must see the initialized value.
        status_futures = []
        for _ in range(self.worker_count):
            status_futures.append(self.executor.submit(get_init_status))

        for status_future in status_futures:
            self.assertEqual(status_future.result(), 'initialized')
233
234
class FailingInitializerMixin(ExecutorMixin):
    """Check that an initializer that raises leaves the executor broken."""
    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            context = self.get_context()
            self.mp_context = context
            self.log_queue = context.Queue()
            self.executor_kwargs = {'initializer': init_fail,
                                    'initargs': (self.log_queue,)}
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = {'initializer': init_fail}
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            started = time.monotonic()
            while not self.executor._broken:
                waited = time.monotonic() - started
                if waited > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # Nothing is submitted up front for this mixin.
        return None

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        """Context manager asserting *msg* shows up in the logged output."""
        if self.log_queue is None:
            # No queue: capture the log records with assertLogs().
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        else:
            yield
            # Drain the messages that were sent through the queue.
            output = []
            try:
                while True:
                    record = self.log_queue.get_nowait()
                    output.append(record.getMessage())
            except queue.Empty:
                pass
        self.assertTrue(any(msg in line for line in output),
                        output)
292
293
# Publish the initializer tests for every default executor mixin of
# create_executor_tests().
create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)
296
297
class ExecutorShutdownTest:
    """Shutdown tests shared by the thread-pool and process-pool variants.

    The class it is mixed into supplies ``self.executor``,
    ``self.executor_type`` and, for process pools, ``ctx`` and
    ``get_context()``.
    """

    def test_run_after_shutdown(self):
        # submit() on an executor that was shut down raises RuntimeError.
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # The child script calls submit() from an atexit callback; the test
        # expects that call to fail with RuntimeError.
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        # Results stay retrievable after shutdown() for already-submitted work.
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()

    def test_cancel_futures(self):
        # Use the multiprocessing context of the class under test, so each
        # start-method variant exercises its own context instead of the
        # platform default.
        context_kwargs = {}
        if hasattr(self, "ctx"):
            context_kwargs["mp_context"] = self.get_context()
        executor = self.executor_type(max_workers=3, **context_kwargs)
        fs = [executor.submit(time.sleep, .1) for _ in range(50)]
        executor.shutdown(cancel_futures=True)
        # We can't guarantee the exact number of cancellations, but we can
        # guarantee that *some* were cancelled. With setting max_workers to 3,
        # most of the submitted futures should have been cancelled.
        cancelled = [fut for fut in fs if fut.cancelled()]
        self.assertGreaterEqual(len(cancelled), 35,
                                msg=f"{len(cancelled)=}")

        # Ensure the other futures were able to finish.
        # Use "not fut.cancelled()" instead of "fut.done()" to include futures
        # that may have been left in a pending state.
        others = [fut for fut in fs if not fut.cancelled()]
        for fut in others:
            self.assertTrue(fut.done(), msg=f"{fut._state=}")
            self.assertIsNone(fut.exception())

        # Similar to the number of cancelled futures, we can't guarantee the
        # exact number that completed. But, we can guarantee that at least
        # one finished.
        self.assertGreater(len(others), 0, msg=f"{len(others)=}")

    def test_hang_issue39205(self):
        """shutdown(wait=False) doesn't hang at exit with running futures.

        See https://bugs.python.org/issue39205.
        """
        if self.executor_type is futures.ProcessPoolExecutor:
            self.skipTest("Hangs due to https://bugs.python.org/issue39205")

        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import {executor_type}
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = {executor_type}(max_workers=3)
                t.submit(sleep_and_print, 1.0, "apple")
                t.shutdown(wait=False)
            """.format(executor_type=self.executor_type.__name__))
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
403
404
class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    """Shutdown tests that only apply to ``ThreadPoolExecutor``."""

    def _prime_executor(self):
        # No warm-up: test_threads_terminate counts the threads it starts.
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for _ in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown. Comparing whole lists also catches missing
        # results, and is not stripped under -O like a bare assert.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the threads when calling
        # shutdown with wait=False
        executor = futures.ThreadPoolExecutor(max_workers=5)
        res = executor.map(abs, range(-5, 5))
        threads = executor._threads
        executor.shutdown(wait=False)
        for t in threads:
            t.join()

        # Make sure the results were all computed before the
        # executor got shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()

    def test_cancel_futures_wait_false(self):
        # Can only be reliably tested for TPE, since PPE often hangs with
        # `wait=False` (even without *cancel_futures*).
        # The script has no placeholders, so it is passed without .format().
        rc, out, err = assert_python_ok('-c', """if True:
            from concurrent.futures import ThreadPoolExecutor
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                t = ThreadPoolExecutor()
                t.submit(sleep_and_print, .1, "apple")
                t.shutdown(wait=False, cancel_futures=True)
            """)
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")
500
501
class ProcessPoolShutdownTest(ExecutorShutdownTest):
    """Shutdown tests that only apply to ``ProcessPoolExecutor``.

    Executors and synchronization objects are created from
    ``self.get_context()`` so each generated variant tests its own start
    method rather than the platform default.
    """

    def _prime_executor(self):
        # No warm-up: test_processes_terminate counts the processes started.
        pass

    def test_processes_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        mp_context = self.get_context()
        sem = mp_context.Semaphore(0)
        for _ in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._processes), 3)
        for _ in range(3):
            sem.release()
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        with self.executor_type(max_workers=5,
                                mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        executor = self.executor_type(max_workers=5,
                                      mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        # Keep the internals that must be joined after the executor is gone.
        executor_manager_thread = executor._executor_manager_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor
        support.gc_collect()  # For PyPy or other GCs.

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the
        # executor got shutdown. Comparing whole lists also catches missing
        # results, and is not stripped under -O like a bare assert.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])

    def test_shutdown_no_wait(self):
        # Ensure that the executor cleans up the processes when calling
        # shutdown with wait=False
        executor = self.executor_type(max_workers=5,
                                      mp_context=self.get_context())
        res = executor.map(abs, range(-5, 5))
        processes = executor._processes
        call_queue = executor._call_queue
        executor_manager_thread = executor._executor_manager_thread
        executor.shutdown(wait=False)

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        executor_manager_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()

        # Make sure the results were all computed before the executor got
        # shutdown.
        self.assertEqual(list(res), [abs(v) for v in range(-5, 5)])
573
574
# Publish ProcessPoolShutdownTest for the three process-pool mixins only;
# the thread-pool mixin is left out of executor_mixins here.
create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
579
580
class WaitTests:
    """Tests for ``futures.wait()`` under each ``return_when`` mode."""

    def test_first_completed(self):
        # The quick call completes first; the sleeping one is still pending.
        quick = self.executor.submit(mul, 21, 2)
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_FUTURE, quick, slow],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual({quick}, done)
        self.assertEqual({CANCELLED_FUTURE, slow}, not_done)

    def test_first_completed_some_already_completed(self):
        slow = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, slow],
                return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
                {CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE},
                done)
        self.assertEqual({slow}, not_done)

    def test_first_exception(self):
        succeeds = self.executor.submit(mul, 2, 21)
        raises = self.executor.submit(sleep_and_raise, 1.5)
        sleeps = self.executor.submit(time.sleep, 3)

        done, not_done = futures.wait(
                [succeeds, raises, sleeps],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({succeeds, raises}, done)
        self.assertEqual({sleeps}, not_done)

    def test_first_exception_some_already_complete(self):
        # divmod(21, 0) is the call that raises here.
        raises = self.executor.submit(divmod, 21, 0)
        sleeps = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 raises, sleeps],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          raises}, done)
        self.assertEqual({CANCELLED_FUTURE, sleeps}, not_done)

    def test_first_exception_one_already_failed(self):
        sleeps = self.executor.submit(time.sleep, 2)

        done, not_done = futures.wait(
                [EXCEPTION_FUTURE, sleeps],
                return_when=futures.FIRST_EXCEPTION)

        self.assertEqual({EXCEPTION_FUTURE}, done)
        self.assertEqual({sleeps}, not_done)

    def test_all_completed(self):
        raises = self.executor.submit(divmod, 2, 0)
        succeeds = self.executor.submit(mul, 2, 21)

        done, not_done = futures.wait(
                [SUCCESSFUL_FUTURE,
                 CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 raises,
                 succeeds],
                return_when=futures.ALL_COMPLETED)

        self.assertEqual({SUCCESSFUL_FUTURE,
                          CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          raises,
                          succeeds}, done)
        self.assertEqual(set(), not_done)

    def test_timeout(self):
        # The 6 s sleep outlasts the 5 s timeout, so it must stay pending.
        succeeds = self.executor.submit(mul, 6, 7)
        sleeps = self.executor.submit(time.sleep, 6)

        done, not_done = futures.wait(
                [CANCELLED_AND_NOTIFIED_FUTURE,
                 EXCEPTION_FUTURE,
                 SUCCESSFUL_FUTURE,
                 succeeds, sleeps],
                timeout=5,
                return_when=futures.ALL_COMPLETED)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE,
                          succeeds}, done)
        self.assertEqual({sleeps}, not_done)
680
681
class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):
    """WaitTests on a thread pool, plus a thread-specific race test."""

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        release = threading.Event()

        def blocked_call():
            release.wait()

        saved_interval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            pending = {self.executor.submit(blocked_call)
                       for _ in range(100)}
            release.set()
            futures.wait(pending, return_when=futures.ALL_COMPLETED)
        finally:
            # Always restore the interpreter's original switch interval.
            sys.setswitchinterval(saved_interval)
698
699
# Publish WaitTests for the three process-pool mixins; the thread-pool
# variant is the explicit ThreadPoolWaitTests class.
create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
704
705
class AsCompletedTests:
    """Tests for ``futures.as_completed()``."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        first = self.executor.submit(mul, 2, 21)
        second = self.executor.submit(mul, 7, 6)
        candidates = [CANCELLED_AND_NOTIFIED_FUTURE,
                      EXCEPTION_FUTURE,
                      SUCCESSFUL_FUTURE,
                      first, second]

        completed = set(futures.as_completed(candidates))
        self.assertEqual(set(candidates), completed)

    def test_zero_timeout(self):
        sleeping = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        # With timeout=0 only the already-finished futures are expected.
        with contextlib.suppress(futures.TimeoutError):
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     sleeping],
                    timeout=0):
                completed_futures.add(future)

        self.assertEqual({CANCELLED_AND_NOTIFIED_FUTURE,
                          EXCEPTION_FUTURE,
                          SUCCESSFUL_FUTURE},
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        sleeping = self.executor.submit(time.sleep, 2)
        completed = list(
            futures.as_completed(itertools.repeat(sleeping, 3)))
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        # The loop variable is the only local reference, so each check is
        # kept inline rather than moved into a helper.
        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                support.gc_collect()  # For PyPy or other GCs.
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
            # Complete the next future so the iteration can continue.
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        # Two of these four futures (pending and running) are unfinished.
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')
786
787
# Publish AsCompletedTests for every default executor mixin.
create_executor_tests(AsCompletedTests)
789
790
class ExecutorTest:
    """Tests of ``submit()`` and ``map()`` shared by all executor types."""

    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        power = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, power.result())

    def test_submit_keyword(self):
        product = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, product.result())
        # 'self' and 'fn' must be usable as keyword names for the callable.
        captured = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(captured.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertRaises(TypeError):
            self.executor.submit(fn=capture, arg=1)
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        expected = list(map(pow, range(10), range(10)))

        plain = self.executor.map(pow, range(10), range(10))
        self.assertEqual(list(plain), expected)

        chunked = self.executor.map(pow, range(10), range(10), chunksize=3)
        self.assertEqual(list(chunked), expected)

    def test_map_exception(self):
        # The third pair divides by zero; the first two results come first.
        results = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(next(results), (0, 1))
        self.assertEqual(next(results), (0, 1))
        with self.assertRaises(ZeroDivisionError):
            next(results)

    def test_map_timeout(self):
        # The 6 s sleep outlasts the 5 s timeout.
        collected = []
        try:
            for value in self.executor.map(time.sleep,
                                           [0, 0, 6],
                                           timeout=5):
                collected.append(value)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], collected)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        call_count = self.worker_count + 1
        self.executor.map(str, [2] * call_count)
        self.executor.shutdown()

    @support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        # The weakref must stay bound to a local so its callback can fire.
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        expected_message = "max_workers must be greater than 0"
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError, expected_message):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            support.gc_collect()  # For PyPy or other GCs.
            self.assertIsNone(wr())
875
876
class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    # Thread-pool specific tests, run in addition to the ones inherited
    # from ExecutorTest.

    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        # The iterator returned by map() is deliberately never consumed:
        # the calls must run anyway.
        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        # Without an explicit max_workers the pool size is derived from the
        # CPU count and capped at 32.
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        # Submitting many more blocking jobs than there are workers must
        # not create more than max_workers threads.
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(acquire_lock, sem)
            self.assertEqual(len(executor._threads), executor._max_workers)
        finally:
            # Always unblock every job and stop the pool, even when the
            # assertion fails; otherwise the worker threads would stay
            # blocked on the semaphore forever.
            for _ in range(job_count):
                sem.release()
            executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        # Jobs submitted one after another, each awaited before the next,
        # must be served by a single thread.
        executor = self.executor_type()
        try:
            executor.submit(mul, 21, 2).result()
            executor.submit(mul, 6, 7).result()
            executor.submit(mul, 3, 14).result()
            self.assertEqual(len(executor._threads), 1)
        finally:
            # Do not leave the worker thread behind if the check fails.
            executor.shutdown(wait=True)

    @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork')
    def test_hang_global_shutdown_lock(self):
        # bpo-45021: _global_shutdown_lock should be reinitialized in the child
        # process, otherwise it will never exit
        def submit(pool):
            # Keeps resubmitting itself to the thread pool.
            pool.submit(submit, pool)

        with futures.ThreadPoolExecutor(1) as pool:
            pool.submit(submit, pool)

            # Repeatedly start and stop a forked process pool while the
            # thread pool above is busy submitting.
            for _ in range(50):
                with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers:
                    workers.submit(tuple)
927
928
class ProcessPoolExecutorTest(ExecutorTest):
    # Process-pool specific tests, run in addition to the ones inherited
    # from ExecutorTest.  The class is handed to create_executor_tests()
    # right after its definition together with the process-pool mixins.

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        # (The list is not called "futures" to avoid shadowing the imported
        # module of that name.)
        pending = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in pending:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        # map() gives the same results for chunk sizes below, above and
        # equal to the number of items, and rejects a negative chunk size.
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback() looks for the exact text of the next source
        # line (trailing comment included) in the reported traceback; do not
        # reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        # The remote traceback must also show up when the exception is
        # printed through sys.excepthook.
        with support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    @hashlib_helper.requires_hashdigest('md5')
    def test_ressources_gced_in_workers(self):
        # Ensure that argument for a job are correctly gc-ed after the job
        # is finished
        mgr = get_context(self.ctx).Manager()
        obj = None
        try:
            obj = EventfulGCObj(mgr)
            future = self.executor.submit(id, obj)
            future.result()

            self.assertTrue(obj.event.wait(timeout=1))
        finally:
            # explicitly destroy the object to ensure that
            # EventfulGCObj.__del__() is called while manager is still
            # running.  Done in a finally block so that the manager process
            # is also stopped when the test fails.
            obj = None
            support.gc_collect()

            mgr.shutdown()
            mgr.join()

    def test_saturation(self):
        # Submitting many more blocking jobs than there are workers must
        # not create more than max_workers processes.
        executor = self.executor_type(4)
        mp_context = get_context()
        sem = mp_context.Semaphore(0)
        job_count = 15 * executor._max_workers
        try:
            for _ in range(job_count):
                executor.submit(sem.acquire)
            self.assertEqual(len(executor._processes), executor._max_workers)
        finally:
            # Unblock the jobs before shutting down, even when the assertion
            # fails: shutdown() waits for the workers, which would otherwise
            # stay blocked on the semaphore and hang the test.
            for _ in range(job_count):
                sem.release()
            executor.shutdown()

    def test_idle_process_reuse_one(self):
        # Jobs submitted one after another, each awaited before the next,
        # must be served by a single worker process.
        executor = self.executor_type(4)
        try:
            executor.submit(mul, 21, 2).result()
            executor.submit(mul, 6, 7).result()
            executor.submit(mul, 3, 14).result()
            self.assertEqual(len(executor._processes), 1)
        finally:
            # Do not leave worker processes behind if the check fails.
            executor.shutdown()

    def test_idle_process_reuse_multiple(self):
        # Only every other job is awaited, so at most two jobs are pending
        # at once and at most two worker processes are expected.
        executor = self.executor_type(4)
        try:
            executor.submit(mul, 12, 7).result()
            executor.submit(mul, 33, 25)
            executor.submit(mul, 25, 26).result()
            executor.submit(mul, 18, 29)
            self.assertLessEqual(len(executor._processes), 2)
        finally:
            # Do not leave worker processes behind if the check fails.
            executor.shutdown()
1040
# Hand ProcessPoolExecutorTest to create_executor_tests() together with the
# fork, forkserver and spawn mixins.  Presumably this generates one concrete
# test class per start method; the helper is defined outside this section
# of the file.
create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1045
def _crash(delay=None):
    """Kill the current process with a segmentation fault.

    If *delay* is truthy, sleep for that many seconds first.
    """
    import faulthandler

    if delay:
        time.sleep(delay)
    # Uninstall the fault handler before raising the signal.
    faulthandler.disable()
    faulthandler._sigsegv()
1053
1054
def _exit():
    """Leave the current process through SystemExit with exit code 1."""
    raise SystemExit(1)
1058
1059
def _raise_error(Err):
    """Raise a new instance of *Err*, constructed without arguments."""
    error = Err()
    raise error
1063
1064
def _raise_error_ignore_stderr(Err):
    """Silence stderr, then raise a new argument-less instance of *Err*.

    ``sys.stderr`` is replaced by an in-memory buffer and is not restored.
    """
    from io import StringIO

    sys.stderr = StringIO()
    raise Err()
1070
1071
def _return_instance(cls):
    """Call *cls* without arguments and return the object it creates."""
    instance = cls()
    return instance
1075
1076
class CrashAtPickle:
    """Object whose pickling segfaults the process doing the pickling."""

    def __reduce__(self):
        # Reducing the object is part of pickling it; crash right here.
        _crash()
1081
1082
class CrashAtUnpickle:
    """Object whose unpickling segfaults the process doing the unpickling."""

    def __reduce__(self):
        # Pickles fine; rebuilding the object calls _crash() with no
        # arguments.
        return (_crash, ())
1087
1088
class ExitAtPickle:
    """Object whose pickling makes the pickling process exit."""

    def __reduce__(self):
        # Reducing the object is part of pickling it; exit right here.
        _exit()
1093
1094
class ExitAtUnpickle:
    """Object whose unpickling makes the unpickling process exit."""

    def __reduce__(self):
        # Pickles fine; rebuilding the object calls _exit() with no
        # arguments.
        return (_exit, ())
1099
1100
class ErrorAtPickle:
    """Object that cannot be pickled: reducing it raises PicklingError."""

    def __reduce__(self):
        import pickle

        raise pickle.PicklingError("Error in pickle")
1106
1107
class ErrorAtUnpickle:
    """Object whose unpickling raises UnpicklingError."""

    def __reduce__(self):
        import pickle

        # Pickles fine; rebuilding the object calls the helper that
        # replaces stderr and then raises the given exception type.
        rebuild_args = (pickle.UnpicklingError,)
        return (_raise_error_ignore_stderr, rebuild_args)
1113
1114
class ExecutorDeadlockTest:
    # Tests checking that a process pool reports a failure, instead of
    # hanging, when a task or a worker misbehaves while pickling, unpickling
    # or running.  The class is handed to create_executor_tests() right
    # after its definition together with the process-pool mixins.

    # How long _check_crash() waits for a result before treating the
    # executor as deadlocked.
    TIMEOUT = support.SHORT_TIMEOUT

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        # Dump the current tracebacks through a temporary file so that they
        # can be included in the failure report.
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        # Written to the original stderr (sys.__stderr__), not sys.stderr.
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")


    def _check_crash(self, error, func, *args, ignore_stderr=False):
        # test for deadlock caused by crashes in a pool
        #
        # Submits func(*args) to a fresh two-worker executor and expects
        # the future to raise *error* within TIMEOUT.  With
        # ignore_stderr=True, stderr is captured while waiting for the
        # result.
        self.executor.shutdown(wait=True)

        executor = self.executor_type(
            max_workers=2, mp_context=get_context(self.ctx))
        res = executor.submit(func, *args)

        if ignore_stderr:
            cm = support.captured_stderr()
        else:
            cm = contextlib.nullcontext()

        try:
            with self.assertRaises(error):
                with cm:
                    res.result(timeout=self.TIMEOUT)
        except futures.TimeoutError:
            # If we did not recover before TIMEOUT seconds,
            # consider that the executor is in a deadlock state
            self._fail_on_deadlock(executor)
        executor.shutdown(wait=True)

    def test_error_at_task_pickle(self):
        # Check problem occurring while pickling a task in
        # the task_handler thread
        self._check_crash(PicklingError, id, ErrorAtPickle())

    def test_exit_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ExitAtUnpickle())

    def test_error_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle())

    def test_crash_at_task_unpickle(self):
        # Check problem occurring while unpickling a task on workers
        self._check_crash(BrokenProcessPool, id, CrashAtUnpickle())

    def test_crash_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(BrokenProcessPool, _crash)

    def test_exit_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(SystemExit, _exit)

    def test_error_during_func_exec_on_worker(self):
        # Check problem occurring during func execution on workers
        self._check_crash(RuntimeError, _raise_error, RuntimeError)

    def test_crash_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle)

    def test_exit_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(SystemExit, _return_instance, ExitAtPickle)

    def test_error_during_result_pickle_on_worker(self):
        # Check problem occurring while pickling a task result
        # on workers
        self._check_crash(PicklingError, _return_instance, ErrorAtPickle)

    def test_error_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool,
                          _return_instance, ErrorAtUnpickle,
                          ignore_stderr=True)

    def test_exit_during_result_unpickle_in_result_handler(self):
        # Check problem occurring while unpickling a task in
        # the result_handler thread
        self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown do not cause deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            # The task sleeps for 0.1 second before crashing, while
            # shutdown() is called right after the submission.
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()

    def test_shutdown_deadlock_pickle(self):
        # Test that the pool calling shutdown with wait=False does not cause
        # a deadlock if a task fails at pickle after the shutdown call.
        # Reported in bpo-39104.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock

            # Start the executor and get the executor_manager_thread to collect
            # the threads and avoid dangling thread that should be cleaned up
            # asynchronously.
            executor.submit(id, 42).result()
            executor_manager = executor._executor_manager_thread

            # Submit a task that fails at pickle and shutdown the executor
            # without waiting
            f = executor.submit(id, ErrorAtPickle())
            executor.shutdown(wait=False)
            with self.assertRaises(PicklingError):
                f.result()

        # Make sure the executor is eventually shutdown and do not leave
        # dangling threads
        executor_manager.join()
1253
1254
# Hand ExecutorDeadlockTest to create_executor_tests() together with the
# fork, forkserver and spawn mixins.  Presumably this generates one concrete
# test class per start method; the helper is defined outside this section
# of the file.
create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))
1259
1260
class FutureTests(BaseTestCase):
    # Tests of Future objects on their own: done callbacks, repr(), state
    # queries, result()/exception() retrieval and the guards against
    # completing a future twice.  The futures are built directly, without
    # going through an executor.

    def test_done_callback_with_result(self):
        # A callback added before completion sees the result.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        # A callback added before completion sees the exception.
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        # A callback added before cancellation sees the cancelled state.
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        # A callback that raises must not prevent later callbacks from
        # running; its exception is reported on stderr.
        with support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        # A callback added after completion is run with the result.
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        # A callback added after a failure is run with the exception.
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        # A callback added after cancellation is run with the cancelled
        # future.
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        # repr() shows the state and, for finished futures, the type of the
        # result or exception.  Both cancelled states print as "cancelled".
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
                repr(EXCEPTION_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
                repr(SUCCESSFUL_FUTURE),
                '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        # cancel() succeeds for pending and already cancelled futures and is
        # refused, leaving the state untouched, for running and finished
        # ones.  Fresh futures are used because cancel() changes state.
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
        self.assertFalse(SUCCESSFUL_FUTURE.cancelled())

    def test_done(self):
        self.assertFalse(PENDING_FUTURE.done())
        self.assertFalse(RUNNING_FUTURE.done())
        self.assertTrue(CANCELLED_FUTURE.done())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done())
        self.assertTrue(EXCEPTION_FUTURE.done())
        self.assertTrue(SUCCESSFUL_FUTURE.done())

    def test_running(self):
        self.assertFalse(PENDING_FUTURE.running())
        self.assertTrue(RUNNING_FUTURE.running())
        self.assertFalse(CANCELLED_FUTURE.running())
        self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running())
        self.assertFalse(EXCEPTION_FUTURE.running())
        self.assertFalse(SUCCESSFUL_FUTURE.running())

    def test_result_with_timeout(self):
        # With a zero timeout, result() answers from the current state
        # without waiting.
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.result, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.result, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0)
        self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0)
        self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42)

    def test_result_with_success(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.set_result(42)

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            # The timeout only bounds a failing run; use the shared test
            # timeout, like the sibling tests, instead of a hard-coded value.
            self.assertEqual(f1.result(timeout=support.SHORT_TIMEOUT), 42)
        finally:
            # Do not leave the helper thread behind if the check fails.
            t.join()

    def test_result_with_cancel(self):
        # TODO(brian@sweetapp.com): This test is timing dependent.
        def notification():
            # Wait until the main thread is waiting for the result.
            time.sleep(1)
            f1.cancel()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertRaises(futures.CancelledError,
                              f1.result, timeout=support.SHORT_TIMEOUT)
        finally:
            # Do not leave the helper thread behind if the check fails.
            t.join()

    def test_exception_with_timeout(self):
        # With a zero timeout, exception() answers from the current state
        # without waiting.
        self.assertRaises(futures.TimeoutError,
                          PENDING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.TimeoutError,
                          RUNNING_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_FUTURE.exception, timeout=0)
        self.assertRaises(futures.CancelledError,
                          CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0)
        self.assertIsInstance(EXCEPTION_FUTURE.exception(timeout=0), OSError)
        self.assertIsNone(SUCCESSFUL_FUTURE.exception(timeout=0))

    def test_exception_with_success(self):
        def notification():
            # Wait until the main thread is waiting for the exception.
            time.sleep(1)
            # Complete the future by hand, under its condition, and wake up
            # the waiting thread.
            with f1._condition:
                f1._state = FINISHED
                f1._exception = OSError()
                f1._condition.notify_all()

        f1 = create_future(state=PENDING)
        t = threading.Thread(target=notification)
        t.start()
        try:
            self.assertIsInstance(
                f1.exception(timeout=support.SHORT_TIMEOUT), OSError)
        finally:
            # Do not leave the helper thread behind if the check fails.
            t.join()

    def test_multiple_set_result(self):
        # Setting a result twice is an error and keeps the first result.
        f = create_future(state=PENDING)
        f.set_result(1)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished returned int>'
        ):
            f.set_result(2)

        self.assertTrue(f.done())
        self.assertEqual(f.result(), 1)

    def test_multiple_set_exception(self):
        # Setting an exception twice is an error and keeps the first one.
        f = create_future(state=PENDING)
        e = ValueError()
        f.set_exception(e)

        with self.assertRaisesRegex(
                futures.InvalidStateError,
                'FINISHED: <Future at 0x[0-9a-f]+ '
                'state=finished raised ValueError>'
        ):
            f.set_exception(Exception())

        self.assertEqual(f.exception(), e)
1530
1531
def setUpModule():
    """Register the module-level cleanups for multiprocessing and threads.

    unittest runs module cleanups in reverse registration order, so the
    threading check registered last runs before ``_cleanup_tests``.
    """
    register = unittest.addModuleCleanup
    register(multiprocessing.util._cleanup_tests)
    # State returned by threading_setup() is handed back to
    # threading_cleanup() when the module is torn down.
    snapshot = threading_helper.threading_setup()
    register(threading_helper.threading_cleanup, *snapshot)
1536
1537
# Run the tests of this module when the file is executed as a script.
if __name__ == "__main__":
    unittest.main()
1540