import test.support

# Skip tests if _multiprocessing wasn't built.
test.support.import_module('_multiprocessing')
# Skip tests if sem_open implementation is broken.
test.support.skip_if_broken_multiprocessing_synchronize()

from test.support.script_helper import assert_python_ok

import contextlib
import itertools
import logging
from logging.handlers import QueueHandler
import os
import queue
import sys
import threading
import time
import unittest
import weakref
from pickle import PicklingError

from concurrent import futures
from concurrent.futures._base import (
    PENDING, RUNNING, CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED, Future,
    BrokenExecutor)
from concurrent.futures.process import BrokenProcessPool
from multiprocessing import get_context

import multiprocessing.process
import multiprocessing.util


def create_future(state=PENDING, exception=None, result=None):
    """Return a Future whose private state fields are forced to the given
    values, bypassing the normal set_result()/set_exception() path."""
    f = Future()
    f._state = state
    f._exception = exception
    f._result = result
    return f


# Shared, pre-built futures covering every state; tests must not mutate them.
PENDING_FUTURE = create_future(state=PENDING)
RUNNING_FUTURE = create_future(state=RUNNING)
CANCELLED_FUTURE = create_future(state=CANCELLED)
CANCELLED_AND_NOTIFIED_FUTURE = create_future(state=CANCELLED_AND_NOTIFIED)
EXCEPTION_FUTURE = create_future(state=FINISHED, exception=OSError())
SUCCESSFUL_FUTURE = create_future(state=FINISHED, result=42)

# Set by init() inside each worker and read back through get_init_status().
INITIALIZER_STATUS = 'uninitialized'


def mul(x, y):
    """Return x * y."""
    return x * y


def capture(*args, **kwargs):
    """Return the received positional and keyword arguments unchanged."""
    return args, kwargs


def sleep_and_raise(t):
    """Sleep for t seconds, then raise a generic Exception."""
    time.sleep(t)
    raise Exception('this is an exception')


def sleep_and_print(t, msg):
    """Sleep for t seconds, then print msg and flush stdout."""
    time.sleep(t)
    print(msg)
    sys.stdout.flush()


def init(x):
    """Worker initializer: record x in the module-level status."""
    global INITIALIZER_STATUS
    INITIALIZER_STATUS = x


def get_init_status():
    """Return the status recorded by init() in the current worker."""
    return INITIALIZER_STATUS


def init_fail(log_queue=None):
    """Worker initializer that always raises ValueError.

    When log_queue is given, the 'concurrent.futures' logger is redirected
    to it so the caller can inspect what the worker logged.
    """
    if log_queue is not None:
        logger = logging.getLogger('concurrent.futures')
        logger.addHandler(QueueHandler(log_queue))
        logger.setLevel('CRITICAL')
        logger.propagate = False
    time.sleep(0.1)  # let some futures be scheduled
    raise ValueError('error in initializer')


class MyObject(object):
    """Trivial object used to check that references are released."""

    def my_method(self):
        pass


class EventfulGCObj():
    """Object that sets a manager-owned event when it is finalized."""

    def __init__(self, mgr):
        self.event = mgr.Event()

    def __del__(self):
        self.event.set()


def make_dummy_object(_):
    """Return a fresh MyObject, ignoring the argument."""
    return MyObject()


class BaseTestCase(unittest.TestCase):
    """TestCase that checks for leaked threads and child processes."""

    def setUp(self):
        self._thread_key = test.support.threading_setup()

    def tearDown(self):
        test.support.reap_children()
        test.support.threading_cleanup(*self._thread_key)


class ExecutorMixin:
    """Create self.executor in setUp() and shut it down in tearDown().

    Subclasses provide executor_type and, for process pools, ctx (the
    multiprocessing start method name).
    """

    worker_count = 5
    executor_kwargs = {}

    def setUp(self):
        super().setUp()

        self.t1 = time.monotonic()
        if hasattr(self, "ctx"):
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                mp_context=self.get_context(),
                **self.executor_kwargs)
        else:
            self.executor = self.executor_type(
                max_workers=self.worker_count,
                **self.executor_kwargs)
        self._prime_executor()

    def tearDown(self):
        self.executor.shutdown(wait=True)
        self.executor = None

        dt = time.monotonic() - self.t1
        if test.support.verbose:
            print("%.2fs" % dt, end=' ')
        self.assertLess(dt, 300, "synchronization issue: test lasted too long")

        super().tearDown()

    def get_context(self):
        """Return the multiprocessing context named by self.ctx."""
        return get_context(self.ctx)

    def _prime_executor(self):
        # Make sure that the executor is ready to do work before running the
        # tests. This should reduce the probability of timeouts in the tests.
        # (The local is not called "futures" so that it does not shadow the
        # imported concurrent.futures module.)
        fs = [self.executor.submit(time.sleep, 0.1)
              for _ in range(self.worker_count)]
        for f in fs:
            f.result()


class ThreadPoolMixin(ExecutorMixin):
    executor_type = futures.ThreadPoolExecutor


class ProcessPoolForkMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "fork"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


class ProcessPoolSpawnMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "spawn"


