1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import unittest.mock 7import queue as pyqueue 8import time 9import io 10import itertools 11import sys 12import os 13import gc 14import errno 15import signal 16import array 17import socket 18import random 19import logging 20import subprocess 21import struct 22import operator 23import pickle 24import weakref 25import warnings 26import test.support 27import test.support.script_helper 28from test import support 29 30 31# Skip tests if _multiprocessing wasn't built. 32_multiprocessing = test.support.import_module('_multiprocessing') 33# Skip tests if sem_open implementation is broken. 34support.skip_if_broken_multiprocessing_synchronize() 35import threading 36 37import multiprocessing.connection 38import multiprocessing.dummy 39import multiprocessing.heap 40import multiprocessing.managers 41import multiprocessing.pool 42import multiprocessing.queues 43 44from multiprocessing import util 45 46try: 47 from multiprocessing import reduction 48 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 49except ImportError: 50 HAS_REDUCTION = False 51 52try: 53 from multiprocessing.sharedctypes import Value, copy 54 HAS_SHAREDCTYPES = True 55except ImportError: 56 HAS_SHAREDCTYPES = False 57 58try: 59 from multiprocessing import shared_memory 60 HAS_SHMEM = True 61except ImportError: 62 HAS_SHMEM = False 63 64try: 65 import msvcrt 66except ImportError: 67 msvcrt = None 68 69# 70# 71# 72 73# Timeout to wait until a process completes 74TIMEOUT = 60.0 # seconds 75 76def latin(s): 77 return s.encode('latin') 78 79 80def close_queue(queue): 81 if isinstance(queue, multiprocessing.queues.Queue): 82 queue.close() 83 queue.join_thread() 84 85 86def join_process(process): 87 # Since multiprocessing.Process has the same API than threading.Thread 88 # (join() and is_alive(), the support function can be reused 89 support.join_thread(process, timeout=TIMEOUT) 90 91 92if os.name == "posix": 93 from multiprocessing import resource_tracker 94 95 def _resource_unlink(name, rtype): 96 resource_tracker._CLEANUP_FUNCS[rtype](name) 97 98 99# 100# Constants 101# 102 103LOG_LEVEL = util.SUBWARNING 104#LOG_LEVEL = logging.DEBUG 105 106DELTA = 0.1 107CHECK_TIMINGS = False # making true makes tests take a lot longer 108 # and can sometimes cause some non-serious 109 # failures because some calls block a bit 110 # longer than expected 111if CHECK_TIMINGS: 112 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 113else: 114 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 115 116HAVE_GETVALUE = not getattr(_multiprocessing, 117 'HAVE_BROKEN_SEM_GETVALUE', False) 118 119WIN32 = (sys.platform == "win32") 120 121from multiprocessing.connection import wait 122 123def wait_for_handle(handle, timeout): 124 if timeout is not None and timeout < 0.0: 125 timeout = None 126 return wait([handle], timeout) 127 128try: 129 MAXFD = os.sysconf("SC_OPEN_MAX") 130except: 131 MAXFD = 256 132 133# To speed up tests when using the forkserver, we can preload these: 134PRELOAD = ['__main__', 'test.test_multiprocessing_forkserver'] 135 136# 137# Some tests require ctypes 138# 139 140try: 141 from ctypes import Structure, c_int, c_double, c_longlong 142except ImportError: 143 Structure = object 144 c_int = c_double = c_longlong = None 145 146 147def check_enough_semaphores(): 148 """Check that the system supports enough semaphores to run the test.""" 149 # minimum number of semaphores available according to POSIX 150 nsems_min = 256 151 try: 152 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 153 except (AttributeError, ValueError): 154 # sysconf not available or setting not available 155 return 156 if nsems == -1 or nsems >= nsems_min: 157 return 158 raise unittest.SkipTest("The OS doesn't support enough semaphores " 159 "to run the test (required: %d)." % nsems_min) 160 161 162# 163# Creates a wrapper for a function which records the time it takes to finish 164# 165 166class TimingWrapper(object): 167 168 def __init__(self, func): 169 self.func = func 170 self.elapsed = None 171 172 def __call__(self, *args, **kwds): 173 t = time.monotonic() 174 try: 175 return self.func(*args, **kwds) 176 finally: 177 self.elapsed = time.monotonic() - t 178 179# 180# Base class for test cases 181# 182 183class BaseTestCase(object): 184 185 ALLOWED_TYPES = ('processes', 'manager', 'threads') 186 187 def assertTimingAlmostEqual(self, a, b): 188 if CHECK_TIMINGS: 189 self.assertAlmostEqual(a, b, 1) 190 191 def assertReturnsIfImplemented(self, value, func, *args): 192 try: 193 res = func(*args) 194 except NotImplementedError: 195 pass 196 else: 197 return self.assertEqual(value, res) 198 199 # For the sanity of Windows users, rather than crashing or freezing in 200 # multiple ways. 201 def __reduce__(self, *args): 202 raise NotImplementedError("shouldn't try to pickle a test case") 203 204 __reduce_ex__ = __reduce__ 205 206# 207# Return the value of a semaphore 208# 209 210def get_value(self): 211 try: 212 return self.get_value() 213 except AttributeError: 214 try: 215 return self._Semaphore__value 216 except AttributeError: 217 try: 218 return self._value 219 except AttributeError: 220 raise NotImplementedError 221 222# 223# Testcases 224# 225 226class DummyCallable: 227 def __call__(self, q, c): 228 assert isinstance(c, DummyCallable) 229 q.put(5) 230 231 232class _TestProcess(BaseTestCase): 233 234 ALLOWED_TYPES = ('processes', 'threads') 235 236 def test_current(self): 237 if self.TYPE == 'threads': 238 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 239 240 current = self.current_process() 241 authkey = current.authkey 242 243 self.assertTrue(current.is_alive()) 244 self.assertTrue(not current.daemon) 245 self.assertIsInstance(authkey, bytes) 246 self.assertTrue(len(authkey) > 0) 247 self.assertEqual(current.ident, os.getpid()) 248 self.assertEqual(current.exitcode, None) 249 250 def test_daemon_argument(self): 251 if self.TYPE == "threads": 252 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 253 254 # By default uses the current process's daemon flag. 255 proc0 = self.Process(target=self._test) 256 self.assertEqual(proc0.daemon, self.current_process().daemon) 257 proc1 = self.Process(target=self._test, daemon=True) 258 self.assertTrue(proc1.daemon) 259 proc2 = self.Process(target=self._test, daemon=False) 260 self.assertFalse(proc2.daemon) 261 262 @classmethod 263 def _test(cls, q, *args, **kwds): 264 current = cls.current_process() 265 q.put(args) 266 q.put(kwds) 267 q.put(current.name) 268 if cls.TYPE != 'threads': 269 q.put(bytes(current.authkey)) 270 q.put(current.pid) 271 272 def test_parent_process_attributes(self): 273 if self.TYPE == "threads": 274 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 275 276 self.assertIsNone(self.parent_process()) 277 278 rconn, wconn = self.Pipe(duplex=False) 279 p = self.Process(target=self._test_send_parent_process, args=(wconn,)) 280 p.start() 281 p.join() 282 parent_pid, parent_name = rconn.recv() 283 self.assertEqual(parent_pid, self.current_process().pid) 284 self.assertEqual(parent_pid, os.getpid()) 285 self.assertEqual(parent_name, self.current_process().name) 286 287 @classmethod 288 def _test_send_parent_process(cls, wconn): 289 from multiprocessing.process import parent_process 290 wconn.send([parent_process().pid, parent_process().name]) 291 292 def test_parent_process(self): 293 if self.TYPE == "threads": 294 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 295 296 # Launch a child process. Make it launch a grandchild process. Kill the 297 # child process and make sure that the grandchild notices the death of 298 # its parent (a.k.a the child process). 299 rconn, wconn = self.Pipe(duplex=False) 300 p = self.Process( 301 target=self._test_create_grandchild_process, args=(wconn, )) 302 p.start() 303 304 if not rconn.poll(timeout=60): 305 raise AssertionError("Could not communicate with child process") 306 parent_process_status = rconn.recv() 307 self.assertEqual(parent_process_status, "alive") 308 309 p.terminate() 310 p.join() 311 312 if not rconn.poll(timeout=60): 313 raise AssertionError("Could not communicate with child process") 314 parent_process_status = rconn.recv() 315 self.assertEqual(parent_process_status, "not alive") 316 317 @classmethod 318 def _test_create_grandchild_process(cls, wconn): 319 p = cls.Process(target=cls._test_report_parent_status, args=(wconn, )) 320 p.start() 321 time.sleep(300) 322 323 @classmethod 324 def _test_report_parent_status(cls, wconn): 325 from multiprocessing.process import parent_process 326 wconn.send("alive" if parent_process().is_alive() else "not alive") 327 parent_process().join(timeout=5) 328 wconn.send("alive" if parent_process().is_alive() else "not alive") 329 330 def test_process(self): 331 q = self.Queue(1) 332 e = self.Event() 333 args = (q, 1, 2) 334 kwargs = {'hello':23, 'bye':2.54} 335 name = 'SomeProcess' 336 p = self.Process( 337 target=self._test, args=args, kwargs=kwargs, name=name 338 ) 339 p.daemon = True 340 current = self.current_process() 341 342 if self.TYPE != 'threads': 343 self.assertEqual(p.authkey, current.authkey) 344 self.assertEqual(p.is_alive(), False) 345 self.assertEqual(p.daemon, True) 346 self.assertNotIn(p, self.active_children()) 347 self.assertTrue(type(self.active_children()) is list) 348 self.assertEqual(p.exitcode, None) 349 350 p.start() 351 352 self.assertEqual(p.exitcode, None) 353 self.assertEqual(p.is_alive(), True) 354 self.assertIn(p, self.active_children()) 355 356 self.assertEqual(q.get(), args[1:]) 357 self.assertEqual(q.get(), kwargs) 358 self.assertEqual(q.get(), p.name) 359 if self.TYPE != 'threads': 360 self.assertEqual(q.get(), current.authkey) 361 self.assertEqual(q.get(), p.pid) 362 363 p.join() 364 365 self.assertEqual(p.exitcode, 0) 366 self.assertEqual(p.is_alive(), False) 367 self.assertNotIn(p, self.active_children()) 368 close_queue(q) 369 370 @unittest.skipUnless(threading._HAVE_THREAD_NATIVE_ID, "needs native_id") 371 def test_process_mainthread_native_id(self): 372 if self.TYPE == 'threads': 373 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 374 375 current_mainthread_native_id = threading.main_thread().native_id 376 377 q = self.Queue(1) 378 p = self.Process(target=self._test_process_mainthread_native_id, args=(q,)) 379 p.start() 380 381 child_mainthread_native_id = q.get() 382 p.join() 383 close_queue(q) 384 385 self.assertNotEqual(current_mainthread_native_id, child_mainthread_native_id) 386 387 @classmethod 388 def _test_process_mainthread_native_id(cls, q): 389 mainthread_native_id = threading.main_thread().native_id 390 q.put(mainthread_native_id) 391 392 @classmethod 393 def _sleep_some(cls): 394 time.sleep(100) 395 396 @classmethod 397 def _test_sleep(cls, delay): 398 time.sleep(delay) 399 400 def _kill_process(self, meth): 401 if self.TYPE == 'threads': 402 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 403 404 p = self.Process(target=self._sleep_some) 405 p.daemon = True 406 p.start() 407 408 self.assertEqual(p.is_alive(), True) 409 self.assertIn(p, self.active_children()) 410 self.assertEqual(p.exitcode, None) 411 412 join = TimingWrapper(p.join) 413 414 self.assertEqual(join(0), None) 415 self.assertTimingAlmostEqual(join.elapsed, 0.0) 416 self.assertEqual(p.is_alive(), True) 417 418 self.assertEqual(join(-1), None) 419 self.assertTimingAlmostEqual(join.elapsed, 0.0) 420 self.assertEqual(p.is_alive(), True) 421 422 # XXX maybe terminating too soon causes the problems on Gentoo... 423 time.sleep(1) 424 425 meth(p) 426 427 if hasattr(signal, 'alarm'): 428 # On the Gentoo buildbot waitpid() often seems to block forever. 429 # We use alarm() to interrupt it if it blocks for too long. 430 def handler(*args): 431 raise RuntimeError('join took too long: %s' % p) 432 old_handler = signal.signal(signal.SIGALRM, handler) 433 try: 434 signal.alarm(10) 435 self.assertEqual(join(), None) 436 finally: 437 signal.alarm(0) 438 signal.signal(signal.SIGALRM, old_handler) 439 else: 440 self.assertEqual(join(), None) 441 442 self.assertTimingAlmostEqual(join.elapsed, 0.0) 443 444 self.assertEqual(p.is_alive(), False) 445 self.assertNotIn(p, self.active_children()) 446 447 p.join() 448 449 return p.exitcode 450 451 def test_terminate(self): 452 exitcode = self._kill_process(multiprocessing.Process.terminate) 453 if os.name != 'nt': 454 self.assertEqual(exitcode, -signal.SIGTERM) 455 456 def test_kill(self): 457 exitcode = self._kill_process(multiprocessing.Process.kill) 458 if os.name != 'nt': 459 self.assertEqual(exitcode, -signal.SIGKILL) 460 461 def test_cpu_count(self): 462 try: 463 cpus = multiprocessing.cpu_count() 464 except NotImplementedError: 465 cpus = 1 466 self.assertTrue(type(cpus) is int) 467 self.assertTrue(cpus >= 1) 468 469 def test_active_children(self): 470 self.assertEqual(type(self.active_children()), list) 471 472 p = self.Process(target=time.sleep, args=(DELTA,)) 473 self.assertNotIn(p, self.active_children()) 474 475 p.daemon = True 476 p.start() 477 self.assertIn(p, self.active_children()) 478 479 p.join() 480 self.assertNotIn(p, self.active_children()) 481 482 @classmethod 483 def _test_recursion(cls, wconn, id): 484 wconn.send(id) 485 if len(id) < 2: 486 for i in range(2): 487 p = cls.Process( 488 target=cls._test_recursion, args=(wconn, id+[i]) 489 ) 490 p.start() 491 p.join() 492 493 def test_recursion(self): 494 rconn, wconn = self.Pipe(duplex=False) 495 self._test_recursion(wconn, []) 496 497 time.sleep(DELTA) 498 result = [] 499 while rconn.poll(): 500 result.append(rconn.recv()) 501 502 expected = [ 503 [], 504 [0], 505 [0, 0], 506 [0, 1], 507 [1], 508 [1, 0], 509 [1, 1] 510 ] 511 self.assertEqual(result, expected) 512 513 @classmethod 514 def _test_sentinel(cls, event): 515 event.wait(10.0) 516 517 def test_sentinel(self): 518 if self.TYPE == "threads": 519 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 520 event = self.Event() 521 p = self.Process(target=self._test_sentinel, args=(event,)) 522 with self.assertRaises(ValueError): 523 p.sentinel 524 p.start() 525 self.addCleanup(p.join) 526 sentinel = p.sentinel 527 self.assertIsInstance(sentinel, int) 528 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 529 event.set() 530 p.join() 531 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 532 533 @classmethod 534 def _test_close(cls, rc=0, q=None): 535 if q is not None: 536 q.get() 537 sys.exit(rc) 538 539 def test_close(self): 540 if self.TYPE == "threads": 541 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 542 q = self.Queue() 543 p = self.Process(target=self._test_close, kwargs={'q': q}) 544 p.daemon = True 545 p.start() 546 self.assertEqual(p.is_alive(), True) 547 # Child is still alive, cannot close 548 with self.assertRaises(ValueError): 549 p.close() 550 551 q.put(None) 552 p.join() 553 self.assertEqual(p.is_alive(), False) 554 self.assertEqual(p.exitcode, 0) 555 p.close() 556 with self.assertRaises(ValueError): 557 p.is_alive() 558 with self.assertRaises(ValueError): 559 p.join() 560 with self.assertRaises(ValueError): 561 p.terminate() 562 p.close() 563 564 wr = weakref.ref(p) 565 del p 566 gc.collect() 567 self.assertIs(wr(), None) 568 569 close_queue(q) 570 571 def test_many_processes(self): 572 if self.TYPE == 'threads': 573 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 574 575 sm = multiprocessing.get_start_method() 576 N = 5 if sm == 'spawn' else 100 577 578 # Try to overwhelm the forkserver loop with events 579 procs = [self.Process(target=self._test_sleep, args=(0.01,)) 580 for i in range(N)] 581 for p in procs: 582 p.start() 583 for p in procs: 584 join_process(p) 585 for p in procs: 586 self.assertEqual(p.exitcode, 0) 587 588 procs = [self.Process(target=self._sleep_some) 589 for i in range(N)] 590 for p in procs: 591 p.start() 592 time.sleep(0.001) # let the children start... 593 for p in procs: 594 p.terminate() 595 for p in procs: 596 join_process(p) 597 if os.name != 'nt': 598 exitcodes = [-signal.SIGTERM] 599 if sys.platform == 'darwin': 600 # bpo-31510: On macOS, killing a freshly started process with 601 # SIGTERM sometimes kills the process with SIGKILL. 602 exitcodes.append(-signal.SIGKILL) 603 for p in procs: 604 self.assertIn(p.exitcode, exitcodes) 605 606 def test_lose_target_ref(self): 607 c = DummyCallable() 608 wr = weakref.ref(c) 609 q = self.Queue() 610 p = self.Process(target=c, args=(q, c)) 611 del c 612 p.start() 613 p.join() 614 self.assertIs(wr(), None) 615 self.assertEqual(q.get(), 5) 616 close_queue(q) 617 618 @classmethod 619 def _test_child_fd_inflation(self, evt, q): 620 q.put(test.support.fd_count()) 621 evt.wait() 622 623 def test_child_fd_inflation(self): 624 # Number of fds in child processes should not grow with the 625 # number of running children. 626 if self.TYPE == 'threads': 627 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 628 629 sm = multiprocessing.get_start_method() 630 if sm == 'fork': 631 # The fork method by design inherits all fds from the parent, 632 # trying to go against it is a lost battle 633 self.skipTest('test not appropriate for {}'.format(sm)) 634 635 N = 5 636 evt = self.Event() 637 q = self.Queue() 638 639 procs = [self.Process(target=self._test_child_fd_inflation, args=(evt, q)) 640 for i in range(N)] 641 for p in procs: 642 p.start() 643 644 try: 645 fd_counts = [q.get() for i in range(N)] 646 self.assertEqual(len(set(fd_counts)), 1, fd_counts) 647 648 finally: 649 evt.set() 650 for p in procs: 651 p.join() 652 close_queue(q) 653 654 @classmethod 655 def _test_wait_for_threads(self, evt): 656 def func1(): 657 time.sleep(0.5) 658 evt.set() 659 660 def func2(): 661 time.sleep(20) 662 evt.clear() 663 664 threading.Thread(target=func1).start() 665 threading.Thread(target=func2, daemon=True).start() 666 667 def test_wait_for_threads(self): 668 # A child process should wait for non-daemonic threads to end 669 # before exiting 670 if self.TYPE == 'threads': 671 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 672 673 evt = self.Event() 674 proc = self.Process(target=self._test_wait_for_threads, args=(evt,)) 675 proc.start() 676 proc.join() 677 self.assertTrue(evt.is_set()) 678 679 @classmethod 680 def _test_error_on_stdio_flush(self, evt, break_std_streams={}): 681 for stream_name, action in break_std_streams.items(): 682 if action == 'close': 683 stream = io.StringIO() 684 stream.close() 685 else: 686 assert action == 'remove' 687 stream = None 688 setattr(sys, stream_name, None) 689 evt.set() 690 691 def test_error_on_stdio_flush_1(self): 692 # Check that Process works with broken standard streams 693 streams = [io.StringIO(), None] 694 streams[0].close() 695 for stream_name in ('stdout', 'stderr'): 696 for stream in streams: 697 old_stream = getattr(sys, stream_name) 698 setattr(sys, stream_name, stream) 699 try: 700 evt = self.Event() 701 proc = self.Process(target=self._test_error_on_stdio_flush, 702 args=(evt,)) 703 proc.start() 704 proc.join() 705 self.assertTrue(evt.is_set()) 706 self.assertEqual(proc.exitcode, 0) 707 finally: 708 setattr(sys, stream_name, old_stream) 709 710 def test_error_on_stdio_flush_2(self): 711 # Same as test_error_on_stdio_flush_1(), but standard streams are 712 # broken by the child process 713 for stream_name in ('stdout', 'stderr'): 714 for action in ('close', 'remove'): 715 old_stream = getattr(sys, stream_name) 716 try: 717 evt = self.Event() 718 proc = self.Process(target=self._test_error_on_stdio_flush, 719 args=(evt, {stream_name: action})) 720 proc.start() 721 proc.join() 722 self.assertTrue(evt.is_set()) 723 self.assertEqual(proc.exitcode, 0) 724 finally: 725 setattr(sys, stream_name, old_stream) 726 727 @classmethod 728 def _sleep_and_set_event(self, evt, delay=0.0): 729 time.sleep(delay) 730 evt.set() 731 732 def check_forkserver_death(self, signum): 733 # bpo-31308: if the forkserver process has died, we should still 734 # be able to create and run new Process instances (the forkserver 735 # is implicitly restarted). 736 if self.TYPE == 'threads': 737 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 738 sm = multiprocessing.get_start_method() 739 if sm != 'forkserver': 740 # The fork method by design inherits all fds from the parent, 741 # trying to go against it is a lost battle 742 self.skipTest('test not appropriate for {}'.format(sm)) 743 744 from multiprocessing.forkserver import _forkserver 745 _forkserver.ensure_running() 746 747 # First process sleeps 500 ms 748 delay = 0.5 749 750 evt = self.Event() 751 proc = self.Process(target=self._sleep_and_set_event, args=(evt, delay)) 752 proc.start() 753 754 pid = _forkserver._forkserver_pid 755 os.kill(pid, signum) 756 # give time to the fork server to die and time to proc to complete 757 time.sleep(delay * 2.0) 758 759 evt2 = self.Event() 760 proc2 = self.Process(target=self._sleep_and_set_event, args=(evt2,)) 761 proc2.start() 762 proc2.join() 763 self.assertTrue(evt2.is_set()) 764 self.assertEqual(proc2.exitcode, 0) 765 766 proc.join() 767 self.assertTrue(evt.is_set()) 768 self.assertIn(proc.exitcode, (0, 255)) 769 770 def test_forkserver_sigint(self): 771 # Catchable signal 772 self.check_forkserver_death(signal.SIGINT) 773 774 def test_forkserver_sigkill(self): 775 # Uncatchable signal 776 if os.name != 'nt': 777 self.check_forkserver_death(signal.SIGKILL) 778 779 780# 781# 782# 783 784class _UpperCaser(multiprocessing.Process): 785 786 def __init__(self): 787 multiprocessing.Process.__init__(self) 788 self.child_conn, self.parent_conn = multiprocessing.Pipe() 789 790 def run(self): 791 self.parent_conn.close() 792 for s in iter(self.child_conn.recv, None): 793 self.child_conn.send(s.upper()) 794 self.child_conn.close() 795 796 def submit(self, s): 797 assert type(s) is str 798 self.parent_conn.send(s) 799 return self.parent_conn.recv() 800 801 def stop(self): 802 self.parent_conn.send(None) 803 self.parent_conn.close() 804 self.child_conn.close() 805 806class _TestSubclassingProcess(BaseTestCase): 807 808 ALLOWED_TYPES = ('processes',) 809 810 def test_subclassing(self): 811 uppercaser = _UpperCaser() 812 uppercaser.daemon = True 813 uppercaser.start() 814 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 815 self.assertEqual(uppercaser.submit('world'), 'WORLD') 816 uppercaser.stop() 817 uppercaser.join() 818 819 def test_stderr_flush(self): 820 # sys.stderr is flushed at process shutdown (issue #13812) 821 if self.TYPE == "threads": 822 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 823 824 testfn = test.support.TESTFN 825 self.addCleanup(test.support.unlink, testfn) 826 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 827 proc.start() 828 proc.join() 829 with open(testfn, 'r') as f: 830 err = f.read() 831 # The whole traceback was printed 832 self.assertIn("ZeroDivisionError", err) 833 self.assertIn("test_multiprocessing.py", err) 834 self.assertIn("1/0 # MARKER", err) 835 836 @classmethod 837 def _test_stderr_flush(cls, testfn): 838 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 839 sys.stderr = open(fd, 'w', closefd=False) 840 1/0 # MARKER 841 842 843 @classmethod 844 def _test_sys_exit(cls, reason, testfn): 845 fd = os.open(testfn, os.O_WRONLY | os.O_CREAT | os.O_EXCL) 846 sys.stderr = open(fd, 'w', closefd=False) 847 sys.exit(reason) 848 849 def test_sys_exit(self): 850 # See Issue 13854 851 if self.TYPE == 'threads': 852 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 853 854 testfn = test.support.TESTFN 855 self.addCleanup(test.support.unlink, testfn) 856 857 for reason in ( 858 [1, 2, 3], 859 'ignore this', 860 ): 861 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 862 p.daemon = True 863 p.start() 864 join_process(p) 865 self.assertEqual(p.exitcode, 1) 866 867 with open(testfn, 'r') as f: 868 content = f.read() 869 self.assertEqual(content.rstrip(), str(reason)) 870 871 os.unlink(testfn) 872 873 for reason in (True, False, 8): 874 p = self.Process(target=sys.exit, args=(reason,)) 875 p.daemon = True 876 p.start() 877 join_process(p) 878 self.assertEqual(p.exitcode, reason) 879 880# 881# 882# 883 884def queue_empty(q): 885 if hasattr(q, 'empty'): 886 return q.empty() 887 else: 888 return q.qsize() == 0 889 890def queue_full(q, maxsize): 891 if hasattr(q, 'full'): 892 return q.full() 893 else: 894 return q.qsize() == maxsize 895 896 897class _TestQueue(BaseTestCase): 898 899 900 @classmethod 901 def _test_put(cls, queue, child_can_start, parent_can_continue): 902 child_can_start.wait() 903 for i in range(6): 904 queue.get() 905 parent_can_continue.set() 906 907 def test_put(self): 908 MAXSIZE = 6 909 queue = self.Queue(maxsize=MAXSIZE) 910 child_can_start = self.Event() 911 parent_can_continue = self.Event() 912 913 proc = self.Process( 914 target=self._test_put, 915 args=(queue, child_can_start, parent_can_continue) 916 ) 917 proc.daemon = True 918 proc.start() 919 920 self.assertEqual(queue_empty(queue), True) 921 self.assertEqual(queue_full(queue, MAXSIZE), False) 922 923 queue.put(1) 924 queue.put(2, True) 925 queue.put(3, True, None) 926 queue.put(4, False) 927 queue.put(5, False, None) 928 queue.put_nowait(6) 929 930 # the values may be in buffer but not yet in pipe so sleep a bit 931 time.sleep(DELTA) 932 933 self.assertEqual(queue_empty(queue), False) 934 self.assertEqual(queue_full(queue, MAXSIZE), True) 935 936 put = TimingWrapper(queue.put) 937 put_nowait = TimingWrapper(queue.put_nowait) 938 939 self.assertRaises(pyqueue.Full, put, 7, False) 940 self.assertTimingAlmostEqual(put.elapsed, 0) 941 942 self.assertRaises(pyqueue.Full, put, 7, False, None) 943 self.assertTimingAlmostEqual(put.elapsed, 0) 944 945 self.assertRaises(pyqueue.Full, put_nowait, 7) 946 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 947 948 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 949 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 950 951 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 952 self.assertTimingAlmostEqual(put.elapsed, 0) 953 954 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 955 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 956 957 child_can_start.set() 958 parent_can_continue.wait() 959 960 self.assertEqual(queue_empty(queue), True) 961 self.assertEqual(queue_full(queue, MAXSIZE), False) 962 963 proc.join() 964 close_queue(queue) 965 966 @classmethod 967 def _test_get(cls, queue, child_can_start, parent_can_continue): 968 child_can_start.wait() 969 #queue.put(1) 970 queue.put(2) 971 queue.put(3) 972 queue.put(4) 973 queue.put(5) 974 parent_can_continue.set() 975 976 def test_get(self): 977 queue = self.Queue() 978 child_can_start = self.Event() 979 parent_can_continue = self.Event() 980 981 proc = self.Process( 982 target=self._test_get, 983 args=(queue, child_can_start, parent_can_continue) 984 ) 985 proc.daemon = True 986 proc.start() 987 988 self.assertEqual(queue_empty(queue), True) 989 990 child_can_start.set() 991 parent_can_continue.wait() 992 993 time.sleep(DELTA) 994 self.assertEqual(queue_empty(queue), False) 995 996 # Hangs unexpectedly, remove for now 997 #self.assertEqual(queue.get(), 1) 998 self.assertEqual(queue.get(True, None), 2) 999 self.assertEqual(queue.get(True), 3) 1000 self.assertEqual(queue.get(timeout=1), 4) 1001 self.assertEqual(queue.get_nowait(), 5) 1002 1003 self.assertEqual(queue_empty(queue), True) 1004 1005 get = TimingWrapper(queue.get) 1006 get_nowait = TimingWrapper(queue.get_nowait) 1007 1008 self.assertRaises(pyqueue.Empty, get, False) 1009 self.assertTimingAlmostEqual(get.elapsed, 0) 1010 1011 self.assertRaises(pyqueue.Empty, get, False, None) 1012 self.assertTimingAlmostEqual(get.elapsed, 0) 1013 1014 self.assertRaises(pyqueue.Empty, get_nowait) 1015 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 1016 1017 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 1018 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1019 1020 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 1021 self.assertTimingAlmostEqual(get.elapsed, 0) 1022 1023 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 1024 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 1025 1026 proc.join() 1027 close_queue(queue) 1028 1029 @classmethod 1030 def _test_fork(cls, queue): 1031 for i in range(10, 20): 1032 queue.put(i) 1033 # note that at this point the items may only be buffered, so the 1034 # process cannot shutdown until the feeder thread has finished 1035 # pushing items onto the pipe. 1036 1037 def test_fork(self): 1038 # Old versions of Queue would fail to create a new feeder 1039 # thread for a forked process if the original process had its 1040 # own feeder thread. This test checks that this no longer 1041 # happens. 1042 1043 queue = self.Queue() 1044 1045 # put items on queue so that main process starts a feeder thread 1046 for i in range(10): 1047 queue.put(i) 1048 1049 # wait to make sure thread starts before we fork a new process 1050 time.sleep(DELTA) 1051 1052 # fork process 1053 p = self.Process(target=self._test_fork, args=(queue,)) 1054 p.daemon = True 1055 p.start() 1056 1057 # check that all expected items are in the queue 1058 for i in range(20): 1059 self.assertEqual(queue.get(), i) 1060 self.assertRaises(pyqueue.Empty, queue.get, False) 1061 1062 p.join() 1063 close_queue(queue) 1064 1065 def test_qsize(self): 1066 q = self.Queue() 1067 try: 1068 self.assertEqual(q.qsize(), 0) 1069 except NotImplementedError: 1070 self.skipTest('qsize method not implemented') 1071 q.put(1) 1072 self.assertEqual(q.qsize(), 1) 1073 q.put(5) 1074 self.assertEqual(q.qsize(), 2) 1075 q.get() 1076 self.assertEqual(q.qsize(), 1) 1077 q.get() 1078 self.assertEqual(q.qsize(), 0) 1079 close_queue(q) 1080 1081 @classmethod 1082 def _test_task_done(cls, q): 1083 for obj in iter(q.get, None): 1084 time.sleep(DELTA) 1085 q.task_done() 1086 1087 def test_task_done(self): 1088 queue = self.JoinableQueue() 1089 1090 workers = [self.Process(target=self._test_task_done, args=(queue,)) 1091 for i in range(4)] 1092 1093 for p in workers: 1094 p.daemon = True 1095 p.start() 1096 1097 for i in range(10): 1098 queue.put(i) 1099 1100 queue.join() 1101 1102 for p in workers: 1103 queue.put(None) 1104 1105 for p in workers: 1106 p.join() 1107 close_queue(queue) 1108 1109 def test_no_import_lock_contention(self): 1110 with test.support.temp_cwd(): 1111 module_name = 'imported_by_an_imported_module' 1112 with open(module_name + '.py', 'w') as f: 1113 f.write("""if 1: 1114 import multiprocessing 1115 1116 q = multiprocessing.Queue() 1117 q.put('knock knock') 1118 q.get(timeout=3) 1119 q.close() 1120 del q 1121 """) 1122 1123 with test.support.DirsOnSysPath(os.getcwd()): 1124 try: 1125 __import__(module_name) 1126 except pyqueue.Empty: 1127 self.fail("Probable regression on import lock contention;" 1128 " see Issue #22853") 1129 1130 def test_timeout(self): 1131 q = multiprocessing.Queue() 1132 start = time.monotonic() 1133 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 1134 delta = time.monotonic() - start 1135 # bpo-30317: Tolerate a delta of 100 ms because of the bad clock 1136 # resolution on Windows (usually 15.6 ms). x86 Windows7 3.x once 1137 # failed because the delta was only 135.8 ms. 1138 self.assertGreaterEqual(delta, 0.100) 1139 close_queue(q) 1140 1141 def test_queue_feeder_donot_stop_onexc(self): 1142 # bpo-30414: verify feeder handles exceptions correctly 1143 if self.TYPE != 'processes': 1144 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1145 1146 class NotSerializable(object): 1147 def __reduce__(self): 1148 raise AttributeError 1149 with test.support.captured_stderr(): 1150 q = self.Queue() 1151 q.put(NotSerializable()) 1152 q.put(True) 1153 self.assertTrue(q.get(timeout=TIMEOUT)) 1154 close_queue(q) 1155 1156 with test.support.captured_stderr(): 1157 # bpo-33078: verify that the queue size is correctly handled 1158 # on errors. 1159 q = self.Queue(maxsize=1) 1160 q.put(NotSerializable()) 1161 q.put(True) 1162 try: 1163 self.assertEqual(q.qsize(), 1) 1164 except NotImplementedError: 1165 # qsize is not available on all platform as it 1166 # relies on sem_getvalue 1167 pass 1168 # bpo-30595: use a timeout of 1 second for slow buildbots 1169 self.assertTrue(q.get(timeout=1.0)) 1170 # Check that the size of the queue is correct 1171 self.assertTrue(q.empty()) 1172 close_queue(q) 1173 1174 def test_queue_feeder_on_queue_feeder_error(self): 1175 # bpo-30006: verify feeder handles exceptions using the 1176 # _on_queue_feeder_error hook. 1177 if self.TYPE != 'processes': 1178 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1179 1180 class NotSerializable(object): 1181 """Mock unserializable object""" 1182 def __init__(self): 1183 self.reduce_was_called = False 1184 self.on_queue_feeder_error_was_called = False 1185 1186 def __reduce__(self): 1187 self.reduce_was_called = True 1188 raise AttributeError 1189 1190 class SafeQueue(multiprocessing.queues.Queue): 1191 """Queue with overloaded _on_queue_feeder_error hook""" 1192 @staticmethod 1193 def _on_queue_feeder_error(e, obj): 1194 if (isinstance(e, AttributeError) and 1195 isinstance(obj, NotSerializable)): 1196 obj.on_queue_feeder_error_was_called = True 1197 1198 not_serializable_obj = NotSerializable() 1199 # The captured_stderr reduces the noise in the test report 1200 with test.support.captured_stderr(): 1201 q = SafeQueue(ctx=multiprocessing.get_context()) 1202 q.put(not_serializable_obj) 1203 1204 # Verify that q is still functioning correctly 1205 q.put(True) 1206 self.assertTrue(q.get(timeout=1.0)) 1207 1208 # Assert that the serialization and the hook have been called correctly 1209 self.assertTrue(not_serializable_obj.reduce_was_called) 1210 self.assertTrue(not_serializable_obj.on_queue_feeder_error_was_called) 1211 1212 def test_closed_queue_put_get_exceptions(self): 1213 for q in multiprocessing.Queue(), multiprocessing.JoinableQueue(): 1214 q.close() 1215 with self.assertRaisesRegex(ValueError, 'is closed'): 1216 q.put('foo') 1217 with self.assertRaisesRegex(ValueError, 'is closed'): 1218 q.get() 1219# 1220# 1221# 1222 1223class _TestLock(BaseTestCase): 1224 1225 def test_lock(self): 1226 lock = self.Lock() 1227 self.assertEqual(lock.acquire(), True) 1228 self.assertEqual(lock.acquire(False), False) 1229 self.assertEqual(lock.release(), None) 1230 self.assertRaises((ValueError, threading.ThreadError), lock.release) 1231 1232 def test_rlock(self): 1233 lock = self.RLock() 1234 self.assertEqual(lock.acquire(), True) 1235 self.assertEqual(lock.acquire(), True) 1236 self.assertEqual(lock.acquire(), True) 1237 self.assertEqual(lock.release(), None) 1238 self.assertEqual(lock.release(), None) 1239 self.assertEqual(lock.release(), None) 1240 self.assertRaises((AssertionError, RuntimeError), lock.release) 1241 1242 def test_lock_context(self): 1243 with self.Lock(): 1244 pass 1245 1246 1247class _TestSemaphore(BaseTestCase): 1248 1249 def _test_semaphore(self, sem): 1250 self.assertReturnsIfImplemented(2, get_value, sem) 1251 self.assertEqual(sem.acquire(), True) 1252 self.assertReturnsIfImplemented(1, get_value, sem) 1253 self.assertEqual(sem.acquire(), True) 1254 self.assertReturnsIfImplemented(0, get_value, sem) 1255 self.assertEqual(sem.acquire(False), False) 1256 self.assertReturnsIfImplemented(0, get_value, sem) 1257 self.assertEqual(sem.release(), None) 1258 self.assertReturnsIfImplemented(1, get_value, sem) 1259 self.assertEqual(sem.release(), None) 1260 self.assertReturnsIfImplemented(2, get_value, sem) 1261 1262 def test_semaphore(self): 1263 sem = self.Semaphore(2) 1264 self._test_semaphore(sem) 1265 self.assertEqual(sem.release(), None) 1266 self.assertReturnsIfImplemented(3, get_value, sem) 1267 self.assertEqual(sem.release(), None) 1268 self.assertReturnsIfImplemented(4, get_value, sem) 1269 1270 def test_bounded_semaphore(self): 1271 sem = self.BoundedSemaphore(2) 1272 self._test_semaphore(sem) 1273 # Currently fails on OS/X 1274 #if HAVE_GETVALUE: 1275 # self.assertRaises(ValueError, sem.release) 1276 # self.assertReturnsIfImplemented(2, get_value, sem) 1277 1278 def test_timeout(self): 1279 if self.TYPE != 'processes': 1280 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1281 1282 sem = self.Semaphore(0) 1283 acquire = TimingWrapper(sem.acquire) 1284 1285 self.assertEqual(acquire(False), False) 1286 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1287 1288 self.assertEqual(acquire(False, None), False) 1289 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 1290 1291 self.assertEqual(acquire(False, TIMEOUT1), False) 1292 self.assertTimingAlmostEqual(acquire.elapsed, 0) 1293 1294 self.assertEqual(acquire(True, TIMEOUT2), False) 1295 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 1296 1297 self.assertEqual(acquire(timeout=TIMEOUT3), False) 1298 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 1299 1300 1301class _TestCondition(BaseTestCase): 1302 1303 @classmethod 1304 def f(cls, cond, sleeping, woken, timeout=None): 1305 cond.acquire() 1306 sleeping.release() 1307 cond.wait(timeout) 1308 woken.release() 1309 cond.release() 1310 1311 def assertReachesEventually(self, func, value): 1312 for i in range(10): 1313 try: 1314 if func() == value: 1315 break 1316 except NotImplementedError: 1317 break 1318 time.sleep(DELTA) 1319 time.sleep(DELTA) 1320 self.assertReturnsIfImplemented(value, func) 1321 1322 def check_invariant(self, cond): 1323 # this is only supposed to succeed when there are no sleepers 1324 if self.TYPE == 'processes': 1325 try: 1326 sleepers = (cond._sleeping_count.get_value() - 1327 cond._woken_count.get_value()) 1328 self.assertEqual(sleepers, 0) 1329 self.assertEqual(cond._wait_semaphore.get_value(), 0) 1330 except NotImplementedError: 1331 pass 1332 1333 def test_notify(self): 1334 cond = self.Condition() 1335 sleeping = self.Semaphore(0) 1336 woken = self.Semaphore(0) 1337 1338 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1339 p.daemon = True 1340 p.start() 1341 self.addCleanup(p.join) 1342 1343 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1344 p.daemon = True 1345 p.start() 1346 self.addCleanup(p.join) 1347 1348 # wait for both children to start sleeping 1349 sleeping.acquire() 1350 sleeping.acquire() 1351 1352 # check no process/thread has woken up 1353 time.sleep(DELTA) 1354 self.assertReturnsIfImplemented(0, get_value, woken) 1355 1356 # wake up one process/thread 1357 cond.acquire() 1358 cond.notify() 1359 cond.release() 1360 1361 # check one process/thread has woken up 1362 time.sleep(DELTA) 1363 self.assertReturnsIfImplemented(1, get_value, woken) 1364 1365 # wake up another 1366 cond.acquire() 1367 cond.notify() 1368 cond.release() 1369 1370 # check other has woken up 1371 time.sleep(DELTA) 1372 self.assertReturnsIfImplemented(2, get_value, woken) 1373 1374 # check state is not mucked up 1375 self.check_invariant(cond) 1376 p.join() 1377 1378 def test_notify_all(self): 1379 cond = self.Condition() 1380 sleeping = self.Semaphore(0) 1381 woken = self.Semaphore(0) 1382 1383 # start some threads/processes which will timeout 1384 for i in range(3): 1385 p = self.Process(target=self.f, 1386 args=(cond, sleeping, woken, TIMEOUT1)) 1387 p.daemon = True 1388 p.start() 1389 self.addCleanup(p.join) 1390 1391 t = threading.Thread(target=self.f, 1392 args=(cond, sleeping, woken, TIMEOUT1)) 1393 t.daemon = True 1394 t.start() 1395 self.addCleanup(t.join) 1396 1397 # wait for them all to sleep 1398 for i in range(6): 1399 sleeping.acquire() 1400 1401 # check they have all timed out 1402 for i in range(6): 1403 woken.acquire() 1404 self.assertReturnsIfImplemented(0, get_value, woken) 1405 1406 # check state is not mucked up 1407 self.check_invariant(cond) 1408 1409 # start some more threads/processes 1410 for i in range(3): 1411 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1412 p.daemon = True 1413 p.start() 1414 self.addCleanup(p.join) 1415 1416 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1417 t.daemon = True 1418 t.start() 1419 self.addCleanup(t.join) 1420 1421 # wait for them to all sleep 1422 for i in range(6): 1423 sleeping.acquire() 1424 1425 # check no process/thread has woken up 1426 time.sleep(DELTA) 1427 self.assertReturnsIfImplemented(0, get_value, woken) 1428 1429 # wake them all up 1430 cond.acquire() 1431 cond.notify_all() 1432 cond.release() 1433 1434 # check they have all woken 1435 self.assertReachesEventually(lambda: get_value(woken), 6) 1436 1437 # check state is not mucked up 1438 self.check_invariant(cond) 1439 1440 def test_notify_n(self): 1441 cond = self.Condition() 1442 sleeping = self.Semaphore(0) 1443 woken = self.Semaphore(0) 1444 1445 # start some threads/processes 1446 for i in range(3): 1447 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 1448 p.daemon = True 1449 p.start() 1450 self.addCleanup(p.join) 1451 1452 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 1453 t.daemon = True 1454 t.start() 1455 self.addCleanup(t.join) 1456 1457 # wait for them to all sleep 1458 for i in range(6): 1459 sleeping.acquire() 1460 1461 # check no process/thread has woken up 1462 time.sleep(DELTA) 1463 self.assertReturnsIfImplemented(0, get_value, woken) 1464 1465 # wake some of them up 1466 cond.acquire() 1467 cond.notify(n=2) 1468 cond.release() 1469 1470 # check 2 have woken 1471 self.assertReachesEventually(lambda: get_value(woken), 2) 1472 1473 # wake the rest of them 1474 cond.acquire() 1475 cond.notify(n=4) 1476 cond.release() 1477 1478 self.assertReachesEventually(lambda: get_value(woken), 6) 1479 1480 # doesn't do anything more 1481 cond.acquire() 1482 cond.notify(n=3) 1483 cond.release() 1484 1485 self.assertReturnsIfImplemented(6, get_value, woken) 1486 1487 # check state is not mucked up 1488 self.check_invariant(cond) 1489 1490 def test_timeout(self): 1491 cond = self.Condition() 1492 wait = TimingWrapper(cond.wait) 1493 cond.acquire() 1494 res = wait(TIMEOUT1) 1495 cond.release() 1496 self.assertEqual(res, False) 1497 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1498 1499 @classmethod 1500 def _test_waitfor_f(cls, cond, state): 1501 with cond: 1502 state.value = 0 1503 cond.notify() 1504 result = cond.wait_for(lambda : state.value==4) 1505 if not result or state.value != 4: 1506 sys.exit(1) 1507 1508 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1509 def test_waitfor(self): 1510 # based on test in test/lock_tests.py 1511 cond = self.Condition() 1512 state = self.Value('i', -1) 1513 1514 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 1515 p.daemon = True 1516 p.start() 1517 1518 with cond: 1519 result = cond.wait_for(lambda : state.value==0) 1520 self.assertTrue(result) 1521 self.assertEqual(state.value, 0) 1522 1523 for i in range(4): 1524 time.sleep(0.01) 1525 with cond: 1526 state.value += 1 1527 cond.notify() 1528 1529 join_process(p) 1530 self.assertEqual(p.exitcode, 0) 1531 1532 @classmethod 1533 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1534 sem.release() 1535 with cond: 1536 expected = 0.1 1537 dt = time.monotonic() 1538 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1539 dt = time.monotonic() - dt 1540 # borrow logic in assertTimeout() from test/lock_tests.py 1541 if not result and expected * 0.6 < dt < expected * 10.0: 1542 success.value = True 1543 1544 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1545 def test_waitfor_timeout(self): 1546 # based on test in test/lock_tests.py 1547 cond = self.Condition() 1548 state = self.Value('i', 0) 1549 success = self.Value('i', False) 1550 sem = self.Semaphore(0) 1551 1552 p = self.Process(target=self._test_waitfor_timeout_f, 1553 args=(cond, state, success, sem)) 1554 p.daemon = True 1555 p.start() 1556 self.assertTrue(sem.acquire(timeout=TIMEOUT)) 1557 1558 # Only increment 3 times, so state == 4 is never reached. 1559 for i in range(3): 1560 time.sleep(0.01) 1561 with cond: 1562 state.value += 1 1563 cond.notify() 1564 1565 join_process(p) 1566 self.assertTrue(success.value) 1567 1568 @classmethod 1569 def _test_wait_result(cls, c, pid): 1570 with c: 1571 c.notify() 1572 time.sleep(1) 1573 if pid is not None: 1574 os.kill(pid, signal.SIGINT) 1575 1576 def test_wait_result(self): 1577 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1578 pid = os.getpid() 1579 else: 1580 pid = None 1581 1582 c = self.Condition() 1583 with c: 1584 self.assertFalse(c.wait(0)) 1585 self.assertFalse(c.wait(0.1)) 1586 1587 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1588 p.start() 1589 1590 self.assertTrue(c.wait(60)) 1591 if pid is not None: 1592 self.assertRaises(KeyboardInterrupt, c.wait, 60) 1593 1594 p.join() 1595 1596 1597class _TestEvent(BaseTestCase): 1598 1599 @classmethod 1600 def _test_event(cls, event): 1601 time.sleep(TIMEOUT2) 1602 event.set() 1603 1604 def test_event(self): 1605 event = self.Event() 1606 wait = TimingWrapper(event.wait) 1607 1608 # Removed temporarily, due to API shear, this does not 1609 # work with threading._Event objects. is_set == isSet 1610 self.assertEqual(event.is_set(), False) 1611 1612 # Removed, threading.Event.wait() will return the value of the __flag 1613 # instead of None. API Shear with the semaphore backed mp.Event 1614 self.assertEqual(wait(0.0), False) 1615 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1616 self.assertEqual(wait(TIMEOUT1), False) 1617 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1618 1619 event.set() 1620 1621 # See note above on the API differences 1622 self.assertEqual(event.is_set(), True) 1623 self.assertEqual(wait(), True) 1624 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1625 self.assertEqual(wait(TIMEOUT1), True) 1626 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1627 # self.assertEqual(event.is_set(), True) 1628 1629 event.clear() 1630 1631 #self.assertEqual(event.is_set(), False) 1632 1633 p = self.Process(target=self._test_event, args=(event,)) 1634 p.daemon = True 1635 p.start() 1636 self.assertEqual(wait(), True) 1637 p.join() 1638 1639# 1640# Tests for Barrier - adapted from tests in test/lock_tests.py 1641# 1642 1643# Many of the tests for threading.Barrier use a list as an atomic 1644# counter: a value is appended to increment the counter, and the 1645# length of the list gives the value. We use the class DummyList 1646# for the same purpose. 1647 1648class _DummyList(object): 1649 1650 def __init__(self): 1651 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1652 lock = multiprocessing.Lock() 1653 self.__setstate__((wrapper, lock)) 1654 self._lengthbuf[0] = 0 1655 1656 def __setstate__(self, state): 1657 (self._wrapper, self._lock) = state 1658 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1659 1660 def __getstate__(self): 1661 return (self._wrapper, self._lock) 1662 1663 def append(self, _): 1664 with self._lock: 1665 self._lengthbuf[0] += 1 1666 1667 def __len__(self): 1668 with self._lock: 1669 return self._lengthbuf[0] 1670 1671def _wait(): 1672 # A crude wait/yield function not relying on synchronization primitives. 1673 time.sleep(0.01) 1674 1675 1676class Bunch(object): 1677 """ 1678 A bunch of threads. 1679 """ 1680 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1681 """ 1682 Construct a bunch of `n` threads running the same function `f`. 1683 If `wait_before_exit` is True, the threads won't terminate until 1684 do_finish() is called. 1685 """ 1686 self.f = f 1687 self.args = args 1688 self.n = n 1689 self.started = namespace.DummyList() 1690 self.finished = namespace.DummyList() 1691 self._can_exit = namespace.Event() 1692 if not wait_before_exit: 1693 self._can_exit.set() 1694 1695 threads = [] 1696 for i in range(n): 1697 p = namespace.Process(target=self.task) 1698 p.daemon = True 1699 p.start() 1700 threads.append(p) 1701 1702 def finalize(threads): 1703 for p in threads: 1704 p.join() 1705 1706 self._finalizer = weakref.finalize(self, finalize, threads) 1707 1708 def task(self): 1709 pid = os.getpid() 1710 self.started.append(pid) 1711 try: 1712 self.f(*self.args) 1713 finally: 1714 self.finished.append(pid) 1715 self._can_exit.wait(30) 1716 assert self._can_exit.is_set() 1717 1718 def wait_for_started(self): 1719 while len(self.started) < self.n: 1720 _wait() 1721 1722 def wait_for_finished(self): 1723 while len(self.finished) < self.n: 1724 _wait() 1725 1726 def do_finish(self): 1727 self._can_exit.set() 1728 1729 def close(self): 1730 self._finalizer() 1731 1732 1733class AppendTrue(object): 1734 def __init__(self, obj): 1735 self.obj = obj 1736 def __call__(self): 1737 self.obj.append(True) 1738 1739 1740class _TestBarrier(BaseTestCase): 1741 """ 1742 Tests for Barrier objects. 1743 """ 1744 N = 5 1745 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1746 1747 def setUp(self): 1748 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1749 1750 def tearDown(self): 1751 self.barrier.abort() 1752 self.barrier = None 1753 1754 def DummyList(self): 1755 if self.TYPE == 'threads': 1756 return [] 1757 elif self.TYPE == 'manager': 1758 return self.manager.list() 1759 else: 1760 return _DummyList() 1761 1762 def run_threads(self, f, args): 1763 b = Bunch(self, f, args, self.N-1) 1764 try: 1765 f(*args) 1766 b.wait_for_finished() 1767 finally: 1768 b.close() 1769 1770 @classmethod 1771 def multipass(cls, barrier, results, n): 1772 m = barrier.parties 1773 assert m == cls.N 1774 for i in range(n): 1775 results[0].append(True) 1776 assert len(results[1]) == i * m 1777 barrier.wait() 1778 results[1].append(True) 1779 assert len(results[0]) == (i + 1) * m 1780 barrier.wait() 1781 try: 1782 assert barrier.n_waiting == 0 1783 except NotImplementedError: 1784 pass 1785 assert not barrier.broken 1786 1787 def test_barrier(self, passes=1): 1788 """ 1789 Test that a barrier is passed in lockstep 1790 """ 1791 results = [self.DummyList(), self.DummyList()] 1792 self.run_threads(self.multipass, (self.barrier, results, passes)) 1793 1794 def test_barrier_10(self): 1795 """ 1796 Test that a barrier works for 10 consecutive runs 1797 """ 1798 return self.test_barrier(10) 1799 1800 @classmethod 1801 def _test_wait_return_f(cls, barrier, queue): 1802 res = barrier.wait() 1803 queue.put(res) 1804 1805 def test_wait_return(self): 1806 """ 1807 test the return value from barrier.wait 1808 """ 1809 queue = self.Queue() 1810 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1811 results = [queue.get() for i in range(self.N)] 1812 self.assertEqual(results.count(0), 1) 1813 close_queue(queue) 1814 1815 @classmethod 1816 def _test_action_f(cls, barrier, results): 1817 barrier.wait() 1818 if len(results) != 1: 1819 raise RuntimeError 1820 1821 def test_action(self): 1822 """ 1823 Test the 'action' callback 1824 """ 1825 results = self.DummyList() 1826 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1827 self.run_threads(self._test_action_f, (barrier, results)) 1828 self.assertEqual(len(results), 1) 1829 1830 @classmethod 1831 def _test_abort_f(cls, barrier, results1, results2): 1832 try: 1833 i = barrier.wait() 1834 if i == cls.N//2: 1835 raise RuntimeError 1836 barrier.wait() 1837 results1.append(True) 1838 except threading.BrokenBarrierError: 1839 results2.append(True) 1840 except RuntimeError: 1841 barrier.abort() 1842 1843 def test_abort(self): 1844 """ 1845 Test that an abort will put the barrier in a broken state 1846 """ 1847 results1 = self.DummyList() 1848 results2 = self.DummyList() 1849 self.run_threads(self._test_abort_f, 1850 (self.barrier, results1, results2)) 1851 self.assertEqual(len(results1), 0) 1852 self.assertEqual(len(results2), self.N-1) 1853 self.assertTrue(self.barrier.broken) 1854 1855 @classmethod 1856 def _test_reset_f(cls, barrier, results1, results2, results3): 1857 i = barrier.wait() 1858 if i == cls.N//2: 1859 # Wait until the other threads are all in the barrier. 1860 while barrier.n_waiting < cls.N-1: 1861 time.sleep(0.001) 1862 barrier.reset() 1863 else: 1864 try: 1865 barrier.wait() 1866 results1.append(True) 1867 except threading.BrokenBarrierError: 1868 results2.append(True) 1869 # Now, pass the barrier again 1870 barrier.wait() 1871 results3.append(True) 1872 1873 def test_reset(self): 1874 """ 1875 Test that a 'reset' on a barrier frees the waiting threads 1876 """ 1877 results1 = self.DummyList() 1878 results2 = self.DummyList() 1879 results3 = self.DummyList() 1880 self.run_threads(self._test_reset_f, 1881 (self.barrier, results1, results2, results3)) 1882 self.assertEqual(len(results1), 0) 1883 self.assertEqual(len(results2), self.N-1) 1884 self.assertEqual(len(results3), self.N) 1885 1886 @classmethod 1887 def _test_abort_and_reset_f(cls, barrier, barrier2, 1888 results1, results2, results3): 1889 try: 1890 i = barrier.wait() 1891 if i == cls.N//2: 1892 raise RuntimeError 1893 barrier.wait() 1894 results1.append(True) 1895 except threading.BrokenBarrierError: 1896 results2.append(True) 1897 except RuntimeError: 1898 barrier.abort() 1899 # Synchronize and reset the barrier. Must synchronize first so 1900 # that everyone has left it when we reset, and after so that no 1901 # one enters it before the reset. 1902 if barrier2.wait() == cls.N//2: 1903 barrier.reset() 1904 barrier2.wait() 1905 barrier.wait() 1906 results3.append(True) 1907 1908 def test_abort_and_reset(self): 1909 """ 1910 Test that a barrier can be reset after being broken. 1911 """ 1912 results1 = self.DummyList() 1913 results2 = self.DummyList() 1914 results3 = self.DummyList() 1915 barrier2 = self.Barrier(self.N) 1916 1917 self.run_threads(self._test_abort_and_reset_f, 1918 (self.barrier, barrier2, results1, results2, results3)) 1919 self.assertEqual(len(results1), 0) 1920 self.assertEqual(len(results2), self.N-1) 1921 self.assertEqual(len(results3), self.N) 1922 1923 @classmethod 1924 def _test_timeout_f(cls, barrier, results): 1925 i = barrier.wait() 1926 if i == cls.N//2: 1927 # One thread is late! 1928 time.sleep(1.0) 1929 try: 1930 barrier.wait(0.5) 1931 except threading.BrokenBarrierError: 1932 results.append(True) 1933 1934 def test_timeout(self): 1935 """ 1936 Test wait(timeout) 1937 """ 1938 results = self.DummyList() 1939 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1940 self.assertEqual(len(results), self.barrier.parties) 1941 1942 @classmethod 1943 def _test_default_timeout_f(cls, barrier, results): 1944 i = barrier.wait(cls.defaultTimeout) 1945 if i == cls.N//2: 1946 # One thread is later than the default timeout 1947 time.sleep(1.0) 1948 try: 1949 barrier.wait() 1950 except threading.BrokenBarrierError: 1951 results.append(True) 1952 1953 def test_default_timeout(self): 1954 """ 1955 Test the barrier's default timeout 1956 """ 1957 barrier = self.Barrier(self.N, timeout=0.5) 1958 results = self.DummyList() 1959 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1960 self.assertEqual(len(results), barrier.parties) 1961 1962 def test_single_thread(self): 1963 b = self.Barrier(1) 1964 b.wait() 1965 b.wait() 1966 1967 @classmethod 1968 def _test_thousand_f(cls, barrier, passes, conn, lock): 1969 for i in range(passes): 1970 barrier.wait() 1971 with lock: 1972 conn.send(i) 1973 1974 def test_thousand(self): 1975 if self.TYPE == 'manager': 1976 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1977 passes = 1000 1978 lock = self.Lock() 1979 conn, child_conn = self.Pipe(False) 1980 for j in range(self.N): 1981 p = self.Process(target=self._test_thousand_f, 1982 args=(self.barrier, passes, child_conn, lock)) 1983 p.start() 1984 self.addCleanup(p.join) 1985 1986 for i in range(passes): 1987 for j in range(self.N): 1988 self.assertEqual(conn.recv(), i) 1989 1990# 1991# 1992# 1993 1994class _TestValue(BaseTestCase): 1995 1996 ALLOWED_TYPES = ('processes',) 1997 1998 codes_values = [ 1999 ('i', 4343, 24234), 2000 ('d', 3.625, -4.25), 2001 ('h', -232, 234), 2002 ('q', 2 ** 33, 2 ** 34), 2003 ('c', latin('x'), latin('y')) 2004 ] 2005 2006 def setUp(self): 2007 if not HAS_SHAREDCTYPES: 2008 self.skipTest("requires multiprocessing.sharedctypes") 2009 2010 @classmethod 2011 def _test(cls, values): 2012 for sv, cv in zip(values, cls.codes_values): 2013 sv.value = cv[2] 2014 2015 2016 def test_value(self, raw=False): 2017 if raw: 2018 values = [self.RawValue(code, value) 2019 for code, value, _ in self.codes_values] 2020 else: 2021 values = [self.Value(code, value) 2022 for code, value, _ in self.codes_values] 2023 2024 for sv, cv in zip(values, self.codes_values): 2025 self.assertEqual(sv.value, cv[1]) 2026 2027 proc = self.Process(target=self._test, args=(values,)) 2028 proc.daemon = True 2029 proc.start() 2030 proc.join() 2031 2032 for sv, cv in zip(values, self.codes_values): 2033 self.assertEqual(sv.value, cv[2]) 2034 2035 def test_rawvalue(self): 2036 self.test_value(raw=True) 2037 2038 def test_getobj_getlock(self): 2039 val1 = self.Value('i', 5) 2040 lock1 = val1.get_lock() 2041 obj1 = val1.get_obj() 2042 2043 val2 = self.Value('i', 5, lock=None) 2044 lock2 = val2.get_lock() 2045 obj2 = val2.get_obj() 2046 2047 lock = self.Lock() 2048 val3 = self.Value('i', 5, lock=lock) 2049 lock3 = val3.get_lock() 2050 obj3 = val3.get_obj() 2051 self.assertEqual(lock, lock3) 2052 2053 arr4 = self.Value('i', 5, lock=False) 2054 self.assertFalse(hasattr(arr4, 'get_lock')) 2055 self.assertFalse(hasattr(arr4, 'get_obj')) 2056 2057 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 2058 2059 arr5 = self.RawValue('i', 5) 2060 self.assertFalse(hasattr(arr5, 'get_lock')) 2061 self.assertFalse(hasattr(arr5, 'get_obj')) 2062 2063 2064class _TestArray(BaseTestCase): 2065 2066 ALLOWED_TYPES = ('processes',) 2067 2068 @classmethod 2069 def f(cls, seq): 2070 for i in range(1, len(seq)): 2071 seq[i] += seq[i-1] 2072 2073 @unittest.skipIf(c_int is None, "requires _ctypes") 2074 def test_array(self, raw=False): 2075 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 2076 if raw: 2077 arr = self.RawArray('i', seq) 2078 else: 2079 arr = self.Array('i', seq) 2080 2081 self.assertEqual(len(arr), len(seq)) 2082 self.assertEqual(arr[3], seq[3]) 2083 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 2084 2085 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 2086 2087 self.assertEqual(list(arr[:]), seq) 2088 2089 self.f(seq) 2090 2091 p = self.Process(target=self.f, args=(arr,)) 2092 p.daemon = True 2093 p.start() 2094 p.join() 2095 2096 self.assertEqual(list(arr[:]), seq) 2097 2098 @unittest.skipIf(c_int is None, "requires _ctypes") 2099 def test_array_from_size(self): 2100 size = 10 2101 # Test for zeroing (see issue #11675). 2102 # The repetition below strengthens the test by increasing the chances 2103 # of previously allocated non-zero memory being used for the new array 2104 # on the 2nd and 3rd loops. 2105 for _ in range(3): 2106 arr = self.Array('i', size) 2107 self.assertEqual(len(arr), size) 2108 self.assertEqual(list(arr), [0] * size) 2109 arr[:] = range(10) 2110 self.assertEqual(list(arr), list(range(10))) 2111 del arr 2112 2113 @unittest.skipIf(c_int is None, "requires _ctypes") 2114 def test_rawarray(self): 2115 self.test_array(raw=True) 2116 2117 @unittest.skipIf(c_int is None, "requires _ctypes") 2118 def test_getobj_getlock_obj(self): 2119 arr1 = self.Array('i', list(range(10))) 2120 lock1 = arr1.get_lock() 2121 obj1 = arr1.get_obj() 2122 2123 arr2 = self.Array('i', list(range(10)), lock=None) 2124 lock2 = arr2.get_lock() 2125 obj2 = arr2.get_obj() 2126 2127 lock = self.Lock() 2128 arr3 = self.Array('i', list(range(10)), lock=lock) 2129 lock3 = arr3.get_lock() 2130 obj3 = arr3.get_obj() 2131 self.assertEqual(lock, lock3) 2132 2133 arr4 = self.Array('i', range(10), lock=False) 2134 self.assertFalse(hasattr(arr4, 'get_lock')) 2135 self.assertFalse(hasattr(arr4, 'get_obj')) 2136 self.assertRaises(AttributeError, 2137 self.Array, 'i', range(10), lock='notalock') 2138 2139 arr5 = self.RawArray('i', range(10)) 2140 self.assertFalse(hasattr(arr5, 'get_lock')) 2141 self.assertFalse(hasattr(arr5, 'get_obj')) 2142 2143# 2144# 2145# 2146 2147class _TestContainers(BaseTestCase): 2148 2149 ALLOWED_TYPES = ('manager',) 2150 2151 def test_list(self): 2152 a = self.list(list(range(10))) 2153 self.assertEqual(a[:], list(range(10))) 2154 2155 b = self.list() 2156 self.assertEqual(b[:], []) 2157 2158 b.extend(list(range(5))) 2159 self.assertEqual(b[:], list(range(5))) 2160 2161 self.assertEqual(b[2], 2) 2162 self.assertEqual(b[2:10], [2,3,4]) 2163 2164 b *= 2 2165 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 2166 2167 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 2168 2169 self.assertEqual(a[:], list(range(10))) 2170 2171 d = [a, b] 2172 e = self.list(d) 2173 self.assertEqual( 2174 [element[:] for element in e], 2175 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 2176 ) 2177 2178 f = self.list([a]) 2179 a.append('hello') 2180 self.assertEqual(f[0][:], [0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']) 2181 2182 def test_list_iter(self): 2183 a = self.list(list(range(10))) 2184 it = iter(a) 2185 self.assertEqual(list(it), list(range(10))) 2186 self.assertEqual(list(it), []) # exhausted 2187 # list modified during iteration 2188 it = iter(a) 2189 a[0] = 100 2190 self.assertEqual(next(it), 100) 2191 2192 def test_list_proxy_in_list(self): 2193 a = self.list([self.list(range(3)) for _i in range(3)]) 2194 self.assertEqual([inner[:] for inner in a], [[0, 1, 2]] * 3) 2195 2196 a[0][-1] = 55 2197 self.assertEqual(a[0][:], [0, 1, 55]) 2198 for i in range(1, 3): 2199 self.assertEqual(a[i][:], [0, 1, 2]) 2200 2201 self.assertEqual(a[1].pop(), 2) 2202 self.assertEqual(len(a[1]), 2) 2203 for i in range(0, 3, 2): 2204 self.assertEqual(len(a[i]), 3) 2205 2206 del a 2207 2208 b = self.list() 2209 b.append(b) 2210 del b 2211 2212 def test_dict(self): 2213 d = self.dict() 2214 indices = list(range(65, 70)) 2215 for i in indices: 2216 d[i] = chr(i) 2217 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 2218 self.assertEqual(sorted(d.keys()), indices) 2219 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 2220 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 2221 2222 def test_dict_iter(self): 2223 d = self.dict() 2224 indices = list(range(65, 70)) 2225 for i in indices: 2226 d[i] = chr(i) 2227 it = iter(d) 2228 self.assertEqual(list(it), indices) 2229 self.assertEqual(list(it), []) # exhausted 2230 # dictionary changed size during iteration 2231 it = iter(d) 2232 d.clear() 2233 self.assertRaises(RuntimeError, next, it) 2234 2235 def test_dict_proxy_nested(self): 2236 pets = self.dict(ferrets=2, hamsters=4) 2237 supplies = self.dict(water=10, feed=3) 2238 d = self.dict(pets=pets, supplies=supplies) 2239 2240 self.assertEqual(supplies['water'], 10) 2241 self.assertEqual(d['supplies']['water'], 10) 2242 2243 d['supplies']['blankets'] = 5 2244 self.assertEqual(supplies['blankets'], 5) 2245 self.assertEqual(d['supplies']['blankets'], 5) 2246 2247 d['supplies']['water'] = 7 2248 self.assertEqual(supplies['water'], 7) 2249 self.assertEqual(d['supplies']['water'], 7) 2250 2251 del pets 2252 del supplies 2253 self.assertEqual(d['pets']['ferrets'], 2) 2254 d['supplies']['blankets'] = 11 2255 self.assertEqual(d['supplies']['blankets'], 11) 2256 2257 pets = d['pets'] 2258 supplies = d['supplies'] 2259 supplies['water'] = 7 2260 self.assertEqual(supplies['water'], 7) 2261 self.assertEqual(d['supplies']['water'], 7) 2262 2263 d.clear() 2264 self.assertEqual(len(d), 0) 2265 self.assertEqual(supplies['water'], 7) 2266 self.assertEqual(pets['hamsters'], 4) 2267 2268 l = self.list([pets, supplies]) 2269 l[0]['marmots'] = 1 2270 self.assertEqual(pets['marmots'], 1) 2271 self.assertEqual(l[0]['marmots'], 1) 2272 2273 del pets 2274 del supplies 2275 self.assertEqual(l[0]['marmots'], 1) 2276 2277 outer = self.list([[88, 99], l]) 2278 self.assertIsInstance(outer[0], list) # Not a ListProxy 2279 self.assertEqual(outer[-1][-1]['feed'], 3) 2280 2281 def test_namespace(self): 2282 n = self.Namespace() 2283 n.name = 'Bob' 2284 n.job = 'Builder' 2285 n._hidden = 'hidden' 2286 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 2287 del n.job 2288 self.assertEqual(str(n), "Namespace(name='Bob')") 2289 self.assertTrue(hasattr(n, 'name')) 2290 self.assertTrue(not hasattr(n, 'job')) 2291 2292# 2293# 2294# 2295 2296def sqr(x, wait=0.0): 2297 time.sleep(wait) 2298 return x*x 2299 2300def mul(x, y): 2301 return x*y 2302 2303def raise_large_valuerror(wait): 2304 time.sleep(wait) 2305 raise ValueError("x" * 1024**2) 2306 2307def identity(x): 2308 return x 2309 2310class CountedObject(object): 2311 n_instances = 0 2312 2313 def __new__(cls): 2314 cls.n_instances += 1 2315 return object.__new__(cls) 2316 2317 def __del__(self): 2318 type(self).n_instances -= 1 2319 2320class SayWhenError(ValueError): pass 2321 2322def exception_throwing_generator(total, when): 2323 if when == -1: 2324 raise SayWhenError("Somebody said when") 2325 for i in range(total): 2326 if i == when: 2327 raise SayWhenError("Somebody said when") 2328 yield i 2329 2330 2331class _TestPool(BaseTestCase): 2332 2333 @classmethod 2334 def setUpClass(cls): 2335 super().setUpClass() 2336 cls.pool = cls.Pool(4) 2337 2338 @classmethod 2339 def tearDownClass(cls): 2340 cls.pool.terminate() 2341 cls.pool.join() 2342 cls.pool = None 2343 super().tearDownClass() 2344 2345 def test_apply(self): 2346 papply = self.pool.apply 2347 self.assertEqual(papply(sqr, (5,)), sqr(5)) 2348 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 2349 2350 def test_map(self): 2351 pmap = self.pool.map 2352 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 2353 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 2354 list(map(sqr, list(range(100))))) 2355 2356 def test_starmap(self): 2357 psmap = self.pool.starmap 2358 tuples = list(zip(range(10), range(9,-1, -1))) 2359 self.assertEqual(psmap(mul, tuples), 2360 list(itertools.starmap(mul, tuples))) 2361 tuples = list(zip(range(100), range(99,-1, -1))) 2362 self.assertEqual(psmap(mul, tuples, chunksize=20), 2363 list(itertools.starmap(mul, tuples))) 2364 2365 def test_starmap_async(self): 2366 tuples = list(zip(range(100), range(99,-1, -1))) 2367 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 2368 list(itertools.starmap(mul, tuples))) 2369 2370 def test_map_async(self): 2371 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 2372 list(map(sqr, list(range(10))))) 2373 2374 def test_map_async_callbacks(self): 2375 call_args = self.manager.list() if self.TYPE == 'manager' else [] 2376 self.pool.map_async(int, ['1'], 2377 callback=call_args.append, 2378 error_callback=call_args.append).wait() 2379 self.assertEqual(1, len(call_args)) 2380 self.assertEqual([1], call_args[0]) 2381 self.pool.map_async(int, ['a'], 2382 callback=call_args.append, 2383 error_callback=call_args.append).wait() 2384 self.assertEqual(2, len(call_args)) 2385 self.assertIsInstance(call_args[1], ValueError) 2386 2387 def test_map_unplicklable(self): 2388 # Issue #19425 -- failure to pickle should not cause a hang 2389 if self.TYPE == 'threads': 2390 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2391 class A(object): 2392 def __reduce__(self): 2393 raise RuntimeError('cannot pickle') 2394 with self.assertRaises(RuntimeError): 2395 self.pool.map(sqr, [A()]*10) 2396 2397 def test_map_chunksize(self): 2398 try: 2399 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 2400 except multiprocessing.TimeoutError: 2401 self.fail("pool.map_async with chunksize stalled on null list") 2402 2403 def test_map_handle_iterable_exception(self): 2404 if self.TYPE == 'manager': 2405 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2406 2407 # SayWhenError seen at the very first of the iterable 2408 with self.assertRaises(SayWhenError): 2409 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2410 # again, make sure it's reentrant 2411 with self.assertRaises(SayWhenError): 2412 self.pool.map(sqr, exception_throwing_generator(1, -1), 1) 2413 2414 with self.assertRaises(SayWhenError): 2415 self.pool.map(sqr, exception_throwing_generator(10, 3), 1) 2416 2417 class SpecialIterable: 2418 def __iter__(self): 2419 return self 2420 def __next__(self): 2421 raise SayWhenError 2422 def __len__(self): 2423 return 1 2424 with self.assertRaises(SayWhenError): 2425 self.pool.map(sqr, SpecialIterable(), 1) 2426 with self.assertRaises(SayWhenError): 2427 self.pool.map(sqr, SpecialIterable(), 1) 2428 2429 def test_async(self): 2430 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 2431 get = TimingWrapper(res.get) 2432 self.assertEqual(get(), 49) 2433 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 2434 2435 def test_async_timeout(self): 2436 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 2437 get = TimingWrapper(res.get) 2438 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 2439 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 2440 2441 def test_imap(self): 2442 it = self.pool.imap(sqr, list(range(10))) 2443 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 2444 2445 it = self.pool.imap(sqr, list(range(10))) 2446 for i in range(10): 2447 self.assertEqual(next(it), i*i) 2448 self.assertRaises(StopIteration, it.__next__) 2449 2450 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 2451 for i in range(1000): 2452 self.assertEqual(next(it), i*i) 2453 self.assertRaises(StopIteration, it.__next__) 2454 2455 def test_imap_handle_iterable_exception(self): 2456 if self.TYPE == 'manager': 2457 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2458 2459 # SayWhenError seen at the very first of the iterable 2460 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2461 self.assertRaises(SayWhenError, it.__next__) 2462 # again, make sure it's reentrant 2463 it = self.pool.imap(sqr, exception_throwing_generator(1, -1), 1) 2464 self.assertRaises(SayWhenError, it.__next__) 2465 2466 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 2467 for i in range(3): 2468 self.assertEqual(next(it), i*i) 2469 self.assertRaises(SayWhenError, it.__next__) 2470 2471 # SayWhenError seen at start of problematic chunk's results 2472 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 2473 for i in range(6): 2474 self.assertEqual(next(it), i*i) 2475 self.assertRaises(SayWhenError, it.__next__) 2476 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 2477 for i in range(4): 2478 self.assertEqual(next(it), i*i) 2479 self.assertRaises(SayWhenError, it.__next__) 2480 2481 def test_imap_unordered(self): 2482 it = self.pool.imap_unordered(sqr, list(range(10))) 2483 self.assertEqual(sorted(it), list(map(sqr, list(range(10))))) 2484 2485 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=100) 2486 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 2487 2488 def test_imap_unordered_handle_iterable_exception(self): 2489 if self.TYPE == 'manager': 2490 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2491 2492 # SayWhenError seen at the very first of the iterable 2493 it = self.pool.imap_unordered(sqr, 2494 exception_throwing_generator(1, -1), 2495 1) 2496 self.assertRaises(SayWhenError, it.__next__) 2497 # again, make sure it's reentrant 2498 it = self.pool.imap_unordered(sqr, 2499 exception_throwing_generator(1, -1), 2500 1) 2501 self.assertRaises(SayWhenError, it.__next__) 2502 2503 it = self.pool.imap_unordered(sqr, 2504 exception_throwing_generator(10, 3), 2505 1) 2506 expected_values = list(map(sqr, list(range(10)))) 2507 with self.assertRaises(SayWhenError): 2508 # imap_unordered makes it difficult to anticipate the SayWhenError 2509 for i in range(10): 2510 value = next(it) 2511 self.assertIn(value, expected_values) 2512 expected_values.remove(value) 2513 2514 it = self.pool.imap_unordered(sqr, 2515 exception_throwing_generator(20, 7), 2516 2) 2517 expected_values = list(map(sqr, list(range(20)))) 2518 with self.assertRaises(SayWhenError): 2519 for i in range(20): 2520 value = next(it) 2521 self.assertIn(value, expected_values) 2522 expected_values.remove(value) 2523 2524 def test_make_pool(self): 2525 expected_error = (RemoteError if self.TYPE == 'manager' 2526 else ValueError) 2527 2528 self.assertRaises(expected_error, self.Pool, -1) 2529 self.assertRaises(expected_error, self.Pool, 0) 2530 2531 if self.TYPE != 'manager': 2532 p = self.Pool(3) 2533 try: 2534 self.assertEqual(3, len(p._pool)) 2535 finally: 2536 p.close() 2537 p.join() 2538 2539 def test_terminate(self): 2540 result = self.pool.map_async( 2541 time.sleep, [0.1 for i in range(10000)], chunksize=1 2542 ) 2543 self.pool.terminate() 2544 join = TimingWrapper(self.pool.join) 2545 join() 2546 # Sanity check the pool didn't wait for all tasks to finish 2547 self.assertLess(join.elapsed, 2.0) 2548 2549 def test_empty_iterable(self): 2550 # See Issue 12157 2551 p = self.Pool(1) 2552 2553 self.assertEqual(p.map(sqr, []), []) 2554 self.assertEqual(list(p.imap(sqr, [])), []) 2555 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 2556 self.assertEqual(p.map_async(sqr, []).get(), []) 2557 2558 p.close() 2559 p.join() 2560 2561 def test_context(self): 2562 if self.TYPE == 'processes': 2563 L = list(range(10)) 2564 expected = [sqr(i) for i in L] 2565 with self.Pool(2) as p: 2566 r = p.map_async(sqr, L) 2567 self.assertEqual(r.get(), expected) 2568 p.join() 2569 self.assertRaises(ValueError, p.map_async, sqr, L) 2570 2571 @classmethod 2572 def _test_traceback(cls): 2573 raise RuntimeError(123) # some comment 2574 2575 def test_traceback(self): 2576 # We want ensure that the traceback from the child process is 2577 # contained in the traceback raised in the main process. 2578 if self.TYPE == 'processes': 2579 with self.Pool(1) as p: 2580 try: 2581 p.apply(self._test_traceback) 2582 except Exception as e: 2583 exc = e 2584 else: 2585 self.fail('expected RuntimeError') 2586 p.join() 2587 self.assertIs(type(exc), RuntimeError) 2588 self.assertEqual(exc.args, (123,)) 2589 cause = exc.__cause__ 2590 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 2591 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 2592 2593 with test.support.captured_stderr() as f1: 2594 try: 2595 raise exc 2596 except RuntimeError: 2597 sys.excepthook(*sys.exc_info()) 2598 self.assertIn('raise RuntimeError(123) # some comment', 2599 f1.getvalue()) 2600 # _helper_reraises_exception should not make the error 2601 # a remote exception 2602 with self.Pool(1) as p: 2603 try: 2604 p.map(sqr, exception_throwing_generator(1, -1), 1) 2605 except Exception as e: 2606 exc = e 2607 else: 2608 self.fail('expected SayWhenError') 2609 self.assertIs(type(exc), SayWhenError) 2610 self.assertIs(exc.__cause__, None) 2611 p.join() 2612 2613 @classmethod 2614 def _test_wrapped_exception(cls): 2615 raise RuntimeError('foo') 2616 2617 def test_wrapped_exception(self): 2618 # Issue #20980: Should not wrap exception when using thread pool 2619 with self.Pool(1) as p: 2620 with self.assertRaises(RuntimeError): 2621 p.apply(self._test_wrapped_exception) 2622 p.join() 2623 2624 def test_map_no_failfast(self): 2625 # Issue #23992: the fail-fast behaviour when an exception is raised 2626 # during map() would make Pool.join() deadlock, because a worker 2627 # process would fill the result queue (after the result handler thread 2628 # terminated, hence not draining it anymore). 2629 2630 t_start = time.monotonic() 2631 2632 with self.assertRaises(ValueError): 2633 with self.Pool(2) as p: 2634 try: 2635 p.map(raise_large_valuerror, [0, 1]) 2636 finally: 2637 time.sleep(0.5) 2638 p.close() 2639 p.join() 2640 2641 # check that we indeed waited for all jobs 2642 self.assertGreater(time.monotonic() - t_start, 0.9) 2643 2644 def test_release_task_refs(self): 2645 # Issue #29861: task arguments and results should not be kept 2646 # alive after we are done with them. 2647 objs = [CountedObject() for i in range(10)] 2648 refs = [weakref.ref(o) for o in objs] 2649 self.pool.map(identity, objs) 2650 2651 del objs 2652 time.sleep(DELTA) # let threaded cleanup code run 2653 self.assertEqual(set(wr() for wr in refs), {None}) 2654 # With a process pool, copies of the objects are returned, check 2655 # they were released too. 2656 self.assertEqual(CountedObject.n_instances, 0) 2657 2658 def test_enter(self): 2659 if self.TYPE == 'manager': 2660 self.skipTest("test not applicable to manager") 2661 2662 pool = self.Pool(1) 2663 with pool: 2664 pass 2665 # call pool.terminate() 2666 # pool is no longer running 2667 2668 with self.assertRaises(ValueError): 2669 # bpo-35477: pool.__enter__() fails if the pool is not running 2670 with pool: 2671 pass 2672 pool.join() 2673 2674 def test_resource_warning(self): 2675 if self.TYPE == 'manager': 2676 self.skipTest("test not applicable to manager") 2677 2678 pool = self.Pool(1) 2679 pool.terminate() 2680 pool.join() 2681 2682 # force state to RUN to emit ResourceWarning in __del__() 2683 pool._state = multiprocessing.pool.RUN 2684 2685 with support.check_warnings(('unclosed running multiprocessing pool', 2686 ResourceWarning)): 2687 pool = None 2688 support.gc_collect() 2689 2690def raising(): 2691 raise KeyError("key") 2692 2693def unpickleable_result(): 2694 return lambda: 42 2695 2696class _TestPoolWorkerErrors(BaseTestCase): 2697 ALLOWED_TYPES = ('processes', ) 2698 2699 def test_async_error_callback(self): 2700 p = multiprocessing.Pool(2) 2701 2702 scratchpad = [None] 2703 def errback(exc): 2704 scratchpad[0] = exc 2705 2706 res = p.apply_async(raising, error_callback=errback) 2707 self.assertRaises(KeyError, res.get) 2708 self.assertTrue(scratchpad[0]) 2709 self.assertIsInstance(scratchpad[0], KeyError) 2710 2711 p.close() 2712 p.join() 2713 2714 def test_unpickleable_result(self): 2715 from multiprocessing.pool import MaybeEncodingError 2716 p = multiprocessing.Pool(2) 2717 2718 # Make sure we don't lose pool processes because of encoding errors. 2719 for iteration in range(20): 2720 2721 scratchpad = [None] 2722 def errback(exc): 2723 scratchpad[0] = exc 2724 2725 res = p.apply_async(unpickleable_result, error_callback=errback) 2726 self.assertRaises(MaybeEncodingError, res.get) 2727 wrapped = scratchpad[0] 2728 self.assertTrue(wrapped) 2729 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 2730 self.assertIsNotNone(wrapped.exc) 2731 self.assertIsNotNone(wrapped.value) 2732 2733 p.close() 2734 p.join() 2735 2736class _TestPoolWorkerLifetime(BaseTestCase): 2737 ALLOWED_TYPES = ('processes', ) 2738 2739 def test_pool_worker_lifetime(self): 2740 p = multiprocessing.Pool(3, maxtasksperchild=10) 2741 self.assertEqual(3, len(p._pool)) 2742 origworkerpids = [w.pid for w in p._pool] 2743 # Run many tasks so each worker gets replaced (hopefully) 2744 results = [] 2745 for i in range(100): 2746 results.append(p.apply_async(sqr, (i, ))) 2747 # Fetch the results and verify we got the right answers, 2748 # also ensuring all the tasks have completed. 2749 for (j, res) in enumerate(results): 2750 self.assertEqual(res.get(), sqr(j)) 2751 # Refill the pool 2752 p._repopulate_pool() 2753 # Wait until all workers are alive 2754 # (countdown * DELTA = 5 seconds max startup process time) 2755 countdown = 50 2756 while countdown and not all(w.is_alive() for w in p._pool): 2757 countdown -= 1 2758 time.sleep(DELTA) 2759 finalworkerpids = [w.pid for w in p._pool] 2760 # All pids should be assigned. See issue #7805. 2761 self.assertNotIn(None, origworkerpids) 2762 self.assertNotIn(None, finalworkerpids) 2763 # Finally, check that the worker pids have changed 2764 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 2765 p.close() 2766 p.join() 2767 2768 def test_pool_worker_lifetime_early_close(self): 2769 # Issue #10332: closing a pool whose workers have limited lifetimes 2770 # before all the tasks completed would make join() hang. 2771 p = multiprocessing.Pool(3, maxtasksperchild=1) 2772 results = [] 2773 for i in range(6): 2774 results.append(p.apply_async(sqr, (i, 0.3))) 2775 p.close() 2776 p.join() 2777 # check the results 2778 for (j, res) in enumerate(results): 2779 self.assertEqual(res.get(), sqr(j)) 2780 2781 def test_worker_finalization_via_atexit_handler_of_multiprocessing(self): 2782 # tests cases against bpo-38744 and bpo-39360 2783 cmd = '''if 1: 2784 from multiprocessing import Pool 2785 problem = None 2786 class A: 2787 def __init__(self): 2788 self.pool = Pool(processes=1) 2789 def test(): 2790 global problem 2791 problem = A() 2792 problem.pool.map(float, tuple(range(10))) 2793 if __name__ == "__main__": 2794 test() 2795 ''' 2796 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 2797 self.assertEqual(rc, 0) 2798 2799# 2800# Test of creating a customized manager class 2801# 2802 2803from multiprocessing.managers import BaseManager, BaseProxy, RemoteError 2804 2805class FooBar(object): 2806 def f(self): 2807 return 'f()' 2808 def g(self): 2809 raise ValueError 2810 def _h(self): 2811 return '_h()' 2812 2813def baz(): 2814 for i in range(10): 2815 yield i*i 2816 2817class IteratorProxy(BaseProxy): 2818 _exposed_ = ('__next__',) 2819 def __iter__(self): 2820 return self 2821 def __next__(self): 2822 return self._callmethod('__next__') 2823 2824class MyManager(BaseManager): 2825 pass 2826 2827MyManager.register('Foo', callable=FooBar) 2828MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2829MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2830 2831 2832class _TestMyManager(BaseTestCase): 2833 2834 ALLOWED_TYPES = ('manager',) 2835 2836 def test_mymanager(self): 2837 manager = MyManager() 2838 manager.start() 2839 self.common(manager) 2840 manager.shutdown() 2841 2842 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2843 # to the manager process if it takes longer than 1 second to stop, 2844 # which happens on slow buildbots. 2845 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2846 2847 def test_mymanager_context(self): 2848 with MyManager() as manager: 2849 self.common(manager) 2850 # bpo-30356: BaseManager._finalize_manager() sends SIGTERM 2851 # to the manager process if it takes longer than 1 second to stop, 2852 # which happens on slow buildbots. 2853 self.assertIn(manager._process.exitcode, (0, -signal.SIGTERM)) 2854 2855 def test_mymanager_context_prestarted(self): 2856 manager = MyManager() 2857 manager.start() 2858 with manager: 2859 self.common(manager) 2860 self.assertEqual(manager._process.exitcode, 0) 2861 2862 def common(self, manager): 2863 foo = manager.Foo() 2864 bar = manager.Bar() 2865 baz = manager.baz() 2866 2867 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2868 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2869 2870 self.assertEqual(foo_methods, ['f', 'g']) 2871 self.assertEqual(bar_methods, ['f', '_h']) 2872 2873 self.assertEqual(foo.f(), 'f()') 2874 self.assertRaises(ValueError, foo.g) 2875 self.assertEqual(foo._callmethod('f'), 'f()') 2876 self.assertRaises(RemoteError, foo._callmethod, '_h') 2877 2878 self.assertEqual(bar.f(), 'f()') 2879 self.assertEqual(bar._h(), '_h()') 2880 self.assertEqual(bar._callmethod('f'), 'f()') 2881 self.assertEqual(bar._callmethod('_h'), '_h()') 2882 2883 self.assertEqual(list(baz), [i*i for i in range(10)]) 2884 2885 2886# 2887# Test of connecting to a remote server and using xmlrpclib for serialization 2888# 2889 2890_queue = pyqueue.Queue() 2891def get_queue(): 2892 return _queue 2893 2894class QueueManager(BaseManager): 2895 '''manager class used by server process''' 2896QueueManager.register('get_queue', callable=get_queue) 2897 2898class QueueManager2(BaseManager): 2899 '''manager class which specifies the same interface as QueueManager''' 2900QueueManager2.register('get_queue') 2901 2902 2903SERIALIZER = 'xmlrpclib' 2904 2905class _TestRemoteManager(BaseTestCase): 2906 2907 ALLOWED_TYPES = ('manager',) 2908 values = ['hello world', None, True, 2.25, 2909 'hall\xe5 v\xe4rlden', 2910 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2911 b'hall\xe5 v\xe4rlden', 2912 ] 2913 result = values[:] 2914 2915 @classmethod 2916 def _putter(cls, address, authkey): 2917 manager = QueueManager2( 2918 address=address, authkey=authkey, serializer=SERIALIZER 2919 ) 2920 manager.connect() 2921 queue = manager.get_queue() 2922 # Note that xmlrpclib will deserialize object as a list not a tuple 2923 queue.put(tuple(cls.values)) 2924 2925 def test_remote(self): 2926 authkey = os.urandom(32) 2927 2928 manager = QueueManager( 2929 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2930 ) 2931 manager.start() 2932 self.addCleanup(manager.shutdown) 2933 2934 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2935 p.daemon = True 2936 p.start() 2937 2938 manager2 = QueueManager2( 2939 address=manager.address, authkey=authkey, serializer=SERIALIZER 2940 ) 2941 manager2.connect() 2942 queue = manager2.get_queue() 2943 2944 self.assertEqual(queue.get(), self.result) 2945 2946 # Because we are using xmlrpclib for serialization instead of 2947 # pickle this will cause a serialization error. 2948 self.assertRaises(Exception, queue.put, time.sleep) 2949 2950 # Make queue finalizer run before the server is stopped 2951 del queue 2952 2953class _TestManagerRestart(BaseTestCase): 2954 2955 @classmethod 2956 def _putter(cls, address, authkey): 2957 manager = QueueManager( 2958 address=address, authkey=authkey, serializer=SERIALIZER) 2959 manager.connect() 2960 queue = manager.get_queue() 2961 queue.put('hello world') 2962 2963 def test_rapid_restart(self): 2964 authkey = os.urandom(32) 2965 manager = QueueManager( 2966 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2967 try: 2968 srvr = manager.get_server() 2969 addr = srvr.address 2970 # Close the connection.Listener socket which gets opened as a part 2971 # of manager.get_server(). It's not needed for the test. 2972 srvr.listener.close() 2973 manager.start() 2974 2975 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2976 p.start() 2977 p.join() 2978 queue = manager.get_queue() 2979 self.assertEqual(queue.get(), 'hello world') 2980 del queue 2981 finally: 2982 if hasattr(manager, "shutdown"): 2983 manager.shutdown() 2984 2985 manager = QueueManager( 2986 address=addr, authkey=authkey, serializer=SERIALIZER) 2987 try: 2988 manager.start() 2989 self.addCleanup(manager.shutdown) 2990 except OSError as e: 2991 if e.errno != errno.EADDRINUSE: 2992 raise 2993 # Retry after some time, in case the old socket was lingering 2994 # (sporadic failure on buildbots) 2995 time.sleep(1.0) 2996 manager = QueueManager( 2997 address=addr, authkey=authkey, serializer=SERIALIZER) 2998 if hasattr(manager, "shutdown"): 2999 self.addCleanup(manager.shutdown) 3000 3001# 3002# 3003# 3004 3005SENTINEL = latin('') 3006 3007class _TestConnection(BaseTestCase): 3008 3009 ALLOWED_TYPES = ('processes', 'threads') 3010 3011 @classmethod 3012 def _echo(cls, conn): 3013 for msg in iter(conn.recv_bytes, SENTINEL): 3014 conn.send_bytes(msg) 3015 conn.close() 3016 3017 def test_connection(self): 3018 conn, child_conn = self.Pipe() 3019 3020 p = self.Process(target=self._echo, args=(child_conn,)) 3021 p.daemon = True 3022 p.start() 3023 3024 seq = [1, 2.25, None] 3025 msg = latin('hello world') 3026 longmsg = msg * 10 3027 arr = array.array('i', list(range(4))) 3028 3029 if self.TYPE == 'processes': 3030 self.assertEqual(type(conn.fileno()), int) 3031 3032 self.assertEqual(conn.send(seq), None) 3033 self.assertEqual(conn.recv(), seq) 3034 3035 self.assertEqual(conn.send_bytes(msg), None) 3036 self.assertEqual(conn.recv_bytes(), msg) 3037 3038 if self.TYPE == 'processes': 3039 buffer = array.array('i', [0]*10) 3040 expected = list(arr) + [0] * (10 - len(arr)) 3041 self.assertEqual(conn.send_bytes(arr), None) 3042 self.assertEqual(conn.recv_bytes_into(buffer), 3043 len(arr) * buffer.itemsize) 3044 self.assertEqual(list(buffer), expected) 3045 3046 buffer = array.array('i', [0]*10) 3047 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 3048 self.assertEqual(conn.send_bytes(arr), None) 3049 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 3050 len(arr) * buffer.itemsize) 3051 self.assertEqual(list(buffer), expected) 3052 3053 buffer = bytearray(latin(' ' * 40)) 3054 self.assertEqual(conn.send_bytes(longmsg), None) 3055 try: 3056 res = conn.recv_bytes_into(buffer) 3057 except multiprocessing.BufferTooShort as e: 3058 self.assertEqual(e.args, (longmsg,)) 3059 else: 3060 self.fail('expected BufferTooShort, got %s' % res) 3061 3062 poll = TimingWrapper(conn.poll) 3063 3064 self.assertEqual(poll(), False) 3065 self.assertTimingAlmostEqual(poll.elapsed, 0) 3066 3067 self.assertEqual(poll(-1), False) 3068 self.assertTimingAlmostEqual(poll.elapsed, 0) 3069 3070 self.assertEqual(poll(TIMEOUT1), False) 3071 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 3072 3073 conn.send(None) 3074 time.sleep(.1) 3075 3076 self.assertEqual(poll(TIMEOUT1), True) 3077 self.assertTimingAlmostEqual(poll.elapsed, 0) 3078 3079 self.assertEqual(conn.recv(), None) 3080 3081 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 3082 conn.send_bytes(really_big_msg) 3083 self.assertEqual(conn.recv_bytes(), really_big_msg) 3084 3085 conn.send_bytes(SENTINEL) # tell child to quit 3086 child_conn.close() 3087 3088 if self.TYPE == 'processes': 3089 self.assertEqual(conn.readable, True) 3090 self.assertEqual(conn.writable, True) 3091 self.assertRaises(EOFError, conn.recv) 3092 self.assertRaises(EOFError, conn.recv_bytes) 3093 3094 p.join() 3095 3096 def test_duplex_false(self): 3097 reader, writer = self.Pipe(duplex=False) 3098 self.assertEqual(writer.send(1), None) 3099 self.assertEqual(reader.recv(), 1) 3100 if self.TYPE == 'processes': 3101 self.assertEqual(reader.readable, True) 3102 self.assertEqual(reader.writable, False) 3103 self.assertEqual(writer.readable, False) 3104 self.assertEqual(writer.writable, True) 3105 self.assertRaises(OSError, reader.send, 2) 3106 self.assertRaises(OSError, writer.recv) 3107 self.assertRaises(OSError, writer.poll) 3108 3109 def test_spawn_close(self): 3110 # We test that a pipe connection can be closed by parent 3111 # process immediately after child is spawned. On Windows this 3112 # would have sometimes failed on old versions because 3113 # child_conn would be closed before the child got a chance to 3114 # duplicate it. 3115 conn, child_conn = self.Pipe() 3116 3117 p = self.Process(target=self._echo, args=(child_conn,)) 3118 p.daemon = True 3119 p.start() 3120 child_conn.close() # this might complete before child initializes 3121 3122 msg = latin('hello') 3123 conn.send_bytes(msg) 3124 self.assertEqual(conn.recv_bytes(), msg) 3125 3126 conn.send_bytes(SENTINEL) 3127 conn.close() 3128 p.join() 3129 3130 def test_sendbytes(self): 3131 if self.TYPE != 'processes': 3132 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 3133 3134 msg = latin('abcdefghijklmnopqrstuvwxyz') 3135 a, b = self.Pipe() 3136 3137 a.send_bytes(msg) 3138 self.assertEqual(b.recv_bytes(), msg) 3139 3140 a.send_bytes(msg, 5) 3141 self.assertEqual(b.recv_bytes(), msg[5:]) 3142 3143 a.send_bytes(msg, 7, 8) 3144 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 3145 3146 a.send_bytes(msg, 26) 3147 self.assertEqual(b.recv_bytes(), latin('')) 3148 3149 a.send_bytes(msg, 26, 0) 3150 self.assertEqual(b.recv_bytes(), latin('')) 3151 3152 self.assertRaises(ValueError, a.send_bytes, msg, 27) 3153 3154 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 3155 3156 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 3157 3158 self.assertRaises(ValueError, a.send_bytes, msg, -1) 3159 3160 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 3161 3162 @classmethod 3163 def _is_fd_assigned(cls, fd): 3164 try: 3165 os.fstat(fd) 3166 except OSError as e: 3167 if e.errno == errno.EBADF: 3168 return False 3169 raise 3170 else: 3171 return True 3172 3173 @classmethod 3174 def _writefd(cls, conn, data, create_dummy_fds=False): 3175 if create_dummy_fds: 3176 for i in range(0, 256): 3177 if not cls._is_fd_assigned(i): 3178 os.dup2(conn.fileno(), i) 3179 fd = reduction.recv_handle(conn) 3180 if msvcrt: 3181 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 3182 os.write(fd, data) 3183 os.close(fd) 3184 3185 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3186 def test_fd_transfer(self): 3187 if self.TYPE != 'processes': 3188 self.skipTest("only makes sense with processes") 3189 conn, child_conn = self.Pipe(duplex=True) 3190 3191 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 3192 p.daemon = True 3193 p.start() 3194 self.addCleanup(test.support.unlink, test.support.TESTFN) 3195 with open(test.support.TESTFN, "wb") as f: 3196 fd = f.fileno() 3197 if msvcrt: 3198 fd = msvcrt.get_osfhandle(fd) 3199 reduction.send_handle(conn, fd, p.pid) 3200 p.join() 3201 with open(test.support.TESTFN, "rb") as f: 3202 self.assertEqual(f.read(), b"foo") 3203 3204 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3205 @unittest.skipIf(sys.platform == "win32", 3206 "test semantics don't make sense on Windows") 3207 @unittest.skipIf(MAXFD <= 256, 3208 "largest assignable fd number is too small") 3209 @unittest.skipUnless(hasattr(os, "dup2"), 3210 "test needs os.dup2()") 3211 def test_large_fd_transfer(self): 3212 # With fd > 256 (issue #11657) 3213 if self.TYPE != 'processes': 3214 self.skipTest("only makes sense with processes") 3215 conn, child_conn = self.Pipe(duplex=True) 3216 3217 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 3218 p.daemon = True 3219 p.start() 3220 self.addCleanup(test.support.unlink, test.support.TESTFN) 3221 with open(test.support.TESTFN, "wb") as f: 3222 fd = f.fileno() 3223 for newfd in range(256, MAXFD): 3224 if not self._is_fd_assigned(newfd): 3225 break 3226 else: 3227 self.fail("could not find an unassigned large file descriptor") 3228 os.dup2(fd, newfd) 3229 try: 3230 reduction.send_handle(conn, newfd, p.pid) 3231 finally: 3232 os.close(newfd) 3233 p.join() 3234 with open(test.support.TESTFN, "rb") as f: 3235 self.assertEqual(f.read(), b"bar") 3236 3237 @classmethod 3238 def _send_data_without_fd(self, conn): 3239 os.write(conn.fileno(), b"\0") 3240 3241 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3242 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 3243 def test_missing_fd_transfer(self): 3244 # Check that exception is raised when received data is not 3245 # accompanied by a file descriptor in ancillary data. 3246 if self.TYPE != 'processes': 3247 self.skipTest("only makes sense with processes") 3248 conn, child_conn = self.Pipe(duplex=True) 3249 3250 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 3251 p.daemon = True 3252 p.start() 3253 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 3254 p.join() 3255 3256 def test_context(self): 3257 a, b = self.Pipe() 3258 3259 with a, b: 3260 a.send(1729) 3261 self.assertEqual(b.recv(), 1729) 3262 if self.TYPE == 'processes': 3263 self.assertFalse(a.closed) 3264 self.assertFalse(b.closed) 3265 3266 if self.TYPE == 'processes': 3267 self.assertTrue(a.closed) 3268 self.assertTrue(b.closed) 3269 self.assertRaises(OSError, a.recv) 3270 self.assertRaises(OSError, b.recv) 3271 3272class _TestListener(BaseTestCase): 3273 3274 ALLOWED_TYPES = ('processes',) 3275 3276 def test_multiple_bind(self): 3277 for family in self.connection.families: 3278 l = self.connection.Listener(family=family) 3279 self.addCleanup(l.close) 3280 self.assertRaises(OSError, self.connection.Listener, 3281 l.address, family) 3282 3283 def test_context(self): 3284 with self.connection.Listener() as l: 3285 with self.connection.Client(l.address) as c: 3286 with l.accept() as d: 3287 c.send(1729) 3288 self.assertEqual(d.recv(), 1729) 3289 3290 if self.TYPE == 'processes': 3291 self.assertRaises(OSError, l.accept) 3292 3293 @unittest.skipUnless(util.abstract_sockets_supported, 3294 "test needs abstract socket support") 3295 def test_abstract_socket(self): 3296 with self.connection.Listener("\0something") as listener: 3297 with self.connection.Client(listener.address) as client: 3298 with listener.accept() as d: 3299 client.send(1729) 3300 self.assertEqual(d.recv(), 1729) 3301 3302 if self.TYPE == 'processes': 3303 self.assertRaises(OSError, listener.accept) 3304 3305 3306class _TestListenerClient(BaseTestCase): 3307 3308 ALLOWED_TYPES = ('processes', 'threads') 3309 3310 @classmethod 3311 def _test(cls, address): 3312 conn = cls.connection.Client(address) 3313 conn.send('hello') 3314 conn.close() 3315 3316 def test_listener_client(self): 3317 for family in self.connection.families: 3318 l = self.connection.Listener(family=family) 3319 p = self.Process(target=self._test, args=(l.address,)) 3320 p.daemon = True 3321 p.start() 3322 conn = l.accept() 3323 self.assertEqual(conn.recv(), 'hello') 3324 p.join() 3325 l.close() 3326 3327 def test_issue14725(self): 3328 l = self.connection.Listener() 3329 p = self.Process(target=self._test, args=(l.address,)) 3330 p.daemon = True 3331 p.start() 3332 time.sleep(1) 3333 # On Windows the client process should by now have connected, 3334 # written data and closed the pipe handle by now. This causes 3335 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 3336 # 14725. 3337 conn = l.accept() 3338 self.assertEqual(conn.recv(), 'hello') 3339 conn.close() 3340 p.join() 3341 l.close() 3342 3343 def test_issue16955(self): 3344 for fam in self.connection.families: 3345 l = self.connection.Listener(family=fam) 3346 c = self.connection.Client(l.address) 3347 a = l.accept() 3348 a.send_bytes(b"hello") 3349 self.assertTrue(c.poll(1)) 3350 a.close() 3351 c.close() 3352 l.close() 3353 3354class _TestPoll(BaseTestCase): 3355 3356 ALLOWED_TYPES = ('processes', 'threads') 3357 3358 def test_empty_string(self): 3359 a, b = self.Pipe() 3360 self.assertEqual(a.poll(), False) 3361 b.send_bytes(b'') 3362 self.assertEqual(a.poll(), True) 3363 self.assertEqual(a.poll(), True) 3364 3365 @classmethod 3366 def _child_strings(cls, conn, strings): 3367 for s in strings: 3368 time.sleep(0.1) 3369 conn.send_bytes(s) 3370 conn.close() 3371 3372 def test_strings(self): 3373 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 3374 a, b = self.Pipe() 3375 p = self.Process(target=self._child_strings, args=(b, strings)) 3376 p.start() 3377 3378 for s in strings: 3379 for i in range(200): 3380 if a.poll(0.01): 3381 break 3382 x = a.recv_bytes() 3383 self.assertEqual(s, x) 3384 3385 p.join() 3386 3387 @classmethod 3388 def _child_boundaries(cls, r): 3389 # Polling may "pull" a message in to the child process, but we 3390 # don't want it to pull only part of a message, as that would 3391 # corrupt the pipe for any other processes which might later 3392 # read from it. 3393 r.poll(5) 3394 3395 def test_boundaries(self): 3396 r, w = self.Pipe(False) 3397 p = self.Process(target=self._child_boundaries, args=(r,)) 3398 p.start() 3399 time.sleep(2) 3400 L = [b"first", b"second"] 3401 for obj in L: 3402 w.send_bytes(obj) 3403 w.close() 3404 p.join() 3405 self.assertIn(r.recv_bytes(), L) 3406 3407 @classmethod 3408 def _child_dont_merge(cls, b): 3409 b.send_bytes(b'a') 3410 b.send_bytes(b'b') 3411 b.send_bytes(b'cd') 3412 3413 def test_dont_merge(self): 3414 a, b = self.Pipe() 3415 self.assertEqual(a.poll(0.0), False) 3416 self.assertEqual(a.poll(0.1), False) 3417 3418 p = self.Process(target=self._child_dont_merge, args=(b,)) 3419 p.start() 3420 3421 self.assertEqual(a.recv_bytes(), b'a') 3422 self.assertEqual(a.poll(1.0), True) 3423 self.assertEqual(a.poll(1.0), True) 3424 self.assertEqual(a.recv_bytes(), b'b') 3425 self.assertEqual(a.poll(1.0), True) 3426 self.assertEqual(a.poll(1.0), True) 3427 self.assertEqual(a.poll(0.0), True) 3428 self.assertEqual(a.recv_bytes(), b'cd') 3429 3430 p.join() 3431 3432# 3433# Test of sending connection and socket objects between processes 3434# 3435 3436@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 3437class _TestPicklingConnections(BaseTestCase): 3438 3439 ALLOWED_TYPES = ('processes',) 3440 3441 @classmethod 3442 def tearDownClass(cls): 3443 from multiprocessing import resource_sharer 3444 resource_sharer.stop(timeout=TIMEOUT) 3445 3446 @classmethod 3447 def _listener(cls, conn, families): 3448 for fam in families: 3449 l = cls.connection.Listener(family=fam) 3450 conn.send(l.address) 3451 new_conn = l.accept() 3452 conn.send(new_conn) 3453 new_conn.close() 3454 l.close() 3455 3456 l = socket.create_server((test.support.HOST, 0)) 3457 conn.send(l.getsockname()) 3458 new_conn, addr = l.accept() 3459 conn.send(new_conn) 3460 new_conn.close() 3461 l.close() 3462 3463 conn.recv() 3464 3465 @classmethod 3466 def _remote(cls, conn): 3467 for (address, msg) in iter(conn.recv, None): 3468 client = cls.connection.Client(address) 3469 client.send(msg.upper()) 3470 client.close() 3471 3472 address, msg = conn.recv() 3473 client = socket.socket() 3474 client.connect(address) 3475 client.sendall(msg.upper()) 3476 client.close() 3477 3478 conn.close() 3479 3480 def test_pickling(self): 3481 families = self.connection.families 3482 3483 lconn, lconn0 = self.Pipe() 3484 lp = self.Process(target=self._listener, args=(lconn0, families)) 3485 lp.daemon = True 3486 lp.start() 3487 lconn0.close() 3488 3489 rconn, rconn0 = self.Pipe() 3490 rp = self.Process(target=self._remote, args=(rconn0,)) 3491 rp.daemon = True 3492 rp.start() 3493 rconn0.close() 3494 3495 for fam in families: 3496 msg = ('This connection uses family %s' % fam).encode('ascii') 3497 address = lconn.recv() 3498 rconn.send((address, msg)) 3499 new_conn = lconn.recv() 3500 self.assertEqual(new_conn.recv(), msg.upper()) 3501 3502 rconn.send(None) 3503 3504 msg = latin('This connection uses a normal socket') 3505 address = lconn.recv() 3506 rconn.send((address, msg)) 3507 new_conn = lconn.recv() 3508 buf = [] 3509 while True: 3510 s = new_conn.recv(100) 3511 if not s: 3512 break 3513 buf.append(s) 3514 buf = b''.join(buf) 3515 self.assertEqual(buf, msg.upper()) 3516 new_conn.close() 3517 3518 lconn.send(None) 3519 3520 rconn.close() 3521 lconn.close() 3522 3523 lp.join() 3524 rp.join() 3525 3526 @classmethod 3527 def child_access(cls, conn): 3528 w = conn.recv() 3529 w.send('all is well') 3530 w.close() 3531 3532 r = conn.recv() 3533 msg = r.recv() 3534 conn.send(msg*2) 3535 3536 conn.close() 3537 3538 def test_access(self): 3539 # On Windows, if we do not specify a destination pid when 3540 # using DupHandle then we need to be careful to use the 3541 # correct access flags for DuplicateHandle(), or else 3542 # DupHandle.detach() will raise PermissionError. For example, 3543 # for a read only pipe handle we should use 3544 # access=FILE_GENERIC_READ. (Unfortunately 3545 # DUPLICATE_SAME_ACCESS does not work.) 3546 conn, child_conn = self.Pipe() 3547 p = self.Process(target=self.child_access, args=(child_conn,)) 3548 p.daemon = True 3549 p.start() 3550 child_conn.close() 3551 3552 r, w = self.Pipe(duplex=False) 3553 conn.send(w) 3554 w.close() 3555 self.assertEqual(r.recv(), 'all is well') 3556 r.close() 3557 3558 r, w = self.Pipe(duplex=False) 3559 conn.send(r) 3560 r.close() 3561 w.send('foobar') 3562 w.close() 3563 self.assertEqual(conn.recv(), 'foobar'*2) 3564 3565 p.join() 3566 3567# 3568# 3569# 3570 3571class _TestHeap(BaseTestCase): 3572 3573 ALLOWED_TYPES = ('processes',) 3574 3575 def setUp(self): 3576 super().setUp() 3577 # Make pristine heap for these tests 3578 self.old_heap = multiprocessing.heap.BufferWrapper._heap 3579 multiprocessing.heap.BufferWrapper._heap = multiprocessing.heap.Heap() 3580 3581 def tearDown(self): 3582 multiprocessing.heap.BufferWrapper._heap = self.old_heap 3583 super().tearDown() 3584 3585 def test_heap(self): 3586 iterations = 5000 3587 maxblocks = 50 3588 blocks = [] 3589 3590 # get the heap object 3591 heap = multiprocessing.heap.BufferWrapper._heap 3592 heap._DISCARD_FREE_SPACE_LARGER_THAN = 0 3593 3594 # create and destroy lots of blocks of different sizes 3595 for i in range(iterations): 3596 size = int(random.lognormvariate(0, 1) * 1000) 3597 b = multiprocessing.heap.BufferWrapper(size) 3598 blocks.append(b) 3599 if len(blocks) > maxblocks: 3600 i = random.randrange(maxblocks) 3601 del blocks[i] 3602 del b 3603 3604 # verify the state of the heap 3605 with heap._lock: 3606 all = [] 3607 free = 0 3608 occupied = 0 3609 for L in list(heap._len_to_seq.values()): 3610 # count all free blocks in arenas 3611 for arena, start, stop in L: 3612 all.append((heap._arenas.index(arena), start, stop, 3613 stop-start, 'free')) 3614 free += (stop-start) 3615 for arena, arena_blocks in heap._allocated_blocks.items(): 3616 # count all allocated blocks in arenas 3617 for start, stop in arena_blocks: 3618 all.append((heap._arenas.index(arena), start, stop, 3619 stop-start, 'occupied')) 3620 occupied += (stop-start) 3621 3622 self.assertEqual(free + occupied, 3623 sum(arena.size for arena in heap._arenas)) 3624 3625 all.sort() 3626 3627 for i in range(len(all)-1): 3628 (arena, start, stop) = all[i][:3] 3629 (narena, nstart, nstop) = all[i+1][:3] 3630 if arena != narena: 3631 # Two different arenas 3632 self.assertEqual(stop, heap._arenas[arena].size) # last block 3633 self.assertEqual(nstart, 0) # first block 3634 else: 3635 # Same arena: two adjacent blocks 3636 self.assertEqual(stop, nstart) 3637 3638 # test free'ing all blocks 3639 random.shuffle(blocks) 3640 while blocks: 3641 blocks.pop() 3642 3643 self.assertEqual(heap._n_frees, heap._n_mallocs) 3644 self.assertEqual(len(heap._pending_free_blocks), 0) 3645 self.assertEqual(len(heap._arenas), 0) 3646 self.assertEqual(len(heap._allocated_blocks), 0, heap._allocated_blocks) 3647 self.assertEqual(len(heap._len_to_seq), 0) 3648 3649 def test_free_from_gc(self): 3650 # Check that freeing of blocks by the garbage collector doesn't deadlock 3651 # (issue #12352). 3652 # Make sure the GC is enabled, and set lower collection thresholds to 3653 # make collections more frequent (and increase the probability of 3654 # deadlock). 3655 if not gc.isenabled(): 3656 gc.enable() 3657 self.addCleanup(gc.disable) 3658 thresholds = gc.get_threshold() 3659 self.addCleanup(gc.set_threshold, *thresholds) 3660 gc.set_threshold(10) 3661 3662 # perform numerous block allocations, with cyclic references to make 3663 # sure objects are collected asynchronously by the gc 3664 for i in range(5000): 3665 a = multiprocessing.heap.BufferWrapper(1) 3666 b = multiprocessing.heap.BufferWrapper(1) 3667 # circular references 3668 a.buddy = b 3669 b.buddy = a 3670 3671# 3672# 3673# 3674 3675class _Foo(Structure): 3676 _fields_ = [ 3677 ('x', c_int), 3678 ('y', c_double), 3679 ('z', c_longlong,) 3680 ] 3681 3682class _TestSharedCTypes(BaseTestCase): 3683 3684 ALLOWED_TYPES = ('processes',) 3685 3686 def setUp(self): 3687 if not HAS_SHAREDCTYPES: 3688 self.skipTest("requires multiprocessing.sharedctypes") 3689 3690 @classmethod 3691 def _double(cls, x, y, z, foo, arr, string): 3692 x.value *= 2 3693 y.value *= 2 3694 z.value *= 2 3695 foo.x *= 2 3696 foo.y *= 2 3697 string.value *= 2 3698 for i in range(len(arr)): 3699 arr[i] *= 2 3700 3701 def test_sharedctypes(self, lock=False): 3702 x = Value('i', 7, lock=lock) 3703 y = Value(c_double, 1.0/3.0, lock=lock) 3704 z = Value(c_longlong, 2 ** 33, lock=lock) 3705 foo = Value(_Foo, 3, 2, lock=lock) 3706 arr = self.Array('d', list(range(10)), lock=lock) 3707 string = self.Array('c', 20, lock=lock) 3708 string.value = latin('hello') 3709 3710 p = self.Process(target=self._double, args=(x, y, z, foo, arr, string)) 3711 p.daemon = True 3712 p.start() 3713 p.join() 3714 3715 self.assertEqual(x.value, 14) 3716 self.assertAlmostEqual(y.value, 2.0/3.0) 3717 self.assertEqual(z.value, 2 ** 34) 3718 self.assertEqual(foo.x, 6) 3719 self.assertAlmostEqual(foo.y, 4.0) 3720 for i in range(10): 3721 self.assertAlmostEqual(arr[i], i*2) 3722 self.assertEqual(string.value, latin('hellohello')) 3723 3724 def test_synchronize(self): 3725 self.test_sharedctypes(lock=True) 3726 3727 def test_copy(self): 3728 foo = _Foo(2, 5.0, 2 ** 33) 3729 bar = copy(foo) 3730 foo.x = 0 3731 foo.y = 0 3732 foo.z = 0 3733 self.assertEqual(bar.x, 2) 3734 self.assertAlmostEqual(bar.y, 5.0) 3735 self.assertEqual(bar.z, 2 ** 33) 3736 3737 3738@unittest.skipUnless(HAS_SHMEM, "requires multiprocessing.shared_memory") 3739class _TestSharedMemory(BaseTestCase): 3740 3741 ALLOWED_TYPES = ('processes',) 3742 3743 @staticmethod 3744 def _attach_existing_shmem_then_write(shmem_name_or_obj, binary_data): 3745 if isinstance(shmem_name_or_obj, str): 3746 local_sms = shared_memory.SharedMemory(shmem_name_or_obj) 3747 else: 3748 local_sms = shmem_name_or_obj 3749 local_sms.buf[:len(binary_data)] = binary_data 3750 local_sms.close() 3751 3752 def test_shared_memory_basics(self): 3753 sms = shared_memory.SharedMemory('test01_tsmb', create=True, size=512) 3754 self.addCleanup(sms.unlink) 3755 3756 # Verify attributes are readable. 3757 self.assertEqual(sms.name, 'test01_tsmb') 3758 self.assertGreaterEqual(sms.size, 512) 3759 self.assertGreaterEqual(len(sms.buf), sms.size) 3760 3761 # Modify contents of shared memory segment through memoryview. 3762 sms.buf[0] = 42 3763 self.assertEqual(sms.buf[0], 42) 3764 3765 # Attach to existing shared memory segment. 3766 also_sms = shared_memory.SharedMemory('test01_tsmb') 3767 self.assertEqual(also_sms.buf[0], 42) 3768 also_sms.close() 3769 3770 # Attach to existing shared memory segment but specify a new size. 3771 same_sms = shared_memory.SharedMemory('test01_tsmb', size=20*sms.size) 3772 self.assertLess(same_sms.size, 20*sms.size) # Size was ignored. 3773 same_sms.close() 3774 3775 if shared_memory._USE_POSIX: 3776 # Posix Shared Memory can only be unlinked once. Here we 3777 # test an implementation detail that is not observed across 3778 # all supported platforms (since WindowsNamedSharedMemory 3779 # manages unlinking on its own and unlink() does nothing). 3780 # True release of shared memory segment does not necessarily 3781 # happen until process exits, depending on the OS platform. 3782 with self.assertRaises(FileNotFoundError): 3783 sms_uno = shared_memory.SharedMemory( 3784 'test01_dblunlink', 3785 create=True, 3786 size=5000 3787 ) 3788 3789 try: 3790 self.assertGreaterEqual(sms_uno.size, 5000) 3791 3792 sms_duo = shared_memory.SharedMemory('test01_dblunlink') 3793 sms_duo.unlink() # First shm_unlink() call. 3794 sms_duo.close() 3795 sms_uno.close() 3796 3797 finally: 3798 sms_uno.unlink() # A second shm_unlink() call is bad. 3799 3800 with self.assertRaises(FileExistsError): 3801 # Attempting to create a new shared memory segment with a 3802 # name that is already in use triggers an exception. 3803 there_can_only_be_one_sms = shared_memory.SharedMemory( 3804 'test01_tsmb', 3805 create=True, 3806 size=512 3807 ) 3808 3809 if shared_memory._USE_POSIX: 3810 # Requesting creation of a shared memory segment with the option 3811 # to attach to an existing segment, if that name is currently in 3812 # use, should not trigger an exception. 3813 # Note: Using a smaller size could possibly cause truncation of 3814 # the existing segment but is OS platform dependent. In the 3815 # case of MacOS/darwin, requesting a smaller size is disallowed. 3816 class OptionalAttachSharedMemory(shared_memory.SharedMemory): 3817 _flags = os.O_CREAT | os.O_RDWR 3818 ok_if_exists_sms = OptionalAttachSharedMemory('test01_tsmb') 3819 self.assertEqual(ok_if_exists_sms.size, sms.size) 3820 ok_if_exists_sms.close() 3821 3822 # Attempting to attach to an existing shared memory segment when 3823 # no segment exists with the supplied name triggers an exception. 3824 with self.assertRaises(FileNotFoundError): 3825 nonexisting_sms = shared_memory.SharedMemory('test01_notthere') 3826 nonexisting_sms.unlink() # Error should occur on prior line. 3827 3828 sms.close() 3829 3830 # Test creating a shared memory segment with negative size 3831 with self.assertRaises(ValueError): 3832 sms_invalid = shared_memory.SharedMemory(create=True, size=-1) 3833 3834 # Test creating a shared memory segment with size 0 3835 with self.assertRaises(ValueError): 3836 sms_invalid = shared_memory.SharedMemory(create=True, size=0) 3837 3838 # Test creating a shared memory segment without size argument 3839 with self.assertRaises(ValueError): 3840 sms_invalid = shared_memory.SharedMemory(create=True) 3841 3842 def test_shared_memory_across_processes(self): 3843 # bpo-40135: don't define shared memory block's name in case of 3844 # the failure when we run multiprocessing tests in parallel. 3845 sms = shared_memory.SharedMemory(create=True, size=512) 3846 self.addCleanup(sms.unlink) 3847 3848 # Verify remote attachment to existing block by name is working. 3849 p = self.Process( 3850 target=self._attach_existing_shmem_then_write, 3851 args=(sms.name, b'howdy') 3852 ) 3853 p.daemon = True 3854 p.start() 3855 p.join() 3856 self.assertEqual(bytes(sms.buf[:5]), b'howdy') 3857 3858 # Verify pickling of SharedMemory instance also works. 3859 p = self.Process( 3860 target=self._attach_existing_shmem_then_write, 3861 args=(sms, b'HELLO') 3862 ) 3863 p.daemon = True 3864 p.start() 3865 p.join() 3866 self.assertEqual(bytes(sms.buf[:5]), b'HELLO') 3867 3868 sms.close() 3869 3870 @unittest.skipIf(os.name != "posix", "not feasible in non-posix platforms") 3871 def test_shared_memory_SharedMemoryServer_ignores_sigint(self): 3872 # bpo-36368: protect SharedMemoryManager server process from 3873 # KeyboardInterrupt signals. 3874 smm = multiprocessing.managers.SharedMemoryManager() 3875 smm.start() 3876 3877 # make sure the manager works properly at the beginning 3878 sl = smm.ShareableList(range(10)) 3879 3880 # the manager's server should ignore KeyboardInterrupt signals, and 3881 # maintain its connection with the current process, and success when 3882 # asked to deliver memory segments. 3883 os.kill(smm._process.pid, signal.SIGINT) 3884 3885 sl2 = smm.ShareableList(range(10)) 3886 3887 # test that the custom signal handler registered in the Manager does 3888 # not affect signal handling in the parent process. 3889 with self.assertRaises(KeyboardInterrupt): 3890 os.kill(os.getpid(), signal.SIGINT) 3891 3892 smm.shutdown() 3893 3894 @unittest.skipIf(os.name != "posix", "resource_tracker is posix only") 3895 def test_shared_memory_SharedMemoryManager_reuses_resource_tracker(self): 3896 # bpo-36867: test that a SharedMemoryManager uses the 3897 # same resource_tracker process as its parent. 3898 cmd = '''if 1: 3899 from multiprocessing.managers import SharedMemoryManager 3900 3901 3902 smm = SharedMemoryManager() 3903 smm.start() 3904 sl = smm.ShareableList(range(10)) 3905 smm.shutdown() 3906 ''' 3907 rc, out, err = test.support.script_helper.assert_python_ok('-c', cmd) 3908 3909 # Before bpo-36867 was fixed, a SharedMemoryManager not using the same 3910 # resource_tracker process as its parent would make the parent's 3911 # tracker complain about sl being leaked even though smm.shutdown() 3912 # properly released sl. 3913 self.assertFalse(err) 3914 3915 def test_shared_memory_SharedMemoryManager_basics(self): 3916 smm1 = multiprocessing.managers.SharedMemoryManager() 3917 with self.assertRaises(ValueError): 3918 smm1.SharedMemory(size=9) # Fails if SharedMemoryServer not started 3919 smm1.start() 3920 lol = [ smm1.ShareableList(range(i)) for i in range(5, 10) ] 3921 lom = [ smm1.SharedMemory(size=j) for j in range(32, 128, 16) ] 3922 doppleganger_list0 = shared_memory.ShareableList(name=lol[0].shm.name) 3923 self.assertEqual(len(doppleganger_list0), 5) 3924 doppleganger_shm0 = shared_memory.SharedMemory(name=lom[0].name) 3925 self.assertGreaterEqual(len(doppleganger_shm0.buf), 32) 3926 held_name = lom[0].name 3927 smm1.shutdown() 3928 if sys.platform != "win32": 3929 # Calls to unlink() have no effect on Windows platform; shared 3930 # memory will only be released once final process exits. 3931 with self.assertRaises(FileNotFoundError): 3932 # No longer there to be attached to again. 3933 absent_shm = shared_memory.SharedMemory(name=held_name) 3934 3935 with multiprocessing.managers.SharedMemoryManager() as smm2: 3936 sl = smm2.ShareableList("howdy") 3937 shm = smm2.SharedMemory(size=128) 3938 held_name = sl.shm.name 3939 if sys.platform != "win32": 3940 with self.assertRaises(FileNotFoundError): 3941 # No longer there to be attached to again. 3942 absent_sl = shared_memory.ShareableList(name=held_name) 3943 3944 3945 def test_shared_memory_ShareableList_basics(self): 3946 sl = shared_memory.ShareableList( 3947 ['howdy', b'HoWdY', -273.154, 100, None, True, 42] 3948 ) 3949 self.addCleanup(sl.shm.unlink) 3950 3951 # Verify attributes are readable. 3952 self.assertEqual(sl.format, '8s8sdqxxxxxx?xxxxxxxx?q') 3953 3954 # Exercise len(). 3955 self.assertEqual(len(sl), 7) 3956 3957 # Exercise index(). 3958 with warnings.catch_warnings(): 3959 # Suppress BytesWarning when comparing against b'HoWdY'. 3960 warnings.simplefilter('ignore') 3961 with self.assertRaises(ValueError): 3962 sl.index('100') 3963 self.assertEqual(sl.index(100), 3) 3964 3965 # Exercise retrieving individual values. 3966 self.assertEqual(sl[0], 'howdy') 3967 self.assertEqual(sl[-2], True) 3968 3969 # Exercise iterability. 3970 self.assertEqual( 3971 tuple(sl), 3972 ('howdy', b'HoWdY', -273.154, 100, None, True, 42) 3973 ) 3974 3975 # Exercise modifying individual values. 3976 sl[3] = 42 3977 self.assertEqual(sl[3], 42) 3978 sl[4] = 'some' # Change type at a given position. 3979 self.assertEqual(sl[4], 'some') 3980 self.assertEqual(sl.format, '8s8sdq8sxxxxxxx?q') 3981 with self.assertRaisesRegex(ValueError, 3982 "exceeds available storage"): 3983 sl[4] = 'far too many' 3984 self.assertEqual(sl[4], 'some') 3985 sl[0] = 'encodés' # Exactly 8 bytes of UTF-8 data 3986 self.assertEqual(sl[0], 'encodés') 3987 self.assertEqual(sl[1], b'HoWdY') # no spillage 3988 with self.assertRaisesRegex(ValueError, 3989 "exceeds available storage"): 3990 sl[0] = 'encodées' # Exactly 9 bytes of UTF-8 data 3991 self.assertEqual(sl[1], b'HoWdY') 3992 with self.assertRaisesRegex(ValueError, 3993 "exceeds available storage"): 3994 sl[1] = b'123456789' 3995 self.assertEqual(sl[1], b'HoWdY') 3996 3997 # Exercise count(). 3998 with warnings.catch_warnings(): 3999 # Suppress BytesWarning when comparing against b'HoWdY'. 4000 warnings.simplefilter('ignore') 4001 self.assertEqual(sl.count(42), 2) 4002 self.assertEqual(sl.count(b'HoWdY'), 1) 4003 self.assertEqual(sl.count(b'adios'), 0) 4004 4005 # Exercise creating a duplicate. 4006 sl_copy = shared_memory.ShareableList(sl, name='test03_duplicate') 4007 try: 4008 self.assertNotEqual(sl.shm.name, sl_copy.shm.name) 4009 self.assertEqual('test03_duplicate', sl_copy.shm.name) 4010 self.assertEqual(list(sl), list(sl_copy)) 4011 self.assertEqual(sl.format, sl_copy.format) 4012 sl_copy[-1] = 77 4013 self.assertEqual(sl_copy[-1], 77) 4014 self.assertNotEqual(sl[-1], 77) 4015 sl_copy.shm.close() 4016 finally: 4017 sl_copy.shm.unlink() 4018 4019 # Obtain a second handle on the same ShareableList. 4020 sl_tethered = shared_memory.ShareableList(name=sl.shm.name) 4021 self.assertEqual(sl.shm.name, sl_tethered.shm.name) 4022 sl_tethered[-1] = 880 4023 self.assertEqual(sl[-1], 880) 4024 sl_tethered.shm.close() 4025 4026 sl.shm.close() 4027 4028 # Exercise creating an empty ShareableList. 4029 empty_sl = shared_memory.ShareableList() 4030 try: 4031 self.assertEqual(len(empty_sl), 0) 4032 self.assertEqual(empty_sl.format, '') 4033 self.assertEqual(empty_sl.count('any'), 0) 4034 with self.assertRaises(ValueError): 4035 empty_sl.index(None) 4036 empty_sl.shm.close() 4037 finally: 4038 empty_sl.shm.unlink() 4039 4040 def test_shared_memory_ShareableList_pickling(self): 4041 sl = shared_memory.ShareableList(range(10)) 4042 self.addCleanup(sl.shm.unlink) 4043 4044 serialized_sl = pickle.dumps(sl) 4045 deserialized_sl = pickle.loads(serialized_sl) 4046 self.assertTrue( 4047 isinstance(deserialized_sl, shared_memory.ShareableList) 4048 ) 4049 self.assertTrue(deserialized_sl[-1], 9) 4050 self.assertFalse(sl is deserialized_sl) 4051 deserialized_sl[4] = "changed" 4052 self.assertEqual(sl[4], "changed") 4053 4054 # Verify data is not being put into the pickled representation. 4055 name = 'a' * len(sl.shm.name) 4056 larger_sl = shared_memory.ShareableList(range(400)) 4057 self.addCleanup(larger_sl.shm.unlink) 4058 serialized_larger_sl = pickle.dumps(larger_sl) 4059 self.assertTrue(len(serialized_sl) == len(serialized_larger_sl)) 4060 larger_sl.shm.close() 4061 4062 deserialized_sl.shm.close() 4063 sl.shm.close() 4064 4065 def test_shared_memory_cleaned_after_process_termination(self): 4066 cmd = '''if 1: 4067 import os, time, sys 4068 from multiprocessing import shared_memory 4069 4070 # Create a shared_memory segment, and send the segment name 4071 sm = shared_memory.SharedMemory(create=True, size=10) 4072 sys.stdout.write(sm.name + '\\n') 4073 sys.stdout.flush() 4074 time.sleep(100) 4075 ''' 4076 with subprocess.Popen([sys.executable, '-E', '-c', cmd], 4077 stdout=subprocess.PIPE, 4078 stderr=subprocess.PIPE) as p: 4079 name = p.stdout.readline().strip().decode() 4080 4081 # killing abruptly processes holding reference to a shared memory 4082 # segment should not leak the given memory segment. 4083 p.terminate() 4084 p.wait() 4085 4086 deadline = time.monotonic() + 60 4087 t = 0.1 4088 while time.monotonic() < deadline: 4089 time.sleep(t) 4090 t = min(t*2, 5) 4091 try: 4092 smm = shared_memory.SharedMemory(name, create=False) 4093 except FileNotFoundError: 4094 break 4095 else: 4096 raise AssertionError("A SharedMemory segment was leaked after" 4097 " a process was abruptly terminated.") 4098 4099 if os.name == 'posix': 4100 # A warning was emitted by the subprocess' own 4101 # resource_tracker (on Windows, shared memory segments 4102 # are released automatically by the OS). 4103 err = p.stderr.read().decode() 4104 self.assertIn( 4105 "resource_tracker: There appear to be 1 leaked " 4106 "shared_memory objects to clean up at shutdown", err) 4107 4108# 4109# 4110# 4111 4112class _TestFinalize(BaseTestCase): 4113 4114 ALLOWED_TYPES = ('processes',) 4115 4116 def setUp(self): 4117 self.registry_backup = util._finalizer_registry.copy() 4118 util._finalizer_registry.clear() 4119 4120 def tearDown(self): 4121 self.assertFalse(util._finalizer_registry) 4122 util._finalizer_registry.update(self.registry_backup) 4123 4124 @classmethod 4125 def _test_finalize(cls, conn): 4126 class Foo(object): 4127 pass 4128 4129 a = Foo() 4130 util.Finalize(a, conn.send, args=('a',)) 4131 del a # triggers callback for a 4132 4133 b = Foo() 4134 close_b = util.Finalize(b, conn.send, args=('b',)) 4135 close_b() # triggers callback for b 4136 close_b() # does nothing because callback has already been called 4137 del b # does nothing because callback has already been called 4138 4139 c = Foo() 4140 util.Finalize(c, conn.send, args=('c',)) 4141 4142 d10 = Foo() 4143 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 4144 4145 d01 = Foo() 4146 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 4147 d02 = Foo() 4148 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 4149 d03 = Foo() 4150 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 4151 4152 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 4153 4154 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 4155 4156 # call multiprocessing's cleanup function then exit process without 4157 # garbage collecting locals 4158 util._exit_function() 4159 conn.close() 4160 os._exit(0) 4161 4162 def test_finalize(self): 4163 conn, child_conn = self.Pipe() 4164 4165 p = self.Process(target=self._test_finalize, args=(child_conn,)) 4166 p.daemon = True 4167 p.start() 4168 p.join() 4169 4170 result = [obj for obj in iter(conn.recv, 'STOP')] 4171 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 4172 4173 def test_thread_safety(self): 4174 # bpo-24484: _run_finalizers() should be thread-safe 4175 def cb(): 4176 pass 4177 4178 class Foo(object): 4179 def __init__(self): 4180 self.ref = self # create reference cycle 4181 # insert finalizer at random key 4182 util.Finalize(self, cb, exitpriority=random.randint(1, 100)) 4183 4184 finish = False 4185 exc = None 4186 4187 def run_finalizers(): 4188 nonlocal exc 4189 while not finish: 4190 time.sleep(random.random() * 1e-1) 4191 try: 4192 # A GC run will eventually happen during this, 4193 # collecting stale Foo's and mutating the registry 4194 util._run_finalizers() 4195 except Exception as e: 4196 exc = e 4197 4198 def make_finalizers(): 4199 nonlocal exc 4200 d = {} 4201 while not finish: 4202 try: 4203 # Old Foo's get gradually replaced and later 4204 # collected by the GC (because of the cyclic ref) 4205 d[random.getrandbits(5)] = {Foo() for i in range(10)} 4206 except Exception as e: 4207 exc = e 4208 d.clear() 4209 4210 old_interval = sys.getswitchinterval() 4211 old_threshold = gc.get_threshold() 4212 try: 4213 sys.setswitchinterval(1e-6) 4214 gc.set_threshold(5, 5, 5) 4215 threads = [threading.Thread(target=run_finalizers), 4216 threading.Thread(target=make_finalizers)] 4217 with test.support.start_threads(threads): 4218 time.sleep(4.0) # Wait a bit to trigger race condition 4219 finish = True 4220 if exc is not None: 4221 raise exc 4222 finally: 4223 sys.setswitchinterval(old_interval) 4224 gc.set_threshold(*old_threshold) 4225 gc.collect() # Collect remaining Foo's 4226 4227 4228# 4229# Test that from ... import * works for each module 4230# 4231 4232class _TestImportStar(unittest.TestCase): 4233 4234 def get_module_names(self): 4235 import glob 4236 folder = os.path.dirname(multiprocessing.__file__) 4237 pattern = os.path.join(glob.escape(folder), '*.py') 4238 files = glob.glob(pattern) 4239 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 4240 modules = ['multiprocessing.' + m for m in modules] 4241 modules.remove('multiprocessing.__init__') 4242 modules.append('multiprocessing') 4243 return modules 4244 4245 def test_import(self): 4246 modules = self.get_module_names() 4247 if sys.platform == 'win32': 4248 modules.remove('multiprocessing.popen_fork') 4249 modules.remove('multiprocessing.popen_forkserver') 4250 modules.remove('multiprocessing.popen_spawn_posix') 4251 else: 4252 modules.remove('multiprocessing.popen_spawn_win32') 4253 if not HAS_REDUCTION: 4254 modules.remove('multiprocessing.popen_forkserver') 4255 4256 if c_int is None: 4257 # This module requires _ctypes 4258 modules.remove('multiprocessing.sharedctypes') 4259 4260 for name in modules: 4261 __import__(name) 4262 mod = sys.modules[name] 4263 self.assertTrue(hasattr(mod, '__all__'), name) 4264 4265 for attr in mod.__all__: 4266 self.assertTrue( 4267 hasattr(mod, attr), 4268 '%r does not have attribute %r' % (mod, attr) 4269 ) 4270 4271# 4272# Quick test that logging works -- does not test logging output 4273# 4274 4275class _TestLogging(BaseTestCase): 4276 4277 ALLOWED_TYPES = ('processes',) 4278 4279 def test_enable_logging(self): 4280 logger = multiprocessing.get_logger() 4281 logger.setLevel(util.SUBWARNING) 4282 self.assertTrue(logger is not None) 4283 logger.debug('this will not be printed') 4284 logger.info('nor will this') 4285 logger.setLevel(LOG_LEVEL) 4286 4287 @classmethod 4288 def _test_level(cls, conn): 4289 logger = multiprocessing.get_logger() 4290 conn.send(logger.getEffectiveLevel()) 4291 4292 def test_level(self): 4293 LEVEL1 = 32 4294 LEVEL2 = 37 4295 4296 logger = multiprocessing.get_logger() 4297 root_logger = logging.getLogger() 4298 root_level = root_logger.level 4299 4300 reader, writer = multiprocessing.Pipe(duplex=False) 4301 4302 logger.setLevel(LEVEL1) 4303 p = self.Process(target=self._test_level, args=(writer,)) 4304 p.start() 4305 self.assertEqual(LEVEL1, reader.recv()) 4306 p.join() 4307 p.close() 4308 4309 logger.setLevel(logging.NOTSET) 4310 root_logger.setLevel(LEVEL2) 4311 p = self.Process(target=self._test_level, args=(writer,)) 4312 p.start() 4313 self.assertEqual(LEVEL2, reader.recv()) 4314 p.join() 4315 p.close() 4316 4317 root_logger.setLevel(root_level) 4318 logger.setLevel(level=LOG_LEVEL) 4319 4320 4321# class _TestLoggingProcessName(BaseTestCase): 4322# 4323# def handle(self, record): 4324# assert record.processName == multiprocessing.current_process().name 4325# self.__handled = True 4326# 4327# def test_logging(self): 4328# handler = logging.Handler() 4329# handler.handle = self.handle 4330# self.__handled = False 4331# # Bypass getLogger() and side-effects 4332# logger = logging.getLoggerClass()( 4333# 'multiprocessing.test.TestLoggingProcessName') 4334# logger.addHandler(handler) 4335# logger.propagate = False 4336# 4337# logger.warn('foo') 4338# assert self.__handled 4339 4340# 4341# Check that Process.join() retries if os.waitpid() fails with EINTR 4342# 4343 4344class _TestPollEintr(BaseTestCase): 4345 4346 ALLOWED_TYPES = ('processes',) 4347 4348 @classmethod 4349 def _killer(cls, pid): 4350 time.sleep(0.1) 4351 os.kill(pid, signal.SIGUSR1) 4352 4353 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4354 def test_poll_eintr(self): 4355 got_signal = [False] 4356 def record(*args): 4357 got_signal[0] = True 4358 pid = os.getpid() 4359 oldhandler = signal.signal(signal.SIGUSR1, record) 4360 try: 4361 killer = self.Process(target=self._killer, args=(pid,)) 4362 killer.start() 4363 try: 4364 p = self.Process(target=time.sleep, args=(2,)) 4365 p.start() 4366 p.join() 4367 finally: 4368 killer.join() 4369 self.assertTrue(got_signal[0]) 4370 self.assertEqual(p.exitcode, 0) 4371 finally: 4372 signal.signal(signal.SIGUSR1, oldhandler) 4373 4374# 4375# Test to verify handle verification, see issue 3321 4376# 4377 4378class TestInvalidHandle(unittest.TestCase): 4379 4380 @unittest.skipIf(WIN32, "skipped on Windows") 4381 def test_invalid_handles(self): 4382 conn = multiprocessing.connection.Connection(44977608) 4383 # check that poll() doesn't crash 4384 try: 4385 conn.poll() 4386 except (ValueError, OSError): 4387 pass 4388 finally: 4389 # Hack private attribute _handle to avoid printing an error 4390 # in conn.__del__ 4391 conn._handle = None 4392 self.assertRaises((ValueError, OSError), 4393 multiprocessing.connection.Connection, -1) 4394 4395 4396 4397class OtherTest(unittest.TestCase): 4398 # TODO: add more tests for deliver/answer challenge. 4399 def test_deliver_challenge_auth_failure(self): 4400 class _FakeConnection(object): 4401 def recv_bytes(self, size): 4402 return b'something bogus' 4403 def send_bytes(self, data): 4404 pass 4405 self.assertRaises(multiprocessing.AuthenticationError, 4406 multiprocessing.connection.deliver_challenge, 4407 _FakeConnection(), b'abc') 4408 4409 def test_answer_challenge_auth_failure(self): 4410 class _FakeConnection(object): 4411 def __init__(self): 4412 self.count = 0 4413 def recv_bytes(self, size): 4414 self.count += 1 4415 if self.count == 1: 4416 return multiprocessing.connection.CHALLENGE 4417 elif self.count == 2: 4418 return b'something bogus' 4419 return b'' 4420 def send_bytes(self, data): 4421 pass 4422 self.assertRaises(multiprocessing.AuthenticationError, 4423 multiprocessing.connection.answer_challenge, 4424 _FakeConnection(), b'abc') 4425 4426# 4427# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 4428# 4429 4430def initializer(ns): 4431 ns.test += 1 4432 4433class TestInitializers(unittest.TestCase): 4434 def setUp(self): 4435 self.mgr = multiprocessing.Manager() 4436 self.ns = self.mgr.Namespace() 4437 self.ns.test = 0 4438 4439 def tearDown(self): 4440 self.mgr.shutdown() 4441 self.mgr.join() 4442 4443 def test_manager_initializer(self): 4444 m = multiprocessing.managers.SyncManager() 4445 self.assertRaises(TypeError, m.start, 1) 4446 m.start(initializer, (self.ns,)) 4447 self.assertEqual(self.ns.test, 1) 4448 m.shutdown() 4449 m.join() 4450 4451 def test_pool_initializer(self): 4452 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 4453 p = multiprocessing.Pool(1, initializer, (self.ns,)) 4454 p.close() 4455 p.join() 4456 self.assertEqual(self.ns.test, 1) 4457 4458# 4459# Issue 5155, 5313, 5331: Test process in processes 4460# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 4461# 4462 4463def _this_sub_process(q): 4464 try: 4465 item = q.get(block=False) 4466 except pyqueue.Empty: 4467 pass 4468 4469def _test_process(): 4470 queue = multiprocessing.Queue() 4471 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 4472 subProc.daemon = True 4473 subProc.start() 4474 subProc.join() 4475 4476def _afunc(x): 4477 return x*x 4478 4479def pool_in_process(): 4480 pool = multiprocessing.Pool(processes=4) 4481 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 4482 pool.close() 4483 pool.join() 4484 4485class _file_like(object): 4486 def __init__(self, delegate): 4487 self._delegate = delegate 4488 self._pid = None 4489 4490 @property 4491 def cache(self): 4492 pid = os.getpid() 4493 # There are no race conditions since fork keeps only the running thread 4494 if pid != self._pid: 4495 self._pid = pid 4496 self._cache = [] 4497 return self._cache 4498 4499 def write(self, data): 4500 self.cache.append(data) 4501 4502 def flush(self): 4503 self._delegate.write(''.join(self.cache)) 4504 self._cache = [] 4505 4506class TestStdinBadfiledescriptor(unittest.TestCase): 4507 4508 def test_queue_in_process(self): 4509 proc = multiprocessing.Process(target=_test_process) 4510 proc.start() 4511 proc.join() 4512 4513 def test_pool_in_process(self): 4514 p = multiprocessing.Process(target=pool_in_process) 4515 p.start() 4516 p.join() 4517 4518 def test_flushing(self): 4519 sio = io.StringIO() 4520 flike = _file_like(sio) 4521 flike.write('foo') 4522 proc = multiprocessing.Process(target=lambda: flike.flush()) 4523 flike.flush() 4524 assert sio.getvalue() == 'foo' 4525 4526 4527class TestWait(unittest.TestCase): 4528 4529 @classmethod 4530 def _child_test_wait(cls, w, slow): 4531 for i in range(10): 4532 if slow: 4533 time.sleep(random.random()*0.1) 4534 w.send((i, os.getpid())) 4535 w.close() 4536 4537 def test_wait(self, slow=False): 4538 from multiprocessing.connection import wait 4539 readers = [] 4540 procs = [] 4541 messages = [] 4542 4543 for i in range(4): 4544 r, w = multiprocessing.Pipe(duplex=False) 4545 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 4546 p.daemon = True 4547 p.start() 4548 w.close() 4549 readers.append(r) 4550 procs.append(p) 4551 self.addCleanup(p.join) 4552 4553 while readers: 4554 for r in wait(readers): 4555 try: 4556 msg = r.recv() 4557 except EOFError: 4558 readers.remove(r) 4559 r.close() 4560 else: 4561 messages.append(msg) 4562 4563 messages.sort() 4564 expected = sorted((i, p.pid) for i in range(10) for p in procs) 4565 self.assertEqual(messages, expected) 4566 4567 @classmethod 4568 def _child_test_wait_socket(cls, address, slow): 4569 s = socket.socket() 4570 s.connect(address) 4571 for i in range(10): 4572 if slow: 4573 time.sleep(random.random()*0.1) 4574 s.sendall(('%s\n' % i).encode('ascii')) 4575 s.close() 4576 4577 def test_wait_socket(self, slow=False): 4578 from multiprocessing.connection import wait 4579 l = socket.create_server((test.support.HOST, 0)) 4580 addr = l.getsockname() 4581 readers = [] 4582 procs = [] 4583 dic = {} 4584 4585 for i in range(4): 4586 p = multiprocessing.Process(target=self._child_test_wait_socket, 4587 args=(addr, slow)) 4588 p.daemon = True 4589 p.start() 4590 procs.append(p) 4591 self.addCleanup(p.join) 4592 4593 for i in range(4): 4594 r, _ = l.accept() 4595 readers.append(r) 4596 dic[r] = [] 4597 l.close() 4598 4599 while readers: 4600 for r in wait(readers): 4601 msg = r.recv(32) 4602 if not msg: 4603 readers.remove(r) 4604 r.close() 4605 else: 4606 dic[r].append(msg) 4607 4608 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 4609 for v in dic.values(): 4610 self.assertEqual(b''.join(v), expected) 4611 4612 def test_wait_slow(self): 4613 self.test_wait(True) 4614 4615 def test_wait_socket_slow(self): 4616 self.test_wait_socket(True) 4617 4618 def test_wait_timeout(self): 4619 from multiprocessing.connection import wait 4620 4621 expected = 5 4622 a, b = multiprocessing.Pipe() 4623 4624 start = time.monotonic() 4625 res = wait([a, b], expected) 4626 delta = time.monotonic() - start 4627 4628 self.assertEqual(res, []) 4629 self.assertLess(delta, expected * 2) 4630 self.assertGreater(delta, expected * 0.5) 4631 4632 b.send(None) 4633 4634 start = time.monotonic() 4635 res = wait([a, b], 20) 4636 delta = time.monotonic() - start 4637 4638 self.assertEqual(res, [a]) 4639 self.assertLess(delta, 0.4) 4640 4641 @classmethod 4642 def signal_and_sleep(cls, sem, period): 4643 sem.release() 4644 time.sleep(period) 4645 4646 def test_wait_integer(self): 4647 from multiprocessing.connection import wait 4648 4649 expected = 3 4650 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 4651 sem = multiprocessing.Semaphore(0) 4652 a, b = multiprocessing.Pipe() 4653 p = multiprocessing.Process(target=self.signal_and_sleep, 4654 args=(sem, expected)) 4655 4656 p.start() 4657 self.assertIsInstance(p.sentinel, int) 4658 self.assertTrue(sem.acquire(timeout=20)) 4659 4660 start = time.monotonic() 4661 res = wait([a, p.sentinel, b], expected + 20) 4662 delta = time.monotonic() - start 4663 4664 self.assertEqual(res, [p.sentinel]) 4665 self.assertLess(delta, expected + 2) 4666 self.assertGreater(delta, expected - 2) 4667 4668 a.send(None) 4669 4670 start = time.monotonic() 4671 res = wait([a, p.sentinel, b], 20) 4672 delta = time.monotonic() - start 4673 4674 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 4675 self.assertLess(delta, 0.4) 4676 4677 b.send(None) 4678 4679 start = time.monotonic() 4680 res = wait([a, p.sentinel, b], 20) 4681 delta = time.monotonic() - start 4682 4683 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 4684 self.assertLess(delta, 0.4) 4685 4686 p.terminate() 4687 p.join() 4688 4689 def test_neg_timeout(self): 4690 from multiprocessing.connection import wait 4691 a, b = multiprocessing.Pipe() 4692 t = time.monotonic() 4693 res = wait([a], timeout=-1) 4694 t = time.monotonic() - t 4695 self.assertEqual(res, []) 4696 self.assertLess(t, 1) 4697 a.close() 4698 b.close() 4699 4700# 4701# Issue 14151: Test invalid family on invalid environment 4702# 4703 4704class TestInvalidFamily(unittest.TestCase): 4705 4706 @unittest.skipIf(WIN32, "skipped on Windows") 4707 def test_invalid_family(self): 4708 with self.assertRaises(ValueError): 4709 multiprocessing.connection.Listener(r'\\.\test') 4710 4711 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 4712 def test_invalid_family_win32(self): 4713 with self.assertRaises(ValueError): 4714 multiprocessing.connection.Listener('/var/test.pipe') 4715 4716# 4717# Issue 12098: check sys.flags of child matches that for parent 4718# 4719 4720class TestFlags(unittest.TestCase): 4721 @classmethod 4722 def run_in_grandchild(cls, conn): 4723 conn.send(tuple(sys.flags)) 4724 4725 @classmethod 4726 def run_in_child(cls): 4727 import json 4728 r, w = multiprocessing.Pipe(duplex=False) 4729 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 4730 p.start() 4731 grandchild_flags = r.recv() 4732 p.join() 4733 r.close() 4734 w.close() 4735 flags = (tuple(sys.flags), grandchild_flags) 4736 print(json.dumps(flags)) 4737 4738 def test_flags(self): 4739 import json 4740 # start child process using unusual flags 4741 prog = ('from test._test_multiprocessing import TestFlags; ' + 4742 'TestFlags.run_in_child()') 4743 data = subprocess.check_output( 4744 [sys.executable, '-E', '-S', '-O', '-c', prog]) 4745 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 4746 self.assertEqual(child_flags, grandchild_flags) 4747 4748# 4749# Test interaction with socket timeouts - see Issue #6056 4750# 4751 4752class TestTimeouts(unittest.TestCase): 4753 @classmethod 4754 def _test_timeout(cls, child, address): 4755 time.sleep(1) 4756 child.send(123) 4757 child.close() 4758 conn = multiprocessing.connection.Client(address) 4759 conn.send(456) 4760 conn.close() 4761 4762 def test_timeout(self): 4763 old_timeout = socket.getdefaulttimeout() 4764 try: 4765 socket.setdefaulttimeout(0.1) 4766 parent, child = multiprocessing.Pipe(duplex=True) 4767 l = multiprocessing.connection.Listener(family='AF_INET') 4768 p = multiprocessing.Process(target=self._test_timeout, 4769 args=(child, l.address)) 4770 p.start() 4771 child.close() 4772 self.assertEqual(parent.recv(), 123) 4773 parent.close() 4774 conn = l.accept() 4775 self.assertEqual(conn.recv(), 456) 4776 conn.close() 4777 l.close() 4778 join_process(p) 4779 finally: 4780 socket.setdefaulttimeout(old_timeout) 4781 4782# 4783# Test what happens with no "if __name__ == '__main__'" 4784# 4785 4786class TestNoForkBomb(unittest.TestCase): 4787 def test_noforkbomb(self): 4788 sm = multiprocessing.get_start_method() 4789 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 4790 if sm != 'fork': 4791 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 4792 self.assertEqual(out, b'') 4793 self.assertIn(b'RuntimeError', err) 4794 else: 4795 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 4796 self.assertEqual(out.rstrip(), b'123') 4797 self.assertEqual(err, b'') 4798 4799# 4800# Issue #17555: ForkAwareThreadLock 4801# 4802 4803class TestForkAwareThreadLock(unittest.TestCase): 4804 # We recursively start processes. Issue #17555 meant that the 4805 # after fork registry would get duplicate entries for the same 4806 # lock. The size of the registry at generation n was ~2**n. 4807 4808 @classmethod 4809 def child(cls, n, conn): 4810 if n > 1: 4811 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 4812 p.start() 4813 conn.close() 4814 join_process(p) 4815 else: 4816 conn.send(len(util._afterfork_registry)) 4817 conn.close() 4818 4819 def test_lock(self): 4820 r, w = multiprocessing.Pipe(False) 4821 l = util.ForkAwareThreadLock() 4822 old_size = len(util._afterfork_registry) 4823 p = multiprocessing.Process(target=self.child, args=(5, w)) 4824 p.start() 4825 w.close() 4826 new_size = r.recv() 4827 join_process(p) 4828 self.assertLessEqual(new_size, old_size) 4829 4830# 4831# Check that non-forked child processes do not inherit unneeded fds/handles 4832# 4833 4834class TestCloseFds(unittest.TestCase): 4835 4836 def get_high_socket_fd(self): 4837 if WIN32: 4838 # The child process will not have any socket handles, so 4839 # calling socket.fromfd() should produce WSAENOTSOCK even 4840 # if there is a handle of the same number. 4841 return socket.socket().detach() 4842 else: 4843 # We want to produce a socket with an fd high enough that a 4844 # freshly created child process will not have any fds as high. 4845 fd = socket.socket().detach() 4846 to_close = [] 4847 while fd < 50: 4848 to_close.append(fd) 4849 fd = os.dup(fd) 4850 for x in to_close: 4851 os.close(x) 4852 return fd 4853 4854 def close(self, fd): 4855 if WIN32: 4856 socket.socket(socket.AF_INET, socket.SOCK_STREAM, fileno=fd).close() 4857 else: 4858 os.close(fd) 4859 4860 @classmethod 4861 def _test_closefds(cls, conn, fd): 4862 try: 4863 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 4864 except Exception as e: 4865 conn.send(e) 4866 else: 4867 s.close() 4868 conn.send(None) 4869 4870 def test_closefd(self): 4871 if not HAS_REDUCTION: 4872 raise unittest.SkipTest('requires fd pickling') 4873 4874 reader, writer = multiprocessing.Pipe() 4875 fd = self.get_high_socket_fd() 4876 try: 4877 p = multiprocessing.Process(target=self._test_closefds, 4878 args=(writer, fd)) 4879 p.start() 4880 writer.close() 4881 e = reader.recv() 4882 join_process(p) 4883 finally: 4884 self.close(fd) 4885 writer.close() 4886 reader.close() 4887 4888 if multiprocessing.get_start_method() == 'fork': 4889 self.assertIs(e, None) 4890 else: 4891 WSAENOTSOCK = 10038 4892 self.assertIsInstance(e, OSError) 4893 self.assertTrue(e.errno == errno.EBADF or 4894 e.winerror == WSAENOTSOCK, e) 4895 4896# 4897# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 4898# 4899 4900class TestIgnoreEINTR(unittest.TestCase): 4901 4902 # Sending CONN_MAX_SIZE bytes into a multiprocessing pipe must block 4903 CONN_MAX_SIZE = max(support.PIPE_MAX_SIZE, support.SOCK_MAX_SIZE) 4904 4905 @classmethod 4906 def _test_ignore(cls, conn): 4907 def handler(signum, frame): 4908 pass 4909 signal.signal(signal.SIGUSR1, handler) 4910 conn.send('ready') 4911 x = conn.recv() 4912 conn.send(x) 4913 conn.send_bytes(b'x' * cls.CONN_MAX_SIZE) 4914 4915 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4916 def test_ignore(self): 4917 conn, child_conn = multiprocessing.Pipe() 4918 try: 4919 p = multiprocessing.Process(target=self._test_ignore, 4920 args=(child_conn,)) 4921 p.daemon = True 4922 p.start() 4923 child_conn.close() 4924 self.assertEqual(conn.recv(), 'ready') 4925 time.sleep(0.1) 4926 os.kill(p.pid, signal.SIGUSR1) 4927 time.sleep(0.1) 4928 conn.send(1234) 4929 self.assertEqual(conn.recv(), 1234) 4930 time.sleep(0.1) 4931 os.kill(p.pid, signal.SIGUSR1) 4932 self.assertEqual(conn.recv_bytes(), b'x' * self.CONN_MAX_SIZE) 4933 time.sleep(0.1) 4934 p.join() 4935 finally: 4936 conn.close() 4937 4938 @classmethod 4939 def _test_ignore_listener(cls, conn): 4940 def handler(signum, frame): 4941 pass 4942 signal.signal(signal.SIGUSR1, handler) 4943 with multiprocessing.connection.Listener() as l: 4944 conn.send(l.address) 4945 a = l.accept() 4946 a.send('welcome') 4947 4948 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 4949 def test_ignore_listener(self): 4950 conn, child_conn = multiprocessing.Pipe() 4951 try: 4952 p = multiprocessing.Process(target=self._test_ignore_listener, 4953 args=(child_conn,)) 4954 p.daemon = True 4955 p.start() 4956 child_conn.close() 4957 address = conn.recv() 4958 time.sleep(0.1) 4959 os.kill(p.pid, signal.SIGUSR1) 4960 time.sleep(0.1) 4961 client = multiprocessing.connection.Client(address) 4962 self.assertEqual(client.recv(), 'welcome') 4963 p.join() 4964 finally: 4965 conn.close() 4966 4967class TestStartMethod(unittest.TestCase): 4968 @classmethod 4969 def _check_context(cls, conn): 4970 conn.send(multiprocessing.get_start_method()) 4971 4972 def check_context(self, ctx): 4973 r, w = ctx.Pipe(duplex=False) 4974 p = ctx.Process(target=self._check_context, args=(w,)) 4975 p.start() 4976 w.close() 4977 child_method = r.recv() 4978 r.close() 4979 p.join() 4980 self.assertEqual(child_method, ctx.get_start_method()) 4981 4982 def test_context(self): 4983 for method in ('fork', 'spawn', 'forkserver'): 4984 try: 4985 ctx = multiprocessing.get_context(method) 4986 except ValueError: 4987 continue 4988 self.assertEqual(ctx.get_start_method(), method) 4989 self.assertIs(ctx.get_context(), ctx) 4990 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 4991 self.assertRaises(ValueError, ctx.set_start_method, None) 4992 self.check_context(ctx) 4993 4994 def test_set_get(self): 4995 multiprocessing.set_forkserver_preload(PRELOAD) 4996 count = 0 4997 old_method = multiprocessing.get_start_method() 4998 try: 4999 for method in ('fork', 'spawn', 'forkserver'): 5000 try: 5001 multiprocessing.set_start_method(method, force=True) 5002 except ValueError: 5003 continue 5004 self.assertEqual(multiprocessing.get_start_method(), method) 5005 ctx = multiprocessing.get_context() 5006 self.assertEqual(ctx.get_start_method(), method) 5007 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 5008 self.assertTrue( 5009 ctx.Process.__name__.lower().startswith(method)) 5010 self.check_context(multiprocessing) 5011 count += 1 5012 finally: 5013 multiprocessing.set_start_method(old_method, force=True) 5014 self.assertGreaterEqual(count, 1) 5015 5016 def test_get_all(self): 5017 methods = multiprocessing.get_all_start_methods() 5018 if sys.platform == 'win32': 5019 self.assertEqual(methods, ['spawn']) 5020 else: 5021 self.assertTrue(methods == ['fork', 'spawn'] or 5022 methods == ['spawn', 'fork'] or 5023 methods == ['fork', 'spawn', 'forkserver'] or 5024 methods == ['spawn', 'fork', 'forkserver']) 5025 5026 def test_preload_resources(self): 5027 if multiprocessing.get_start_method() != 'forkserver': 5028 self.skipTest("test only relevant for 'forkserver' method") 5029 name = os.path.join(os.path.dirname(__file__), 'mp_preload.py') 5030 rc, out, err = test.support.script_helper.assert_python_ok(name) 5031 out = out.decode() 5032 err = err.decode() 5033 if out.rstrip() != 'ok' or err != '': 5034 print(out) 5035 print(err) 5036 self.fail("failed spawning forkserver or grandchild") 5037 5038 5039@unittest.skipIf(sys.platform == "win32", 5040 "test semantics don't make sense on Windows") 5041class TestResourceTracker(unittest.TestCase): 5042 5043 def test_resource_tracker(self): 5044 # 5045 # Check that killing process does not leak named semaphores 5046 # 5047 cmd = '''if 1: 5048 import time, os, tempfile 5049 import multiprocessing as mp 5050 from multiprocessing import resource_tracker 5051 from multiprocessing.shared_memory import SharedMemory 5052 5053 mp.set_start_method("spawn") 5054 rand = tempfile._RandomNameSequence() 5055 5056 5057 def create_and_register_resource(rtype): 5058 if rtype == "semaphore": 5059 lock = mp.Lock() 5060 return lock, lock._semlock.name 5061 elif rtype == "shared_memory": 5062 sm = SharedMemory(create=True, size=10) 5063 return sm, sm._name 5064 else: 5065 raise ValueError( 5066 "Resource type {{}} not understood".format(rtype)) 5067 5068 5069 resource1, rname1 = create_and_register_resource("{rtype}") 5070 resource2, rname2 = create_and_register_resource("{rtype}") 5071 5072 os.write({w}, rname1.encode("ascii") + b"\\n") 5073 os.write({w}, rname2.encode("ascii") + b"\\n") 5074 5075 time.sleep(10) 5076 ''' 5077 for rtype in resource_tracker._CLEANUP_FUNCS: 5078 with self.subTest(rtype=rtype): 5079 if rtype == "noop": 5080 # Artefact resource type used by the resource_tracker 5081 continue 5082 r, w = os.pipe() 5083 p = subprocess.Popen([sys.executable, 5084 '-E', '-c', cmd.format(w=w, rtype=rtype)], 5085 pass_fds=[w], 5086 stderr=subprocess.PIPE) 5087 os.close(w) 5088 with open(r, 'rb', closefd=True) as f: 5089 name1 = f.readline().rstrip().decode('ascii') 5090 name2 = f.readline().rstrip().decode('ascii') 5091 _resource_unlink(name1, rtype) 5092 p.terminate() 5093 p.wait() 5094 5095 deadline = time.monotonic() + 60 5096 while time.monotonic() < deadline: 5097 time.sleep(.5) 5098 try: 5099 _resource_unlink(name2, rtype) 5100 except OSError as e: 5101 # docs say it should be ENOENT, but OSX seems to give 5102 # EINVAL 5103 self.assertIn(e.errno, (errno.ENOENT, errno.EINVAL)) 5104 break 5105 else: 5106 raise AssertionError( 5107 f"A {rtype} resource was leaked after a process was " 5108 f"abruptly terminated.") 5109 err = p.stderr.read().decode('utf-8') 5110 p.stderr.close() 5111 expected = ('resource_tracker: There appear to be 2 leaked {} ' 5112 'objects'.format( 5113 rtype)) 5114 self.assertRegex(err, expected) 5115 self.assertRegex(err, r'resource_tracker: %r: \[Errno' % name1) 5116 5117 def check_resource_tracker_death(self, signum, should_die): 5118 # bpo-31310: if the semaphore tracker process has died, it should 5119 # be restarted implicitly. 5120 from multiprocessing.resource_tracker import _resource_tracker 5121 pid = _resource_tracker._pid 5122 if pid is not None: 5123 os.kill(pid, signal.SIGKILL) 5124 os.waitpid(pid, 0) 5125 with warnings.catch_warnings(): 5126 warnings.simplefilter("ignore") 5127 _resource_tracker.ensure_running() 5128 pid = _resource_tracker._pid 5129 5130 os.kill(pid, signum) 5131 time.sleep(1.0) # give it time to die 5132 5133 ctx = multiprocessing.get_context("spawn") 5134 with warnings.catch_warnings(record=True) as all_warn: 5135 warnings.simplefilter("always") 5136 sem = ctx.Semaphore() 5137 sem.acquire() 5138 sem.release() 5139 wr = weakref.ref(sem) 5140 # ensure `sem` gets collected, which triggers communication with 5141 # the semaphore tracker 5142 del sem 5143 gc.collect() 5144 self.assertIsNone(wr()) 5145 if should_die: 5146 self.assertEqual(len(all_warn), 1) 5147 the_warn = all_warn[0] 5148 self.assertTrue(issubclass(the_warn.category, UserWarning)) 5149 self.assertTrue("resource_tracker: process died" 5150 in str(the_warn.message)) 5151 else: 5152 self.assertEqual(len(all_warn), 0) 5153 5154 def test_resource_tracker_sigint(self): 5155 # Catchable signal (ignored by semaphore tracker) 5156 self.check_resource_tracker_death(signal.SIGINT, False) 5157 5158 def test_resource_tracker_sigterm(self): 5159 # Catchable signal (ignored by semaphore tracker) 5160 self.check_resource_tracker_death(signal.SIGTERM, False) 5161 5162 def test_resource_tracker_sigkill(self): 5163 # Uncatchable signal. 5164 self.check_resource_tracker_death(signal.SIGKILL, True) 5165 5166 @staticmethod 5167 def _is_resource_tracker_reused(conn, pid): 5168 from multiprocessing.resource_tracker import _resource_tracker 5169 _resource_tracker.ensure_running() 5170 # The pid should be None in the child process, expect for the fork 5171 # context. It should not be a new value. 5172 reused = _resource_tracker._pid in (None, pid) 5173 reused &= _resource_tracker._check_alive() 5174 conn.send(reused) 5175 5176 def test_resource_tracker_reused(self): 5177 from multiprocessing.resource_tracker import _resource_tracker 5178 _resource_tracker.ensure_running() 5179 pid = _resource_tracker._pid 5180 5181 r, w = multiprocessing.Pipe(duplex=False) 5182 p = multiprocessing.Process(target=self._is_resource_tracker_reused, 5183 args=(w, pid)) 5184 p.start() 5185 is_resource_tracker_reused = r.recv() 5186 5187 # Clean up 5188 p.join() 5189 w.close() 5190 r.close() 5191 5192 self.assertTrue(is_resource_tracker_reused) 5193 5194 5195class TestSimpleQueue(unittest.TestCase): 5196 5197 @classmethod 5198 def _test_empty(cls, queue, child_can_start, parent_can_continue): 5199 child_can_start.wait() 5200 # issue 30301, could fail under spawn and forkserver 5201 try: 5202 queue.put(queue.empty()) 5203 queue.put(queue.empty()) 5204 finally: 5205 parent_can_continue.set() 5206 5207 def test_empty(self): 5208 queue = multiprocessing.SimpleQueue() 5209 child_can_start = multiprocessing.Event() 5210 parent_can_continue = multiprocessing.Event() 5211 5212 proc = multiprocessing.Process( 5213 target=self._test_empty, 5214 args=(queue, child_can_start, parent_can_continue) 5215 ) 5216 proc.daemon = True 5217 proc.start() 5218 5219 self.assertTrue(queue.empty()) 5220 5221 child_can_start.set() 5222 parent_can_continue.wait() 5223 5224 self.assertFalse(queue.empty()) 5225 self.assertEqual(queue.get(), True) 5226 self.assertEqual(queue.get(), False) 5227 self.assertTrue(queue.empty()) 5228 5229 proc.join() 5230 5231 5232class TestPoolNotLeakOnFailure(unittest.TestCase): 5233 5234 def test_release_unused_processes(self): 5235 # Issue #19675: During pool creation, if we can't create a process, 5236 # don't leak already created ones. 5237 will_fail_in = 3 5238 forked_processes = [] 5239 5240 class FailingForkProcess: 5241 def __init__(self, **kwargs): 5242 self.name = 'Fake Process' 5243 self.exitcode = None 5244 self.state = None 5245 forked_processes.append(self) 5246 5247 def start(self): 5248 nonlocal will_fail_in 5249 if will_fail_in <= 0: 5250 raise OSError("Manually induced OSError") 5251 will_fail_in -= 1 5252 self.state = 'started' 5253 5254 def terminate(self): 5255 self.state = 'stopping' 5256 5257 def join(self): 5258 if self.state == 'stopping': 5259 self.state = 'stopped' 5260 5261 def is_alive(self): 5262 return self.state == 'started' or self.state == 'stopping' 5263 5264 with self.assertRaisesRegex(OSError, 'Manually induced OSError'): 5265 p = multiprocessing.pool.Pool(5, context=unittest.mock.MagicMock( 5266 Process=FailingForkProcess)) 5267 p.close() 5268 p.join() 5269 self.assertFalse( 5270 any(process.is_alive() for process in forked_processes)) 5271 5272 5273class TestSyncManagerTypes(unittest.TestCase): 5274 """Test all the types which can be shared between a parent and a 5275 child process by using a manager which acts as an intermediary 5276 between them. 5277 5278 In the following unit-tests the base type is created in the parent 5279 process, the @classmethod represents the worker process and the 5280 shared object is readable and editable between the two. 5281 5282 # The child. 5283 @classmethod 5284 def _test_list(cls, obj): 5285 assert obj[0] == 5 5286 assert obj.append(6) 5287 5288 # The parent. 5289 def test_list(self): 5290 o = self.manager.list() 5291 o.append(5) 5292 self.run_worker(self._test_list, o) 5293 assert o[1] == 6 5294 """ 5295 manager_class = multiprocessing.managers.SyncManager 5296 5297 def setUp(self): 5298 self.manager = self.manager_class() 5299 self.manager.start() 5300 self.proc = None 5301 5302 def tearDown(self): 5303 if self.proc is not None and self.proc.is_alive(): 5304 self.proc.terminate() 5305 self.proc.join() 5306 self.manager.shutdown() 5307 self.manager = None 5308 self.proc = None 5309 5310 @classmethod 5311 def setUpClass(cls): 5312 support.reap_children() 5313 5314 tearDownClass = setUpClass 5315 5316 def wait_proc_exit(self): 5317 # Only the manager process should be returned by active_children() 5318 # but this can take a bit on slow machines, so wait a few seconds 5319 # if there are other children too (see #17395). 5320 join_process(self.proc) 5321 start_time = time.monotonic() 5322 t = 0.01 5323 while len(multiprocessing.active_children()) > 1: 5324 time.sleep(t) 5325 t *= 2 5326 dt = time.monotonic() - start_time 5327 if dt >= 5.0: 5328 test.support.environment_altered = True 5329 support.print_warning(f"multiprocessing.Manager still has " 5330 f"{multiprocessing.active_children()} " 5331 f"active children after {dt} seconds") 5332 break 5333 5334 def run_worker(self, worker, obj): 5335 self.proc = multiprocessing.Process(target=worker, args=(obj, )) 5336 self.proc.daemon = True 5337 self.proc.start() 5338 self.wait_proc_exit() 5339 self.assertEqual(self.proc.exitcode, 0) 5340 5341 @classmethod 5342 def _test_event(cls, obj): 5343 assert obj.is_set() 5344 obj.wait() 5345 obj.clear() 5346 obj.wait(0.001) 5347 5348 def test_event(self): 5349 o = self.manager.Event() 5350 o.set() 5351 self.run_worker(self._test_event, o) 5352 assert not o.is_set() 5353 o.wait(0.001) 5354 5355 @classmethod 5356 def _test_lock(cls, obj): 5357 obj.acquire() 5358 5359 def test_lock(self, lname="Lock"): 5360 o = getattr(self.manager, lname)() 5361 self.run_worker(self._test_lock, o) 5362 o.release() 5363 self.assertRaises(RuntimeError, o.release) # already released 5364 5365 @classmethod 5366 def _test_rlock(cls, obj): 5367 obj.acquire() 5368 obj.release() 5369 5370 def test_rlock(self, lname="Lock"): 5371 o = getattr(self.manager, lname)() 5372 self.run_worker(self._test_rlock, o) 5373 5374 @classmethod 5375 def _test_semaphore(cls, obj): 5376 obj.acquire() 5377 5378 def test_semaphore(self, sname="Semaphore"): 5379 o = getattr(self.manager, sname)() 5380 self.run_worker(self._test_semaphore, o) 5381 o.release() 5382 5383 def test_bounded_semaphore(self): 5384 self.test_semaphore(sname="BoundedSemaphore") 5385 5386 @classmethod 5387 def _test_condition(cls, obj): 5388 obj.acquire() 5389 obj.release() 5390 5391 def test_condition(self): 5392 o = self.manager.Condition() 5393 self.run_worker(self._test_condition, o) 5394 5395 @classmethod 5396 def _test_barrier(cls, obj): 5397 assert obj.parties == 5 5398 obj.reset() 5399 5400 def test_barrier(self): 5401 o = self.manager.Barrier(5) 5402 self.run_worker(self._test_barrier, o) 5403 5404 @classmethod 5405 def _test_pool(cls, obj): 5406 # TODO: fix https://bugs.python.org/issue35919 5407 with obj: 5408 pass 5409 5410 def test_pool(self): 5411 o = self.manager.Pool(processes=4) 5412 self.run_worker(self._test_pool, o) 5413 5414 @classmethod 5415 def _test_queue(cls, obj): 5416 assert obj.qsize() == 2 5417 assert obj.full() 5418 assert not obj.empty() 5419 assert obj.get() == 5 5420 assert not obj.empty() 5421 assert obj.get() == 6 5422 assert obj.empty() 5423 5424 def test_queue(self, qname="Queue"): 5425 o = getattr(self.manager, qname)(2) 5426 o.put(5) 5427 o.put(6) 5428 self.run_worker(self._test_queue, o) 5429 assert o.empty() 5430 assert not o.full() 5431 5432 def test_joinable_queue(self): 5433 self.test_queue("JoinableQueue") 5434 5435 @classmethod 5436 def _test_list(cls, obj): 5437 assert obj[0] == 5 5438 assert obj.count(5) == 1 5439 assert obj.index(5) == 0 5440 obj.sort() 5441 obj.reverse() 5442 for x in obj: 5443 pass 5444 assert len(obj) == 1 5445 assert obj.pop(0) == 5 5446 5447 def test_list(self): 5448 o = self.manager.list() 5449 o.append(5) 5450 self.run_worker(self._test_list, o) 5451 assert not o 5452 self.assertEqual(len(o), 0) 5453 5454 @classmethod 5455 def _test_dict(cls, obj): 5456 assert len(obj) == 1 5457 assert obj['foo'] == 5 5458 assert obj.get('foo') == 5 5459 assert list(obj.items()) == [('foo', 5)] 5460 assert list(obj.keys()) == ['foo'] 5461 assert list(obj.values()) == [5] 5462 assert obj.copy() == {'foo': 5} 5463 assert obj.popitem() == ('foo', 5) 5464 5465 def test_dict(self): 5466 o = self.manager.dict() 5467 o['foo'] = 5 5468 self.run_worker(self._test_dict, o) 5469 assert not o 5470 self.assertEqual(len(o), 0) 5471 5472 @classmethod 5473 def _test_value(cls, obj): 5474 assert obj.value == 1 5475 assert obj.get() == 1 5476 obj.set(2) 5477 5478 def test_value(self): 5479 o = self.manager.Value('i', 1) 5480 self.run_worker(self._test_value, o) 5481 self.assertEqual(o.value, 2) 5482 self.assertEqual(o.get(), 2) 5483 5484 @classmethod 5485 def _test_array(cls, obj): 5486 assert obj[0] == 0 5487 assert obj[1] == 1 5488 assert len(obj) == 2 5489 assert list(obj) == [0, 1] 5490 5491 def test_array(self): 5492 o = self.manager.Array('i', [0, 1]) 5493 self.run_worker(self._test_array, o) 5494 5495 @classmethod 5496 def _test_namespace(cls, obj): 5497 assert obj.x == 0 5498 assert obj.y == 1 5499 5500 def test_namespace(self): 5501 o = self.manager.Namespace() 5502 o.x = 0 5503 o.y = 1 5504 self.run_worker(self._test_namespace, o) 5505 5506 5507class MiscTestCase(unittest.TestCase): 5508 def test__all__(self): 5509 # Just make sure names in blacklist are excluded 5510 support.check__all__(self, multiprocessing, extra=multiprocessing.__all__, 5511 blacklist=['SUBDEBUG', 'SUBWARNING']) 5512# 5513# Mixins 5514# 5515 5516class BaseMixin(object): 5517 @classmethod 5518 def setUpClass(cls): 5519 cls.dangling = (multiprocessing.process._dangling.copy(), 5520 threading._dangling.copy()) 5521 5522 @classmethod 5523 def tearDownClass(cls): 5524 # bpo-26762: Some multiprocessing objects like Pool create reference 5525 # cycles. Trigger a garbage collection to break these cycles. 5526 test.support.gc_collect() 5527 5528 processes = set(multiprocessing.process._dangling) - set(cls.dangling[0]) 5529 if processes: 5530 test.support.environment_altered = True 5531 support.print_warning(f'Dangling processes: {processes}') 5532 processes = None 5533 5534 threads = set(threading._dangling) - set(cls.dangling[1]) 5535 if threads: 5536 test.support.environment_altered = True 5537 support.print_warning(f'Dangling threads: {threads}') 5538 threads = None 5539 5540 5541class ProcessesMixin(BaseMixin): 5542 TYPE = 'processes' 5543 Process = multiprocessing.Process 5544 connection = multiprocessing.connection 5545 current_process = staticmethod(multiprocessing.current_process) 5546 parent_process = staticmethod(multiprocessing.parent_process) 5547 active_children = staticmethod(multiprocessing.active_children) 5548 Pool = staticmethod(multiprocessing.Pool) 5549 Pipe = staticmethod(multiprocessing.Pipe) 5550 Queue = staticmethod(multiprocessing.Queue) 5551 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 5552 Lock = staticmethod(multiprocessing.Lock) 5553 RLock = staticmethod(multiprocessing.RLock) 5554 Semaphore = staticmethod(multiprocessing.Semaphore) 5555 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 5556 Condition = staticmethod(multiprocessing.Condition) 5557 Event = staticmethod(multiprocessing.Event) 5558 Barrier = staticmethod(multiprocessing.Barrier) 5559 Value = staticmethod(multiprocessing.Value) 5560 Array = staticmethod(multiprocessing.Array) 5561 RawValue = staticmethod(multiprocessing.RawValue) 5562 RawArray = staticmethod(multiprocessing.RawArray) 5563 5564 5565class ManagerMixin(BaseMixin): 5566 TYPE = 'manager' 5567 Process = multiprocessing.Process 5568 Queue = property(operator.attrgetter('manager.Queue')) 5569 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 5570 Lock = property(operator.attrgetter('manager.Lock')) 5571 RLock = property(operator.attrgetter('manager.RLock')) 5572 Semaphore = property(operator.attrgetter('manager.Semaphore')) 5573 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 5574 Condition = property(operator.attrgetter('manager.Condition')) 5575 Event = property(operator.attrgetter('manager.Event')) 5576 Barrier = property(operator.attrgetter('manager.Barrier')) 5577 Value = property(operator.attrgetter('manager.Value')) 5578 Array = property(operator.attrgetter('manager.Array')) 5579 list = property(operator.attrgetter('manager.list')) 5580 dict = property(operator.attrgetter('manager.dict')) 5581 Namespace = property(operator.attrgetter('manager.Namespace')) 5582 5583 @classmethod 5584 def Pool(cls, *args, **kwds): 5585 return cls.manager.Pool(*args, **kwds) 5586 5587 @classmethod 5588 def setUpClass(cls): 5589 super().setUpClass() 5590 cls.manager = multiprocessing.Manager() 5591 5592 @classmethod 5593 def tearDownClass(cls): 5594 # only the manager process should be returned by active_children() 5595 # but this can take a bit on slow machines, so wait a few seconds 5596 # if there are other children too (see #17395) 5597 start_time = time.monotonic() 5598 t = 0.01 5599 while len(multiprocessing.active_children()) > 1: 5600 time.sleep(t) 5601 t *= 2 5602 dt = time.monotonic() - start_time 5603 if dt >= 5.0: 5604 test.support.environment_altered = True 5605 support.print_warning(f"multiprocessing.Manager still has " 5606 f"{multiprocessing.active_children()} " 5607 f"active children after {dt} seconds") 5608 break 5609 5610 gc.collect() # do garbage collection 5611 if cls.manager._number_of_objects() != 0: 5612 # This is not really an error since some tests do not 5613 # ensure that all processes which hold a reference to a 5614 # managed object have been joined. 5615 test.support.environment_altered = True 5616 support.print_warning('Shared objects which still exist ' 5617 'at manager shutdown:') 5618 support.print_warning(cls.manager._debug_info()) 5619 cls.manager.shutdown() 5620 cls.manager.join() 5621 cls.manager = None 5622 5623 super().tearDownClass() 5624 5625 5626class ThreadsMixin(BaseMixin): 5627 TYPE = 'threads' 5628 Process = multiprocessing.dummy.Process 5629 connection = multiprocessing.dummy.connection 5630 current_process = staticmethod(multiprocessing.dummy.current_process) 5631 active_children = staticmethod(multiprocessing.dummy.active_children) 5632 Pool = staticmethod(multiprocessing.dummy.Pool) 5633 Pipe = staticmethod(multiprocessing.dummy.Pipe) 5634 Queue = staticmethod(multiprocessing.dummy.Queue) 5635 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 5636 Lock = staticmethod(multiprocessing.dummy.Lock) 5637 RLock = staticmethod(multiprocessing.dummy.RLock) 5638 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 5639 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 5640 Condition = staticmethod(multiprocessing.dummy.Condition) 5641 Event = staticmethod(multiprocessing.dummy.Event) 5642 Barrier = staticmethod(multiprocessing.dummy.Barrier) 5643 Value = staticmethod(multiprocessing.dummy.Value) 5644 Array = staticmethod(multiprocessing.dummy.Array) 5645 5646# 5647# Functions used to create test cases from the base ones in this module 5648# 5649 5650def install_tests_in_module_dict(remote_globs, start_method): 5651 __module__ = remote_globs['__name__'] 5652 local_globs = globals() 5653 ALL_TYPES = {'processes', 'threads', 'manager'} 5654 5655 for name, base in local_globs.items(): 5656 if not isinstance(base, type): 5657 continue 5658 if issubclass(base, BaseTestCase): 5659 if base is BaseTestCase: 5660 continue 5661 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 5662 for type_ in base.ALLOWED_TYPES: 5663 newname = 'With' + type_.capitalize() + name[1:] 5664 Mixin = local_globs[type_.capitalize() + 'Mixin'] 5665 class Temp(base, Mixin, unittest.TestCase): 5666 pass 5667 Temp.__name__ = Temp.__qualname__ = newname 5668 Temp.__module__ = __module__ 5669 remote_globs[newname] = Temp 5670 elif issubclass(base, unittest.TestCase): 5671 class Temp(base, object): 5672 pass 5673 Temp.__name__ = Temp.__qualname__ = name 5674 Temp.__module__ = __module__ 5675 remote_globs[name] = Temp 5676 5677 dangling = [None, None] 5678 old_start_method = [None] 5679 5680 def setUpModule(): 5681 multiprocessing.set_forkserver_preload(PRELOAD) 5682 multiprocessing.process._cleanup() 5683 dangling[0] = multiprocessing.process._dangling.copy() 5684 dangling[1] = threading._dangling.copy() 5685 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 5686 try: 5687 multiprocessing.set_start_method(start_method, force=True) 5688 except ValueError: 5689 raise unittest.SkipTest(start_method + 5690 ' start method not supported') 5691 5692 if sys.platform.startswith("linux"): 5693 try: 5694 lock = multiprocessing.RLock() 5695 except OSError: 5696 raise unittest.SkipTest("OSError raises on RLock creation, " 5697 "see issue 3111!") 5698 check_enough_semaphores() 5699 util.get_temp_dir() # creates temp directory 5700 multiprocessing.get_logger().setLevel(LOG_LEVEL) 5701 5702 def tearDownModule(): 5703 need_sleep = False 5704 5705 # bpo-26762: Some multiprocessing objects like Pool create reference 5706 # cycles. Trigger a garbage collection to break these cycles. 5707 test.support.gc_collect() 5708 5709 multiprocessing.set_start_method(old_start_method[0], force=True) 5710 # pause a bit so we don't get warning about dangling threads/processes 5711 processes = set(multiprocessing.process._dangling) - set(dangling[0]) 5712 if processes: 5713 need_sleep = True 5714 test.support.environment_altered = True 5715 support.print_warning(f'Dangling processes: {processes}') 5716 processes = None 5717 5718 threads = set(threading._dangling) - set(dangling[1]) 5719 if threads: 5720 need_sleep = True 5721 test.support.environment_altered = True 5722 support.print_warning(f'Dangling threads: {threads}') 5723 threads = None 5724 5725 # Sleep 500 ms to give time to child processes to complete. 5726 if need_sleep: 5727 time.sleep(0.5) 5728 5729 multiprocessing.util._cleanup_tests() 5730 5731 remote_globs['setUpModule'] = setUpModule 5732 remote_globs['tearDownModule'] = tearDownModule 5733