1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15import collections
16import datetime
17import types
18
19from tornado import gen, ioloop
20from tornado.concurrent import Future, future_set_result_unless_cancelled
21
22from typing import Union, Optional, Type, Any, Awaitable
23import typing
24
25if typing.TYPE_CHECKING:
26    from typing import Deque, Set  # noqa: F401
27
28__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"]
29
30
class _TimeoutGarbageCollector(object):
    """Mixin that prunes finished waiters from ``self._waiters`` now and then.

    A waiter whose timeout fires stays in the queue until something pops
    it, so a loop such as the following would otherwise make the queue
    grow without bound:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    """

    def __init__(self) -> None:
        # Number of timeouts recorded since the queue was last pruned.
        self._timeouts = 0
        # FIFO queue of the futures handed out to waiting callers.
        self._waiters = collections.deque()  # type: Deque[Future]

    def _garbage_collect(self) -> None:
        """Record one timeout; once more than 100 accumulate, prune the queue.

        Pruning rebuilds ``self._waiters`` as a new deque holding only the
        waiters that are not yet done, in their original order.
        """
        self._timeouts += 1
        if self._timeouts <= 100:
            return

        self._timeouts = 0
        pending = [waiter for waiter in self._waiters if not waiter.done()]
        self._waiters = collections.deque(pending)
