1# 2# Unit tests for the multiprocessing package 3# 4 5import unittest 6import queue as pyqueue 7import time 8import io 9import itertools 10import sys 11import os 12import gc 13import errno 14import signal 15import array 16import socket 17import random 18import logging 19import struct 20import operator 21import test.support 22import test.script_helper 23 24 25# Skip tests if _multiprocessing wasn't built. 26_multiprocessing = test.support.import_module('_multiprocessing') 27# Skip tests if sem_open implementation is broken. 28test.support.import_module('multiprocess.synchronize') 29# import threading after _multiprocessing to raise a more revelant error 30# message: "No module named _multiprocessing". _multiprocessing is not compiled 31# without thread support. 32import threading 33 34import multiprocess as multiprocessing 35import multiprocess.dummy 36import multiprocess.connection 37import multiprocess.managers 38import multiprocess.heap 39import multiprocess.pool 40 41from multiprocess import util 42 43try: 44 from multiprocess import reduction 45 HAS_REDUCTION = True 46except ImportError: 47 HAS_REDUCTION = False 48 49try: 50 from multiprocess.sharedctypes import Value, copy 51 HAS_SHAREDCTYPES = True 52except ImportError: 53 HAS_SHAREDCTYPES = False 54 55try: 56 import msvcrt 57except ImportError: 58 msvcrt = None 59 60# 61# 62# 63 64def latin(s): 65 return s.encode('latin') 66 67# 68# Constants 69# 70 71LOG_LEVEL = util.SUBWARNING 72#LOG_LEVEL = logging.DEBUG 73 74DELTA = 0.1 75CHECK_TIMINGS = False # making true makes tests take a lot longer 76 # and can sometimes cause some non-serious 77 # failures because some calls block a bit 78 # longer than expected 79if CHECK_TIMINGS: 80 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.82, 0.35, 1.4 81else: 82 TIMEOUT1, TIMEOUT2, TIMEOUT3 = 0.1, 0.1, 0.1 83 84HAVE_GETVALUE = not getattr(_multiprocessing, 85 'HAVE_BROKEN_SEM_GETVALUE', False) 86 87WIN32 = (sys.platform == "win32") 88 89from multiprocess.connection import wait 90 91def wait_for_handle(handle, timeout): 92 if timeout is not None and timeout < 0.0: 93 timeout = None 94 return wait([handle], timeout) 95 96try: 97 MAXFD = os.sysconf("SC_OPEN_MAX") 98except: 99 MAXFD = 256 100 101# 102# Some tests require ctypes 103# 104 105try: 106 from ctypes import Structure, c_int, c_double 107except ImportError: 108 Structure = object 109 c_int = c_double = None 110 111 112def check_enough_semaphores(): 113 """Check that the system supports enough semaphores to run the test.""" 114 # minimum number of semaphores available according to POSIX 115 nsems_min = 256 116 try: 117 nsems = os.sysconf("SC_SEM_NSEMS_MAX") 118 except (AttributeError, ValueError): 119 # sysconf not available or setting not available 120 return 121 if nsems == -1 or nsems >= nsems_min: 122 return 123 raise unittest.SkipTest("The OS doesn't support enough semaphores " 124 "to run the test (required: %d)." % nsems_min) 125 126 127# 128# Creates a wrapper for a function which records the time it takes to finish 129# 130 131class TimingWrapper(object): 132 133 def __init__(self, func): 134 self.func = func 135 self.elapsed = None 136 137 def __call__(self, *args, **kwds): 138 t = time.time() 139 try: 140 return self.func(*args, **kwds) 141 finally: 142 self.elapsed = time.time() - t 143 144# 145# Base class for test cases 146# 147 148class BaseTestCase(object): 149 150 ALLOWED_TYPES = ('processes', 'manager', 'threads') 151 152 def assertTimingAlmostEqual(self, a, b): 153 if CHECK_TIMINGS: 154 self.assertAlmostEqual(a, b, 1) 155 156 def assertReturnsIfImplemented(self, value, func, *args): 157 try: 158 res = func(*args) 159 except NotImplementedError: 160 pass 161 else: 162 return self.assertEqual(value, res) 163 164 # For the sanity of Windows users, rather than crashing or freezing in 165 # multiple ways. 166 def __reduce__(self, *args): 167 raise NotImplementedError("shouldn't try to pickle a test case") 168 169 __reduce_ex__ = __reduce__ 170 171# 172# Return the value of a semaphore 173# 174 175def get_value(self): 176 try: 177 return self.get_value() 178 except AttributeError: 179 try: 180 return self._Semaphore__value 181 except AttributeError: 182 try: 183 return self._value 184 except AttributeError: 185 raise NotImplementedError 186 187# 188# Testcases 189# 190 191class _TestProcess(BaseTestCase): 192 193 ALLOWED_TYPES = ('processes', 'threads') 194 195 def test_current(self): 196 if self.TYPE == 'threads': 197 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 198 199 current = self.current_process() 200 authkey = current.authkey 201 202 self.assertTrue(current.is_alive()) 203 self.assertTrue(not current.daemon) 204 self.assertIsInstance(authkey, bytes) 205 self.assertTrue(len(authkey) > 0) 206 self.assertEqual(current.ident, os.getpid()) 207 self.assertEqual(current.exitcode, None) 208 209 def test_daemon_argument(self): 210 if self.TYPE == "threads": 211 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 212 213 # By default uses the current process's daemon flag. 214 proc0 = self.Process(target=self._test) 215 self.assertEqual(proc0.daemon, self.current_process().daemon) 216 proc1 = self.Process(target=self._test, daemon=True) 217 self.assertTrue(proc1.daemon) 218 proc2 = self.Process(target=self._test, daemon=False) 219 self.assertFalse(proc2.daemon) 220 221 @classmethod 222 def _test(cls, q, *args, **kwds): 223 current = cls.current_process() 224 q.put(args) 225 q.put(kwds) 226 q.put(current.name) 227 if cls.TYPE != 'threads': 228 q.put(bytes(current.authkey)) 229 q.put(current.pid) 230 231 def test_process(self): 232 q = self.Queue(1) 233 e = self.Event() 234 args = (q, 1, 2) 235 kwargs = {'hello':23, 'bye':2.54} 236 name = 'SomeProcess' 237 p = self.Process( 238 target=self._test, args=args, kwargs=kwargs, name=name 239 ) 240 p.daemon = True 241 current = self.current_process() 242 243 if self.TYPE != 'threads': 244 self.assertEqual(p.authkey, current.authkey) 245 self.assertEqual(p.is_alive(), False) 246 self.assertEqual(p.daemon, True) 247 self.assertNotIn(p, self.active_children()) 248 self.assertTrue(type(self.active_children()) is list) 249 self.assertEqual(p.exitcode, None) 250 251 p.start() 252 253 self.assertEqual(p.exitcode, None) 254 self.assertEqual(p.is_alive(), True) 255 self.assertIn(p, self.active_children()) 256 257 self.assertEqual(q.get(), args[1:]) 258 self.assertEqual(q.get(), kwargs) 259 self.assertEqual(q.get(), p.name) 260 if self.TYPE != 'threads': 261 self.assertEqual(q.get(), current.authkey) 262 self.assertEqual(q.get(), p.pid) 263 264 p.join() 265 266 self.assertEqual(p.exitcode, 0) 267 self.assertEqual(p.is_alive(), False) 268 self.assertNotIn(p, self.active_children()) 269 270 @classmethod 271 def _test_terminate(cls): 272 time.sleep(100) 273 274 def test_terminate(self): 275 if self.TYPE == 'threads': 276 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 277 278 p = self.Process(target=self._test_terminate) 279 p.daemon = True 280 p.start() 281 282 self.assertEqual(p.is_alive(), True) 283 self.assertIn(p, self.active_children()) 284 self.assertEqual(p.exitcode, None) 285 286 join = TimingWrapper(p.join) 287 288 self.assertEqual(join(0), None) 289 self.assertTimingAlmostEqual(join.elapsed, 0.0) 290 self.assertEqual(p.is_alive(), True) 291 292 self.assertEqual(join(-1), None) 293 self.assertTimingAlmostEqual(join.elapsed, 0.0) 294 self.assertEqual(p.is_alive(), True) 295 296 p.terminate() 297 298 if hasattr(signal, 'alarm'): 299 def handler(*args): 300 raise RuntimeError('join took too long: %s' % p) 301 old_handler = signal.signal(signal.SIGALRM, handler) 302 try: 303 signal.alarm(10) 304 self.assertEqual(join(), None) 305 signal.alarm(0) 306 finally: 307 signal.signal(signal.SIGALRM, old_handler) 308 else: 309 self.assertEqual(join(), None) 310 311 self.assertTimingAlmostEqual(join.elapsed, 0.0) 312 313 self.assertEqual(p.is_alive(), False) 314 self.assertNotIn(p, self.active_children()) 315 316 p.join() 317 318 # XXX sometimes get p.exitcode == 0 on Windows ... 319 #self.assertEqual(p.exitcode, -signal.SIGTERM) 320 321 def test_cpu_count(self): 322 try: 323 cpus = multiprocessing.cpu_count() 324 except NotImplementedError: 325 cpus = 1 326 self.assertTrue(type(cpus) is int) 327 self.assertTrue(cpus >= 1) 328 329 def test_active_children(self): 330 self.assertEqual(type(self.active_children()), list) 331 332 p = self.Process(target=time.sleep, args=(DELTA,)) 333 self.assertNotIn(p, self.active_children()) 334 335 p.daemon = True 336 p.start() 337 self.assertIn(p, self.active_children()) 338 339 p.join() 340 self.assertNotIn(p, self.active_children()) 341 342 @classmethod 343 def _test_recursion(cls, wconn, id): 344 from multiprocess import forking 345 wconn.send(id) 346 if len(id) < 2: 347 for i in range(2): 348 p = cls.Process( 349 target=cls._test_recursion, args=(wconn, id+[i]) 350 ) 351 p.start() 352 p.join() 353 354 def test_recursion(self): 355 rconn, wconn = self.Pipe(duplex=False) 356 self._test_recursion(wconn, []) 357 358 time.sleep(DELTA) 359 result = [] 360 while rconn.poll(): 361 result.append(rconn.recv()) 362 363 expected = [ 364 [], 365 [0], 366 [0, 0], 367 [0, 1], 368 [1], 369 [1, 0], 370 [1, 1] 371 ] 372 self.assertEqual(result, expected) 373 374 @classmethod 375 def _test_sentinel(cls, event): 376 event.wait(10.0) 377 378 def test_sentinel(self): 379 if self.TYPE == "threads": 380 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 381 event = self.Event() 382 p = self.Process(target=self._test_sentinel, args=(event,)) 383 with self.assertRaises(ValueError): 384 p.sentinel 385 p.start() 386 self.addCleanup(p.join) 387 sentinel = p.sentinel 388 self.assertIsInstance(sentinel, int) 389 self.assertFalse(wait_for_handle(sentinel, timeout=0.0)) 390 event.set() 391 p.join() 392 self.assertTrue(wait_for_handle(sentinel, timeout=DELTA)) 393 394# 395# 396# 397 398class _UpperCaser(multiprocessing.Process): 399 400 def __init__(self): 401 multiprocessing.Process.__init__(self) 402 self.child_conn, self.parent_conn = multiprocessing.Pipe() 403 404 def run(self): 405 self.parent_conn.close() 406 for s in iter(self.child_conn.recv, None): 407 self.child_conn.send(s.upper()) 408 self.child_conn.close() 409 410 def submit(self, s): 411 assert type(s) is str 412 self.parent_conn.send(s) 413 return self.parent_conn.recv() 414 415 def stop(self): 416 self.parent_conn.send(None) 417 self.parent_conn.close() 418 self.child_conn.close() 419 420class _TestSubclassingProcess(BaseTestCase): 421 422 ALLOWED_TYPES = ('processes',) 423 424 def test_subclassing(self): 425 uppercaser = _UpperCaser() 426 uppercaser.daemon = True 427 uppercaser.start() 428 self.assertEqual(uppercaser.submit('hello'), 'HELLO') 429 self.assertEqual(uppercaser.submit('world'), 'WORLD') 430 uppercaser.stop() 431 uppercaser.join() 432 433 def test_stderr_flush(self): 434 # sys.stderr is flushed at process shutdown (issue #13812) 435 if self.TYPE == "threads": 436 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 437 438 testfn = test.support.TESTFN 439 self.addCleanup(test.support.unlink, testfn) 440 proc = self.Process(target=self._test_stderr_flush, args=(testfn,)) 441 proc.start() 442 proc.join() 443 with open(testfn, 'r') as f: 444 err = f.read() 445 # The whole traceback was printed 446 self.assertIn("ZeroDivisionError", err) 447 self.assertIn("__init__.py", err) 448 self.assertIn("1/0 # MARKER", err) 449 450 @classmethod 451 def _test_stderr_flush(cls, testfn): 452 sys.stderr = open(testfn, 'w') 453 1/0 # MARKER 454 455 456 @classmethod 457 def _test_sys_exit(cls, reason, testfn): 458 sys.stderr = open(testfn, 'w') 459 sys.exit(reason) 460 461 def test_sys_exit(self): 462 # See Issue 13854 463 if self.TYPE == 'threads': 464 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 465 466 testfn = test.support.TESTFN 467 self.addCleanup(test.support.unlink, testfn) 468 469 for reason, code in (([1, 2, 3], 1), ('ignore this', 1)): 470 p = self.Process(target=self._test_sys_exit, args=(reason, testfn)) 471 p.daemon = True 472 p.start() 473 p.join(5) 474 self.assertEqual(p.exitcode, code) 475 476 with open(testfn, 'r') as f: 477 self.assertEqual(f.read().rstrip(), str(reason)) 478 479 for reason in (True, False, 8): 480 p = self.Process(target=sys.exit, args=(reason,)) 481 p.daemon = True 482 p.start() 483 p.join(5) 484 self.assertEqual(p.exitcode, reason) 485 486# 487# 488# 489 490def queue_empty(q): 491 if hasattr(q, 'empty'): 492 return q.empty() 493 else: 494 return q.qsize() == 0 495 496def queue_full(q, maxsize): 497 if hasattr(q, 'full'): 498 return q.full() 499 else: 500 return q.qsize() == maxsize 501 502 503class _TestQueue(BaseTestCase): 504 505 506 @classmethod 507 def _test_put(cls, queue, child_can_start, parent_can_continue): 508 child_can_start.wait() 509 for i in range(6): 510 queue.get() 511 parent_can_continue.set() 512 513 def test_put(self): 514 MAXSIZE = 6 515 queue = self.Queue(maxsize=MAXSIZE) 516 child_can_start = self.Event() 517 parent_can_continue = self.Event() 518 519 proc = self.Process( 520 target=self._test_put, 521 args=(queue, child_can_start, parent_can_continue) 522 ) 523 proc.daemon = True 524 proc.start() 525 526 self.assertEqual(queue_empty(queue), True) 527 self.assertEqual(queue_full(queue, MAXSIZE), False) 528 529 queue.put(1) 530 queue.put(2, True) 531 queue.put(3, True, None) 532 queue.put(4, False) 533 queue.put(5, False, None) 534 queue.put_nowait(6) 535 536 # the values may be in buffer but not yet in pipe so sleep a bit 537 time.sleep(DELTA) 538 539 self.assertEqual(queue_empty(queue), False) 540 self.assertEqual(queue_full(queue, MAXSIZE), True) 541 542 put = TimingWrapper(queue.put) 543 put_nowait = TimingWrapper(queue.put_nowait) 544 545 self.assertRaises(pyqueue.Full, put, 7, False) 546 self.assertTimingAlmostEqual(put.elapsed, 0) 547 548 self.assertRaises(pyqueue.Full, put, 7, False, None) 549 self.assertTimingAlmostEqual(put.elapsed, 0) 550 551 self.assertRaises(pyqueue.Full, put_nowait, 7) 552 self.assertTimingAlmostEqual(put_nowait.elapsed, 0) 553 554 self.assertRaises(pyqueue.Full, put, 7, True, TIMEOUT1) 555 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT1) 556 557 self.assertRaises(pyqueue.Full, put, 7, False, TIMEOUT2) 558 self.assertTimingAlmostEqual(put.elapsed, 0) 559 560 self.assertRaises(pyqueue.Full, put, 7, True, timeout=TIMEOUT3) 561 self.assertTimingAlmostEqual(put.elapsed, TIMEOUT3) 562 563 child_can_start.set() 564 parent_can_continue.wait() 565 566 self.assertEqual(queue_empty(queue), True) 567 self.assertEqual(queue_full(queue, MAXSIZE), False) 568 569 proc.join() 570 571 @classmethod 572 def _test_get(cls, queue, child_can_start, parent_can_continue): 573 child_can_start.wait() 574 #queue.put(1) 575 queue.put(2) 576 queue.put(3) 577 queue.put(4) 578 queue.put(5) 579 parent_can_continue.set() 580 581 def test_get(self): 582 queue = self.Queue() 583 child_can_start = self.Event() 584 parent_can_continue = self.Event() 585 586 proc = self.Process( 587 target=self._test_get, 588 args=(queue, child_can_start, parent_can_continue) 589 ) 590 proc.daemon = True 591 proc.start() 592 593 self.assertEqual(queue_empty(queue), True) 594 595 child_can_start.set() 596 parent_can_continue.wait() 597 598 time.sleep(DELTA) 599 self.assertEqual(queue_empty(queue), False) 600 601 # Hangs unexpectedly, remove for now 602 #self.assertEqual(queue.get(), 1) 603 self.assertEqual(queue.get(True, None), 2) 604 self.assertEqual(queue.get(True), 3) 605 self.assertEqual(queue.get(timeout=1), 4) 606 self.assertEqual(queue.get_nowait(), 5) 607 608 self.assertEqual(queue_empty(queue), True) 609 610 get = TimingWrapper(queue.get) 611 get_nowait = TimingWrapper(queue.get_nowait) 612 613 self.assertRaises(pyqueue.Empty, get, False) 614 self.assertTimingAlmostEqual(get.elapsed, 0) 615 616 self.assertRaises(pyqueue.Empty, get, False, None) 617 self.assertTimingAlmostEqual(get.elapsed, 0) 618 619 self.assertRaises(pyqueue.Empty, get_nowait) 620 self.assertTimingAlmostEqual(get_nowait.elapsed, 0) 621 622 self.assertRaises(pyqueue.Empty, get, True, TIMEOUT1) 623 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 624 625 self.assertRaises(pyqueue.Empty, get, False, TIMEOUT2) 626 self.assertTimingAlmostEqual(get.elapsed, 0) 627 628 self.assertRaises(pyqueue.Empty, get, timeout=TIMEOUT3) 629 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT3) 630 631 proc.join() 632 633 @classmethod 634 def _test_fork(cls, queue): 635 for i in range(10, 20): 636 queue.put(i) 637 # note that at this point the items may only be buffered, so the 638 # process cannot shutdown until the feeder thread has finished 639 # pushing items onto the pipe. 640 641 def test_fork(self): 642 # Old versions of Queue would fail to create a new feeder 643 # thread for a forked process if the original process had its 644 # own feeder thread. This test checks that this no longer 645 # happens. 646 647 queue = self.Queue() 648 649 # put items on queue so that main process starts a feeder thread 650 for i in range(10): 651 queue.put(i) 652 653 # wait to make sure thread starts before we fork a new process 654 time.sleep(DELTA) 655 656 # fork process 657 p = self.Process(target=self._test_fork, args=(queue,)) 658 p.daemon = True 659 p.start() 660 661 # check that all expected items are in the queue 662 for i in range(20): 663 self.assertEqual(queue.get(), i) 664 self.assertRaises(pyqueue.Empty, queue.get, False) 665 666 p.join() 667 668 def test_qsize(self): 669 q = self.Queue() 670 try: 671 self.assertEqual(q.qsize(), 0) 672 except NotImplementedError: 673 self.skipTest('qsize method not implemented') 674 q.put(1) 675 self.assertEqual(q.qsize(), 1) 676 q.put(5) 677 self.assertEqual(q.qsize(), 2) 678 q.get() 679 self.assertEqual(q.qsize(), 1) 680 q.get() 681 self.assertEqual(q.qsize(), 0) 682 683 @classmethod 684 def _test_task_done(cls, q): 685 for obj in iter(q.get, None): 686 time.sleep(DELTA) 687 q.task_done() 688 689 def test_task_done(self): 690 queue = self.JoinableQueue() 691 692 if sys.version_info < (2, 5) and not hasattr(queue, 'task_done'): 693 self.skipTest("requires 'queue.task_done()' method") 694 695 workers = [self.Process(target=self._test_task_done, args=(queue,)) 696 for i in range(4)] 697 698 for p in workers: 699 p.daemon = True 700 p.start() 701 702 for i in range(10): 703 queue.put(i) 704 705 queue.join() 706 707 for p in workers: 708 queue.put(None) 709 710 for p in workers: 711 p.join() 712 713 def test_timeout(self): 714 q = multiprocessing.Queue() 715 start = time.time() 716 self.assertRaises(pyqueue.Empty, q.get, True, 0.2) 717 delta = time.time() - start 718 self.assertGreaterEqual(delta, 0.18) 719 720# 721# 722# 723 724class _TestLock(BaseTestCase): 725 726 def test_lock(self): 727 lock = self.Lock() 728 self.assertEqual(lock.acquire(), True) 729 self.assertEqual(lock.acquire(False), False) 730 self.assertEqual(lock.release(), None) 731 self.assertRaises((ValueError, threading.ThreadError), lock.release) 732 733 def test_rlock(self): 734 lock = self.RLock() 735 self.assertEqual(lock.acquire(), True) 736 self.assertEqual(lock.acquire(), True) 737 self.assertEqual(lock.acquire(), True) 738 self.assertEqual(lock.release(), None) 739 self.assertEqual(lock.release(), None) 740 self.assertEqual(lock.release(), None) 741 self.assertRaises((AssertionError, RuntimeError), lock.release) 742 743 def test_lock_context(self): 744 with self.Lock(): 745 pass 746 747 748class _TestSemaphore(BaseTestCase): 749 750 def _test_semaphore(self, sem): 751 self.assertReturnsIfImplemented(2, get_value, sem) 752 self.assertEqual(sem.acquire(), True) 753 self.assertReturnsIfImplemented(1, get_value, sem) 754 self.assertEqual(sem.acquire(), True) 755 self.assertReturnsIfImplemented(0, get_value, sem) 756 self.assertEqual(sem.acquire(False), False) 757 self.assertReturnsIfImplemented(0, get_value, sem) 758 self.assertEqual(sem.release(), None) 759 self.assertReturnsIfImplemented(1, get_value, sem) 760 self.assertEqual(sem.release(), None) 761 self.assertReturnsIfImplemented(2, get_value, sem) 762 763 def test_semaphore(self): 764 sem = self.Semaphore(2) 765 self._test_semaphore(sem) 766 self.assertEqual(sem.release(), None) 767 self.assertReturnsIfImplemented(3, get_value, sem) 768 self.assertEqual(sem.release(), None) 769 self.assertReturnsIfImplemented(4, get_value, sem) 770 771 def test_bounded_semaphore(self): 772 sem = self.BoundedSemaphore(2) 773 self._test_semaphore(sem) 774 # Currently fails on OS/X 775 #if HAVE_GETVALUE: 776 # self.assertRaises(ValueError, sem.release) 777 # self.assertReturnsIfImplemented(2, get_value, sem) 778 779 def test_timeout(self): 780 if self.TYPE != 'processes': 781 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 782 783 sem = self.Semaphore(0) 784 acquire = TimingWrapper(sem.acquire) 785 786 self.assertEqual(acquire(False), False) 787 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 788 789 self.assertEqual(acquire(False, None), False) 790 self.assertTimingAlmostEqual(acquire.elapsed, 0.0) 791 792 self.assertEqual(acquire(False, TIMEOUT1), False) 793 self.assertTimingAlmostEqual(acquire.elapsed, 0) 794 795 self.assertEqual(acquire(True, TIMEOUT2), False) 796 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT2) 797 798 self.assertEqual(acquire(timeout=TIMEOUT3), False) 799 self.assertTimingAlmostEqual(acquire.elapsed, TIMEOUT3) 800 801 802class _TestCondition(BaseTestCase): 803 804 @classmethod 805 def f(cls, cond, sleeping, woken, timeout=None): 806 cond.acquire() 807 sleeping.release() 808 cond.wait(timeout) 809 woken.release() 810 cond.release() 811 812 def check_invariant(self, cond): 813 # this is only supposed to succeed when there are no sleepers 814 if self.TYPE == 'processes': 815 try: 816 sleepers = (cond._sleeping_count.get_value() - 817 cond._woken_count.get_value()) 818 self.assertEqual(sleepers, 0) 819 self.assertEqual(cond._wait_semaphore.get_value(), 0) 820 except NotImplementedError: 821 pass 822 823 def test_notify(self): 824 cond = self.Condition() 825 sleeping = self.Semaphore(0) 826 woken = self.Semaphore(0) 827 828 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 829 p.daemon = True 830 p.start() 831 832 p = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 833 p.daemon = True 834 p.start() 835 836 # wait for both children to start sleeping 837 sleeping.acquire() 838 sleeping.acquire() 839 840 # check no process/thread has woken up 841 time.sleep(DELTA) 842 self.assertReturnsIfImplemented(0, get_value, woken) 843 844 # wake up one process/thread 845 cond.acquire() 846 cond.notify() 847 cond.release() 848 849 # check one process/thread has woken up 850 time.sleep(DELTA) 851 self.assertReturnsIfImplemented(1, get_value, woken) 852 853 # wake up another 854 cond.acquire() 855 cond.notify() 856 cond.release() 857 858 # check other has woken up 859 time.sleep(DELTA) 860 self.assertReturnsIfImplemented(2, get_value, woken) 861 862 # check state is not mucked up 863 self.check_invariant(cond) 864 p.join() 865 866 def test_notify_all(self): 867 cond = self.Condition() 868 sleeping = self.Semaphore(0) 869 woken = self.Semaphore(0) 870 871 # start some threads/processes which will timeout 872 for i in range(3): 873 p = self.Process(target=self.f, 874 args=(cond, sleeping, woken, TIMEOUT1)) 875 p.daemon = True 876 p.start() 877 878 t = threading.Thread(target=self.f, 879 args=(cond, sleeping, woken, TIMEOUT1)) 880 t.daemon = True 881 t.start() 882 883 # wait for them all to sleep 884 for i in range(6): 885 sleeping.acquire() 886 887 # check they have all timed out 888 for i in range(6): 889 woken.acquire() 890 self.assertReturnsIfImplemented(0, get_value, woken) 891 892 # check state is not mucked up 893 self.check_invariant(cond) 894 895 # start some more threads/processes 896 for i in range(3): 897 p = self.Process(target=self.f, args=(cond, sleeping, woken)) 898 p.daemon = True 899 p.start() 900 901 t = threading.Thread(target=self.f, args=(cond, sleeping, woken)) 902 t.daemon = True 903 t.start() 904 905 # wait for them to all sleep 906 for i in range(6): 907 sleeping.acquire() 908 909 # check no process/thread has woken up 910 time.sleep(DELTA) 911 self.assertReturnsIfImplemented(0, get_value, woken) 912 913 # wake them all up 914 cond.acquire() 915 cond.notify_all() 916 cond.release() 917 918 # check they have all woken 919 for i in range(10): 920 try: 921 if get_value(woken) == 6: 922 break 923 except NotImplementedError: 924 break 925 time.sleep(DELTA) 926 self.assertReturnsIfImplemented(6, get_value, woken) 927 928 # check state is not mucked up 929 self.check_invariant(cond) 930 931 def test_timeout(self): 932 cond = self.Condition() 933 wait = TimingWrapper(cond.wait) 934 cond.acquire() 935 res = wait(TIMEOUT1) 936 cond.release() 937 self.assertEqual(res, False) 938 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 939 940 @classmethod 941 def _test_waitfor_f(cls, cond, state): 942 with cond: 943 state.value = 0 944 cond.notify() 945 result = cond.wait_for(lambda : state.value==4) 946 if not result or state.value != 4: 947 sys.exit(1) 948 949 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 950 def test_waitfor(self): 951 # based on test in test/lock_tests.py 952 cond = self.Condition() 953 state = self.Value('i', -1) 954 955 p = self.Process(target=self._test_waitfor_f, args=(cond, state)) 956 p.daemon = True 957 p.start() 958 959 with cond: 960 result = cond.wait_for(lambda : state.value==0) 961 self.assertTrue(result) 962 self.assertEqual(state.value, 0) 963 964 for i in range(4): 965 time.sleep(0.01) 966 with cond: 967 state.value += 1 968 cond.notify() 969 970 p.join(5) 971 self.assertFalse(p.is_alive()) 972 self.assertEqual(p.exitcode, 0) 973 974 @classmethod 975 def _test_waitfor_timeout_f(cls, cond, state, success, sem): 976 sem.release() 977 with cond: 978 expected = 0.1 979 dt = time.time() 980 result = cond.wait_for(lambda : state.value==4, timeout=expected) 981 dt = time.time() - dt 982 # borrow logic in assertTimeout() from test/lock_tests.py 983 if not result and expected * 0.6 < dt < expected * 10.0: 984 success.value = True 985 986 @unittest.skipUnless(HAS_SHAREDCTYPES, 'needs sharedctypes') 987 def test_waitfor_timeout(self): 988 # based on test in test/lock_tests.py 989 cond = self.Condition() 990 state = self.Value('i', 0) 991 success = self.Value('i', False) 992 sem = self.Semaphore(0) 993 994 p = self.Process(target=self._test_waitfor_timeout_f, 995 args=(cond, state, success, sem)) 996 p.daemon = True 997 p.start() 998 self.assertTrue(sem.acquire(timeout=10)) 999 1000 # Only increment 3 times, so state == 4 is never reached. 1001 for i in range(3): 1002 time.sleep(0.01) 1003 with cond: 1004 state.value += 1 1005 cond.notify() 1006 1007 p.join(5) 1008 self.assertTrue(success.value) 1009 1010 @classmethod 1011 def _test_wait_result(cls, c, pid): 1012 with c: 1013 c.notify() 1014 time.sleep(1) 1015 if pid is not None: 1016 os.kill(pid, signal.SIGINT) 1017 1018 def test_wait_result(self): 1019 if isinstance(self, ProcessesMixin) and sys.platform != 'win32': 1020 pid = os.getpid() 1021 else: 1022 pid = None 1023 1024 c = self.Condition() 1025 with c: 1026 self.assertFalse(c.wait(0)) 1027 self.assertFalse(c.wait(0.1)) 1028 1029 p = self.Process(target=self._test_wait_result, args=(c, pid)) 1030 p.start() 1031 1032 self.assertTrue(c.wait(10)) 1033 if pid is not None: 1034 self.assertRaises(KeyboardInterrupt, c.wait, 10) 1035 1036 p.join() 1037 1038 1039class _TestEvent(BaseTestCase): 1040 1041 @classmethod 1042 def _test_event(cls, event): 1043 time.sleep(TIMEOUT2) 1044 event.set() 1045 1046 def test_event(self): 1047 event = self.Event() 1048 wait = TimingWrapper(event.wait) 1049 1050 # Removed temporarily, due to API shear, this does not 1051 # work with threading._Event objects. is_set == isSet 1052 self.assertEqual(event.is_set(), False) 1053 1054 # Removed, threading.Event.wait() will return the value of the __flag 1055 # instead of None. API Shear with the semaphore backed mp.Event 1056 self.assertEqual(wait(0.0), False) 1057 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1058 self.assertEqual(wait(TIMEOUT1), False) 1059 self.assertTimingAlmostEqual(wait.elapsed, TIMEOUT1) 1060 1061 event.set() 1062 1063 # See note above on the API differences 1064 self.assertEqual(event.is_set(), True) 1065 self.assertEqual(wait(), True) 1066 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1067 self.assertEqual(wait(TIMEOUT1), True) 1068 self.assertTimingAlmostEqual(wait.elapsed, 0.0) 1069 # self.assertEqual(event.is_set(), True) 1070 1071 event.clear() 1072 1073 #self.assertEqual(event.is_set(), False) 1074 1075 p = self.Process(target=self._test_event, args=(event,)) 1076 p.daemon = True 1077 p.start() 1078 self.assertEqual(wait(), True) 1079 1080# 1081# Tests for Barrier - adapted from tests in test/lock_tests.py 1082# 1083 1084# Many of the tests for threading.Barrier use a list as an atomic 1085# counter: a value is appended to increment the counter, and the 1086# length of the list gives the value. We use the class DummyList 1087# for the same purpose. 1088 1089class _DummyList(object): 1090 1091 def __init__(self): 1092 wrapper = multiprocessing.heap.BufferWrapper(struct.calcsize('i')) 1093 lock = multiprocessing.Lock() 1094 self.__setstate__((wrapper, lock)) 1095 self._lengthbuf[0] = 0 1096 1097 def __setstate__(self, state): 1098 (self._wrapper, self._lock) = state 1099 self._lengthbuf = self._wrapper.create_memoryview().cast('i') 1100 1101 def __getstate__(self): 1102 return (self._wrapper, self._lock) 1103 1104 def append(self, _): 1105 with self._lock: 1106 self._lengthbuf[0] += 1 1107 1108 def __len__(self): 1109 with self._lock: 1110 return self._lengthbuf[0] 1111 1112def _wait(): 1113 # A crude wait/yield function not relying on synchronization primitives. 1114 time.sleep(0.01) 1115 1116 1117class Bunch(object): 1118 """ 1119 A bunch of threads. 1120 """ 1121 def __init__(self, namespace, f, args, n, wait_before_exit=False): 1122 """ 1123 Construct a bunch of `n` threads running the same function `f`. 1124 If `wait_before_exit` is True, the threads won't terminate until 1125 do_finish() is called. 1126 """ 1127 self.f = f 1128 self.args = args 1129 self.n = n 1130 self.started = namespace.DummyList() 1131 self.finished = namespace.DummyList() 1132 self._can_exit = namespace.Event() 1133 if not wait_before_exit: 1134 self._can_exit.set() 1135 for i in range(n): 1136 p = namespace.Process(target=self.task) 1137 p.daemon = True 1138 p.start() 1139 1140 def task(self): 1141 pid = os.getpid() 1142 self.started.append(pid) 1143 try: 1144 self.f(*self.args) 1145 finally: 1146 self.finished.append(pid) 1147 self._can_exit.wait(30) 1148 assert self._can_exit.is_set() 1149 1150 def wait_for_started(self): 1151 while len(self.started) < self.n: 1152 _wait() 1153 1154 def wait_for_finished(self): 1155 while len(self.finished) < self.n: 1156 _wait() 1157 1158 def do_finish(self): 1159 self._can_exit.set() 1160 1161 1162class AppendTrue(object): 1163 def __init__(self, obj): 1164 self.obj = obj 1165 def __call__(self): 1166 self.obj.append(True) 1167 1168 1169class _TestBarrier(BaseTestCase): 1170 """ 1171 Tests for Barrier objects. 1172 """ 1173 N = 5 1174 defaultTimeout = 30.0 # XXX Slow Windows buildbots need generous timeout 1175 1176 def setUp(self): 1177 self.barrier = self.Barrier(self.N, timeout=self.defaultTimeout) 1178 1179 def tearDown(self): 1180 self.barrier.abort() 1181 self.barrier = None 1182 1183 def DummyList(self): 1184 if self.TYPE == 'threads': 1185 return [] 1186 elif self.TYPE == 'manager': 1187 return self.manager.list() 1188 else: 1189 return _DummyList() 1190 1191 def run_threads(self, f, args): 1192 b = Bunch(self, f, args, self.N-1) 1193 f(*args) 1194 b.wait_for_finished() 1195 1196 @classmethod 1197 def multipass(cls, barrier, results, n): 1198 m = barrier.parties 1199 assert m == cls.N 1200 for i in range(n): 1201 results[0].append(True) 1202 assert len(results[1]) == i * m 1203 barrier.wait() 1204 results[1].append(True) 1205 assert len(results[0]) == (i + 1) * m 1206 barrier.wait() 1207 try: 1208 assert barrier.n_waiting == 0 1209 except NotImplementedError: 1210 pass 1211 assert not barrier.broken 1212 1213 def test_barrier(self, passes=1): 1214 """ 1215 Test that a barrier is passed in lockstep 1216 """ 1217 results = [self.DummyList(), self.DummyList()] 1218 self.run_threads(self.multipass, (self.barrier, results, passes)) 1219 1220 def test_barrier_10(self): 1221 """ 1222 Test that a barrier works for 10 consecutive runs 1223 """ 1224 return self.test_barrier(10) 1225 1226 @classmethod 1227 def _test_wait_return_f(cls, barrier, queue): 1228 res = barrier.wait() 1229 queue.put(res) 1230 1231 def test_wait_return(self): 1232 """ 1233 test the return value from barrier.wait 1234 """ 1235 queue = self.Queue() 1236 self.run_threads(self._test_wait_return_f, (self.barrier, queue)) 1237 results = [queue.get() for i in range(self.N)] 1238 self.assertEqual(results.count(0), 1) 1239 1240 @classmethod 1241 def _test_action_f(cls, barrier, results): 1242 barrier.wait() 1243 if len(results) != 1: 1244 raise RuntimeError 1245 1246 def test_action(self): 1247 """ 1248 Test the 'action' callback 1249 """ 1250 results = self.DummyList() 1251 barrier = self.Barrier(self.N, action=AppendTrue(results)) 1252 self.run_threads(self._test_action_f, (barrier, results)) 1253 self.assertEqual(len(results), 1) 1254 1255 @classmethod 1256 def _test_abort_f(cls, barrier, results1, results2): 1257 try: 1258 i = barrier.wait() 1259 if i == cls.N//2: 1260 raise RuntimeError 1261 barrier.wait() 1262 results1.append(True) 1263 except threading.BrokenBarrierError: 1264 results2.append(True) 1265 except RuntimeError: 1266 barrier.abort() 1267 1268 def test_abort(self): 1269 """ 1270 Test that an abort will put the barrier in a broken state 1271 """ 1272 results1 = self.DummyList() 1273 results2 = self.DummyList() 1274 self.run_threads(self._test_abort_f, 1275 (self.barrier, results1, results2)) 1276 self.assertEqual(len(results1), 0) 1277 self.assertEqual(len(results2), self.N-1) 1278 self.assertTrue(self.barrier.broken) 1279 1280 @classmethod 1281 def _test_reset_f(cls, barrier, results1, results2, results3): 1282 i = barrier.wait() 1283 if i == cls.N//2: 1284 # Wait until the other threads are all in the barrier. 1285 while barrier.n_waiting < cls.N-1: 1286 time.sleep(0.001) 1287 barrier.reset() 1288 else: 1289 try: 1290 barrier.wait() 1291 results1.append(True) 1292 except threading.BrokenBarrierError: 1293 results2.append(True) 1294 # Now, pass the barrier again 1295 barrier.wait() 1296 results3.append(True) 1297 1298 def test_reset(self): 1299 """ 1300 Test that a 'reset' on a barrier frees the waiting threads 1301 """ 1302 results1 = self.DummyList() 1303 results2 = self.DummyList() 1304 results3 = self.DummyList() 1305 self.run_threads(self._test_reset_f, 1306 (self.barrier, results1, results2, results3)) 1307 self.assertEqual(len(results1), 0) 1308 self.assertEqual(len(results2), self.N-1) 1309 self.assertEqual(len(results3), self.N) 1310 1311 @classmethod 1312 def _test_abort_and_reset_f(cls, barrier, barrier2, 1313 results1, results2, results3): 1314 try: 1315 i = barrier.wait() 1316 if i == cls.N//2: 1317 raise RuntimeError 1318 barrier.wait() 1319 results1.append(True) 1320 except threading.BrokenBarrierError: 1321 results2.append(True) 1322 except RuntimeError: 1323 barrier.abort() 1324 # Synchronize and reset the barrier. Must synchronize first so 1325 # that everyone has left it when we reset, and after so that no 1326 # one enters it before the reset. 1327 if barrier2.wait() == cls.N//2: 1328 barrier.reset() 1329 barrier2.wait() 1330 barrier.wait() 1331 results3.append(True) 1332 1333 def test_abort_and_reset(self): 1334 """ 1335 Test that a barrier can be reset after being broken. 1336 """ 1337 results1 = self.DummyList() 1338 results2 = self.DummyList() 1339 results3 = self.DummyList() 1340 barrier2 = self.Barrier(self.N) 1341 1342 self.run_threads(self._test_abort_and_reset_f, 1343 (self.barrier, barrier2, results1, results2, results3)) 1344 self.assertEqual(len(results1), 0) 1345 self.assertEqual(len(results2), self.N-1) 1346 self.assertEqual(len(results3), self.N) 1347 1348 @classmethod 1349 def _test_timeout_f(cls, barrier, results): 1350 i = barrier.wait() 1351 if i == cls.N//2: 1352 # One thread is late! 1353 time.sleep(1.0) 1354 try: 1355 barrier.wait(0.5) 1356 except threading.BrokenBarrierError: 1357 results.append(True) 1358 1359 def test_timeout(self): 1360 """ 1361 Test wait(timeout) 1362 """ 1363 results = self.DummyList() 1364 self.run_threads(self._test_timeout_f, (self.barrier, results)) 1365 self.assertEqual(len(results), self.barrier.parties) 1366 1367 @classmethod 1368 def _test_default_timeout_f(cls, barrier, results): 1369 i = barrier.wait(cls.defaultTimeout) 1370 if i == cls.N//2: 1371 # One thread is later than the default timeout 1372 time.sleep(1.0) 1373 try: 1374 barrier.wait() 1375 except threading.BrokenBarrierError: 1376 results.append(True) 1377 1378 def test_default_timeout(self): 1379 """ 1380 Test the barrier's default timeout 1381 """ 1382 barrier = self.Barrier(self.N, timeout=0.5) 1383 results = self.DummyList() 1384 self.run_threads(self._test_default_timeout_f, (barrier, results)) 1385 self.assertEqual(len(results), barrier.parties) 1386 1387 def test_single_thread(self): 1388 b = self.Barrier(1) 1389 b.wait() 1390 b.wait() 1391 1392 @classmethod 1393 def _test_thousand_f(cls, barrier, passes, conn, lock): 1394 for i in range(passes): 1395 barrier.wait() 1396 with lock: 1397 conn.send(i) 1398 1399 def test_thousand(self): 1400 if self.TYPE == 'manager': 1401 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1402 passes = 1000 1403 lock = self.Lock() 1404 conn, child_conn = self.Pipe(False) 1405 for j in range(self.N): 1406 p = self.Process(target=self._test_thousand_f, 1407 args=(self.barrier, passes, child_conn, lock)) 1408 p.start() 1409 1410 for i in range(passes): 1411 for j in range(self.N): 1412 self.assertEqual(conn.recv(), i) 1413 1414# 1415# 1416# 1417 1418class _TestValue(BaseTestCase): 1419 1420 ALLOWED_TYPES = ('processes',) 1421 1422 codes_values = [ 1423 ('i', 4343, 24234), 1424 ('d', 3.625, -4.25), 1425 ('h', -232, 234), 1426 ('c', latin('x'), latin('y')) 1427 ] 1428 1429 def setUp(self): 1430 if not HAS_SHAREDCTYPES: 1431 self.skipTest("requires multiprocessing.sharedctypes") 1432 1433 @classmethod 1434 def _test(cls, values): 1435 for sv, cv in zip(values, cls.codes_values): 1436 sv.value = cv[2] 1437 1438 1439 def test_value(self, raw=False): 1440 if raw: 1441 values = [self.RawValue(code, value) 1442 for code, value, _ in self.codes_values] 1443 else: 1444 values = [self.Value(code, value) 1445 for code, value, _ in self.codes_values] 1446 1447 for sv, cv in zip(values, self.codes_values): 1448 self.assertEqual(sv.value, cv[1]) 1449 1450 proc = self.Process(target=self._test, args=(values,)) 1451 proc.daemon = True 1452 proc.start() 1453 proc.join() 1454 1455 for sv, cv in zip(values, self.codes_values): 1456 self.assertEqual(sv.value, cv[2]) 1457 1458 def test_rawvalue(self): 1459 self.test_value(raw=True) 1460 1461 def test_getobj_getlock(self): 1462 val1 = self.Value('i', 5) 1463 lock1 = val1.get_lock() 1464 obj1 = val1.get_obj() 1465 1466 val2 = self.Value('i', 5, lock=None) 1467 lock2 = val2.get_lock() 1468 obj2 = val2.get_obj() 1469 1470 lock = self.Lock() 1471 val3 = self.Value('i', 5, lock=lock) 1472 lock3 = val3.get_lock() 1473 obj3 = val3.get_obj() 1474 self.assertEqual(lock, lock3) 1475 1476 arr4 = self.Value('i', 5, lock=False) 1477 self.assertFalse(hasattr(arr4, 'get_lock')) 1478 self.assertFalse(hasattr(arr4, 'get_obj')) 1479 1480 self.assertRaises(AttributeError, self.Value, 'i', 5, lock='navalue') 1481 1482 arr5 = self.RawValue('i', 5) 1483 self.assertFalse(hasattr(arr5, 'get_lock')) 1484 self.assertFalse(hasattr(arr5, 'get_obj')) 1485 1486 1487class _TestArray(BaseTestCase): 1488 1489 ALLOWED_TYPES = ('processes',) 1490 1491 @classmethod 1492 def f(cls, seq): 1493 for i in range(1, len(seq)): 1494 seq[i] += seq[i-1] 1495 1496 @unittest.skipIf(c_int is None, "requires _ctypes") 1497 def test_array(self, raw=False): 1498 seq = [680, 626, 934, 821, 150, 233, 548, 982, 714, 831] 1499 if raw: 1500 arr = self.RawArray('i', seq) 1501 else: 1502 arr = self.Array('i', seq) 1503 1504 self.assertEqual(len(arr), len(seq)) 1505 self.assertEqual(arr[3], seq[3]) 1506 self.assertEqual(list(arr[2:7]), list(seq[2:7])) 1507 1508 arr[4:8] = seq[4:8] = array.array('i', [1, 2, 3, 4]) 1509 1510 self.assertEqual(list(arr[:]), seq) 1511 1512 self.f(seq) 1513 1514 p = self.Process(target=self.f, args=(arr,)) 1515 p.daemon = True 1516 p.start() 1517 p.join() 1518 1519 self.assertEqual(list(arr[:]), seq) 1520 1521 @unittest.skipIf(c_int is None, "requires _ctypes") 1522 def test_array_from_size(self): 1523 size = 10 1524 # Test for zeroing (see issue #11675). 1525 # The repetition below strengthens the test by increasing the chances 1526 # of previously allocated non-zero memory being used for the new array 1527 # on the 2nd and 3rd loops. 1528 for _ in range(3): 1529 arr = self.Array('i', size) 1530 self.assertEqual(len(arr), size) 1531 self.assertEqual(list(arr), [0] * size) 1532 arr[:] = range(10) 1533 self.assertEqual(list(arr), list(range(10))) 1534 del arr 1535 1536 @unittest.skipIf(c_int is None, "requires _ctypes") 1537 def test_rawarray(self): 1538 self.test_array(raw=True) 1539 1540 @unittest.skipIf(c_int is None, "requires _ctypes") 1541 def test_getobj_getlock_obj(self): 1542 arr1 = self.Array('i', list(range(10))) 1543 lock1 = arr1.get_lock() 1544 obj1 = arr1.get_obj() 1545 1546 arr2 = self.Array('i', list(range(10)), lock=None) 1547 lock2 = arr2.get_lock() 1548 obj2 = arr2.get_obj() 1549 1550 lock = self.Lock() 1551 arr3 = self.Array('i', list(range(10)), lock=lock) 1552 lock3 = arr3.get_lock() 1553 obj3 = arr3.get_obj() 1554 self.assertEqual(lock, lock3) 1555 1556 arr4 = self.Array('i', range(10), lock=False) 1557 self.assertFalse(hasattr(arr4, 'get_lock')) 1558 self.assertFalse(hasattr(arr4, 'get_obj')) 1559 self.assertRaises(AttributeError, 1560 self.Array, 'i', range(10), lock='notalock') 1561 1562 arr5 = self.RawArray('i', range(10)) 1563 self.assertFalse(hasattr(arr5, 'get_lock')) 1564 self.assertFalse(hasattr(arr5, 'get_obj')) 1565 1566# 1567# 1568# 1569 1570class _TestContainers(BaseTestCase): 1571 1572 ALLOWED_TYPES = ('manager',) 1573 1574 def test_list(self): 1575 a = self.list(list(range(10))) 1576 self.assertEqual(a[:], list(range(10))) 1577 1578 b = self.list() 1579 self.assertEqual(b[:], []) 1580 1581 b.extend(list(range(5))) 1582 self.assertEqual(b[:], list(range(5))) 1583 1584 self.assertEqual(b[2], 2) 1585 self.assertEqual(b[2:10], [2,3,4]) 1586 1587 b *= 2 1588 self.assertEqual(b[:], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]) 1589 1590 self.assertEqual(b + [5, 6], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4, 5, 6]) 1591 1592 self.assertEqual(a[:], list(range(10))) 1593 1594 d = [a, b] 1595 e = self.list(d) 1596 self.assertEqual( 1597 e[:], 1598 [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9], [0, 1, 2, 3, 4, 0, 1, 2, 3, 4]] 1599 ) 1600 1601 f = self.list([a]) 1602 a.append('hello') 1603 self.assertEqual(f[:], [[0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 'hello']]) 1604 1605 def test_dict(self): 1606 d = self.dict() 1607 indices = list(range(65, 70)) 1608 for i in indices: 1609 d[i] = chr(i) 1610 self.assertEqual(d.copy(), dict((i, chr(i)) for i in indices)) 1611 self.assertEqual(sorted(d.keys()), indices) 1612 self.assertEqual(sorted(d.values()), [chr(i) for i in indices]) 1613 self.assertEqual(sorted(d.items()), [(i, chr(i)) for i in indices]) 1614 1615 def test_namespace(self): 1616 n = self.Namespace() 1617 n.name = 'Bob' 1618 n.job = 'Builder' 1619 n._hidden = 'hidden' 1620 self.assertEqual((n.name, n.job), ('Bob', 'Builder')) 1621 del n.job 1622 self.assertEqual(str(n), "Namespace(name='Bob')") 1623 self.assertTrue(hasattr(n, 'name')) 1624 self.assertTrue(not hasattr(n, 'job')) 1625 1626# 1627# 1628# 1629 1630def sqr(x, wait=0.0): 1631 time.sleep(wait) 1632 return x*x 1633 1634def mul(x, y): 1635 return x*y 1636 1637class _TestPool(BaseTestCase): 1638 1639 @classmethod 1640 def setUpClass(cls): 1641 super().setUpClass() 1642 cls.pool = cls.Pool(4) 1643 1644 @classmethod 1645 def tearDownClass(cls): 1646 cls.pool.terminate() 1647 cls.pool.join() 1648 cls.pool = None 1649 super().tearDownClass() 1650 1651 def test_apply(self): 1652 papply = self.pool.apply 1653 self.assertEqual(papply(sqr, (5,)), sqr(5)) 1654 self.assertEqual(papply(sqr, (), {'x':3}), sqr(x=3)) 1655 1656 def test_map(self): 1657 pmap = self.pool.map 1658 self.assertEqual(pmap(sqr, list(range(10))), list(map(sqr, list(range(10))))) 1659 self.assertEqual(pmap(sqr, list(range(100)), chunksize=20), 1660 list(map(sqr, list(range(100))))) 1661 1662 def test_starmap(self): 1663 psmap = self.pool.starmap 1664 tuples = list(zip(range(10), range(9,-1, -1))) 1665 self.assertEqual(psmap(mul, tuples), 1666 list(itertools.starmap(mul, tuples))) 1667 tuples = list(zip(range(100), range(99,-1, -1))) 1668 self.assertEqual(psmap(mul, tuples, chunksize=20), 1669 list(itertools.starmap(mul, tuples))) 1670 1671 def test_starmap_async(self): 1672 tuples = list(zip(range(100), range(99,-1, -1))) 1673 self.assertEqual(self.pool.starmap_async(mul, tuples).get(), 1674 list(itertools.starmap(mul, tuples))) 1675 1676 def test_map_async(self): 1677 self.assertEqual(self.pool.map_async(sqr, list(range(10))).get(), 1678 list(map(sqr, list(range(10))))) 1679 1680 def _test_map_async_callbacks(self): 1681 call_args = self.manager.list() if self.TYPE == 'manager' else [] 1682 self.pool.map_async(int, ['1'], 1683 callback=call_args.append, 1684 error_callback=call_args.append).wait() 1685 self.assertEqual(1, len(call_args)) 1686 self.assertEqual([1], call_args[0]) 1687 self.pool.map_async(int, ['a'], 1688 callback=call_args.append, 1689 error_callback=call_args.append).wait() 1690 self.assertEqual(2, len(call_args)) 1691 self.assertIsInstance(call_args[1], ValueError) 1692 1693 def test_map_unplicklable(self): 1694 # Issue #19425 -- failure to pickle should not cause a hang 1695 if self.TYPE == 'threads': 1696 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 1697 class A(object): 1698 def __reduce__(self): 1699 raise RuntimeError('cannot pickle') 1700 with self.assertRaises(RuntimeError): 1701 self.pool.map(sqr, [A()]*10) 1702 1703 def test_map_chunksize(self): 1704 try: 1705 self.pool.map_async(sqr, [], chunksize=1).get(timeout=TIMEOUT1) 1706 except multiprocessing.TimeoutError: 1707 self.fail("pool.map_async with chunksize stalled on null list") 1708 1709 def test_async(self): 1710 res = self.pool.apply_async(sqr, (7, TIMEOUT1,)) 1711 get = TimingWrapper(res.get) 1712 self.assertEqual(get(), 49) 1713 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT1) 1714 1715 def test_async_timeout(self): 1716 res = self.pool.apply_async(sqr, (6, TIMEOUT2 + 1.0)) 1717 get = TimingWrapper(res.get) 1718 self.assertRaises(multiprocessing.TimeoutError, get, timeout=TIMEOUT2) 1719 self.assertTimingAlmostEqual(get.elapsed, TIMEOUT2) 1720 1721 def test_imap(self): 1722 it = self.pool.imap(sqr, list(range(10))) 1723 self.assertEqual(list(it), list(map(sqr, list(range(10))))) 1724 1725 it = self.pool.imap(sqr, list(range(10))) 1726 for i in range(10): 1727 self.assertEqual(next(it), i*i) 1728 self.assertRaises(StopIteration, it.__next__) 1729 1730 it = self.pool.imap(sqr, list(range(1000)), chunksize=100) 1731 for i in range(1000): 1732 self.assertEqual(next(it), i*i) 1733 self.assertRaises(StopIteration, it.__next__) 1734 1735 def test_imap_unordered(self): 1736 it = self.pool.imap_unordered(sqr, list(range(1000))) 1737 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1738 1739 it = self.pool.imap_unordered(sqr, list(range(1000)), chunksize=53) 1740 self.assertEqual(sorted(it), list(map(sqr, list(range(1000))))) 1741 1742 def test_make_pool(self): 1743 self.assertRaises(ValueError, multiprocessing.Pool, -1) 1744 self.assertRaises(ValueError, multiprocessing.Pool, 0) 1745 1746 p = multiprocessing.Pool(3) 1747 self.assertEqual(3, len(p._pool)) 1748 p.close() 1749 p.join() 1750 1751 def test_terminate(self): 1752 result = self.pool.map_async( 1753 time.sleep, [0.1 for i in range(10000)], chunksize=1 1754 ) 1755 self.pool.terminate() 1756 join = TimingWrapper(self.pool.join) 1757 join() 1758 self.assertLess(join.elapsed, 0.5) 1759 1760 def test_empty_iterable(self): 1761 # See Issue 12157 1762 p = self.Pool(1) 1763 1764 self.assertEqual(p.map(sqr, []), []) 1765 self.assertEqual(list(p.imap(sqr, [])), []) 1766 self.assertEqual(list(p.imap_unordered(sqr, [])), []) 1767 self.assertEqual(p.map_async(sqr, []).get(), []) 1768 1769 p.close() 1770 p.join() 1771 1772 def test_context(self): 1773 if self.TYPE == 'processes': 1774 L = list(range(10)) 1775 expected = [sqr(i) for i in L] 1776 with multiprocessing.Pool(2) as p: 1777 r = p.map_async(sqr, L) 1778 self.assertEqual(r.get(), expected) 1779 self.assertRaises(ValueError, p.map_async, sqr, L) 1780 1781def raising(): 1782 raise KeyError("key") 1783 1784def unpickleable_result(): 1785 return lambda: 42 1786 1787class _TestPoolWorkerErrors(BaseTestCase): 1788 ALLOWED_TYPES = ('processes', ) 1789 1790 def test_async_error_callback(self): 1791 p = multiprocessing.Pool(2) 1792 1793 scratchpad = [None] 1794 def errback(exc): 1795 scratchpad[0] = exc 1796 1797 res = p.apply_async(raising, error_callback=errback) 1798 self.assertRaises(KeyError, res.get) 1799 self.assertTrue(scratchpad[0]) 1800 self.assertIsInstance(scratchpad[0], KeyError) 1801 1802 p.close() 1803 p.join() 1804 1805 def _test_unpickleable_result(self): 1806 from multiprocess.pool import MaybeEncodingError 1807 p = multiprocessing.Pool(2) 1808 1809 # Make sure we don't lose pool processes because of encoding errors. 1810 for iteration in range(20): 1811 1812 scratchpad = [None] 1813 def errback(exc): 1814 scratchpad[0] = exc 1815 1816 res = p.apply_async(unpickleable_result, error_callback=errback) 1817 self.assertRaises(MaybeEncodingError, res.get) 1818 wrapped = scratchpad[0] 1819 self.assertTrue(wrapped) 1820 self.assertIsInstance(scratchpad[0], MaybeEncodingError) 1821 self.assertIsNotNone(wrapped.exc) 1822 self.assertIsNotNone(wrapped.value) 1823 1824 p.close() 1825 p.join() 1826 1827class _TestPoolWorkerLifetime(BaseTestCase): 1828 ALLOWED_TYPES = ('processes', ) 1829 1830 def test_pool_worker_lifetime(self): 1831 p = multiprocessing.Pool(3, maxtasksperchild=10) 1832 self.assertEqual(3, len(p._pool)) 1833 origworkerpids = [w.pid for w in p._pool] 1834 # Run many tasks so each worker gets replaced (hopefully) 1835 results = [] 1836 for i in range(100): 1837 results.append(p.apply_async(sqr, (i, ))) 1838 # Fetch the results and verify we got the right answers, 1839 # also ensuring all the tasks have completed. 1840 for (j, res) in enumerate(results): 1841 self.assertEqual(res.get(), sqr(j)) 1842 # Refill the pool 1843 p._repopulate_pool() 1844 # Wait until all workers are alive 1845 # (countdown * DELTA = 5 seconds max startup process time) 1846 countdown = 50 1847 while countdown and not all(w.is_alive() for w in p._pool): 1848 countdown -= 1 1849 time.sleep(DELTA) 1850 finalworkerpids = [w.pid for w in p._pool] 1851 # All pids should be assigned. See issue #7805. 1852 self.assertNotIn(None, origworkerpids) 1853 self.assertNotIn(None, finalworkerpids) 1854 # Finally, check that the worker pids have changed 1855 self.assertNotEqual(sorted(origworkerpids), sorted(finalworkerpids)) 1856 p.close() 1857 p.join() 1858 1859 def test_pool_worker_lifetime_early_close(self): 1860 # Issue #10332: closing a pool whose workers have limited lifetimes 1861 # before all the tasks completed would make join() hang. 1862 p = multiprocessing.Pool(3, maxtasksperchild=1) 1863 results = [] 1864 for i in range(6): 1865 results.append(p.apply_async(sqr, (i, 0.3))) 1866 p.close() 1867 p.join() 1868 # check the results 1869 for (j, res) in enumerate(results): 1870 self.assertEqual(res.get(), sqr(j)) 1871 1872# 1873# Test of creating a customized manager class 1874# 1875 1876from multiprocess.managers import BaseManager, BaseProxy, RemoteError 1877 1878class FooBar(object): 1879 def f(self): 1880 return 'f()' 1881 def g(self): 1882 raise ValueError 1883 def _h(self): 1884 return '_h()' 1885 1886def baz(): 1887 for i in range(10): 1888 yield i*i 1889 1890class IteratorProxy(BaseProxy): 1891 _exposed_ = ('__next__',) 1892 def __iter__(self): 1893 return self 1894 def __next__(self): 1895 return self._callmethod('__next__') 1896 1897class MyManager(BaseManager): 1898 pass 1899 1900MyManager.register('Foo', callable=FooBar) 1901MyManager.register('Bar', callable=FooBar, exposed=('f', '_h')) 1902MyManager.register('baz', callable=baz, proxytype=IteratorProxy) 1903 1904 1905class _TestMyManager(BaseTestCase): 1906 1907 ALLOWED_TYPES = ('manager',) 1908 1909 def test_mymanager(self): 1910 manager = MyManager() 1911 manager.start() 1912 self.common(manager) 1913 manager.shutdown() 1914 1915 # If the manager process exited cleanly then the exitcode 1916 # will be zero. Otherwise (after a short timeout) 1917 # terminate() is used, resulting in an exitcode of -SIGTERM. 1918 self.assertEqual(manager._process.exitcode, 0) 1919 1920 def test_mymanager_context(self): 1921 with MyManager() as manager: 1922 self.common(manager) 1923 self.assertEqual(manager._process.exitcode, 0) 1924 1925 def test_mymanager_context_prestarted(self): 1926 manager = MyManager() 1927 manager.start() 1928 with manager: 1929 self.common(manager) 1930 self.assertEqual(manager._process.exitcode, 0) 1931 1932 def common(self, manager): 1933 foo = manager.Foo() 1934 bar = manager.Bar() 1935 baz = manager.baz() 1936 1937 foo_methods = [name for name in ('f', 'g', '_h') if hasattr(foo, name)] 1938 bar_methods = [name for name in ('f', 'g', '_h') if hasattr(bar, name)] 1939 1940 self.assertEqual(foo_methods, ['f', 'g']) 1941 self.assertEqual(bar_methods, ['f', '_h']) 1942 1943 self.assertEqual(foo.f(), 'f()') 1944 self.assertRaises(ValueError, foo.g) 1945 self.assertEqual(foo._callmethod('f'), 'f()') 1946 self.assertRaises(RemoteError, foo._callmethod, '_h') 1947 1948 self.assertEqual(bar.f(), 'f()') 1949 self.assertEqual(bar._h(), '_h()') 1950 self.assertEqual(bar._callmethod('f'), 'f()') 1951 self.assertEqual(bar._callmethod('_h'), '_h()') 1952 1953 self.assertEqual(list(baz), [i*i for i in range(10)]) 1954 1955 1956# 1957# Test of connecting to a remote server and using xmlrpclib for serialization 1958# 1959 1960_queue = pyqueue.Queue() 1961def get_queue(): 1962 return _queue 1963 1964class QueueManager(BaseManager): 1965 '''manager class used by server process''' 1966QueueManager.register('get_queue', callable=get_queue) 1967 1968class QueueManager2(BaseManager): 1969 '''manager class which specifies the same interface as QueueManager''' 1970QueueManager2.register('get_queue') 1971 1972 1973SERIALIZER = 'xmlrpclib' 1974 1975class _TestRemoteManager(BaseTestCase): 1976 1977 ALLOWED_TYPES = ('manager',) 1978 1979 @classmethod 1980 def _putter(cls, address, authkey): 1981 manager = QueueManager2( 1982 address=address, authkey=authkey, serializer=SERIALIZER 1983 ) 1984 manager.connect() 1985 queue = manager.get_queue() 1986 queue.put(('hello world', None, True, 2.25)) 1987 1988 def test_remote(self): 1989 authkey = os.urandom(32) 1990 1991 manager = QueueManager( 1992 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER 1993 ) 1994 manager.start() 1995 1996 p = self.Process(target=self._putter, args=(manager.address, authkey)) 1997 p.daemon = True 1998 p.start() 1999 2000 manager2 = QueueManager2( 2001 address=manager.address, authkey=authkey, serializer=SERIALIZER 2002 ) 2003 manager2.connect() 2004 queue = manager2.get_queue() 2005 2006 # Note that xmlrpclib will deserialize object as a list not a tuple 2007 self.assertEqual(queue.get(), ['hello world', None, True, 2.25]) 2008 2009 # Because we are using xmlrpclib for serialization instead of 2010 # pickle this will cause a serialization error. 2011 self.assertRaises(Exception, queue.put, time.sleep) 2012 2013 # Make queue finalizer run before the server is stopped 2014 del queue 2015 manager.shutdown() 2016 2017class _TestManagerRestart(BaseTestCase): 2018 2019 @classmethod 2020 def _putter(cls, address, authkey): 2021 manager = QueueManager( 2022 address=address, authkey=authkey, serializer=SERIALIZER) 2023 manager.connect() 2024 queue = manager.get_queue() 2025 queue.put('hello world') 2026 2027 def test_rapid_restart(self): 2028 authkey = os.urandom(32) 2029 manager = QueueManager( 2030 address=(test.support.HOST, 0), authkey=authkey, serializer=SERIALIZER) 2031 srvr = manager.get_server() 2032 addr = srvr.address 2033 # Close the connection.Listener socket which gets opened as a part 2034 # of manager.get_server(). It's not needed for the test. 2035 srvr.listener.close() 2036 manager.start() 2037 2038 p = self.Process(target=self._putter, args=(manager.address, authkey)) 2039 p.daemon = True 2040 p.start() 2041 queue = manager.get_queue() 2042 self.assertEqual(queue.get(), 'hello world') 2043 del queue 2044 manager.shutdown() 2045 manager = QueueManager( 2046 address=addr, authkey=authkey, serializer=SERIALIZER) 2047 try: 2048 manager.start() 2049 except OSError as e: 2050 if e.errno != errno.EADDRINUSE: 2051 raise 2052 # Retry after some time, in case the old socket was lingering 2053 # (sporadic failure on buildbots) 2054 time.sleep(1.0) 2055 manager = QueueManager( 2056 address=addr, authkey=authkey, serializer=SERIALIZER) 2057 manager.shutdown() 2058 2059# 2060# 2061# 2062 2063SENTINEL = latin('') 2064 2065class _TestConnection(BaseTestCase): 2066 2067 ALLOWED_TYPES = ('processes', 'threads') 2068 2069 @classmethod 2070 def _echo(cls, conn): 2071 for msg in iter(conn.recv_bytes, SENTINEL): 2072 conn.send_bytes(msg) 2073 conn.close() 2074 2075 def test_connection(self): 2076 conn, child_conn = self.Pipe() 2077 2078 p = self.Process(target=self._echo, args=(child_conn,)) 2079 p.daemon = True 2080 p.start() 2081 2082 seq = [1, 2.25, None] 2083 msg = latin('hello world') 2084 longmsg = msg * 10 2085 arr = array.array('i', list(range(4))) 2086 2087 if self.TYPE == 'processes': 2088 self.assertEqual(type(conn.fileno()), int) 2089 2090 self.assertEqual(conn.send(seq), None) 2091 self.assertEqual(conn.recv(), seq) 2092 2093 self.assertEqual(conn.send_bytes(msg), None) 2094 self.assertEqual(conn.recv_bytes(), msg) 2095 2096 if self.TYPE == 'processes': 2097 buffer = array.array('i', [0]*10) 2098 expected = list(arr) + [0] * (10 - len(arr)) 2099 self.assertEqual(conn.send_bytes(arr), None) 2100 self.assertEqual(conn.recv_bytes_into(buffer), 2101 len(arr) * buffer.itemsize) 2102 self.assertEqual(list(buffer), expected) 2103 2104 buffer = array.array('i', [0]*10) 2105 expected = [0] * 3 + list(arr) + [0] * (10 - 3 - len(arr)) 2106 self.assertEqual(conn.send_bytes(arr), None) 2107 self.assertEqual(conn.recv_bytes_into(buffer, 3 * buffer.itemsize), 2108 len(arr) * buffer.itemsize) 2109 self.assertEqual(list(buffer), expected) 2110 2111 buffer = bytearray(latin(' ' * 40)) 2112 self.assertEqual(conn.send_bytes(longmsg), None) 2113 try: 2114 res = conn.recv_bytes_into(buffer) 2115 except multiprocessing.BufferTooShort as e: 2116 self.assertEqual(e.args, (longmsg,)) 2117 else: 2118 self.fail('expected BufferTooShort, got %s' % res) 2119 2120 poll = TimingWrapper(conn.poll) 2121 2122 self.assertEqual(poll(), False) 2123 self.assertTimingAlmostEqual(poll.elapsed, 0) 2124 2125 self.assertEqual(poll(-1), False) 2126 self.assertTimingAlmostEqual(poll.elapsed, 0) 2127 2128 self.assertEqual(poll(TIMEOUT1), False) 2129 self.assertTimingAlmostEqual(poll.elapsed, TIMEOUT1) 2130 2131 conn.send(None) 2132 time.sleep(.1) 2133 2134 self.assertEqual(poll(TIMEOUT1), True) 2135 self.assertTimingAlmostEqual(poll.elapsed, 0) 2136 2137 self.assertEqual(conn.recv(), None) 2138 2139 really_big_msg = latin('X') * (1024 * 1024 * 16) # 16Mb 2140 conn.send_bytes(really_big_msg) 2141 self.assertEqual(conn.recv_bytes(), really_big_msg) 2142 2143 conn.send_bytes(SENTINEL) # tell child to quit 2144 child_conn.close() 2145 2146 if self.TYPE == 'processes': 2147 self.assertEqual(conn.readable, True) 2148 self.assertEqual(conn.writable, True) 2149 self.assertRaises(EOFError, conn.recv) 2150 self.assertRaises(EOFError, conn.recv_bytes) 2151 2152 p.join() 2153 2154 def test_duplex_false(self): 2155 reader, writer = self.Pipe(duplex=False) 2156 self.assertEqual(writer.send(1), None) 2157 self.assertEqual(reader.recv(), 1) 2158 if self.TYPE == 'processes': 2159 self.assertEqual(reader.readable, True) 2160 self.assertEqual(reader.writable, False) 2161 self.assertEqual(writer.readable, False) 2162 self.assertEqual(writer.writable, True) 2163 self.assertRaises(OSError, reader.send, 2) 2164 self.assertRaises(OSError, writer.recv) 2165 self.assertRaises(OSError, writer.poll) 2166 2167 def test_spawn_close(self): 2168 # We test that a pipe connection can be closed by parent 2169 # process immediately after child is spawned. On Windows this 2170 # would have sometimes failed on old versions because 2171 # child_conn would be closed before the child got a chance to 2172 # duplicate it. 2173 conn, child_conn = self.Pipe() 2174 2175 p = self.Process(target=self._echo, args=(child_conn,)) 2176 p.daemon = True 2177 p.start() 2178 child_conn.close() # this might complete before child initializes 2179 2180 msg = latin('hello') 2181 conn.send_bytes(msg) 2182 self.assertEqual(conn.recv_bytes(), msg) 2183 2184 conn.send_bytes(SENTINEL) 2185 conn.close() 2186 p.join() 2187 2188 def test_sendbytes(self): 2189 if self.TYPE != 'processes': 2190 self.skipTest('test not appropriate for {}'.format(self.TYPE)) 2191 2192 msg = latin('abcdefghijklmnopqrstuvwxyz') 2193 a, b = self.Pipe() 2194 2195 a.send_bytes(msg) 2196 self.assertEqual(b.recv_bytes(), msg) 2197 2198 a.send_bytes(msg, 5) 2199 self.assertEqual(b.recv_bytes(), msg[5:]) 2200 2201 a.send_bytes(msg, 7, 8) 2202 self.assertEqual(b.recv_bytes(), msg[7:7+8]) 2203 2204 a.send_bytes(msg, 26) 2205 self.assertEqual(b.recv_bytes(), latin('')) 2206 2207 a.send_bytes(msg, 26, 0) 2208 self.assertEqual(b.recv_bytes(), latin('')) 2209 2210 self.assertRaises(ValueError, a.send_bytes, msg, 27) 2211 2212 self.assertRaises(ValueError, a.send_bytes, msg, 22, 5) 2213 2214 self.assertRaises(ValueError, a.send_bytes, msg, 26, 1) 2215 2216 self.assertRaises(ValueError, a.send_bytes, msg, -1) 2217 2218 self.assertRaises(ValueError, a.send_bytes, msg, 4, -1) 2219 2220 @classmethod 2221 def _is_fd_assigned(cls, fd): 2222 try: 2223 os.fstat(fd) 2224 except OSError as e: 2225 if e.errno == errno.EBADF: 2226 return False 2227 raise 2228 else: 2229 return True 2230 2231 @classmethod 2232 def _writefd(cls, conn, data, create_dummy_fds=False): 2233 if create_dummy_fds: 2234 for i in range(0, 256): 2235 if not cls._is_fd_assigned(i): 2236 os.dup2(conn.fileno(), i) 2237 fd = reduction.recv_handle(conn) 2238 if msvcrt: 2239 fd = msvcrt.open_osfhandle(fd, os.O_WRONLY) 2240 os.write(fd, data) 2241 os.close(fd) 2242 2243 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2244 def test_fd_transfer(self): 2245 if self.TYPE != 'processes': 2246 self.skipTest("only makes sense with processes") 2247 conn, child_conn = self.Pipe(duplex=True) 2248 2249 p = self.Process(target=self._writefd, args=(child_conn, b"foo")) 2250 p.daemon = True 2251 p.start() 2252 self.addCleanup(test.support.unlink, test.support.TESTFN) 2253 with open(test.support.TESTFN, "wb") as f: 2254 fd = f.fileno() 2255 if msvcrt: 2256 fd = msvcrt.get_osfhandle(fd) 2257 reduction.send_handle(conn, fd, p.pid) 2258 p.join() 2259 with open(test.support.TESTFN, "rb") as f: 2260 self.assertEqual(f.read(), b"foo") 2261 2262 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2263 @unittest.skipIf(sys.platform == "win32", 2264 "test semantics don't make sense on Windows") 2265 @unittest.skipIf(MAXFD <= 256, 2266 "largest assignable fd number is too small") 2267 @unittest.skipUnless(hasattr(os, "dup2"), 2268 "test needs os.dup2()") 2269 def test_large_fd_transfer(self): 2270 # With fd > 256 (issue #11657) 2271 if self.TYPE != 'processes': 2272 self.skipTest("only makes sense with processes") 2273 conn, child_conn = self.Pipe(duplex=True) 2274 2275 p = self.Process(target=self._writefd, args=(child_conn, b"bar", True)) 2276 p.daemon = True 2277 p.start() 2278 self.addCleanup(test.support.unlink, test.support.TESTFN) 2279 with open(test.support.TESTFN, "wb") as f: 2280 fd = f.fileno() 2281 for newfd in range(256, MAXFD): 2282 if not self._is_fd_assigned(newfd): 2283 break 2284 else: 2285 self.fail("could not find an unassigned large file descriptor") 2286 os.dup2(fd, newfd) 2287 try: 2288 reduction.send_handle(conn, newfd, p.pid) 2289 finally: 2290 os.close(newfd) 2291 p.join() 2292 with open(test.support.TESTFN, "rb") as f: 2293 self.assertEqual(f.read(), b"bar") 2294 2295 @classmethod 2296 def _send_data_without_fd(self, conn): 2297 os.write(conn.fileno(), b"\0") 2298 2299 @unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2300 @unittest.skipIf(sys.platform == "win32", "doesn't make sense on Windows") 2301 def test_missing_fd_transfer(self): 2302 # Check that exception is raised when received data is not 2303 # accompanied by a file descriptor in ancillary data. 2304 if self.TYPE != 'processes': 2305 self.skipTest("only makes sense with processes") 2306 conn, child_conn = self.Pipe(duplex=True) 2307 2308 p = self.Process(target=self._send_data_without_fd, args=(child_conn,)) 2309 p.daemon = True 2310 p.start() 2311 self.assertRaises(RuntimeError, reduction.recv_handle, conn) 2312 p.join() 2313 2314 def test_context(self): 2315 a, b = self.Pipe() 2316 2317 with a, b: 2318 a.send(1729) 2319 self.assertEqual(b.recv(), 1729) 2320 if self.TYPE == 'processes': 2321 self.assertFalse(a.closed) 2322 self.assertFalse(b.closed) 2323 2324 if self.TYPE == 'processes': 2325 self.assertTrue(a.closed) 2326 self.assertTrue(b.closed) 2327 self.assertRaises(OSError, a.recv) 2328 self.assertRaises(OSError, b.recv) 2329 2330class _TestListener(BaseTestCase): 2331 2332 ALLOWED_TYPES = ('processes',) 2333 2334 def test_multiple_bind(self): 2335 for family in self.connection.families: 2336 l = self.connection.Listener(family=family) 2337 self.addCleanup(l.close) 2338 self.assertRaises(OSError, self.connection.Listener, 2339 l.address, family) 2340 2341 def test_context(self): 2342 with self.connection.Listener() as l: 2343 with self.connection.Client(l.address) as c: 2344 with l.accept() as d: 2345 c.send(1729) 2346 self.assertEqual(d.recv(), 1729) 2347 2348 if self.TYPE == 'processes': 2349 self.assertRaises(OSError, l.accept) 2350 2351class _TestListenerClient(BaseTestCase): 2352 2353 ALLOWED_TYPES = ('processes', 'threads') 2354 2355 @classmethod 2356 def _test(cls, address): 2357 conn = cls.connection.Client(address) 2358 conn.send('hello') 2359 conn.close() 2360 2361 def test_listener_client(self): 2362 for family in self.connection.families: 2363 l = self.connection.Listener(family=family) 2364 p = self.Process(target=self._test, args=(l.address,)) 2365 p.daemon = True 2366 p.start() 2367 conn = l.accept() 2368 self.assertEqual(conn.recv(), 'hello') 2369 p.join() 2370 l.close() 2371 2372 def test_issue14725(self): 2373 l = self.connection.Listener() 2374 p = self.Process(target=self._test, args=(l.address,)) 2375 p.daemon = True 2376 p.start() 2377 time.sleep(1) 2378 # On Windows the client process should by now have connected, 2379 # written data and closed the pipe handle by now. This causes 2380 # ConnectNamdedPipe() to fail with ERROR_NO_DATA. See Issue 2381 # 14725. 2382 conn = l.accept() 2383 self.assertEqual(conn.recv(), 'hello') 2384 conn.close() 2385 p.join() 2386 l.close() 2387 2388 def test_issue16955(self): 2389 for fam in self.connection.families: 2390 l = self.connection.Listener(family=fam) 2391 c = self.connection.Client(l.address) 2392 a = l.accept() 2393 a.send_bytes(b"hello") 2394 self.assertTrue(c.poll(1)) 2395 a.close() 2396 c.close() 2397 l.close() 2398 2399class _TestPoll(BaseTestCase): 2400 2401 ALLOWED_TYPES = ('processes', 'threads') 2402 2403 def test_empty_string(self): 2404 a, b = self.Pipe() 2405 self.assertEqual(a.poll(), False) 2406 b.send_bytes(b'') 2407 self.assertEqual(a.poll(), True) 2408 self.assertEqual(a.poll(), True) 2409 2410 @classmethod 2411 def _child_strings(cls, conn, strings): 2412 for s in strings: 2413 time.sleep(0.1) 2414 conn.send_bytes(s) 2415 conn.close() 2416 2417 def test_strings(self): 2418 strings = (b'hello', b'', b'a', b'b', b'', b'bye', b'', b'lop') 2419 a, b = self.Pipe() 2420 p = self.Process(target=self._child_strings, args=(b, strings)) 2421 p.start() 2422 2423 for s in strings: 2424 for i in range(200): 2425 if a.poll(0.01): 2426 break 2427 x = a.recv_bytes() 2428 self.assertEqual(s, x) 2429 2430 p.join() 2431 2432 @classmethod 2433 def _child_boundaries(cls, r): 2434 # Polling may "pull" a message in to the child process, but we 2435 # don't want it to pull only part of a message, as that would 2436 # corrupt the pipe for any other processes which might later 2437 # read from it. 2438 r.poll(5) 2439 2440 def test_boundaries(self): 2441 r, w = self.Pipe(False) 2442 p = self.Process(target=self._child_boundaries, args=(r,)) 2443 p.start() 2444 time.sleep(2) 2445 L = [b"first", b"second"] 2446 for obj in L: 2447 w.send_bytes(obj) 2448 w.close() 2449 p.join() 2450 self.assertIn(r.recv_bytes(), L) 2451 2452 @classmethod 2453 def _child_dont_merge(cls, b): 2454 b.send_bytes(b'a') 2455 b.send_bytes(b'b') 2456 b.send_bytes(b'cd') 2457 2458 def test_dont_merge(self): 2459 a, b = self.Pipe() 2460 self.assertEqual(a.poll(0.0), False) 2461 self.assertEqual(a.poll(0.1), False) 2462 2463 p = self.Process(target=self._child_dont_merge, args=(b,)) 2464 p.start() 2465 2466 self.assertEqual(a.recv_bytes(), b'a') 2467 self.assertEqual(a.poll(1.0), True) 2468 self.assertEqual(a.poll(1.0), True) 2469 self.assertEqual(a.recv_bytes(), b'b') 2470 self.assertEqual(a.poll(1.0), True) 2471 self.assertEqual(a.poll(1.0), True) 2472 self.assertEqual(a.poll(0.0), True) 2473 self.assertEqual(a.recv_bytes(), b'cd') 2474 2475 p.join() 2476 2477# 2478# Test of sending connection and socket objects between processes 2479# 2480 2481@unittest.skipUnless(HAS_REDUCTION, "test needs multiprocessing.reduction") 2482class _TestPicklingConnections(BaseTestCase): 2483 2484 ALLOWED_TYPES = ('processes',) 2485 2486 @classmethod 2487 def tearDownClass(cls): 2488 from multiprocess.reduction import resource_sharer 2489 resource_sharer.stop(timeout=5) 2490 2491 @classmethod 2492 def _listener(cls, conn, families): 2493 for fam in families: 2494 l = cls.connection.Listener(family=fam) 2495 conn.send(l.address) 2496 new_conn = l.accept() 2497 conn.send(new_conn) 2498 new_conn.close() 2499 l.close() 2500 2501 l = socket.socket() 2502 l.bind((test.support.HOST, 0)) 2503 l.listen(1) 2504 conn.send(l.getsockname()) 2505 new_conn, addr = l.accept() 2506 conn.send(new_conn) 2507 new_conn.close() 2508 l.close() 2509 2510 conn.recv() 2511 2512 @classmethod 2513 def _remote(cls, conn): 2514 for (address, msg) in iter(conn.recv, None): 2515 client = cls.connection.Client(address) 2516 client.send(msg.upper()) 2517 client.close() 2518 2519 address, msg = conn.recv() 2520 client = socket.socket() 2521 client.connect(address) 2522 client.sendall(msg.upper()) 2523 client.close() 2524 2525 conn.close() 2526 2527 def test_pickling(self): 2528 families = self.connection.families 2529 2530 lconn, lconn0 = self.Pipe() 2531 lp = self.Process(target=self._listener, args=(lconn0, families)) 2532 lp.daemon = True 2533 lp.start() 2534 lconn0.close() 2535 2536 rconn, rconn0 = self.Pipe() 2537 rp = self.Process(target=self._remote, args=(rconn0,)) 2538 rp.daemon = True 2539 rp.start() 2540 rconn0.close() 2541 2542 for fam in families: 2543 msg = ('This connection uses family %s' % fam).encode('ascii') 2544 address = lconn.recv() 2545 rconn.send((address, msg)) 2546 new_conn = lconn.recv() 2547 self.assertEqual(new_conn.recv(), msg.upper()) 2548 2549 rconn.send(None) 2550 2551 msg = latin('This connection uses a normal socket') 2552 address = lconn.recv() 2553 rconn.send((address, msg)) 2554 new_conn = lconn.recv() 2555 buf = [] 2556 while True: 2557 s = new_conn.recv(100) 2558 if not s: 2559 break 2560 buf.append(s) 2561 buf = b''.join(buf) 2562 self.assertEqual(buf, msg.upper()) 2563 new_conn.close() 2564 2565 lconn.send(None) 2566 2567 rconn.close() 2568 lconn.close() 2569 2570 lp.join() 2571 rp.join() 2572 2573 @classmethod 2574 def child_access(cls, conn): 2575 w = conn.recv() 2576 w.send('all is well') 2577 w.close() 2578 2579 r = conn.recv() 2580 msg = r.recv() 2581 conn.send(msg*2) 2582 2583 conn.close() 2584 2585 def test_access(self): 2586 # On Windows, if we do not specify a destination pid when 2587 # using DupHandle then we need to be careful to use the 2588 # correct access flags for DuplicateHandle(), or else 2589 # DupHandle.detach() will raise PermissionError. For example, 2590 # for a read only pipe handle we should use 2591 # access=FILE_GENERIC_READ. (Unfortunately 2592 # DUPLICATE_SAME_ACCESS does not work.) 2593 conn, child_conn = self.Pipe() 2594 p = self.Process(target=self.child_access, args=(child_conn,)) 2595 p.daemon = True 2596 p.start() 2597 child_conn.close() 2598 2599 r, w = self.Pipe(duplex=False) 2600 conn.send(w) 2601 w.close() 2602 self.assertEqual(r.recv(), 'all is well') 2603 r.close() 2604 2605 r, w = self.Pipe(duplex=False) 2606 conn.send(r) 2607 r.close() 2608 w.send('foobar') 2609 w.close() 2610 self.assertEqual(conn.recv(), 'foobar'*2) 2611 2612# 2613# 2614# 2615 2616class _TestHeap(BaseTestCase): 2617 2618 ALLOWED_TYPES = ('processes',) 2619 2620 def test_heap(self): 2621 iterations = 5000 2622 maxblocks = 50 2623 blocks = [] 2624 2625 # create and destroy lots of blocks of different sizes 2626 for i in range(iterations): 2627 size = int(random.lognormvariate(0, 1) * 1000) 2628 b = multiprocessing.heap.BufferWrapper(size) 2629 blocks.append(b) 2630 if len(blocks) > maxblocks: 2631 i = random.randrange(maxblocks) 2632 del blocks[i] 2633 2634 # get the heap object 2635 heap = multiprocessing.heap.BufferWrapper._heap 2636 2637 # verify the state of the heap 2638 all = [] 2639 occupied = 0 2640 heap._lock.acquire() 2641 self.addCleanup(heap._lock.release) 2642 for L in list(heap._len_to_seq.values()): 2643 for arena, start, stop in L: 2644 all.append((heap._arenas.index(arena), start, stop, 2645 stop-start, 'free')) 2646 for arena, start, stop in heap._allocated_blocks: 2647 all.append((heap._arenas.index(arena), start, stop, 2648 stop-start, 'occupied')) 2649 occupied += (stop-start) 2650 2651 all.sort() 2652 2653 for i in range(len(all)-1): 2654 (arena, start, stop) = all[i][:3] 2655 (narena, nstart, nstop) = all[i+1][:3] 2656 self.assertTrue((arena != narena and nstart == 0) or 2657 (stop == nstart)) 2658 2659 def test_free_from_gc(self): 2660 # Check that freeing of blocks by the garbage collector doesn't deadlock 2661 # (issue #12352). 2662 # Make sure the GC is enabled, and set lower collection thresholds to 2663 # make collections more frequent (and increase the probability of 2664 # deadlock). 2665 if not gc.isenabled(): 2666 gc.enable() 2667 self.addCleanup(gc.disable) 2668 thresholds = gc.get_threshold() 2669 self.addCleanup(gc.set_threshold, *thresholds) 2670 gc.set_threshold(10) 2671 2672 # perform numerous block allocations, with cyclic references to make 2673 # sure objects are collected asynchronously by the gc 2674 for i in range(5000): 2675 a = multiprocessing.heap.BufferWrapper(1) 2676 b = multiprocessing.heap.BufferWrapper(1) 2677 # circular references 2678 a.buddy = b 2679 b.buddy = a 2680 2681# 2682# 2683# 2684 2685class _Foo(Structure): 2686 _fields_ = [ 2687 ('x', c_int), 2688 ('y', c_double) 2689 ] 2690 2691class _TestSharedCTypes(BaseTestCase): 2692 2693 ALLOWED_TYPES = ('processes',) 2694 2695 def setUp(self): 2696 if not HAS_SHAREDCTYPES: 2697 self.skipTest("requires multiprocessing.sharedctypes") 2698 2699 @classmethod 2700 def _double(cls, x, y, foo, arr, string): 2701 x.value *= 2 2702 y.value *= 2 2703 foo.x *= 2 2704 foo.y *= 2 2705 string.value *= 2 2706 for i in range(len(arr)): 2707 arr[i] *= 2 2708 2709 def test_sharedctypes(self, lock=False): 2710 x = Value('i', 7, lock=lock) 2711 y = Value(c_double, 1.0/3.0, lock=lock) 2712 foo = Value(_Foo, 3, 2, lock=lock) 2713 arr = self.Array('d', list(range(10)), lock=lock) 2714 string = self.Array('c', 20, lock=lock) 2715 string.value = latin('hello') 2716 2717 p = self.Process(target=self._double, args=(x, y, foo, arr, string)) 2718 p.daemon = True 2719 p.start() 2720 p.join() 2721 2722 self.assertEqual(x.value, 14) 2723 self.assertAlmostEqual(y.value, 2.0/3.0) 2724 self.assertEqual(foo.x, 6) 2725 self.assertAlmostEqual(foo.y, 4.0) 2726 for i in range(10): 2727 self.assertAlmostEqual(arr[i], i*2) 2728 self.assertEqual(string.value, latin('hellohello')) 2729 2730 def test_synchronize(self): 2731 self.test_sharedctypes(lock=True) 2732 2733 def test_copy(self): 2734 foo = _Foo(2, 5.0) 2735 bar = copy(foo) 2736 foo.x = 0 2737 foo.y = 0 2738 self.assertEqual(bar.x, 2) 2739 self.assertAlmostEqual(bar.y, 5.0) 2740 2741# 2742# 2743# 2744 2745class _TestFinalize(BaseTestCase): 2746 2747 ALLOWED_TYPES = ('processes',) 2748 2749 @classmethod 2750 def _test_finalize(cls, conn): 2751 class Foo(object): 2752 pass 2753 2754 a = Foo() 2755 util.Finalize(a, conn.send, args=('a',)) 2756 del a # triggers callback for a 2757 2758 b = Foo() 2759 close_b = util.Finalize(b, conn.send, args=('b',)) 2760 close_b() # triggers callback for b 2761 close_b() # does nothing because callback has already been called 2762 del b # does nothing because callback has already been called 2763 2764 c = Foo() 2765 util.Finalize(c, conn.send, args=('c',)) 2766 2767 d10 = Foo() 2768 util.Finalize(d10, conn.send, args=('d10',), exitpriority=1) 2769 2770 d01 = Foo() 2771 util.Finalize(d01, conn.send, args=('d01',), exitpriority=0) 2772 d02 = Foo() 2773 util.Finalize(d02, conn.send, args=('d02',), exitpriority=0) 2774 d03 = Foo() 2775 util.Finalize(d03, conn.send, args=('d03',), exitpriority=0) 2776 2777 util.Finalize(None, conn.send, args=('e',), exitpriority=-10) 2778 2779 util.Finalize(None, conn.send, args=('STOP',), exitpriority=-100) 2780 2781 # call multiprocessing's cleanup function then exit process without 2782 # garbage collecting locals 2783 util._exit_function() 2784 conn.close() 2785 os._exit(0) 2786 2787 def test_finalize(self): 2788 conn, child_conn = self.Pipe() 2789 2790 p = self.Process(target=self._test_finalize, args=(child_conn,)) 2791 p.daemon = True 2792 p.start() 2793 p.join() 2794 2795 result = [obj for obj in iter(conn.recv, 'STOP')] 2796 self.assertEqual(result, ['a', 'b', 'd10', 'd03', 'd02', 'd01', 'e']) 2797 2798# 2799# Test that from ... import * works for each module 2800# 2801 2802class _TestImportStar(BaseTestCase): 2803 2804 ALLOWED_TYPES = ('processes',) 2805 2806 def test_import(self): 2807 modules = [ 2808 'multiprocess', 'multiprocess.connection', 2809 'multiprocess.heap', 'multiprocess.managers', 2810 'multiprocess.pool', 'multiprocess.process', 2811 'multiprocess.synchronize', 'multiprocess.util' 2812 ] 2813 2814 if HAS_REDUCTION: 2815 modules.append('multiprocess.reduction') 2816 2817 if c_int is not None: 2818 # This module requires _ctypes 2819 modules.append('multiprocess.sharedctypes') 2820 2821 for name in modules: 2822 __import__(name) 2823 mod = sys.modules[name] 2824 2825 for attr in getattr(mod, '__all__', ()): 2826 self.assertTrue( 2827 hasattr(mod, attr), 2828 '%r does not have attribute %r' % (mod, attr) 2829 ) 2830 2831# 2832# Quick test that logging works -- does not test logging output 2833# 2834 2835class _TestLogging(BaseTestCase): 2836 2837 ALLOWED_TYPES = ('processes',) 2838 2839 def test_enable_logging(self): 2840 logger = multiprocessing.get_logger() 2841 logger.setLevel(util.SUBWARNING) 2842 self.assertTrue(logger is not None) 2843 logger.debug('this will not be printed') 2844 logger.info('nor will this') 2845 logger.setLevel(LOG_LEVEL) 2846 2847 @classmethod 2848 def _test_level(cls, conn): 2849 logger = multiprocessing.get_logger() 2850 conn.send(logger.getEffectiveLevel()) 2851 2852 def test_level(self): 2853 LEVEL1 = 32 2854 LEVEL2 = 37 2855 2856 logger = multiprocessing.get_logger() 2857 root_logger = logging.getLogger() 2858 root_level = root_logger.level 2859 2860 reader, writer = multiprocessing.Pipe(duplex=False) 2861 2862 logger.setLevel(LEVEL1) 2863 p = self.Process(target=self._test_level, args=(writer,)) 2864 p.daemon = True 2865 p.start() 2866 self.assertEqual(LEVEL1, reader.recv()) 2867 2868 logger.setLevel(logging.NOTSET) 2869 root_logger.setLevel(LEVEL2) 2870 p = self.Process(target=self._test_level, args=(writer,)) 2871 p.daemon = True 2872 p.start() 2873 self.assertEqual(LEVEL2, reader.recv()) 2874 2875 root_logger.setLevel(root_level) 2876 logger.setLevel(level=LOG_LEVEL) 2877 2878 2879# class _TestLoggingProcessName(BaseTestCase): 2880# 2881# def handle(self, record): 2882# assert record.processName == multiprocessing.current_process().name 2883# self.__handled = True 2884# 2885# def test_logging(self): 2886# handler = logging.Handler() 2887# handler.handle = self.handle 2888# self.__handled = False 2889# # Bypass getLogger() and side-effects 2890# logger = logging.getLoggerClass()( 2891# 'multiprocessing.test.TestLoggingProcessName') 2892# logger.addHandler(handler) 2893# logger.propagate = False 2894# 2895# logger.warn('foo') 2896# assert self.__handled 2897 2898# 2899# Check that Process.join() retries if os.waitpid() fails with EINTR 2900# 2901 2902class _TestPollEintr(BaseTestCase): 2903 2904 ALLOWED_TYPES = ('processes',) 2905 2906 @classmethod 2907 def _killer(cls, pid): 2908 time.sleep(0.5) 2909 os.kill(pid, signal.SIGUSR1) 2910 2911 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 2912 def test_poll_eintr(self): 2913 got_signal = [False] 2914 def record(*args): 2915 got_signal[0] = True 2916 pid = os.getpid() 2917 oldhandler = signal.signal(signal.SIGUSR1, record) 2918 try: 2919 killer = self.Process(target=self._killer, args=(pid,)) 2920 killer.start() 2921 p = self.Process(target=time.sleep, args=(1,)) 2922 p.start() 2923 p.join() 2924 self.assertTrue(got_signal[0]) 2925 self.assertEqual(p.exitcode, 0) 2926 killer.join() 2927 finally: 2928 signal.signal(signal.SIGUSR1, oldhandler) 2929 2930# 2931# Test to verify handle verification, see issue 3321 2932# 2933 2934class TestInvalidHandle(unittest.TestCase): 2935 2936 @unittest.skipIf(WIN32, "skipped on Windows") 2937 def test_invalid_handles(self): 2938 conn = multiprocessing.connection.Connection(44977608) 2939 try: 2940 self.assertRaises((ValueError, OSError), conn.poll) 2941 finally: 2942 # Hack private attribute _handle to avoid printing an error 2943 # in conn.__del__ 2944 conn._handle = None 2945 self.assertRaises((ValueError, OSError), 2946 multiprocessing.connection.Connection, -1) 2947 2948# 2949# Functions used to create test cases from the base ones in this module 2950# 2951 2952def create_test_cases(Mixin, type): 2953 result = {} 2954 glob = globals() 2955 Type = type.capitalize() 2956 ALL_TYPES = {'processes', 'threads', 'manager'} 2957 2958 for name in list(glob.keys()): 2959 if name.startswith('_Test'): 2960 base = glob[name] 2961 assert set(base.ALLOWED_TYPES) <= ALL_TYPES, set(base.ALLOWED_TYPES) 2962 if type in base.ALLOWED_TYPES: 2963 newname = 'With' + Type + name[1:] 2964 class Temp(base, Mixin, unittest.TestCase): 2965 pass 2966 result[newname] = Temp 2967 Temp.__name__ = Temp.__qualname__ = newname 2968 Temp.__module__ = Mixin.__module__ 2969 return result 2970 2971# 2972# Create test cases 2973# 2974 2975class ProcessesMixin(object): 2976 TYPE = 'processes' 2977 Process = multiprocessing.Process 2978 connection = multiprocessing.connection 2979 current_process = staticmethod(multiprocessing.current_process) 2980 active_children = staticmethod(multiprocessing.active_children) 2981 Pool = staticmethod(multiprocessing.Pool) 2982 Pipe = staticmethod(multiprocessing.Pipe) 2983 Queue = staticmethod(multiprocessing.Queue) 2984 JoinableQueue = staticmethod(multiprocessing.JoinableQueue) 2985 Lock = staticmethod(multiprocessing.Lock) 2986 RLock = staticmethod(multiprocessing.RLock) 2987 Semaphore = staticmethod(multiprocessing.Semaphore) 2988 BoundedSemaphore = staticmethod(multiprocessing.BoundedSemaphore) 2989 Condition = staticmethod(multiprocessing.Condition) 2990 Event = staticmethod(multiprocessing.Event) 2991 Barrier = staticmethod(multiprocessing.Barrier) 2992 Value = staticmethod(multiprocessing.Value) 2993 Array = staticmethod(multiprocessing.Array) 2994 RawValue = staticmethod(multiprocessing.RawValue) 2995 RawArray = staticmethod(multiprocessing.RawArray) 2996 2997testcases_processes = create_test_cases(ProcessesMixin, type='processes') 2998globals().update(testcases_processes) 2999 3000 3001class ManagerMixin(object): 3002 TYPE = 'manager' 3003 Process = multiprocessing.Process 3004 Queue = property(operator.attrgetter('manager.Queue')) 3005 JoinableQueue = property(operator.attrgetter('manager.JoinableQueue')) 3006 Lock = property(operator.attrgetter('manager.Lock')) 3007 RLock = property(operator.attrgetter('manager.RLock')) 3008 Semaphore = property(operator.attrgetter('manager.Semaphore')) 3009 BoundedSemaphore = property(operator.attrgetter('manager.BoundedSemaphore')) 3010 Condition = property(operator.attrgetter('manager.Condition')) 3011 Event = property(operator.attrgetter('manager.Event')) 3012 Barrier = property(operator.attrgetter('manager.Barrier')) 3013 Value = property(operator.attrgetter('manager.Value')) 3014 Array = property(operator.attrgetter('manager.Array')) 3015 list = property(operator.attrgetter('manager.list')) 3016 dict = property(operator.attrgetter('manager.dict')) 3017 Namespace = property(operator.attrgetter('manager.Namespace')) 3018 3019 @classmethod 3020 def Pool(cls, *args, **kwds): 3021 return cls.manager.Pool(*args, **kwds) 3022 3023 @classmethod 3024 def setUpClass(cls): 3025 cls.manager = multiprocessing.Manager() 3026 3027 @classmethod 3028 def tearDownClass(cls): 3029 # only the manager process should be returned by active_children() 3030 # but this can take a bit on slow machines, so wait a few seconds 3031 # if there are other children too (see #17395) 3032 t = 0.01 3033 while len(multiprocessing.active_children()) > 1 and t < 5: 3034 time.sleep(t) 3035 t *= 2 3036 gc.collect() # do garbage collection 3037 if cls.manager._number_of_objects() != 0: 3038 # This is not really an error since some tests do not 3039 # ensure that all processes which hold a reference to a 3040 # managed object have been joined. 3041 print('Shared objects which still exist at manager shutdown:') 3042 print(cls.manager._debug_info()) 3043 cls.manager.shutdown() 3044 cls.manager.join() 3045 cls.manager = None 3046 3047testcases_manager = create_test_cases(ManagerMixin, type='manager') 3048globals().update(testcases_manager) 3049 3050 3051class ThreadsMixin(object): 3052 TYPE = 'threads' 3053 Process = multiprocessing.dummy.Process 3054 connection = multiprocessing.dummy.connection 3055 current_process = staticmethod(multiprocessing.dummy.current_process) 3056 active_children = staticmethod(multiprocessing.dummy.active_children) 3057 Pool = staticmethod(multiprocessing.Pool) 3058 Pipe = staticmethod(multiprocessing.dummy.Pipe) 3059 Queue = staticmethod(multiprocessing.dummy.Queue) 3060 JoinableQueue = staticmethod(multiprocessing.dummy.JoinableQueue) 3061 Lock = staticmethod(multiprocessing.dummy.Lock) 3062 RLock = staticmethod(multiprocessing.dummy.RLock) 3063 Semaphore = staticmethod(multiprocessing.dummy.Semaphore) 3064 BoundedSemaphore = staticmethod(multiprocessing.dummy.BoundedSemaphore) 3065 Condition = staticmethod(multiprocessing.dummy.Condition) 3066 Event = staticmethod(multiprocessing.dummy.Event) 3067 Barrier = staticmethod(multiprocessing.dummy.Barrier) 3068 Value = staticmethod(multiprocessing.dummy.Value) 3069 Array = staticmethod(multiprocessing.dummy.Array) 3070 3071testcases_threads = create_test_cases(ThreadsMixin, type='threads') 3072globals().update(testcases_threads) 3073 3074 3075class OtherTest(unittest.TestCase): 3076 # TODO: add more tests for deliver/answer challenge. 3077 def test_deliver_challenge_auth_failure(self): 3078 class _FakeConnection(object): 3079 def recv_bytes(self, size): 3080 return b'something bogus' 3081 def send_bytes(self, data): 3082 pass 3083 self.assertRaises(multiprocessing.AuthenticationError, 3084 multiprocessing.connection.deliver_challenge, 3085 _FakeConnection(), b'abc') 3086 3087 def test_answer_challenge_auth_failure(self): 3088 class _FakeConnection(object): 3089 def __init__(self): 3090 self.count = 0 3091 def recv_bytes(self, size): 3092 self.count += 1 3093 if self.count == 1: 3094 return multiprocessing.connection.CHALLENGE 3095 elif self.count == 2: 3096 return b'something bogus' 3097 return b'' 3098 def send_bytes(self, data): 3099 pass 3100 self.assertRaises(multiprocessing.AuthenticationError, 3101 multiprocessing.connection.answer_challenge, 3102 _FakeConnection(), b'abc') 3103 3104# 3105# Test Manager.start()/Pool.__init__() initializer feature - see issue 5585 3106# 3107 3108def initializer(ns): 3109 ns.test += 1 3110 3111class TestInitializers(unittest.TestCase): 3112 def setUp(self): 3113 self.mgr = multiprocessing.Manager() 3114 self.ns = self.mgr.Namespace() 3115 self.ns.test = 0 3116 3117 def tearDown(self): 3118 self.mgr.shutdown() 3119 self.mgr.join() 3120 3121 def test_manager_initializer(self): 3122 m = multiprocessing.managers.SyncManager() 3123 self.assertRaises(TypeError, m.start, 1) 3124 m.start(initializer, (self.ns,)) 3125 self.assertEqual(self.ns.test, 1) 3126 m.shutdown() 3127 m.join() 3128 3129 def test_pool_initializer(self): 3130 self.assertRaises(TypeError, multiprocessing.Pool, initializer=1) 3131 p = multiprocessing.Pool(1, initializer, (self.ns,)) 3132 p.close() 3133 p.join() 3134 self.assertEqual(self.ns.test, 1) 3135 3136# 3137# Issue 5155, 5313, 5331: Test process in processes 3138# Verifies os.close(sys.stdin.fileno) vs. sys.stdin.close() behavior 3139# 3140 3141def _this_sub_process(q): 3142 try: 3143 item = q.get(block=False) 3144 except pyqueue.Empty: 3145 pass 3146 3147def _test_process(q): 3148 queue = multiprocessing.Queue() 3149 subProc = multiprocessing.Process(target=_this_sub_process, args=(queue,)) 3150 subProc.daemon = True 3151 subProc.start() 3152 subProc.join() 3153 3154def _afunc(x): 3155 return x*x 3156 3157def pool_in_process(): 3158 pool = multiprocessing.Pool(processes=4) 3159 x = pool.map(_afunc, [1, 2, 3, 4, 5, 6, 7]) 3160 pool.close() 3161 pool.join() 3162 3163class _file_like(object): 3164 def __init__(self, delegate): 3165 self._delegate = delegate 3166 self._pid = None 3167 3168 @property 3169 def cache(self): 3170 pid = os.getpid() 3171 # There are no race conditions since fork keeps only the running thread 3172 if pid != self._pid: 3173 self._pid = pid 3174 self._cache = [] 3175 return self._cache 3176 3177 def write(self, data): 3178 self.cache.append(data) 3179 3180 def flush(self): 3181 self._delegate.write(''.join(self.cache)) 3182 self._cache = [] 3183 3184class TestStdinBadfiledescriptor(unittest.TestCase): 3185 3186 def test_queue_in_process(self): 3187 queue = multiprocessing.Queue() 3188 proc = multiprocessing.Process(target=_test_process, args=(queue,)) 3189 proc.start() 3190 proc.join() 3191 3192 def test_pool_in_process(self): 3193 p = multiprocessing.Process(target=pool_in_process) 3194 p.start() 3195 p.join() 3196 3197 def test_flushing(self): 3198 sio = io.StringIO() 3199 flike = _file_like(sio) 3200 flike.write('foo') 3201 proc = multiprocessing.Process(target=lambda: flike.flush()) 3202 flike.flush() 3203 assert sio.getvalue() == 'foo' 3204 3205 3206class TestWait(unittest.TestCase): 3207 3208 @classmethod 3209 def _child_test_wait(cls, w, slow): 3210 for i in range(10): 3211 if slow: 3212 time.sleep(random.random()*0.1) 3213 w.send((i, os.getpid())) 3214 w.close() 3215 3216 def test_wait(self, slow=False): 3217 from multiprocess.connection import wait 3218 readers = [] 3219 procs = [] 3220 messages = [] 3221 3222 for i in range(4): 3223 r, w = multiprocessing.Pipe(duplex=False) 3224 p = multiprocessing.Process(target=self._child_test_wait, args=(w, slow)) 3225 p.daemon = True 3226 p.start() 3227 w.close() 3228 readers.append(r) 3229 procs.append(p) 3230 self.addCleanup(p.join) 3231 3232 while readers: 3233 for r in wait(readers): 3234 try: 3235 msg = r.recv() 3236 except EOFError: 3237 readers.remove(r) 3238 r.close() 3239 else: 3240 messages.append(msg) 3241 3242 messages.sort() 3243 expected = sorted((i, p.pid) for i in range(10) for p in procs) 3244 self.assertEqual(messages, expected) 3245 3246 @classmethod 3247 def _child_test_wait_socket(cls, address, slow): 3248 s = socket.socket() 3249 s.connect(address) 3250 for i in range(10): 3251 if slow: 3252 time.sleep(random.random()*0.1) 3253 s.sendall(('%s\n' % i).encode('ascii')) 3254 s.close() 3255 3256 def test_wait_socket(self, slow=False): 3257 from multiprocess.connection import wait 3258 l = socket.socket() 3259 l.bind((test.support.HOST, 0)) 3260 l.listen(4) 3261 addr = l.getsockname() 3262 readers = [] 3263 procs = [] 3264 dic = {} 3265 3266 for i in range(4): 3267 p = multiprocessing.Process(target=self._child_test_wait_socket, 3268 args=(addr, slow)) 3269 p.daemon = True 3270 p.start() 3271 procs.append(p) 3272 self.addCleanup(p.join) 3273 3274 for i in range(4): 3275 r, _ = l.accept() 3276 readers.append(r) 3277 dic[r] = [] 3278 l.close() 3279 3280 while readers: 3281 for r in wait(readers): 3282 msg = r.recv(32) 3283 if not msg: 3284 readers.remove(r) 3285 r.close() 3286 else: 3287 dic[r].append(msg) 3288 3289 expected = ''.join('%s\n' % i for i in range(10)).encode('ascii') 3290 for v in dic.values(): 3291 self.assertEqual(b''.join(v), expected) 3292 3293 def test_wait_slow(self): 3294 self.test_wait(True) 3295 3296 def test_wait_socket_slow(self): 3297 self.test_wait_socket(True) 3298 3299 def test_wait_timeout(self): 3300 from multiprocess.connection import wait 3301 3302 expected = 5 3303 a, b = multiprocessing.Pipe() 3304 3305 start = time.time() 3306 res = wait([a, b], expected) 3307 delta = time.time() - start 3308 3309 self.assertEqual(res, []) 3310 self.assertLess(delta, expected * 2) 3311 self.assertGreater(delta, expected * 0.5) 3312 3313 b.send(None) 3314 3315 start = time.time() 3316 res = wait([a, b], 20) 3317 delta = time.time() - start 3318 3319 self.assertEqual(res, [a]) 3320 self.assertLess(delta, 0.4) 3321 3322 @classmethod 3323 def signal_and_sleep(cls, sem, period): 3324 sem.release() 3325 time.sleep(period) 3326 3327 def test_wait_integer(self): 3328 from multiprocess.connection import wait 3329 3330 expected = 3 3331 sorted_ = lambda l: sorted(l, key=lambda x: id(x)) 3332 sem = multiprocessing.Semaphore(0) 3333 a, b = multiprocessing.Pipe() 3334 p = multiprocessing.Process(target=self.signal_and_sleep, 3335 args=(sem, expected)) 3336 3337 p.start() 3338 self.assertIsInstance(p.sentinel, int) 3339 self.assertTrue(sem.acquire(timeout=20)) 3340 3341 start = time.time() 3342 res = wait([a, p.sentinel, b], expected + 20) 3343 delta = time.time() - start 3344 3345 self.assertEqual(res, [p.sentinel]) 3346 self.assertLess(delta, expected + 2) 3347 self.assertGreater(delta, expected - 2) 3348 3349 a.send(None) 3350 3351 start = time.time() 3352 res = wait([a, p.sentinel, b], 20) 3353 delta = time.time() - start 3354 3355 self.assertEqual(sorted_(res), sorted_([p.sentinel, b])) 3356 self.assertLess(delta, 0.4) 3357 3358 b.send(None) 3359 3360 start = time.time() 3361 res = wait([a, p.sentinel, b], 20) 3362 delta = time.time() - start 3363 3364 self.assertEqual(sorted_(res), sorted_([a, p.sentinel, b])) 3365 self.assertLess(delta, 0.4) 3366 3367 p.terminate() 3368 p.join() 3369 3370 def test_neg_timeout(self): 3371 from multiprocess.connection import wait 3372 a, b = multiprocessing.Pipe() 3373 t = time.time() 3374 res = wait([a], timeout=-1) 3375 t = time.time() - t 3376 self.assertEqual(res, []) 3377 self.assertLess(t, 1) 3378 a.close() 3379 b.close() 3380 3381# 3382# Issue 14151: Test invalid family on invalid environment 3383# 3384 3385class TestInvalidFamily(unittest.TestCase): 3386 3387 @unittest.skipIf(WIN32, "skipped on Windows") 3388 def test_invalid_family(self): 3389 with self.assertRaises(ValueError): 3390 multiprocessing.connection.Listener(r'\\.\test') 3391 3392 @unittest.skipUnless(WIN32, "skipped on non-Windows platforms") 3393 def test_invalid_family_win32(self): 3394 with self.assertRaises(ValueError): 3395 multiprocessing.connection.Listener('/var/test.pipe') 3396 3397# 3398# Issue 12098: check sys.flags of child matches that for parent 3399# 3400 3401class TestFlags(unittest.TestCase): 3402 @classmethod 3403 def run_in_grandchild(cls, conn): 3404 conn.send(tuple(sys.flags)) 3405 3406 @classmethod 3407 def run_in_child(cls): 3408 import json 3409 r, w = multiprocessing.Pipe(duplex=False) 3410 p = multiprocessing.Process(target=cls.run_in_grandchild, args=(w,)) 3411 p.start() 3412 grandchild_flags = r.recv() 3413 p.join() 3414 r.close() 3415 w.close() 3416 flags = (tuple(sys.flags), grandchild_flags) 3417 print(json.dumps(flags)) 3418 3419 def _test_flags(self): 3420 import json, subprocess 3421 # start child process using unusual flags 3422 prog = ('from multiprocess.tests import TestFlags; ' + 3423 'TestFlags.run_in_child()') 3424 data = subprocess.check_output( 3425 [sys.executable, '-E', '-S', '-O', '-c', prog]) 3426 child_flags, grandchild_flags = json.loads(data.decode('ascii')) 3427 self.assertEqual(child_flags, grandchild_flags) 3428 3429# 3430# Test interaction with socket timeouts - see Issue #6056 3431# 3432 3433class TestTimeouts(unittest.TestCase): 3434 @classmethod 3435 def _test_timeout(cls, child, address): 3436 time.sleep(1) 3437 child.send(123) 3438 child.close() 3439 conn = multiprocessing.connection.Client(address) 3440 conn.send(456) 3441 conn.close() 3442 3443 def test_timeout(self): 3444 old_timeout = socket.getdefaulttimeout() 3445 try: 3446 socket.setdefaulttimeout(0.1) 3447 parent, child = multiprocessing.Pipe(duplex=True) 3448 l = multiprocessing.connection.Listener(family='AF_INET') 3449 p = multiprocessing.Process(target=self._test_timeout, 3450 args=(child, l.address)) 3451 p.start() 3452 child.close() 3453 self.assertEqual(parent.recv(), 123) 3454 parent.close() 3455 conn = l.accept() 3456 self.assertEqual(conn.recv(), 456) 3457 conn.close() 3458 l.close() 3459 p.join(10) 3460 finally: 3461 socket.setdefaulttimeout(old_timeout) 3462 3463# 3464# Test what happens with no "if __name__ == '__main__'" 3465# 3466 3467class TestNoForkBomb(unittest.TestCase): 3468 def _test_noforkbomb(self): 3469 name = os.path.join(os.path.dirname(__file__), 'mp_fork_bomb.py') 3470 if WIN32: 3471 rc, out, err = test.script_helper.assert_python_failure(name) 3472 self.assertEqual('', out.decode('ascii')) 3473 self.assertIn('RuntimeError', err.decode('ascii')) 3474 else: 3475 rc, out, err = test.script_helper.assert_python_ok(name) 3476 self.assertEqual('123', out.decode('ascii').rstrip()) 3477 self.assertEqual('', err.decode('ascii')) 3478 3479# 3480# Issue #17555: ForkAwareThreadLock 3481# 3482 3483class TestForkAwareThreadLock(unittest.TestCase): 3484 # We recurisvely start processes. Issue #17555 meant that the 3485 # after fork registry would get duplicate entries for the same 3486 # lock. The size of the registry at generation n was ~2**n. 3487 3488 @classmethod 3489 def child(cls, n, conn): 3490 if n > 1: 3491 p = multiprocessing.Process(target=cls.child, args=(n-1, conn)) 3492 p.start() 3493 p.join() 3494 else: 3495 conn.send(len(util._afterfork_registry)) 3496 conn.close() 3497 3498 def test_lock(self): 3499 r, w = multiprocessing.Pipe(False) 3500 l = util.ForkAwareThreadLock() 3501 old_size = len(util._afterfork_registry) 3502 p = multiprocessing.Process(target=self.child, args=(5, w)) 3503 p.start() 3504 new_size = r.recv() 3505 p.join() 3506 self.assertLessEqual(new_size, old_size) 3507 3508# 3509# Issue #17097: EINTR should be ignored by recv(), send(), accept() etc 3510# 3511 3512class TestIgnoreEINTR(unittest.TestCase): 3513 3514 @classmethod 3515 def _test_ignore(cls, conn): 3516 def handler(signum, frame): 3517 pass 3518 signal.signal(signal.SIGUSR1, handler) 3519 conn.send('ready') 3520 x = conn.recv() 3521 conn.send(x) 3522 conn.send_bytes(b'x'*(1024*1024)) # sending 1 MB should block 3523 3524 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3525 def _test_ignore(self): 3526 conn, child_conn = multiprocessing.Pipe() 3527 try: 3528 p = multiprocessing.Process(target=self._test_ignore, 3529 args=(child_conn,)) 3530 p.daemon = True 3531 p.start() 3532 child_conn.close() 3533 self.assertEqual(conn.recv(), 'ready') 3534 time.sleep(0.1) 3535 os.kill(p.pid, signal.SIGUSR1) 3536 time.sleep(0.1) 3537 conn.send(1234) 3538 self.assertEqual(conn.recv(), 1234) 3539 time.sleep(0.1) 3540 os.kill(p.pid, signal.SIGUSR1) 3541 self.assertEqual(conn.recv_bytes(), b'x'*(1024*1024)) 3542 time.sleep(0.1) 3543 p.join() 3544 finally: 3545 conn.close() 3546 3547 @classmethod 3548 def _test_ignore_listener(cls, conn): 3549 def handler(signum, frame): 3550 pass 3551 signal.signal(signal.SIGUSR1, handler) 3552 l = multiprocessing.connection.Listener() 3553 conn.send(l.address) 3554 a = l.accept() 3555 a.send('welcome') 3556 3557 @unittest.skipUnless(hasattr(signal, 'SIGUSR1'), 'requires SIGUSR1') 3558 def test_ignore_listener(self): 3559 conn, child_conn = multiprocessing.Pipe() 3560 try: 3561 p = multiprocessing.Process(target=self._test_ignore_listener, 3562 args=(child_conn,)) 3563 p.daemon = True 3564 p.start() 3565 child_conn.close() 3566 address = conn.recv() 3567 time.sleep(0.1) 3568 os.kill(p.pid, signal.SIGUSR1) 3569 time.sleep(0.1) 3570 client = multiprocessing.connection.Client(address) 3571 self.assertEqual(client.recv(), 'welcome') 3572 p.join() 3573 finally: 3574 conn.close() 3575 3576# 3577# 3578# 3579 3580def setUpModule(): 3581 if sys.platform.startswith("linux"): 3582 try: 3583 lock = multiprocessing.RLock() 3584 except OSError: 3585 raise unittest.SkipTest("OSError raises on RLock creation, " 3586 "see issue 3111!") 3587 check_enough_semaphores() 3588 util.get_temp_dir() # creates temp directory for use by all processes 3589 multiprocessing.get_logger().setLevel(LOG_LEVEL) 3590 3591 3592def tearDownModule(): 3593 # pause a bit so we don't get warning about dangling threads/processes 3594 time.sleep(0.5) 3595 3596 3597if __name__ == '__main__': 3598 unittest.main() 3599