"""Synchronization primitives."""

__all__ = ['Lock', 'Event', 'Condition', 'Semaphore', 'BoundedSemaphore']

import collections

from . import events
from . import futures
from .coroutines import coroutine


class _ContextManager:
    """Context manager.

    This enables the following idiom for acquiring and releasing a
    lock around a block:

        with (yield from lock):
            <block>

    while failing loudly when accidentally using:

        with lock:
            <block>
    """

    def __init__(self, lock):
        # Any object with a release() method; it is expected to be
        # already acquired by the time this wrapper is created.
        self._lock = lock

    def __enter__(self):
        # We have no use for the "as ..." clause in the with
        # statement for locks.
        return None

    def __exit__(self, *args):
        try:
            self._lock.release()
        finally:
            self._lock = None  # Crudely prevent reuse.


class Lock:
    """Primitive lock objects.

    A primitive lock is a synchronization primitive that is not owned
    by a particular coroutine when locked.  A primitive lock is in one
    of two states, 'locked' or 'unlocked'.

    It is created in the unlocked state.  It has two basic methods,
    acquire() and release().  When the state is unlocked, acquire()
    changes the state to locked and returns immediately.  When the
    state is locked, acquire() blocks until a call to release() in
    another coroutine changes it to unlocked, then the acquire() call
    resets it to locked and returns.  The release() method should only
    be called in the locked state; it changes the state to unlocked
    and returns immediately.  If an attempt is made to release an
    unlocked lock, a RuntimeError will be raised.

    When more than one coroutine is blocked in acquire() waiting for
    the state to turn to unlocked, only one coroutine proceeds when a
    release() call resets the state to unlocked; the first coroutine
    which is blocked in acquire() is the one being processed.

    acquire() is a coroutine and should be called with 'yield from'.

    Locks also support the context management protocol.
    '(yield from lock)' should be used as context manager expression.

    Usage:

        lock = Lock()
        ...
        yield from lock
        try:
            ...
        finally:
            lock.release()

    Context manager usage:

        lock = Lock()
        ...
        with (yield from lock):
             ...

    Lock objects can be tested for locking state:

        if not lock.locked():
           yield from lock
        else:
           # lock is acquired
           ...

    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines blocked in acquire(), in FIFO order.
        self._waiters = collections.deque()
        self._locked = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self._locked else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def locked(self):
        """Return True if lock is acquired."""
        return self._locked

    @coroutine
    def acquire(self):
        """Acquire a lock.

        This method blocks until the lock is unlocked, then sets it to
        locked and returns True.

        If the wait is cancelled, CancelledError propagates to the
        caller; when the lock is free at that point the wake-up is
        passed on to the next waiter so that it is not lost.
        """
        # Waiters that are cancelled but have not removed themselves yet
        # must not block the fast path: nobody would wake the caller up.
        # A waiter that was already woken (done, not cancelled) still
        # keeps newcomers behind it, preserving FIFO order.
        if not self._locked and all(w.cancelled() for w in self._waiters):
            self._locked = True
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)

        # The inner finally runs before the CancelledError handler so
        # that _wake_up_first() never looks at our own future.
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            # We may have been woken by release() and cancelled before
            # running; hand the wake-up over instead of dropping it.
            if not self._locked:
                self._wake_up_first()
            raise

        self._locked = True
        return True

    def release(self):
        """Release a lock.

        When the lock is locked, reset it to unlocked, and return.
        If any other coroutines are blocked waiting for the lock to become
        unlocked, allow exactly one of them to proceed.

        When invoked on an unlocked lock, a RuntimeError is raised.

        There is no return value.
        """
        if self._locked:
            self._locked = False
            self._wake_up_first()
        else:
            raise RuntimeError('Lock is not acquired.')

    def _wake_up_first(self):
        """Wake up the first waiter if it isn't done."""
        if not self._waiters:
            return

        fut = self._waiters[0]
        # A done head waiter will run soon and either take the lock or,
        # if it was cancelled while the lock is free, call this method
        # again for the waiter behind it.  Waking anyone further back
        # here could let two coroutines believe they own the lock.
        if not fut.done():
            fut.set_result(True)

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # This must exist because __enter__ exists, even though that
        # always raises; that's how the with-statement works.
        pass

    def __iter__(self):
        # This is not a coroutine.  It is meant to enable the idiom:
        #
        #     with (yield from lock):
        #         <block>
        #
        # as an alternative to:
        #
        #     yield from lock.acquire()
        #     try:
        #         <block>
        #     finally:
        #         lock.release()
        yield from self.acquire()
        return _ContextManager(self)


