1# Copyright 2015 The Tornado Authors 2# 3# Licensed under the Apache License, Version 2.0 (the "License"); you may 4# not use this file except in compliance with the License. You may obtain 5# a copy of the License at 6# 7# http://www.apache.org/licenses/LICENSE-2.0 8# 9# Unless required by applicable law or agreed to in writing, software 10# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 11# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 12# License for the specific language governing permissions and limitations 13# under the License. 14 15from __future__ import absolute_import, division, print_function 16 17import collections 18 19from tornado import gen, ioloop 20from tornado.concurrent import Future 21 22__all__ = ['Condition', 'Event', 'Semaphore', 'BoundedSemaphore', 'Lock'] 23 24 25class _TimeoutGarbageCollector(object): 26 """Base class for objects that periodically clean up timed-out waiters. 27 28 Avoids memory leak in a common pattern like: 29 30 while True: 31 yield condition.wait(short_timeout) 32 print('looping....') 33 """ 34 def __init__(self): 35 self._waiters = collections.deque() # Futures. 36 self._timeouts = 0 37 38 def _garbage_collect(self): 39 # Occasionally clear timed-out waiters. 40 self._timeouts += 1 41 if self._timeouts > 100: 42 self._timeouts = 0 43 self._waiters = collections.deque( 44 w for w in self._waiters if not w.done()) 45 46 47class Condition(_TimeoutGarbageCollector): 48 """A condition allows one or more coroutines to wait until notified. 49 50 Like a standard `threading.Condition`, but does not need an underlying lock 51 that is acquired and released. 52 53 With a `Condition`, coroutines can wait to be notified by other coroutines: 54 55 .. testcode:: 56 57 from tornado import gen 58 from tornado.ioloop import IOLoop 59 from tornado.locks import Condition 60 61 condition = Condition() 62 63 @gen.coroutine 64 def waiter(): 65 print("I'll wait right here") 66 yield condition.wait() # Yield a Future. 67 print("I'm done waiting") 68 69 @gen.coroutine 70 def notifier(): 71 print("About to notify") 72 condition.notify() 73 print("Done notifying") 74 75 @gen.coroutine 76 def runner(): 77 # Yield two Futures; wait for waiter() and notifier() to finish. 78 yield [waiter(), notifier()] 79 80 IOLoop.current().run_sync(runner) 81 82 .. testoutput:: 83 84 I'll wait right here 85 About to notify 86 Done notifying 87 I'm done waiting 88 89 `wait` takes an optional ``timeout`` argument, which is either an absolute 90 timestamp:: 91 92 io_loop = IOLoop.current() 93 94 # Wait up to 1 second for a notification. 95 yield condition.wait(timeout=io_loop.time() + 1) 96 97 ...or a `datetime.timedelta` for a timeout relative to the current time:: 98 99 # Wait up to 1 second. 100 yield condition.wait(timeout=datetime.timedelta(seconds=1)) 101 102 The method raises `tornado.gen.TimeoutError` if there's no notification 103 before the deadline. 104 """ 105 106 def __init__(self): 107 super(Condition, self).__init__() 108 self.io_loop = ioloop.IOLoop.current() 109 110 def __repr__(self): 111 result = '<%s' % (self.__class__.__name__, ) 112 if self._waiters: 113 result += ' waiters[%s]' % len(self._waiters) 114 return result + '>' 115 116 def wait(self, timeout=None): 117 """Wait for `.notify`. 118 119 Returns a `.Future` that resolves ``True`` if the condition is notified, 120 or ``False`` after a timeout. 121 """ 122 waiter = Future() 123 self._waiters.append(waiter) 124 if timeout: 125 def on_timeout(): 126 waiter.set_result(False) 127 self._garbage_collect() 128 io_loop = ioloop.IOLoop.current() 129 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 130 waiter.add_done_callback( 131 lambda _: io_loop.remove_timeout(timeout_handle)) 132 return waiter 133 134 def notify(self, n=1): 135 """Wake ``n`` waiters.""" 136 waiters = [] # Waiters we plan to run right now. 137 while n and self._waiters: 138 waiter = self._waiters.popleft() 139 if not waiter.done(): # Might have timed out. 140 n -= 1 141 waiters.append(waiter) 142 143 for waiter in waiters: 144 waiter.set_result(True) 145 146 def notify_all(self): 147 """Wake all waiters.""" 148 self.notify(len(self._waiters)) 149 150 151class Event(object): 152 """An event blocks coroutines until its internal flag is set to True. 153 154 Similar to `threading.Event`. 155 156 A coroutine can wait for an event to be set. Once it is set, calls to 157 ``yield event.wait()`` will not block unless the event has been cleared: 158 159 .. testcode:: 160 161 from tornado import gen 162 from tornado.ioloop import IOLoop 163 from tornado.locks import Event 164 165 event = Event() 166 167 @gen.coroutine 168 def waiter(): 169 print("Waiting for event") 170 yield event.wait() 171 print("Not waiting this time") 172 yield event.wait() 173 print("Done") 174 175 @gen.coroutine 176 def setter(): 177 print("About to set the event") 178 event.set() 179 180 @gen.coroutine 181 def runner(): 182 yield [waiter(), setter()] 183 184 IOLoop.current().run_sync(runner) 185 186 .. testoutput:: 187 188 Waiting for event 189 About to set the event 190 Not waiting this time 191 Done 192 """ 193 def __init__(self): 194 self._future = Future() 195 196 def __repr__(self): 197 return '<%s %s>' % ( 198 self.__class__.__name__, 'set' if self.is_set() else 'clear') 199 200 def is_set(self): 201 """Return ``True`` if the internal flag is true.""" 202 return self._future.done() 203 204 def set(self): 205 """Set the internal flag to ``True``. All waiters are awakened. 206 207 Calling `.wait` once the flag is set will not block. 208 """ 209 if not self._future.done(): 210 self._future.set_result(None) 211 212 def clear(self): 213 """Reset the internal flag to ``False``. 214 215 Calls to `.wait` will block until `.set` is called. 216 """ 217 if self._future.done(): 218 self._future = Future() 219 220 def wait(self, timeout=None): 221 """Block until the internal flag is true. 222 223 Returns a Future, which raises `tornado.gen.TimeoutError` after a 224 timeout. 225 """ 226 if timeout is None: 227 return self._future 228 else: 229 return gen.with_timeout(timeout, self._future) 230 231 232class _ReleasingContextManager(object): 233 """Releases a Lock or Semaphore at the end of a "with" statement. 234 235 with (yield semaphore.acquire()): 236 pass 237 238 # Now semaphore.release() has been called. 239 """ 240 def __init__(self, obj): 241 self._obj = obj 242 243 def __enter__(self): 244 pass 245 246 def __exit__(self, exc_type, exc_val, exc_tb): 247 self._obj.release() 248 249 250class Semaphore(_TimeoutGarbageCollector): 251 """A lock that can be acquired a fixed number of times before blocking. 252 253 A Semaphore manages a counter representing the number of `.release` calls 254 minus the number of `.acquire` calls, plus an initial value. The `.acquire` 255 method blocks if necessary until it can return without making the counter 256 negative. 257 258 Semaphores limit access to a shared resource. To allow access for two 259 workers at a time: 260 261 .. testsetup:: semaphore 262 263 from collections import deque 264 265 from tornado import gen 266 from tornado.ioloop import IOLoop 267 from tornado.concurrent import Future 268 269 # Ensure reliable doctest output: resolve Futures one at a time. 270 futures_q = deque([Future() for _ in range(3)]) 271 272 @gen.coroutine 273 def simulator(futures): 274 for f in futures: 275 yield gen.moment 276 f.set_result(None) 277 278 IOLoop.current().add_callback(simulator, list(futures_q)) 279 280 def use_some_resource(): 281 return futures_q.popleft() 282 283 .. testcode:: semaphore 284 285 from tornado import gen 286 from tornado.ioloop import IOLoop 287 from tornado.locks import Semaphore 288 289 sem = Semaphore(2) 290 291 @gen.coroutine 292 def worker(worker_id): 293 yield sem.acquire() 294 try: 295 print("Worker %d is working" % worker_id) 296 yield use_some_resource() 297 finally: 298 print("Worker %d is done" % worker_id) 299 sem.release() 300 301 @gen.coroutine 302 def runner(): 303 # Join all workers. 304 yield [worker(i) for i in range(3)] 305 306 IOLoop.current().run_sync(runner) 307 308 .. testoutput:: semaphore 309 310 Worker 0 is working 311 Worker 1 is working 312 Worker 0 is done 313 Worker 2 is working 314 Worker 1 is done 315 Worker 2 is done 316 317 Workers 0 and 1 are allowed to run concurrently, but worker 2 waits until 318 the semaphore has been released once, by worker 0. 319 320 `.acquire` is a context manager, so ``worker`` could be written as:: 321 322 @gen.coroutine 323 def worker(worker_id): 324 with (yield sem.acquire()): 325 print("Worker %d is working" % worker_id) 326 yield use_some_resource() 327 328 # Now the semaphore has been released. 329 print("Worker %d is done" % worker_id) 330 331 In Python 3.5, the semaphore itself can be used as an async context 332 manager:: 333 334 async def worker(worker_id): 335 async with sem: 336 print("Worker %d is working" % worker_id) 337 await use_some_resource() 338 339 # Now the semaphore has been released. 340 print("Worker %d is done" % worker_id) 341 342 .. versionchanged:: 4.3 343 Added ``async with`` support in Python 3.5. 344 """ 345 def __init__(self, value=1): 346 super(Semaphore, self).__init__() 347 if value < 0: 348 raise ValueError('semaphore initial value must be >= 0') 349 350 self._value = value 351 352 def __repr__(self): 353 res = super(Semaphore, self).__repr__() 354 extra = 'locked' if self._value == 0 else 'unlocked,value:{0}'.format( 355 self._value) 356 if self._waiters: 357 extra = '{0},waiters:{1}'.format(extra, len(self._waiters)) 358 return '<{0} [{1}]>'.format(res[1:-1], extra) 359 360 def release(self): 361 """Increment the counter and wake one waiter.""" 362 self._value += 1 363 while self._waiters: 364 waiter = self._waiters.popleft() 365 if not waiter.done(): 366 self._value -= 1 367 368 # If the waiter is a coroutine paused at 369 # 370 # with (yield semaphore.acquire()): 371 # 372 # then the context manager's __exit__ calls release() at the end 373 # of the "with" block. 374 waiter.set_result(_ReleasingContextManager(self)) 375 break 376 377 def acquire(self, timeout=None): 378 """Decrement the counter. Returns a Future. 379 380 Block if the counter is zero and wait for a `.release`. The Future 381 raises `.TimeoutError` after the deadline. 382 """ 383 waiter = Future() 384 if self._value > 0: 385 self._value -= 1 386 waiter.set_result(_ReleasingContextManager(self)) 387 else: 388 self._waiters.append(waiter) 389 if timeout: 390 def on_timeout(): 391 waiter.set_exception(gen.TimeoutError()) 392 self._garbage_collect() 393 io_loop = ioloop.IOLoop.current() 394 timeout_handle = io_loop.add_timeout(timeout, on_timeout) 395 waiter.add_done_callback( 396 lambda _: io_loop.remove_timeout(timeout_handle)) 397 return waiter 398 399 def __enter__(self): 400 raise RuntimeError( 401 "Use Semaphore like 'with (yield semaphore.acquire())', not like" 402 " 'with semaphore'") 403 404 __exit__ = __enter__ 405 406 @gen.coroutine 407 def __aenter__(self): 408 yield self.acquire() 409 410 @gen.coroutine 411 def __aexit__(self, typ, value, tb): 412 self.release() 413 414 415class BoundedSemaphore(Semaphore): 416 """A semaphore that prevents release() being called too many times. 417 418 If `.release` would increment the semaphore's value past the initial 419 value, it raises `ValueError`. Semaphores are mostly used to guard 420 resources with limited capacity, so a semaphore released too many times 421 is a sign of a bug. 422 """ 423 def __init__(self, value=1): 424 super(BoundedSemaphore, self).__init__(value=value) 425 self._initial_value = value 426 427 def release(self): 428 """Increment the counter and wake one waiter.""" 429 if self._value >= self._initial_value: 430 raise ValueError("Semaphore released too many times") 431 super(BoundedSemaphore, self).release() 432 433 434class Lock(object): 435 """A lock for coroutines. 436 437 A Lock begins unlocked, and `acquire` locks it immediately. While it is 438 locked, a coroutine that yields `acquire` waits until another coroutine 439 calls `release`. 440 441 Releasing an unlocked lock raises `RuntimeError`. 442 443 `acquire` supports the context manager protocol in all Python versions: 444 445 >>> from tornado import gen, locks 446 >>> lock = locks.Lock() 447 >>> 448 >>> @gen.coroutine 449 ... def f(): 450 ... with (yield lock.acquire()): 451 ... # Do something holding the lock. 452 ... pass 453 ... 454 ... # Now the lock is released. 455 456 In Python 3.5, `Lock` also supports the async context manager 457 protocol. Note that in this case there is no `acquire`, because 458 ``async with`` includes both the ``yield`` and the ``acquire`` 459 (just as it does with `threading.Lock`): 460 461 >>> async def f(): # doctest: +SKIP 462 ... async with lock: 463 ... # Do something holding the lock. 464 ... pass 465 ... 466 ... # Now the lock is released. 467 468 .. versionchanged:: 4.3 469 Added ``async with`` support in Python 3.5. 470 471 """ 472 def __init__(self): 473 self._block = BoundedSemaphore(value=1) 474 475 def __repr__(self): 476 return "<%s _block=%s>" % ( 477 self.__class__.__name__, 478 self._block) 479 480 def acquire(self, timeout=None): 481 """Attempt to lock. Returns a Future. 482 483 Returns a Future, which raises `tornado.gen.TimeoutError` after a 484 timeout. 485 """ 486 return self._block.acquire(timeout) 487 488 def release(self): 489 """Unlock. 490 491 The first coroutine in line waiting for `acquire` gets the lock. 492 493 If not locked, raise a `RuntimeError`. 494 """ 495 try: 496 self._block.release() 497 except ValueError: 498 raise RuntimeError('release unlocked lock') 499 500 def __enter__(self): 501 raise RuntimeError( 502 "Use Lock like 'with (yield lock)', not like 'with lock'") 503 504 __exit__ = __enter__ 505 506 @gen.coroutine 507 def __aenter__(self): 508 yield self.acquire() 509 510 @gen.coroutine 511 def __aexit__(self, typ, value, tb): 512 self.release() 513