"""
Various tests for synchronization primitives.

The test classes below are mixins: each one reads a factory attribute
(``locktype``, ``eventtype``, ``condtype``, ``semtype`` or ``barriertype``)
from ``self``; none of those attributes is defined in this module.
"""

import gc
import sys
import time
from _thread import start_new_thread, TIMEOUT_MAX
import threading
import unittest
import weakref

from test import support


def _wait():
    # A crude wait/yield function not relying on synchronization primitives.
    time.sleep(0.01)


class Bunch(object):
    """
    A bunch of threads.
    """
    def __init__(self, f, n, wait_before_exit=False):
        """
        Construct a bunch of `n` threads running the same function `f`.
        If `wait_before_exit` is True, the threads won't terminate until
        do_finish() is called.
        """
        self.f = f
        self.n = n
        # Thread identifiers, appended by each worker as it starts / after
        # `f` has returned (or raised).
        self.started = []
        self.finished = []
        self._can_exit = not wait_before_exit
        # Entered here and exited in wait_for_finished(); presumably this
        # blocks until the started threads have really exited -- see
        # test.support.wait_threads_exit().
        self.wait_thread = support.wait_threads_exit()
        self.wait_thread.__enter__()

        def task():
            tid = threading.get_ident()
            self.started.append(tid)
            try:
                f()
            finally:
                self.finished.append(tid)
                # Keep the thread alive until do_finish() is called.
                while not self._can_exit:
                    _wait()

        try:
            for _ in range(n):
                start_new_thread(task, ())
        except BaseException:
            # Let the threads that did start terminate, then propagate.
            self._can_exit = True
            raise

    def wait_for_started(self):
        """Poll until all `n` threads have recorded that they started."""
        while len(self.started) < self.n:
            _wait()

    def wait_for_finished(self):
        """Poll until `f` has returned in all `n` threads, then leave the
        wait_threads_exit() context entered by the constructor."""
        while len(self.finished) < self.n:
            _wait()
        # Wait for threads exit
        self.wait_thread.__exit__(None, None, None)

    def do_finish(self):
        """Allow threads created with `wait_before_exit=True` to terminate."""
        self._can_exit = True


class BaseTestCase(unittest.TestCase):
    """
    Common base class: brackets every test with the threading
    setup/cleanup helpers from test.support.
    """
    def setUp(self):
        self._threads = support.threading_setup()

    def tearDown(self):
        support.threading_cleanup(*self._threads)
        support.reap_children()

    def assertTimeout(self, actual, expected):
        """Check that the measured duration `actual` (in seconds) is
        between 60% and 1000% of `expected`."""
        # The waiting and/or time.monotonic() can be imprecise, which
        # is why comparing to the expected value would sometimes fail
        # (especially under Windows).
        self.assertGreaterEqual(actual, expected * 0.6)
        # Test nothing insane happened
        self.assertLess(actual, expected * 10.0)