class ProcessPoolForkserverMixin(ExecutorMixin):
    executor_type = futures.ProcessPoolExecutor
    ctx = "forkserver"

    def get_context(self):
        if sys.platform == "win32":
            self.skipTest("require unix system")
        return super().get_context()


def create_executor_tests(mixin, bases=(BaseTestCase,),
                          executor_mixins=(ThreadPoolMixin,
                                           ProcessPoolForkMixin,
                                           ProcessPoolForkserverMixin,
                                           ProcessPoolSpawnMixin)):
    """Generate one concrete test class per executor mixin.

    Each class combines mixin, one of executor_mixins and bases, and is
    published in this module's globals under a name built from the two
    mixin names so that test discovery picks it up.
    """
    def strip_mixin(name):
        # 'Mixin' and 'Tests' are both five characters long.
        if name.endswith(('Mixin', 'Tests')):
            return name[:-5]
        elif name.endswith('Test'):
            return name[:-4]
        else:
            return name

    for exe in executor_mixins:
        name = ("%s%sTest"
                % (strip_mixin(exe.__name__), strip_mixin(mixin.__name__)))
        cls = type(name, (mixin,) + (exe,) + bases, {})
        globals()[name] = cls


class InitializerMixin(ExecutorMixin):
    """Check that a working initializer runs in every worker."""

    worker_count = 2

    def setUp(self):
        global INITIALIZER_STATUS
        INITIALIZER_STATUS = 'uninitialized'
        self.executor_kwargs = dict(initializer=init,
                                    initargs=('initialized',))
        super().setUp()

    def test_initializer(self):
        fs = [self.executor.submit(get_init_status)
              for _ in range(self.worker_count)]

        for f in fs:
            self.assertEqual(f.result(), 'initialized')


class FailingInitializerMixin(ExecutorMixin):
    """Check that a raising initializer breaks the executor."""

    worker_count = 2

    def setUp(self):
        if hasattr(self, "ctx"):
            # Pass a queue to redirect the child's logging output
            self.mp_context = self.get_context()
            self.log_queue = self.mp_context.Queue()
            self.executor_kwargs = dict(initializer=init_fail,
                                        initargs=(self.log_queue,))
        else:
            # In a thread pool, the child shares our logging setup
            # (see _assert_logged())
            self.mp_context = None
            self.log_queue = None
            self.executor_kwargs = dict(initializer=init_fail)
        super().setUp()

    def test_initializer(self):
        with self._assert_logged('ValueError: error in initializer'):
            try:
                future = self.executor.submit(get_init_status)
            except BrokenExecutor:
                # Perhaps the executor is already broken
                pass
            else:
                with self.assertRaises(BrokenExecutor):
                    future.result()
            # At some point, the executor should break
            t1 = time.monotonic()
            while not self.executor._broken:
                if time.monotonic() - t1 > 5:
                    self.fail("executor not broken after 5 s.")
                time.sleep(0.01)
            # ... and from this point submit() is guaranteed to fail
            with self.assertRaises(BrokenExecutor):
                self.executor.submit(get_init_status)

    def _prime_executor(self):
        # Priming would submit work to an executor that is meant to break.
        pass

    @contextlib.contextmanager
    def _assert_logged(self, msg):
        """Assert that msg appears in what the initializer logged."""
        if self.log_queue is not None:
            yield
            output = []
            try:
                # Drain everything the workers pushed to the queue.
                while True:
                    output.append(self.log_queue.get_nowait().getMessage())
            except queue.Empty:
                pass
        else:
            with self.assertLogs('concurrent.futures', 'CRITICAL') as cm:
                yield
            output = cm.output
        self.assertTrue(any(msg in line for line in output),
                        output)


create_executor_tests(InitializerMixin)
create_executor_tests(FailingInitializerMixin)


class ExecutorShutdownTest:
    """Shutdown behaviour shared by thread and process pools."""

    def test_run_after_shutdown(self):
        self.executor.shutdown()
        self.assertRaises(RuntimeError,
                          self.executor.submit,
                          pow, 2, 5)

    def test_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            from concurrent.futures import {executor_type}
            from time import sleep
            from test.test_concurrent_futures import sleep_and_print
            if __name__ == "__main__":
                context = '{context}'
                if context == "":
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                t.submit(sleep_and_print, 1.0, "apple")
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertFalse(err)
        self.assertEqual(out.strip(), b"apple")

    def test_submit_after_interpreter_shutdown(self):
        # Test the atexit hook for shutdown of worker threads and processes
        rc, out, err = assert_python_ok('-c', """if 1:
            import atexit
            @atexit.register
            def run_last():
                try:
                    t.submit(id, None)
                except RuntimeError:
                    print("runtime-error")
                    raise
            from concurrent.futures import {executor_type}
            if __name__ == "__main__":
                context = '{context}'
                if not context:
                    t = {executor_type}(5)
                else:
                    from multiprocessing import get_context
                    context = get_context(context)
                    t = {executor_type}(5, mp_context=context)
                    t.submit(id, 42).result()
            """.format(executor_type=self.executor_type.__name__,
                       context=getattr(self, "ctx", "")))
        # Errors in atexit hooks don't change the process exit code, check
        # stderr manually.
        self.assertIn("RuntimeError: cannot schedule new futures", err.decode())
        self.assertEqual(out.strip(), b"runtime-error")

    def test_hang_issue12364(self):
        fs = [self.executor.submit(time.sleep, 0.1) for _ in range(50)]
        self.executor.shutdown()
        for f in fs:
            f.result()


