1# Copyright 2015 The Tornado Authors
2#
3# Licensed under the Apache License, Version 2.0 (the "License"); you may
4# not use this file except in compliance with the License. You may obtain
5# a copy of the License at
6#
7#     http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
12# License for the specific language governing permissions and limitations
13# under the License.
14
15from __future__ import absolute_import, division, print_function
16
17import collections
18
19from tornado import gen, ioloop
20from tornado.concurrent import Future
21
22__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock']
23
24
25class _TimeoutGarbageCollector(object):
26    """Base class for objects that periodically clean up timed-out waiters.
27
28    Avoids memory leak in a common pattern like:
29
30        while True:
31            yield condition.wait(short_timeout)
32            print('looping....')
33    """
34    def __init__(self):
35        self._waiters = collections.deque()  # Futures.
36        self._timeouts = 0
37
38    def _garbage_collect(self):
39        # Occasionally clear timed-out waiters.
40        self._timeouts += 1
41        if self._timeouts > 100:
42            self._timeouts = 0
43            self._waiters = collections.deque(
44                w for w in self._waiters if not w.done())
45
46
47class Condition(_TimeoutGarbageCollector):
48    """A condition allows one or more coroutines to wait until notified.
49
50    Like a standard `threading.Condition`, but does not need an underlying lock
51    that is acquired and released.
52
53    With a `Condition`, coroutines can wait to be notified by other coroutines:
54
55    .. testcode::
56
57        from tornado import gen
58        from tornado.ioloop import IOLoop
59        from tornado.locks import Condition
60
61        condition = Condition()
62
63        @gen.coroutine
64        def waiter():
65            print("I'll wait right here")
66            yield condition.wait()  # Yield a Future.
67            print("I'm done waiting")
68
69        @gen.coroutine
70        def notifier():
71            print("About to notify")
72            condition.notify()
73            print("Done notifying")
74
75        @gen.coroutine
76        def runner():
77            # Yield two Futures; wait for waiter() and notifier() to finish.
78            yield [waiter(), notifier()]
79
80        IOLoop.current().run_sync(runner)
81
82    .. testoutput::
83
84        I'll wait right here
85        About to notify
86        Done notifying
87        I'm done waiting
88
89    `wait` takes an optional ``timeout`` argument, which is either an absolute
90    timestamp::
91
92        io_loop = IOLoop.current()
93
94        # Wait up to 1 second for a notification.
95        yield condition.wait(timeout=io_loop.time() + 1)
96
97    ...or a `datetime.timedelta` for a timeout relative to the current time::
98
99        # Wait up to 1 second.
100        yield condition.wait(timeout=datetime.timedelta(seconds=1))
101
102    The method raises `tornado.gen.TimeoutError` if there's no notification
103    before the deadline.
104    """
105
106    def __init__(self):
107        super(Condition, self).__init__()
108        self.io_loop = ioloop.IOLoop.current()
109
110    def __repr__(self):
111        result = '<%s' % (self.__class__.__name__, )
112        if self._waiters:
113            result += ' waiters[%s]' % len(self._waiters)
114        return result + '>'
115
116    def wait(self, timeout=None):
117        """Wait for `.notify`.
118
119        Returns a `.Future` that resolves ``True`` if the condition is notified,
120        or ``False`` after a timeout.
121        """
122        waiter = Future()
123        self._waiters.append(waiter)
124        if timeout:
125            def on_timeout():
126                waiter.set_result(False)
127                self._garbage_collect()
128            io_loop = ioloop.IOLoop.current()
129            timeout_handle = io_loop.add_timeout(timeout, on_timeout)
130            waiter.add_done_callback(
131                lambda _: io_loop.remove_timeout(timeout_handle))
132        return waiter
133
134    def notify(self, n=1):
135        """Wake ``n`` waiters."""
136        waiters = []  # Waiters we plan to run right now.
137        while n and self._waiters:
138            waiter = self._waiters.popleft()
139            if not waiter.done():  # Might have timed out.
140                n -= 1
141                waiters.append(waiter)
142
143        for waiter in waiters:
144            waiter.set_result(True)
145
146    def notify_all(self):
147        """Wake all waiters."""
148        self.notify(len(self._waiters))
149
150
151class Event(object):
152    """An event blocks coroutines until its internal flag is set to True.
153
154    Similar to `threading.Event`.
155
156    A coroutine can wait for an event to be set. Once it is set, calls to
157    ``yield event.wait()`` will not block unless the event has been cleared:
158
159    .. testcode::
160
161        from tornado import gen
162        from tornado.ioloop import IOLoop
163        from tornado.locks import Event
164
165        event = Event()
166
167        @gen.coroutine
168        def waiter():
169            print("Waiting for event")
170            yield event.wait()
171            print("Not waiting this time")
172            yield event.wait()
173            print("Done")
174
175        @gen.coroutine
176        def setter():
177            print("About to set the event")
178            event.set()
179
180        @gen.coroutine
181        def runner():
182            yield [waiter(), setter()]
183
184        IOLoop.current().run_sync(runner)
185
186    .. testoutput::
187
188        Waiting for event
189        About to set the event
190        Not waiting this time
191        Done
192    """
193    def __init__(self):
194        self._future = Future()
195
196    def __repr__(self):
197        return '<%s %s>' % (
198            self.__class__.__name__, 'set' if self.is_set() else 'clear')
199
200    def is_set(self):
201        """Return ``True`` if the internal flag is true."""
202        return self._future.done()
203
204    def set(self):
205        """Set the internal flag to ``True``. All waiters are awakened.
206
207        Calling `.wait` once the flag is set will not block.
208        """
209        if not self._future.done():
210            self._future.set_result(None)
211
212    def clear(self):
213        """Reset the internal flag to ``False``.
214
215        Calls to `.wait` will block until `.set` is called.
216        """
217        if self._future.done():
218            self._future = Future()
219
220    def wait(self, timeout=None):
221        """Block until the internal flag is true.
222
223        Returns a Future, which raises `tornado.gen.TimeoutError` after a
224        timeout.
225        """
226        if timeout is None:
227            return self._future
228        else:
229            return gen.with_timeout(timeout, self._future)
230
231
232class _ReleasingContextManager(object):
233    """Releases a Lock or Semaphore at the end of a "with" statement.
234
235        with (yield semaphore.acquire()):
236            pass
237
238        # Now semaphore.release() has been called.
239    """
240    def __init__(self, obj):
241        self._obj = obj
242
243    def __enter__(self):
244        pass
245
246    def __exit__(self, exc_type, exc_val, exc_tb):
247        self._obj.release()
248
249
250class Semaphore(_TimeoutGarbageCollector):
251    """A lock that can be acquired a fixed number of times before blocking.
252
253    A Semaphore manages a counter representing the number of `.release` calls
254    minus the number of `.acquire` calls, plus an initial value. The `.acquire`
255    method blocks if necessary until it can return without making the counter
256    negative.
257
258    Semaphores limit access to a shared resource. To allow access for two
259    workers at a time:
260
261    .. testsetup:: semaphore
262
263       from collections import deque
264
265       from tornado import gen
266       from tornado.ioloop import IOLoop
267       from tornado.concurrent import Future
268
269       # Ensure reliable doctest output: resolve Futures one at a time.
270       futures_q = deque([Future() for _ in range(3)])
271
272       @gen.coroutine
273       def simulator(futures):
274           for f in futures:
275               yield gen.moment
276               f.set_result(None)
277
278       IOLoop.current().add_callback(simulator, list(futures_q))
279
280       def use_some_resource():
281           return futures_q.popleft()
282
283    .. testcode:: semaphore
284
285        from tornado import gen
286        from tornado.ioloop import IOLoop
287        from tornado.locks import Semaphore
288
289        sem = Semaphore(2)
290
291        @gen.coroutine
292        def worker(worker_id):
293            yield sem.acquire()
294            try:
295                print("Worker %d is working" % worker_id)
296                yield use_some_resource()
297            finally:
298                print("Worker %d is done" % worker_id)
299                sem.release()
300
301        @gen.coroutine
302        def runner():
303            # Join all workers.
304            yield [worker(i) for i in range(3)]
305
306        IOLoop.current().run_sync(runner)
307
308    .. testoutput:: semaphore
309
310        Worker 0 is working
311        Worker 1 is working
312        Worker 0 is done
313        Worker 2 is working
314        Worker 1 is done
315        Worker 2 is done
316
317    Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until
318    the semaphore has been released once, by worker 0.
319
320    `.acquire` is a context manager, so ``worker`` could be written as::
321
322        @gen.coroutine
323        def worker(worker_id):
324            with (yield sem.acquire()):
325                print("Worker %d is working" % worker_id)
326                yield use_some_resource()
327
328            # Now the semaphore has been released.
329            print("Worker %d is done" % worker_id)
330
331    In Python 3.5, the semaphore itself can be used as an async context
332    manager::
333
334        async def worker(worker_id):
335            async with sem:
336                print("Worker %d is working" % worker_id)
337                await use_some_resource()
338
339            # Now the semaphore has been released.
340            print("Worker %d is done" % worker_id)
341
342    .. versionchanged:: 4.3
343       Added ``async with`` support in Python 3.5.
344    """
345    def __init__(self, value=1):
346        super(Semaphore, self).__init__()
347        if value < 0:
348            raise ValueError('semaphore initial value must be >= 0')
349
350        self._value = value
351
352    def __repr__(self):
353        res = super(Semaphore, self).__repr__()
354        extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format(
355            self._value)
356        if self._waiters:
357            extra = '{0},waiters:{1}'.format(extra, len(self._waiters))
358        return '<{0} [{1}]>'.format(res[1:-1], extra)
359
360    def release(self):
361        """Increment the counter and wake one waiter."""
362        self._value += 1
363        while self._waiters:
364            waiter = self._waiters.popleft()
365            if not waiter.done():
366                self._value -= 1
367
368                # If the waiter is a coroutine paused at
369                #
370                #     with (yield semaphore.acquire()):
371                #
372                # then the context manager's __exit__ calls release() at the end
373                # of the "with" block.
374                waiter.set_result(_ReleasingContextManager(self))
375                break
376
377    def acquire(self, timeout=None):
378        """Decrement the counter. Returns a Future.
379
380        Block if the counter is zero and wait for a `.release`. The Future
381        raises `.TimeoutError` after the deadline.
382        """
383        waiter = Future()
384        if self._value > 0:
385            self._value -= 1
386            waiter.set_result(_ReleasingContextManager(self))
387        else:
388            self._waiters.append(waiter)
389            if timeout:
390                def on_timeout():
391                    waiter.set_exception(gen.TimeoutError())
392                    self._garbage_collect()
393                io_loop = ioloop.IOLoop.current()
394                timeout_handle = io_loop.add_timeout(timeout, on_timeout)
395                waiter.add_done_callback(
396                    lambda _: io_loop.remove_timeout(timeout_handle))
397        return waiter
398
399    def __enter__(self):
400        raise RuntimeError(
401            "Use Semaphore like 'with (yield semaphore.acquire())', not like"
402            " 'with semaphore'")
403
404    __exit__ = __enter__
405
406    @gen.coroutine
407    def __aenter__(self):
408        yield self.acquire()
409
410    @gen.coroutine
411    def __aexit__(self, typ, value, tb):
412        self.release()
413
414
415class BoundedSemaphore(Semaphore):
416    """A semaphore that prevents release() being called too many times.
417
418    If `.release` would increment the semaphore's value past the initial
419    value, it raises `ValueError`. Semaphores are mostly used to guard
420    resources with limited capacity, so a semaphore released too many times
421    is a sign of a bug.
422    """
423    def __init__(self, value=1):
424        super(BoundedSemaphore, self).__init__(value=value)
425        self._initial_value = value
426
427    def release(self):
428        """Increment the counter and wake one waiter."""
429        if self._value >= self._initial_value:
430            raise ValueError("Semaphore released too many times")
431        super(BoundedSemaphore, self).release()
432
433
434class Lock(object):
435    """A lock for coroutines.
436
437    A Lock begins unlocked, and `acquire` locks it immediately. While it is
438    locked, a coroutine that yields `acquire` waits until another coroutine
439    calls `release`.
440
441    Releasing an unlocked lock raises `RuntimeError`.
442
443    `acquire` supports the context manager protocol in all Python versions:
444
445    >>> from tornado import gen, locks
446    >>> lock = locks.Lock()
447    >>>
448    >>> @gen.coroutine
449    ... def f():
450    ...    with (yield lock.acquire()):
451    ...        # Do something holding the lock.
452    ...        pass
453    ...
454    ...    # Now the lock is released.
455
456    In Python 3.5, `Lock` also supports the async context manager
457    protocol. Note that in this case there is no `acquire`, because
458    ``async with`` includes both the ``yield`` and the ``acquire``
459    (just as it does with `threading.Lock`):
460
461    >>> async def f():  # doctest: +SKIP
462    ...    async with lock:
463    ...        # Do something holding the lock.
464    ...        pass
465    ...
466    ...    # Now the lock is released.
467
468    .. versionchanged:: 4.3
469       Added ``async with`` support in Python 3.5.
470
471    """
472    def __init__(self):
473        self._block = BoundedSemaphore(value=1)
474
475    def __repr__(self):
476        return "<%s _block=%s>" % (
477            self.__class__.__name__,
478            self._block)
479
480    def acquire(self, timeout=None):
481        """Attempt to lock. Returns a Future.
482
483        Returns a Future, which raises `tornado.gen.TimeoutError` after a
484        timeout.
485        """
486        return self._block.acquire(timeout)
487
488    def release(self):
489        """Unlock.
490
491        The first coroutine in line waiting for `acquire` gets the lock.
492
493        If not locked, raise a `RuntimeError`.
494        """
495        try:
496            self._block.release()
497        except ValueError:
498            raise RuntimeError('release unlocked lock')
499
500    def __enter__(self):
501        raise RuntimeError(
502            "Use Lock like 'with (yield lock)', not like 'with lock'")
503
504    __exit__ = __enter__
505
506    @gen.coroutine
507    def __aenter__(self):
508        yield self.acquire()
509
510    @gen.coroutine
511    def __aexit__(self, typ, value, tb):
512        self.release()
513