class BaseLockTests(BaseTestCase):
    """
    Tests for both recursive and non-recursive locks.
    """

    def test_constructor(self):
        lock = self.locktype()
        del lock

    def test_repr(self):
        lock = self.locktype()
        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
        del lock

    def test_locked_repr(self):
        lock = self.locktype()
        lock.acquire()
        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
        del lock

    def test_acquire_destroy(self):
        # Dropping the last reference to an acquired lock must not fail.
        lock = self.locktype()
        lock.acquire()
        del lock

    def test_acquire_release(self):
        lock = self.locktype()
        lock.acquire()
        lock.release()
        del lock

    def test_try_acquire(self):
        lock = self.locktype()
        self.assertTrue(lock.acquire(False))
        lock.release()

    def test_try_acquire_contended(self):
        # A non-blocking acquire from another thread fails while the lock
        # is held here.
        lock = self.locktype()
        lock.acquire()
        result = []
        def f():
            result.append(lock.acquire(False))
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()

    def test_acquire_contended(self):
        # Blocking acquires from other threads only complete once the lock
        # has been released here.
        lock = self.locktype()
        lock.acquire()
        N = 5
        def f():
            lock.acquire()
            lock.release()

        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(b.finished), 0)
        lock.release()
        b.wait_for_finished()
        self.assertEqual(len(b.finished), N)

    def test_with(self):
        # The lock works as a context manager and is released on both
        # normal and exceptional exit.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        def _with(err=None):
            with lock:
                if err is not None:
                    raise err
        _with()
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()
        self.assertRaises(TypeError, _with, TypeError)
        # Check the lock is unacquired
        Bunch(f, 1).wait_for_finished()

    def test_thread_leak(self):
        # The lock shouldn't leak a Thread instance when used from a foreign
        # (non-threading) thread.
        lock = self.locktype()
        def f():
            lock.acquire()
            lock.release()
        n = len(threading.enumerate())
        # We run many threads in the hope that existing threads ids won't
        # be recycled.
        Bunch(f, 15).wait_for_finished()
        if len(threading.enumerate()) != n:
            # There is a small window during which a Thread instance's
            # target function has finished running, but the Thread is still
            # alive and registered.  Avoid spurious failures by waiting a
            # bit more (seen on a buildbot).
            time.sleep(0.4)
            self.assertEqual(n, len(threading.enumerate()))

    def test_timeout(self):
        lock = self.locktype()
        # Can't set timeout if not blocking
        self.assertRaises(ValueError, lock.acquire, 0, 1)
        # Invalid timeout values
        self.assertRaises(ValueError, lock.acquire, timeout=-100)
        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
        # TIMEOUT_MAX is ok
        lock.acquire(timeout=TIMEOUT_MAX)
        lock.release()
        # Durations are measured with the monotonic clock so that wall-clock
        # adjustments cannot distort them.
        t1 = time.monotonic()
        self.assertTrue(lock.acquire(timeout=5))
        t2 = time.monotonic()
        # Just a sanity test that it didn't actually wait for the timeout.
        self.assertLess(t2 - t1, 5)
        results = []
        def f():
            # The lock is still held by the main thread: this must time out.
            t1 = time.monotonic()
            results.append(lock.acquire(timeout=0.5))
            t2 = time.monotonic()
            results.append(t2 - t1)
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(results[0])
        self.assertTimeout(results[1], 0.5)

    def test_weakref_exists(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        self.assertIsNotNone(ref())

    def test_weakref_deleted(self):
        lock = self.locktype()
        ref = weakref.ref(lock)
        del lock
        gc.collect()
        self.assertIsNone(ref())


class LockTests(BaseLockTests):
    """
    Tests for non-recursive, weak locks
    (which can be acquired and released from different threads).
    """
    def test_reacquire(self):
        # Lock needs to be released before re-acquiring.
        lock = self.locktype()
        phase = []

        def f():
            lock.acquire()
            phase.append(None)
            # Blocks until the main thread releases the lock.
            lock.acquire()
            phase.append(None)

        with support.wait_threads_exit():
            start_new_thread(f, ())
            while len(phase) == 0:
                _wait()
            _wait()
            self.assertEqual(len(phase), 1)
            lock.release()
            while len(phase) == 1:
                _wait()
            self.assertEqual(len(phase), 2)

    def test_different_thread(self):
        # Lock can be released from a different thread.
        lock = self.locktype()
        lock.acquire()
        def f():
            lock.release()
        b = Bunch(f, 1)
        b.wait_for_finished()
        lock.acquire()
        lock.release()

    def test_state_after_timeout(self):
        # Issue #11618: check that lock is in a proper state after a
        # (non-zero) timeout.
        lock = self.locktype()
        lock.acquire()
        self.assertFalse(lock.acquire(timeout=0.01))
        lock.release()
        self.assertFalse(lock.locked())
        self.assertTrue(lock.acquire(blocking=False))


class RLockTests(BaseLockTests):
    """
    Tests for recursive locks.
    """
    def test_reacquire(self):
        lock = self.locktype()
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()

    def test_release_unacquired(self):
        # Cannot release an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock.release)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock.release)

    def test_release_save_unacquired(self):
        # Cannot _release_save an unacquired lock
        lock = self.locktype()
        self.assertRaises(RuntimeError, lock._release_save)
        lock.acquire()
        lock.acquire()
        lock.release()
        lock.acquire()
        lock.release()
        lock.release()
        self.assertRaises(RuntimeError, lock._release_save)

    def test_different_thread(self):
        # Cannot release from a different thread
        lock = self.locktype()
        def f():
            lock.acquire()
        # wait_before_exit=True keeps the worker thread alive until
        # do_finish() is called.
        b = Bunch(f, 1, True)
        try:
            self.assertRaises(RuntimeError, lock.release)
        finally:
            b.do_finish()
        b.wait_for_finished()

    def test__is_owned(self):
        lock = self.locktype()
        self.assertFalse(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        lock.acquire()
        self.assertTrue(lock._is_owned())
        result = []
        def f():
            result.append(lock._is_owned())
        # Another thread must not see itself as the owner.
        Bunch(f, 1).wait_for_finished()
        self.assertFalse(result[0])
        lock.release()
        self.assertTrue(lock._is_owned())
        lock.release()
        self.assertFalse(lock._is_owned())


class EventTests(BaseTestCase):
    """
    Tests for Event objects.
    """

    def test_is_set(self):
        evt = self.eventtype()
        self.assertFalse(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.set()
        self.assertTrue(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())
        evt.clear()
        self.assertFalse(evt.is_set())

    def _check_notify(self, evt):
        # All threads get notified
        N = 5
        results1 = []
        results2 = []
        def f():
            results1.append(evt.wait())
            results2.append(evt.wait())
        b = Bunch(f, N)
        b.wait_for_started()
        _wait()
        self.assertEqual(len(results1), 0)
        evt.set()
        b.wait_for_finished()
        self.assertEqual(results1, [True] * N)
        self.assertEqual(results2, [True] * N)

    def test_notify(self):
        evt = self.eventtype()
        self._check_notify(evt)
        # Another time, after an explicit clear()
        evt.set()
        evt.clear()
        self._check_notify(evt)

    def test_timeout(self):
        evt = self.eventtype()
        results1 = []
        results2 = []
        N = 5
        def f():
            results1.append(evt.wait(0.0))
            # Monotonic clock: immune to wall-clock adjustments.
            t1 = time.monotonic()
            r = evt.wait(0.5)
            t2 = time.monotonic()
            results2.append((r, t2 - t1))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [False] * N)
        for r, dt in results2:
            self.assertFalse(r)
            self.assertTimeout(dt, 0.5)
        # The event is set
        results1 = []
        results2 = []
        evt.set()
        Bunch(f, N).wait_for_finished()
        self.assertEqual(results1, [True] * N)
        for r, dt in results2:
            self.assertTrue(r)

    def test_set_and_clear(self):
        # Issue #13502: check that wait() returns true even when the event is
        # cleared before the waiting thread is woken up.
        evt = self.eventtype()
        results = []
        timeout = 0.250
        N = 5
        def f():
            results.append(evt.wait(timeout * 4))
        b = Bunch(f, N)
        b.wait_for_started()
        time.sleep(timeout)
        evt.set()
        evt.clear()
        b.wait_for_finished()
        self.assertEqual(results, [True] * N)

    def test_reset_internal_locks(self):
        # ensure that condition is still using a Lock after reset
        evt = self.eventtype()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))
        evt._reset_internal_locks()
        with evt._cond:
            self.assertFalse(evt._cond.acquire(False))


