1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15from __future__ import absolute_import, division, print_function
16
17import collections
18from concurrent.futures import CancelledError
19
20from tornado import gen, ioloop
21from tornado.concurrent import Future, future_set_result_unless_cancelled
22
23__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
24
25
class _TimeoutGarbageCollector(object):
    """Mixin that occasionally prunes finished waiters from ``_waiters``.

    Without pruning, a loop like the following would leave one timed-out
    Future behind in the waiter queue on every iteration:

        while True:
            yield condition.wait(short_timeout)
            print('looping....')
    """
    def __init__(self):
        self._timeouts = 0  # Calls to _garbage_collect since the last sweep.
        self._waiters = collections.deque()  # Futures.

    def _garbage_collect(self):
        """Record one timeout; on every 101st call drop waiters that are done."""
        self._timeouts += 1
        if self._timeouts <= 100:
            # Not enough timeouts accumulated yet to justify a sweep.
            return

        self._timeouts = 0
        still_pending = collections.deque()
        for fut in self._waiters:
            if not fut.done():
                still_pending.append(fut)
        self._waiters = still_pending
46
47
class Condition(_TimeoutGarbageCollector):
    """A condition allows one or more coroutines to wait until notified.

    Like a standard `threading.Condition`, but does not need an underlying lock
    that is acquired and released.

    With a `Condition`, coroutines can wait to be notified by other coroutines:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Condition

        condition = Condition()

        async def waiter():
            print("I'll wait right here")
            await condition.wait()
            print("I'm done waiting")

        async def notifier():
            print("About to notify")
            condition.notify()
            print("Done notifying")

        async def runner():
            # Wait for waiter() and notifier() in parallel
            await gen.multi([waiter(), notifier()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        I'll wait right here
        About to notify
        Done notifying
        I'm done waiting

    `wait` takes an optional ``timeout`` argument, which is either an absolute
    timestamp::

        io_loop = IOLoop.current()

        # Wait up to 1 second for a notification.
        await condition.wait(timeout=io_loop.time() + 1)

    ...or a `datetime.timedelta` for a timeout relative to the current time::

        # Wait up to 1 second.
        await condition.wait(timeout=datetime.timedelta(seconds=1))

    The method returns False if there's no notification before the deadline.
    Only ``timeout=None`` means "no deadline"; ``0`` and
    ``datetime.timedelta(0)`` are treated as real deadlines.

    .. versionchanged:: 5.0
       Previously, waiters could be notified synchronously from within
       `notify`. Now, the notification will always be received on the
       next iteration of the `.IOLoop`.
    """

    def __init__(self):
        super(Condition, self).__init__()
        self.io_loop = ioloop.IOLoop.current()

    def __repr__(self):
        result = '<%s' % (self.__class__.__name__, )
        if self._waiters:
            result += ' waiters[%s]' % len(self._waiters)
        return result + '>'

    def wait(self, timeout=None):
        """Wait for `.notify`.

        Returns a `.Future` that resolves ``True`` if the condition is notified,
        or ``False`` after a timeout.

        ``timeout`` is handed to `.IOLoop.add_timeout`, so it may be an
        absolute timestamp or a `datetime.timedelta`. ``None`` (the default)
        registers no timeout at all. Falsy-but-valid deadlines such as ``0``
        or ``datetime.timedelta(0)`` are honored as deadlines instead of
        being mistaken for "no timeout".
        """
        waiter = Future()
        self._waiters.append(waiter)
        # Compare against None explicitly: 0 and timedelta(0) are falsy but
        # are legitimate deadlines that must still schedule on_timeout.
        if timeout is not None:
            def on_timeout():
                if not waiter.done():
                    future_set_result_unless_cancelled(waiter, False)
                self._garbage_collect()
            io_loop = ioloop.IOLoop.current()
            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
            # Once the waiter finishes for any reason, drop the pending
            # timeout so it does not fire needlessly later.
            waiter.add_done_callback(
                lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter

    def notify(self, n=1):
        """Wake ``n`` waiters.

        Waiters are taken from the front of the queue (oldest first).
        Waiters that are already done (e.g. timed out) are discarded
        without counting toward ``n``.
        """
        waiters = []  # Waiters we plan to run right now.
        while n and self._waiters:
            waiter = self._waiters.popleft()
            if not waiter.done():  # Might have timed out.
                n -= 1
                waiters.append(waiter)

        for waiter in waiters:
            future_set_result_unless_cancelled(waiter, True)

    def notify_all(self):
        """Wake all waiters."""
        self.notify(len(self._waiters))
152
153
class Event(object):
    """An event blocks coroutines until its internal flag is set to True.

    Similar to `threading.Event`.

    A coroutine can wait for an event to be set. Once it is set, calls to
    ``await event.wait()`` will not block unless the event has been cleared:

    .. testcode::

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Event

        event = Event()

        async def waiter():
            print("Waiting for event")
            await event.wait()
            print("Not waiting this time")
            await event.wait()
            print("Done")

        async def setter():
            print("About to set the event")
            event.set()

        async def runner():
            await gen.multi([waiter(), setter()])

        IOLoop.current().run_sync(runner)

    .. testoutput::

        Waiting for event
        About to set the event
        Not waiting this time
        Done
    """
    def __init__(self):
        self._waiters = set()  # Futures handed out by wait() and not yet done.
        self._value = False  # The internal flag.

    def __repr__(self):
        state = 'set' if self.is_set() else 'clear'
        return '<%s %s>' % (self.__class__.__name__, state)

    def is_set(self):
        """Return ``True`` if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to ``True``. All waiters are awakened.

        Calling `.wait` once the flag is set will not block. Setting an
        event that is already set does nothing.
        """
        if self._value:
            return

        self._value = True
        for waiter in self._waiters:
            if waiter.done():
                continue
            waiter.set_result(None)

    def clear(self):
        """Reset the internal flag to ``False``.

        Calls to `.wait` will block until `.set` is called.
        """
        self._value = False

    def wait(self, timeout=None):
        """Block until the internal flag is true.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout. If the flag is already true, the returned Future is
        already resolved.
        """
        waiter = Future()
        if self._value:
            waiter.set_result(None)
            return waiter

        self._waiters.add(waiter)
        # A finished waiter (resolved or cancelled) unregisters itself.
        waiter.add_done_callback(
            lambda done_fut: self._waiters.remove(done_fut))
        if timeout is None:
            return waiter

        def cancel_if_pending(_):
            # This is a slightly clumsy workaround for the fact that
            # gen.with_timeout doesn't cancel its futures. Cancelling
            # the waiter will remove it from the waiters set.
            if not waiter.done():
                waiter.cancel()

        timeout_fut = gen.with_timeout(
            timeout, waiter, quiet_exceptions=(CancelledError,))
        timeout_fut.add_done_callback(cancel_if_pending)
        return timeout_fut
245
246
class _ReleasingContextManager(object):
    """Context manager that calls ``release()`` on its target when exited.

    Intended for a Lock or Semaphore, so that this works:

        with (yield semaphore.acquire()):
            pass

        # Now semaphore.release() has been called.
    """
    def __init__(self, obj):
        # The object whose release() is invoked on exit.
        self._obj = obj

    def __enter__(self):
        # Nothing to do on entry; "with ... as x" binds x to None.
        return None

    def __exit__(self, exc_type, exc_val, exc_tb):
        # Release whether or not the block raised. The implicit None
        # return means any exception keeps propagating.
        target = self._obj
        target.release()
263
264
class Semaphore(_TimeoutGarbageCollector):
    """A lock that can be acquired a fixed number of times before blocking.

    A Semaphore manages a counter representing the number of `.release` calls
    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
    method blocks if necessary until it can return without making the counter
    negative.

    Semaphores limit access to a shared resource. To allow access for two
    workers at a time:

    .. testsetup:: semaphore

       from collections import deque

       from tornado import gen
       from tornado.ioloop import IOLoop
       from tornado.concurrent import Future

       # Ensure reliable doctest output: resolve Futures one at a time.
       futures_q = deque([Future() for _ in range(3)])

       async def simulator(futures):
           for f in futures:
               # simulate the asynchronous passage of time
               await gen.sleep(0)
               await gen.sleep(0)
               f.set_result(None)

       IOLoop.current().add_callback(simulator, list(futures_q))

       def use_some_resource():
           return futures_q.popleft()

    .. testcode:: semaphore

        from tornado import gen
        from tornado.ioloop import IOLoop
        from tornado.locks import Semaphore

        sem = Semaphore(2)

        async def worker(worker_id):
            await sem.acquire()
            try:
                print("Worker %d is working" % worker_id)
                await use_some_resource()
            finally:
                print("Worker %d is done" % worker_id)
                sem.release()

        async def runner():
            # Join all workers.
            await gen.multi([worker(i) for i in range(3)])

        IOLoop.current().run_sync(runner)

    .. testoutput:: semaphore

        Worker 0 is working
        Worker 1 is working
        Worker 0 is done
        Worker 2 is working
        Worker 1 is done
        Worker 2 is done

    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
    the semaphore has been released once, by worker 0.

    The semaphore can be used as an async context manager::

        async def worker(worker_id):
            async with sem:
                print("Worker %d is working" % worker_id)
                await use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    For compatibility with older versions of Python, `.acquire` is a
    context manager, so ``worker`` could also be written as::

        @gen.coroutine
        def worker(worker_id):
            with (yield sem.acquire()):
                print("Worker %d is working" % worker_id)
                yield use_some_resource()

            # Now the semaphore has been released.
            print("Worker %d is done" % worker_id)

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """
    def __init__(self, value=1):
        super(Semaphore, self).__init__()
        if value < 0:
            raise ValueError('semaphore initial value must be >= 0')

        self._value = value

    def __repr__(self):
        res = super(Semaphore, self).__repr__()
        extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
            self._value)
        if self._waiters:
            extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
        # res[1:-1] strips the surrounding "<" and ">" of the default repr.
        return '<{0} [{1}]>'.format(res[1:-1], extra)

    def release(self):
        """Increment the counter and wake one waiter."""
        self._value += 1
        while self._waiters:
            waiter = self._waiters.popleft()
            # Skip waiters that already finished (e.g. timed out).
            if not waiter.done():
                # Hand the freshly released slot straight to this waiter.
                self._value -= 1

                # If the waiter is a coroutine paused at
                #
                #     with (yield semaphore.acquire()):
                #
                # then the context manager's __exit__ calls release() at the end
                # of the "with" block.
                waiter.set_result(_ReleasingContextManager(self))
                break

    def acquire(self, timeout=None):
        """Decrement the counter. Returns a Future.

        Block if the counter is zero and wait for a `.release`. The Future
        raises `.TimeoutError` after the deadline.

        ``timeout`` is handed to `.IOLoop.add_timeout`, so it may be an
        absolute timestamp or a `datetime.timedelta`. ``None`` (the default)
        registers no timeout at all. Falsy-but-valid deadlines such as ``0``
        or ``datetime.timedelta(0)`` are honored as deadlines instead of
        being mistaken for "no timeout". The timeout only matters when the
        counter is zero; otherwise the Future is resolved immediately.
        """
        waiter = Future()
        if self._value > 0:
            self._value -= 1
            waiter.set_result(_ReleasingContextManager(self))
        else:
            self._waiters.append(waiter)
            # Compare against None explicitly: 0 and timedelta(0) are falsy
            # but are legitimate deadlines that must still be scheduled.
            if timeout is not None:
                def on_timeout():
                    if not waiter.done():
                        waiter.set_exception(gen.TimeoutError())
                    self._garbage_collect()
                io_loop = ioloop.IOLoop.current()
                timeout_handle = io_loop.add_timeout(timeout, on_timeout)
                # Once the waiter finishes for any reason, drop the pending
                # timeout so it does not fire needlessly later.
                waiter.add_done_callback(
                    lambda _: io_loop.remove_timeout(timeout_handle))
        return waiter

    def __enter__(self):
        raise RuntimeError(
            "Use Semaphore like 'with (yield semaphore.acquire())', not like"
            " 'with semaphore'")

    # A plain "with semaphore" is always a mistake, so both halves of the
    # synchronous context-manager protocol raise.
    __exit__ = __enter__

    @gen.coroutine
    def __aenter__(self):
        """Acquire the semaphore on entry to ``async with``."""
        yield self.acquire()

    @gen.coroutine
    def __aexit__(self, typ, value, tb):
        """Release the semaphore on exit from ``async with``."""
        self.release()