51
52
class Condition(_TimeoutGarbageCollector):
    """A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    """

    def __init__(self) -> None:
        super().__init__()
        # The IOLoop that was current at construction time. Note that `wait`
        # looks up the current loop again instead of reading this attribute.
        self.io_loop = ioloop.IOLoop.current()

    def __repr__(self) -> str:
        # e.g. "<Condition>" or "<Condition waiters[2]>"
        result = "<%s" % (self.__class__.__name__,)
        if self._waiters:
            result += " waiters[%s]" % len(self._waiters)
        return result + ">"

    def wait(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[bool]:
        """Wait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.

        ``timeout`` is passed to ``IOLoop.add_timeout``. Any falsy value
        (``None``, ``0``, or a zero `datetime.timedelta`) means that no
        deadline is scheduled.
        """
        waiter = Future()  # type: Future[bool]
        self._waiters.append(waiter)
        if timeout:

            def on_timeout() -> None:
                # The waiter may already have been resolved by notify().
                if not waiter.done():
                    future_set_result_unless_cancelled(waiter, False)
                # The timed-out waiter stays in the queue; count the timeout
                # so that the queue is pruned occasionally.
                self._garbage_collect()

            io_loop = ioloop.IOLoop.current()
            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
            # Once the waiter finishes for any reason, its pending timeout is
            # no longer needed.
            waiter.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter

    def notify(self, n: int = 1) -> None:
        """Wake up to ``n`` waiters, in the order in which they started waiting.

        Waiters that are already done (for example because they timed out)
        are removed from the queue without counting towards ``n``. If ``n``
        is zero or negative, nobody is woken.
        """
        waiters = []  # Waiters we plan to run right now.
        # ``n > 0`` rather than plain truthiness: a negative count would
        # otherwise never reach zero and would wake every waiter.
        while n > 0 and self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():  # Might have timed out.
                n -= 1
                waiters.append(waiter)

        for waiter in waiters:
            future_set_result_unless_cancelled(waiter, True)

    def notify_all(self) -> None:
        """Wake all waiters."""
        self.notify(len(self._waiters))
160
161
class Event(object):
    """A flag that coroutines can wait on until it becomes True.

    Similar to `threading.Event`.

    While the flag is False, awaiting `wait` suspends the caller. After
    `set` has been called, `wait` completes immediately until the flag is
    reset with `clear`:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    """

    def __init__(self) -> None:
        # Futures handed out by wait() that have not finished yet.
        self._waiters = set()  # type: Set[Future[None]]
        # The internal flag; starts out cleared.
        self._value = False

    def __repr__(self) -> str:
        state = "set" if self.is_set() else "clear"
        return "<%s %s>" % (self.__class__.__name__, state)

    def is_set(self) -> bool:
        """Return ``True`` if the internal flag is true."""
        return self._value

    def set(self) -> None:
        """Turn the internal flag on and wake every pending waiter.

        Does nothing if the flag is already on. While the flag stays on,
        `.wait` will not block.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            if waiter.done():
                continue
            waiter.set_result(None)

    def clear(self) -> None:
        """Turn the internal flag off.

        Subsequent calls to `.wait` block until `.set` is called again.
        """
        self._value = False

    def wait(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[None]:
        """Block until the internal flag is true.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout.
        """
        waiter = Future()  # type: Future[None]
        if self._value:
            # Already set: hand back a future that is finished.
            waiter.set_result(None)
            return waiter

        self._waiters.add(waiter)
        # However the waiter finishes (result or cancellation), forget it.
        waiter.add_done_callback(lambda finished: self._waiters.remove(finished))
        if timeout is None:
            return waiter

        def cancel_if_pending(_wrapper: Any) -> None:
            # gen.with_timeout doesn't cancel the future it wraps, so do it
            # by hand; cancelling the waiter removes it from the waiter set.
            if not waiter.done():
                waiter.cancel()

        deadline_fut = gen.with_timeout(timeout, waiter)
        deadline_fut.add_done_callback(cancel_if_pending)
        return deadline_fut
260
261
class _ReleasingContextManager(object):
    """Context manager that calls ``release()`` on its target when exited.

    It wraps a Lock or Semaphore so that this pattern works:

        with (yield semaphore.acquire()):
            pass

        # Now semaphore.release() has been called.
    """

    def __init__(self, obj: Any) -> None:
        # The object whose ``release()`` method is invoked on exit.
        self._obj = obj

    def __enter__(self) -> None:
        # Entering does nothing and binds ``None`` to any ``as`` target.
        return None

    def __exit__(
        self,
        exc_type: "Optional[Type[BaseException]]",
        exc_val: Optional[BaseException],
        exc_tb: Optional[types.TracebackType],
    ) -> None:
        # Release unconditionally. Returning None means that an exception
        # raised inside the "with" block keeps propagating.
        target = self._obj
        target.release()
284
285
class Semaphore(_TimeoutGarbageCollector):
    """A lock that can be acquired a fixed number of times before blocking.

    Internally a Semaphore keeps a counter: the initial value, plus one for
    every `.release` call, minus one for every `.acquire` call. `.acquire`
    blocks, if necessary, until it can return without driving the counter
    below zero.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       # Ensure reliable doctest output: resolve Futures one at a time.
       futures_q = deque([Future() for _ in range(3)])

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       IOLoop.current().add_callback(simulator, list(futures_q))

       def use_some_resource():
           return futures_q.popleft()

    .. testcode:: semaphore

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        IOLoop.current().run_sync(runner)

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self, value: int = 1) -> None:
        super().__init__()
        if value < 0:
            raise ValueError("semaphore initial value must be >= 0")

        # How many more acquire() calls can succeed without blocking.
        self._value = value

    def __repr__(self) -> str:
        # Start from the default object repr and append the state in
        # brackets, after stripping the surrounding "<" and ">".
        base = super().__repr__()
        if self._value == 0:
            state = "locked"
        else:
            state = "unlocked,value:{0}".format(self._value)
        if self._waiters:
            state = "{0},waiters:{1}".format(state, len(self._waiters))
        return "<{0} [{1}]>".format(base[1:-1], state)

    def release(self) -> None:
        """Increment the counter and wake one waiter."""
        self._value += 1
        while self._waiters:
            candidate = self._waiters.popleft()
            if candidate.done():
                # Already finished (e.g. timed out); try the next in line.
                continue

            # Hand the unit that was just released straight to this waiter.
            self._value -= 1

            # If the waiter is a coroutine paused at
            #
            #     with (yield semaphore.acquire()):
            #
            # then the context manager's __exit__ calls release() at the end
            # of the "with" block.
            candidate.set_result(_ReleasingContextManager(self))
            return

    def acquire(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[_ReleasingContextManager]:
        """Decrement the counter. Returns an awaitable.

        Block if the counter is zero and wait for a `.release`. The awaitable
        raises `.TimeoutError` after the deadline.
        """
        waiter = Future()  # type: Future[_ReleasingContextManager]
        if self._value > 0:
            # A unit is available: take it and resolve right away.
            self._value -= 1
            waiter.set_result(_ReleasingContextManager(self))
            return waiter

        self._waiters.append(waiter)
        if not timeout:
            # A falsy timeout (None, 0, zero timedelta): no deadline.
            return waiter

        def expire() -> None:
            if not waiter.done():
                waiter.set_exception(gen.TimeoutError())
            # The expired waiter stays queued; count the timeout so that the
            # queue is pruned occasionally.
            self._garbage_collect()

        loop = ioloop.IOLoop.current()
        handle = loop.add_timeout(timeout, expire)
        # Drop the pending timeout as soon as the waiter finishes.
        waiter.add_done_callback(lambda _: loop.remove_timeout(handle))
        return waiter

    def __enter__(self) -> None:
        raise RuntimeError("Use 'async with' instead of 'with' for Semaphore")

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        traceback: Optional[types.TracebackType],
    ) -> None:
        # Delegates to __enter__, which always raises RuntimeError.
        self.__enter__()

    async def __aenter__(self) -> None:
        # Acquire without a timeout; the context manager that acquire()
        # produces is discarded because __aexit__ releases directly.
        await self.acquire()

    async def __aexit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        self.release()