class ConditionTests(BaseTestCase):
    """
    Tests for condition variables.
    """

    def test_acquire(self):
        cond = self.condtype()
        # By default we have an RLock: the condition can be acquired multiple
        # times.
        cond.acquire()
        cond.acquire()
        cond.release()
        cond.release()
        # With an explicit plain Lock, the condition and the lock share
        # the same acquired/released state.
        lock = threading.Lock()
        cond = self.condtype(lock)
        cond.acquire()
        self.assertFalse(lock.acquire(False))
        cond.release()
        self.assertTrue(lock.acquire(False))
        self.assertFalse(cond.acquire(False))
        lock.release()
        with cond:
            self.assertFalse(lock.acquire(False))

    def test_unacquired_wait(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.wait)

    def test_unacquired_notify(self):
        cond = self.condtype()
        self.assertRaises(RuntimeError, cond.notify)

    def _check_notify(self, cond):
        # Note that this test is sensitive to timing.  If the worker threads
        # don't execute in a timely fashion, the main thread may think they
        # are further along than they are.  The main thread therefore issues
        # _wait() statements to try to make sure that it doesn't race ahead
        # of the workers.
        # Secondly, this test assumes that condition variables are not subject
        # to spurious wakeups.  The absence of spurious wakeups is an
        # implementation detail of Condition Variables in current CPython, but
        # in general, not a guaranteed property of condition variables as a
        # programming construct.  In particular, it is possible that this can
        # no longer be conveniently guaranteed should their implementation
        # ever change.
        #
        # The literal counts 3, 2 and 8 below assume N == 5.
        N = 5
        ready = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; the workers read the current
        # value through the closure when they record a result.
        phase_num = 0
        def f():
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results1.append((result, phase_num))
            cond.acquire()
            ready.append(phase_num)
            result = cond.wait()
            cond.release()
            results2.append((result, phase_num))
        b = Bunch(f, N)
        b.wait_for_started()
        # first wait, to ensure all workers settle into cond.wait() before
        # we continue. See issues #8799 and #30727.
        while len(ready) < N:
            _wait()
        ready.clear()
        self.assertEqual(results1, [])
        # Notify 3 threads at first
        cond.acquire()
        cond.notify(3)
        _wait()
        phase_num = 1
        cond.release()
        while len(results1) < 3:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3)
        self.assertEqual(results2, [])
        # make sure all awakened workers settle into cond.wait()
        while len(ready) < 3:
            _wait()
        # Notify 5 threads: they might be in their first or second wait
        cond.acquire()
        cond.notify(5)
        _wait()
        phase_num = 2
        cond.release()
        while len(results1) + len(results2) < 8:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3)
        # make sure all workers settle into cond.wait()
        while len(ready) < N:
            _wait()
        # Notify all threads: they are all in their second wait
        cond.acquire()
        cond.notify_all()
        _wait()
        phase_num = 3
        cond.release()
        while len(results2) < N:
            _wait()
        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
        b.wait_for_finished()

    def test_notify(self):
        cond = self.condtype()
        self._check_notify(cond)
        # A second time, to check internal state is still ok.
        self._check_notify(cond)

    def test_timeout(self):
        cond = self.condtype()
        results = []
        N = 5
        def f():
            cond.acquire()
            # Monotonic clock: immune to wall-clock adjustments.
            t1 = time.monotonic()
            result = cond.wait(0.5)
            t2 = time.monotonic()
            cond.release()
            results.append((t2 - t1, result))
        Bunch(f, N).wait_for_finished()
        self.assertEqual(len(results), N)
        for dt, result in results:
            self.assertTimeout(dt, 0.5)
            # Note that conceptually (that's the condition variable protocol)
            # a wait() may succeed even if no one notifies us and before any
            # timeout occurs.  Spurious wakeups can occur.
            # This makes it hard to verify the result value.
            # In practice, this implementation has no spurious wakeups.
            self.assertFalse(result)

    def test_waitfor(self):
        cond = self.condtype()
        state = 0
        def f():
            with cond:
                result = cond.wait_for(lambda : state==4)
                self.assertTrue(result)
                self.assertEqual(state, 4)
        b = Bunch(f, 1)
        b.wait_for_started()
        for _ in range(4):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()

    def test_waitfor_timeout(self):
        cond = self.condtype()
        state = 0
        # Appended to by the worker only after its assertions passed, so
        # the main thread can check that they actually ran.
        success = []
        def f():
            with cond:
                dt = time.monotonic()
                result = cond.wait_for(lambda : state==4, timeout=0.1)
                dt = time.monotonic() - dt
                self.assertFalse(result)
                self.assertTimeout(dt, 0.1)
                success.append(None)
        b = Bunch(f, 1)
        b.wait_for_started()
        # Only increment 3 times, so state == 4 is never reached.
        for _ in range(3):
            time.sleep(0.01)
            with cond:
                state += 1
                cond.notify()
        b.wait_for_finished()
        self.assertEqual(len(success), 1)


