1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import queue as pyqueue 7import time 8import io 9import itertools 10import sys 11import os 12import gc 13import errno 14import signal 15import array 16import socket 17import random 18import logging 19import struct 20import operator 21import test.support 22import test.support.script_helper 23 24 25# Skip tests if _multiprocessing wasn't built. 26_multiprocessing = test.support.import_module('_multiprocessing') 27# Skip tests if sem_open implementation is broken. 28test.support.import_module('multiprocess.synchronize') 29# import threading after _multiprocessing to raise a more relevant error 30# message: "No module named _multiprocessing". _multiprocessing is not compiled 31# without thread support. 32import threading 33 34import multiprocess as multiprocessing 35import multiprocess.dummy 36import multiprocess.connection 37import multiprocess.managers 38import multiprocess.heap 39import multiprocess.pool 40 41from multiprocess import util 42 43try: 44 from multiprocess import reduction 45 HAS_REDUCTION = reduction.HAVE_SEND_HANDLE 46except ImportError: 47 HAS_REDUCTION = False 48 49try: 50 from multiprocess.sharedctypes import Value, copy 51 HAS_SHAREDCTYPES = True 52except ImportError: 53 HAS_SHAREDCTYPES = False 54 55try: 56 import msvcrt 57except ImportError: 58 msvcrt = None 59 60# 61# 62# 63 64def latin(s): 65 return s.encode('latin') 66 67# 68# Constants 69# 70 71LOG_LEVEL = util.SUBWARNING 72#LOG_LEVEL = logging.DEBUG 73 74DELTA = 0.1 75CHECK_TIMINGS = False # making true makes tests take a lot longer 76 # and can sometimes cause some non-serious 77 # failures because some calls block a bit 78 # longer than expected 79if CHECK_TIMINGS: 80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 81else: 82 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 83 84HAVE_GETVALUE = not getattr(_multiprocessing, 85 'HAVE_BROKEN_SEM_GETVALUE', False) 86 87WIN32 = (sys.platform == "win32") 88 89from multiprocess.connection import wait 90 91def wait_for_handle(handle, timeout): 92 if timeout is not None and timeout < 0.0: 93 timeout = None 94 return wait([handle], timeout) 95 96try: 97 MAXFD = os.sysconf("SC_OPEN_MAX") 98except: 99 MAXFD = 256 100 101# To speed up tests when using the forkserver, we can preload these: 102PRELOAD = ['__main__', 'test_multiprocessing_forkserver'] 103 104# 105# Some tests require ctypes 106# 107 108try: 109 from ctypes import Structure, c_int, c_double 110except ImportError: 111 Structure = object 112 c_int = c_double = None 113 114 115def check_enough_semaphores(): 116 """Check that the system supports enough semaphores to run the test.""" 117 # minimum number of semaphores available according to POSIX 118 nsems_min = 256 119 try: 120 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 121 except (AttributeError, ValueError): 122 # sysconf not available or setting not available 123 return 124 if nsems == -1 or nsems >= nsems_min: 125 return 126 raise unittest.SkipTest("The OS doesn't support enough semaphores " 127 "to run the test (required: %d)." % nsems_min) 128 129 130# 131# Creates a wrapper for a function which records the time it takes to finish 132# 133 134class TimingWrapper(object): 135 136 def __init__(self, func): 137 self.func = func 138 self.elapsed = None 139 140 def __call__(self, *args, **kwds): 141 t = time.time() 142 try: 143 return self.func(*args, **kwds) 144 finally: 145 self.elapsed = time.time() - t 146 147# 148# Base class for test cases 149# 150 151class BaseTestCase(object): 152 153 ALLOWED_TYPES = ('processes', 'manager', 'threads') 154 155 def assertTimingAlmostEqual(self, a, b): 156 if CHECK_TIMINGS: 157 self.assertAlmostEqual(a, b, 1) 158 159 def assertReturnsIfImplemented(self, value, func, *args): 160 try: 161 res = func(*args) 162 except NotImplementedError: 163 pass 164 else: 165 return self.assertEqual(value, res) 166 167 # For the sanity of Windows users, rather than crashing or freezing in 168 # multiple ways. 169 def __reduce__(self, *args): 170 raise NotImplementedError("shouldn't try to pickle a test case") 171 172 __reduce_ex__ = __reduce__ 173 174# 175# Return the value of a semaphore 176# 177 178def get_value(self): 179 try: 180 return self.get_value() 181 except AttributeError: 182 try: 183 return self._Semaphore__value 184 except AttributeError: 185 try: 186 return self._value 187 except AttributeError: 188 raise NotImplementedError 189 190# 191# Testcases 192# 193 194class _TestProcess(BaseTestCase): 195 196 ALLOWED_TYPES = ('processes', 'threads') 197 198 def test_current(self): 199 if self.TYPE == 'threads': 200 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 201 202 current = self.current_process() 203 authkey = current.authkey 204 205 self.assertTrue(current.is_alive()) 206 self.assertTrue(not current.daemon) 207 self.assertIsInstance(authkey, bytes) 208 self.assertTrue(len(authkey) > 0) 209 self.assertEqual(current.ident, os.getpid()) 210 self.assertEqual(current.exitcode, None) 211 212 def test_daemon_argument(self): 213 if self.TYPE == "threads": 214 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 215 216 # By default uses the current process's daemon flag. 217 proc0 = self.Process(target=self._test) 218 self.assertEqual(proc0.daemon, self.current_process().daemon) 219 proc1 = self.Process(target=self._test, daemon=True) 220 self.assertTrue(proc1.daemon) 221 proc2 = self.Process(target=self._test, daemon=False) 222 self.assertFalse(proc2.daemon) 223 224 @classmethod 225 def _test(cls, q, *args, **kwds): 226 current = cls.current_process() 227 q.put(args) 228 q.put(kwds) 229 q.put(current.name) 230 if cls.TYPE != 'threads': 231 q.put(bytes(current.authkey)) 232 q.put(current.pid) 233 234 def test_process(self): 235 q = self.Queue(1) 236 e = self.Event() 237 args = (q, 1, 2) 238 kwargs = {'hello':23, 'bye':2.54} 239 name = 'SomeProcess' 240 p = self.Process( 241 target=self._test, args=args, kwargs=kwargs, name=name 242 ) 243 p.daemon = True 244 current = self.current_process() 245 246 if self.TYPE != 'threads': 247 self.assertEqual(p.authkey, current.authkey) 248 self.assertEqual(p.is_alive(), False) 249 self.assertEqual(p.daemon, True) 250 self.assertNotIn(p, self.active_children()) 251 self.assertTrue(type(self.active_children()) is list) 252 self.assertEqual(p.exitcode, None) 253 254 p.start() 255 256 self.assertEqual(p.exitcode, None) 257 self.assertEqual(p.is_alive(), True) 258 self.assertIn(p, self.active_children()) 259 260 self.assertEqual(q.get(), args[1:]) 261 self.assertEqual(q.get(), kwargs) 262 self.assertEqual(q.get(), p.name) 263 if self.TYPE != 'threads': 264 self.assertEqual(q.get(), current.authkey) 265 self.assertEqual(q.get(), p.pid) 266 267 p.join() 268 269 self.assertEqual(p.exitcode, 0) 270 self.assertEqual(p.is_alive(), False) 271 self.assertNotIn(p, self.active_children()) 272 273 @classmethod 274 def _test_terminate(cls): 275 time.sleep(100) 276 277 def test_terminate(self): 278 if self.TYPE == 'threads': 279 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 280 281 p = self.Process(target=self._test_terminate) 282 p.daemon = True 283 p.start() 284 285 self.assertEqual(p.is_alive(), True) 286 self.assertIn(p, self.active_children()) 287 self.assertEqual(p.exitcode, None) 288 289 join = TimingWrapper(p.join) 290 291 self.assertEqual(join(0), None) 292 self.assertTimingAlmostEqual(join.elapsed, 0.0) 293 self.assertEqual(p.is_alive(), True) 294 295 self.assertEqual(join(-1), None) 296 self.assertTimingAlmostEqual(join.elapsed, 0.0) 297 self.assertEqual(p.is_alive(), True) 298 299 # XXX maybe terminating too soon causes the problems on Gentoo... 300 time.sleep(1) 301 302 p.terminate() 303 304 if hasattr(signal, 'alarm'): 305 # On the Gentoo buildbot waitpid() often seems to block forever. 306 # We use alarm() to interrupt it if it blocks for too long. 307 def handler(*args): 308 raise RuntimeError('join took too long: %s' % p) 309 old_handler = signal.signal(signal.SIGALRM, handler) 310 try: 311 signal.alarm(10) 312 self.assertEqual(join(), None) 313 finally: 314 signal.alarm(0) 315 signal.signal(signal.SIGALRM, old_handler) 316 else: 317 self.assertEqual(join(), None) 318 319 self.assertTimingAlmostEqual(join.elapsed, 0.0) 320 321 self.assertEqual(p.is_alive(), False) 322 self.assertNotIn(p, self.active_children()) 323 324 p.join() 325 326 # XXX sometimes get p.exitcode == 0 on Windows ... 327 #self.assertEqual(p.exitcode, -signal.SIGTERM) 328 329 def test_cpu_count(self): 330 try: 331 cpus = multiprocessing.cpu_count() 332 except NotImplementedError: 333 cpus = 1 334 self.assertTrue(type(cpus) is int) 335 self.assertTrue(cpus >= 1) 336 337 def test_active_children(self): 338 self.assertEqual(type(self.active_children()), list) 339 340 p = self.Process(target=time.sleep, args=(DELTA,)) 341 self.assertNotIn(p, self.active_children()) 342 343 p.daemon = True 344 p.start() 345 self.assertIn(p, self.active_children()) 346 347 p.join() 348 self.assertNotIn(p, self.active_children()) 349 350 @classmethod 351 def _test_recursion(cls, wconn, id): 352 wconn.send(id) 353 if len(id) < 2: 354 for i in range(2): 355 p = cls.Process( 356 target=cls._test_recursion, args=(wconn, id+[i]) 357 ) 358 p.start() 359 p.join() 360 361 @unittest.skipIf(True, "fails with is_dill(obj, child=True)") 362 def test_recursion(self): 363 rconn, wconn = self.Pipe(duplex=False) 364 self._test_recursion(wconn, []) 365 366 time.sleep(DELTA) 367 result = [] 368 while rconn.poll(): 369 result.append(rconn.recv()) 370 371 expected = [ 372 [], 373 [0], 374 [0, 0], 375 [0, 1], 376 [1], 377 [1, 0], 378 [1, 1] 379 ] 380 self.assertEqual(result, expected) 381 382 @classmethod 383 def _test_sentinel(cls, event): 384 event.wait(10.0) 385 386 def test_sentinel(self): 387 if self.TYPE == "threads": 388 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 389 event = self.Event() 390 p = self.Process(target=self._test_sentinel, args=(event,)) 391 with self.assertRaises(ValueError): 392 p.sentinel 393 p.start() 394 self.addCleanup(p.join) 395 sentinel = p.sentinel 396 self.assertIsInstance(sentinel, int) 397 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 398 event.set() 399 p.join() 400 self.assertTrue(wait_for_handle(sentinel, timeout=1)) 401 402# 403# 404# 405 406class _UpperCaser(multiprocessing.Process): 407 408 def __init__(self): 409 multiprocessing.Process.__init__(self) 410 self.child_conn, self.parent_conn = multiprocessing.Pipe() 411 412 def run(self): 413 self.parent_conn.close() 414 for s in iter(self.child_conn.recv, None): 415 self.child_conn.send(s.upper()) 416 self.child_conn.close() 417 418 def submit(self, s): 419 assert type(s) is str 420 self.parent_conn.send(s) 421 return self.parent_conn.recv() 422 423 def stop(self): 424 self.parent_conn.send(None) 425 self.parent_conn.close() 426 self.child_conn.close() 427 428class _TestSubclassingProcess(BaseTestCase): 429 430 ALLOWED_TYPES = ('processes',) 431 432 def test_subclassing(self): 433 uppercaser = _UpperCaser() 434 uppercaser.daemon = True 435 uppercaser.start() 436 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 437 self.assertEqual(uppercaser.submit('world'), 'WORLD') 438 uppercaser.stop() 439 uppercaser.join() 440 441 def test_stderr_flush(self): 442 # sys.stderr is flushed at process shutdown (issue #13812) 443 if self.TYPE == "threads": 444 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 445 446 testfn = test.support.TESTFN 447 self.addCleanup(test.support.unlink, testfn) 448 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 449 proc.start() 450 proc.join() 451 with open(testfn, 'r') as f: 452 err = f.read() 453 # The whole traceback was printed 454 self.assertIn("ZeroDivisionError", err) 455 self.assertIn("__init__.py", err) 456 self.assertIn("1/0 # MARKER", err) 457 458 @classmethod 459 def _test_stderr_flush(cls, testfn): 460 sys.stderr = open(testfn, 'w') 461 1/0 # MARKER 462 463 464 @classmethod 465 def _test_sys_exit(cls, reason, testfn): 466 sys.stderr = open(testfn, 'w') 467 sys.exit(reason) 468 469 def test_sys_exit(self): 470 # See Issue 13854 471 if self.TYPE == 'threads': 472 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 473 474 testfn = test.support.TESTFN 475 self.addCleanup(test.support.unlink, testfn) 476 477 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)): 478 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 479 p.daemon = True 480 p.start() 481 p.join(5) 482 self.assertEqual(p.exitcode, code) 483 484 with open(testfn, 'r') as f: 485 self.assertEqual(f.read().rstrip(), str(reason)) 486 487 for reason in (True, False, 8): 488 p = self.Process(target=sys.exit, args=(reason,)) 489 p.daemon = True 490 p.start() 491 p.join(5) 492 self.assertEqual(p.exitcode, reason) 493 494# 495# 496# 497 498def queue_empty(q): 499 if hasattr(q, 'empty'): 500 return q.empty() 501 else: 502 return q.qsize() == 0 503 504def queue_full(q, maxsize): 505 if hasattr(q, 'full'): 506 return q.full() 507 else: 508 return q.qsize() == maxsize 509 510 511class _TestQueue(BaseTestCase): 512 513 514 @classmethod 515 def _test_put(cls, queue, child_can_start, parent_can_continue): 516 child_can_start.wait() 517 for i in range(6): 518 queue.get() 519 parent_can_continue.set() 520 521 def test_put(self): 522 MAXSIZE = 6 523 queue = self.Queue(maxsize=MAXSIZE) 524 child_can_start = self.Event() 525 parent_can_continue = self.Event() 526 527 proc = self.Process( 528 target=self._test_put, 529 args=(queue, child_can_start, parent_can_continue) 530 ) 531 proc.daemon = True 532 proc.start() 533 534 self.assertEqual(queue_empty(queue), True) 535 self.assertEqual(queue_full(queue, MAXSIZE), False) 536 537 queue.put(1) 538 queue.put(2, True) 539 queue.put(3, True, None) 540 queue.put(4, False) 541 queue.put(5, False, None) 542 queue.put_nowait(6) 543 544 # the values may be in buffer but not yet in pipe so sleep a bit 545 time.sleep(DELTA) 546 547 self.assertEqual(queue_empty(queue), False) 548 self.assertEqual(queue_full(queue, MAXSIZE), True) 549 550 put = TimingWrapper(queue.put) 551 put_nowait = TimingWrapper(queue.put_nowait) 552 553 self.assertRaises(pyqueue.Full, put, 7, False) 554 self.assertTimingAlmostEqual(put.elapsed, 0) 555 556 self.assertRaises(pyqueue.Full, put, 7, False, None) 557 self.assertTimingAlmostEqual(put.elapsed, 0) 558 559 self.assertRaises(pyqueue.Full, put_nowait, 7) 560 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 561 562 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 563 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 564 565 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 566 self.assertTimingAlmostEqual(put.elapsed, 0) 567 568 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 569 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 570 571 child_can_start.set() 572 parent_can_continue.wait() 573 574 self.assertEqual(queue_empty(queue), True) 575 self.assertEqual(queue_full(queue, MAXSIZE), False) 576 577 proc.join() 578 579 @classmethod 580 def _test_get(cls, queue, child_can_start, parent_can_continue): 581 child_can_start.wait() 582 #queue.put(1) 583 queue.put(2) 584 queue.put(3) 585 queue.put(4) 586 queue.put(5) 587 parent_can_continue.set() 588 589 def test_get(self): 590 queue = self.Queue() 591 child_can_start = self.Event() 592 parent_can_continue = self.Event() 593 594 proc = self.Process( 595 target=self._test_get, 596 args=(queue, child_can_start, parent_can_continue) 597 ) 598 proc.daemon = True 599 proc.start() 600 601 self.assertEqual(queue_empty(queue), True) 602 603 child_can_start.set() 604 parent_can_continue.wait() 605 606 time.sleep(DELTA) 607 self.assertEqual(queue_empty(queue), False) 608 609 # Hangs unexpectedly, remove for now 610 #self.assertEqual(queue.get(), 1) 611 self.assertEqual(queue.get(True, None), 2) 612 self.assertEqual(queue.get(True), 3) 613 self.assertEqual(queue.get(timeout=1), 4) 614 self.assertEqual(queue.get_nowait(), 5) 615 616 self.assertEqual(queue_empty(queue), True) 617 618 get = TimingWrapper(queue.get) 619 get_nowait = TimingWrapper(queue.get_nowait) 620 621 self.assertRaises(pyqueue.Empty, get, False) 622 self.assertTimingAlmostEqual(get.elapsed, 0) 623 624 self.assertRaises(pyqueue.Empty, get, False, None) 625 self.assertTimingAlmostEqual(get.elapsed, 0) 626 627 self.assertRaises(pyqueue.Empty, get_nowait) 628 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 629 630 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 631 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 632 633 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 634 self.assertTimingAlmostEqual(get.elapsed, 0) 635 636 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 637 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 638 639 proc.join() 640 641 @classmethod 642 def _test_fork(cls, queue): 643 for i in range(10, 20): 644 queue.put(i) 645 # note that at this point the items may only be buffered, so the 646 # process cannot shutdown until the feeder thread has finished 647 # pushing items onto the pipe. 648 649 def test_fork(self): 650 # Old versions of Queue would fail to create a new feeder 651 # thread for a forked process if the original process had its 652 # own feeder thread. This test checks that this no longer 653 # happens. 654 655 queue = self.Queue() 656 657 # put items on queue so that main process starts a feeder thread 658 for i in range(10): 659 queue.put(i) 660 661 # wait to make sure thread starts before we fork a new process 662 time.sleep(DELTA) 663 664 # fork process 665 p = self.Process(target=self._test_fork, args=(queue,)) 666 p.daemon = True 667 p.start() 668 669 # check that all expected items are in the queue 670 for i in range(20): 671 self.assertEqual(queue.get(), i) 672 self.assertRaises(pyqueue.Empty, queue.get, False) 673 674 p.join() 675 676 def test_qsize(self): 677 q = self.Queue() 678 try: 679 self.assertEqual(q.qsize(), 0) 680 except NotImplementedError: 681 self.skipTest('qsize method not implemented') 682 q.put(1) 683 self.assertEqual(q.qsize(), 1) 684 q.put(5) 685 self.assertEqual(q.qsize(), 2) 686 q.get() 687 self.assertEqual(q.qsize(), 1) 688 q.get() 689 self.assertEqual(q.qsize(), 0) 690 691 @classmethod 692 def _test_task_done(cls, q): 693 for obj in iter(q.get, None): 694 time.sleep(DELTA) 695 q.task_done() 696 697 def test_task_done(self): 698 queue = self.JoinableQueue() 699 700 workers = [self.Process(target=self._test_task_done, args=(queue,)) 701 for i in range(4)] 702 703 for p in workers: 704 p.daemon = True 705 p.start() 706 707 for i in range(10): 708 queue.put(i) 709 710 queue.join() 711 712 for p in workers: 713 queue.put(None) 714 715 for p in workers: 716 p.join() 717 718 def test_no_import_lock_contention(self): 719 with test.support.temp_cwd(): 720 module_name = 'imported_by_an_imported_module' 721 with open(module_name + '.py', 'w') as f: 722 f.write("""if 1: 723 import multiprocess as multiprocessing 724 725 q = multiprocessing.Queue() 726 q.put('knock knock') 727 q.get(timeout=3) 728 q.close() 729 del q 730 """) 731 732 with test.support.DirsOnSysPath(os.getcwd()): 733 try: 734 __import__(module_name) 735 except pyqueue.Empty: 736 self.fail("Probable regression on import lock contention;" 737 " see Issue #22853") 738 739 def test_timeout(self): 740 q = multiprocessing.Queue() 741 start = time.time() 742 self.assertRaises(pyqueue.Empty, q.get, True, 0.200) 743 delta = time.time() - start 744 # Tolerate a delta of 30 ms because of the bad clock resolution on 745 # Windows (usually 15.6 ms) 746 self.assertGreaterEqual(delta, 0.170) 747 748# 749# 750# 751 752class _TestLock(BaseTestCase): 753 754 def test_lock(self): 755 lock = self.Lock() 756 self.assertEqual(lock.acquire(), True) 757 self.assertEqual(lock.acquire(False), False) 758 self.assertEqual(lock.release(), None) 759 self.assertRaises((ValueError, threading.ThreadError), lock.release) 760 761 def test_rlock(self): 762 lock = self.RLock() 763 self.assertEqual(lock.acquire(), True) 764 self.assertEqual(lock.acquire(), True) 765 self.assertEqual(lock.acquire(), True) 766 self.assertEqual(lock.release(), None) 767 self.assertEqual(lock.release(), None) 768 self.assertEqual(lock.release(), None) 769 self.assertRaises((AssertionError, RuntimeError), lock.release) 770 771 def test_lock_context(self): 772 with self.Lock(): 773 pass 774 775 776class _TestSemaphore(BaseTestCase): 777 778 def _test_semaphore(self, sem): 779 self.assertReturnsIfImplemented(2, get_value, sem) 780 self.assertEqual(sem.acquire(), True) 781 self.assertReturnsIfImplemented(1, get_value, sem) 782 self.assertEqual(sem.acquire(), True) 783 self.assertReturnsIfImplemented(0, get_value, sem) 784 self.assertEqual(sem.acquire(False), False) 785 self.assertReturnsIfImplemented(0, get_value, sem) 786 self.assertEqual(sem.release(), None) 787 self.assertReturnsIfImplemented(1, get_value, sem) 788 self.assertEqual(sem.release(), None) 789 self.assertReturnsIfImplemented(2, get_value, sem) 790 791 def test_semaphore(self): 792 sem = self.Semaphore(2) 793 self._test_semaphore(sem) 794 self.assertEqual(sem.release(), None) 795 self.assertReturnsIfImplemented(3, get_value, sem) 796 self.assertEqual(sem.release(), None) 797 self.assertReturnsIfImplemented(4, get_value, sem) 798 799 def test_bounded_semaphore(self): 800 sem = self.BoundedSemaphore(2) 801 self._test_semaphore(sem) 802 # Currently fails on OS/X 803 #if HAVE_GETVALUE: 804 # self.assertRaises(ValueError, sem.release) 805 # self.assertReturnsIfImplemented(2, get_value, sem) 806 807 def test_timeout(self): 808 if self.TYPE != 'processes': 809 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 810 811 sem = self.Semaphore(0) 812 acquire = TimingWrapper(sem.acquire) 813 814 self.assertEqual(acquire(False), False) 815 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 816 817 self.assertEqual(acquire(False, None), False) 818 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 819 820 self.assertEqual(acquire(False, TIMEOUT1), False) 821 self.assertTimingAlmostEqual(acquire.elapsed, 0) 822 823 self.assertEqual(acquire(True, TIMEOUT2), False) 824 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 825 826 self.assertEqual(acquire(timeout=TIMEOUT3), False) 827 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 828 829 830class _TestCondition(BaseTestCase): 831 832 @classmethod 833 def f(cls, cond, sleeping, woken, timeout=None): 834 cond.acquire() 835 sleeping.release() 836 cond.wait(timeout) 837 woken.release() 838 cond.release() 839 840 def check_invariant(self, cond): 841 # this is only supposed to succeed when there are no sleepers 842 if self.TYPE == 'processes': 843 try: 844 sleepers = (cond._sleeping_count.get_value() - 845 cond._woken_count.get_value()) 846 self.assertEqual(sleepers, 0) 847 self.assertEqual(cond._wait_semaphore.get_value(), 0) 848 except NotImplementedError: 849 pass 850 851 def test_notify(self): 852 cond = self.Condition() 853 sleeping = self.Semaphore(0) 854 woken = self.Semaphore(0) 855 856 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 857 p.daemon = True 858 p.start() 859 860 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 861 p.daemon = True 862 p.start() 863 864 # wait for both children to start sleeping 865 sleeping.acquire() 866 sleeping.acquire() 867 868 # check no process/thread has woken up 869 time.sleep(DELTA) 870 self.assertReturnsIfImplemented(0, get_value, woken) 871 872 # wake up one process/thread 873 cond.acquire() 874 cond.notify() 875 cond.release() 876 877 # check one process/thread has woken up 878 time.sleep(DELTA) 879 self.assertReturnsIfImplemented(1, get_value, woken) 880 881 # wake up another 882 cond.acquire() 883 cond.notify() 884 cond.release() 885 886 # check other has woken up 887 time.sleep(DELTA) 888 self.assertReturnsIfImplemented(2, get_value, woken) 889 890 # check state is not mucked up 891 self.check_invariant(cond) 892 p.join() 893 894 def test_notify_all(self): 895 cond = self.Condition() 896 sleeping = self.Semaphore(0) 897 woken = self.Semaphore(0) 898 899 # start some threads/processes which will timeout 900 for i in range(3): 901 p = self.Process(target=self.f, 902 args=(cond, sleeping, woken, TIMEOUT1)) 903 p.daemon = True 904 p.start() 905 906 t = threading.Thread(target=self.f, 907 args=(cond, sleeping, woken, TIMEOUT1)) 908 t.daemon = True 909 t.start() 910 911 # wait for them all to sleep 912 for i in range(6): 913 sleeping.acquire() 914 915 # check they have all timed out 916 for i in range(6): 917 woken.acquire() 918 self.assertReturnsIfImplemented(0, get_value, woken) 919 920 # check state is not mucked up 921 self.check_invariant(cond) 922 923 # start some more threads/processes 924 for i in range(3): 925 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 926 p.daemon = True 927 p.start() 928 929 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 930 t.daemon = True 931 t.start() 932 933 # wait for them to all sleep 934 for i in range(6): 935 sleeping.acquire() 936 937 # check no process/thread has woken up 938 time.sleep(DELTA) 939 self.assertReturnsIfImplemented(0, get_value, woken) 940 941 # wake them all up 942 cond.acquire() 943 cond.notify_all() 944 cond.release() 945 946 # check they have all woken 947 for i in range(10): 948 try: 949 if get_value(woken) == 6: 950 break 951 except NotImplementedError: 952 break 953 time.sleep(DELTA) 954 self.assertReturnsIfImplemented(6, get_value, woken) 955 956 # check state is not mucked up 957 self.check_invariant(cond) 958 959 def test_timeout(self): 960 cond = self.Condition() 961 wait = TimingWrapper(cond.wait) 962 cond.acquire() 963 res = wait(TIMEOUT1) 964 cond.release() 965 self.assertEqual(res, False) 966 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 967 968 @classmethod 969 def _test_waitfor_f(cls, cond, state): 970 with cond: 971 state.value = 0 972 cond.notify() 973 result = cond.wait_for(lambda : state.value==4) 974 if not result or state.value != 4: 975 sys.exit(1) 976 977 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 978 def test_waitfor(self): 979 # based on test in test/lock_tests.py 980 cond = self.Condition() 981 state = self.Value('i', -1) 982 983 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 984 p.daemon = True 985 p.start() 986 987 with cond: 988 result = cond.wait_for(lambda : state.value==0) 989 self.assertTrue(result) 990 self.assertEqual(state.value, 0) 991 992 for i in range(4): 993 time.sleep(0.01) 994 with cond: 995 state.value += 1 996 cond.notify() 997 998 p.join(5) 999 self.assertFalse(p.is_alive()) 1000 self.assertEqual(p.exitcode, 0) 1001 1002 @classmethod 1003 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 1004 sem.release() 1005 with cond: 1006 expected = 0.1 1007 dt = time.time() 1008 result = cond.wait_for(lambda : state.value==4, timeout=expected) 1009 dt = time.time() - dt 1010 # borrow logic in assertTimeout() from test/lock_tests.py 1011 if not result and expected * 0.6 < dt < expected * 10.0: 1012 success.value = True 1013 1014 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 1015 def test_waitfor_timeout(self): 1016 # based on test in test/lock_tests.py 1017 cond = self.Condition() 1018 state = self.Value('i', 0) 1019 success = self.Value('i', False) 1020 sem = self.Semaphore(0) 1021 1022 p = self.Process(target=self._test_waitfor_timeout_f, 1023 args=(cond, state, success, sem)) 1024 p.daemon = True 1025 p.start() 1026 self.assertTrue(sem.acquire(timeout=10)) 1027 1028 # Only increment 3 times, so state == 4 is never reached. 1029 for i in range(3): 1030 time.sleep(0.01) 1031 with cond: 1032 state.value += 1 1033 cond.notify() 1034 1035 p.join(5) 1036 self.assertTrue(success.value) 1037 1038 @classmethod 1039 def _test_wait_result(cls, c, pid): 1040 with c: 1041 c.notify() 1042 time.sleep(1) 1043 if pid is not None: 1044 os.kill(pid, signal.SIGINT) 1045 1046 def test_wait_result(self): 1047 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1048 pid = os.getpid() 1049 else: 1050 pid = None 1051 1052 c = self.Condition() 1053 with c: 1054 self.assertFalse(c.wait(0)) 1055 self.assertFalse(c.wait(0.1)) 1056 1057 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1058 p.start() 1059 1060 self.assertTrue(c.wait(10)) 1061 if pid is not None: 1062 self.assertRaises(KeyboardInterrupt, c.wait, 10) 1063 1064 p.join() 1065 1066 1067class _TestEvent(BaseTestCase): 1068 1069 @classmethod 1070 def _test_event(cls, event): 1071 time.sleep(TIMEOUT2) 1072 event.set() 1073 1074 def test_event(self): 1075 event = self.Event() 1076 wait = TimingWrapper(event.wait) 1077 1078 # Removed temporarily, due to API shear, this does not 1079 # work with threading._Event objects. is_set == isSet 1080 self.assertEqual(event.is_set(), False) 1081 1082 # Removed, threading.Event.wait() will return the value of the __flag 1083 # instead of None. API Shear with the semaphore backed mp.Event 1084 self.assertEqual(wait(0.0), False) 1085 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1086 self.assertEqual(wait(TIMEOUT1), False) 1087 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1088 1089 event.set() 1090 1091 # See note above on the API differences 1092 self.assertEqual(event.is_set(), True) 1093 self.assertEqual(wait(), True) 1094 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1095 self.assertEqual(wait(TIMEOUT1), True) 1096 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1097 # self.assertEqual(event.is_set(), True) 1098 1099 event.clear() 1100 1101 #self.assertEqual(event.is_set(), False) 1102 1103 p = self.Process(target=self._test_event, args=(event,)) 1104 p.daemon = True 1105 p.start() 1106 self.assertEqual(wait(), True) 1107 1108# 1109# Tests for Barrier - adapted from tests in test/lock_tests.py 1110# 1111 1112# Many of the tests for threading.Barrier use a list as an atomic 1113# counter: a value is appended to increment the counter, and the 1114# length of the list gives the value. We use the class DummyList 1115# for the same purpose. 1116 1117class _DummyList(object): 1118 1119 def __init__(self): 1120 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1121 lock = multiprocessing.Lock() 1122 self.__setstate__((wrapper, lock)) 1123 self._lengthbuf[0] = 0 1124 1125 def __setstate__(self, state): 1126 (self._wrapper, self._lock) = state 1127 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1128 1129 def __getstate__(self): 1130 return (self._wrapper, self._lock) 1131 1132 def append(self, _): 1133 with self._lock: 1134 self._lengthbuf[0] += 1 1135 1136 def __len__(self): 1137 with self._lock: 1138 return self._lengthbuf[0] 1139 1140def _wait(): 1141 # A crude wait/yield function not relying on synchronization primitives. 1142 time.sleep(0.01) 1143 1144 1145class Bunch(object): 1146 """ 1147 A bunch of threads. 1148 """ 1149 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1150 """ 1151 Construct a bunch of `n` threads running the same function `f`. 1152 If `wait_before_exit` is True, the threads won't terminate until 1153 do_finish() is called. 1154 """ 1155 self.f = f 1156 self.args = args 1157 self.n = n 1158 self.started = namespace.DummyList() 1159 self.finished = namespace.DummyList() 1160 self._can_exit = namespace.Event() 1161 if not wait_before_exit: 1162 self._can_exit.set() 1163 for i in range(n): 1164 p = namespace.Process(target=self.task) 1165 p.daemon = True 1166 p.start() 1167 1168 def task(self): 1169 pid = os.getpid() 1170 self.started.append(pid) 1171 try: 1172 self.f(*self.args) 1173 finally: 1174 self.finished.append(pid) 1175 self._can_exit.wait(30) 1176 assert self._can_exit.is_set() 1177 1178 def wait_for_started(self): 1179 while len(self.started) < self.n: 1180 _wait() 1181 1182 def wait_for_finished(self): 1183 while len(self.finished) < self.n: 1184 _wait() 1185 1186 def do_finish(self): 1187 self._can_exit.set() 1188 1189 1190class AppendTrue(object): 1191 def __init__(self, obj): 1192 self.obj = obj 1193 def __call__(self): 1194 self.obj.append(True) 1195 1196 1197class _TestBarrier(BaseTestCase): 1198 """ 1199 Tests for Barrier objects. 1200 """ 1201 N = 5 1202 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1203 1204 def setUp(self): 1205 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1206 1207 def tearDown(self): 1208 self.barrier.abort() 1209 self.barrier = None 1210 1211 def DummyList(self): 1212 if self.TYPE == 'threads': 1213 return [] 1214 elif self.TYPE == 'manager': 1215 return self.manager.list() 1216 else: 1217 return _DummyList() 1218 1219 def run_threads(self, f, args): 1220 b = Bunch(self, f, args, self.N-1) 1221 f(*args) 1222 b.wait_for_finished() 1223 1224 @classmethod 1225 def multipass(cls, barrier, results, n): 1226 m = barrier.parties 1227 assert m == cls.N 1228 for i in range(n): 1229 results[0].append(True) 1230 assert len(results[1]) == i * m 1231 barrier.wait() 1232 results[1].append(True) 1233 assert len(results[0]) == (i + 1) * m 1234 barrier.wait() 1235 try: 1236 assert barrier.n_waiting == 0 1237 except NotImplementedError: 1238 pass 1239 assert not barrier.broken 1240 1241 def test_barrier(self, passes=1): 1242 """ 1243 Test that a barrier is passed in lockstep 1244 """ 1245 results = [self.DummyList(), self.DummyList()] 1246 self.run_threads(self.multipass, (self.barrier, results, passes)) 1247 1248 def test_barrier_10(self): 1249 """ 1250 Test that a barrier works for 10 consecutive runs 1251 """ 1252 return self.test_barrier(10) 1253 1254 @classmethod 1255 def _test_wait_return_f(cls, barrier, queue): 1256 res = barrier.wait() 1257 queue.put(res) 1258 1259 def test_wait_return(self): 1260 """ 1261 test the return value from barrier.wait 1262 """ 1263 queue = self.Queue() 1264 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1265 results = [queue.get() for i in range(self.N)] 1266 self.assertEqual(results.count(0), 1) 1267 1268 @classmethod 1269 def _test_action_f(cls, barrier, results): 1270 barrier.wait() 1271 if len(results) != 1: 1272 raise RuntimeError 1273 1274 def test_action(self): 1275 """ 1276 Test the 'action' callback 1277 """ 1278 results = self.DummyList() 1279 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1280 self.run_threads(self._test_action_f, (barrier, results)) 1281 self.assertEqual(len(results), 1) 1282 1283 @classmethod 1284 def _test_abort_f(cls, barrier, results1, results2): 1285 try: 1286 i = barrier.wait() 1287 if i == cls.N//2: 1288 raise RuntimeError 1289 barrier.wait() 1290 results1.append(True) 1291 except threading.BrokenBarrierError: 1292 results2.append(True) 1293 except RuntimeError: 1294 barrier.abort() 1295 1296 def test_abort(self): 1297 """ 1298 Test that an abort will put the barrier in a broken state 1299 """ 1300 results1 = self.DummyList() 1301 results2 = self.DummyList() 1302 self.run_threads(self._test_abort_f, 1303 (self.barrier, results1, results2)) 1304 self.assertEqual(len(results1), 0) 1305 self.assertEqual(len(results2), self.N-1) 1306 self.assertTrue(self.barrier.broken) 1307 1308 @classmethod 1309 def _test_reset_f(cls, barrier, results1, results2, results3): 1310 i = barrier.wait() 1311 if i == cls.N//2: 1312 # Wait until the other threads are all in the barrier. 1313 while barrier.n_waiting < cls.N-1: 1314 time.sleep(0.001) 1315 barrier.reset() 1316 else: 1317 try: 1318 barrier.wait() 1319 results1.append(True) 1320 except threading.BrokenBarrierError: 1321 results2.append(True) 1322 # Now, pass the barrier again 1323 barrier.wait() 1324 results3.append(True) 1325 1326 def test_reset(self): 1327 """ 1328 Test that a 'reset' on a barrier frees the waiting threads 1329 """ 1330 results1 = self.DummyList() 1331 results2 = self.DummyList() 1332 results3 = self.DummyList() 1333 self.run_threads(self._test_reset_f, 1334 (self.barrier, results1, results2, results3)) 1335 self.assertEqual(len(results1), 0) 1336 self.assertEqual(len(results2), self.N-1) 1337 self.assertEqual(len(results3), self.N) 1338 1339 @classmethod 1340 def _test_abort_and_reset_f(cls, barrier, barrier2, 1341 results1, results2, results3): 1342 try: 1343 i = barrier.wait() 1344 if i == cls.N//2: 1345 raise RuntimeError 1346 barrier.wait() 1347 results1.append(True) 1348 except threading.BrokenBarrierError: 1349 results2.append(True) 1350 except RuntimeError: 1351 barrier.abort() 1352 # Synchronize and reset the barrier. Must synchronize first so 1353 # that everyone has left it when we reset, and after so that no 1354 # one enters it before the reset. 1355 if barrier2.wait() == cls.N//2: 1356 barrier.reset() 1357 barrier2.wait() 1358 barrier.wait() 1359 results3.append(True) 1360 1361 def test_abort_and_reset(self): 1362 """ 1363 Test that a barrier can be reset after being broken. 1364 """ 1365 results1 = self.DummyList() 1366 results2 = self.DummyList() 1367 results3 = self.DummyList() 1368 barrier2 = self.Barrier(self.N) 1369 1370 self.run_threads(self._test_abort_and_reset_f, 1371 (self.barrier, barrier2, results1, results2, results3)) 1372 self.assertEqual(len(results1), 0) 1373 self.assertEqual(len(results2), self.N-1) 1374 self.assertEqual(len(results3), self.N) 1375 1376 @classmethod 1377 def _test_timeout_f(cls, barrier, results): 1378 i = barrier.wait() 1379 if i == cls.N//2: 1380 # One thread is late! 1381 time.sleep(1.0) 1382 try: 1383 barrier.wait(0.5) 1384 except threading.BrokenBarrierError: 1385 results.append(True) 1386 1387 def test_timeout(self): 1388 """ 1389 Test wait(timeout) 1390 """ 1391 results = self.DummyList() 1392 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1393 self.assertEqual(len(results), self.barrier.parties) 1394 1395 @classmethod 1396 def _test_default_timeout_f(cls, barrier, results): 1397 i = barrier.wait(cls.defaultTimeout) 1398 if i == cls.N//2: 1399 # One thread is later than the default timeout 1400 time.sleep(1.0) 1401 try: 1402 barrier.wait() 1403 except threading.BrokenBarrierError: 1404 results.append(True) 1405 1406 def test_default_timeout(self): 1407 """ 1408 Test the barrier's default timeout 1409 """ 1410 barrier = self.Barrier(self.N, timeout=0.5) 1411 results = self.DummyList() 1412 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1413 self.assertEqual(len(results), barrier.parties) 1414 1415 def test_single_thread(self): 1416 b = self.Barrier(1) 1417 b.wait() 1418 b.wait() 1419 1420 @classmethod 1421 def _test_thousand_f(cls, barrier, passes, conn, lock): 1422 for i in range(passes): 1423 barrier.wait() 1424 with lock: 1425 conn.send(i) 1426 1427 def test_thousand(self): 1428 if self.TYPE == 'manager': 1429 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1430 passes = 1000 1431 lock = self.Lock() 1432 conn, child_conn = self.Pipe(False) 1433 for j in range(self.N): 1434 p = self.Process(target=self._test_thousand_f, 1435 args=(self.barrier, passes, child_conn, lock)) 1436 p.start() 1437 1438 for i in range(passes): 1439 for j in range(self.N): 1440 self.assertEqual(conn.recv(), i) 1441 1442# 1443# 1444# 1445 1446class _TestValue(BaseTestCase): 1447 1448 ALLOWED_TYPES = ('processes',) 1449 1450 codes_values = [ 1451 ('i', 4343, 24234), 1452 ('d', 3.625, -4.25), 1453 ('h', -232, 234), 1454 ('c', latin('x'), latin('y')) 1455 ] 1456 1457 def setUp(self): 1458 if not HAS_SHAREDCTYPES: 1459 self.skipTest("requires multiprocessing.sharedctypes") 1460 1461 @classmethod 1462 def _test(cls, values): 1463 for sv, cv in zip(values, cls.codes_values): 1464 sv.value = cv[2] 1465 1466 1467 def test_value(self, raw=False): 1468 if raw: 1469 values = [self.RawValue(code, value) 1470 for code, value, _ in self.codes_values] 1471 else: 1472 values = [self.Value(code, value) 1473 for code, value, _ in self.codes_values] 1474 1475 for sv, cv in zip(values, self.codes_values): 1476 self.assertEqual(sv.value, cv[1]) 1477 1478 proc = self.Process(target=self._test, args=(values,)) 1479 proc.daemon = True 1480 proc.start() 1481 proc.join() 1482 1483 for sv, cv in zip(values, self.codes_values): 1484 self.assertEqual(sv.value, cv[2]) 1485 1486 def test_rawvalue(self): 1487 self.test_value(raw=True) 1488 1489 def test_getobj_getlock(self): 1490 val1 = self.Value('i', 5) 1491 lock1 = val1.get_lock() 1492 obj1 = val1.get_obj() 1493 1494 val2 = self.Value('i', 5, lock=None) 1495 lock2 = val2.get_lock() 1496 obj2 = val2.get_obj() 1497 1498 lock = self.Lock() 1499 val3 = self.Value('i', 5, lock=lock) 1500 lock3 = val3.get_lock() 1501 obj3 = val3.get_obj() 1502 self.assertEqual(lock, lock3) 1503 1504 arr4 = self.Value('i', 5, lock=False) 1505 self.assertFalse(hasattr(arr4, 'get_lock')) 1506 self.assertFalse(hasattr(arr4, 'get_obj')) 1507 1508 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 1509 1510 arr5 = self.RawValue('i', 5) 1511 self.assertFalse(hasattr(arr5, 'get_lock')) 1512 self.assertFalse(hasattr(arr5, 'get_obj')) 1513 1514 1515class _TestArray(BaseTestCase): 1516 1517 ALLOWED_TYPES = ('processes',) 1518 1519 @classmethod 1520 def f(cls, seq): 1521 for i in range(1, len(seq)): 1522 seq[i] += seq[i-1] 1523 1524 @unittest.skipIf(c_int is None, "requires _ctypes") 1525 def test_array(self, raw=False): 1526 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 1527 if raw: 1528 arr = self.RawArray('i', seq) 1529 else: 1530 arr = self.Array('i', seq) 1531 1532 self.assertEqual(len(arr), len(seq)) 1533 self.assertEqual(arr[3], seq[3]) 1534 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 1535 1536 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 1537 1538 self.assertEqual(list(arr[:]), seq) 1539 1540 self.f(seq) 1541 1542 p = self.Process(target=self.f, args=(arr,)) 1543 p.daemon = True 1544 p.start() 1545 p.join() 1546 1547 self.assertEqual(list(arr[:]), seq) 1548 1549 @unittest.skipIf(c_int is None, "requires _ctypes") 1550 def test_array_from_size(self): 1551 size = 10 1552 # Test for zeroing (see issue #11675). 1553 # The repetition below strengthens the test by increasing the chances 1554 # of previously allocated non-zero memory being used for the new array 1555 # on the 2nd and 3rd loops. 1556 for _ in range(3): 1557 arr = self.Array('i', size) 1558 self.assertEqual(len(arr), size) 1559 self.assertEqual(list(arr), [0] * size) 1560 arr[:] = range(10) 1561 self.assertEqual(list(arr), list(range(10))) 1562 del arr 1563 1564 @unittest.skipIf(c_int is None, "requires _ctypes") 1565 def test_rawarray(self): 1566 self.test_array(raw=True) 1567 1568 @unittest.skipIf(c_int is None, "requires _ctypes") 1569 def test_getobj_getlock_obj(self): 1570 arr1 = self.Array('i', list(range(10))) 1571 lock1 = arr1.get_lock() 1572 obj1 = arr1.get_obj() 1573 1574 arr2 = self.Array('i', list(range(10)), lock=None) 1575 lock2 = arr2.get_lock() 1576 obj2 = arr2.get_obj() 1577 1578 lock = self.Lock() 1579 arr3 = self.Array('i', list(range(10)), lock=lock) 1580 lock3 = arr3.get_lock() 1581 obj3 = arr3.get_obj() 1582 self.assertEqual(lock, lock3) 1583 1584 arr4 = self.Array('i', range(10), lock=False) 1585 self.assertFalse(hasattr(arr4, 'get_lock')) 1586 self.assertFalse(hasattr(arr4, 'get_obj')) 1587 self.assertRaises(AttributeError, 1588 self.Array, 'i', range(10), lock='notalock') 1589 1590 arr5 = self.RawArray('i', range(10)) 1591 self.assertFalse(hasattr(arr5, 'get_lock')) 1592 self.assertFalse(hasattr(arr5, 'get_obj')) 1593 1594# 1595# 1596# 1597 1598class _TestContainers(BaseTestCase): 1599 1600 ALLOWED_TYPES = ('manager',) 1601 1602 def test_list(self): 1603 a = self.list(list(range(10))) 1604 self.assertEqual(a[:], list(range(10))) 1605 1606 b = self.list() 1607 self.assertEqual(b[:], []) 1608 1609 b.extend(list(range(5))) 1610 self.assertEqual(b[:], list(range(5))) 1611 1612 self.assertEqual(b[2], 2) 1613 self.assertEqual(b[2:10], [2,3,4]) 1614 1615 b *= 2 1616 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1617 1618 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 1619 1620 self.assertEqual(a[:], list(range(10))) 1621 1622 d = [a, b] 1623 e = self.list(d) 1624 self.assertEqual( 1625 e[:], 1626 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1627 ) 1628 1629 f = self.list([a]) 1630 a.append('hello') 1631 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) 1632 1633 def test_dict(self): 1634 d = self.dict() 1635 indices = list(range(65, 70)) 1636 for i in indices: 1637 d[i] = chr(i) 1638 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1639 self.assertEqual(sorted(d.keys()), indices) 1640 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1641 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1642 1643 def test_namespace(self): 1644 n = self.Namespace() 1645 n.name = 'Bob' 1646 n.job = 'Builder' 1647 n._hidden = 'hidden' 1648 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1649 del n.job 1650 self.assertEqual(str(n), "Namespace(name='Bob')") 1651 self.assertTrue(hasattr(n, 'name')) 1652 self.assertTrue(not hasattr(n, 'job')) 1653 1654# 1655# 1656# 1657 1658def sqr(x, wait=0.0): 1659 time.sleep(wait) 1660 return x*x 1661 1662def mul(x, y): 1663 return x*y 1664 1665class SayWhenError(ValueError): pass 1666 1667def exception_throwing_generator(total, when): 1668 for i in range(total): 1669 if i == when: 1670 raise SayWhenError("Somebody said when") 1671 yield i 1672 1673class _TestPool(BaseTestCase): 1674 1675 @classmethod 1676 def setUpClass(cls): 1677 super().setUpClass() 1678 cls.pool = cls.Pool(4) 1679 1680 @classmethod 1681 def tearDownClass(cls): 1682 cls.pool.terminate() 1683 cls.pool.join() 1684 cls.pool = None 1685 super().tearDownClass() 1686 1687 def test_apply(self): 1688 papply = self.pool.apply 1689 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1690 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1691 1692 def test_map(self): 1693 pmap = self.pool.map 1694 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 1695 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 1696 list(map(sqr, list(range(100))))) 1697 1698 def test_starmap(self): 1699 psmap = self.pool.starmap 1700 tuples = list(zip(range(10), range(9,-1, -1))) 1701 self.assertEqual(psmap(mul, tuples), 1702 list(itertools.starmap(mul, tuples))) 1703 tuples = list(zip(range(100), range(99,-1, -1))) 1704 self.assertEqual(psmap(mul, tuples, chunksize=20), 1705 list(itertools.starmap(mul, tuples))) 1706 1707 def test_starmap_async(self): 1708 tuples = list(zip(range(100), range(99,-1, -1))) 1709 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 1710 list(itertools.starmap(mul, tuples))) 1711 1712 def test_map_async(self): 1713 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 1714 list(map(sqr, list(range(10))))) 1715 1716 def _test_map_async_callbacks(self): 1717 call_args = self.manager.list() if self.TYPE == 'manager' else [] 1718 self.pool.map_async(int, ['1'], 1719 callback=call_args.append, 1720 error_callback=call_args.append).wait() 1721 self.assertEqual(1, len(call_args)) 1722 self.assertEqual([1], call_args[0]) 1723 self.pool.map_async(int, ['a'], 1724 callback=call_args.append, 1725 error_callback=call_args.append).wait() 1726 self.assertEqual(2, len(call_args)) 1727 self.assertIsInstance(call_args[1], ValueError) 1728 1729 def test_map_unplicklable(self): 1730 # Issue #19425 -- failure to pickle should not cause a hang 1731 if self.TYPE == 'threads': 1732 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1733 class A(object): 1734 def __reduce__(self): 1735 raise RuntimeError('cannot pickle') 1736 with self.assertRaises(RuntimeError): 1737 self.pool.map(sqr, [A()]*10) 1738 1739 def test_map_chunksize(self): 1740 try: 1741 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1742 except multiprocessing.TimeoutError: 1743 self.fail("pool.map_async with chunksize stalled on null list") 1744 1745 def test_async(self): 1746 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1747 get = TimingWrapper(res.get) 1748 self.assertEqual(get(), 49) 1749 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1750 1751 def test_async_timeout(self): 1752 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 1753 get = TimingWrapper(res.get) 1754 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1755 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1756 1757 def test_imap(self): 1758 it = self.pool.imap(sqr, list(range(10))) 1759 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 1760 1761 it = self.pool.imap(sqr, list(range(10))) 1762 for i in range(10): 1763 self.assertEqual(next(it), i*i) 1764 self.assertRaises(StopIteration, it.__next__) 1765 1766 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 1767 for i in range(1000): 1768 self.assertEqual(next(it), i*i) 1769 self.assertRaises(StopIteration, it.__next__) 1770 1771 def test_imap_handle_iterable_exception(self): 1772 if self.TYPE == 'manager': 1773 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1774 1775 it = self.pool.imap(sqr, exception_throwing_generator(10, 3), 1) 1776 for i in range(3): 1777 self.assertEqual(next(it), i*i) 1778 self.assertRaises(SayWhenError, it.__next__) 1779 1780 # SayWhenError seen at start of problematic chunk's results 1781 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 2) 1782 for i in range(6): 1783 self.assertEqual(next(it), i*i) 1784 self.assertRaises(SayWhenError, it.__next__) 1785 it = self.pool.imap(sqr, exception_throwing_generator(20, 7), 4) 1786 for i in range(4): 1787 self.assertEqual(next(it), i*i) 1788 self.assertRaises(SayWhenError, it.__next__) 1789 1790 def test_imap_unordered(self): 1791 it = self.pool.imap_unordered(sqr, list(range(1000))) 1792 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1793 1794 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53) 1795 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1796 1797 def test_imap_unordered_handle_iterable_exception(self): 1798 if self.TYPE == 'manager': 1799 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1800 1801 it = self.pool.imap_unordered(sqr, 1802 exception_throwing_generator(10, 3), 1803 1) 1804 expected_values = list(map(sqr, list(range(10)))) 1805 with self.assertRaises(SayWhenError): 1806 # imap_unordered makes it difficult to anticipate the SayWhenError 1807 for i in range(10): 1808 value = next(it) 1809 self.assertIn(value, expected_values) 1810 expected_values.remove(value) 1811 1812 it = self.pool.imap_unordered(sqr, 1813 exception_throwing_generator(20, 7), 1814 2) 1815 expected_values = list(map(sqr, list(range(20)))) 1816 with self.assertRaises(SayWhenError): 1817 for i in range(20): 1818 value = next(it) 1819 self.assertIn(value, expected_values) 1820 expected_values.remove(value) 1821 1822 def test_make_pool(self): 1823 self.assertRaises(ValueError, multiprocessing.Pool, -1) 1824 self.assertRaises(ValueError, multiprocessing.Pool, 0) 1825 1826 p = multiprocessing.Pool(3) 1827 self.assertEqual(3, len(p._pool)) 1828 p.close() 1829 p.join() 1830 1831 def test_terminate(self): 1832 result = self.pool.map_async( 1833 time.sleep, [0.1 for i in range(10000)], chunksize=1 1834 ) 1835 self.pool.terminate() 1836 join = TimingWrapper(self.pool.join) 1837 join() 1838 self.assertLess(join.elapsed, 0.5) 1839 1840 def test_empty_iterable(self): 1841 # See Issue 12157 1842 p = self.Pool(1) 1843 1844 self.assertEqual(p.map(sqr, []), []) 1845 self.assertEqual(list(p.imap(sqr, [])), []) 1846 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1847 self.assertEqual(p.map_async(sqr, []).get(), []) 1848 1849 p.close() 1850 p.join() 1851 1852 def test_context(self): 1853 if self.TYPE == 'processes': 1854 L = list(range(10)) 1855 expected = [sqr(i) for i in L] 1856 with multiprocessing.Pool(2) as p: 1857 r = p.map_async(sqr, L) 1858 self.assertEqual(r.get(), expected) 1859 self.assertRaises(ValueError, p.map_async, sqr, L) 1860 1861 @classmethod 1862 def _test_traceback(cls): 1863 raise RuntimeError(123) # some comment 1864 1865 @unittest.skipIf(True, "fails with is_dill(obj, child=True)") 1866 def test_traceback(self): 1867 # We want ensure that the traceback from the child process is 1868 # contained in the traceback raised in the main process. 1869 if self.TYPE == 'processes': 1870 with self.Pool(1) as p: 1871 try: 1872 p.apply(self._test_traceback) 1873 except Exception as e: 1874 exc = e 1875 else: 1876 raise AssertionError('expected RuntimeError') 1877 self.assertIs(type(exc), RuntimeError) 1878 self.assertEqual(exc.args, (123,)) 1879 cause = exc.__cause__ 1880 self.assertIs(type(cause), multiprocessing.pool.RemoteTraceback) 1881 self.assertIn('raise RuntimeError(123) # some comment', cause.tb) 1882 1883 with test.support.captured_stderr() as f1: 1884 try: 1885 raise exc 1886 except RuntimeError: 1887 sys.excepthook(*sys.exc_info()) 1888 self.assertIn('raise RuntimeError(123) # some comment', 1889 f1.getvalue()) 1890 1891 @classmethod 1892 def _test_wrapped_exception(cls): 1893 raise RuntimeError('foo') 1894 1895 @unittest.skipIf(True, "fails with is_dill(obj, child=True)") 1896 def test_wrapped_exception(self): 1897 # Issue #20980: Should not wrap exception when using thread pool 1898 with self.Pool(1) as p: 1899 with self.assertRaises(RuntimeError): 1900 p.apply(self._test_wrapped_exception) 1901 1902 1903def raising(): 1904 raise KeyError("key") 1905 1906def unpickleable_result(): 1907 return lambda: 42 1908 1909class _TestPoolWorkerErrors(BaseTestCase): 1910 ALLOWED_TYPES = ('processes', ) 1911 1912 def test_async_error_callback(self): 1913 p = multiprocessing.Pool(2) 1914 1915 scratchpad = [None] 1916 def errback(exc): 1917 scratchpad[0] = exc 1918 1919 res = p.apply_async(raising, error_callback=errback) 1920 self.assertRaises(KeyError, res.get) 1921 self.assertTrue(scratchpad[0]) 1922 self.assertIsInstance(scratchpad[0], KeyError) 1923 1924 p.close() 1925 p.join() 1926 1927 def _test_unpickleable_result(self): 1928 from multiprocess.pool import MaybeEncodingError 1929 p = multiprocessing.Pool(2) 1930 1931 # Make sure we don't lose pool processes because of encoding errors. 1932 for iteration in range(20): 1933 1934 scratchpad = [None] 1935 def errback(exc): 1936 scratchpad[0] = exc 1937 1938 res = p.apply_async(unpickleable_result, error_callback=errback) 1939 self.assertRaises(MaybeEncodingError, res.get) 1940 wrapped = scratchpad[0] 1941 self.assertTrue(wrapped) 1942 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 1943 self.assertIsNotNone(wrapped.exc) 1944 self.assertIsNotNone(wrapped.value) 1945 1946 p.close() 1947 p.join() 1948 1949class _TestPoolWorkerLifetime(BaseTestCase): 1950 ALLOWED_TYPES = ('processes', ) 1951 1952 def test_pool_worker_lifetime(self): 1953 p = multiprocessing.Pool(3, maxtasksperchild=10) 1954 self.assertEqual(3, len(p._pool)) 1955 origworkerpids = [w.pid for w in p._pool] 1956 # Run many tasks so each worker gets replaced (hopefully) 1957 results = [] 1958 for i in range(100): 1959 results.append(p.apply_async(sqr, (i, ))) 1960 # Fetch the results and verify we got the right answers, 1961 # also ensuring all the tasks have completed. 1962 for (j, res) in enumerate(results): 1963 self.assertEqual(res.get(), sqr(j)) 1964 # Refill the pool 1965 p._repopulate_pool() 1966 # Wait until all workers are alive 1967 # (countdown * DELTA = 5 seconds max startup process time) 1968 countdown = 50 1969 while countdown and not all(w.is_alive() for w in p._pool): 1970 countdown -= 1 1971 time.sleep(DELTA) 1972 finalworkerpids = [w.pid for w in p._pool] 1973 # All pids should be assigned. See issue #7805. 1974 self.assertNotIn(None, origworkerpids) 1975 self.assertNotIn(None, finalworkerpids) 1976 # Finally, check that the worker pids have changed 1977 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1978 p.close() 1979 p.join() 1980 1981 def test_pool_worker_lifetime_early_close(self): 1982 # Issue #10332: closing a pool whose workers have limited lifetimes 1983 # before all the tasks completed would make join() hang. 1984 p = multiprocessing.Pool(3, maxtasksperchild=1) 1985 results = [] 1986 for i in range(6): 1987 results.append(p.apply_async(sqr, (i, 0.3))) 1988 p.close() 1989 p.join() 1990 # check the results 1991 for (j, res) in enumerate(results): 1992 self.assertEqual(res.get(), sqr(j)) 1993 1994# 1995# Test of creating a customized manager class 1996# 1997 1998from multiprocess.managers import BaseManager, BaseProxy, RemoteError 1999 2000class FooBar(object): 2001 def f(self): 2002 return 'f()' 2003 def g(self): 2004 raise ValueError 2005 def _h(self): 2006 return '_h()' 2007 2008def baz(): 2009 for i in range(10): 2010 yield i*i 2011 2012class IteratorProxy(BaseProxy): 2013 _exposed_ = ('__next__',) 2014 def __iter__(self): 2015 return self 2016 def __next__(self): 2017 return self._callmethod('__next__') 2018 2019class MyManager(BaseManager): 2020 pass 2021 2022MyManager.register('Foo', callable=FooBar) 2023MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 2024MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 2025 2026 2027class _TestMyManager(BaseTestCase): 2028 2029 ALLOWED_TYPES = ('manager',) 2030 2031 def test_mymanager(self): 2032 manager = MyManager() 2033 manager.start() 2034 self.common(manager) 2035 manager.shutdown() 2036 2037 # If the manager process exited cleanly then the exitcode 2038 # will be zero. Otherwise (after a short timeout) 2039 # terminate() is used, resulting in an exitcode of -SIGTERM. 2040 self.assertEqual(manager._process.exitcode, 0) 2041 2042 def test_mymanager_context(self): 2043 with MyManager() as manager: 2044 self.common(manager) 2045 self.assertEqual(manager._process.exitcode, 0) 2046 2047 def test_mymanager_context_prestarted(self): 2048 manager = MyManager() 2049 manager.start() 2050 with manager: 2051 self.common(manager) 2052 self.assertEqual(manager._process.exitcode, 0) 2053 2054 def common(self, manager): 2055 foo = manager.Foo() 2056 bar = manager.Bar() 2057 baz = manager.baz() 2058 2059 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 2060 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 2061 2062 self.assertEqual(foo_methods, ['f', 'g']) 2063 self.assertEqual(bar_methods, ['f', '_h']) 2064 2065 self.assertEqual(foo.f(), 'f()') 2066 self.assertRaises(ValueError, foo.g) 2067 self.assertEqual(foo._callmethod('f'), 'f()') 2068 self.assertRaises(RemoteError, foo._callmethod, '_h') 2069 2070 self.assertEqual(bar.f(), 'f()') 2071 self.assertEqual(bar._h(), '_h()') 2072 self.assertEqual(bar._callmethod('f'), 'f()') 2073 self.assertEqual(bar._callmethod('_h'), '_h()') 2074 2075 self.assertEqual(list(baz), [i*i for i in range(10)]) 2076 2077 2078# 2079# Test of connecting to a remote server and using xmlrpclib for serialization 2080# 2081 2082_queue = pyqueue.Queue() 2083def get_queue(): 2084 return _queue 2085 2086class QueueManager(BaseManager): 2087 '''manager class used by server process''' 2088QueueManager.register('get_queue', callable=get_queue) 2089 2090class QueueManager2(BaseManager): 2091 '''manager class which specifies the same interface as QueueManager''' 2092QueueManager2.register('get_queue') 2093 2094 2095SERIALIZER = 'xmlrpclib' 2096 2097class _TestRemoteManager(BaseTestCase): 2098 2099 ALLOWED_TYPES = ('manager',) 2100 values = ['hello world', None, True, 2.25, 2101 'hall\xe5 v\xe4rlden', 2102 '\u043f\u0440\u0438\u0432\u0456\u0442 \u0441\u0432\u0456\u0442', 2103 b'hall\xe5 v\xe4rlden', 2104 ] 2105 result = values[:] 2106 2107 @classmethod 2108 def _putter(cls, address, authkey): 2109 manager = QueueManager2( 2110 address=address, authkey=authkey, serializer=SERIALIZER 2111 ) 2112 manager.connect() 2113 queue = manager.get_queue() 2114 # Note that xmlrpclib will deserialize object as a list not a tuple 2115 queue.put(tuple(cls.values)) 2116 2117 def test_remote(self): 2118 authkey = os.urandom(32) 2119 2120 manager = QueueManager( 2121 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 2122 ) 2123 manager.start() 2124 2125 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2126 p.daemon = True 2127 p.start() 2128 2129 manager2 = QueueManager2( 2130 address=manager.address, authkey=authkey, serializer=SERIALIZER 2131 ) 2132 manager2.connect() 2133 queue = manager2.get_queue() 2134 2135 self.assertEqual(queue.get(), self.result) 2136 2137 # Because we are using xmlrpclib for serialization instead of 2138 # pickle this will cause a serialization error. 2139 self.assertRaises(Exception, queue.put, time.sleep) 2140 2141 # Make queue finalizer run before the server is stopped 2142 del queue 2143 manager.shutdown() 2144 2145class _TestManagerRestart(BaseTestCase): 2146 2147 @classmethod 2148 def _putter(cls, address, authkey): 2149 manager = QueueManager( 2150 address=address, authkey=authkey, serializer=SERIALIZER) 2151 manager.connect() 2152 queue = manager.get_queue() 2153 queue.put('hello world') 2154 2155 def test_rapid_restart(self): 2156 authkey = os.urandom(32) 2157 manager = QueueManager( 2158 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2159 srvr = manager.get_server() 2160 addr = srvr.address 2161 # Close the connection.Listener socket which gets opened as a part 2162 # of manager.get_server(). It's not needed for the test. 2163 srvr.listener.close() 2164 manager.start() 2165 2166 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2167 p.daemon = True 2168 p.start() 2169 queue = manager.get_queue() 2170 self.assertEqual(queue.get(), 'hello world') 2171 del queue 2172 manager.shutdown() 2173 manager = QueueManager( 2174 address=addr, authkey=authkey, serializer=SERIALIZER) 2175 try: 2176 manager.start() 2177 except OSError as e: 2178 if e.errno != errno.EADDRINUSE: 2179 raise 2180 # Retry after some time, in case the old socket was lingering 2181 # (sporadic failure on buildbots) 2182 time.sleep(1.0) 2183 manager = QueueManager( 2184 address=addr, authkey=authkey, serializer=SERIALIZER) 2185 manager.shutdown() 2186 2187# 2188# 2189# 2190 2191SENTINEL = latin('') 2192 2193class _TestConnection(BaseTestCase): 2194 2195 ALLOWED_TYPES = ('processes', 'threads') 2196 2197 @classmethod 2198 def _echo(cls, conn): 2199 for msg in iter(conn.recv_bytes, SENTINEL): 2200 conn.send_bytes(msg) 2201 conn.close() 2202 2203 def test_connection(self): 2204 conn, child_conn = self.Pipe() 2205 2206 p = self.Process(target=self._echo, args=(child_conn,)) 2207 p.daemon = True 2208 p.start() 2209 2210 seq = [1, 2.25, None] 2211 msg = latin('hello world') 2212 longmsg = msg * 10 2213 arr = array.array('i', list(range(4))) 2214 2215 if self.TYPE == 'processes': 2216 self.assertEqual(type(conn.fileno()), int) 2217 2218 self.assertEqual(conn.send(seq), None) 2219 self.assertEqual(conn.recv(), seq) 2220 2221 self.assertEqual(conn.send_bytes(msg), None) 2222 self.assertEqual(conn.recv_bytes(), msg) 2223 2224 if self.TYPE == 'processes': 2225 buffer = array.array('i', [0]*10) 2226 expected = list(arr) + [0] * (10 - len(arr)) 2227 self.assertEqual(conn.send_bytes(arr), None) 2228 self.assertEqual(conn.recv_bytes_into(buffer), 2229 len(arr) * buffer.itemsize) 2230 self.assertEqual(list(buffer), expected) 2231 2232 buffer = array.array('i', [0]*10) 2233 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 2234 self.assertEqual(conn.send_bytes(arr), None) 2235 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 2236 len(arr) * buffer.itemsize) 2237 self.assertEqual(list(buffer), expected) 2238 2239 buffer = bytearray(latin(' ' * 40)) 2240 self.assertEqual(conn.send_bytes(longmsg), None) 2241 try: 2242 res = conn.recv_bytes_into(buffer) 2243 except multiprocessing.BufferTooShort as e: 2244 self.assertEqual(e.args, (longmsg,)) 2245 else: 2246 self.fail('expected BufferTooShort, got %s' % res) 2247 2248 poll = TimingWrapper(conn.poll) 2249 2250 self.assertEqual(poll(), False) 2251 self.assertTimingAlmostEqual(poll.elapsed, 0) 2252 2253 self.assertEqual(poll(-1), False) 2254 self.assertTimingAlmostEqual(poll.elapsed, 0) 2255 2256 self.assertEqual(poll(TIMEOUT1), False) 2257 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 2258 2259 conn.send(None) 2260 time.sleep(.1) 2261 2262 self.assertEqual(poll(TIMEOUT1), True) 2263 self.assertTimingAlmostEqual(poll.elapsed, 0) 2264 2265 self.assertEqual(conn.recv(), None) 2266 2267 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 2268 conn.send_bytes(really_big_msg) 2269 self.assertEqual(conn.recv_bytes(), really_big_msg) 2270 2271 conn.send_bytes(SENTINEL) # tell child to quit 2272 child_conn.close() 2273 2274 if self.TYPE == 'processes': 2275 self.assertEqual(conn.readable, True) 2276 self.assertEqual(conn.writable, True) 2277 self.assertRaises(EOFError, conn.recv) 2278 self.assertRaises(EOFError, conn.recv_bytes) 2279 2280 p.join() 2281 2282 def test_duplex_false(self): 2283 reader, writer = self.Pipe(duplex=False) 2284 self.assertEqual(writer.send(1), None) 2285 self.assertEqual(reader.recv(), 1) 2286 if self.TYPE == 'processes': 2287 self.assertEqual(reader.readable, True) 2288 self.assertEqual(reader.writable, False) 2289 self.assertEqual(writer.readable, False) 2290 self.assertEqual(writer.writable, True) 2291 self.assertRaises(OSError, reader.send, 2) 2292 self.assertRaises(OSError, writer.recv) 2293 self.assertRaises(OSError, writer.poll) 2294 2295 def test_spawn_close(self): 2296 # We test that a pipe connection can be closed by parent 2297 # process immediately after child is spawned. On Windows this 2298 # would have sometimes failed on old versions because 2299 # child_conn would be closed before the child got a chance to 2300 # duplicate it. 2301 conn, child_conn = self.Pipe() 2302 2303 p = self.Process(target=self._echo, args=(child_conn,)) 2304 p.daemon = True 2305 p.start() 2306 child_conn.close() # this might complete before child initializes 2307 2308 msg = latin('hello') 2309 conn.send_bytes(msg) 2310 self.assertEqual(conn.recv_bytes(), msg) 2311 2312 conn.send_bytes(SENTINEL) 2313 conn.close() 2314 p.join() 2315 2316 def test_sendbytes(self): 2317 if self.TYPE != 'processes': 2318 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2319 2320 msg = latin('abcdefghijklmnopqrstuvwxyz') 2321 a, b = self.Pipe() 2322 2323 a.send_bytes(msg) 2324 self.assertEqual(b.recv_bytes(), msg) 2325 2326 a.send_bytes(msg, 5) 2327 self.assertEqual(b.recv_bytes(), msg[5:]) 2328 2329 a.send_bytes(msg, 7, 8) 2330 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 2331 2332 a.send_bytes(msg, 26) 2333 self.assertEqual(b.recv_bytes(), latin('')) 2334 2335 a.send_bytes(msg, 26, 0) 2336 self.assertEqual(b.recv_bytes(), latin('')) 2337 2338 self.assertRaises(ValueError, a.send_bytes, msg, 27) 2339 2340 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 2341 2342 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 2343 2344 self.assertRaises(ValueError, a.send_bytes, msg, -1) 2345 2346 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 2347 2348 @classmethod 2349 def _is_fd_assigned(cls, fd): 2350 try: 2351 os.fstat(fd) 2352 except OSError as e: 2353 if e.errno == errno.EBADF: 2354 return False 2355 raise 2356 else: 2357 return True 2358 2359 @classmethod 2360 def _writefd(cls, conn, data, create_dummy_fds=False): 2361 if create_dummy_fds: 2362 for i in range(0, 256): 2363 if not cls._is_fd_assigned(i): 2364 os.dup2(conn.fileno(), i) 2365 fd = reduction.recv_handle(conn) 2366 if msvcrt: 2367 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 2368 os.write(fd, data) 2369 os.close(fd) 2370 2371 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2372 def test_fd_transfer(self): 2373 if self.TYPE != 'processes': 2374 self.skipTest("only makes sense with processes") 2375 conn, child_conn = self.Pipe(duplex=True) 2376 2377 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 2378 p.daemon = True 2379 p.start() 2380 self.addCleanup(test.support.unlink, test.support.TESTFN) 2381 with open(test.support.TESTFN, "wb") as f: 2382 fd = f.fileno() 2383 if msvcrt: 2384 fd = msvcrt.get_osfhandle(fd) 2385 reduction.send_handle(conn, fd, p.pid) 2386 p.join() 2387 with open(test.support.TESTFN, "rb") as f: 2388 self.assertEqual(f.read(), b"foo") 2389 2390 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2391 @unittest.skipIf(sys.platform == "win32", 2392 "test semantics don't make sense on Windows") 2393 @unittest.skipIf(MAXFD <= 256, 2394 "largest assignable fd number is too small") 2395 @unittest.skipUnless(hasattr(os, "dup2"), 2396 "test needs os.dup2()") 2397 def test_large_fd_transfer(self): 2398 # With fd > 256 (issue #11657) 2399 if self.TYPE != 'processes': 2400 self.skipTest("only makes sense with processes") 2401 conn, child_conn = self.Pipe(duplex=True) 2402 2403 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 2404 p.daemon = True 2405 p.start() 2406 self.addCleanup(test.support.unlink, test.support.TESTFN) 2407 with open(test.support.TESTFN, "wb") as f: 2408 fd = f.fileno() 2409 for newfd in range(256, MAXFD): 2410 if not self._is_fd_assigned(newfd): 2411 break 2412 else: 2413 self.fail("could not find an unassigned large file descriptor") 2414 os.dup2(fd, newfd) 2415 try: 2416 reduction.send_handle(conn, newfd, p.pid) 2417 finally: 2418 os.close(newfd) 2419 p.join() 2420 with open(test.support.TESTFN, "rb") as f: 2421 self.assertEqual(f.read(), b"bar") 2422 2423 @classmethod 2424 def _send_data_without_fd(self, conn): 2425 os.write(conn.fileno(), b"\0") 2426 2427 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2428 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 2429 def test_missing_fd_transfer(self): 2430 # Check that exception is raised when received data is not 2431 # accompanied by a file descriptor in ancillary data. 2432 if self.TYPE != 'processes': 2433 self.skipTest("only makes sense with processes") 2434 conn, child_conn = self.Pipe(duplex=True) 2435 2436 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 2437 p.daemon = True 2438 p.start() 2439 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 2440 p.join() 2441 2442 def test_context(self): 2443 a, b = self.Pipe() 2444 2445 with a, b: 2446 a.send(1729) 2447 self.assertEqual(b.recv(), 1729) 2448 if self.TYPE == 'processes': 2449 self.assertFalse(a.closed) 2450 self.assertFalse(b.closed) 2451 2452 if self.TYPE == 'processes': 2453 self.assertTrue(a.closed) 2454 self.assertTrue(b.closed) 2455 self.assertRaises(OSError, a.recv) 2456 self.assertRaises(OSError, b.recv) 2457 2458class _TestListener(BaseTestCase): 2459 2460 ALLOWED_TYPES = ('processes',) 2461 2462 def test_multiple_bind(self): 2463 for family in self.connection.families: 2464 l = self.connection.Listener(family=family) 2465 self.addCleanup(l.close) 2466 self.assertRaises(OSError, self.connection.Listener, 2467 l.address, family) 2468 2469 def test_context(self): 2470 with self.connection.Listener() as l: 2471 with self.connection.Client(l.address) as c: 2472 with l.accept() as d: 2473 c.send(1729) 2474 self.assertEqual(d.recv(), 1729) 2475 2476 if self.TYPE == 'processes': 2477 self.assertRaises(OSError, l.accept) 2478 2479class _TestListenerClient(BaseTestCase): 2480 2481 ALLOWED_TYPES = ('processes', 'threads') 2482 2483 @classmethod 2484 def _test(cls, address): 2485 conn = cls.connection.Client(address) 2486 conn.send('hello') 2487 conn.close() 2488 2489 def test_listener_client(self): 2490 for family in self.connection.families: 2491 l = self.connection.Listener(family=family) 2492 p = self.Process(target=self._test, args=(l.address,)) 2493 p.daemon = True 2494 p.start() 2495 conn = l.accept() 2496 self.assertEqual(conn.recv(), 'hello') 2497 p.join() 2498 l.close() 2499 2500 def test_issue14725(self): 2501 l = self.connection.Listener() 2502 p = self.Process(target=self._test, args=(l.address,)) 2503 p.daemon = True 2504 p.start() 2505 time.sleep(1) 2506 # On Windows the client process should by now have connected, 2507 # written data and closed the pipe handle by now. This causes 2508 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 2509 # 14725. 2510 conn = l.accept() 2511 self.assertEqual(conn.recv(), 'hello') 2512 conn.close() 2513 p.join() 2514 l.close() 2515 2516 def test_issue16955(self): 2517 for fam in self.connection.families: 2518 l = self.connection.Listener(family=fam) 2519 c = self.connection.Client(l.address) 2520 a = l.accept() 2521 a.send_bytes(b"hello") 2522 self.assertTrue(c.poll(1)) 2523 a.close() 2524 c.close() 2525 l.close() 2526 2527class _TestPoll(BaseTestCase): 2528 2529 ALLOWED_TYPES = ('processes', 'threads') 2530 2531 def test_empty_string(self): 2532 a, b = self.Pipe() 2533 self.assertEqual(a.poll(), False) 2534 b.send_bytes(b'') 2535 self.assertEqual(a.poll(), True) 2536 self.assertEqual(a.poll(), True) 2537 2538 @classmethod 2539 def _child_strings(cls, conn, strings): 2540 for s in strings: 2541 time.sleep(0.1) 2542 conn.send_bytes(s) 2543 conn.close() 2544 2545 def test_strings(self): 2546 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 2547 a, b = self.Pipe() 2548 p = self.Process(target=self._child_strings, args=(b, strings)) 2549 p.start() 2550 2551 for s in strings: 2552 for i in range(200): 2553 if a.poll(0.01): 2554 break 2555 x = a.recv_bytes() 2556 self.assertEqual(s, x) 2557 2558 p.join() 2559 2560 @classmethod 2561 def _child_boundaries(cls, r): 2562 # Polling may "pull" a message in to the child process, but we 2563 # don't want it to pull only part of a message, as that would 2564 # corrupt the pipe for any other processes which might later 2565 # read from it. 2566 r.poll(5) 2567 2568 def test_boundaries(self): 2569 r, w = self.Pipe(False) 2570 p = self.Process(target=self._child_boundaries, args=(r,)) 2571 p.start() 2572 time.sleep(2) 2573 L = [b"first", b"second"] 2574 for obj in L: 2575 w.send_bytes(obj) 2576 w.close() 2577 p.join() 2578 self.assertIn(r.recv_bytes(), L) 2579 2580 @classmethod 2581 def _child_dont_merge(cls, b): 2582 b.send_bytes(b'a') 2583 b.send_bytes(b'b') 2584 b.send_bytes(b'cd') 2585 2586 def test_dont_merge(self): 2587 a, b = self.Pipe() 2588 self.assertEqual(a.poll(0.0), False) 2589 self.assertEqual(a.poll(0.1), False) 2590 2591 p = self.Process(target=self._child_dont_merge, args=(b,)) 2592 p.start() 2593 2594 self.assertEqual(a.recv_bytes(), b'a') 2595 self.assertEqual(a.poll(1.0), True) 2596 self.assertEqual(a.poll(1.0), True) 2597 self.assertEqual(a.recv_bytes(), b'b') 2598 self.assertEqual(a.poll(1.0), True) 2599 self.assertEqual(a.poll(1.0), True) 2600 self.assertEqual(a.poll(0.0), True) 2601 self.assertEqual(a.recv_bytes(), b'cd') 2602 2603 p.join() 2604 2605# 2606# Test of sending connection and socket objects between processes 2607# 2608 2609@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2610class _TestPicklingConnections(BaseTestCase): 2611 2612 ALLOWED_TYPES = ('processes',) 2613 2614 @classmethod 2615 def tearDownClass(cls): 2616 from multiprocess import resource_sharer 2617 resource_sharer.stop(timeout=5) 2618 2619 @classmethod 2620 def _listener(cls, conn, families): 2621 for fam in families: 2622 l = cls.connection.Listener(family=fam) 2623 conn.send(l.address) 2624 new_conn = l.accept() 2625 conn.send(new_conn) 2626 new_conn.close() 2627 l.close() 2628 2629 l = socket.socket() 2630 l.bind((test.support.HOST, 0)) 2631 l.listen() 2632 conn.send(l.getsockname()) 2633 new_conn, addr = l.accept() 2634 conn.send(new_conn) 2635 new_conn.close() 2636 l.close() 2637 2638 conn.recv() 2639 2640 @classmethod 2641 def _remote(cls, conn): 2642 for (address, msg) in iter(conn.recv, None): 2643 client = cls.connection.Client(address) 2644 client.send(msg.upper()) 2645 client.close() 2646 2647 address, msg = conn.recv() 2648 client = socket.socket() 2649 client.connect(address) 2650 client.sendall(msg.upper()) 2651 client.close() 2652 2653 conn.close() 2654 2655 def test_pickling(self): 2656 families = self.connection.families 2657 2658 lconn, lconn0 = self.Pipe() 2659 lp = self.Process(target=self._listener, args=(lconn0, families)) 2660 lp.daemon = True 2661 lp.start() 2662 lconn0.close() 2663 2664 rconn, rconn0 = self.Pipe() 2665 rp = self.Process(target=self._remote, args=(rconn0,)) 2666 rp.daemon = True 2667 rp.start() 2668 rconn0.close() 2669 2670 for fam in families: 2671 msg = ('This connection uses family %s' % fam).encode('ascii') 2672 address = lconn.recv() 2673 rconn.send((address, msg)) 2674 new_conn = lconn.recv() 2675 self.assertEqual(new_conn.recv(), msg.upper()) 2676 2677 rconn.send(None) 2678 2679 msg = latin('This connection uses a normal socket') 2680 address = lconn.recv() 2681 rconn.send((address, msg)) 2682 new_conn = lconn.recv() 2683 buf = [] 2684 while True: 2685 s = new_conn.recv(100) 2686 if not s: 2687 break 2688 buf.append(s) 2689 buf = b''.join(buf) 2690 self.assertEqual(buf, msg.upper()) 2691 new_conn.close() 2692 2693 lconn.send(None) 2694 2695 rconn.close() 2696 lconn.close() 2697 2698 lp.join() 2699 rp.join() 2700 2701 @classmethod 2702 def child_access(cls, conn): 2703 w = conn.recv() 2704 w.send('all is well') 2705 w.close() 2706 2707 r = conn.recv() 2708 msg = r.recv() 2709 conn.send(msg*2) 2710 2711 conn.close() 2712 2713 def test_access(self): 2714 # On Windows, if we do not specify a destination pid when 2715 # using DupHandle then we need to be careful to use the 2716 # correct access flags for DuplicateHandle(), or else 2717 # DupHandle.detach() will raise PermissionError. For example, 2718 # for a read only pipe handle we should use 2719 # access=FILE_GENERIC_READ. (Unfortunately 2720 # DUPLICATE_SAME_ACCESS does not work.) 2721 conn, child_conn = self.Pipe() 2722 p = self.Process(target=self.child_access, args=(child_conn,)) 2723 p.daemon = True 2724 p.start() 2725 child_conn.close() 2726 2727 r, w = self.Pipe(duplex=False) 2728 conn.send(w) 2729 w.close() 2730 self.assertEqual(r.recv(), 'all is well') 2731 r.close() 2732 2733 r, w = self.Pipe(duplex=False) 2734 conn.send(r) 2735 r.close() 2736 w.send('foobar') 2737 w.close() 2738 self.assertEqual(conn.recv(), 'foobar'*2) 2739 2740# 2741# 2742# 2743 2744class _TestHeap(BaseTestCase): 2745 2746 ALLOWED_TYPES = ('processes',) 2747 2748 def test_heap(self): 2749 iterations = 5000 2750 maxblocks = 50 2751 blocks = [] 2752 2753 # create and destroy lots of blocks of different sizes 2754 for i in range(iterations): 2755 size = int(random.lognormvariate(0, 1) * 1000) 2756 b = multiprocessing.heap.BufferWrapper(size) 2757 blocks.append(b) 2758 if len(blocks) > maxblocks: 2759 i = random.randrange(maxblocks) 2760 del blocks[i] 2761 2762 # get the heap object 2763 heap = multiprocessing.heap.BufferWrapper._heap 2764 2765 # verify the state of the heap 2766 all = [] 2767 occupied = 0 2768 heap._lock.acquire() 2769 self.addCleanup(heap._lock.release) 2770 for L in list(heap._len_to_seq.values()): 2771 for arena, start, stop in L: 2772 all.append((heap._arenas.index(arena), start, stop, 2773 stop-start, 'free')) 2774 for arena, start, stop in heap._allocated_blocks: 2775 all.append((heap._arenas.index(arena), start, stop, 2776 stop-start, 'occupied')) 2777 occupied += (stop-start) 2778 2779 all.sort() 2780 2781 for i in range(len(all)-1): 2782 (arena, start, stop) = all[i][:3] 2783 (narena, nstart, nstop) = all[i+1][:3] 2784 self.assertTrue((arena != narena and nstart == 0) or 2785 (stop == nstart)) 2786 2787 def test_free_from_gc(self): 2788 # Check that freeing of blocks by the garbage collector doesn't deadlock 2789 # (issue #12352). 2790 # Make sure the GC is enabled, and set lower collection thresholds to 2791 # make collections more frequent (and increase the probability of 2792 # deadlock). 2793 if not gc.isenabled(): 2794 gc.enable() 2795 self.addCleanup(gc.disable) 2796 thresholds = gc.get_threshold() 2797 self.addCleanup(gc.set_threshold, *thresholds) 2798 gc.set_threshold(10) 2799 2800 # perform numerous block allocations, with cyclic references to make 2801 # sure objects are collected asynchronously by the gc 2802 for i in range(5000): 2803 a = multiprocessing.heap.BufferWrapper(1) 2804 b = multiprocessing.heap.BufferWrapper(1) 2805 # circular references 2806 a.buddy = b 2807 b.buddy = a 2808 2809# 2810# 2811# 2812 2813class _Foo(Structure): 2814 _fields_ = [ 2815 ('x', c_int), 2816 ('y', c_double) 2817 ] 2818 2819class _TestSharedCTypes(BaseTestCase): 2820 2821 ALLOWED_TYPES = ('processes',) 2822 2823 def setUp(self): 2824 if not HAS_SHAREDCTYPES: 2825 self.skipTest("requires multiprocessing.sharedctypes") 2826 2827 @classmethod 2828 def _double(cls, x, y, foo, arr, string): 2829 x.value *= 2 2830 y.value *= 2 2831 foo.x *= 2 2832 foo.y *= 2 2833 string.value *= 2 2834 for i in range(len(arr)): 2835 arr[i] *= 2 2836 2837 def test_sharedctypes(self, lock=False): 2838 x = Value('i', 7, lock=lock) 2839 y = Value(c_double, 1.0/3.0, lock=lock) 2840 foo = Value(_Foo, 3, 2, lock=lock) 2841 arr = self.Array('d', list(range(10)), lock=lock) 2842 string = self.Array('c', 20, lock=lock) 2843 string.value = latin('hello') 2844 2845 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 2846 p.daemon = True 2847 p.start() 2848 p.join() 2849 2850 self.assertEqual(x.value, 14) 2851 self.assertAlmostEqual(y.value, 2.0/3.0) 2852 self.assertEqual(foo.x, 6) 2853 self.assertAlmostEqual(foo.y, 4.0) 2854 for i in range(10): 2855 self.assertAlmostEqual(arr[i], i*2) 2856 self.assertEqual(string.value, latin('hellohello')) 2857 2858 def test_synchronize(self): 2859 self.test_sharedctypes(lock=True) 2860 2861 def test_copy(self): 2862 foo = _Foo(2, 5.0) 2863 bar = copy(foo) 2864 foo.x = 0 2865 foo.y = 0 2866 self.assertEqual(bar.x, 2) 2867 self.assertAlmostEqual(bar.y, 5.0) 2868 2869# 2870# 2871# 2872 2873class _TestFinalize(BaseTestCase): 2874 2875 ALLOWED_TYPES = ('processes',) 2876 2877 @classmethod 2878 def _test_finalize(cls, conn): 2879 class Foo(object): 2880 pass 2881 2882 a = Foo() 2883 util.Finalize(a, conn.send, args=('a',)) 2884 del a # triggers callback for a 2885 2886 b = Foo() 2887 close_b = util.Finalize(b, conn.send, args=('b',)) 2888 close_b() # triggers callback for b 2889 close_b() # does nothing because callback has already been called 2890 del b # does nothing because callback has already been called 2891 2892 c = Foo() 2893 util.Finalize(c, conn.send, args=('c',)) 2894 2895 d10 = Foo() 2896 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 2897 2898 d01 = Foo() 2899 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 2900 d02 = Foo() 2901 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 2902 d03 = Foo() 2903 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 2904 2905 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 2906 2907 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 2908 2909 # call multiprocessing's cleanup function then exit process without 2910 # garbage collecting locals 2911 util._exit_function() 2912 conn.close() 2913 os._exit(0) 2914 2915 def test_finalize(self): 2916 conn, child_conn = self.Pipe() 2917 2918 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2919 p.daemon = True 2920 p.start() 2921 p.join() 2922 2923 result = [obj for obj in iter(conn.recv, 'STOP')] 2924 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 2925 2926# 2927# Test that from ... import * works for each module 2928# 2929 2930class _TestImportStar(unittest.TestCase): 2931 2932 def get_module_names(self): 2933 import glob 2934 folder = os.path.dirname(multiprocessing.__file__) 2935 pattern = os.path.join(folder, '*.py') 2936 files = glob.glob(pattern) 2937 modules = [os.path.splitext(os.path.split(f)[1])[0] for f in files] 2938 modules = ['multiprocess.' + m for m in modules] 2939 modules.remove('multiprocess.__init__') 2940 modules.append('multiprocess') 2941 return modules 2942 2943 def test_import(self): 2944 modules = self.get_module_names() 2945 if sys.platform == 'win32': 2946 modules.remove('multiprocess.popen_fork') 2947 modules.remove('multiprocess.popen_forkserver') 2948 modules.remove('multiprocess.popen_spawn_posix') 2949 else: 2950 modules.remove('multiprocess.popen_spawn_win32') 2951 if not HAS_REDUCTION: 2952 modules.remove('multiprocess.popen_forkserver') 2953 2954 if c_int is None: 2955 # This module requires _ctypes 2956 modules.remove('multiprocess.sharedctypes') 2957 2958 for name in modules: 2959 __import__(name) 2960 mod = sys.modules[name] 2961 self.assertTrue(hasattr(mod, '__all__'), name) 2962 2963 for attr in mod.__all__: 2964 self.assertTrue( 2965 hasattr(mod, attr), 2966 '%r does not have attribute %r' % (mod, attr) 2967 ) 2968 2969# 2970# Quick test that logging works -- does not test logging output 2971# 2972 2973class _TestLogging(BaseTestCase): 2974 2975 ALLOWED_TYPES = ('processes',) 2976 2977 def test_enable_logging(self): 2978 logger = multiprocessing.get_logger() 2979 logger.setLevel(util.SUBWARNING) 2980 self.assertTrue(logger is not None) 2981 logger.debug('this will not be printed') 2982 logger.info('nor will this') 2983 logger.setLevel(LOG_LEVEL) 2984 2985 @classmethod 2986 def _test_level(cls, conn): 2987 logger = multiprocessing.get_logger() 2988 conn.send(logger.getEffectiveLevel()) 2989 2990 def test_level(self): 2991 LEVEL1 = 32 2992 LEVEL2 = 37 2993 2994 logger = multiprocessing.get_logger() 2995 root_logger = logging.getLogger() 2996 root_level = root_logger.level 2997 2998 reader, writer = multiprocessing.Pipe(duplex=False) 2999 3000 logger.setLevel(LEVEL1) 3001 p = self.Process(target=self._test_level, args=(writer,)) 3002 p.daemon = True 3003 p.start() 3004 self.assertEqual(LEVEL1, reader.recv()) 3005 3006 logger.setLevel(logging.NOTSET) 3007 root_logger.setLevel(LEVEL2) 3008 p = self.Process(target=self._test_level, args=(writer,)) 3009 p.daemon = True 3010 p.start() 3011 self.assertEqual(LEVEL2, reader.recv()) 3012 3013 root_logger.setLevel(root_level) 3014 logger.setLevel(level=LOG_LEVEL) 3015 3016 3017# class _TestLoggingProcessName(BaseTestCase): 3018# 3019# def handle(self, record): 3020# assert record.processName == multiprocessing.current_process().name 3021# self.__handled = True 3022# 3023# def test_logging(self): 3024# handler = logging.Handler() 3025# handler.handle = self.handle 3026# self.__handled = False 3027# # Bypass getLogger() and side-effects 3028# logger = logging.getLoggerClass()( 3029# 'multiprocessing.test.TestLoggingProcessName') 3030# logger.addHandler(handler) 3031# logger.propagate = False 3032# 3033# logger.warn('foo') 3034# assert self.__handled 3035 3036# 3037# Check that Process.join() retries if os.waitpid() fails with EINTR 3038# 3039 3040class _TestPollEintr(BaseTestCase): 3041 3042 ALLOWED_TYPES = ('processes',) 3043 3044 @classmethod 3045 def _killer(cls, pid): 3046 time.sleep(0.1) 3047 os.kill(pid, signal.SIGUSR1) 3048 3049 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3050 def test_poll_eintr(self): 3051 got_signal = [False] 3052 def record(*args): 3053 got_signal[0] = True 3054 pid = os.getpid() 3055 oldhandler = signal.signal(signal.SIGUSR1, record) 3056 try: 3057 killer = self.Process(target=self._killer, args=(pid,)) 3058 killer.start() 3059 try: 3060 p = self.Process(target=time.sleep, args=(2,)) 3061 p.start() 3062 p.join() 3063 finally: 3064 killer.join() 3065 self.assertTrue(got_signal[0]) 3066 self.assertEqual(p.exitcode, 0) 3067 finally: 3068 signal.signal(signal.SIGUSR1, oldhandler) 3069 3070# 3071# Test to verify handle verification, see issue 3321 3072# 3073 3074class TestInvalidHandle(unittest.TestCase): 3075 3076 @unittest.skipIf(WIN32, "skipped on Windows") 3077 def test_invalid_handles(self): 3078 conn = multiprocessing.connection.Connection(44977608) 3079 # check that poll() doesn't crash 3080 try: 3081 conn.poll() 3082 except (ValueError, OSError): 3083 pass 3084 finally: 3085 # Hack private attribute _handle to avoid printing an error 3086 # in conn.__del__ 3087 conn._handle = None 3088 self.assertRaises((ValueError, OSError), 3089 multiprocessing.connection.Connection, -1) 3090 3091 3092 3093class OtherTest(unittest.TestCase): 3094 # TODO: add more tests for deliver/answer challenge. 3095 def test_deliver_challenge_auth_failure(self): 3096 class _FakeConnection(object): 3097 def recv_bytes(self, size): 3098 return b'something bogus' 3099 def send_bytes(self, data): 3100 pass 3101 self.assertRaises(multiprocessing.AuthenticationError, 3102 multiprocessing.connection.deliver_challenge, 3103 _FakeConnection(), b'abc') 3104 3105 def test_answer_challenge_auth_failure(self): 3106 class _FakeConnection(object): 3107 def __init__(self): 3108 self.count = 0 3109 def recv_bytes(self, size): 3110 self.count += 1 3111 if self.count == 1: 3112 return multiprocessing.connection.CHALLENGE 3113 elif self.count == 2: 3114 return b'something bogus' 3115 return b'' 3116 def send_bytes(self, data): 3117 pass 3118 self.assertRaises(multiprocessing.AuthenticationError, 3119 multiprocessing.connection.answer_challenge, 3120 _FakeConnection(), b'abc') 3121 3122# 3123# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 3124# 3125 3126def initializer(ns): 3127 ns.test += 1 3128 3129class TestInitializers(unittest.TestCase): 3130 def setUp(self): 3131 self.mgr = multiprocessing.Manager() 3132 self.ns = self.mgr.Namespace() 3133 self.ns.test = 0 3134 3135 def tearDown(self): 3136 self.mgr.shutdown() 3137 self.mgr.join() 3138 3139 def test_manager_initializer(self): 3140 m = multiprocessing.managers.SyncManager() 3141 self.assertRaises(TypeError, m.start, 1) 3142 m.start(initializer, (self.ns,)) 3143 self.assertEqual(self.ns.test, 1) 3144 m.shutdown() 3145 m.join() 3146 3147 def test_pool_initializer(self): 3148 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 3149 p = multiprocessing.Pool(1, initializer, (self.ns,)) 3150 p.close() 3151 p.join() 3152 self.assertEqual(self.ns.test, 1) 3153 3154# 3155# Issue 5155, 5313, 5331: Test process in processes 3156# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 3157# 3158 3159def _this_sub_process(q): 3160 try: 3161 item = q.get(block=False) 3162 except pyqueue.Empty: 3163 pass 3164 3165def _test_process(q): 3166 queue = multiprocessing.Queue() 3167 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 3168 subProc.daemon = True 3169 subProc.start() 3170 subProc.join() 3171 3172def _afunc(x): 3173 return x*x 3174 3175def pool_in_process(): 3176 pool = multiprocessing.Pool(processes=4) 3177 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 3178 pool.close() 3179 pool.join() 3180 3181class _file_like(object): 3182 def __init__(self, delegate): 3183 self._delegate = delegate 3184 self._pid = None 3185 3186 @property 3187 def cache(self): 3188 pid = os.getpid() 3189 # There are no race conditions since fork keeps only the running thread 3190 if pid != self._pid: 3191 self._pid = pid 3192 self._cache = [] 3193 return self._cache 3194 3195 def write(self, data): 3196 self.cache.append(data) 3197 3198 def flush(self): 3199 self._delegate.write(''.join(self.cache)) 3200 self._cache = [] 3201 3202class TestStdinBadfiledescriptor(unittest.TestCase): 3203 3204 def test_queue_in_process(self): 3205 queue = multiprocessing.Queue() 3206 proc = multiprocessing.Process(target=_test_process, args=(queue,)) 3207 proc.start() 3208 proc.join() 3209 3210 def test_pool_in_process(self): 3211 p = multiprocessing.Process(target=pool_in_process) 3212 p.start() 3213 p.join() 3214 3215 def test_flushing(self): 3216 sio = io.StringIO() 3217 flike = _file_like(sio) 3218 flike.write('foo') 3219 proc = multiprocessing.Process(target=lambda: flike.flush()) 3220 flike.flush() 3221 assert sio.getvalue() == 'foo' 3222 3223 3224class TestWait(unittest.TestCase): 3225 3226 @classmethod 3227 def _child_test_wait(cls, w, slow): 3228 for i in range(10): 3229 if slow: 3230 time.sleep(random.random()*0.1) 3231 w.send((i, os.getpid())) 3232 w.close() 3233 3234 def test_wait(self, slow=False): 3235 from multiprocess.connection import wait 3236 readers = [] 3237 procs = [] 3238 messages = [] 3239 3240 for i in range(4): 3241 r, w = multiprocessing.Pipe(duplex=False) 3242 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 3243 p.daemon = True 3244 p.start() 3245 w.close() 3246 readers.append(r) 3247 procs.append(p) 3248 self.addCleanup(p.join) 3249 3250 while readers: 3251 for r in wait(readers): 3252 try: 3253 msg = r.recv() 3254 except EOFError: 3255 readers.remove(r) 3256 r.close() 3257 else: 3258 messages.append(msg) 3259 3260 messages.sort() 3261 expected = sorted((i, p.pid) for i in range(10) for p in procs) 3262 self.assertEqual(messages, expected) 3263 3264 @classmethod 3265 def _child_test_wait_socket(cls, address, slow): 3266 s = socket.socket() 3267 s.connect(address) 3268 for i in range(10): 3269 if slow: 3270 time.sleep(random.random()*0.1) 3271 s.sendall(('%s\n' % i).encode('ascii')) 3272 s.close() 3273 3274 def test_wait_socket(self, slow=False): 3275 from multiprocess.connection import wait 3276 l = socket.socket() 3277 l.bind((test.support.HOST, 0)) 3278 l.listen() 3279 addr = l.getsockname() 3280 readers = [] 3281 procs = [] 3282 dic = {} 3283 3284 for i in range(4): 3285 p = multiprocessing.Process(target=self._child_test_wait_socket, 3286 args=(addr, slow)) 3287 p.daemon = True 3288 p.start() 3289 procs.append(p) 3290 self.addCleanup(p.join) 3291 3292 for i in range(4): 3293 r, _ = l.accept() 3294 readers.append(r) 3295 dic[r] = [] 3296 l.close() 3297 3298 while readers: 3299 for r in wait(readers): 3300 msg = r.recv(32) 3301 if not msg: 3302 readers.remove(r) 3303 r.close() 3304 else: 3305 dic[r].append(msg) 3306 3307 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 3308 for v in dic.values(): 3309 self.assertEqual(b''.join(v), expected) 3310 3311 def test_wait_slow(self): 3312 self.test_wait(True) 3313 3314 def test_wait_socket_slow(self): 3315 self.test_wait_socket(True) 3316 3317 def test_wait_timeout(self): 3318 from multiprocess.connection import wait 3319 3320 expected = 5 3321 a, b = multiprocessing.Pipe() 3322 3323 start = time.time() 3324 res = wait([a, b], expected) 3325 delta = time.time() - start 3326 3327 self.assertEqual(res, []) 3328 self.assertLess(delta, expected * 2) 3329 self.assertGreater(delta, expected * 0.5) 3330 3331 b.send(None) 3332 3333 start = time.time() 3334 res = wait([a, b], 20) 3335 delta = time.time() - start 3336 3337 self.assertEqual(res, [a]) 3338 self.assertLess(delta, 0.4) 3339 3340 @classmethod 3341 def signal_and_sleep(cls, sem, period): 3342 sem.release() 3343 time.sleep(period) 3344 3345 def test_wait_integer(self): 3346 from multiprocess.connection import wait 3347 3348 expected = 3 3349 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 3350 sem = multiprocessing.Semaphore(0) 3351 a, b = multiprocessing.Pipe() 3352 p = multiprocessing.Process(target=self.signal_and_sleep, 3353 args=(sem, expected)) 3354 3355 p.start() 3356 self.assertIsInstance(p.sentinel, int) 3357 self.assertTrue(sem.acquire(timeout=20)) 3358 3359 start = time.time() 3360 res = wait([a, p.sentinel, b], expected + 20) 3361 delta = time.time() - start 3362 3363 self.assertEqual(res, [p.sentinel]) 3364 self.assertLess(delta, expected + 2) 3365 self.assertGreater(delta, expected - 2) 3366 3367 a.send(None) 3368 3369 start = time.time() 3370 res = wait([a, p.sentinel, b], 20) 3371 delta = time.time() - start 3372 3373 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 3374 self.assertLess(delta, 0.4) 3375 3376 b.send(None) 3377 3378 start = time.time() 3379 res = wait([a, p.sentinel, b], 20) 3380 delta = time.time() - start 3381 3382 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 3383 self.assertLess(delta, 0.4) 3384 3385 p.terminate() 3386 p.join() 3387 3388 def test_neg_timeout(self): 3389 from multiprocess.connection import wait 3390 a, b = multiprocessing.Pipe() 3391 t = time.time() 3392 res = wait([a], timeout=-1) 3393 t = time.time() - t 3394 self.assertEqual(res, []) 3395 self.assertLess(t, 1) 3396 a.close() 3397 b.close() 3398 3399# 3400# Issue 14151: Test invalid family on invalid environment 3401# 3402 3403class TestInvalidFamily(unittest.TestCase): 3404 3405 @unittest.skipIf(WIN32, "skipped on Windows") 3406 def test_invalid_family(self): 3407 with self.assertRaises(ValueError): 3408 multiprocessing.connection.Listener(r'\\.\test') 3409 3410 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 3411 def test_invalid_family_win32(self): 3412 with self.assertRaises(ValueError): 3413 multiprocessing.connection.Listener('/var/test.pipe') 3414 3415# 3416# Issue 12098: check sys.flags of child matches that for parent 3417# 3418 3419class TestFlags(unittest.TestCase): 3420 @classmethod 3421 def run_in_grandchild(cls, conn): 3422 conn.send(tuple(sys.flags)) 3423 3424 @classmethod 3425 def run_in_child(cls): 3426 import json 3427 r, w = multiprocessing.Pipe(duplex=False) 3428 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 3429 p.start() 3430 grandchild_flags = r.recv() 3431 p.join() 3432 r.close() 3433 w.close() 3434 flags = (tuple(sys.flags), grandchild_flags) 3435 print(json.dumps(flags)) 3436 3437 def _test_flags(self): 3438 import json, subprocess 3439 # start child process using unusual flags 3440 prog = ('from multiprocess.tests import TestFlags; ' + 3441 'TestFlags.run_in_child()') 3442 data = subprocess.check_output( 3443 [sys.executable, '-E', '-S', '-O', '-c', prog]) 3444 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 3445 self.assertEqual(child_flags, grandchild_flags) 3446 3447# 3448# Test interaction with socket timeouts - see Issue #6056 3449# 3450 3451class TestTimeouts(unittest.TestCase): 3452 @classmethod 3453 def _test_timeout(cls, child, address): 3454 time.sleep(1) 3455 child.send(123) 3456 child.close() 3457 conn = multiprocessing.connection.Client(address) 3458 conn.send(456) 3459 conn.close() 3460 3461 def test_timeout(self): 3462 old_timeout = socket.getdefaulttimeout() 3463 try: 3464 socket.setdefaulttimeout(0.1) 3465 parent, child = multiprocessing.Pipe(duplex=True) 3466 l = multiprocessing.connection.Listener(family='AF_INET') 3467 p = multiprocessing.Process(target=self._test_timeout, 3468 args=(child, l.address)) 3469 p.start() 3470 child.close() 3471 self.assertEqual(parent.recv(), 123) 3472 parent.close() 3473 conn = l.accept() 3474 self.assertEqual(conn.recv(), 456) 3475 conn.close() 3476 l.close() 3477 p.join(10) 3478 finally: 3479 socket.setdefaulttimeout(old_timeout) 3480 3481# 3482# Test what happens with no "if __name__ == '__main__'" 3483# 3484 3485class TestNoForkBomb(unittest.TestCase): 3486 def _test_noforkbomb(self): 3487 sm = multiprocessing.get_start_method() 3488 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 3489 if sm != 'fork': 3490 rc, out, err = test.support.script_helper.assert_python_failure(name, sm) 3491 self.assertEqual(out, b'') 3492 self.assertIn(b'RuntimeError', err) 3493 else: 3494 rc, out, err = test.support.script_helper.assert_python_ok(name, sm) 3495 self.assertEqual(out.rstrip(), b'123') 3496 self.assertEqual(err, b'') 3497 3498# 3499# Issue #17555: ForkAwareThreadLock 3500# 3501 3502class TestForkAwareThreadLock(unittest.TestCase): 3503 # We recurisvely start processes. Issue #17555 meant that the 3504 # after fork registry would get duplicate entries for the same 3505 # lock. The size of the registry at generation n was ~2**n. 3506 3507 @classmethod 3508 def child(cls, n, conn): 3509 if n > 1: 3510 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 3511 p.start() 3512 conn.close() 3513 p.join(timeout=5) 3514 else: 3515 conn.send(len(util._afterfork_registry)) 3516 conn.close() 3517 3518 def test_lock(self): 3519 r, w = multiprocessing.Pipe(False) 3520 l = util.ForkAwareThreadLock() 3521 old_size = len(util._afterfork_registry) 3522 p = multiprocessing.Process(target=self.child, args=(5, w)) 3523 p.start() 3524 w.close() 3525 new_size = r.recv() 3526 p.join(timeout=5) 3527 self.assertLessEqual(new_size, old_size) 3528 3529# 3530# Check that non-forked child processes do not inherit unneeded fds/handles 3531# 3532 3533class TestCloseFds(unittest.TestCase): 3534 3535 def get_high_socket_fd(self): 3536 if WIN32: 3537 # The child process will not have any socket handles, so 3538 # calling socket.fromfd() should produce WSAENOTSOCK even 3539 # if there is a handle of the same number. 3540 return socket.socket().detach() 3541 else: 3542 # We want to produce a socket with an fd high enough that a 3543 # freshly created child process will not have any fds as high. 3544 fd = socket.socket().detach() 3545 to_close = [] 3546 while fd < 50: 3547 to_close.append(fd) 3548 fd = os.dup(fd) 3549 for x in to_close: 3550 os.close(x) 3551 return fd 3552 3553 def close(self, fd): 3554 if WIN32: 3555 socket.socket(fileno=fd).close() 3556 else: 3557 os.close(fd) 3558 3559 @classmethod 3560 def _test_closefds(cls, conn, fd): 3561 try: 3562 s = socket.fromfd(fd, socket.AF_INET, socket.SOCK_STREAM) 3563 except Exception as e: 3564 conn.send(e) 3565 else: 3566 s.close() 3567 conn.send(None) 3568 3569 def test_closefd(self): 3570 if not HAS_REDUCTION: 3571 raise unittest.SkipTest('requires fd pickling') 3572 3573 reader, writer = multiprocessing.Pipe() 3574 fd = self.get_high_socket_fd() 3575 try: 3576 p = multiprocessing.Process(target=self._test_closefds, 3577 args=(writer, fd)) 3578 p.start() 3579 writer.close() 3580 e = reader.recv() 3581 p.join(timeout=5) 3582 finally: 3583 self.close(fd) 3584 writer.close() 3585 reader.close() 3586 3587 if multiprocessing.get_start_method() == 'fork': 3588 self.assertIs(e, None) 3589 else: 3590 WSAENOTSOCK = 10038 3591 self.assertIsInstance(e, OSError) 3592 self.assertTrue(e.errno == errno.EBADF or 3593 e.winerror == WSAENOTSOCK, e) 3594 3595# 3596# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 3597# 3598 3599class TestIgnoreEINTR(unittest.TestCase): 3600 3601 @classmethod 3602 def _test_ignore(cls, conn): 3603 def handler(signum, frame): 3604 pass 3605 signal.signal(signal.SIGUSR1, handler) 3606 conn.send('ready') 3607 x = conn.recv() 3608 conn.send(x) 3609 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block 3610 3611 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3612 def _test_ignore(self): 3613 conn, child_conn = multiprocessing.Pipe() 3614 try: 3615 p = multiprocessing.Process(target=self._test_ignore, 3616 args=(child_conn,)) 3617 p.daemon = True 3618 p.start() 3619 child_conn.close() 3620 self.assertEqual(conn.recv(), 'ready') 3621 time.sleep(0.1) 3622 os.kill(p.pid, signal.SIGUSR1) 3623 time.sleep(0.1) 3624 conn.send(1234) 3625 self.assertEqual(conn.recv(), 1234) 3626 time.sleep(0.1) 3627 os.kill(p.pid, signal.SIGUSR1) 3628 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) 3629 time.sleep(0.1) 3630 p.join() 3631 finally: 3632 conn.close() 3633 3634 @classmethod 3635 def _test_ignore_listener(cls, conn): 3636 def handler(signum, frame): 3637 pass 3638 signal.signal(signal.SIGUSR1, handler) 3639 with multiprocessing.connection.Listener() as l: 3640 conn.send(l.address) 3641 a = l.accept() 3642 a.send('welcome') 3643 3644 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3645 def test_ignore_listener(self): 3646 conn, child_conn = multiprocessing.Pipe() 3647 try: 3648 p = multiprocessing.Process(target=self._test_ignore_listener, 3649 args=(child_conn,)) 3650 p.daemon = True 3651 p.start() 3652 child_conn.close() 3653 address = conn.recv() 3654 time.sleep(0.1) 3655 os.kill(p.pid, signal.SIGUSR1) 3656 time.sleep(0.1) 3657 client = multiprocessing.connection.Client(address) 3658 self.assertEqual(client.recv(), 'welcome') 3659 p.join() 3660 finally: 3661 conn.close() 3662 3663class TestStartMethod(unittest.TestCase): 3664 @classmethod 3665 def _check_context(cls, conn): 3666 conn.send(multiprocessing.get_start_method()) 3667 3668 def check_context(self, ctx): 3669 r, w = ctx.Pipe(duplex=False) 3670 p = ctx.Process(target=self._check_context, args=(w,)) 3671 p.start() 3672 w.close() 3673 child_method = r.recv() 3674 r.close() 3675 p.join() 3676 self.assertEqual(child_method, ctx.get_start_method()) 3677 3678 def test_context(self): 3679 for method in ('fork', 'spawn', 'forkserver'): 3680 try: 3681 ctx = multiprocessing.get_context(method) 3682 except ValueError: 3683 continue 3684 self.assertEqual(ctx.get_start_method(), method) 3685 self.assertIs(ctx.get_context(), ctx) 3686 self.assertRaises(ValueError, ctx.set_start_method, 'spawn') 3687 self.assertRaises(ValueError, ctx.set_start_method, None) 3688 self.check_context(ctx) 3689 3690 def test_set_get(self): 3691 multiprocessing.set_forkserver_preload(PRELOAD) 3692 count = 0 3693 old_method = multiprocessing.get_start_method() 3694 try: 3695 for method in ('fork', 'spawn', 'forkserver'): 3696 try: 3697 multiprocessing.set_start_method(method, force=True) 3698 except ValueError: 3699 continue 3700 self.assertEqual(multiprocessing.get_start_method(), method) 3701 ctx = multiprocessing.get_context() 3702 self.assertEqual(ctx.get_start_method(), method) 3703 self.assertTrue(type(ctx).__name__.lower().startswith(method)) 3704 self.assertTrue( 3705 ctx.Process.__name__.lower().startswith(method)) 3706 self.check_context(multiprocessing) 3707 count += 1 3708 finally: 3709 multiprocessing.set_start_method(old_method, force=True) 3710 self.assertGreaterEqual(count, 1) 3711 3712 def test_get_all(self): 3713 methods = multiprocessing.get_all_start_methods() 3714 if sys.platform == 'win32': 3715 self.assertEqual(methods, ['spawn']) 3716 else: 3717 self.assertTrue(methods == ['fork', 'spawn'] or 3718 methods == ['fork', 'spawn', 'forkserver']) 3719 3720# 3721# Check that killing process does not leak named semaphores 3722# 3723 3724@unittest.skipIf(sys.platform == "win32", 3725 "test semantics don't make sense on Windows") 3726class TestSemaphoreTracker(unittest.TestCase): 3727 def test_semaphore_tracker(self): 3728 import subprocess 3729 cmd = '''if 1: 3730 import multiprocess as mp, time, os 3731 mp.set_start_method("spawn") 3732 lock1 = mp.Lock() 3733 lock2 = mp.Lock() 3734 os.write(%d, lock1._semlock.name.encode("ascii") + b"\\n") 3735 os.write(%d, lock2._semlock.name.encode("ascii") + b"\\n") 3736 time.sleep(10) 3737 ''' 3738 r, w = os.pipe() 3739 p = subprocess.Popen([sys.executable, 3740 '-c', cmd % (w, w)], 3741 pass_fds=[w], 3742 stderr=subprocess.PIPE) 3743 os.close(w) 3744 with open(r, 'rb', closefd=True) as f: 3745 name1 = f.readline().rstrip().decode('ascii') 3746 name2 = f.readline().rstrip().decode('ascii') 3747 _multiprocessing.sem_unlink(name1) 3748 p.terminate() 3749 p.wait() 3750 time.sleep(2.0) 3751 with self.assertRaises(OSError) as ctx: 3752 _multiprocessing.sem_unlink(name2) 3753 # docs say it should be ENOENT, but OSX seems to give EINVAL 3754 self.assertIn(ctx.exception.errno, (errno.ENOENT, errno.EINVAL)) 3755 err = p.stderr.read().decode('utf-8') 3756 p.stderr.close() 3757 expected = 'semaphore_tracker: There appear to be 2 leaked semaphores' 3758 self.assertRegex(err, expected) 3759 self.assertRegex(err, 'semaphore_tracker: %r: \[Errno' % name1) 3760 3761# 3762# Mixins 3763# 3764 3765class ProcessesMixin(object): 3766 TYPE = 'processes' 3767 Process = multiprocessing.Process 3768 connection = multiprocessing.connection 3769 current_process = staticmethod(multiprocessing.current_process) 3770 active_children = staticmethod(multiprocessing.active_children) 3771 Pool = staticmethod(multiprocessing.Pool) 3772 Pipe = staticmethod(multiprocessing.Pipe) 3773 Queue = staticmethod(multiprocessing.Queue) 3774 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 3775 Lock = staticmethod(multiprocessing.Lock) 3776 RLock = staticmethod(multiprocessing.RLock) 3777 Semaphore = staticmethod(multiprocessing.Semaphore) 3778 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 3779 Condition = staticmethod(multiprocessing.Condition) 3780 Event = staticmethod(multiprocessing.Event) 3781 Barrier = staticmethod(multiprocessing.Barrier) 3782 Value = staticmethod(multiprocessing.Value) 3783 Array = staticmethod(multiprocessing.Array) 3784 RawValue = staticmethod(multiprocessing.RawValue) 3785 RawArray = staticmethod(multiprocessing.RawArray) 3786 3787 3788class ManagerMixin(object): 3789 TYPE = 'manager' 3790 Process = multiprocessing.Process 3791 Queue = property(operator.attrgetter('manager.Queue')) 3792 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 3793 Lock = property(operator.attrgetter('manager.Lock')) 3794 RLock = property(operator.attrgetter('manager.RLock')) 3795 Semaphore = property(operator.attrgetter('manager.Semaphore')) 3796 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 3797 Condition = property(operator.attrgetter('manager.Condition')) 3798 Event = property(operator.attrgetter('manager.Event')) 3799 Barrier = property(operator.attrgetter('manager.Barrier')) 3800 Value = property(operator.attrgetter('manager.Value')) 3801 Array = property(operator.attrgetter('manager.Array')) 3802 list = property(operator.attrgetter('manager.list')) 3803 dict = property(operator.attrgetter('manager.dict')) 3804 Namespace = property(operator.attrgetter('manager.Namespace')) 3805 3806 @classmethod 3807 def Pool(cls, *args, **kwds): 3808 return cls.manager.Pool(*args, **kwds) 3809 3810 @classmethod 3811 def setUpClass(cls): 3812 cls.manager = multiprocessing.Manager() 3813 3814 @classmethod 3815 def tearDownClass(cls): 3816 # only the manager process should be returned by active_children() 3817 # but this can take a bit on slow machines, so wait a few seconds 3818 # if there are other children too (see #17395) 3819 t = 0.01 3820 while len(multiprocessing.active_children()) > 1 and t < 5: 3821 time.sleep(t) 3822 t *= 2 3823 gc.collect() # do garbage collection 3824 if cls.manager._number_of_objects() != 0: 3825 # This is not really an error since some tests do not 3826 # ensure that all processes which hold a reference to a 3827 # managed object have been joined. 3828 print('Shared objects which still exist at manager shutdown:') 3829 print(cls.manager._debug_info()) 3830 cls.manager.shutdown() 3831 cls.manager.join() 3832 cls.manager = None 3833 3834 3835class ThreadsMixin(object): 3836 TYPE = 'threads' 3837 Process = multiprocessing.dummy.Process 3838 connection = multiprocessing.dummy.connection 3839 current_process = staticmethod(multiprocessing.dummy.current_process) 3840 active_children = staticmethod(multiprocessing.dummy.active_children) 3841 Pool = staticmethod(multiprocessing.Pool) 3842 Pipe = staticmethod(multiprocessing.dummy.Pipe) 3843 Queue = staticmethod(multiprocessing.dummy.Queue) 3844 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 3845 Lock = staticmethod(multiprocessing.dummy.Lock) 3846 RLock = staticmethod(multiprocessing.dummy.RLock) 3847 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 3848 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 3849 Condition = staticmethod(multiprocessing.dummy.Condition) 3850 Event = staticmethod(multiprocessing.dummy.Event) 3851 Barrier = staticmethod(multiprocessing.dummy.Barrier) 3852 Value = staticmethod(multiprocessing.dummy.Value) 3853 Array = staticmethod(multiprocessing.dummy.Array) 3854 3855# 3856# Functions used to create test cases from the base ones in this module 3857# 3858 3859def install_tests_in_module_dict(remote_globs, start_method): 3860 __module__ = remote_globs['__name__'] 3861 local_globs = globals() 3862 ALL_TYPES = {'processes', 'threads', 'manager'} 3863 3864 for name, base in local_globs.items(): 3865 if not isinstance(base, type): 3866 continue 3867 if issubclass(base, BaseTestCase): 3868 if base is BaseTestCase: 3869 continue 3870 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, base.ALLOWED_TYPES 3871 for type_ in base.ALLOWED_TYPES: 3872 newname = 'With' + type_.capitalize() + name[1:] 3873 Mixin = local_globs[type_.capitalize() + 'Mixin'] 3874 class Temp(base, Mixin, unittest.TestCase): 3875 pass 3876 Temp.__name__ = Temp.__qualname__ = newname 3877 Temp.__module__ = __module__ 3878 remote_globs[newname] = Temp 3879 elif issubclass(base, unittest.TestCase): 3880 class Temp(base, object): 3881 pass 3882 Temp.__name__ = Temp.__qualname__ = name 3883 Temp.__module__ = __module__ 3884 remote_globs[name] = Temp 3885 3886 dangling = [None, None] 3887 old_start_method = [None] 3888 3889 def setUpModule(): 3890 multiprocessing.set_forkserver_preload(PRELOAD) 3891 multiprocessing.process._cleanup() 3892 dangling[0] = multiprocessing.process._dangling.copy() 3893 dangling[1] = threading._dangling.copy() 3894 old_start_method[0] = multiprocessing.get_start_method(allow_none=True) 3895 try: 3896 multiprocessing.set_start_method(start_method, force=True) 3897 except ValueError: 3898 raise unittest.SkipTest(start_method + 3899 ' start method not supported') 3900 3901 if sys.platform.startswith("linux"): 3902 try: 3903 lock = multiprocessing.RLock() 3904 except OSError: 3905 raise unittest.SkipTest("OSError raises on RLock creation, " 3906 "see issue 3111!") 3907 check_enough_semaphores() 3908 util.get_temp_dir() # creates temp directory 3909 multiprocessing.get_logger().setLevel(LOG_LEVEL) 3910 3911 def tearDownModule(): 3912 multiprocessing.set_start_method(old_start_method[0], force=True) 3913 # pause a bit so we don't get warning about dangling threads/processes 3914 time.sleep(0.5) 3915 multiprocessing.process._cleanup() 3916 gc.collect() 3917 tmp = set(multiprocessing.process._dangling) - set(dangling[0]) 3918 if tmp: 3919 print('Dangling processes:', tmp, file=sys.stderr) 3920 del tmp 3921 tmp = set(threading._dangling) - set(dangling[1]) 3922 if tmp: 3923 print('Dangling threads:', tmp, file=sys.stderr) 3924 3925 remote_globs['setUpModule'] = setUpModule 3926 remote_globs['tearDownModule'] = tearDownModule 3927