class Event:
    """Asynchronous equivalent to threading.Event.

    Class implementing event objects. An event manages a flag that can be set
    to true with the set() method and reset to false with the clear() method.
    The wait() method blocks until the flag is true. The flag is initially
    false.
    """

    def __init__(self, *, loop=None):
        # Futures of the coroutines blocked in wait().
        self._waiters = collections.deque()
        self._value = False
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'set' if self._value else 'unset'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def is_set(self):
        """Return True if and only if the internal flag is true."""
        return self._value

    def set(self):
        """Set the internal flag to true. All coroutines waiting for it to
        become true are awakened. Coroutines that call wait() once the flag
        is true will not block at all.
        """
        if not self._value:
            self._value = True

            for fut in self._waiters:
                # Skip waiters that were cancelled in the meantime.
                if not fut.done():
                    fut.set_result(True)

    def clear(self):
        """Reset the internal flag to false. Subsequently, coroutines calling
        wait() will block until set() is called to set the internal flag
        to true again."""
        self._value = False

    @coroutine
    def wait(self):
        """Block until the internal flag is true.

        If the internal flag is true on entry, return True
        immediately.  Otherwise, block until another coroutine calls
        set() to set the flag to true, then return True.
        """
        if self._value:
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)
        try:
            yield from fut
            return True
        finally:
            self._waiters.remove(fut)


class Condition:
    """Asynchronous equivalent to threading.Condition.

    This class implements condition variable objects. A condition variable
    allows one or more coroutines to wait until they are notified by another
    coroutine.

    If the lock argument is omitted or None, a new Lock object is created
    and used as the underlying lock.  A lock that is passed in must use the
    same event loop as the condition, otherwise ValueError is raised.
    """

    def __init__(self, lock=None, *, loop=None):
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

        if lock is None:
            lock = Lock(loop=self._loop)
        elif lock._loop is not self._loop:
            raise ValueError("loop argument must agree with lock")

        self._lock = lock
        # Export the lock's locked(), acquire() and release() methods.
        self.locked = lock.locked
        self.acquire = lock.acquire
        self.release = lock.release

        # Futures of the coroutines blocked in wait().
        self._waiters = collections.deque()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked'
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    @coroutine
    def wait(self):
        """Wait until notified.

        If the calling coroutine has not acquired the lock when this
        method is called, a RuntimeError is raised.

        This method releases the underlying lock, and then blocks
        until it is awakened by a notify() or notify_all() call for
        the same condition variable in another coroutine.  Once
        awakened, it re-acquires the lock and returns True.

        The lock is re-acquired even when the wait is cancelled; in
        that case CancelledError is raised once the lock is held again.
        """
        if not self.locked():
            raise RuntimeError('cannot wait on un-acquired lock')

        self.release()
        try:
            fut = futures.Future(loop=self._loop)
            self._waiters.append(fut)
            try:
                yield from fut
                return True
            finally:
                self._waiters.remove(fut)

        finally:
            # Must reacquire the lock even if the wait is cancelled: the
            # caller (typically a "with" block) is going to release it.
            # A cancellation that arrives while re-acquiring is deferred
            # until the lock is held again.
            cancelled = False
            while True:
                try:
                    yield from self.acquire()
                    break
                except futures.CancelledError:
                    cancelled = True

            if cancelled:
                raise futures.CancelledError

    @coroutine
    def wait_for(self, predicate):
        """Wait until a predicate becomes true.

        The predicate should be a callable whose result will be
        interpreted as a boolean value.  The final predicate value is
        the return value.
        """
        result = predicate()
        while not result:
            yield from self.wait()
            result = predicate()
        return result

    def notify(self, n=1):
        """By default, wake up one coroutine waiting on this condition, if any.
        If the calling coroutine has not acquired the lock when this method
        is called, a RuntimeError is raised.

        This method wakes up at most n of the coroutines waiting for the
        condition variable; it is a no-op if no coroutines are waiting.

        Note: an awakened coroutine does not actually return from its
        wait() call until it can reacquire the lock. Since notify() does
        not release the lock, its caller should.
        """
        if not self.locked():
            raise RuntimeError('cannot notify on un-acquired lock')

        idx = 0
        for fut in self._waiters:
            if idx >= n:
                break

            # Only waiters that are still pending count towards n.
            if not fut.done():
                idx += 1
                fut.set_result(False)

    def notify_all(self):
        """Wake up all coroutines waiting on this condition. This method acts
        like notify(), but wakes up all waiting coroutines instead of one. If
        the calling coroutine has not acquired the lock when this method is
        called, a RuntimeError is raised.
        """
        self.notify(len(self._waiters))

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Required because __enter__ exists, even though it always raises.
        pass

    def __iter__(self):
        # Not a coroutine; enables "with (yield from cond):".
        yield from self.acquire()
        return _ContextManager(self)


