1"""
2Various tests for synchronization primitives.
3"""
4
5import gc
6import sys
7import time
8from _thread import start_new_thread, TIMEOUT_MAX
9import threading
10import unittest
11import weakref
12
13from test import support
14
15
16def _wait():
17    # A crude wait/yield function not relying on synchronization primitives.
18    time.sleep(0.01)
19
20class Bunch(object):
21    """
22    A bunch of threads.
23    """
24    def __init__(self, f, n, wait_before_exit=False):
25        """
26        Construct a bunch of `n` threads running the same function `f`.
27        If `wait_before_exit` is True, the threads won't terminate until
28        do_finish() is called.
29        """
30        self.f = f
31        self.n = n
32        self.started = []
33        self.finished = []
34        self._can_exit = not wait_before_exit
35        self.wait_thread = support.wait_threads_exit()
36        self.wait_thread.__enter__()
37
38        def task():
39            tid = threading.get_ident()
40            self.started.append(tid)
41            try:
42                f()
43            finally:
44                self.finished.append(tid)
45                while not self._can_exit:
46                    _wait()
47
48        try:
49            for i in range(n):
50                start_new_thread(task, ())
51        except:
52            self._can_exit = True
53            raise
54
55    def wait_for_started(self):
56        while len(self.started) < self.n:
57            _wait()
58
59    def wait_for_finished(self):
60        while len(self.finished) < self.n:
61            _wait()
62        # Wait for threads exit
63        self.wait_thread.__exit__(None, None, None)
64
65    def do_finish(self):
66        self._can_exit = True
67
68
69class BaseTestCase(unittest.TestCase):
70    def setUp(self):
71        self._threads = support.threading_setup()
72
73    def tearDown(self):
74        support.threading_cleanup(*self._threads)
75        support.reap_children()
76
77    def assertTimeout(self, actual, expected):
78        # The waiting and/or time.time() can be imprecise, which
79        # is why comparing to the expected value would sometimes fail
80        # (especially under Windows).
81        self.assertGreaterEqual(actual, expected * 0.6)
82        # Test nothing insane happened
83        self.assertLess(actual, expected * 10.0)
84
85
86class BaseLockTests(BaseTestCase):
87    """
88    Tests for both recursive and non-recursive locks.
89    """
90
91    def test_constructor(self):
92        lock = self.locktype()
93        del lock
94
95    def test_repr(self):
96        lock = self.locktype()
97        self.assertRegex(repr(lock), "<unlocked .* object (.*)?at .*>")
98        del lock
99
100    def test_locked_repr(self):
101        lock = self.locktype()
102        lock.acquire()
103        self.assertRegex(repr(lock), "<locked .* object (.*)?at .*>")
104        del lock
105
106    def test_acquire_destroy(self):
107        lock = self.locktype()
108        lock.acquire()
109        del lock
110
111    def test_acquire_release(self):
112        lock = self.locktype()
113        lock.acquire()
114        lock.release()
115        del lock
116
117    def test_try_acquire(self):
118        lock = self.locktype()
119        self.assertTrue(lock.acquire(False))
120        lock.release()
121
122    def test_try_acquire_contended(self):
123        lock = self.locktype()
124        lock.acquire()
125        result = []
126        def f():
127            result.append(lock.acquire(False))
128        Bunch(f, 1).wait_for_finished()
129        self.assertFalse(result[0])
130        lock.release()
131
132    def test_acquire_contended(self):
133        lock = self.locktype()
134        lock.acquire()
135        N = 5
136        def f():
137            lock.acquire()
138            lock.release()
139
140        b = Bunch(f, N)
141        b.wait_for_started()
142        _wait()
143        self.assertEqual(len(b.finished), 0)
144        lock.release()
145        b.wait_for_finished()
146        self.assertEqual(len(b.finished), N)
147
148    def test_with(self):
149        lock = self.locktype()
150        def f():
151            lock.acquire()
152            lock.release()
153        def _with(err=None):
154            with lock:
155                if err is not None:
156                    raise err
157        _with()
158        # Check the lock is unacquired
159        Bunch(f, 1).wait_for_finished()
160        self.assertRaises(TypeError, _with, TypeError)
161        # Check the lock is unacquired
162        Bunch(f, 1).wait_for_finished()
163
164    def test_thread_leak(self):
165        # The lock shouldn't leak a Thread instance when used from a foreign
166        # (non-threading) thread.
167        lock = self.locktype()
168        def f():
169            lock.acquire()
170            lock.release()
171        n = len(threading.enumerate())
172        # We run many threads in the hope that existing threads ids won't
173        # be recycled.
174        Bunch(f, 15).wait_for_finished()
175        if len(threading.enumerate()) != n:
176            # There is a small window during which a Thread instance's
177            # target function has finished running, but the Thread is still
178            # alive and registered.  Avoid spurious failures by waiting a
179            # bit more (seen on a buildbot).
180            time.sleep(0.4)
181            self.assertEqual(n, len(threading.enumerate()))
182
183    def test_timeout(self):
184        lock = self.locktype()
185        # Can't set timeout if not blocking
186        self.assertRaises(ValueError, lock.acquire, 0, 1)
187        # Invalid timeout values
188        self.assertRaises(ValueError, lock.acquire, timeout=-100)
189        self.assertRaises(OverflowError, lock.acquire, timeout=1e100)
190        self.assertRaises(OverflowError, lock.acquire, timeout=TIMEOUT_MAX + 1)
191        # TIMEOUT_MAX is ok
192        lock.acquire(timeout=TIMEOUT_MAX)
193        lock.release()
194        t1 = time.time()
195        self.assertTrue(lock.acquire(timeout=5))
196        t2 = time.time()
197        # Just a sanity test that it didn't actually wait for the timeout.
198        self.assertLess(t2 - t1, 5)
199        results = []
200        def f():
201            t1 = time.time()
202            results.append(lock.acquire(timeout=0.5))
203            t2 = time.time()
204            results.append(t2 - t1)
205        Bunch(f, 1).wait_for_finished()
206        self.assertFalse(results[0])
207        self.assertTimeout(results[1], 0.5)
208
209    def test_weakref_exists(self):
210        lock = self.locktype()
211        ref = weakref.ref(lock)
212        self.assertIsNotNone(ref())
213
214    def test_weakref_deleted(self):
215        lock = self.locktype()
216        ref = weakref.ref(lock)
217        del lock
218        gc.collect()
219        self.assertIsNone(ref())
220
221
222class LockTests(BaseLockTests):
223    """
224    Tests for non-recursive, weak locks
225    (which can be acquired and released from different threads).
226    """
227    def test_reacquire(self):
228        # Lock needs to be released before re-acquiring.
229        lock = self.locktype()
230        phase = []
231
232        def f():
233            lock.acquire()
234            phase.append(None)
235            lock.acquire()
236            phase.append(None)
237
238        with support.wait_threads_exit():
239            start_new_thread(f, ())
240            while len(phase) == 0:
241                _wait()
242            _wait()
243            self.assertEqual(len(phase), 1)
244            lock.release()
245            while len(phase) == 1:
246                _wait()
247            self.assertEqual(len(phase), 2)
248
249    def test_different_thread(self):
250        # Lock can be released from a different thread.
251        lock = self.locktype()
252        lock.acquire()
253        def f():
254            lock.release()
255        b = Bunch(f, 1)
256        b.wait_for_finished()
257        lock.acquire()
258        lock.release()
259
260    def test_state_after_timeout(self):
261        # Issue #11618: check that lock is in a proper state after a
262        # (non-zero) timeout.
263        lock = self.locktype()
264        lock.acquire()
265        self.assertFalse(lock.acquire(timeout=0.01))
266        lock.release()
267        self.assertFalse(lock.locked())
268        self.assertTrue(lock.acquire(blocking=False))
269
270
271class RLockTests(BaseLockTests):
272    """
273    Tests for recursive locks.
274    """
275    def test_reacquire(self):
276        lock = self.locktype()
277        lock.acquire()
278        lock.acquire()
279        lock.release()
280        lock.acquire()
281        lock.release()
282        lock.release()
283
284    def test_release_unacquired(self):
285        # Cannot release an unacquired lock
286        lock = self.locktype()
287        self.assertRaises(RuntimeError, lock.release)
288        lock.acquire()
289        lock.acquire()
290        lock.release()
291        lock.acquire()
292        lock.release()
293        lock.release()
294        self.assertRaises(RuntimeError, lock.release)
295
296    def test_release_save_unacquired(self):
297        # Cannot _release_save an unacquired lock
298        lock = self.locktype()
299        self.assertRaises(RuntimeError, lock._release_save)
300        lock.acquire()
301        lock.acquire()
302        lock.release()
303        lock.acquire()
304        lock.release()
305        lock.release()
306        self.assertRaises(RuntimeError, lock._release_save)
307
308    def test_different_thread(self):
309        # Cannot release from a different thread
310        lock = self.locktype()
311        def f():
312            lock.acquire()
313        b = Bunch(f, 1, True)
314        try:
315            self.assertRaises(RuntimeError, lock.release)
316        finally:
317            b.do_finish()
318        b.wait_for_finished()
319
320    def test__is_owned(self):
321        lock = self.locktype()
322        self.assertFalse(lock._is_owned())
323        lock.acquire()
324        self.assertTrue(lock._is_owned())
325        lock.acquire()
326        self.assertTrue(lock._is_owned())
327        result = []
328        def f():
329            result.append(lock._is_owned())
330        Bunch(f, 1).wait_for_finished()
331        self.assertFalse(result[0])
332        lock.release()
333        self.assertTrue(lock._is_owned())
334        lock.release()
335        self.assertFalse(lock._is_owned())
336
337
338class EventTests(BaseTestCase):
339    """
340    Tests for Event objects.
341    """
342
343    def test_is_set(self):
344        evt = self.eventtype()
345        self.assertFalse(evt.is_set())
346        evt.set()
347        self.assertTrue(evt.is_set())
348        evt.set()
349        self.assertTrue(evt.is_set())
350        evt.clear()
351        self.assertFalse(evt.is_set())
352        evt.clear()
353        self.assertFalse(evt.is_set())
354
355    def _check_notify(self, evt):
356        # All threads get notified
357        N = 5
358        results1 = []
359        results2 = []
360        def f():
361            results1.append(evt.wait())
362            results2.append(evt.wait())
363        b = Bunch(f, N)
364        b.wait_for_started()
365        _wait()
366        self.assertEqual(len(results1), 0)
367        evt.set()
368        b.wait_for_finished()
369        self.assertEqual(results1, [True] * N)
370        self.assertEqual(results2, [True] * N)
371
372    def test_notify(self):
373        evt = self.eventtype()
374        self._check_notify(evt)
375        # Another time, after an explicit clear()
376        evt.set()
377        evt.clear()
378        self._check_notify(evt)
379
380    def test_timeout(self):
381        evt = self.eventtype()
382        results1 = []
383        results2 = []
384        N = 5
385        def f():
386            results1.append(evt.wait(0.0))
387            t1 = time.time()
388            r = evt.wait(0.5)
389            t2 = time.time()
390            results2.append((r, t2 - t1))
391        Bunch(f, N).wait_for_finished()
392        self.assertEqual(results1, [False] * N)
393        for r, dt in results2:
394            self.assertFalse(r)
395            self.assertTimeout(dt, 0.5)
396        # The event is set
397        results1 = []
398        results2 = []
399        evt.set()
400        Bunch(f, N).wait_for_finished()
401        self.assertEqual(results1, [True] * N)
402        for r, dt in results2:
403            self.assertTrue(r)
404
405    def test_set_and_clear(self):
406        # Issue #13502: check that wait() returns true even when the event is
407        # cleared before the waiting thread is woken up.
408        evt = self.eventtype()
409        results = []
410        timeout = 0.250
411        N = 5
412        def f():
413            results.append(evt.wait(timeout * 4))
414        b = Bunch(f, N)
415        b.wait_for_started()
416        time.sleep(timeout)
417        evt.set()
418        evt.clear()
419        b.wait_for_finished()
420        self.assertEqual(results, [True] * N)
421
422    def test_reset_internal_locks(self):
423        # ensure that condition is still using a Lock after reset
424        evt = self.eventtype()
425        with evt._cond:
426            self.assertFalse(evt._cond.acquire(False))
427        evt._reset_internal_locks()
428        with evt._cond:
429            self.assertFalse(evt._cond.acquire(False))
430
431
432class ConditionTests(BaseTestCase):
433    """
434    Tests for condition variables.
435    """
436
437    def test_acquire(self):
438        cond = self.condtype()
439        # Be default we have an RLock: the condition can be acquired multiple
440        # times.
441        cond.acquire()
442        cond.acquire()
443        cond.release()
444        cond.release()
445        lock = threading.Lock()
446        cond = self.condtype(lock)
447        cond.acquire()
448        self.assertFalse(lock.acquire(False))
449        cond.release()
450        self.assertTrue(lock.acquire(False))
451        self.assertFalse(cond.acquire(False))
452        lock.release()
453        with cond:
454            self.assertFalse(lock.acquire(False))
455
456    def test_unacquired_wait(self):
457        cond = self.condtype()
458        self.assertRaises(RuntimeError, cond.wait)
459
460    def test_unacquired_notify(self):
461        cond = self.condtype()
462        self.assertRaises(RuntimeError, cond.notify)
463
464    def _check_notify(self, cond):
465        # Note that this test is sensitive to timing.  If the worker threads
466        # don't execute in a timely fashion, the main thread may think they
467        # are further along then they are.  The main thread therefore issues
468        # _wait() statements to try to make sure that it doesn't race ahead
469        # of the workers.
470        # Secondly, this test assumes that condition variables are not subject
471        # to spurious wakeups.  The absence of spurious wakeups is an implementation
472        # detail of Condition Cariables in current CPython, but in general, not
473        # a guaranteed property of condition variables as a programming
474        # construct.  In particular, it is possible that this can no longer
475        # be conveniently guaranteed should their implementation ever change.
476        N = 5
477        ready = []
478        results1 = []
479        results2 = []
480        phase_num = 0
481        def f():
482            cond.acquire()
483            ready.append(phase_num)
484            result = cond.wait()
485            cond.release()
486            results1.append((result, phase_num))
487            cond.acquire()
488            ready.append(phase_num)
489            result = cond.wait()
490            cond.release()
491            results2.append((result, phase_num))
492        b = Bunch(f, N)
493        b.wait_for_started()
494        # first wait, to ensure all workers settle into cond.wait() before
495        # we continue. See issues #8799 and #30727.
496        while len(ready) < 5:
497            _wait()
498        ready.clear()
499        self.assertEqual(results1, [])
500        # Notify 3 threads at first
501        cond.acquire()
502        cond.notify(3)
503        _wait()
504        phase_num = 1
505        cond.release()
506        while len(results1) < 3:
507            _wait()
508        self.assertEqual(results1, [(True, 1)] * 3)
509        self.assertEqual(results2, [])
510        # make sure all awaken workers settle into cond.wait()
511        while len(ready) < 3:
512            _wait()
513        # Notify 5 threads: they might be in their first or second wait
514        cond.acquire()
515        cond.notify(5)
516        _wait()
517        phase_num = 2
518        cond.release()
519        while len(results1) + len(results2) < 8:
520            _wait()
521        self.assertEqual(results1, [(True, 1)] * 3 + [(True, 2)] * 2)
522        self.assertEqual(results2, [(True, 2)] * 3)
523        # make sure all workers settle into cond.wait()
524        while len(ready) < 5:
525            _wait()
526        # Notify all threads: they are all in their second wait
527        cond.acquire()
528        cond.notify_all()
529        _wait()
530        phase_num = 3
531        cond.release()
532        while len(results2) < 5:
533            _wait()
534        self.assertEqual(results1, [(True, 1)] * 3 + [(True,2)] * 2)
535        self.assertEqual(results2, [(True, 2)] * 3 + [(True, 3)] * 2)
536        b.wait_for_finished()
537
538    def test_notify(self):
539        cond = self.condtype()
540        self._check_notify(cond)
541        # A second time, to check internal state is still ok.
542        self._check_notify(cond)
543
544    def test_timeout(self):
545        cond = self.condtype()
546        results = []
547        N = 5
548        def f():
549            cond.acquire()
550            t1 = time.time()
551            result = cond.wait(0.5)
552            t2 = time.time()
553            cond.release()
554            results.append((t2 - t1, result))
555        Bunch(f, N).wait_for_finished()
556        self.assertEqual(len(results), N)
557        for dt, result in results:
558            self.assertTimeout(dt, 0.5)
559            # Note that conceptually (that"s the condition variable protocol)
560            # a wait() may succeed even if no one notifies us and before any
561            # timeout occurs.  Spurious wakeups can occur.
562            # This makes it hard to verify the result value.
563            # In practice, this implementation has no spurious wakeups.
564            self.assertFalse(result)
565
566    def test_waitfor(self):
567        cond = self.condtype()
568        state = 0
569        def f():
570            with cond:
571                result = cond.wait_for(lambda : state==4)
572                self.assertTrue(result)
573                self.assertEqual(state, 4)
574        b = Bunch(f, 1)
575        b.wait_for_started()
576        for i in range(4):
577            time.sleep(0.01)
578            with cond:
579                state += 1
580                cond.notify()
581        b.wait_for_finished()
582
583    def test_waitfor_timeout(self):
584        cond = self.condtype()
585        state = 0
586        success = []
587        def f():
588            with cond:
589                dt = time.time()
590                result = cond.wait_for(lambda : state==4, timeout=0.1)
591                dt = time.time() - dt
592                self.assertFalse(result)
593                self.assertTimeout(dt, 0.1)
594                success.append(None)
595        b = Bunch(f, 1)
596        b.wait_for_started()
597        # Only increment 3 times, so state == 4 is never reached.
598        for i in range(3):
599            time.sleep(0.01)
600            with cond:
601                state += 1
602                cond.notify()
603        b.wait_for_finished()
604        self.assertEqual(len(success), 1)
605
606
607class BaseSemaphoreTests(BaseTestCase):
608    """
609    Common tests for {bounded, unbounded} semaphore objects.
610    """
611
612    def test_constructor(self):
613        self.assertRaises(ValueError, self.semtype, value = -1)
614        self.assertRaises(ValueError, self.semtype, value = -sys.maxsize)
615
616    def test_acquire(self):
617        sem = self.semtype(1)
618        sem.acquire()
619        sem.release()
620        sem = self.semtype(2)
621        sem.acquire()
622        sem.acquire()
623        sem.release()
624        sem.release()
625
626    def test_acquire_destroy(self):
627        sem = self.semtype()
628        sem.acquire()
629        del sem
630
631    def test_acquire_contended(self):
632        sem = self.semtype(7)
633        sem.acquire()
634        N = 10
635        sem_results = []
636        results1 = []
637        results2 = []
638        phase_num = 0
639        def f():
640            sem_results.append(sem.acquire())
641            results1.append(phase_num)
642            sem_results.append(sem.acquire())
643            results2.append(phase_num)
644        b = Bunch(f, 10)
645        b.wait_for_started()
646        while len(results1) + len(results2) < 6:
647            _wait()
648        self.assertEqual(results1 + results2, [0] * 6)
649        phase_num = 1
650        for i in range(7):
651            sem.release()
652        while len(results1) + len(results2) < 13:
653            _wait()
654        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7)
655        phase_num = 2
656        for i in range(6):
657            sem.release()
658        while len(results1) + len(results2) < 19:
659            _wait()
660        self.assertEqual(sorted(results1 + results2), [0] * 6 + [1] * 7 + [2] * 6)
661        # The semaphore is still locked
662        self.assertFalse(sem.acquire(False))
663        # Final release, to let the last thread finish
664        sem.release()
665        b.wait_for_finished()
666        self.assertEqual(sem_results, [True] * (6 + 7 + 6 + 1))
667
668    def test_try_acquire(self):
669        sem = self.semtype(2)
670        self.assertTrue(sem.acquire(False))
671        self.assertTrue(sem.acquire(False))
672        self.assertFalse(sem.acquire(False))
673        sem.release()
674        self.assertTrue(sem.acquire(False))
675
676    def test_try_acquire_contended(self):
677        sem = self.semtype(4)
678        sem.acquire()
679        results = []
680        def f():
681            results.append(sem.acquire(False))
682            results.append(sem.acquire(False))
683        Bunch(f, 5).wait_for_finished()
684        # There can be a thread switch between acquiring the semaphore and
685        # appending the result, therefore results will not necessarily be
686        # ordered.
687        self.assertEqual(sorted(results), [False] * 7 + [True] *  3 )
688
689    def test_acquire_timeout(self):
690        sem = self.semtype(2)
691        self.assertRaises(ValueError, sem.acquire, False, timeout=1.0)
692        self.assertTrue(sem.acquire(timeout=0.005))
693        self.assertTrue(sem.acquire(timeout=0.005))
694        self.assertFalse(sem.acquire(timeout=0.005))
695        sem.release()
696        self.assertTrue(sem.acquire(timeout=0.005))
697        t = time.time()
698        self.assertFalse(sem.acquire(timeout=0.5))
699        dt = time.time() - t
700        self.assertTimeout(dt, 0.5)
701
702    def test_default_value(self):
703        # The default initial value is 1.
704        sem = self.semtype()
705        sem.acquire()
706        def f():
707            sem.acquire()
708            sem.release()
709        b = Bunch(f, 1)
710        b.wait_for_started()
711        _wait()
712        self.assertFalse(b.finished)
713        sem.release()
714        b.wait_for_finished()
715
716    def test_with(self):
717        sem = self.semtype(2)
718        def _with(err=None):
719            with sem:
720                self.assertTrue(sem.acquire(False))
721                sem.release()
722                with sem:
723                    self.assertFalse(sem.acquire(False))
724                    if err:
725                        raise err
726        _with()
727        self.assertTrue(sem.acquire(False))
728        sem.release()
729        self.assertRaises(TypeError, _with, TypeError)
730        self.assertTrue(sem.acquire(False))
731        sem.release()
732
733class SemaphoreTests(BaseSemaphoreTests):
734    """
735    Tests for unbounded semaphores.
736    """
737
738    def test_release_unacquired(self):
739        # Unbounded releases are allowed and increment the semaphore's value
740        sem = self.semtype(1)
741        sem.release()
742        sem.acquire()
743        sem.acquire()
744        sem.release()
745
746
747class BoundedSemaphoreTests(BaseSemaphoreTests):
748    """
749    Tests for bounded semaphores.
750    """
751
752    def test_release_unacquired(self):
753        # Cannot go past the initial value
754        sem = self.semtype()
755        self.assertRaises(ValueError, sem.release)
756        sem.acquire()
757        sem.release()
758        self.assertRaises(ValueError, sem.release)
759
760
761class BarrierTests(BaseTestCase):
762    """
763    Tests for Barrier objects.
764    """
765    N = 5
766    defaultTimeout = 2.0
767
768    def setUp(self):
769        self.barrier = self.barriertype(self.N, timeout=self.defaultTimeout)
770    def tearDown(self):
771        self.barrier.abort()
772
773    def run_threads(self, f):
774        b = Bunch(f, self.N-1)
775        f()
776        b.wait_for_finished()
777
778    def multipass(self, results, n):
779        m = self.barrier.parties
780        self.assertEqual(m, self.N)
781        for i in range(n):
782            results[0].append(True)
783            self.assertEqual(len(results[1]), i * m)
784            self.barrier.wait()
785            results[1].append(True)
786            self.assertEqual(len(results[0]), (i + 1) * m)
787            self.barrier.wait()
788        self.assertEqual(self.barrier.n_waiting, 0)
789        self.assertFalse(self.barrier.broken)
790
791    def test_barrier(self, passes=1):
792        """
793        Test that a barrier is passed in lockstep
794        """
795        results = [[],[]]
796        def f():
797            self.multipass(results, passes)
798        self.run_threads(f)
799
800    def test_barrier_10(self):
801        """
802        Test that a barrier works for 10 consecutive runs
803        """
804        return self.test_barrier(10)
805
806    def test_wait_return(self):
807        """
808        test the return value from barrier.wait
809        """
810        results = []
811        def f():
812            r = self.barrier.wait()
813            results.append(r)
814
815        self.run_threads(f)
816        self.assertEqual(sum(results), sum(range(self.N)))
817
818    def test_action(self):
819        """
820        Test the 'action' callback
821        """
822        results = []
823        def action():
824            results.append(True)
825        barrier = self.barriertype(self.N, action)
826        def f():
827            barrier.wait()
828            self.assertEqual(len(results), 1)
829
830        self.run_threads(f)
831
832    def test_abort(self):
833        """
834        Test that an abort will put the barrier in a broken state
835        """
836        results1 = []
837        results2 = []
838        def f():
839            try:
840                i = self.barrier.wait()
841                if i == self.N//2:
842                    raise RuntimeError
843                self.barrier.wait()
844                results1.append(True)
845            except threading.BrokenBarrierError:
846                results2.append(True)
847            except RuntimeError:
848                self.barrier.abort()
849                pass
850
851        self.run_threads(f)
852        self.assertEqual(len(results1), 0)
853        self.assertEqual(len(results2), self.N-1)
854        self.assertTrue(self.barrier.broken)
855
856    def test_reset(self):
857        """
858        Test that a 'reset' on a barrier frees the waiting threads
859        """
860        results1 = []
861        results2 = []
862        results3 = []
863        def f():
864            i = self.barrier.wait()
865            if i == self.N//2:
866                # Wait until the other threads are all in the barrier.
867                while self.barrier.n_waiting < self.N-1:
868                    time.sleep(0.001)
869                self.barrier.reset()
870            else:
871                try:
872                    self.barrier.wait()
873                    results1.append(True)
874                except threading.BrokenBarrierError:
875                    results2.append(True)
876            # Now, pass the barrier again
877            self.barrier.wait()
878            results3.append(True)
879
880        self.run_threads(f)
881        self.assertEqual(len(results1), 0)
882        self.assertEqual(len(results2), self.N-1)
883        self.assertEqual(len(results3), self.N)
884
885
886    def test_abort_and_reset(self):
887        """
888        Test that a barrier can be reset after being broken.
889        """
890        results1 = []
891        results2 = []
892        results3 = []
893        barrier2 = self.barriertype(self.N)
894        def f():
895            try:
896                i = self.barrier.wait()
897                if i == self.N//2:
898                    raise RuntimeError
899                self.barrier.wait()
900                results1.append(True)
901            except threading.BrokenBarrierError:
902                results2.append(True)
903            except RuntimeError:
904                self.barrier.abort()
905                pass
906            # Synchronize and reset the barrier.  Must synchronize first so
907            # that everyone has left it when we reset, and after so that no
908            # one enters it before the reset.
909            if barrier2.wait() == self.N//2:
910                self.barrier.reset()
911            barrier2.wait()
912            self.barrier.wait()
913            results3.append(True)
914
915        self.run_threads(f)
916        self.assertEqual(len(results1), 0)
917        self.assertEqual(len(results2), self.N-1)
918        self.assertEqual(len(results3), self.N)
919
920    def test_timeout(self):
921        """
922        Test wait(timeout)
923        """
924        def f():
925            i = self.barrier.wait()
926            if i == self.N // 2:
927                # One thread is late!
928                time.sleep(1.0)
929            # Default timeout is 2.0, so this is shorter.
930            self.assertRaises(threading.BrokenBarrierError,
931                              self.barrier.wait, 0.5)
932        self.run_threads(f)
933
934    def test_default_timeout(self):
935        """
936        Test the barrier's default timeout
937        """
938        # create a barrier with a low default timeout
939        barrier = self.barriertype(self.N, timeout=0.3)
940        def f():
941            i = barrier.wait()
942            if i == self.N // 2:
943                # One thread is later than the default timeout of 0.3s.
944                time.sleep(1.0)
945            self.assertRaises(threading.BrokenBarrierError, barrier.wait)
946        self.run_threads(f)
947
948    def test_single_thread(self):
949        b = self.barriertype(1)
950        b.wait()
951        b.wait()
952