1"""Synchronization primitives."""
2
3__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']
4
5import collections
6
7from . import events
8from . import futures
9from .coroutines import coroutine
10
11
12class _ContextManager:
13    """Context manager.
14
15    This enables the following idiom for acquiring and releasing a
16    lock around a block:
17
18        with (yield from lock):
19            <block>
20
21    while failing loudly when accidentally using:
22
23        with lock:
24            <block>
25    """
26
27    def __init__(self, lock):
28        self._lock = lock
29
30    def __enter__(self):
31        # We have no use for the "as ..."  clause in the with
32        # statement for locks.
33        return None
34
35    def __exit__(self, *args):
36        try:
37            self._lock.release()
38        finally:
39            self._lock = None  # Crudely prevent reuse.
40
41
42class Lock:
43    """Primitive lock objects.
44
45    A primitive lock is a synchronization primitive that is not owned
46    by a particular coroutine when locked.  A primitive lock is in one
47    of two states, 'locked' or 'unlocked'.
48
49    It is created in the unlocked state.  It has two basic methods,
50    acquire() and release().  When the state is unlocked, acquire()
51    changes the state to locked and returns immediately.  When the
52    state is locked, acquire() blocks until a call to release() in
53    another coroutine changes it to unlocked, then the acquire() call
54    resets it to locked and returns.  The release() method should only
55    be called in the locked state; it changes the state to unlocked
56    and returns immediately.  If an attempt is made to release an
57    unlocked lock, a RuntimeError will be raised.
58
59    When more than one coroutine is blocked in acquire() waiting for
60    the state to turn to unlocked, only one coroutine proceeds when a
61    release() call resets the state to unlocked; first coroutine which
62    is blocked in acquire() is being processed.
63
64    acquire() is a coroutine and should be called with 'yield from'.
65
66    Locks also support the context management protocol.  '(yield from lock)'
67    should be used as context manager expression.
68
69    Usage:
70
71        lock = Lock()
72        ...
73        yield from lock
74        try:
75            ...
76        finally:
77            lock.release()
78
79    Context manager usage:
80
81        lock = Lock()
82        ...
83        with (yield from lock):
84             ...
85
86    Lock objects can be tested for locking state:
87
88        if not lock.locked():
89           yield from lock
90        else:
91           # lock is acquired
92           ...
93
94    """
95
96    def __init__(self, *, loop=None):
97        self._waiters = collections.deque()
98        self._locked = False
99        if loop is not None:
100            self._loop = loop
101        else:
102            self._loop = events.get_event_loop()
103
104    def __repr__(self):
105        res = super().__repr__()
106        extra = 'locked' if self._locked else 'unlocked'
107        if self._waiters:
108            extra = '{},waiters:{}'.format(extra, len(self._waiters))
109        return '<{} [{}]>'.format(res[1:-1], extra)
110
111    def locked(self):
112        """Return True if lock is acquired."""
113        return self._locked
114
115    @coroutine
116    def acquire(self):
117        """Acquire a lock.
118
119        This method blocks until the lock is unlocked, then sets it to
120        locked and returns True.
121        """
122        if not self._waiters and not self._locked:
123            self._locked = True
124            return True
125
126        fut = futures.Future(loop=self._loop)
127        self._waiters.append(fut)
128        try:
129            yield from fut
130            self._locked = True
131            return True
132        finally:
133            self._waiters.remove(fut)
134
135    def release(self):
136        """Release a lock.
137
138        When the lock is locked, reset it to unlocked, and return.
139        If any other coroutines are blocked waiting for the lock to become
140        unlocked, allow exactly one of them to proceed.
141
142        When invoked on an unlocked lock, a RuntimeError is raised.
143
144        There is no return value.
145        """
146        if self._locked:
147            self._locked = False
148            # Wake up the first waiter who isn't cancelled.
149            for fut in self._waiters:
150                if not fut.done():
151                    fut.set_result(True)
152                    break
153        else:
154            raise RuntimeError('Lock is not acquired.')
155
156    def __enter__(self):
157        raise RuntimeError(
158            '"yield from" should be used as context manager expression')
159
160    def __exit__(self, *args):
161        # This must exist because __enter__ exists, even though that
162        # always raises; that's how the with-statement works.
163        pass
164
165    def __iter__(self):
166        # This is not a coroutine.  It is meant to enable the idiom:
167        #
168        #     with (yield from lock):
169        #         <block>
170        #
171        # as an alternative to:
172        #
173        #     yield from lock.acquire()
174        #     try:
175        #         <block>
176        #     finally:
177        #         lock.release()
178        yield from self.acquire()
179        return _ContextManager(self)
180
181
182class Event:
183    """Asynchronous equivalent to threading.Event.
184
185    Class implementing event objects. An event manages a flag that can be set
186    to true with the set() method and reset to false with the clear() method.
187    The wait() method blocks until the flag is true. The flag is initially
188    false.
189    """
190
191    def __init__(self, *, loop=None):
192        self._waiters = collections.deque()
193        self._value = False
194        if loop is not None:
195            self._loop = loop
196        else:
197            self._loop = events.get_event_loop()
198
199    def __repr__(self):
200        res = super().__repr__()
201        extra = 'set' if self._value else 'unset'
202        if self._waiters:
203            extra = '{},waiters:{}'.format(extra, len(self._waiters))
204        return '<{} [{}]>'.format(res[1:-1], extra)
205
206    def is_set(self):
207        """Return True if and only if the internal flag is true."""
208        return self._value
209
210    def set(self):
211        """Set the internal flag to true. All coroutines waiting for it to
212        become true are awakened. Coroutine that call wait() once the flag is
213        true will not block at all.
214        """
215        if not self._value:
216            self._value = True
217
218            for fut in self._waiters:
219                if not fut.done():
220                    fut.set_result(True)
221
222    def clear(self):
223        """Reset the internal flag to false. Subsequently, coroutines calling
224        wait() will block until set() is called to set the internal flag
225        to true again."""
226        self._value = False
227
228    @coroutine
229    def wait(self):
230        """Block until the internal flag is true.
231
232        If the internal flag is true on entry, return True
233        immediately.  Otherwise, block until another coroutine calls
234        set() to set the flag to true, then return True.
235        """
236        if self._value:
237            return True
238
239        fut = futures.Future(loop=self._loop)
240        self._waiters.append(fut)
241        try:
242            yield from fut
243            return True
244        finally:
245            self._waiters.remove(fut)
246
247
248class Condition:
249    """Asynchronous equivalent to threading.Condition.
250
251    This class implements condition variable objects. A condition variable
252    allows one or more coroutines to wait until they are notified by another
253    coroutine.
254
255    A new Lock object is created and used as the underlying lock.
256    """
257
258    def __init__(self, lock=None, *, loop=None):
259        if loop is not None:
260            self._loop = loop
261        else:
262            self._loop = events.get_event_loop()
263
264        if lock is None:
265            lock = Lock(loop=self._loop)
266        elif lock._loop is not self._loop:
267            raise ValueError("loop argument must agree with lock")
268
269        self._lock = lock
270        # Export the lock's locked(), acquire() and release() methods.
271        self.locked = lock.locked
272        self.acquire = lock.acquire
273        self.release = lock.release
274
275        self._waiters = collections.deque()
276
277    def __repr__(self):
278        res = super().__repr__()
279        extra = 'locked' if self.locked() else 'unlocked'
280        if self._waiters:
281            extra = '{},waiters:{}'.format(extra, len(self._waiters))
282        return '<{} [{}]>'.format(res[1:-1], extra)
283
284    @coroutine
285    def wait(self):
286        """Wait until notified.
287
288        If the calling coroutine has not acquired the lock when this
289        method is called, a RuntimeError is raised.
290
291        This method releases the underlying lock, and then blocks
292        until it is awakened by a notify() or notify_all() call for
293        the same condition variable in another coroutine.  Once
294        awakened, it re-acquires the lock and returns True.
295        """
296        if not self.locked():
297            raise RuntimeError('cannot wait on un-acquired lock')
298
299        self.release()
300        try:
301            fut = futures.Future(loop=self._loop)
302            self._waiters.append(fut)
303            try:
304                yield from fut
305                return True
306            finally:
307                self._waiters.remove(fut)
308
309        finally:
310            yield from self.acquire()
311
312    @coroutine
313    def wait_for(self, predicate):
314        """Wait until a predicate becomes true.
315
316        The predicate should be a callable which result will be
317        interpreted as a boolean value.  The final predicate value is
318        the return value.
319        """
320        result = predicate()
321        while not result:
322            yield from self.wait()
323            result = predicate()
324        return result
325
326    def notify(self, n=1):
327        """By default, wake up one coroutine waiting on this condition, if any.
328        If the calling coroutine has not acquired the lock when this method
329        is called, a RuntimeError is raised.
330
331        This method wakes up at most n of the coroutines waiting for the
332        condition variable; it is a no-op if no coroutines are waiting.
333
334        Note: an awakened coroutine does not actually return from its
335        wait() call until it can reacquire the lock. Since notify() does
336        not release the lock, its caller should.
337        """
338        if not self.locked():
339            raise RuntimeError('cannot notify on un-acquired lock')
340
341        idx = 0
342        for fut in self._waiters:
343            if idx >= n:
344                break
345
346            if not fut.done():
347                idx += 1
348                fut.set_result(False)
349
350    def notify_all(self):
351        """Wake up all threads waiting on this condition. This method acts
352        like notify(), but wakes up all waiting threads instead of one. If the
353        calling thread has not acquired the lock when this method is called,
354        a RuntimeError is raised.
355        """
356        self.notify(len(self._waiters))
357
358    def __enter__(self):
359        raise RuntimeError(
360            '"yield from" should be used as context manager expression')
361
362    def __exit__(self, *args):
363        pass
364
365    def __iter__(self):
366        # See comment in Lock.__iter__().
367        yield from self.acquire()
368        return _ContextManager(self)
369
370
371class Semaphore:
372    """A Semaphore implementation.
373
374    A semaphore manages an internal counter which is decremented by each
375    acquire() call and incremented by each release() call. The counter
376    can never go below zero; when acquire() finds that it is zero, it blocks,
377    waiting until some other thread calls release().
378
379    Semaphores also support the context management protocol.
380
381    The optional argument gives the initial value for the internal
382    counter; it defaults to 1. If the value given is less than 0,
383    ValueError is raised.
384    """
385
386    def __init__(self, value=1, *, loop=None):
387        if value < 0:
388            raise ValueError("Semaphore initial value must be >= 0")
389        self._value = value
390        self._waiters = collections.deque()
391        if loop is not None:
392            self._loop = loop
393        else:
394            self._loop = events.get_event_loop()
395
396    def __repr__(self):
397        res = super().__repr__()
398        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
399            self._value)
400        if self._waiters:
401            extra = '{},waiters:{}'.format(extra, len(self._waiters))
402        return '<{} [{}]>'.format(res[1:-1], extra)
403
404    def locked(self):
405        """Returns True if semaphore can not be acquired immediately."""
406        return self._value == 0
407
408    @coroutine
409    def acquire(self):
410        """Acquire a semaphore.
411
412        If the internal counter is larger than zero on entry,
413        decrement it by one and return True immediately.  If it is
414        zero on entry, block, waiting until some other coroutine has
415        called release() to make it larger than 0, and then return
416        True.
417        """
418        if not self._waiters and self._value > 0:
419            self._value -= 1
420            return True
421
422        fut = futures.Future(loop=self._loop)
423        self._waiters.append(fut)
424        try:
425            yield from fut
426            self._value -= 1
427            return True
428        finally:
429            self._waiters.remove(fut)
430
431    def release(self):
432        """Release a semaphore, incrementing the internal counter by one.
433        When it was zero on entry and another coroutine is waiting for it to
434        become larger than zero again, wake up that coroutine.
435        """
436        self._value += 1
437        for waiter in self._waiters:
438            if not waiter.done():
439                waiter.set_result(True)
440                break
441
442    def __enter__(self):
443        raise RuntimeError(
444            '"yield from" should be used as context manager expression')
445
446    def __exit__(self, *args):
447        pass
448
449    def __iter__(self):
450        # See comment in Lock.__iter__().
451        yield from self.acquire()
452        return _ContextManager(self)
453
454
455class BoundedSemaphore(Semaphore):
456    """A bounded semaphore implementation.
457
458    This raises ValueError in release() if it would increase the value
459    above the initial value.
460    """
461
462    def __init__(self, value=1, *, loop=None):
463        self._bound_value = value
464        super().__init__(value, loop=loop)
465
466    def release(self):
467        if self._value >= self._bound_value:
468            raise ValueError('BoundedSemaphore released too many times')
469        super().release()
470