464
465
class BoundedSemaphore(Semaphore):
    """A semaphore that refuses to be released more often than acquired.

    Calling `.release` when the counter is already at (or above) its
    initial value raises `ValueError`. Semaphores usually guard a resource
    of limited capacity, so an extra release is a sign of a bug.
    """

    def __init__(self, value: int = 1) -> None:
        super().__init__(value=value)
        # Upper bound for the counter, checked by release().
        self._initial_value = value

    def release(self) -> None:
        """Increment the counter and wake one waiter."""
        already_full = self._value >= self._initial_value
        if already_full:
            raise ValueError("Semaphore released too many times")
        super().release()
484
485
class Lock(object):
    """A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager, which can be
    used from a decorated (``yield``-based) coroutine:

    >>> from tornado import gen
    >>>
    >>> @gen.coroutine
    ... def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """

    def __init__(self) -> None:
        # A lock is a bounded semaphore with a single unit: the counter is 1
        # while unlocked and 0 while locked.
        self._block = BoundedSemaphore(value=1)

    def __repr__(self) -> str:
        return "<%s _block=%s>" % (self.__class__.__name__, self._block)

    def acquire(
        self, timeout: Optional[Union[float, datetime.timedelta]] = None
    ) -> Awaitable[_ReleasingContextManager]:
        """Attempt to lock. Returns an awaitable.

        Returns an awaitable, which raises `tornado.util.TimeoutError` after a
        timeout. ``timeout`` is forwarded unchanged to the underlying
        semaphore's ``acquire``.
        """
        return self._block.acquire(timeout)

    def release(self) -> None:
        """Unlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        """
        try:
            self._block.release()
        except ValueError:
            # The semaphore's ValueError is an implementation detail, so it
            # is suppressed from the traceback of the error raised here.
            raise RuntimeError("release unlocked lock") from None

    def __enter__(self) -> None:
        raise RuntimeError("Use `async with` instead of `with` for Lock")

    def __exit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        # Delegates to __enter__, which always raises RuntimeError.
        self.__enter__()

    async def __aenter__(self) -> None:
        await self.acquire()

    async def __aexit__(
        self,
        typ: "Optional[Type[BaseException]]",
        value: Optional[BaseException],
        tb: Optional[types.TracebackType],
    ) -> None:
        self.release()
572