1# Copyright 2015 The Tornado Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15from __future__ import absolute_import, division, print_function 16 17import collections 18from concurrent.futures import CancelledError 19 20from tornado import gen, ioloop 21from tornado.concurrent import Future, future_set_result_unless_cancelled 22 23__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock'] 24 25 26class _TimeoutGarbageCollector(object): 27 """Base class for objects that periodically clean up timed-out waiters. 28 29 Avoids memory leak in a common pattern like: 30 31 while True: 32 yield condition.wait(short_timeout) 33 print('looping....') 34 """ 35 def __init__(self): 36 self._waiters = collections.deque() # Futures. 37 self._timeouts = 0 38 39 def _garbage_collect(self): 40 # Occasionally clear timed-out waiters. 41 self._timeouts += 1 42 if self._timeouts > 100: 43 self._timeouts = 0 44 self._waiters = collections.deque( 45 w for w in self._waiters if not w.done()) 46 47 48class Condition(_TimeoutGarbageCollector): 49 """A condition allows one or more coroutines to wait until notified. 50 51 Like a standard `threading.Condition`, but does not need an underlying lock 52 that is acquired and released. 53 54 With a `Condition`, coroutines can wait to be notified by other coroutines: 55 56 .. testcode:: 57 58 from tornado import gen 59 from tornado.ioloop import IOLoop 60 from tornado.locks import Condition 61 62 condition = Condition() 63 64 async def waiter(): 65 print("I'll wait right here") 66 await condition.wait() 67 print("I'm done waiting") 68 69 async def notifier(): 70 print("About to notify") 71 condition.notify() 72 print("Done notifying") 73 74 async def runner(): 75 # Wait for waiter() and notifier() in parallel 76 await gen.multi([waiter(), notifier()]) 77 78 IOLoop.current().run_sync(runner) 79 80 .. testoutput:: 81 82 I'll wait right here 83 About to notify 84 Done notifying 85 I'm done waiting 86 87 `wait` takes an optional ``timeout`` argument, which is either an absolute 88 timestamp:: 89 90 io_loop = IOLoop.current() 91 92 # Wait up to 1 second for a notification. 93 await condition.wait(timeout=io_loop.time() + 1) 94 95 ...or a `datetime.timedelta` for a timeout relative to the current time:: 96 97 # Wait up to 1 second. 98 await condition.wait(timeout=datetime.timedelta(seconds=1)) 99 100 The method returns False if there's no notification before the deadline. 101 102 .. versionchanged:: 5.0 103 Previously, waiters could be notified synchronously from within 104 `notify`. Now, the notification will always be received on the 105 next iteration of the `.IOLoop`. 106 """ 107 108 def __init__(self): 109 super(Condition, self).__init__() 110 self.io_loop = ioloop.IOLoop.current() 111 112 def __repr__(self): 113 result = '<%s' % (self.__class__.__name__, ) 114 if self._waiters: 115 result += ' waiters[%s]' % len(self._waiters) 116 return result + '>' 117 118 def wait(self, timeout=None): 119 """Wait for `.notify`. 120 121 Returns a `.Future` that resolves ``True`` if the condition is notified, 122 or ``False`` after a timeout. 123 """ 124 waiter = Future() 125 self._waiters.append(waiter) 126 if timeout: 127 def on_timeout(): 128 if not waiter.done(): 129 future_set_result_unless_cancelled(waiter, False) 130 self._garbage_collect() 131 io_loop = ioloop.IOLoop.current() 132 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 133 waiter.add_done_callback( 134 lambda _: io_loop.remove_timeout(timeout_handle)) 135 return waiter 136 137 def notify(self, n=1): 138 """Wake ``n`` waiters.""" 139 waiters = [] # Waiters we plan to run right now. 140 while n and self._waiters: 141 waiter = self._waiters.popleft() 142 if not waiter.done(): # Might have timed out. 143 n -= 1 144 waiters.append(waiter) 145 146 for waiter in waiters: 147 future_set_result_unless_cancelled(waiter, True) 148 149 def notify_all(self): 150 """Wake all waiters.""" 151 self.notify(len(self._waiters)) 152 153 154class Event(object): 155 """An event blocks coroutines until its internal flag is set to True. 156 157 Similar to `threading.Event`. 158 159 A coroutine can wait for an event to be set. Once it is set, calls to 160 ``yield event.wait()`` will not block unless the event has been cleared: 161 162 .. testcode:: 163 164 from tornado import gen 165 from tornado.ioloop import IOLoop 166 from tornado.locks import Event 167 168 event = Event() 169 170 async def waiter(): 171 print("Waiting for event") 172 await event.wait() 173 print("Not waiting this time") 174 await event.wait() 175 print("Done") 176 177 async def setter(): 178 print("About to set the event") 179 event.set() 180 181 async def runner(): 182 await gen.multi([waiter(), setter()]) 183 184 IOLoop.current().run_sync(runner) 185 186 .. testoutput:: 187 188 Waiting for event 189 About to set the event 190 Not waiting this time 191 Done 192 """ 193 def __init__(self): 194 self._value = False 195 self._waiters = set() 196 197 def __repr__(self): 198 return '<%s %s>' % ( 199 self.__class__.__name__, 'set' if self.is_set() else 'clear') 200 201 def is_set(self): 202 """Return ``True`` if the internal flag is true.""" 203 return self._value 204 205 def set(self): 206 """Set the internal flag to ``True``. All waiters are awakened. 207 208 Calling `.wait` once the flag is set will not block. 209 """ 210 if not self._value: 211 self._value = True 212 213 for fut in self._waiters: 214 if not fut.done(): 215 fut.set_result(None) 216 217 def clear(self): 218 """Reset the internal flag to ``False``. 219 220 Calls to `.wait` will block until `.set` is called. 221 """ 222 self._value = False 223 224 def wait(self, timeout=None): 225 """Block until the internal flag is true. 226 227 Returns a Future, which raises `tornado.util.TimeoutError` after a 228 timeout. 229 """ 230 fut = Future() 231 if self._value: 232 fut.set_result(None) 233 return fut 234 self._waiters.add(fut) 235 fut.add_done_callback(lambda fut: self._waiters.remove(fut)) 236 if timeout is None: 237 return fut 238 else: 239 timeout_fut = gen.with_timeout(timeout, fut, quiet_exceptions=(CancelledError,)) 240 # This is a slightly clumsy workaround for the fact that 241 # gen.with_timeout doesn't cancel its futures. Cancelling 242 # fut will remove it from the waiters list. 243 timeout_fut.add_done_callback(lambda tf: fut.cancel() if not fut.done() else None) 244 return timeout_fut 245 246 247class _ReleasingContextManager(object): 248 """Releases a Lock or Semaphore at the end of a "with" statement. 249 250 with (yield semaphore.acquire()): 251 pass 252 253 # Now semaphore.release() has been called. 254 """ 255 def __init__(self, obj): 256 self._obj = obj 257 258 def __enter__(self): 259 pass 260 261 def __exit__(self, exc_type, exc_val, exc_tb): 262 self._obj.release() 263 264 265class Semaphore(_TimeoutGarbageCollector): 266 """A lock that can be acquired a fixed number of times before blocking. 267 268 A Semaphore manages a counter representing the number of `.release` calls 269 minus the number of `.acquire` calls, plus an initial value. The `.acquire` 270 method blocks if necessary until it can return without making the counter 271 negative. 272 273 Semaphores limit access to a shared resource. To allow access for two 274 workers at a time: 275 276 .. testsetup:: semaphore 277 278 from collections import deque 279 280 from tornado import gen 281 from tornado.ioloop import IOLoop 282 from tornado.concurrent import Future 283 284 # Ensure reliable doctest output: resolve Futures one at a time. 285 futures_q = deque([Future() for _ in range(3)]) 286 287 async def simulator(futures): 288 for f in futures: 289 # simulate the asynchronous passage of time 290 await gen.sleep(0) 291 await gen.sleep(0) 292 f.set_result(None) 293 294 IOLoop.current().add_callback(simulator, list(futures_q)) 295 296 def use_some_resource(): 297 return futures_q.popleft() 298 299 .. testcode:: semaphore 300 301 from tornado import gen 302 from tornado.ioloop import IOLoop 303 from tornado.locks import Semaphore 304 305 sem = Semaphore(2) 306 307 async def worker(worker_id): 308 await sem.acquire() 309 try: 310 print("Worker %d is working" % worker_id) 311 await use_some_resource() 312 finally: 313 print("Worker %d is done" % worker_id) 314 sem.release() 315 316 async def runner(): 317 # Join all workers. 318 await gen.multi([worker(i) for i in range(3)]) 319 320 IOLoop.current().run_sync(runner) 321 322 .. testoutput:: semaphore 323 324 Worker 0 is working 325 Worker 1 is working 326 Worker 0 is done 327 Worker 2 is working 328 Worker 1 is done 329 Worker 2 is done 330 331 Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until 332 the semaphore has been released once, by worker 0. 333 334 The semaphore can be used as an async context manager:: 335 336 async def worker(worker_id): 337 async with sem: 338 print("Worker %d is working" % worker_id) 339 await use_some_resource() 340 341 # Now the semaphore has been released. 342 print("Worker %d is done" % worker_id) 343 344 For compatibility with older versions of Python, `.acquire` is a 345 context manager, so ``worker`` could also be written as:: 346 347 @gen.coroutine 348 def worker(worker_id): 349 with (yield sem.acquire()): 350 print("Worker %d is working" % worker_id) 351 yield use_some_resource() 352 353 # Now the semaphore has been released. 354 print("Worker %d is done" % worker_id) 355 356 .. versionchanged:: 4.3 357 Added ``async with`` support in Python 3.5. 358 359 """ 360 def __init__(self, value=1): 361 super(Semaphore, self).__init__() 362 if value < 0: 363 raise ValueError('semaphore initial value must be >= 0') 364 365 self._value = value 366 367 def __repr__(self): 368 res = super(Semaphore, self).__repr__() 369 extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format( 370 self._value) 371 if self._waiters: 372 extra = '{0},waiters:{1}'.format(extra, len(self._waiters)) 373 return '<{0} [{1}]>'.format(res[1:-1], extra) 374 375 def release(self): 376 """Increment the counter and wake one waiter.""" 377 self._value += 1 378 while self._waiters: 379 waiter = self._waiters.popleft() 380 if not waiter.done(): 381 self._value -= 1 382 383 # If the waiter is a coroutine paused at 384 # 385 # with (yield semaphore.acquire()): 386 # 387 # then the context manager's __exit__ calls release() at the end 388 # of the "with" block. 389 waiter.set_result(_ReleasingContextManager(self)) 390 break 391 392 def acquire(self, timeout=None): 393 """Decrement the counter. Returns a Future. 394 395 Block if the counter is zero and wait for a `.release`. The Future 396 raises `.TimeoutError` after the deadline. 397 """ 398 waiter = Future() 399 if self._value > 0: 400 self._value -= 1 401 waiter.set_result(_ReleasingContextManager(self)) 402 else: 403 self._waiters.append(waiter) 404 if timeout: 405 def on_timeout(): 406 if not waiter.done(): 407 waiter.set_exception(gen.TimeoutError()) 408 self._garbage_collect() 409 io_loop = ioloop.IOLoop.current() 410 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 411 waiter.add_done_callback( 412 lambda _: io_loop.remove_timeout(timeout_handle)) 413 return waiter 414 415 def __enter__(self): 416 raise RuntimeError( 417 "Use Semaphore like 'with (yield semaphore.acquire())', not like" 418 " 'with semaphore'") 419 420 __exit__ = __enter__ 421 422 @gen.coroutine 423 def __aenter__(self): 424 yield self.acquire() 425 426 @gen.coroutine 427 def __aexit__(self, typ, value, tb): 428 self.release() 429 430 431class BoundedSemaphore(Semaphore): 432 """A semaphore that prevents release() being called too many times. 433 434 If `.release` would increment the semaphore's value past the initial 435 value, it raises `ValueError`. Semaphores are mostly used to guard 436 resources with limited capacity, so a semaphore released too many times 437 is a sign of a bug. 438 """ 439 def __init__(self, value=1): 440 super(BoundedSemaphore, self).__init__(value=value) 441 self._initial_value = value 442 443 def release(self): 444 """Increment the counter and wake one waiter.""" 445 if self._value >= self._initial_value: 446 raise ValueError("Semaphore released too many times") 447 super(BoundedSemaphore, self).release() 448 449 450class Lock(object): 451 """A lock for coroutines. 452 453 A Lock begins unlocked, and `acquire` locks it immediately. While it is 454 locked, a coroutine that yields `acquire` waits until another coroutine 455 calls `release`. 456 457 Releasing an unlocked lock raises `RuntimeError`. 458 459 A Lock can be used as an async context manager with the ``async 460 with`` statement: 461 462 >>> from tornado import locks 463 >>> lock = locks.Lock() 464 >>> 465 >>> async def f(): 466 ... async with lock: 467 ... # Do something holding the lock. 468 ... pass 469 ... 470 ... # Now the lock is released. 471 472 For compatibility with older versions of Python, the `.acquire` 473 method asynchronously returns a regular context manager: 474 475 >>> async def f2(): 476 ... with (yield lock.acquire()): 477 ... # Do something holding the lock. 478 ... pass 479 ... 480 ... # Now the lock is released. 481 482 .. versionchanged:: 4.3 483 Added ``async with`` support in Python 3.5. 484 485 """ 486 def __init__(self): 487 self._block = BoundedSemaphore(value=1) 488 489 def __repr__(self): 490 return "<%s _block=%s>" % ( 491 self.__class__.__name__, 492 self._block) 493 494 def acquire(self, timeout=None): 495 """Attempt to lock. Returns a Future. 496 497 Returns a Future, which raises `tornado.util.TimeoutError` after a 498 timeout. 499 """ 500 return self._block.acquire(timeout) 501 502 def release(self): 503 """Unlock. 504 505 The first coroutine in line waiting for `acquire` gets the lock. 506 507 If not locked, raise a `RuntimeError`. 508 """ 509 try: 510 self._block.release() 511 except ValueError: 512 raise RuntimeError('release unlocked lock') 513 514 def __enter__(self): 515 raise RuntimeError( 516 "Use Lock like 'with (yield lock)', not like 'with lock'") 517 518 __exit__ = __enter__ 519 520 @gen.coroutine 521 def __aenter__(self): 522 yield self.acquire() 523 524 @gen.coroutine 525 def __aexit__(self, typ, value, tb): 526 self.release() 527