class BaseSemaphoreTests(BaseTestCase):
    """
    Common tests for {bounded, unbounded} semaphore objects.
    """

    def test_constructor(self):
        self.assertRaises(ValueError, self.semtype, value = -1)
        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)

    def test_acquire(self):
        sem = self.semtype(1)
        sem.acquire()
        sem.release()
        sem = self.semtype(2)
        sem.acquire()
        sem.acquire()
        sem.release()
        sem.release()

    def test_acquire_destroy(self):
        sem = self.semtype()
        sem.acquire()
        del sem

    def test_acquire_contended(self):
        # Initial value 7, one unit taken here: 6 units are left for the
        # N threads, each of which wants to acquire twice.
        sem = self.semtype(7)
        sem.acquire()
        N = 10
        sem_results = []
        results1 = []
        results2 = []
        # Rebound by the main thread below; the workers read the current
        # value through the closure after each successful acquire.
        phase_num = 0
        def f():
            sem_results.append(sem.acquire())
            results1.append(phase_num)
            sem_results.append(sem.acquire())
            results2.append(phase_num)
        b = Bunch(f, N)
        b.wait_for_started()
        while len(results1) + len(results2) < 6:
            _wait()
        self.assertEqual(results1 + results2, [0] * 6)
        phase_num = 1
        for _ in range(7):
            sem.release()
        while len(results1) + len(results2) < 13:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
        phase_num = 2
        for _ in range(6):
            sem.release()
        while len(results1) + len(results2) < 19:
            _wait()
        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
        # The semaphore is still locked
        self.assertFalse(sem.acquire(False))
        # Final release, to let the last thread finish
        sem.release()
        b.wait_for_finished()
        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))

    def test_try_acquire(self):
        sem = self.semtype(2)
        self.assertTrue(sem.acquire(False))
        self.assertTrue(sem.acquire(False))
        self.assertFalse(sem.acquire(False))
        sem.release()
        self.assertTrue(sem.acquire(False))

    def test_try_acquire_contended(self):
        # 3 units remain after the acquire below; 5 threads make 2
        # non-blocking attempts each, so 3 succeed and 7 fail.
        sem = self.semtype(4)
        sem.acquire()
        results = []
        def f():
            results.append(sem.acquire(False))
            results.append(sem.acquire(False))
        Bunch(f, 5).wait_for_finished()
        # There can be a thread switch between acquiring the semaphore and
        # appending the result, therefore results will not necessarily be
        # ordered.
        self.assertEqual(sorted(results), [False] * 7 + [True] * 3 )

    def test_acquire_timeout(self):
        sem = self.semtype(2)
        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertTrue(sem.acquire(timeout=0.005))
        self.assertFalse(sem.acquire(timeout=0.005))
        sem.release()
        self.assertTrue(sem.acquire(timeout=0.005))
        # Monotonic clock: immune to wall-clock adjustments.
        t = time.monotonic()
        self.assertFalse(sem.acquire(timeout=0.5))
        dt = time.monotonic() - t
        self.assertTimeout(dt, 0.5)

    def test_default_value(self):
        # The default initial value is 1.
        sem = self.semtype()
        sem.acquire()
        def f():
            sem.acquire()
            sem.release()
        b = Bunch(f, 1)
        b.wait_for_started()
        _wait()
        self.assertFalse(b.finished)
        sem.release()
        b.wait_for_finished()

    def test_with(self):
        # The semaphore works as a (nestable) context manager and is
        # released on both normal and exceptional exit.
        sem = self.semtype(2)
        def _with(err=None):
            with sem:
                self.assertTrue(sem.acquire(False))
                sem.release()
                with sem:
                    self.assertFalse(sem.acquire(False))
                    if err:
                        raise err
        _with()
        self.assertTrue(sem.acquire(False))
        sem.release()
        self.assertRaises(TypeError, _with, TypeError)
        self.assertTrue(sem.acquire(False))
        sem.release()


