1# Copyright 2015 The Tornado Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15import collections 16import datetime 17import types 18 19from tornado import gen, ioloop 20from tornado.concurrent import Future, future_set_result_unless_cancelled 21 22from typing import Union, Optional, Type, Any, Awaitable 23import typing 24 25if typing.TYPE_CHECKING: 26 from typing import Deque, Set # noqa: F401 27 28__all__ = ["Condition", "Event", "Semaphore", "BoundedSemaphore", "Lock"] 29 30 31class _TimeoutGarbageCollector(object): 32 """Base class for objects that periodically clean up timed-out waiters. 33 34 Avoids memory leak in a common pattern like: 35 36 while True: 37 yield condition.wait(short_timeout) 38 print('looping....') 39 """ 40 41 def __init__(self) -> None: 42 self._waiters = collections.deque() # type: Deque[Future] 43 self._timeouts = 0 44 45 def _garbage_collect(self) -> None: 46 # Occasionally clear timed-out waiters. 47 self._timeouts += 1 48 if self._timeouts > 100: 49 self._timeouts = 0 50 self._waiters = collections.deque(w for w in self._waiters if not w.done()) 51 52 53class Condition(_TimeoutGarbageCollector): 54 """A condition allows one or more coroutines to wait until notified. 55 56 Like a standard `threading.Condition`, but does not need an underlying lock 57 that is acquired and released. 58 59 With a `Condition`, coroutines can wait to be notified by other coroutines: 60 61 .. testcode:: 62 63 from tornado import gen 64 from tornado.ioloop import IOLoop 65 from tornado.locks import Condition 66 67 condition = Condition() 68 69 async def waiter(): 70 print("I'll wait right here") 71 await condition.wait() 72 print("I'm done waiting") 73 74 async def notifier(): 75 print("About to notify") 76 condition.notify() 77 print("Done notifying") 78 79 async def runner(): 80 # Wait for waiter() and notifier() in parallel 81 await gen.multi([waiter(), notifier()]) 82 83 IOLoop.current().run_sync(runner) 84 85 .. testoutput:: 86 87 I'll wait right here 88 About to notify 89 Done notifying 90 I'm done waiting 91 92 `wait` takes an optional ``timeout`` argument, which is either an absolute 93 timestamp:: 94 95 io_loop = IOLoop.current() 96 97 # Wait up to 1 second for a notification. 98 await condition.wait(timeout=io_loop.time() + 1) 99 100 ...or a `datetime.timedelta` for a timeout relative to the current time:: 101 102 # Wait up to 1 second. 103 await condition.wait(timeout=datetime.timedelta(seconds=1)) 104 105 The method returns False if there's no notification before the deadline. 106 107 .. versionchanged:: 5.0 108 Previously, waiters could be notified synchronously from within 109 `notify`. Now, the notification will always be received on the 110 next iteration of the `.IOLoop`. 111 """ 112 113 def __init__(self) -> None: 114 super().__init__() 115 self.io_loop = ioloop.IOLoop.current() 116 117 def __repr__(self) -> str: 118 result = "<%s" % (self.__class__.__name__,) 119 if self._waiters: 120 result += " waiters[%s]" % len(self._waiters) 121 return result + ">" 122 123 def wait( 124 self, timeout: Optional[Union[float, datetime.timedelta]] = None 125 ) -> Awaitable[bool]: 126 """Wait for `.notify`. 127 128 Returns a `.Future` that resolves ``True`` if the condition is notified, 129 or ``False`` after a timeout. 130 """ 131 waiter = Future() # type: Future[bool] 132 self._waiters.append(waiter) 133 if timeout: 134 135 def on_timeout() -> None: 136 if not waiter.done(): 137 future_set_result_unless_cancelled(waiter, False) 138 self._garbage_collect() 139 140 io_loop = ioloop.IOLoop.current() 141 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 142 waiter.add_done_callback(lambda _: io_loop.remove_timeout(timeout_handle)) 143 return waiter 144 145 def notify(self, n: int = 1) -> None: 146 """Wake ``n`` waiters.""" 147 waiters = [] # Waiters we plan to run right now. 148 while n and self._waiters: 149 waiter = self._waiters.popleft() 150 if not waiter.done(): # Might have timed out. 151 n -= 1 152 waiters.append(waiter) 153 154 for waiter in waiters: 155 future_set_result_unless_cancelled(waiter, True) 156 157 def notify_all(self) -> None: 158 """Wake all waiters.""" 159 self.notify(len(self._waiters)) 160 161 162class Event(object): 163 """An event blocks coroutines until its internal flag is set to True. 164 165 Similar to `threading.Event`. 166 167 A coroutine can wait for an event to be set. Once it is set, calls to 168 ``yield event.wait()`` will not block unless the event has been cleared: 169 170 .. testcode:: 171 172 from tornado import gen 173 from tornado.ioloop import IOLoop 174 from tornado.locks import Event 175 176 event = Event() 177 178 async def waiter(): 179 print("Waiting for event") 180 await event.wait() 181 print("Not waiting this time") 182 await event.wait() 183 print("Done") 184 185 async def setter(): 186 print("About to set the event") 187 event.set() 188 189 async def runner(): 190 await gen.multi([waiter(), setter()]) 191 192 IOLoop.current().run_sync(runner) 193 194 .. testoutput:: 195 196 Waiting for event 197 About to set the event 198 Not waiting this time 199 Done 200 """ 201 202 def __init__(self) -> None: 203 self._value = False 204 self._waiters = set() # type: Set[Future[None]] 205 206 def __repr__(self) -> str: 207 return "<%s %s>" % ( 208 self.__class__.__name__, 209 "set" if self.is_set() else "clear", 210 ) 211 212 def is_set(self) -> bool: 213 """Return ``True`` if the internal flag is true.""" 214 return self._value 215 216 def set(self) -> None: 217 """Set the internal flag to ``True``. All waiters are awakened. 218 219 Calling `.wait` once the flag is set will not block. 220 """ 221 if not self._value: 222 self._value = True 223 224 for fut in self._waiters: 225 if not fut.done(): 226 fut.set_result(None) 227 228 def clear(self) -> None: 229 """Reset the internal flag to ``False``. 230 231 Calls to `.wait` will block until `.set` is called. 232 """ 233 self._value = False 234 235 def wait( 236 self, timeout: Optional[Union[float, datetime.timedelta]] = None 237 ) -> Awaitable[None]: 238 """Block until the internal flag is true. 239 240 Returns an awaitable, which raises `tornado.util.TimeoutError` after a 241 timeout. 242 """ 243 fut = Future() # type: Future[None] 244 if self._value: 245 fut.set_result(None) 246 return fut 247 self._waiters.add(fut) 248 fut.add_done_callback(lambda fut: self._waiters.remove(fut)) 249 if timeout is None: 250 return fut 251 else: 252 timeout_fut = gen.with_timeout(timeout, fut) 253 # This is a slightly clumsy workaround for the fact that 254 # gen.with_timeout doesn't cancel its futures. Cancelling 255 # fut will remove it from the waiters list. 256 timeout_fut.add_done_callback( 257 lambda tf: fut.cancel() if not fut.done() else None 258 ) 259 return timeout_fut 260 261 262class _ReleasingContextManager(object): 263 """Releases a Lock or Semaphore at the end of a "with" statement. 264 265 with (yield semaphore.acquire()): 266 pass 267 268 # Now semaphore.release() has been called. 269 """ 270 271 def __init__(self, obj: Any) -> None: 272 self._obj = obj 273 274 def __enter__(self) -> None: 275 pass 276 277 def __exit__( 278 self, 279 exc_type: "Optional[Type[BaseException]]", 280 exc_val: Optional[BaseException], 281 exc_tb: Optional[types.TracebackType], 282 ) -> None: 283 self._obj.release() 284 285 286class Semaphore(_TimeoutGarbageCollector): 287 """A lock that can be acquired a fixed number of times before blocking. 288 289 A Semaphore manages a counter representing the number of `.release` calls 290 minus the number of `.acquire` calls, plus an initial value. The `.acquire` 291 method blocks if necessary until it can return without making the counter 292 negative. 293 294 Semaphores limit access to a shared resource. To allow access for two 295 workers at a time: 296 297 .. testsetup:: semaphore 298 299 from collections import deque 300 301 from tornado import gen 302 from tornado.ioloop import IOLoop 303 from tornado.concurrent import Future 304 305 # Ensure reliable doctest output: resolve Futures one at a time. 306 futures_q = deque([Future() for _ in range(3)]) 307 308 async def simulator(futures): 309 for f in futures: 310 # simulate the asynchronous passage of time 311 await gen.sleep(0) 312 await gen.sleep(0) 313 f.set_result(None) 314 315 IOLoop.current().add_callback(simulator, list(futures_q)) 316 317 def use_some_resource(): 318 return futures_q.popleft() 319 320 .. testcode:: semaphore 321 322 from tornado import gen 323 from tornado.ioloop import IOLoop 324 from tornado.locks import Semaphore 325 326 sem = Semaphore(2) 327 328 async def worker(worker_id): 329 await sem.acquire() 330 try: 331 print("Worker %d is working" % worker_id) 332 await use_some_resource() 333 finally: 334 print("Worker %d is done" % worker_id) 335 sem.release() 336 337 async def runner(): 338 # Join all workers. 339 await gen.multi([worker(i) for i in range(3)]) 340 341 IOLoop.current().run_sync(runner) 342 343 .. testoutput:: semaphore 344 345 Worker 0 is working 346 Worker 1 is working 347 Worker 0 is done 348 Worker 2 is working 349 Worker 1 is done 350 Worker 2 is done 351 352 Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until 353 the semaphore has been released once, by worker 0. 354 355 The semaphore can be used as an async context manager:: 356 357 async def worker(worker_id): 358 async with sem: 359 print("Worker %d is working" % worker_id) 360 await use_some_resource() 361 362 # Now the semaphore has been released. 363 print("Worker %d is done" % worker_id) 364 365 For compatibility with older versions of Python, `.acquire` is a 366 context manager, so ``worker`` could also be written as:: 367 368 @gen.coroutine 369 def worker(worker_id): 370 with (yield sem.acquire()): 371 print("Worker %d is working" % worker_id) 372 yield use_some_resource() 373 374 # Now the semaphore has been released. 375 print("Worker %d is done" % worker_id) 376 377 .. versionchanged:: 4.3 378 Added ``async with`` support in Python 3.5. 379 380 """ 381 382 def __init__(self, value: int = 1) -> None: 383 super().__init__() 384 if value < 0: 385 raise ValueError("semaphore initial value must be >= 0") 386 387 self._value = value 388 389 def __repr__(self) -> str: 390 res = super().__repr__() 391 extra = ( 392 "locked" if self._value == 0 else "unlocked,value:{0}".format(self._value) 393 ) 394 if self._waiters: 395 extra = "{0},waiters:{1}".format(extra, len(self._waiters)) 396 return "<{0} [{1}]>".format(res[1:-1], extra) 397 398 def release(self) -> None: 399 """Increment the counter and wake one waiter.""" 400 self._value += 1 401 while self._waiters: 402 waiter = self._waiters.popleft() 403 if not waiter.done(): 404 self._value -= 1 405 406 # If the waiter is a coroutine paused at 407 # 408 # with (yield semaphore.acquire()): 409 # 410 # then the context manager's __exit__ calls release() at the end 411 # of the "with" block. 412 waiter.set_result(_ReleasingContextManager(self)) 413 break 414 415 def acquire( 416 self, timeout: Optional[Union[float, datetime.timedelta]] = None 417 ) -> Awaitable[_ReleasingContextManager]: 418 """Decrement the counter. Returns an awaitable. 419 420 Block if the counter is zero and wait for a `.release`. The awaitable 421 raises `.TimeoutError` after the deadline. 422 """ 423 waiter = Future() # type: Future[_ReleasingContextManager] 424 if self._value > 0: 425 self._value -= 1 426 waiter.set_result(_ReleasingContextManager(self)) 427 else: 428 self._waiters.append(waiter) 429 if timeout: 430 431 def on_timeout() -> None: 432 if not waiter.done(): 433 waiter.set_exception(gen.TimeoutError()) 434 self._garbage_collect() 435 436 io_loop = ioloop.IOLoop.current() 437 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 438 waiter.add_done_callback( 439 lambda _: io_loop.remove_timeout(timeout_handle) 440 ) 441 return waiter 442 443 def __enter__(self) -> None: 444 raise RuntimeError("Use 'async with' instead of 'with' for Semaphore") 445 446 def __exit__( 447 self, 448 typ: "Optional[Type[BaseException]]", 449 value: Optional[BaseException], 450 traceback: Optional[types.TracebackType], 451 ) -> None: 452 self.__enter__() 453 454 async def __aenter__(self) -> None: 455 await self.acquire() 456 457 async def __aexit__( 458 self, 459 typ: "Optional[Type[BaseException]]", 460 value: Optional[BaseException], 461 tb: Optional[types.TracebackType], 462 ) -> None: 463 self.release() 464 465 466class BoundedSemaphore(Semaphore): 467 """A semaphore that prevents release() being called too many times. 468 469 If `.release` would increment the semaphore's value past the initial 470 value, it raises `ValueError`. Semaphores are mostly used to guard 471 resources with limited capacity, so a semaphore released too many times 472 is a sign of a bug. 473 """ 474 475 def __init__(self, value: int = 1) -> None: 476 super().__init__(value=value) 477 self._initial_value = value 478 479 def release(self) -> None: 480 """Increment the counter and wake one waiter.""" 481 if self._value >= self._initial_value: 482 raise ValueError("Semaphore released too many times") 483 super().release() 484 485 486class Lock(object): 487 """A lock for coroutines. 488 489 A Lock begins unlocked, and `acquire` locks it immediately. While it is 490 locked, a coroutine that yields `acquire` waits until another coroutine 491 calls `release`. 492 493 Releasing an unlocked lock raises `RuntimeError`. 494 495 A Lock can be used as an async context manager with the ``async 496 with`` statement: 497 498 >>> from tornado import locks 499 >>> lock = locks.Lock() 500 >>> 501 >>> async def f(): 502 ... async with lock: 503 ... # Do something holding the lock. 504 ... pass 505 ... 506 ... # Now the lock is released. 507 508 For compatibility with older versions of Python, the `.acquire` 509 method asynchronously returns a regular context manager: 510 511 >>> async def f2(): 512 ... with (yield lock.acquire()): 513 ... # Do something holding the lock. 514 ... pass 515 ... 516 ... # Now the lock is released. 517 518 .. versionchanged:: 4.3 519 Added ``async with`` support in Python 3.5. 520 521 """ 522 523 def __init__(self) -> None: 524 self._block = BoundedSemaphore(value=1) 525 526 def __repr__(self) -> str: 527 return "<%s _block=%s>" % (self.__class__.__name__, self._block) 528 529 def acquire( 530 self, timeout: Optional[Union[float, datetime.timedelta]] = None 531 ) -> Awaitable[_ReleasingContextManager]: 532 """Attempt to lock. Returns an awaitable. 533 534 Returns an awaitable, which raises `tornado.util.TimeoutError` after a 535 timeout. 536 """ 537 return self._block.acquire(timeout) 538 539 def release(self) -> None: 540 """Unlock. 541 542 The first coroutine in line waiting for `acquire` gets the lock. 543 544 If not locked, raise a `RuntimeError`. 545 """ 546 try: 547 self._block.release() 548 except ValueError: 549 raise RuntimeError("release unlocked lock") 550 551 def __enter__(self) -> None: 552 raise RuntimeError("Use `async with` instead of `with` for Lock") 553 554 def __exit__( 555 self, 556 typ: "Optional[Type[BaseException]]", 557 value: Optional[BaseException], 558 tb: Optional[types.TracebackType], 559 ) -> None: 560 self.__enter__() 561 562 async def __aenter__(self) -> None: 563 await self.acquire() 564 565 async def __aexit__( 566 self, 567 typ: "Optional[Type[BaseException]]", 568 value: Optional[BaseException], 569 tb: Optional[types.TracebackType], 570 ) -> None: 571 self.release() 572