class ThreadPoolShutdownTest(ThreadPoolMixin, ExecutorShutdownTest, BaseTestCase):
    def _prime_executor(self):
        pass

    def test_threads_terminate(self):
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(3):
            self.executor.submit(acquire_lock, sem)
        self.assertEqual(len(self.executor._threads), 3)
        for i in range(3):
            sem.release()
        self.executor.shutdown()
        for t in self.executor._threads:
            t.join()

    def test_context_manager_shutdown(self):
        with futures.ThreadPoolExecutor(max_workers=5) as e:
            executor = e
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for t in executor._threads:
            t.join()

    def test_del_shutdown(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            t.join()

    def test_thread_names_assigned(self):
        executor = futures.ThreadPoolExecutor(
            max_workers=5, thread_name_prefix='SpecialPool')
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            self.assertRegex(t.name, r'^SpecialPool_[0-4]$')
            t.join()

    def test_thread_names_default(self):
        executor = futures.ThreadPoolExecutor(max_workers=5)
        executor.map(abs, range(-5, 5))
        threads = executor._threads
        del executor

        for t in threads:
            # Ensure that our default name is reasonably sane and unique when
            # no thread_name_prefix was supplied.
            self.assertRegex(t.name, r'ThreadPoolExecutor-\d+_[0-4]$')
            t.join()


class ProcessPoolShutdownTest(ExecutorShutdownTest):
    def _prime_executor(self):
        pass

    def test_processes_terminate(self):
        self.executor.submit(mul, 21, 2)
        self.executor.submit(mul, 6, 7)
        self.executor.submit(mul, 3, 14)
        self.assertEqual(len(self.executor._processes), 5)
        processes = self.executor._processes
        self.executor.shutdown()

        for p in processes.values():
            p.join()

    def test_context_manager_shutdown(self):
        # Use the start method of the generated test class; without
        # mp_context every variant would exercise the platform default.
        with futures.ProcessPoolExecutor(
                max_workers=5, mp_context=self.get_context()) as e:
            processes = e._processes
            self.assertEqual(list(e.map(abs, range(-5, 5))),
                             [5, 4, 3, 2, 1, 0, 1, 2, 3, 4])

        for p in processes.values():
            p.join()

    def test_del_shutdown(self):
        # Same as above: honour the start method under test.
        executor = futures.ProcessPoolExecutor(
            max_workers=5, mp_context=self.get_context())
        list(executor.map(abs, range(-5, 5)))
        queue_management_thread = executor._queue_management_thread
        processes = executor._processes
        call_queue = executor._call_queue
        del executor

        # Make sure that all the executor resources were properly cleaned by
        # the shutdown process
        queue_management_thread.join()
        for p in processes.values():
            p.join()
        call_queue.join_thread()


create_executor_tests(ProcessPoolShutdownTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class WaitTests:
    """Tests for futures.wait() with each return_when policy."""

    def test_first_completed(self):
        future1 = self.executor.submit(mul, 21, 2)
        future2 = self.executor.submit(time.sleep, 1.5)

        done, not_done = futures.wait(
            [CANCELLED_FUTURE, future1, future2],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(set([future1]), done)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), not_done)

    def test_first_completed_some_already_completed(self):
        future1 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE, future1],
            return_when=futures.FIRST_COMPLETED)

        self.assertEqual(
            set([CANCELLED_AND_NOTIFIED_FUTURE, SUCCESSFUL_FUTURE]),
            finished)
        self.assertEqual(set([future1]), pending)

    def test_first_exception(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(sleep_and_raise, 1.5)
        future3 = self.executor.submit(time.sleep, 3)

        finished, pending = futures.wait(
            [future1, future2, future3],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([future1, future2]), finished)
        self.assertEqual(set([future3]), pending)

    def test_first_exception_some_already_complete(self):
        future1 = self.executor.submit(divmod, 21, 0)
        future2 = self.executor.submit(time.sleep, 1.5)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             future1, future2],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              future1]), finished)
        self.assertEqual(set([CANCELLED_FUTURE, future2]), pending)

    def test_first_exception_one_already_failed(self):
        future1 = self.executor.submit(time.sleep, 2)

        finished, pending = futures.wait(
            [EXCEPTION_FUTURE, future1],
            return_when=futures.FIRST_EXCEPTION)

        self.assertEqual(set([EXCEPTION_FUTURE]), finished)
        self.assertEqual(set([future1]), pending)

    def test_all_completed(self):
        future1 = self.executor.submit(divmod, 2, 0)
        future2 = self.executor.submit(mul, 2, 21)

        finished, pending = futures.wait(
            [SUCCESSFUL_FUTURE,
             CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             future1,
             future2],
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([SUCCESSFUL_FUTURE,
                              CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              future1,
                              future2]), finished)
        self.assertEqual(set(), pending)

    def test_timeout(self):
        future1 = self.executor.submit(mul, 6, 7)
        future2 = self.executor.submit(time.sleep, 6)

        finished, pending = futures.wait(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2],
            timeout=5,
            return_when=futures.ALL_COMPLETED)

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE,
                              future1]), finished)
        self.assertEqual(set([future2]), pending)


