1"""Synchronization primitives."""
2
3__all__ = ('Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore')
4
5import collections
6import types
7import warnings
8
9from . import events
10from . import futures
11from . import exceptions
12from .import coroutines
13
14
15class _ContextManager:
16    """Context manager.
17
18    This enables the following idiom for acquiring and releasing a
19    lock around a block:
20
21        with (yield from lock):
22            <block>
23
24    while failing loudly when accidentally using:
25
26        with lock:
27            <block>
28
29    Deprecated, use 'async with' statement:
30        async with lock:
31            <block>
32    """
33
34    def __init__(self, lock):
35        self._lock = lock
36
37    def __enter__(self):
38        # We have no use for the "as ..."  clause in the with
39        # statement for locks.
40        return None
41
42    def __exit__(self, *args):
43        try:
44            self._lock.release()
45        finally:
46            self._lock = None  # Crudely prevent reuse.
47
48
49class _ContextManagerMixin:
50    def __enter__(self):
51        raise RuntimeError(
52            '"yield from" should be used as context manager expression')
53
54    def __exit__(self, *args):
55        # This must exist because __enter__ exists, even though that
56        # always raises; that's how the with-statement works.
57        pass
58
59    @types.coroutine
60    def __iter__(self):
61        # This is not a coroutine.  It is meant to enable the idiom:
62        #
63        #     with (yield from lock):
64        #         <block>
65        #
66        # as an alternative to:
67        #
68        #     yield from lock.acquire()
69        #     try:
70        #         <block>
71        #     finally:
72        #         lock.release()
73        # Deprecated, use 'async with' statement:
74        #     async with lock:
75        #         <block>
76        warnings.warn("'with (yield from lock)' is deprecated "
77                      "use 'async with lock' instead",
78                      DeprecationWarning, stacklevel=2)
79        yield from self.acquire()
80        return _ContextManager(self)
81
82    # The flag is needed for legacy asyncio.iscoroutine()
83    __iter__._is_coroutine = coroutines._is_coroutine
84
85    async def __acquire_ctx(self):
86        await self.acquire()
87        return _ContextManager(self)
88
89    def __await__(self):
90        warnings.warn("'with await lock' is deprecated "
91                      "use 'async with lock' instead",
92                      DeprecationWarning, stacklevel=2)
93        # To make "with await lock" work.
94        return self.__acquire_ctx().__await__()
95
96    async def __aenter__(self):
97        await self.acquire()
98        # We have no use for the "as ..."  clause in the with
99        # statement for locks.
100        return None
101
102    async def __aexit__(self, exc_type, exc, tb):
103        self.release()
104
105
106class Lock(_ContextManagerMixin):
107    """Primitive lock objects.
108
109    A primitive lock is a synchronization primitive that is not owned
110    by a particular coroutine when locked.  A primitive lock is in one
111    of two states, 'locked' or 'unlocked'.
112
113    It is created in the unlocked state.  It has two basic methods,
114    acquire() and release().  When the state is unlocked, acquire()
115    changes the state to locked and returns immediately.  When the
116    state is locked, acquire() blocks until a call to release() in
117    another coroutine changes it to unlocked, then the acquire() call
118    resets it to locked and returns.  The release() method should only
119    be called in the locked state; it changes the state to unlocked
120    and returns immediately.  If an attempt is made to release an
121    unlocked lock, a RuntimeError will be raised.
122
123    When more than one coroutine is blocked in acquire() waiting for
124    the state to turn to unlocked, only one coroutine proceeds when a
125    release() call resets the state to unlocked; first coroutine which
126    is blocked in acquire() is being processed.
127
128    acquire() is a coroutine and should be called with 'await'.
129
130    Locks also support the asynchronous context management protocol.
131    'async with lock' statement should be used.
132
133    Usage:
134
135        lock = Lock()
136        ...
137        await lock.acquire()
138        try:
139            ...
140        finally:
141            lock.release()
142
143    Context manager usage:
144
145        lock = Lock()
146        ...
147        async with lock:
148             ...
149
150    Lock objects can be tested for locking state:
151
152        if not lock.locked():
153           await lock.acquire()
154        else:
155           # lock is acquired
156           ...
157
158    """
159
160    def __init__(self, *, loop=None):
161        self._waiters = None
162        self._locked = False
163        if loop is None:
164            self._loop = events.get_event_loop()
165        else:
166            self._loop = loop
167            warnings.warn("The loop argument is deprecated since Python 3.8, "
168                          "and scheduled for removal in Python 3.10.",
169                          DeprecationWarning, stacklevel=2)
170
171    def __repr__(self):
172        res = super().__repr__()
173        extra = 'locked' if self._locked else 'unlocked'
174        if self._waiters:
175            extra = f'{extra}, waiters:{len(self._waiters)}'
176        return f'<{res[1:-1]} [{extra}]>'
177
178    def locked(self):
179        """Return True if lock is acquired."""
180        return self._locked
181
182    async def acquire(self):
183        """Acquire a lock.
184
185        This method blocks until the lock is unlocked, then sets it to
186        locked and returns True.
187        """
188        if (not self._locked and (self._waiters is None or
189                all(w.cancelled() for w in self._waiters))):
190            self._locked = True
191            return True
192
193        if self._waiters is None:
194            self._waiters = collections.deque()
195        fut = self._loop.create_future()
196        self._waiters.append(fut)
197
198        # Finally block should be called before the CancelledError
199        # handling as we don't want CancelledError to call
200        # _wake_up_first() and attempt to wake up itself.
201        try:
202            try:
203                await fut
204            finally:
205                self._waiters.remove(fut)
206        except exceptions.CancelledError:
207            if not self._locked:
208                self._wake_up_first()
209            raise
210
211        self._locked = True
212        return True
213
214    def release(self):
215        """Release a lock.
216
217        When the lock is locked, reset it to unlocked, and return.
218        If any other coroutines are blocked waiting for the lock to become
219        unlocked, allow exactly one of them to proceed.
220
221        When invoked on an unlocked lock, a RuntimeError is raised.
222
223        There is no return value.
224        """
225        if self._locked:
226            self._locked = False
227            self._wake_up_first()
228        else:
229            raise RuntimeError('Lock is not acquired.')
230
231    def _wake_up_first(self):
232        """Wake up the first waiter if it isn't done."""
233        if not self._waiters:
234            return
235        try:
236            fut = next(iter(self._waiters))
237        except StopIteration:
238            return
239
240        # .done() necessarily means that a waiter will wake up later on and
241        # either take the lock, or, if it was cancelled and lock wasn't
242        # taken already, will hit this again and wake up a new waiter.
243        if not fut.done():
244            fut.set_result(True)
245
246
247class Event:
248    """Asynchronous equivalent to threading.Event.
249
250    Class implementing event objects. An event manages a flag that can be set
251    to true with the set() method and reset to false with the clear() method.
252    The wait() method blocks until the flag is true. The flag is initially
253    false.
254    """
255
256    def __init__(self, *, loop=None):
257        self._waiters = collections.deque()
258        self._value = False
259        if loop is None:
260            self._loop = events.get_event_loop()
261        else:
262            self._loop = loop
263            warnings.warn("The loop argument is deprecated since Python 3.8, "
264                          "and scheduled for removal in Python 3.10.",
265                          DeprecationWarning, stacklevel=2)
266
267    def __repr__(self):
268        res = super().__repr__()
269        extra = 'set' if self._value else 'unset'
270        if self._waiters:
271            extra = f'{extra}, waiters:{len(self._waiters)}'
272        return f'<{res[1:-1]} [{extra}]>'
273
274    def is_set(self):
275        """Return True if and only if the internal flag is true."""
276        return self._value
277
278    def set(self):
279        """Set the internal flag to true. All coroutines waiting for it to
280        become true are awakened. Coroutine that call wait() once the flag is
281        true will not block at all.
282        """
283        if not self._value:
284            self._value = True
285
286            for fut in self._waiters:
287                if not fut.done():
288                    fut.set_result(True)
289
290    def clear(self):
291        """Reset the internal flag to false. Subsequently, coroutines calling
292        wait() will block until set() is called to set the internal flag
293        to true again."""
294        self._value = False
295
296    async def wait(self):
297        """Block until the internal flag is true.
298
299        If the internal flag is true on entry, return True
300        immediately.  Otherwise, block until another coroutine calls
301        set() to set the flag to true, then return True.
302        """
303        if self._value:
304            return True
305
306        fut = self._loop.create_future()
307        self._waiters.append(fut)
308        try:
309            await fut
310            return True
311        finally:
312            self._waiters.remove(fut)
313
314
315class Condition(_ContextManagerMixin):
316    """Asynchronous equivalent to threading.Condition.
317
318    This class implements condition variable objects. A condition variable
319    allows one or more coroutines to wait until they are notified by another
320    coroutine.
321
322    A new Lock object is created and used as the underlying lock.
323    """
324
325    def __init__(self, lock=None, *, loop=None):
326        if loop is None:
327            self._loop = events.get_event_loop()
328        else:
329            self._loop = loop
330            warnings.warn("The loop argument is deprecated since Python 3.8, "
331                          "and scheduled for removal in Python 3.10.",
332                          DeprecationWarning, stacklevel=2)
333
334        if lock is None:
335            lock = Lock(loop=loop)
336        elif lock._loop is not self._loop:
337            raise ValueError("loop argument must agree with lock")
338
339        self._lock = lock
340        # Export the lock's locked(), acquire() and release() methods.
341        self.locked = lock.locked
342        self.acquire = lock.acquire
343        self.release = lock.release
344
345        self._waiters = collections.deque()
346
347    def __repr__(self):
348        res = super().__repr__()
349        extra = 'locked' if self.locked() else 'unlocked'
350        if self._waiters:
351            extra = f'{extra}, waiters:{len(self._waiters)}'
352        return f'<{res[1:-1]} [{extra}]>'
353
354    async def wait(self):
355        """Wait until notified.
356
357        If the calling coroutine has not acquired the lock when this
358        method is called, a RuntimeError is raised.
359
360        This method releases the underlying lock, and then blocks
361        until it is awakened by a notify() or notify_all() call for
362        the same condition variable in another coroutine.  Once
363        awakened, it re-acquires the lock and returns True.
364        """
365        if not self.locked():
366            raise RuntimeError('cannot wait on un-acquired lock')
367
368        self.release()
369        try:
370            fut = self._loop.create_future()
371            self._waiters.append(fut)
372            try:
373                await fut
374                return True
375            finally:
376                self._waiters.remove(fut)
377
378        finally:
379            # Must reacquire lock even if wait is cancelled
380            cancelled = False
381            while True:
382                try:
383                    await self.acquire()
384                    break
385                except exceptions.CancelledError:
386                    cancelled = True
387
388            if cancelled:
389                raise exceptions.CancelledError
390
391    async def wait_for(self, predicate):
392        """Wait until a predicate becomes true.
393
394        The predicate should be a callable which result will be
395        interpreted as a boolean value.  The final predicate value is
396        the return value.
397        """
398        result = predicate()
399        while not result:
400            await self.wait()
401            result = predicate()
402        return result
403
404    def notify(self, n=1):
405        """By default, wake up one coroutine waiting on this condition, if any.
406        If the calling coroutine has not acquired the lock when this method
407        is called, a RuntimeError is raised.
408
409        This method wakes up at most n of the coroutines waiting for the
410        condition variable; it is a no-op if no coroutines are waiting.
411
412        Note: an awakened coroutine does not actually return from its
413        wait() call until it can reacquire the lock. Since notify() does
414        not release the lock, its caller should.
415        """
416        if not self.locked():
417            raise RuntimeError('cannot notify on un-acquired lock')
418
419        idx = 0
420        for fut in self._waiters:
421            if idx >= n:
422                break
423
424            if not fut.done():
425                idx += 1
426                fut.set_result(False)
427
428    def notify_all(self):
429        """Wake up all threads waiting on this condition. This method acts
430        like notify(), but wakes up all waiting threads instead of one. If the
431        calling thread has not acquired the lock when this method is called,
432        a RuntimeError is raised.
433        """
434        self.notify(len(self._waiters))
435
436
437class Semaphore(_ContextManagerMixin):
438    """A Semaphore implementation.
439
440    A semaphore manages an internal counter which is decremented by each
441    acquire() call and incremented by each release() call. The counter
442    can never go below zero; when acquire() finds that it is zero, it blocks,
443    waiting until some other thread calls release().
444
445    Semaphores also support the context management protocol.
446
447    The optional argument gives the initial value for the internal
448    counter; it defaults to 1. If the value given is less than 0,
449    ValueError is raised.
450    """
451
452    def __init__(self, value=1, *, loop=None):
453        if value < 0:
454            raise ValueError("Semaphore initial value must be >= 0")
455        self._value = value
456        self._waiters = collections.deque()
457        if loop is None:
458            self._loop = events.get_event_loop()
459        else:
460            self._loop = loop
461            warnings.warn("The loop argument is deprecated since Python 3.8, "
462                          "and scheduled for removal in Python 3.10.",
463                          DeprecationWarning, stacklevel=2)
464
465    def __repr__(self):
466        res = super().__repr__()
467        extra = 'locked' if self.locked() else f'unlocked, value:{self._value}'
468        if self._waiters:
469            extra = f'{extra}, waiters:{len(self._waiters)}'
470        return f'<{res[1:-1]} [{extra}]>'
471
472    def _wake_up_next(self):
473        while self._waiters:
474            waiter = self._waiters.popleft()
475            if not waiter.done():
476                waiter.set_result(None)
477                return
478
479    def locked(self):
480        """Returns True if semaphore can not be acquired immediately."""
481        return self._value == 0
482
483    async def acquire(self):
484        """Acquire a semaphore.
485
486        If the internal counter is larger than zero on entry,
487        decrement it by one and return True immediately.  If it is
488        zero on entry, block, waiting until some other coroutine has
489        called release() to make it larger than 0, and then return
490        True.
491        """
492        while self._value <= 0:
493            fut = self._loop.create_future()
494            self._waiters.append(fut)
495            try:
496                await fut
497            except:
498                # See the similar code in Queue.get.
499                fut.cancel()
500                if self._value > 0 and not fut.cancelled():
501                    self._wake_up_next()
502                raise
503        self._value -= 1
504        return True
505
506    def release(self):
507        """Release a semaphore, incrementing the internal counter by one.
508        When it was zero on entry and another coroutine is waiting for it to
509        become larger than zero again, wake up that coroutine.
510        """
511        self._value += 1
512        self._wake_up_next()
513
514
515class BoundedSemaphore(Semaphore):
516    """A bounded semaphore implementation.
517
518    This raises ValueError in release() if it would increase the value
519    above the initial value.
520    """
521
522    def __init__(self, value=1, *, loop=None):
523        if loop:
524            warnings.warn("The loop argument is deprecated since Python 3.8, "
525                          "and scheduled for removal in Python 3.10.",
526                          DeprecationWarning, stacklevel=2)
527
528        self._bound_value = value
529        super().__init__(value, loop=loop)
530
531    def release(self):
532        if self._value >= self._bound_value:
533            raise ValueError('BoundedSemaphore released too many times')
534        super().release()
535