class SemaphoreTests(BaseSemaphoreTests):
    """
    Tests for unbounded semaphores.
    """

    def test_release_unacquired(self):
        # Unbounded releases are allowed and increment the semaphore's value
        sem = self.semtype(1)
        sem.release()
        sem.acquire()
        sem.acquire()
        sem.release()


class BoundedSemaphoreTests(BaseSemaphoreTests):
    """
    Tests for bounded semaphores.
    """

    def test_release_unacquired(self):
        # Cannot go past the initial value
        sem = self.semtype()
        self.assertRaises(ValueError, sem.release)
        sem.acquire()
        sem.release()
        self.assertRaises(ValueError, sem.release)


class BarrierTests(BaseTestCase):
    """
    Tests for Barrier objects.
    """
    N = 5
    defaultTimeout = 2.0

    # NOTE(review): setUp/tearDown override BaseTestCase's versions without
    # calling them, so the test.support threading setup/cleanup helpers are
    # not run for barrier tests -- confirm this is intended.
    def setUp(self):
        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)

    def tearDown(self):
        self.barrier.abort()

    def run_threads(self, f):
        """Run `f` in N-1 new threads plus the calling thread, then wait
        for the new threads to finish."""
        b = Bunch(f, self.N-1)
        f()
        b.wait_for_finished()

    def multipass(self, results, n):
        """Pass self.barrier 2*n times, using the two lists in `results`
        to check that all parties advance in lockstep."""
        m = self.barrier.parties
        self.assertEqual(m, self.N)
        for i in range(n):
            results[0].append(True)
            self.assertEqual(len(results[1]), i * m)
            self.barrier.wait()
            results[1].append(True)
            self.assertEqual(len(results[0]), (i + 1) * m)
            self.barrier.wait()
        self.assertEqual(self.barrier.n_waiting, 0)
        self.assertFalse(self.barrier.broken)

    def test_barrier(self, passes=1):
        """
        Test that a barrier is passed in lockstep
        """
        results = [[],[]]
        def f():
            self.multipass(results, passes)
        self.run_threads(f)

    def test_barrier_10(self):
        """
        Test that a barrier works for 10 consecutive runs
        """
        return self.test_barrier(10)

    def test_wait_return(self):
        """
        test the return value from barrier.wait
        """
        results = []
        def f():
            r = self.barrier.wait()
            results.append(r)

        self.run_threads(f)
        self.assertEqual(sum(results), sum(range(self.N)))

    def test_action(self):
        """
        Test the 'action' callback
        """
        results = []
        def action():
            results.append(True)
        barrier = self.barriertype(self.N, action)
        def f():
            barrier.wait()
            self.assertEqual(len(results), 1)

        self.run_threads(f)

    def test_abort(self):
        """
        Test that an abort will put the barrier in a broken state
        """
        results1 = []
        results2 = []
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertTrue(self.barrier.broken)

    def test_reset(self):
        """
        Test that a 'reset' on a barrier frees the waiting threads
        """
        results1 = []
        results2 = []
        results3 = []
        def f():
            i = self.barrier.wait()
            if i == self.N//2:
                # Wait until the other threads are all in the barrier.
                while self.barrier.n_waiting < self.N-1:
                    time.sleep(0.001)
                self.barrier.reset()
            else:
                try:
                    self.barrier.wait()
                    results1.append(True)
                except threading.BrokenBarrierError:
                    results2.append(True)
            # Now, pass the barrier again
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_abort_and_reset(self):
        """
        Test that a barrier can be reset after being broken.
        """
        results1 = []
        results2 = []
        results3 = []
        barrier2 = self.barriertype(self.N)
        def f():
            try:
                i = self.barrier.wait()
                if i == self.N//2:
                    raise RuntimeError
                self.barrier.wait()
                results1.append(True)
            except threading.BrokenBarrierError:
                results2.append(True)
            except RuntimeError:
                self.barrier.abort()
            # Synchronize and reset the barrier.  Must synchronize first so
            # that everyone has left it when we reset, and after so that no
            # one enters it before the reset.
            if barrier2.wait() == self.N//2:
                self.barrier.reset()
            barrier2.wait()
            self.barrier.wait()
            results3.append(True)

        self.run_threads(f)
        self.assertEqual(len(results1), 0)
        self.assertEqual(len(results2), self.N-1)
        self.assertEqual(len(results3), self.N)

    def test_timeout(self):
        """
        Test wait(timeout)
        """
        def f():
            i = self.barrier.wait()
            if i == self.N // 2:
                # One thread is late!
                time.sleep(1.0)
            # Default timeout is 2.0, so this is shorter.
            self.assertRaises(threading.BrokenBarrierError,
                              self.barrier.wait, 0.5)
        self.run_threads(f)

    def test_default_timeout(self):
        """
        Test the barrier's default timeout
        """
        # create a barrier with a low default timeout
        barrier = self.barriertype(self.N, timeout=0.3)
        def f():
            i = barrier.wait()
            if i == self.N // 2:
                # One thread is later than the default timeout of 0.3s.
                time.sleep(1.0)
            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
        self.run_threads(f)

    def test_single_thread(self):
        # A barrier with a single party never blocks.
        b = self.barriertype(1)
        b.wait()
        b.wait()