class ThreadPoolWaitTests(ThreadPoolMixin, WaitTests, BaseTestCase):

    def test_pending_calls_race(self):
        # Issue #14406: multi-threaded race condition when waiting on all
        # futures.
        event = threading.Event()
        def future_func():
            event.wait()
        oldswitchinterval = sys.getswitchinterval()
        sys.setswitchinterval(1e-6)
        try:
            fs = {self.executor.submit(future_func) for i in range(100)}
            event.set()
            futures.wait(fs, return_when=futures.ALL_COMPLETED)
        finally:
            sys.setswitchinterval(oldswitchinterval)


create_executor_tests(WaitTests,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class AsCompletedTests:
    """Tests for futures.as_completed()."""

    # TODO(brian@sweetapp.com): Should have a test with a non-zero timeout.
    def test_no_timeout(self):
        future1 = self.executor.submit(mul, 2, 21)
        future2 = self.executor.submit(mul, 7, 6)

        completed = set(futures.as_completed(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]))
        self.assertEqual(set(
            [CANCELLED_AND_NOTIFIED_FUTURE,
             EXCEPTION_FUTURE,
             SUCCESSFUL_FUTURE,
             future1, future2]),
            completed)

    def test_zero_timeout(self):
        future1 = self.executor.submit(time.sleep, 2)
        completed_futures = set()
        try:
            for future in futures.as_completed(
                    [CANCELLED_AND_NOTIFIED_FUTURE,
                     EXCEPTION_FUTURE,
                     SUCCESSFUL_FUTURE,
                     future1],
                    timeout=0):
                completed_futures.add(future)
        except futures.TimeoutError:
            pass

        self.assertEqual(set([CANCELLED_AND_NOTIFIED_FUTURE,
                              EXCEPTION_FUTURE,
                              SUCCESSFUL_FUTURE]),
                         completed_futures)

    def test_duplicate_futures(self):
        # Issue 20367. Duplicate futures should not raise exceptions or give
        # duplicate responses.
        # Issue #31641: accept arbitrary iterables.
        future1 = self.executor.submit(time.sleep, 2)
        completed = [
            f for f in futures.as_completed(itertools.repeat(future1, 3))
        ]
        self.assertEqual(len(completed), 1)

    def test_free_reference_yielded_future(self):
        # Issue #14406: Generator should not keep references
        # to finished futures.
        futures_list = [Future() for _ in range(8)]
        futures_list.append(create_future(state=CANCELLED_AND_NOTIFIED))
        futures_list.append(create_future(state=FINISHED, result=42))

        with self.assertRaises(futures.TimeoutError):
            for future in futures.as_completed(futures_list, timeout=0):
                futures_list.remove(future)
                wr = weakref.ref(future)
                del future
                self.assertIsNone(wr())

        futures_list[0].set_result("test")
        for future in futures.as_completed(futures_list):
            futures_list.remove(future)
            wr = weakref.ref(future)
            del future
            self.assertIsNone(wr())
            if futures_list:
                futures_list[0].set_result("test")

    def test_correct_timeout_exception_msg(self):
        futures_list = [CANCELLED_AND_NOTIFIED_FUTURE, PENDING_FUTURE,
                        RUNNING_FUTURE, SUCCESSFUL_FUTURE]

        with self.assertRaises(futures.TimeoutError) as cm:
            list(futures.as_completed(futures_list, timeout=0))

        self.assertEqual(str(cm.exception), '2 (of 4) futures unfinished')


create_executor_tests(AsCompletedTests)


class ExecutorTest:
    # Executor.shutdown() and context manager usage is tested by
    # ExecutorShutdownTest.
    def test_submit(self):
        future = self.executor.submit(pow, 2, 8)
        self.assertEqual(256, future.result())

    def test_submit_keyword(self):
        future = self.executor.submit(mul, 2, y=8)
        self.assertEqual(16, future.result())
        future = self.executor.submit(capture, 1, self=2, fn=3)
        self.assertEqual(future.result(), ((1,), {'self': 2, 'fn': 3}))
        with self.assertWarns(DeprecationWarning):
            future = self.executor.submit(fn=capture, arg=1)
        self.assertEqual(future.result(), ((), {'arg': 1}))
        with self.assertRaises(TypeError):
            self.executor.submit(arg=1)

    def test_map(self):
        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10))),
            list(map(pow, range(10), range(10))))

        self.assertEqual(
            list(self.executor.map(pow, range(10), range(10), chunksize=3)),
            list(map(pow, range(10), range(10))))

    def test_map_exception(self):
        i = self.executor.map(divmod, [1, 1, 1, 1], [2, 3, 0, 5])
        self.assertEqual(i.__next__(), (0, 1))
        self.assertEqual(i.__next__(), (0, 1))
        self.assertRaises(ZeroDivisionError, i.__next__)

    def test_map_timeout(self):
        results = []
        try:
            for i in self.executor.map(time.sleep,
                                       [0, 0, 6],
                                       timeout=5):
                results.append(i)
        except futures.TimeoutError:
            pass
        else:
            self.fail('expected TimeoutError')

        self.assertEqual([None, None], results)

    def test_shutdown_race_issue12456(self):
        # Issue #12456: race condition at shutdown where trying to post a
        # sentinel in the call queue blocks (the queue is full while processes
        # have exited).
        self.executor.map(str, [2] * (self.worker_count + 1))
        self.executor.shutdown()

    @test.support.cpython_only
    def test_no_stale_references(self):
        # Issue #16284: check that the executors don't unnecessarily hang onto
        # references.
        my_object = MyObject()
        my_object_collected = threading.Event()
        my_object_callback = weakref.ref(
            my_object, lambda obj: my_object_collected.set())
        # Deliberately discarding the future.
        self.executor.submit(my_object.my_method)
        del my_object

        collected = my_object_collected.wait(timeout=5.0)
        self.assertTrue(collected,
                        "Stale reference not collected within timeout.")

    def test_max_workers_negative(self):
        for number in (0, -1):
            with self.assertRaisesRegex(ValueError,
                                        "max_workers must be greater "
                                        "than 0"):
                self.executor_type(max_workers=number)

    def test_free_reference(self):
        # Issue #14406: Result iterator should not keep an internal
        # reference to result objects.
        for obj in self.executor.map(make_dummy_object, range(10)):
            wr = weakref.ref(obj)
            del obj
            self.assertIsNone(wr())