class Semaphore:
    """A Semaphore implementation.

    A semaphore manages an internal counter which is decremented by each
    acquire() call and incremented by each release() call. The counter
    can never go below zero; when acquire() finds that it is zero, it blocks,
    waiting until some other coroutine calls release().

    Semaphores also support the context management protocol.

    The optional argument gives the initial value for the internal
    counter; it defaults to 1. If the value given is less than 0,
    ValueError is raised.
    """

    def __init__(self, value=1, *, loop=None):
        if value < 0:
            raise ValueError("Semaphore initial value must be >= 0")
        # Number of permits that are free right now.  A permit handed to
        # a woken waiter is subtracted at wake-up time, so a positive
        # value implies that no waiter is still pending.
        self._value = value
        # Futures of the coroutines blocked in acquire(), in FIFO order.
        self._waiters = collections.deque()
        if loop is not None:
            self._loop = loop
        else:
            self._loop = events.get_event_loop()

    def __repr__(self):
        res = super().__repr__()
        extra = 'locked' if self.locked() else 'unlocked,value:{}'.format(
            self._value)
        if self._waiters:
            extra = '{},waiters:{}'.format(extra, len(self._waiters))
        return '<{} [{}]>'.format(res[1:-1], extra)

    def _wake_up_next(self):
        """Hand one free permit to the first waiter that isn't done."""
        for fut in self._waiters:
            if not fut.done():
                # Reserve the permit for this waiter right away so that
                # nobody else can take it before the waiter runs.
                self._value -= 1
                fut.set_result(True)
                return

    def locked(self):
        """Returns True if semaphore can not be acquired immediately."""
        return self._value == 0

    @coroutine
    def acquire(self):
        """Acquire a semaphore.

        If the internal counter is larger than zero on entry,
        decrement it by one and return True immediately.  If it is
        zero on entry, block, waiting until some other coroutine has
        called release() to make it larger than 0, and then return
        True.

        If the wait is cancelled after a permit was already handed to
        this waiter, the permit is given back (and offered to the next
        waiter) before CancelledError propagates.
        """
        if self._value > 0:
            self._value -= 1
            return True

        fut = futures.Future(loop=self._loop)
        self._waiters.append(fut)

        # The inner finally runs before the CancelledError handler so
        # that _wake_up_next() never looks at our own future.
        try:
            try:
                yield from fut
            finally:
                self._waiters.remove(fut)
        except futures.CancelledError:
            if fut.done() and not fut.cancelled():
                # We were woken (permit reserved for us) and cancelled
                # before running: return the permit and pass it on.
                self._value += 1
                self._wake_up_next()
            raise

        # The permit was already subtracted by _wake_up_next().
        return True

    def release(self):
        """Release a semaphore, incrementing the internal counter by one.
        When it was zero on entry and another coroutine is waiting for it to
        become larger than zero again, wake up that coroutine.
        """
        self._value += 1
        self._wake_up_next()

    def __enter__(self):
        raise RuntimeError(
            '"yield from" should be used as context manager expression')

    def __exit__(self, *args):
        # Required because __enter__ exists, even though it always raises.
        pass

    def __iter__(self):
        # Not a coroutine; enables "with (yield from sem):".
        yield from self.acquire()
        return _ContextManager(self)


class BoundedSemaphore(Semaphore):
    """A bounded semaphore implementation.

    This raises ValueError in release() if it would increase the value
    above the initial value.
    """

    def __init__(self, value=1, *, loop=None):
        # Remember the initial value as the upper bound for release().
        self._bound_value = value
        super().__init__(value, loop=loop)

    def release(self):
        if self._value >= self._bound_value:
            raise ValueError('BoundedSemaphore released too many times')
        super().release()