429
430
class BoundedSemaphore(Semaphore):
    """A semaphore that prevents release() being called too many times.

    The counter may never exceed the value the semaphore was created with:
    a `.release` that would push it past that initial value raises
    `ValueError` instead. Semaphores are mostly used to guard resources
    with limited capacity, so a semaphore released too many times is a
    sign of a bug.
    """
    def __init__(self, value=1):
        super(BoundedSemaphore, self).__init__(value=value)
        # Upper bound that release() checks the counter against.
        self._initial_value = value

    def release(self):
        """Increment the counter and wake one waiter.

        Raises `ValueError` if the counter is already at (or above) its
        initial value.
        """
        at_capacity = self._value >= self._initial_value
        if at_capacity:
            raise ValueError("Semaphore released too many times")
        super(BoundedSemaphore, self).release()
448
449
class Lock(object):
    """A lock for coroutines.

    A Lock begins unlocked, and `acquire` locks it immediately. While it is
    locked, a coroutine that yields `acquire` waits until another coroutine
    calls `release`.

    Releasing an unlocked lock raises `RuntimeError`.

    A Lock can be used as an async context manager with the ``async
    with`` statement:

    >>> from tornado import gen, locks
    >>> lock = locks.Lock()
    >>>
    >>> async def f():
    ...    async with lock:
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    For compatibility with older versions of Python, the `.acquire`
    method asynchronously returns a regular context manager, which can
    be used from a decorated coroutine:

    >>> @gen.coroutine
    ... def f2():
    ...    with (yield lock.acquire()):
    ...        # Do something holding the lock.
    ...        pass
    ...
    ...    # Now the lock is released.

    .. versionchanged:: 4.3
       Added ``async with`` support in Python 3.5.

    """
    def __init__(self):
        # A bounded semaphore with a single slot: releasing it while it is
        # not held raises ValueError, which release() translates below.
        self._block = BoundedSemaphore(value=1)

    def __repr__(self):
        return "<%s _block=%s>" % (
            self.__class__.__name__,
            self._block)

    def acquire(self, timeout=None):
        """Attempt to lock. Returns a Future.

        Returns a Future, which raises `tornado.util.TimeoutError` after a
        timeout. ``timeout`` is passed unchanged to the underlying
        semaphore's ``acquire``.
        """
        return self._block.acquire(timeout)

    def release(self):
        """Unlock.

        The first coroutine in line waiting for `acquire` gets the lock.

        If not locked, raise a `RuntimeError`.
        """
        try:
            self._block.release()
        except ValueError:
            # Surface the semaphore's over-release error in lock terms.
            raise RuntimeError('release unlocked lock')

    def __enter__(self):
        # The Lock itself cannot be yielded; the usable context manager is
        # the one produced by acquire(), so point callers at that.
        raise RuntimeError(
            "Use Lock like 'with (yield lock.acquire())', not like"
            " 'with lock'")

    # A plain "with lock" is always a mistake, so both halves of the
    # synchronous context-manager protocol raise.
    __exit__ = __enter__

    @gen.coroutine
    def __aenter__(self):
        """Acquire the lock on entry to ``async with``."""
        yield self.acquire()

    @gen.coroutine
    def __aexit__(self, typ, value, tb):
        """Release the lock on exit from ``async with``."""
        self.release()
527