1from test import support 2from test.support import import_helper 3from test.support import threading_helper 4 5# Skip tests if _multiprocessing wasn't built. 6import_helper.import_module('_multiprocessing') 7 8from test.support import hashlib_helper 9from test.support.script_helper import assert_python_ok 10 11import contextlib 12import itertools 13import logging 14from logging.handlers import QueueHandler 15import os 16import queue 17import sys 18import threading 19import time 20import unittest 21import weakref 22from pickle import PicklingError 23 24from concurrent import futures 25from concurrent.futures._base import ( 26 PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future, 27 BrokenExecutor) 28from concurrent.futures.process import BrokenProcessPool, _check_system_limits 29from multiprocessing import get_context 30 31import multiprocessing.process 32import multiprocessing.util 33 34 35def create_future(state=PENDING, exception=None, result=None): 36 f = Future() 37 f._state = state 38 f._exception = exception 39 f._result = result 40 return f 41 42 43PENDING_FUTURE = create_future(state=PENDING) 44RUNNING_FUTURE = create_future(state=RUNNING) 45CANCELLED_FUTURE = create_future(state=CANCELLED) 46CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED) 47EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError()) 48SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42) 49 50INITIALIZER_STATUS = 'uninitialized' 51 52 53def mul(x, y): 54 return x * y 55 56def capture(*args, **kwargs): 57 return args, kwargs 58 59def sleep_and_raise(t): 60 time.sleep(t) 61 raise Exception('this is an exception') 62 63def sleep_and_print(t, msg): 64 time.sleep(t) 65 print(msg) 66 sys.stdout.flush() 67 68def init(x): 69 global INITIALIZER_STATUS 70 INITIALIZER_STATUS = x 71 72def get_init_status(): 73 return INITIALIZER_STATUS 74 75def init_fail(log_queue=None): 76 if log_queue is not None: 77 logger = logging.getLogger('concurrent.futures') 78 logger.addHandler(QueueHandler(log_queue)) 79 logger.setLevel('CRITICAL') 80 logger.propagate = False 81 time.sleep(0.1) # let some futures be scheduled 82 raise ValueError('error in initializer') 83 84 85class MyObject(object): 86 def my_method(self): 87 pass 88 89 90class EventfulGCObj(): 91 def __init__(self, mgr): 92 self.event = mgr.Event() 93 94 def __del__(self): 95 self.event.set() 96 97 98def make_dummy_object(_): 99 return MyObject() 100 101 102class BaseTestCase(unittest.TestCase): 103 def setUp(self): 104 self._thread_key = threading_helper.threading_setup() 105 106 def tearDown(self): 107 support.reap_children() 108 threading_helper.threading_cleanup(*self._thread_key) 109 110 111class ExecutorMixin: 112 worker_count = 5 113 executor_kwargs = {} 114 115 def setUp(self): 116 super().setUp() 117 118 self.t1 = time.monotonic() 119 if hasattr(self, "ctx"): 120 self.executor = self.executor_type( 121 max_workers=self.worker_count, 122 mp_context=self.get_context(), 123 **self.executor_kwargs) 124 else: 125 self.executor = self.executor_type( 126 max_workers=self.worker_count, 127 **self.executor_kwargs) 128 self._prime_executor() 129 130 def tearDown(self): 131 self.executor.shutdown(wait=True) 132 self.executor = None 133 134 dt = time.monotonic() - self.t1 135 if support.verbose: 136 print("%.2fs" % dt, end=' ') 137 self.assertLess(dt, 300, "synchronization issue: test lasted too long") 138 139 super().tearDown() 140 141 def get_context(self): 142 return get_context(self.ctx) 143 144 def _prime_executor(self): 145 # Make sure that the executor is ready to do work before running the 146 # tests. This should reduce the probability of timeouts in the tests. 147 futures = [self.executor.submit(time.sleep, 0.1) 148 for _ in range(self.worker_count)] 149 for f in futures: 150 f.result() 151 152 153class ThreadPoolMixin(ExecutorMixin): 154 executor_type = futures.ThreadPoolExecutor 155 156 157class ProcessPoolForkMixin(ExecutorMixin): 158 executor_type = futures.ProcessPoolExecutor 159 ctx = "fork" 160 161 def get_context(self): 162 try: 163 _check_system_limits() 164 except NotImplementedError: 165 self.skipTest("ProcessPoolExecutor unavailable on this system") 166 if sys.platform == "win32": 167 self.skipTest("require unix system") 168 return super().get_context() 169 170 171class ProcessPoolSpawnMixin(ExecutorMixin): 172 executor_type = futures.ProcessPoolExecutor 173 ctx = "spawn" 174 175 def get_context(self): 176 try: 177 _check_system_limits() 178 except NotImplementedError: 179 self.skipTest("ProcessPoolExecutor unavailable on this system") 180 return super().get_context() 181 182 183class ProcessPoolForkserverMixin(ExecutorMixin): 184 executor_type = futures.ProcessPoolExecutor 185 ctx = "forkserver" 186 187 def get_context(self): 188 try: 189 _check_system_limits() 190 except NotImplementedError: 191 self.skipTest("ProcessPoolExecutor unavailable on this system") 192 if sys.platform == "win32": 193 self.skipTest("require unix system") 194 return super().get_context() 195 196 197def create_executor_tests(mixin, bases=(BaseTestCase,), 198 executor_mixins=(ThreadPoolMixin, 199 ProcessPoolForkMixin, 200 ProcessPoolForkserverMixin, 201 ProcessPoolSpawnMixin)): 202 def strip_mixin(name): 203 if name.endswith(('Mixin', 'Tests')): 204 return name[:-5] 205 elif name.endswith('Test'): 206 return name[:-4] 207 else: 208 return name 209 210 for exe in executor_mixins: 211 name = ("%s%sTest" 212 % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__))) 213 cls = type(name, (mixin,) + (exe,) + bases, {}) 214 globals()[name] = cls 215 216 217class InitializerMixin(ExecutorMixin): 218 worker_count = 2 219 220 def setUp(self): 221 global INITIALIZER_STATUS 222 INITIALIZER_STATUS = 'uninitialized' 223 self.executor_kwargs = dict(initializer=init, 224 initargs=('initialized',)) 225 super().setUp() 226 227 def test_initializer(self): 228 futures = [self.executor.submit(get_init_status) 229 for _ in range(self.worker_count)] 230 231 for f in futures: 232 self.assertEqual(f.result(), 'initialized') 233 234 235class FailingInitializerMixin(ExecutorMixin): 236 worker_count = 2 237 238 def setUp(self): 239 if hasattr(self, "ctx"): 240 # Pass a queue to redirect the child's logging output 241 self.mp_context = self.get_context() 242 self.log_queue = self.mp_context.Queue() 243 self.executor_kwargs = dict(initializer=init_fail, 244 initargs=(self.log_queue,)) 245 else: 246 # In a thread pool, the child shares our logging setup 247 # (see _assert_logged()) 248 self.mp_context = None 249 self.log_queue = None 250 self.executor_kwargs = dict(initializer=init_fail) 251 super().setUp() 252 253 def test_initializer(self): 254 with self._assert_logged('ValueError: error in initializer'): 255 try: 256 future = self.executor.submit(get_init_status) 257 except BrokenExecutor: 258 # Perhaps the executor is already broken 259 pass 260 else: 261 with self.assertRaises(BrokenExecutor): 262 future.result() 263 # At some point, the executor should break 264 t1 = time.monotonic() 265 while not self.executor._broken: 266 if time.monotonic() - t1 > 5: 267 self.fail("executor not broken after 5 s.") 268 time.sleep(0.01) 269 # ... and from this point submit() is guaranteed to fail 270 with self.assertRaises(BrokenExecutor): 271 self.executor.submit(get_init_status) 272 273 def _prime_executor(self): 274 pass 275 276 @contextlib.contextmanager 277 def _assert_logged(self, msg): 278 if self.log_queue is not None: 279 yield 280 output = [] 281 try: 282 while True: 283 output.append(self.log_queue.get_nowait().getMessage()) 284 except queue.Empty: 285 pass 286 else: 287 with self.assertLogs('concurrent.futures', 'CRITICAL') as cm: 288 yield 289 output = cm.output 290 self.assertTrue(any(msg in line for line in output), 291 output) 292 293 294create_executor_tests(InitializerMixin) 295create_executor_tests(FailingInitializerMixin) 296 297 298class ExecutorShutdownTest: 299 def test_run_after_shutdown(self): 300 self.executor.shutdown() 301 self.assertRaises(RuntimeError, 302 self.executor.submit, 303 pow, 2, 5) 304 305 def test_interpreter_shutdown(self): 306 # Test the atexit hook for shutdown of worker threads and processes 307 rc, out, err = assert_python_ok('-c', """if 1: 308 from concurrent.futures import {executor_type} 309 from time import sleep 310 from test.test_concurrent_futures import sleep_and_print 311 if __name__ == "__main__": 312 context = '{context}' 313 if context == "": 314 t = {executor_type}(5) 315 else: 316 from multiprocessing import get_context 317 context = get_context(context) 318 t = {executor_type}(5, mp_context=context) 319 t.submit(sleep_and_print, 1.0, "apple") 320 """.format(executor_type=self.executor_type.__name__, 321 context=getattr(self, "ctx", ""))) 322 # Errors in atexit hooks don't change the process exit code, check 323 # stderr manually. 324 self.assertFalse(err) 325 self.assertEqual(out.strip(), b"apple") 326 327 def test_submit_after_interpreter_shutdown(self): 328 # Test the atexit hook for shutdown of worker threads and processes 329 rc, out, err = assert_python_ok('-c', """if 1: 330 import atexit 331 @atexit.register 332 def run_last(): 333 try: 334 t.submit(id, None) 335 except RuntimeError: 336 print("runtime-error") 337 raise 338 from concurrent.futures import {executor_type} 339 if __name__ == "__main__": 340 context = '{context}' 341 if not context: 342 t = {executor_type}(5) 343 else: 344 from multiprocessing import get_context 345 context = get_context(context) 346 t = {executor_type}(5, mp_context=context) 347 t.submit(id, 42).result() 348 """.format(executor_type=self.executor_type.__name__, 349 context=getattr(self, "ctx", ""))) 350 # Errors in atexit hooks don't change the process exit code, check 351 # stderr manually. 352 self.assertIn("RuntimeError: cannot schedule new futures", err.decode()) 353 self.assertEqual(out.strip(), b"runtime-error") 354 355 def test_hang_issue12364(self): 356 fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)] 357 self.executor.shutdown() 358 for f in fs: 359 f.result() 360 361 def test_cancel_futures(self): 362 executor = self.executor_type(max_workers=3) 363 fs = [executor.submit(time.sleep, .1) for _ in range(50)] 364 executor.shutdown(cancel_futures=True) 365 # We can't guarantee the exact number of cancellations, but we can 366 # guarantee that *some* were cancelled. With setting max_workers to 3, 367 # most of the submitted futures should have been cancelled. 368 cancelled = [fut for fut in fs if fut.cancelled()] 369 self.assertTrue(len(cancelled) >= 35, msg=f"{len(cancelled)=}") 370 371 # Ensure the other futures were able to finish. 372 # Use "not fut.cancelled()" instead of "fut.done()" to include futures 373 # that may have been left in a pending state. 374 others = [fut for fut in fs if not fut.cancelled()] 375 for fut in others: 376 self.assertTrue(fut.done(), msg=f"{fut._state=}") 377 self.assertIsNone(fut.exception()) 378 379 # Similar to the number of cancelled futures, we can't guarantee the 380 # exact number that completed. But, we can guarantee that at least 381 # one finished. 382 self.assertTrue(len(others) > 0, msg=f"{len(others)=}") 383 384 def test_hang_issue39205(self): 385 """shutdown(wait=False) doesn't hang at exit with running futures. 386 387 See https://bugs.python.org/issue39205. 388 """ 389 if self.executor_type == futures.ProcessPoolExecutor: 390 raise unittest.SkipTest( 391 "Hangs due to https://bugs.python.org/issue39205") 392 393 rc, out, err = assert_python_ok('-c', """if True: 394 from concurrent.futures import {executor_type} 395 from test.test_concurrent_futures import sleep_and_print 396 if __name__ == "__main__": 397 t = {executor_type}(max_workers=3) 398 t.submit(sleep_and_print, 1.0, "apple") 399 t.shutdown(wait=False) 400 """.format(executor_type=self.executor_type.__name__)) 401 self.assertFalse(err) 402 self.assertEqual(out.strip(), b"apple") 403 404 405class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase): 406 def _prime_executor(self): 407 pass 408 409 def test_threads_terminate(self): 410 def acquire_lock(lock): 411 lock.acquire() 412 413 sem = threading.Semaphore(0) 414 for i in range(3): 415 self.executor.submit(acquire_lock, sem) 416 self.assertEqual(len(self.executor._threads), 3) 417 for i in range(3): 418 sem.release() 419 self.executor.shutdown() 420 for t in self.executor._threads: 421 t.join() 422 423 def test_context_manager_shutdown(self): 424 with futures.ThreadPoolExecutor(max_workers=5) as e: 425 executor = e 426 self.assertEqual(list(e.map(abs, range(-5, 5))), 427 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 428 429 for t in executor._threads: 430 t.join() 431 432 def test_del_shutdown(self): 433 executor = futures.ThreadPoolExecutor(max_workers=5) 434 res = executor.map(abs, range(-5, 5)) 435 threads = executor._threads 436 del executor 437 438 for t in threads: 439 t.join() 440 441 # Make sure the results were all computed before the 442 # executor got shutdown. 443 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 444 445 def test_shutdown_no_wait(self): 446 # Ensure that the executor cleans up the threads when calling 447 # shutdown with wait=False 448 executor = futures.ThreadPoolExecutor(max_workers=5) 449 res = executor.map(abs, range(-5, 5)) 450 threads = executor._threads 451 executor.shutdown(wait=False) 452 for t in threads: 453 t.join() 454 455 # Make sure the results were all computed before the 456 # executor got shutdown. 457 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 458 459 460 def test_thread_names_assigned(self): 461 executor = futures.ThreadPoolExecutor( 462 max_workers=5, thread_name_prefix='SpecialPool') 463 executor.map(abs, range(-5, 5)) 464 threads = executor._threads 465 del executor 466 support.gc_collect() # For PyPy or other GCs. 467 468 for t in threads: 469 self.assertRegex(t.name, r'^SpecialPool_[0-4]$') 470 t.join() 471 472 def test_thread_names_default(self): 473 executor = futures.ThreadPoolExecutor(max_workers=5) 474 executor.map(abs, range(-5, 5)) 475 threads = executor._threads 476 del executor 477 support.gc_collect() # For PyPy or other GCs. 478 479 for t in threads: 480 # Ensure that our default name is reasonably sane and unique when 481 # no thread_name_prefix was supplied. 482 self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$') 483 t.join() 484 485 def test_cancel_futures_wait_false(self): 486 # Can only be reliably tested for TPE, since PPE often hangs with 487 # `wait=False` (even without *cancel_futures*). 488 rc, out, err = assert_python_ok('-c', """if True: 489 from concurrent.futures import ThreadPoolExecutor 490 from test.test_concurrent_futures import sleep_and_print 491 if __name__ == "__main__": 492 t = ThreadPoolExecutor() 493 t.submit(sleep_and_print, .1, "apple") 494 t.shutdown(wait=False, cancel_futures=True) 495 """.format(executor_type=self.executor_type.__name__)) 496 # Errors in atexit hooks don't change the process exit code, check 497 # stderr manually. 498 self.assertFalse(err) 499 self.assertEqual(out.strip(), b"apple") 500 501 502class ProcessPoolShutdownTest(ExecutorShutdownTest): 503 def _prime_executor(self): 504 pass 505 506 def test_processes_terminate(self): 507 def acquire_lock(lock): 508 lock.acquire() 509 510 mp_context = get_context() 511 sem = mp_context.Semaphore(0) 512 for _ in range(3): 513 self.executor.submit(acquire_lock, sem) 514 self.assertEqual(len(self.executor._processes), 3) 515 for _ in range(3): 516 sem.release() 517 processes = self.executor._processes 518 self.executor.shutdown() 519 520 for p in processes.values(): 521 p.join() 522 523 def test_context_manager_shutdown(self): 524 with futures.ProcessPoolExecutor(max_workers=5) as e: 525 processes = e._processes 526 self.assertEqual(list(e.map(abs, range(-5, 5))), 527 [5, 4, 3, 2, 1, 0, 1, 2, 3, 4]) 528 529 for p in processes.values(): 530 p.join() 531 532 def test_del_shutdown(self): 533 executor = futures.ProcessPoolExecutor(max_workers=5) 534 res = executor.map(abs, range(-5, 5)) 535 executor_manager_thread = executor._executor_manager_thread 536 processes = executor._processes 537 call_queue = executor._call_queue 538 executor_manager_thread = executor._executor_manager_thread 539 del executor 540 support.gc_collect() # For PyPy or other GCs. 541 542 # Make sure that all the executor resources were properly cleaned by 543 # the shutdown process 544 executor_manager_thread.join() 545 for p in processes.values(): 546 p.join() 547 call_queue.join_thread() 548 549 # Make sure the results were all computed before the 550 # executor got shutdown. 551 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 552 553 def test_shutdown_no_wait(self): 554 # Ensure that the executor cleans up the processes when calling 555 # shutdown with wait=False 556 executor = futures.ProcessPoolExecutor(max_workers=5) 557 res = executor.map(abs, range(-5, 5)) 558 processes = executor._processes 559 call_queue = executor._call_queue 560 executor_manager_thread = executor._executor_manager_thread 561 executor.shutdown(wait=False) 562 563 # Make sure that all the executor resources were properly cleaned by 564 # the shutdown process 565 executor_manager_thread.join() 566 for p in processes.values(): 567 p.join() 568 call_queue.join_thread() 569 570 # Make sure the results were all computed before the executor got 571 # shutdown. 572 assert all([r == abs(v) for r, v in zip(res, range(-5, 5))]) 573 574 575create_executor_tests(ProcessPoolShutdownTest, 576 executor_mixins=(ProcessPoolForkMixin, 577 ProcessPoolForkserverMixin, 578 ProcessPoolSpawnMixin)) 579 580 581class WaitTests: 582 583 def test_first_completed(self): 584 future1 = self.executor.submit(mul, 21, 2) 585 future2 = self.executor.submit(time.sleep, 1.5) 586 587 done, not_done = futures.wait( 588 [CANCELLED_FUTURE, future1, future2], 589 return_when=futures.FIRST_COMPLETED) 590 591 self.assertEqual(set([future1]), done) 592 self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done) 593 594 def test_first_completed_some_already_completed(self): 595 future1 = self.executor.submit(time.sleep, 1.5) 596 597 finished, pending = futures.wait( 598 [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1], 599 return_when=futures.FIRST_COMPLETED) 600 601 self.assertEqual( 602 set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]), 603 finished) 604 self.assertEqual(set([future1]), pending) 605 606 def test_first_exception(self): 607 future1 = self.executor.submit(mul, 2, 21) 608 future2 = self.executor.submit(sleep_and_raise, 1.5) 609 future3 = self.executor.submit(time.sleep, 3) 610 611 finished, pending = futures.wait( 612 [future1, future2, future3], 613 return_when=futures.FIRST_EXCEPTION) 614 615 self.assertEqual(set([future1, future2]), finished) 616 self.assertEqual(set([future3]), pending) 617 618 def test_first_exception_some_already_complete(self): 619 future1 = self.executor.submit(divmod, 21, 0) 620 future2 = self.executor.submit(time.sleep, 1.5) 621 622 finished, pending = futures.wait( 623 [SUCCESSFUL_FUTURE, 624 CANCELLED_FUTURE, 625 CANCELLED_AND_NOTIFIED_FUTURE, 626 future1, future2], 627 return_when=futures.FIRST_EXCEPTION) 628 629 self.assertEqual(set([SUCCESSFUL_FUTURE, 630 CANCELLED_AND_NOTIFIED_FUTURE, 631 future1]), finished) 632 self.assertEqual(set([CANCELLED_FUTURE, future2]), pending) 633 634 def test_first_exception_one_already_failed(self): 635 future1 = self.executor.submit(time.sleep, 2) 636 637 finished, pending = futures.wait( 638 [EXCEPTION_FUTURE, future1], 639 return_when=futures.FIRST_EXCEPTION) 640 641 self.assertEqual(set([EXCEPTION_FUTURE]), finished) 642 self.assertEqual(set([future1]), pending) 643 644 def test_all_completed(self): 645 future1 = self.executor.submit(divmod, 2, 0) 646 future2 = self.executor.submit(mul, 2, 21) 647 648 finished, pending = futures.wait( 649 [SUCCESSFUL_FUTURE, 650 CANCELLED_AND_NOTIFIED_FUTURE, 651 EXCEPTION_FUTURE, 652 future1, 653 future2], 654 return_when=futures.ALL_COMPLETED) 655 656 self.assertEqual(set([SUCCESSFUL_FUTURE, 657 CANCELLED_AND_NOTIFIED_FUTURE, 658 EXCEPTION_FUTURE, 659 future1, 660 future2]), finished) 661 self.assertEqual(set(), pending) 662 663 def test_timeout(self): 664 future1 = self.executor.submit(mul, 6, 7) 665 future2 = self.executor.submit(time.sleep, 6) 666 667 finished, pending = futures.wait( 668 [CANCELLED_AND_NOTIFIED_FUTURE, 669 EXCEPTION_FUTURE, 670 SUCCESSFUL_FUTURE, 671 future1, future2], 672 timeout=5, 673 return_when=futures.ALL_COMPLETED) 674 675 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 676 EXCEPTION_FUTURE, 677 SUCCESSFUL_FUTURE, 678 future1]), finished) 679 self.assertEqual(set([future2]), pending) 680 681 682class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase): 683 684 def test_pending_calls_race(self): 685 # Issue #14406: multi-threaded race condition when waiting on all 686 # futures. 687 event = threading.Event() 688 def future_func(): 689 event.wait() 690 oldswitchinterval = sys.getswitchinterval() 691 sys.setswitchinterval(1e-6) 692 try: 693 fs = {self.executor.submit(future_func) for i in range(100)} 694 event.set() 695 futures.wait(fs, return_when=futures.ALL_COMPLETED) 696 finally: 697 sys.setswitchinterval(oldswitchinterval) 698 699 700create_executor_tests(WaitTests, 701 executor_mixins=(ProcessPoolForkMixin, 702 ProcessPoolForkserverMixin, 703 ProcessPoolSpawnMixin)) 704 705 706class AsCompletedTests: 707 # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout. 708 def test_no_timeout(self): 709 future1 = self.executor.submit(mul, 2, 21) 710 future2 = self.executor.submit(mul, 7, 6) 711 712 completed = set(futures.as_completed( 713 [CANCELLED_AND_NOTIFIED_FUTURE, 714 EXCEPTION_FUTURE, 715 SUCCESSFUL_FUTURE, 716 future1, future2])) 717 self.assertEqual(set( 718 [CANCELLED_AND_NOTIFIED_FUTURE, 719 EXCEPTION_FUTURE, 720 SUCCESSFUL_FUTURE, 721 future1, future2]), 722 completed) 723 724 def test_zero_timeout(self): 725 future1 = self.executor.submit(time.sleep, 2) 726 completed_futures = set() 727 try: 728 for future in futures.as_completed( 729 [CANCELLED_AND_NOTIFIED_FUTURE, 730 EXCEPTION_FUTURE, 731 SUCCESSFUL_FUTURE, 732 future1], 733 timeout=0): 734 completed_futures.add(future) 735 except futures.TimeoutError: 736 pass 737 738 self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE, 739 EXCEPTION_FUTURE, 740 SUCCESSFUL_FUTURE]), 741 completed_futures) 742 743 def test_duplicate_futures(self): 744 # Issue 20367. Duplicate futures should not raise exceptions or give 745 # duplicate responses. 746 # Issue #31641: accept arbitrary iterables. 747 future1 = self.executor.submit(time.sleep, 2) 748 completed = [ 749 f for f in futures.as_completed(itertools.repeat(future1, 3)) 750 ] 751 self.assertEqual(len(completed), 1) 752 753 def test_free_reference_yielded_future(self): 754 # Issue #14406: Generator should not keep references 755 # to finished futures. 756 futures_list = [Future() for _ in range(8)] 757 futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED)) 758 futures_list.append(create_future(state=FINISHED, result=42)) 759 760 with self.assertRaises(futures.TimeoutError): 761 for future in futures.as_completed(futures_list, timeout=0): 762 futures_list.remove(future) 763 wr = weakref.ref(future) 764 del future 765 support.gc_collect() # For PyPy or other GCs. 766 self.assertIsNone(wr()) 767 768 futures_list[0].set_result("test") 769 for future in futures.as_completed(futures_list): 770 futures_list.remove(future) 771 wr = weakref.ref(future) 772 del future 773 support.gc_collect() # For PyPy or other GCs. 774 self.assertIsNone(wr()) 775 if futures_list: 776 futures_list[0].set_result("test") 777 778 def test_correct_timeout_exception_msg(self): 779 futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE, 780 RUNNING_FUTURE, SUCCESSFUL_FUTURE] 781 782 with self.assertRaises(futures.TimeoutError) as cm: 783 list(futures.as_completed(futures_list, timeout=0)) 784 785 self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished') 786 787 788create_executor_tests(AsCompletedTests) 789 790 791class ExecutorTest: 792 # Executor.shutdown() and context manager usage is tested by 793 # ExecutorShutdownTest. 794 def test_submit(self): 795 future = self.executor.submit(pow, 2, 8) 796 self.assertEqual(256, future.result()) 797 798 def test_submit_keyword(self): 799 future = self.executor.submit(mul, 2, y=8) 800 self.assertEqual(16, future.result()) 801 future = self.executor.submit(capture, 1, self=2, fn=3) 802 self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3})) 803 with self.assertRaises(TypeError): 804 self.executor.submit(fn=capture, arg=1) 805 with self.assertRaises(TypeError): 806 self.executor.submit(arg=1) 807 808 def test_map(self): 809 self.assertEqual( 810 list(self.executor.map(pow, range(10), range(10))), 811 list(map(pow, range(10), range(10)))) 812 813 self.assertEqual( 814 list(self.executor.map(pow, range(10), range(10), chunksize=3)), 815 list(map(pow, range(10), range(10)))) 816 817 def test_map_exception(self): 818 i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5]) 819 self.assertEqual(i.__next__(), (0, 1)) 820 self.assertEqual(i.__next__(), (0, 1)) 821 self.assertRaises(ZeroDivisionError, i.__next__) 822 823 def test_map_timeout(self): 824 results = [] 825 try: 826 for i in self.executor.map(time.sleep, 827 [0, 0, 6], 828 timeout=5): 829 results.append(i) 830 except futures.TimeoutError: 831 pass 832 else: 833 self.fail('expected TimeoutError') 834 835 self.assertEqual([None, None], results) 836 837 def test_shutdown_race_issue12456(self): 838 # Issue #12456: race condition at shutdown where trying to post a 839 # sentinel in the call queue blocks (the queue is full while processes 840 # have exited). 841 self.executor.map(str, [2] * (self.worker_count + 1)) 842 self.executor.shutdown() 843 844 @support.cpython_only 845 def test_no_stale_references(self): 846 # Issue #16284: check that the executors don't unnecessarily hang onto 847 # references. 848 my_object = MyObject() 849 my_object_collected = threading.Event() 850 my_object_callback = weakref.ref( 851 my_object, lambda obj: my_object_collected.set()) 852 # Deliberately discarding the future. 853 self.executor.submit(my_object.my_method) 854 del my_object 855 856 collected = my_object_collected.wait(timeout=support.SHORT_TIMEOUT) 857 self.assertTrue(collected, 858 "Stale reference not collected within timeout.") 859 860 def test_max_workers_negative(self): 861 for number in (0, -1): 862 with self.assertRaisesRegex(ValueError, 863 "max_workers must be greater " 864 "than 0"): 865 self.executor_type(max_workers=number) 866 867 def test_free_reference(self): 868 # Issue #14406: Result iterator should not keep an internal 869 # reference to result objects. 870 for obj in self.executor.map(make_dummy_object, range(10)): 871 wr = weakref.ref(obj) 872 del obj 873 support.gc_collect() # For PyPy or other GCs. 874 self.assertIsNone(wr()) 875 876 877class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase): 878 def test_map_submits_without_iteration(self): 879 """Tests verifying issue 11777.""" 880 finished = [] 881 def record_finished(n): 882 finished.append(n) 883 884 self.executor.map(record_finished, range(10)) 885 self.executor.shutdown(wait=True) 886 self.assertCountEqual(finished, range(10)) 887 888 def test_default_workers(self): 889 executor = self.executor_type() 890 expected = min(32, (os.cpu_count() or 1) + 4) 891 self.assertEqual(executor._max_workers, expected) 892 893 def test_saturation(self): 894 executor = self.executor_type(4) 895 def acquire_lock(lock): 896 lock.acquire() 897 898 sem = threading.Semaphore(0) 899 for i in range(15 * executor._max_workers): 900 executor.submit(acquire_lock, sem) 901 self.assertEqual(len(executor._threads), executor._max_workers) 902 for i in range(15 * executor._max_workers): 903 sem.release() 904 executor.shutdown(wait=True) 905 906 def test_idle_thread_reuse(self): 907 executor = self.executor_type() 908 executor.submit(mul, 21, 2).result() 909 executor.submit(mul, 6, 7).result() 910 executor.submit(mul, 3, 14).result() 911 self.assertEqual(len(executor._threads), 1) 912 executor.shutdown(wait=True) 913 914 @unittest.skipUnless(hasattr(os, 'register_at_fork'), 'need os.register_at_fork') 915 def test_hang_global_shutdown_lock(self): 916 # bpo-45021: _global_shutdown_lock should be reinitialized in the child 917 # process, otherwise it will never exit 918 def submit(pool): 919 pool.submit(submit, pool) 920 921 with futures.ThreadPoolExecutor(1) as pool: 922 pool.submit(submit, pool) 923 924 for _ in range(50): 925 with futures.ProcessPoolExecutor(1, mp_context=get_context('fork')) as workers: 926 workers.submit(tuple) 927 928 929class ProcessPoolExecutorTest(ExecutorTest): 930 931 @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit') 932 def test_max_workers_too_large(self): 933 with self.assertRaisesRegex(ValueError, 934 "max_workers must be <= 61"): 935 futures.ProcessPoolExecutor(max_workers=62) 936 937 def test_killed_child(self): 938 # When a child process is abruptly terminated, the whole pool gets 939 # "broken". 940 futures = [self.executor.submit(time.sleep, 3)] 941 # Get one of the processes, and terminate (kill) it 942 p = next(iter(self.executor._processes.values())) 943 p.terminate() 944 for fut in futures: 945 self.assertRaises(BrokenProcessPool, fut.result) 946 # Submitting other jobs fails as well. 947 self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8) 948 949 def test_map_chunksize(self): 950 def bad_map(): 951 list(self.executor.map(pow, range(40), range(40), chunksize=-1)) 952 953 ref = list(map(pow, range(40), range(40))) 954 self.assertEqual( 955 list(self.executor.map(pow, range(40), range(40), chunksize=6)), 956 ref) 957 self.assertEqual( 958 list(self.executor.map(pow, range(40), range(40), chunksize=50)), 959 ref) 960 self.assertEqual( 961 list(self.executor.map(pow, range(40), range(40), chunksize=40)), 962 ref) 963 self.assertRaises(ValueError, bad_map) 964 965 @classmethod 966 def _test_traceback(cls): 967 raise RuntimeError(123) # some comment 968 969 def test_traceback(self): 970 # We want ensure that the traceback from the child process is 971 # contained in the traceback raised in the main process. 972 future = self.executor.submit(self._test_traceback) 973 with self.assertRaises(Exception) as cm: 974 future.result() 975 976 exc = cm.exception 977 self.assertIs(type(exc), RuntimeError) 978 self.assertEqual(exc.args, (123,)) 979 cause = exc.__cause__ 980 self.assertIs(type(cause), futures.process._RemoteTraceback) 981 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 982 983 with support.captured_stderr() as f1: 984 try: 985 raise exc 986 except RuntimeError: 987 sys.excepthook(*sys.exc_info()) 988 self.assertIn('raise RuntimeError(123) # some comment', 989 f1.getvalue()) 990 991 @hashlib_helper.requires_hashdigest('md5') 992 def test_ressources_gced_in_workers(self): 993 # Ensure that argument for a job are correctly gc-ed after the job 994 # is finished 995 mgr = get_context(self.ctx).Manager() 996 obj = EventfulGCObj(mgr) 997 future = self.executor.submit(id, obj) 998 future.result() 999 1000 self.assertTrue(obj.event.wait(timeout=1)) 1001 1002 # explicitly destroy the object to ensure that EventfulGCObj.__del__() 1003 # is called while manager is still running. 1004 obj = None 1005 support.gc_collect() 1006 1007 mgr.shutdown() 1008 mgr.join() 1009 1010 def test_saturation(self): 1011 executor = self.executor_type(4) 1012 mp_context = get_context() 1013 sem = mp_context.Semaphore(0) 1014 job_count = 15 * executor._max_workers 1015 try: 1016 for _ in range(job_count): 1017 executor.submit(sem.acquire) 1018 self.assertEqual(len(executor._processes), executor._max_workers) 1019 for _ in range(job_count): 1020 sem.release() 1021 finally: 1022 executor.shutdown() 1023 1024 def test_idle_process_reuse_one(self): 1025 executor = self.executor_type(4) 1026 executor.submit(mul, 21, 2).result() 1027 executor.submit(mul, 6, 7).result() 1028 executor.submit(mul, 3, 14).result() 1029 self.assertEqual(len(executor._processes), 1) 1030 executor.shutdown() 1031 1032 def test_idle_process_reuse_multiple(self): 1033 executor = self.executor_type(4) 1034 executor.submit(mul, 12, 7).result() 1035 executor.submit(mul, 33, 25) 1036 executor.submit(mul, 25, 26).result() 1037 executor.submit(mul, 18, 29) 1038 self.assertLessEqual(len(executor._processes), 2) 1039 executor.shutdown() 1040 1041create_executor_tests(ProcessPoolExecutorTest, 1042 executor_mixins=(ProcessPoolForkMixin, 1043 ProcessPoolForkserverMixin, 1044 ProcessPoolSpawnMixin)) 1045 1046def _crash(delay=None): 1047 """Induces a segfault.""" 1048 if delay: 1049 time.sleep(delay) 1050 import faulthandler 1051 faulthandler.disable() 1052 faulthandler._sigsegv() 1053 1054 1055def _exit(): 1056 """Induces a sys exit with exitcode 1.""" 1057 sys.exit(1) 1058 1059 1060def _raise_error(Err): 1061 """Function that raises an Exception in process.""" 1062 raise Err() 1063 1064 1065def _raise_error_ignore_stderr(Err): 1066 """Function that raises an Exception in process and ignores stderr.""" 1067 import io 1068 sys.stderr = io.StringIO() 1069 raise Err() 1070 1071 1072def _return_instance(cls): 1073 """Function that returns a instance of cls.""" 1074 return cls() 1075 1076 1077class CrashAtPickle(object): 1078 """Bad object that triggers a segfault at pickling time.""" 1079 def __reduce__(self): 1080 _crash() 1081 1082 1083class CrashAtUnpickle(object): 1084 """Bad object that triggers a segfault at unpickling time.""" 1085 def __reduce__(self): 1086 return _crash, () 1087 1088 1089class ExitAtPickle(object): 1090 """Bad object that triggers a process exit at pickling time.""" 1091 def __reduce__(self): 1092 _exit() 1093 1094 1095class ExitAtUnpickle(object): 1096 """Bad object that triggers a process exit at unpickling time.""" 1097 def __reduce__(self): 1098 return _exit, () 1099 1100 1101class ErrorAtPickle(object): 1102 """Bad object that triggers an error at pickling time.""" 1103 def __reduce__(self): 1104 from pickle import PicklingError 1105 raise PicklingError("Error in pickle") 1106 1107 1108class ErrorAtUnpickle(object): 1109 """Bad object that triggers an error at unpickling time.""" 1110 def __reduce__(self): 1111 from pickle import UnpicklingError 1112 return _raise_error_ignore_stderr, (UnpicklingError, ) 1113 1114 1115class ExecutorDeadlockTest: 1116 TIMEOUT = support.SHORT_TIMEOUT 1117 1118 def _fail_on_deadlock(self, executor): 1119 # If we did not recover before TIMEOUT seconds, consider that the 1120 # executor is in a deadlock state and forcefully clean all its 1121 # composants. 1122 import faulthandler 1123 from tempfile import TemporaryFile 1124 with TemporaryFile(mode="w+") as f: 1125 faulthandler.dump_traceback(file=f) 1126 f.seek(0) 1127 tb = f.read() 1128 for p in executor._processes.values(): 1129 p.terminate() 1130 # This should be safe to call executor.shutdown here as all possible 1131 # deadlocks should have been broken. 1132 executor.shutdown(wait=True) 1133 print(f"\nTraceback:\n {tb}", file=sys.__stderr__) 1134 self.fail(f"Executor deadlock:\n\n{tb}") 1135 1136 1137 def _check_crash(self, error, func, *args, ignore_stderr=False): 1138 # test for deadlock caused by crashes in a pool 1139 self.executor.shutdown(wait=True) 1140 1141 executor = self.executor_type( 1142 max_workers=2, mp_context=get_context(self.ctx)) 1143 res = executor.submit(func, *args) 1144 1145 if ignore_stderr: 1146 cm = support.captured_stderr() 1147 else: 1148 cm = contextlib.nullcontext() 1149 1150 try: 1151 with self.assertRaises(error): 1152 with cm: 1153 res.result(timeout=self.TIMEOUT) 1154 except futures.TimeoutError: 1155 # If we did not recover before TIMEOUT seconds, 1156 # consider that the executor is in a deadlock state 1157 self._fail_on_deadlock(executor) 1158 executor.shutdown(wait=True) 1159 1160 def test_error_at_task_pickle(self): 1161 # Check problem occurring while pickling a task in 1162 # the task_handler thread 1163 self._check_crash(PicklingError, id, ErrorAtPickle()) 1164 1165 def test_exit_at_task_unpickle(self): 1166 # Check problem occurring while unpickling a task on workers 1167 self._check_crash(BrokenProcessPool, id, ExitAtUnpickle()) 1168 1169 def test_error_at_task_unpickle(self): 1170 # Check problem occurring while unpickling a task on workers 1171 self._check_crash(BrokenProcessPool, id, ErrorAtUnpickle()) 1172 1173 def test_crash_at_task_unpickle(self): 1174 # Check problem occurring while unpickling a task on workers 1175 self._check_crash(BrokenProcessPool, id, CrashAtUnpickle()) 1176 1177 def test_crash_during_func_exec_on_worker(self): 1178 # Check problem occurring during func execution on workers 1179 self._check_crash(BrokenProcessPool, _crash) 1180 1181 def test_exit_during_func_exec_on_worker(self): 1182 # Check problem occurring during func execution on workers 1183 self._check_crash(SystemExit, _exit) 1184 1185 def test_error_during_func_exec_on_worker(self): 1186 # Check problem occurring during func execution on workers 1187 self._check_crash(RuntimeError, _raise_error, RuntimeError) 1188 1189 def test_crash_during_result_pickle_on_worker(self): 1190 # Check problem occurring while pickling a task result 1191 # on workers 1192 self._check_crash(BrokenProcessPool, _return_instance, CrashAtPickle) 1193 1194 def test_exit_during_result_pickle_on_worker(self): 1195 # Check problem occurring while pickling a task result 1196 # on workers 1197 self._check_crash(SystemExit, _return_instance, ExitAtPickle) 1198 1199 def test_error_during_result_pickle_on_worker(self): 1200 # Check problem occurring while pickling a task result 1201 # on workers 1202 self._check_crash(PicklingError, _return_instance, ErrorAtPickle) 1203 1204 def test_error_during_result_unpickle_in_result_handler(self): 1205 # Check problem occurring while unpickling a task in 1206 # the result_handler thread 1207 self._check_crash(BrokenProcessPool, 1208 _return_instance, ErrorAtUnpickle, 1209 ignore_stderr=True) 1210 1211 def test_exit_during_result_unpickle_in_result_handler(self): 1212 # Check problem occurring while unpickling a task in 1213 # the result_handler thread 1214 self._check_crash(BrokenProcessPool, _return_instance, ExitAtUnpickle) 1215 1216 def test_shutdown_deadlock(self): 1217 # Test that the pool calling shutdown do not cause deadlock 1218 # if a worker fails after the shutdown call. 1219 self.executor.shutdown(wait=True) 1220 with self.executor_type(max_workers=2, 1221 mp_context=get_context(self.ctx)) as executor: 1222 self.executor = executor # Allow clean up in fail_on_deadlock 1223 f = executor.submit(_crash, delay=.1) 1224 executor.shutdown(wait=True) 1225 with self.assertRaises(BrokenProcessPool): 1226 f.result() 1227 1228 def test_shutdown_deadlock_pickle(self): 1229 # Test that the pool calling shutdown with wait=False does not cause 1230 # a deadlock if a task fails at pickle after the shutdown call. 1231 # Reported in bpo-39104. 1232 self.executor.shutdown(wait=True) 1233 with self.executor_type(max_workers=2, 1234 mp_context=get_context(self.ctx)) as executor: 1235 self.executor = executor # Allow clean up in fail_on_deadlock 1236 1237 # Start the executor and get the executor_manager_thread to collect 1238 # the threads and avoid dangling thread that should be cleaned up 1239 # asynchronously. 1240 executor.submit(id, 42).result() 1241 executor_manager = executor._executor_manager_thread 1242 1243 # Submit a task that fails at pickle and shutdown the executor 1244 # without waiting 1245 f = executor.submit(id, ErrorAtPickle()) 1246 executor.shutdown(wait=False) 1247 with self.assertRaises(PicklingError): 1248 f.result() 1249 1250 # Make sure the executor is eventually shutdown and do not leave 1251 # dangling threads 1252 executor_manager.join() 1253 1254 1255create_executor_tests(ExecutorDeadlockTest, 1256 executor_mixins=(ProcessPoolForkMixin, 1257 ProcessPoolForkserverMixin, 1258 ProcessPoolSpawnMixin)) 1259 1260 1261class FutureTests(BaseTestCase): 1262 def test_done_callback_with_result(self): 1263 callback_result = None 1264 def fn(callback_future): 1265 nonlocal callback_result 1266 callback_result = callback_future.result() 1267 1268 f = Future() 1269 f.add_done_callback(fn) 1270 f.set_result(5) 1271 self.assertEqual(5, callback_result) 1272 1273 def test_done_callback_with_exception(self): 1274 callback_exception = None 1275 def fn(callback_future): 1276 nonlocal callback_exception 1277 callback_exception = callback_future.exception() 1278 1279 f = Future() 1280 f.add_done_callback(fn) 1281 f.set_exception(Exception('test')) 1282 self.assertEqual(('test',), callback_exception.args) 1283 1284 def test_done_callback_with_cancel(self): 1285 was_cancelled = None 1286 def fn(callback_future): 1287 nonlocal was_cancelled 1288 was_cancelled = callback_future.cancelled() 1289 1290 f = Future() 1291 f.add_done_callback(fn) 1292 self.assertTrue(f.cancel()) 1293 self.assertTrue(was_cancelled) 1294 1295 def test_done_callback_raises(self): 1296 with support.captured_stderr() as stderr: 1297 raising_was_called = False 1298 fn_was_called = False 1299 1300 def raising_fn(callback_future): 1301 nonlocal raising_was_called 1302 raising_was_called = True 1303 raise Exception('doh!') 1304 1305 def fn(callback_future): 1306 nonlocal fn_was_called 1307 fn_was_called = True 1308 1309 f = Future() 1310 f.add_done_callback(raising_fn) 1311 f.add_done_callback(fn) 1312 f.set_result(5) 1313 self.assertTrue(raising_was_called) 1314 self.assertTrue(fn_was_called) 1315 self.assertIn('Exception: doh!', stderr.getvalue()) 1316 1317 def test_done_callback_already_successful(self): 1318 callback_result = None 1319 def fn(callback_future): 1320 nonlocal callback_result 1321 callback_result = callback_future.result() 1322 1323 f = Future() 1324 f.set_result(5) 1325 f.add_done_callback(fn) 1326 self.assertEqual(5, callback_result) 1327 1328 def test_done_callback_already_failed(self): 1329 callback_exception = None 1330 def fn(callback_future): 1331 nonlocal callback_exception 1332 callback_exception = callback_future.exception() 1333 1334 f = Future() 1335 f.set_exception(Exception('test')) 1336 f.add_done_callback(fn) 1337 self.assertEqual(('test',), callback_exception.args) 1338 1339 def test_done_callback_already_cancelled(self): 1340 was_cancelled = None 1341 def fn(callback_future): 1342 nonlocal was_cancelled 1343 was_cancelled = callback_future.cancelled() 1344 1345 f = Future() 1346 self.assertTrue(f.cancel()) 1347 f.add_done_callback(fn) 1348 self.assertTrue(was_cancelled) 1349 1350 def test_done_callback_raises_already_succeeded(self): 1351 with support.captured_stderr() as stderr: 1352 def raising_fn(callback_future): 1353 raise Exception('doh!') 1354 1355 f = Future() 1356 1357 # Set the result first to simulate a future that runs instantly, 1358 # effectively allowing the callback to be run immediately. 1359 f.set_result(5) 1360 f.add_done_callback(raising_fn) 1361 1362 self.assertIn('exception calling callback for', stderr.getvalue()) 1363 self.assertIn('doh!', stderr.getvalue()) 1364 1365 1366 def test_repr(self): 1367 self.assertRegex(repr(PENDING_FUTURE), 1368 '<Future at 0x[0-9a-f]+ state=pending>') 1369 self.assertRegex(repr(RUNNING_FUTURE), 1370 '<Future at 0x[0-9a-f]+ state=running>') 1371 self.assertRegex(repr(CANCELLED_FUTURE), 1372 '<Future at 0x[0-9a-f]+ state=cancelled>') 1373 self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE), 1374 '<Future at 0x[0-9a-f]+ state=cancelled>') 1375 self.assertRegex( 1376 repr(EXCEPTION_FUTURE), 1377 '<Future at 0x[0-9a-f]+ state=finished raised OSError>') 1378 self.assertRegex( 1379 repr(SUCCESSFUL_FUTURE), 1380 '<Future at 0x[0-9a-f]+ state=finished returned int>') 1381 1382 1383 def test_cancel(self): 1384 f1 = create_future(state=PENDING) 1385 f2 = create_future(state=RUNNING) 1386 f3 = create_future(state=CANCELLED) 1387 f4 = create_future(state=CANCELLED_AND_NOTIFIED) 1388 f5 = create_future(state=FINISHED, exception=OSError()) 1389 f6 = create_future(state=FINISHED, result=5) 1390 1391 self.assertTrue(f1.cancel()) 1392 self.assertEqual(f1._state, CANCELLED) 1393 1394 self.assertFalse(f2.cancel()) 1395 self.assertEqual(f2._state, RUNNING) 1396 1397 self.assertTrue(f3.cancel()) 1398 self.assertEqual(f3._state, CANCELLED) 1399 1400 self.assertTrue(f4.cancel()) 1401 self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED) 1402 1403 self.assertFalse(f5.cancel()) 1404 self.assertEqual(f5._state, FINISHED) 1405 1406 self.assertFalse(f6.cancel()) 1407 self.assertEqual(f6._state, FINISHED) 1408 1409 def test_cancelled(self): 1410 self.assertFalse(PENDING_FUTURE.cancelled()) 1411 self.assertFalse(RUNNING_FUTURE.cancelled()) 1412 self.assertTrue(CANCELLED_FUTURE.cancelled()) 1413 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled()) 1414 self.assertFalse(EXCEPTION_FUTURE.cancelled()) 1415 self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1416 1417 def test_done(self): 1418 self.assertFalse(PENDING_FUTURE.done()) 1419 self.assertFalse(RUNNING_FUTURE.done()) 1420 self.assertTrue(CANCELLED_FUTURE.done()) 1421 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1422 self.assertTrue(EXCEPTION_FUTURE.done()) 1423 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1424 1425 def test_running(self): 1426 self.assertFalse(PENDING_FUTURE.running()) 1427 self.assertTrue(RUNNING_FUTURE.running()) 1428 self.assertFalse(CANCELLED_FUTURE.running()) 1429 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1430 self.assertFalse(EXCEPTION_FUTURE.running()) 1431 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1432 1433 def test_result_with_timeout(self): 1434 self.assertRaises(futures.TimeoutError, 1435 PENDING_FUTURE.result, timeout=0) 1436 self.assertRaises(futures.TimeoutError, 1437 RUNNING_FUTURE.result, timeout=0) 1438 self.assertRaises(futures.CancelledError, 1439 CANCELLED_FUTURE.result, timeout=0) 1440 self.assertRaises(futures.CancelledError, 1441 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1442 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1443 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1444 1445 def test_result_with_success(self): 1446 # TODO(brian@sweetapp.com): This test is timing dependent. 1447 def notification(): 1448 # Wait until the main thread is waiting for the result. 1449 time.sleep(1) 1450 f1.set_result(42) 1451 1452 f1 = create_future(state=PENDING) 1453 t = threading.Thread(target=notification) 1454 t.start() 1455 1456 self.assertEqual(f1.result(timeout=5), 42) 1457 t.join() 1458 1459 def test_result_with_cancel(self): 1460 # TODO(brian@sweetapp.com): This test is timing dependent. 1461 def notification(): 1462 # Wait until the main thread is waiting for the result. 1463 time.sleep(1) 1464 f1.cancel() 1465 1466 f1 = create_future(state=PENDING) 1467 t = threading.Thread(target=notification) 1468 t.start() 1469 1470 self.assertRaises(futures.CancelledError, 1471 f1.result, timeout=support.SHORT_TIMEOUT) 1472 t.join() 1473 1474 def test_exception_with_timeout(self): 1475 self.assertRaises(futures.TimeoutError, 1476 PENDING_FUTURE.exception, timeout=0) 1477 self.assertRaises(futures.TimeoutError, 1478 RUNNING_FUTURE.exception, timeout=0) 1479 self.assertRaises(futures.CancelledError, 1480 CANCELLED_FUTURE.exception, timeout=0) 1481 self.assertRaises(futures.CancelledError, 1482 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1483 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1484 OSError)) 1485 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1486 1487 def test_exception_with_success(self): 1488 def notification(): 1489 # Wait until the main thread is waiting for the exception. 1490 time.sleep(1) 1491 with f1._condition: 1492 f1._state = FINISHED 1493 f1._exception = OSError() 1494 f1._condition.notify_all() 1495 1496 f1 = create_future(state=PENDING) 1497 t = threading.Thread(target=notification) 1498 t.start() 1499 1500 self.assertTrue(isinstance(f1.exception(timeout=support.SHORT_TIMEOUT), OSError)) 1501 t.join() 1502 1503 def test_multiple_set_result(self): 1504 f = create_future(state=PENDING) 1505 f.set_result(1) 1506 1507 with self.assertRaisesRegex( 1508 futures.InvalidStateError, 1509 'FINISHED: <Future at 0x[0-9a-f]+ ' 1510 'state=finished returned int>' 1511 ): 1512 f.set_result(2) 1513 1514 self.assertTrue(f.done()) 1515 self.assertEqual(f.result(), 1) 1516 1517 def test_multiple_set_exception(self): 1518 f = create_future(state=PENDING) 1519 e = ValueError() 1520 f.set_exception(e) 1521 1522 with self.assertRaisesRegex( 1523 futures.InvalidStateError, 1524 'FINISHED: <Future at 0x[0-9a-f]+ ' 1525 'state=finished raised ValueError>' 1526 ): 1527 f.set_exception(Exception()) 1528 1529 self.assertEqual(f.exception(), e) 1530 1531 1532def setUpModule(): 1533 unittest.addModuleCleanup(multiprocessing.util._cleanup_tests) 1534 thread_info = threading_helper.threading_setup() 1535 unittest.addModuleCleanup(threading_helper.threading_cleanup, *thread_info) 1536 1537 1538if __name__ == "__main__": 1539 unittest.main() 1540