class ThreadPoolExecutorTest(ThreadPoolMixin, ExecutorTest, BaseTestCase):
    def test_map_submits_without_iteration(self):
        """Tests verifying issue 11777."""
        finished = []
        def record_finished(n):
            finished.append(n)

        self.executor.map(record_finished, range(10))
        self.executor.shutdown(wait=True)
        self.assertCountEqual(finished, range(10))

    def test_default_workers(self):
        executor = self.executor_type()
        expected = min(32, (os.cpu_count() or 1) + 4)
        self.assertEqual(executor._max_workers, expected)

    def test_saturation(self):
        executor = self.executor_type(4)
        def acquire_lock(lock):
            lock.acquire()

        sem = threading.Semaphore(0)
        for i in range(15 * executor._max_workers):
            executor.submit(acquire_lock, sem)
        self.assertEqual(len(executor._threads), executor._max_workers)
        for i in range(15 * executor._max_workers):
            sem.release()
        executor.shutdown(wait=True)

    def test_idle_thread_reuse(self):
        executor = self.executor_type()
        executor.submit(mul, 21, 2).result()
        executor.submit(mul, 6, 7).result()
        executor.submit(mul, 3, 14).result()
        self.assertEqual(len(executor._threads), 1)
        executor.shutdown(wait=True)


class ProcessPoolExecutorTest(ExecutorTest):

    @unittest.skipUnless(sys.platform=='win32', 'Windows-only process limit')
    def test_max_workers_too_large(self):
        with self.assertRaisesRegex(ValueError,
                                    "max_workers must be <= 61"):
            futures.ProcessPoolExecutor(max_workers=62)

    def test_killed_child(self):
        # When a child process is abruptly terminated, the whole pool gets
        # "broken".
        fs = [self.executor.submit(time.sleep, 3)]
        # Get one of the processes, and terminate (kill) it
        p = next(iter(self.executor._processes.values()))
        p.terminate()
        for fut in fs:
            self.assertRaises(BrokenProcessPool, fut.result)
        # Submitting other jobs fails as well.
        self.assertRaises(BrokenProcessPool, self.executor.submit, pow, 2, 8)

    def test_map_chunksize(self):
        def bad_map():
            list(self.executor.map(pow, range(40), range(40), chunksize=-1))

        ref = list(map(pow, range(40), range(40)))
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=6)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=50)),
            ref)
        self.assertEqual(
            list(self.executor.map(pow, range(40), range(40), chunksize=40)),
            ref)
        self.assertRaises(ValueError, bad_map)

    @classmethod
    def _test_traceback(cls):
        # NOTE: test_traceback() searches the remote traceback for the exact
        # text of the next source line, so do not reformat it.
        raise RuntimeError(123) # some comment

    def test_traceback(self):
        # We want to ensure that the traceback from the child process is
        # contained in the traceback raised in the main process.
        future = self.executor.submit(self._test_traceback)
        with self.assertRaises(Exception) as cm:
            future.result()

        exc = cm.exception
        self.assertIs(type(exc), RuntimeError)
        self.assertEqual(exc.args, (123,))
        cause = exc.__cause__
        self.assertIs(type(cause), futures.process._RemoteTraceback)
        self.assertIn('raise RuntimeError(123) # some comment', cause.tb)

        with test.support.captured_stderr() as f1:
            try:
                raise exc
            except RuntimeError:
                sys.excepthook(*sys.exc_info())
        self.assertIn('raise RuntimeError(123) # some comment',
                      f1.getvalue())

    def test_ressources_gced_in_workers(self):
        # Ensure that the arguments of a job are correctly gc-ed after the
        # job is finished
        mgr = get_context(self.ctx).Manager()
        obj = EventfulGCObj(mgr)
        future = self.executor.submit(id, obj)
        future.result()

        self.assertTrue(obj.event.wait(timeout=1))

        # explicitly destroy the object to ensure that EventfulGCObj.__del__()
        # is called while manager is still running.
        obj = None
        test.support.gc_collect()

        mgr.shutdown()
        mgr.join()


