1import pytest
2
3import weakref
4
5from ..testing import wait_all_tasks_blocked, assert_checkpoints
6
7from .. import _core
8from .. import _timeouts
9from .._timeouts import sleep_forever, move_on_after
10from .._sync import *
11
12
13async def test_Event():
14    e = Event()
15    assert not e.is_set()
16    assert e.statistics().tasks_waiting == 0
17
18    e.set()
19    assert e.is_set()
20    with assert_checkpoints():
21        await e.wait()
22
23    e = Event()
24
25    record = []
26
27    async def child():
28        record.append("sleeping")
29        await e.wait()
30        record.append("woken")
31
32    async with _core.open_nursery() as nursery:
33        nursery.start_soon(child)
34        nursery.start_soon(child)
35        await wait_all_tasks_blocked()
36        assert record == ["sleeping", "sleeping"]
37        assert e.statistics().tasks_waiting == 2
38        e.set()
39        await wait_all_tasks_blocked()
40        assert record == ["sleeping", "sleeping", "woken", "woken"]
41
42
43async def test_CapacityLimiter():
44    with pytest.raises(TypeError):
45        CapacityLimiter(1.0)
46    with pytest.raises(ValueError):
47        CapacityLimiter(-1)
48    c = CapacityLimiter(2)
49    repr(c)  # smoke test
50    assert c.total_tokens == 2
51    assert c.borrowed_tokens == 0
52    assert c.available_tokens == 2
53    with pytest.raises(RuntimeError):
54        c.release()
55    assert c.borrowed_tokens == 0
56    c.acquire_nowait()
57    assert c.borrowed_tokens == 1
58    assert c.available_tokens == 1
59
60    stats = c.statistics()
61    assert stats.borrowed_tokens == 1
62    assert stats.total_tokens == 2
63    assert stats.borrowers == [_core.current_task()]
64    assert stats.tasks_waiting == 0
65
66    # Can't re-acquire when we already have it
67    with pytest.raises(RuntimeError):
68        c.acquire_nowait()
69    assert c.borrowed_tokens == 1
70    with pytest.raises(RuntimeError):
71        await c.acquire()
72    assert c.borrowed_tokens == 1
73
74    # We can acquire on behalf of someone else though
75    with assert_checkpoints():
76        await c.acquire_on_behalf_of("someone")
77
78    # But then we've run out of capacity
79    assert c.borrowed_tokens == 2
80    with pytest.raises(_core.WouldBlock):
81        c.acquire_on_behalf_of_nowait("third party")
82
83    assert set(c.statistics().borrowers) == {_core.current_task(), "someone"}
84
85    # Until we release one
86    c.release_on_behalf_of(_core.current_task())
87    assert c.statistics().borrowers == ["someone"]
88
89    c.release_on_behalf_of("someone")
90    assert c.borrowed_tokens == 0
91    with assert_checkpoints():
92        async with c:
93            assert c.borrowed_tokens == 1
94
95    async with _core.open_nursery() as nursery:
96        await c.acquire_on_behalf_of("value 1")
97        await c.acquire_on_behalf_of("value 2")
98        nursery.start_soon(c.acquire_on_behalf_of, "value 3")
99        await wait_all_tasks_blocked()
100        assert c.borrowed_tokens == 2
101        assert c.statistics().tasks_waiting == 1
102        c.release_on_behalf_of("value 2")
103        # Fairness:
104        assert c.borrowed_tokens == 2
105        with pytest.raises(_core.WouldBlock):
106            c.acquire_nowait()
107
108    c.release_on_behalf_of("value 3")
109    c.release_on_behalf_of("value 1")
110
111
112async def test_CapacityLimiter_inf():
113    from math import inf
114
115    c = CapacityLimiter(inf)
116    repr(c)  # smoke test
117    assert c.total_tokens == inf
118    assert c.borrowed_tokens == 0
119    assert c.available_tokens == inf
120    with pytest.raises(RuntimeError):
121        c.release()
122    assert c.borrowed_tokens == 0
123    c.acquire_nowait()
124    assert c.borrowed_tokens == 1
125    assert c.available_tokens == inf
126
127
128async def test_CapacityLimiter_change_total_tokens():
129    c = CapacityLimiter(2)
130
131    with pytest.raises(TypeError):
132        c.total_tokens = 1.0
133
134    with pytest.raises(ValueError):
135        c.total_tokens = 0
136
137    with pytest.raises(ValueError):
138        c.total_tokens = -10
139
140    assert c.total_tokens == 2
141
142    async with _core.open_nursery() as nursery:
143        for i in range(5):
144            nursery.start_soon(c.acquire_on_behalf_of, i)
145            await wait_all_tasks_blocked()
146        assert set(c.statistics().borrowers) == {0, 1}
147        assert c.statistics().tasks_waiting == 3
148        c.total_tokens += 2
149        assert set(c.statistics().borrowers) == {0, 1, 2, 3}
150        assert c.statistics().tasks_waiting == 1
151        c.total_tokens -= 3
152        assert c.borrowed_tokens == 4
153        assert c.total_tokens == 1
154        c.release_on_behalf_of(0)
155        c.release_on_behalf_of(1)
156        c.release_on_behalf_of(2)
157        assert set(c.statistics().borrowers) == {3}
158        assert c.statistics().tasks_waiting == 1
159        c.release_on_behalf_of(3)
160        assert set(c.statistics().borrowers) == {4}
161        assert c.statistics().tasks_waiting == 0
162
163
164# regression test for issue #548
165async def test_CapacityLimiter_memleak_548():
166    limiter = CapacityLimiter(total_tokens=1)
167    await limiter.acquire()
168
169    async with _core.open_nursery() as n:
170        n.start_soon(limiter.acquire)
171        await wait_all_tasks_blocked()  # give it a chance to run the task
172        n.cancel_scope.cancel()
173
174    # if this is 1, the acquire call (despite being killed) is still there in the task, and will
175    # leak memory all the while the limiter is active
176    assert len(limiter._pending_borrowers) == 0
177
178
179async def test_Semaphore():
180    with pytest.raises(TypeError):
181        Semaphore(1.0)
182    with pytest.raises(ValueError):
183        Semaphore(-1)
184    s = Semaphore(1)
185    repr(s)  # smoke test
186    assert s.value == 1
187    assert s.max_value is None
188    s.release()
189    assert s.value == 2
190    assert s.statistics().tasks_waiting == 0
191    s.acquire_nowait()
192    assert s.value == 1
193    with assert_checkpoints():
194        await s.acquire()
195    assert s.value == 0
196    with pytest.raises(_core.WouldBlock):
197        s.acquire_nowait()
198
199    s.release()
200    assert s.value == 1
201    with assert_checkpoints():
202        async with s:
203            assert s.value == 0
204    assert s.value == 1
205    s.acquire_nowait()
206
207    record = []
208
209    async def do_acquire(s):
210        record.append("started")
211        await s.acquire()
212        record.append("finished")
213
214    async with _core.open_nursery() as nursery:
215        nursery.start_soon(do_acquire, s)
216        await wait_all_tasks_blocked()
217        assert record == ["started"]
218        assert s.value == 0
219        s.release()
220        # Fairness:
221        assert s.value == 0
222        with pytest.raises(_core.WouldBlock):
223            s.acquire_nowait()
224    assert record == ["started", "finished"]
225
226
227async def test_Semaphore_bounded():
228    with pytest.raises(TypeError):
229        Semaphore(1, max_value=1.0)
230    with pytest.raises(ValueError):
231        Semaphore(2, max_value=1)
232    bs = Semaphore(1, max_value=1)
233    assert bs.max_value == 1
234    repr(bs)  # smoke test
235    with pytest.raises(ValueError):
236        bs.release()
237    assert bs.value == 1
238    bs.acquire_nowait()
239    assert bs.value == 0
240    bs.release()
241    assert bs.value == 1
242
243
244@pytest.mark.parametrize("lockcls", [Lock, StrictFIFOLock], ids=lambda fn: fn.__name__)
245async def test_Lock_and_StrictFIFOLock(lockcls):
246    l = lockcls()  # noqa
247    assert not l.locked()
248
249    # make sure locks can be weakref'ed (gh-331)
250    r = weakref.ref(l)
251    assert r() is l
252
253    repr(l)  # smoke test
254    # make sure repr uses the right name for subclasses
255    assert lockcls.__name__ in repr(l)
256    with assert_checkpoints():
257        async with l:
258            assert l.locked()
259            repr(l)  # smoke test (repr branches on locked/unlocked)
260    assert not l.locked()
261    l.acquire_nowait()
262    assert l.locked()
263    l.release()
264    assert not l.locked()
265    with assert_checkpoints():
266        await l.acquire()
267    assert l.locked()
268    l.release()
269    assert not l.locked()
270
271    l.acquire_nowait()
272    with pytest.raises(RuntimeError):
273        # Error out if we already own the lock
274        l.acquire_nowait()
275    l.release()
276    with pytest.raises(RuntimeError):
277        # Error out if we don't own the lock
278        l.release()
279
280    holder_task = None
281
282    async def holder():
283        nonlocal holder_task
284        holder_task = _core.current_task()
285        async with l:
286            await sleep_forever()
287
288    async with _core.open_nursery() as nursery:
289        assert not l.locked()
290        nursery.start_soon(holder)
291        await wait_all_tasks_blocked()
292        assert l.locked()
293        # WouldBlock if someone else holds the lock
294        with pytest.raises(_core.WouldBlock):
295            l.acquire_nowait()
296        # Can't release a lock someone else holds
297        with pytest.raises(RuntimeError):
298            l.release()
299
300        statistics = l.statistics()
301        print(statistics)
302        assert statistics.locked
303        assert statistics.owner is holder_task
304        assert statistics.tasks_waiting == 0
305
306        nursery.start_soon(holder)
307        await wait_all_tasks_blocked()
308        statistics = l.statistics()
309        print(statistics)
310        assert statistics.tasks_waiting == 1
311
312        nursery.cancel_scope.cancel()
313
314    statistics = l.statistics()
315    assert not statistics.locked
316    assert statistics.owner is None
317    assert statistics.tasks_waiting == 0
318
319
320async def test_Condition():
321    with pytest.raises(TypeError):
322        Condition(Semaphore(1))
323    with pytest.raises(TypeError):
324        Condition(StrictFIFOLock)
325    l = Lock()  # noqa
326    c = Condition(l)
327    assert not l.locked()
328    assert not c.locked()
329    with assert_checkpoints():
330        await c.acquire()
331    assert l.locked()
332    assert c.locked()
333
334    c = Condition()
335    assert not c.locked()
336    c.acquire_nowait()
337    assert c.locked()
338    with pytest.raises(RuntimeError):
339        c.acquire_nowait()
340    c.release()
341
342    with pytest.raises(RuntimeError):
343        # Can't wait without holding the lock
344        await c.wait()
345    with pytest.raises(RuntimeError):
346        # Can't notify without holding the lock
347        c.notify()
348    with pytest.raises(RuntimeError):
349        # Can't notify without holding the lock
350        c.notify_all()
351
352    finished_waiters = set()
353
354    async def waiter(i):
355        async with c:
356            await c.wait()
357        finished_waiters.add(i)
358
359    async with _core.open_nursery() as nursery:
360        for i in range(3):
361            nursery.start_soon(waiter, i)
362            await wait_all_tasks_blocked()
363        async with c:
364            c.notify()
365        assert c.locked()
366        await wait_all_tasks_blocked()
367        assert finished_waiters == {0}
368        async with c:
369            c.notify_all()
370        await wait_all_tasks_blocked()
371        assert finished_waiters == {0, 1, 2}
372
373    finished_waiters = set()
374    async with _core.open_nursery() as nursery:
375        for i in range(3):
376            nursery.start_soon(waiter, i)
377            await wait_all_tasks_blocked()
378        async with c:
379            c.notify(2)
380            statistics = c.statistics()
381            print(statistics)
382            assert statistics.tasks_waiting == 1
383            assert statistics.lock_statistics.tasks_waiting == 2
384        # exiting the context manager hands off the lock to the first task
385        assert c.statistics().lock_statistics.tasks_waiting == 1
386
387        await wait_all_tasks_blocked()
388        assert finished_waiters == {0, 1}
389
390        async with c:
391            c.notify_all()
392
393    # After being cancelled still hold the lock (!)
394    # (Note that c.__aexit__ checks that we hold the lock as well)
395    with _core.CancelScope() as scope:
396        async with c:
397            scope.cancel()
398            try:
399                await c.wait()
400            finally:
401                assert c.locked()
402
403
404from .._sync import async_cm
405from .._channel import open_memory_channel
406
407# Three ways of implementing a Lock in terms of a channel. Used to let us put
408# the channel through the generic lock tests.
409
410
411@async_cm
412class ChannelLock1:
413    def __init__(self, capacity):
414        self.s, self.r = open_memory_channel(capacity)
415        for _ in range(capacity - 1):
416            self.s.send_nowait(None)
417
418    def acquire_nowait(self):
419        self.s.send_nowait(None)
420
421    async def acquire(self):
422        await self.s.send(None)
423
424    def release(self):
425        self.r.receive_nowait()
426
427
428@async_cm
429class ChannelLock2:
430    def __init__(self):
431        self.s, self.r = open_memory_channel(10)
432        self.s.send_nowait(None)
433
434    def acquire_nowait(self):
435        self.r.receive_nowait()
436
437    async def acquire(self):
438        await self.r.receive()
439
440    def release(self):
441        self.s.send_nowait(None)
442
443
444@async_cm
445class ChannelLock3:
446    def __init__(self):
447        self.s, self.r = open_memory_channel(0)
448        # self.acquired is true when one task acquires the lock and
449        # only becomes false when it's released and no tasks are
450        # waiting to acquire.
451        self.acquired = False
452
453    def acquire_nowait(self):
454        assert not self.acquired
455        self.acquired = True
456
457    async def acquire(self):
458        if self.acquired:
459            await self.s.send(None)
460        else:
461            self.acquired = True
462            await _core.checkpoint()
463
464    def release(self):
465        try:
466            self.r.receive_nowait()
467        except _core.WouldBlock:
468            assert self.acquired
469            self.acquired = False
470
471
472lock_factories = [
473    lambda: CapacityLimiter(1),
474    lambda: Semaphore(1),
475    Lock,
476    StrictFIFOLock,
477    lambda: ChannelLock1(10),
478    lambda: ChannelLock1(1),
479    ChannelLock2,
480    ChannelLock3,
481]
482lock_factory_names = [
483    "CapacityLimiter(1)",
484    "Semaphore(1)",
485    "Lock",
486    "StrictFIFOLock",
487    "ChannelLock1(10)",
488    "ChannelLock1(1)",
489    "ChannelLock2",
490    "ChannelLock3",
491]
492
493generic_lock_test = pytest.mark.parametrize(
494    "lock_factory", lock_factories, ids=lock_factory_names
495)
496
497
498# Spawn a bunch of workers that take a lock and then yield; make sure that
499# only one worker is ever in the critical section at a time.
500@generic_lock_test
501async def test_generic_lock_exclusion(lock_factory):
502    LOOPS = 10
503    WORKERS = 5
504    in_critical_section = False
505    acquires = 0
506
507    async def worker(lock_like):
508        nonlocal in_critical_section, acquires
509        for _ in range(LOOPS):
510            async with lock_like:
511                acquires += 1
512                assert not in_critical_section
513                in_critical_section = True
514                await _core.checkpoint()
515                await _core.checkpoint()
516                assert in_critical_section
517                in_critical_section = False
518
519    async with _core.open_nursery() as nursery:
520        lock_like = lock_factory()
521        for _ in range(WORKERS):
522            nursery.start_soon(worker, lock_like)
523    assert not in_critical_section
524    assert acquires == LOOPS * WORKERS
525
526
527# Several workers queue on the same lock; make sure they each get it, in
528# order.
529@generic_lock_test
530async def test_generic_lock_fifo_fairness(lock_factory):
531    initial_order = []
532    record = []
533    LOOPS = 5
534
535    async def loopy(name, lock_like):
536        # Record the order each task was initially scheduled in
537        initial_order.append(name)
538        for _ in range(LOOPS):
539            async with lock_like:
540                record.append(name)
541
542    lock_like = lock_factory()
543    async with _core.open_nursery() as nursery:
544        nursery.start_soon(loopy, 1, lock_like)
545        nursery.start_soon(loopy, 2, lock_like)
546        nursery.start_soon(loopy, 3, lock_like)
547    # The first three could be in any order due to scheduling randomness,
548    # but after that they should repeat in the same order
549    for i in range(LOOPS):
550        assert record[3 * i : 3 * (i + 1)] == initial_order
551
552
553@generic_lock_test
554async def test_generic_lock_acquire_nowait_blocks_acquire(lock_factory):
555    lock_like = lock_factory()
556
557    record = []
558
559    async def lock_taker():
560        record.append("started")
561        async with lock_like:
562            pass
563        record.append("finished")
564
565    async with _core.open_nursery() as nursery:
566        lock_like.acquire_nowait()
567        nursery.start_soon(lock_taker)
568        await wait_all_tasks_blocked()
569        assert record == ["started"]
570        lock_like.release()
571