create_executor_tests(ProcessPoolExecutorTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


def hide_process_stderr():
    """Replace sys.stderr with an in-memory buffer in this process."""
    import io
    sys.stderr = io.StringIO()


def _crash(delay=None):
    """Induces a segfault."""
    if delay:
        time.sleep(delay)
    import faulthandler
    faulthandler.disable()
    faulthandler._sigsegv()


def _exit():
    """Induces a sys exit with exitcode 1."""
    sys.exit(1)


def _raise_error(Err):
    """Function that raises an Exception in process."""
    hide_process_stderr()
    raise Err()


def _return_instance(cls):
    """Function that returns an instance of cls."""
    hide_process_stderr()
    return cls()


class CrashAtPickle(object):
    """Bad object that triggers a segfault at pickling time."""
    def __reduce__(self):
        _crash()


class CrashAtUnpickle(object):
    """Bad object that triggers a segfault at unpickling time."""
    def __reduce__(self):
        return _crash, ()


class ExitAtPickle(object):
    """Bad object that triggers a process exit at pickling time."""
    def __reduce__(self):
        _exit()


class ExitAtUnpickle(object):
    """Bad object that triggers a process exit at unpickling time."""
    def __reduce__(self):
        return _exit, ()


class ErrorAtPickle(object):
    """Bad object that triggers an error at pickling time."""
    def __reduce__(self):
        from pickle import PicklingError
        raise PicklingError("Error in pickle")


class ErrorAtUnpickle(object):
    """Bad object that triggers an error at unpickling time."""
    def __reduce__(self):
        from pickle import UnpicklingError
        return _raise_error, (UnpicklingError, )


class ExecutorDeadlockTest:
    """Check that worker crashes and exits never deadlock the pool."""

    # Seconds to wait for a result before declaring a deadlock.
    TIMEOUT = 15

    @classmethod
    def _sleep_id(cls, x, delay):
        time.sleep(delay)
        return x

    def _fail_on_deadlock(self, executor):
        # If we did not recover before TIMEOUT seconds, consider that the
        # executor is in a deadlock state and forcefully clean all its
        # components.
        import faulthandler
        from tempfile import TemporaryFile
        with TemporaryFile(mode="w+") as f:
            faulthandler.dump_traceback(file=f)
            f.seek(0)
            tb = f.read()
        for p in executor._processes.values():
            p.terminate()
        # This should be safe to call executor.shutdown here as all possible
        # deadlocks should have been broken.
        executor.shutdown(wait=True)
        print(f"\nTraceback:\n {tb}", file=sys.__stderr__)
        self.fail(f"Executor deadlock:\n\n{tb}")

    def test_crash(self):
        # extensive testing for deadlock caused by crashes in a pool.
        self.executor.shutdown(wait=True)
        crash_cases = [
            # Check problem occurring while pickling a task in
            # the task_handler thread
            (id, (ErrorAtPickle(),), PicklingError, "error at task pickle"),
            # Check problem occurring while unpickling a task on workers
            (id, (ExitAtUnpickle(),), BrokenProcessPool,
             "exit at task unpickle"),
            (id, (ErrorAtUnpickle(),), BrokenProcessPool,
             "error at task unpickle"),
            (id, (CrashAtUnpickle(),), BrokenProcessPool,
             "crash at task unpickle"),
            # Check problem occurring during func execution on workers
            (_crash, (), BrokenProcessPool,
             "crash during func execution on worker"),
            (_exit, (), SystemExit,
             "exit during func execution on worker"),
            (_raise_error, (RuntimeError, ), RuntimeError,
             "error during func execution on worker"),
            # Check problem occurring while pickling a task result
            # on workers
            (_return_instance, (CrashAtPickle,), BrokenProcessPool,
             "crash during result pickle on worker"),
            (_return_instance, (ExitAtPickle,), SystemExit,
             "exit during result pickle on worker"),
            (_return_instance, (ErrorAtPickle,), PicklingError,
             "error during result pickle on worker"),
            # Check problem occurring while unpickling a task in
            # the result_handler thread
            (_return_instance, (ErrorAtUnpickle,), BrokenProcessPool,
             "error during result unpickle in result_handler"),
            (_return_instance, (ExitAtUnpickle,), BrokenProcessPool,
             "exit during result unpickle in result_handler")
        ]
        for func, args, error, name in crash_cases:
            with self.subTest(name):
                # The captured_stderr reduces the noise in the test report
                with test.support.captured_stderr():
                    executor = self.executor_type(
                        max_workers=2, mp_context=get_context(self.ctx))
                    res = executor.submit(func, *args)
                    with self.assertRaises(error):
                        try:
                            res.result(timeout=self.TIMEOUT)
                        except futures.TimeoutError:
                            # If we did not recover before TIMEOUT seconds,
                            # consider that the executor is in a deadlock state
                            self._fail_on_deadlock(executor)
                    executor.shutdown(wait=True)

    def test_shutdown_deadlock(self):
        # Test that the pool calling shutdown does not cause a deadlock
        # if a worker fails after the shutdown call.
        self.executor.shutdown(wait=True)
        with self.executor_type(max_workers=2,
                                mp_context=get_context(self.ctx)) as executor:
            self.executor = executor  # Allow clean up in fail_on_deadlock
            f = executor.submit(_crash, delay=.1)
            executor.shutdown(wait=True)
            with self.assertRaises(BrokenProcessPool):
                f.result()


create_executor_tests(ExecutorDeadlockTest,
                      executor_mixins=(ProcessPoolForkMixin,
                                       ProcessPoolForkserverMixin,
                                       ProcessPoolSpawnMixin))


class FutureTests(BaseTestCase):
    def test_done_callback_with_result(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.add_done_callback(fn)
        f.set_result(5)
        self.assertEqual(5, callback_result)

    def test_done_callback_with_exception(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.add_done_callback(fn)
        f.set_exception(Exception('test'))
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_with_cancel(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        f.add_done_callback(fn)
        self.assertTrue(f.cancel())
        self.assertTrue(was_cancelled)

    def test_done_callback_raises(self):
        with test.support.captured_stderr() as stderr:
            raising_was_called = False
            fn_was_called = False

            def raising_fn(callback_future):
                nonlocal raising_was_called
                raising_was_called = True
                raise Exception('doh!')

            def fn(callback_future):
                nonlocal fn_was_called
                fn_was_called = True

            f = Future()
            f.add_done_callback(raising_fn)
            f.add_done_callback(fn)
            f.set_result(5)
            self.assertTrue(raising_was_called)
            self.assertTrue(fn_was_called)
            self.assertIn('Exception: doh!', stderr.getvalue())

    def test_done_callback_already_successful(self):
        callback_result = None
        def fn(callback_future):
            nonlocal callback_result
            callback_result = callback_future.result()

        f = Future()
        f.set_result(5)
        f.add_done_callback(fn)
        self.assertEqual(5, callback_result)

    def test_done_callback_already_failed(self):
        callback_exception = None
        def fn(callback_future):
            nonlocal callback_exception
            callback_exception = callback_future.exception()

        f = Future()
        f.set_exception(Exception('test'))
        f.add_done_callback(fn)
        self.assertEqual(('test',), callback_exception.args)

    def test_done_callback_already_cancelled(self):
        was_cancelled = None
        def fn(callback_future):
            nonlocal was_cancelled
            was_cancelled = callback_future.cancelled()

        f = Future()
        self.assertTrue(f.cancel())
        f.add_done_callback(fn)
        self.assertTrue(was_cancelled)

    def test_done_callback_raises_already_succeeded(self):
        with test.support.captured_stderr() as stderr:
            def raising_fn(callback_future):
                raise Exception('doh!')

            f = Future()

            # Set the result first to simulate a future that runs instantly,
            # effectively allowing the callback to be run immediately.
            f.set_result(5)
            f.add_done_callback(raising_fn)

            self.assertIn('exception calling callback for', stderr.getvalue())
            self.assertIn('doh!', stderr.getvalue())

    def test_repr(self):
        self.assertRegex(repr(PENDING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=pending>')
        self.assertRegex(repr(RUNNING_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=running>')
        self.assertRegex(repr(CANCELLED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(repr(CANCELLED_AND_NOTIFIED_FUTURE),
                         '<Future at 0x[0-9a-f]+ state=cancelled>')
        self.assertRegex(
            repr(EXCEPTION_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished raised OSError>')
        self.assertRegex(
            repr(SUCCESSFUL_FUTURE),
            '<Future at 0x[0-9a-f]+ state=finished returned int>')

    def test_cancel(self):
        f1 = create_future(state=PENDING)
        f2 = create_future(state=RUNNING)
        f3 = create_future(state=CANCELLED)
        f4 = create_future(state=CANCELLED_AND_NOTIFIED)
        f5 = create_future(state=FINISHED, exception=OSError())
        f6 = create_future(state=FINISHED, result=5)

        self.assertTrue(f1.cancel())
        self.assertEqual(f1._state, CANCELLED)

        self.assertFalse(f2.cancel())
        self.assertEqual(f2._state, RUNNING)

        self.assertTrue(f3.cancel())
        self.assertEqual(f3._state, CANCELLED)

        self.assertTrue(f4.cancel())
        self.assertEqual(f4._state, CANCELLED_AND_NOTIFIED)

        self.assertFalse(f5.cancel())
        self.assertEqual(f5._state, FINISHED)

        self.assertFalse(f6.cancel())
        self.assertEqual(f6._state, FINISHED)

    def test_cancelled(self):
        self.assertFalse(PENDING_FUTURE.cancelled())
        self.assertFalse(RUNNING_FUTURE.cancelled())
        self.assertTrue(CANCELLED_FUTURE.cancelled())
        self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.cancelled())
        self.assertFalse(EXCEPTION_FUTURE.cancelled())
self.assertFalse(SUCCESSFUL_FUTURE.cancelled()) 1193 1194 def test_done(self): 1195 self.assertFalse(PENDING_FUTURE.done()) 1196 self.assertFalse(RUNNING_FUTURE.done()) 1197 self.assertTrue(CANCELLED_FUTURE.done()) 1198 self.assertTrue(CANCELLED_AND_NOTIFIED_FUTURE.done()) 1199 self.assertTrue(EXCEPTION_FUTURE.done()) 1200 self.assertTrue(SUCCESSFUL_FUTURE.done()) 1201 1202 def test_running(self): 1203 self.assertFalse(PENDING_FUTURE.running()) 1204 self.assertTrue(RUNNING_FUTURE.running()) 1205 self.assertFalse(CANCELLED_FUTURE.running()) 1206 self.assertFalse(CANCELLED_AND_NOTIFIED_FUTURE.running()) 1207 self.assertFalse(EXCEPTION_FUTURE.running()) 1208 self.assertFalse(SUCCESSFUL_FUTURE.running()) 1209 1210 def test_result_with_timeout(self): 1211 self.assertRaises(futures.TimeoutError, 1212 PENDING_FUTURE.result, timeout=0) 1213 self.assertRaises(futures.TimeoutError, 1214 RUNNING_FUTURE.result, timeout=0) 1215 self.assertRaises(futures.CancelledError, 1216 CANCELLED_FUTURE.result, timeout=0) 1217 self.assertRaises(futures.CancelledError, 1218 CANCELLED_AND_NOTIFIED_FUTURE.result, timeout=0) 1219 self.assertRaises(OSError, EXCEPTION_FUTURE.result, timeout=0) 1220 self.assertEqual(SUCCESSFUL_FUTURE.result(timeout=0), 42) 1221 1222 def test_result_with_success(self): 1223 # TODO(brian@sweetapp.com): This test is timing dependent. 1224 def notification(): 1225 # Wait until the main thread is waiting for the result. 1226 time.sleep(1) 1227 f1.set_result(42) 1228 1229 f1 = create_future(state=PENDING) 1230 t = threading.Thread(target=notification) 1231 t.start() 1232 1233 self.assertEqual(f1.result(timeout=5), 42) 1234 t.join() 1235 1236 def test_result_with_cancel(self): 1237 # TODO(brian@sweetapp.com): This test is timing dependent. 1238 def notification(): 1239 # Wait until the main thread is waiting for the result. 
1240 time.sleep(1) 1241 f1.cancel() 1242 1243 f1 = create_future(state=PENDING) 1244 t = threading.Thread(target=notification) 1245 t.start() 1246 1247 self.assertRaises(futures.CancelledError, f1.result, timeout=5) 1248 t.join() 1249 1250 def test_exception_with_timeout(self): 1251 self.assertRaises(futures.TimeoutError, 1252 PENDING_FUTURE.exception, timeout=0) 1253 self.assertRaises(futures.TimeoutError, 1254 RUNNING_FUTURE.exception, timeout=0) 1255 self.assertRaises(futures.CancelledError, 1256 CANCELLED_FUTURE.exception, timeout=0) 1257 self.assertRaises(futures.CancelledError, 1258 CANCELLED_AND_NOTIFIED_FUTURE.exception, timeout=0) 1259 self.assertTrue(isinstance(EXCEPTION_FUTURE.exception(timeout=0), 1260 OSError)) 1261 self.assertEqual(SUCCESSFUL_FUTURE.exception(timeout=0), None) 1262 1263 def test_exception_with_success(self): 1264 def notification(): 1265 # Wait until the main thread is waiting for the exception. 1266 time.sleep(1) 1267 with f1._condition: 1268 f1._state = FINISHED 1269 f1._exception = OSError() 1270 f1._condition.notify_all() 1271 1272 f1 = create_future(state=PENDING) 1273 t = threading.Thread(target=notification) 1274 t.start() 1275 1276 self.assertTrue(isinstance(f1.exception(timeout=5), OSError)) 1277 t.join() 1278 1279 def test_multiple_set_result(self): 1280 f = create_future(state=PENDING) 1281 f.set_result(1) 1282 1283 with self.assertRaisesRegex( 1284 futures.InvalidStateError, 1285 'FINISHED: <Future at 0x[0-9a-f]+ ' 1286 'state=finished returned int>' 1287 ): 1288 f.set_result(2) 1289 1290 self.assertTrue(f.done()) 1291 self.assertEqual(f.result(), 1) 1292 1293 def test_multiple_set_exception(self): 1294 f = create_future(state=PENDING) 1295 e = ValueError() 1296 f.set_exception(e) 1297 1298 with self.assertRaisesRegex( 1299 futures.InvalidStateError, 1300 'FINISHED: <Future at 0x[0-9a-f]+ ' 1301 'state=finished raised ValueError>' 1302 ): 1303 f.set_exception(Exception()) 1304 1305 self.assertEqual(f.exception(), e) 1306 
1307 1308_threads_key = None 1309 1310def setUpModule(): 1311 global _threads_key 1312 _threads_key = test.support.threading_setup() 1313 1314 1315def tearDownModule(): 1316 test.support.threading_cleanup(*_threads_key) 1317 multiprocessing.util._cleanup_tests() 1318 1319 1320if __name__ == "__main__": 1321 unittest.main() 1322