import math

import attr

import trio

from . import _core
from ._core import enable_ki_protection, ParkingLot
from ._util import Final


@attr.s(frozen=True)
class _EventStatistics:
    """Immutable snapshot returned by :meth:`Event.statistics`."""

    tasks_waiting = attr.ib()


@attr.s(repr=False, eq=False, hash=False, slots=True)
class Event(metaclass=Final):
    """A waitable boolean value useful for inter-task synchronization,
    inspired by :class:`threading.Event`.

    An event object has an internal boolean flag, representing whether
    the event has happened yet. The flag is initially False, and the
    :meth:`wait` method waits until the flag is True. If the flag is
    already True, then :meth:`wait` returns immediately. (If the event has
    already happened, there's nothing to wait for.) The :meth:`set` method
    sets the flag to True, and wakes up any waiters.

    This behavior is useful because it helps avoid race conditions and
    lost wakeups: it doesn't matter whether :meth:`set` gets called just
    before or after :meth:`wait`. If you want a lower-level wakeup
    primitive that doesn't have this protection, consider :class:`Condition`
    or :class:`trio.lowlevel.ParkingLot`.

    .. note:: Unlike `threading.Event`, `trio.Event` has no
       `~threading.Event.clear` method. In Trio, once an `Event` has happened,
       it cannot un-happen. If you need to represent a series of events,
       consider creating a new `Event` object for each one (they're cheap!),
       or other synchronization methods like :ref:`channels <channels>` or
       `trio.lowlevel.ParkingLot`.

    """

    # Tasks currently blocked in wait(); emptied by set().
    _tasks = attr.ib(factory=set, init=False)
    # The internal flag; only ever goes from False to True.
    _flag = attr.ib(default=False, init=False)

    def is_set(self):
        """Return the current value of the internal flag."""
        return self._flag

    @enable_ki_protection
    def set(self):
        """Set the internal flag value to True, and wake any waiting tasks."""
        if not self._flag:
            self._flag = True
            for task in self._tasks:
                _core.reschedule(task)
            self._tasks.clear()

    async def wait(self):
        """Block until the internal flag value becomes True.

        If it's already True, then this method returns immediately.

        """
        if self._flag:
            await trio.lowlevel.checkpoint()
        else:
            task = _core.current_task()
            self._tasks.add(task)

            def abort_fn(_):
                # The wait is being aborted: stop tracking this task so that
                # a later set() doesn't try to reschedule it.
                self._tasks.remove(task)
                return _core.Abort.SUCCEEDED

            await _core.wait_task_rescheduled(abort_fn)

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this event's
          :meth:`wait` method.

        """
        return _EventStatistics(tasks_waiting=len(self._tasks))


def async_cm(cls):
    """Class decorator that makes ``cls`` usable as an async context manager.

    Adds an ``__aenter__`` that awaits ``self.acquire()`` and an
    ``__aexit__`` that calls ``self.release()``, both wrapped in
    ``enable_ki_protection``. The generated functions get a ``__qualname__``
    under ``cls`` so they look like ordinary methods of the decorated class.

    Args:
      cls: The class to extend; it must provide ``acquire`` (async) and
          ``release`` (sync) methods.

    Returns:
      The same class object, modified in place.

    """

    @enable_ki_protection
    async def __aenter__(self):
        await self.acquire()

    __aenter__.__qualname__ = cls.__qualname__ + ".__aenter__"
    cls.__aenter__ = __aenter__

    @enable_ki_protection
    async def __aexit__(self, *args):
        self.release()

    __aexit__.__qualname__ = cls.__qualname__ + ".__aexit__"
    cls.__aexit__ = __aexit__
    return cls


@attr.s(frozen=True)
class _CapacityLimiterStatistics:
    """Immutable snapshot returned by :meth:`CapacityLimiter.statistics`."""

    borrowed_tokens = attr.ib()
    total_tokens = attr.ib()
    borrowers = attr.ib()
    tasks_waiting = attr.ib()


@async_cm
class CapacityLimiter(metaclass=Final):
    """An object for controlling access to a resource with limited capacity.

    Sometimes you need to put a limit on how many tasks can do something at
    the same time. For example, you might want to use some threads to run
    multiple blocking I/O operations in parallel... but if you use too many
    threads at once, then your system can become overloaded and it'll actually
    make things slower. One popular solution is to impose a policy like "run
    up to 40 threads at the same time, but no more". But how do you implement
    a policy like this?

    That's what :class:`CapacityLimiter` is for. You can think of a
    :class:`CapacityLimiter` object as a sack that starts out holding some fixed
    number of tokens::

       limit = trio.CapacityLimiter(40)

    Then tasks can come along and borrow a token out of the sack::

       # Borrow a token:
       async with limit:
           # We are holding a token!
           await perform_expensive_operation()
       # Exiting the 'async with' block puts the token back into the sack

    And crucially, if you try to borrow a token but the sack is empty, then
    you have to wait for another task to finish what it's doing and put its
    token back first before you can take it and continue.

    Another way to think of it: a :class:`CapacityLimiter` is like a sofa with a
    fixed number of seats, and if they're all taken then you have to wait for
    someone to get up before you can sit down.

    By default, :func:`trio.to_thread.run_sync` uses a
    :class:`CapacityLimiter` to limit the number of threads running at once;
    see `trio.to_thread.current_default_thread_limiter` for details.

    If you're familiar with semaphores, then you can think of this as a
    restricted semaphore that's specialized for one common use case, with
    additional error checking. For a more traditional semaphore, see
    :class:`Semaphore`.

    .. note::

       Don't confuse this with the `"leaky bucket"
       <https://en.wikipedia.org/wiki/Leaky_bucket>`__ or `"token bucket"
       <https://en.wikipedia.org/wiki/Token_bucket>`__ algorithms used to
       limit bandwidth usage on networks. The basic idea of using tokens to
       track a resource limit is similar, but this is a very simple sack where
       tokens aren't automatically created or destroyed over time; they're
       just borrowed and then put back.

    """

    def __init__(self, total_tokens):
        self._lot = ParkingLot()
        self._borrowers = set()
        # Maps tasks attempting to acquire -> borrower, to handle on-behalf-of
        self._pending_borrowers = {}
        # invoke the property setter for validation
        self.total_tokens = total_tokens
        assert self._total_tokens == total_tokens

    def __repr__(self):
        return "<trio.CapacityLimiter at {:#x}, {}/{} with {} waiting>".format(
            id(self), len(self._borrowers), self._total_tokens, len(self._lot)
        )

    @property
    def total_tokens(self):
        """The total capacity available.

        You can change :attr:`total_tokens` by assigning to this attribute. If
        you make it larger, then the appropriate number of waiting tasks will
        be woken immediately to take the new tokens. If you decrease
        total_tokens below the number of tasks that are currently using the
        resource, then all current tasks will be allowed to finish as normal,
        but no new tasks will be allowed in until the total number of tasks
        drops below the new total_tokens.

        """
        return self._total_tokens

    @total_tokens.setter
    def total_tokens(self, new_total_tokens):
        if not isinstance(new_total_tokens, int) and new_total_tokens != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        if new_total_tokens < 1:
            raise ValueError("total_tokens must be >= 1")
        self._total_tokens = new_total_tokens
        # The limit may just have been raised, so give waiters a chance.
        self._wake_waiters()

    def _wake_waiters(self):
        """Hand currently unclaimed tokens to tasks parked in ``self._lot``.

        For every task the parking lot reports as woken, the borrower that
        task registered in ``self._pending_borrowers`` is moved into
        ``self._borrowers``, i.e. the token is already assigned by the time
        the task resumes.

        """
        available = self._total_tokens - len(self._borrowers)
        for woken in self._lot.unpark(count=available):
            self._borrowers.add(self._pending_borrowers.pop(woken))

    @property
    def borrowed_tokens(self):
        """The amount of capacity that's currently in use."""
        return len(self._borrowers)

    @property
    def available_tokens(self):
        """The amount of capacity that's available to use."""
        return self.total_tokens - self.borrowed_tokens

    @enable_ki_protection
    def acquire_nowait(self):
        """Borrow a token from the sack, without blocking.

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if the current task already holds one of this sack's
              tokens.

        """
        self.acquire_on_behalf_of_nowait(trio.lowlevel.current_task())

    @enable_ki_protection
    def acquire_on_behalf_of_nowait(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, without
        blocking.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token. This is used by
             :func:`trio.to_thread.run_sync` to allow threads to "hold
             tokens", with the intention in the future of using it to `allow
             deadlock detection and other useful things
             <https://github.com/python-trio/trio/issues/182>`__

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if ``borrower`` already holds one of this sack's
              tokens.

        """
        if borrower in self._borrowers:
            raise RuntimeError(
                "this borrower is already holding one of this "
                "CapacityLimiter's tokens"
            )
        # Only take a token directly when nobody is already parked waiting
        # for one; otherwise report WouldBlock.
        if len(self._borrowers) < self._total_tokens and not self._lot:
            self._borrowers.add(borrower)
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Borrow a token from the sack, blocking if necessary.

        Raises:
          RuntimeError: if the current task already holds one of this sack's
              tokens.

        """
        await self.acquire_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    async def acquire_on_behalf_of(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, blocking if
        necessary.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token; see
             :meth:`acquire_on_behalf_of_nowait` for details.

        Raises:
          RuntimeError: if ``borrower`` task already holds one of this sack's
             tokens.

        """
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_on_behalf_of_nowait(borrower)
        except trio.WouldBlock:
            task = trio.lowlevel.current_task()
            # Remember who this task is acquiring for, so _wake_waiters() can
            # record the right borrower when it wakes us.
            self._pending_borrowers[task] = borrower
            try:
                await self._lot.park()
            except trio.Cancelled:
                # We were never given a token; drop the pending registration.
                self._pending_borrowers.pop(task)
                raise
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Put a token back into the sack.

        Raises:
          RuntimeError: if the current task has not acquired one of this
              sack's tokens.

        """
        self.release_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    def release_on_behalf_of(self, borrower):
        """Put a token back into the sack on behalf of ``borrower``.

        Raises:
          RuntimeError: if the given borrower has not acquired one of this
              sack's tokens.

        """
        if borrower not in self._borrowers:
            raise RuntimeError(
                "this borrower isn't holding any of this CapacityLimiter's tokens"
            )
        self._borrowers.remove(borrower)
        self._wake_waiters()

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``borrowed_tokens``: The number of tokens currently borrowed from
          the sack.
        * ``total_tokens``: The total number of tokens in the sack. Usually
          this will be larger than ``borrowed_tokens``, but it's possible for
          it to be smaller if :attr:`total_tokens` was recently decreased.
        * ``borrowers``: A list of all tasks or other entities that currently
          hold a token.
        * ``tasks_waiting``: The number of tasks blocked on this
          :class:`CapacityLimiter`\'s :meth:`acquire` or
          :meth:`acquire_on_behalf_of` methods.

        """
        return _CapacityLimiterStatistics(
            borrowed_tokens=len(self._borrowers),
            total_tokens=self._total_tokens,
            # Use a list instead of a frozenset just in case we start to allow
            # one borrower to hold multiple tokens in the future
            borrowers=list(self._borrowers),
            tasks_waiting=len(self._lot),
        )


@async_cm
class Semaphore(metaclass=Final):
    """A `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`__.

    A semaphore holds an integer value, which can be incremented by
    calling :meth:`release` and decremented by calling :meth:`acquire` – but
    the value is never allowed to drop below zero. If the value is zero, then
    :meth:`acquire` will block until someone calls :meth:`release`.

    If you're looking for a :class:`Semaphore` to limit the number of tasks
    that can access some resource simultaneously, then consider using a
    :class:`CapacityLimiter` instead.

    This object's interface is similar to, but different from, that of
    :class:`threading.Semaphore`.

    A :class:`Semaphore` object can be used as an async context manager; it
    blocks on entry but not on exit.

    Args:
      initial_value (int): A non-negative integer giving semaphore's initial
        value.
      max_value (int or None): If given, makes this a "bounded" semaphore that
        raises an error if the value is about to exceed the given
        ``max_value``.

    """

    def __init__(self, initial_value, *, max_value=None):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an int")
        if initial_value < 0:
            raise ValueError("initial value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be None or an int")
            if max_value < initial_value:
                raise ValueError("max_value must be >= initial_value")

        # Invariants:
        # bool(self._lot) implies self._value == 0
        # (or equivalently: self._value > 0 implies not self._lot)
        self._lot = trio.lowlevel.ParkingLot()
        self._value = initial_value
        self._max_value = max_value

    def __repr__(self):
        if self._max_value is None:
            max_value_str = ""
        else:
            max_value_str = ", max_value={}".format(self._max_value)
        return "<trio.Semaphore({}{}) at {:#x}>".format(
            self._value, max_value_str, id(self)
        )

    @property
    def value(self):
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self):
        """The maximum allowed value. May be None to indicate no limit."""
        return self._max_value

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to decrement the semaphore value, without blocking.

        Raises:
          WouldBlock: if the value is zero.

        """
        if self._value > 0:
            assert not self._lot
            self._value -= 1
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Decrement the semaphore value, blocking if necessary to avoid
        letting it drop below zero.

        """
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            # No decrement needed after waking: release() unparks a waiter
            # *instead of* incrementing the value.
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Increment the semaphore value, possibly waking a task blocked in
        :meth:`acquire`.

        Raises:
          ValueError: if incrementing the value would cause it to exceed
              :attr:`max_value`.

        """
        if self._lot:
            # Hand the unit straight to a waiter; the value stays at zero.
            assert self._value == 0
            self._lot.unpark(count=1)
        else:
            if self._max_value is not None and self._value == self._max_value:
                raise ValueError("semaphore released too many times")
            self._value += 1

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this semaphore's
          :meth:`acquire` method.

        """
        return self._lot.statistics()


@attr.s(frozen=True)
class _LockStatistics:
    """Immutable snapshot returned by the locks' ``statistics()`` method."""

    locked = attr.ib()
    owner = attr.ib()
    tasks_waiting = attr.ib()


@async_cm
@attr.s(eq=False, hash=False, repr=False)
class _LockImpl:
    """Shared implementation behind :class:`Lock` and
    :class:`StrictFIFOLock`.

    """

    # Tasks blocked in acquire().
    _lot = attr.ib(factory=ParkingLot, init=False)
    # The task currently holding the lock, or None if it is not held.
    _owner = attr.ib(default=None, init=False)

    def __repr__(self):
        if self.locked():
            s1 = "locked"
            s2 = " with {} waiters".format(len(self._lot))
        else:
            s1 = "unlocked"
            s2 = ""
        return "<{} {} object at {:#x}{}>".format(
            s1, self.__class__.__name__, id(self), s2
        )

    def locked(self):
        """Check whether the lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._owner is not None

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to acquire the lock, without blocking.

        Raises:
          WouldBlock: if the lock is held.
          RuntimeError: if the calling task already holds the lock.

        """

        task = trio.lowlevel.current_task()
        if self._owner is task:
            raise RuntimeError("attempt to re-acquire an already held Lock")
        elif self._owner is None and not self._lot:
            # No-one owns it
            self._owner = task
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Acquire the lock, blocking if necessary."""
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            # NOTE: it's important that the contended acquire path is just
            # "_lot.park()", because that's how Condition.wait() acquires the
            # lock as well.
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Release the lock.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        task = trio.lowlevel.current_task()
        if task is not self._owner:
            raise RuntimeError("can't release a Lock you don't own")
        if self._lot:
            # Pass ownership straight to the task being unparked.
            (self._owner,) = self._lot.unpark(count=1)
        else:
            self._owner = None

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``locked``: boolean indicating whether the lock is held.
        * ``owner``: the :class:`trio.lowlevel.Task` currently holding the lock,
          or None if the lock is not held.
        * ``tasks_waiting``: The number of tasks blocked on this lock's
          :meth:`acquire` method.

        """
        return _LockStatistics(
            locked=self.locked(), owner=self._owner, tasks_waiting=len(self._lot)
        )


class Lock(_LockImpl, metaclass=Final):
    """A classic `mutex
    <https://en.wikipedia.org/wiki/Lock_(computer_science)>`__.

    This is a non-reentrant, single-owner lock. Unlike
    :class:`threading.Lock`, only the owner of the lock is allowed to release
    it.

    A :class:`Lock` object can be used as an async context manager; it
    blocks on entry but not on exit.

    """


class StrictFIFOLock(_LockImpl, metaclass=Final):
    r"""A variant of :class:`Lock` where tasks are guaranteed to acquire the
    lock in strict first-come-first-served order.

    An example of when this is useful is if you're implementing something like
    :class:`trio.SSLStream` or an HTTP/2 server using `h2
    <https://hyper-h2.readthedocs.io/>`__, where you have multiple concurrent
    tasks that are interacting with a shared state machine, and at
    unpredictable moments the state machine requests that a chunk of data be
    sent over the network. (For example, when using h2 simply reading incoming
    data can occasionally `create outgoing data to send
    <https://http2.github.io/http2-spec/#PING>`__.) The challenge is to make
    sure that these chunks are sent in the correct order, without being
    garbled.

    One option would be to use a regular :class:`Lock`, and wrap it around
    every interaction with the state machine::

       # This approach is sometimes workable but often sub-optimal; see below
       async with lock:
           state_machine.do_something()
           if state_machine.has_data_to_send():
               await conn.sendall(state_machine.get_data_to_send())

    But this can be problematic. If you're using h2 then *usually* reading
    incoming data doesn't create the need to send any data, so we don't want
    to force every task that tries to read from the network to sit and wait
    a potentially long time for ``sendall`` to finish. And in some situations
    this could even potentially cause a deadlock, if the remote peer is
    waiting for you to read some data before it accepts the data you're
    sending.

    :class:`StrictFIFOLock` provides an alternative. We can rewrite our
    example like::

       # Note: no awaits between when we start using the state machine and
       # when we block to take the lock!
       state_machine.do_something()
       if state_machine.has_data_to_send():
           # Notice that we fetch the data to send out of the state machine
           # *before* sleeping, so that other tasks won't see it.
           chunk = state_machine.get_data_to_send()
           async with strict_fifo_lock:
               await conn.sendall(chunk)

    First we do all our interaction with the state machine in a single
    scheduling quantum (notice there are no ``await``\s in there), so it's
    automatically atomic with respect to other tasks. And then if and only if
    we have data to send, we get in line to send it – and
    :class:`StrictFIFOLock` guarantees that each task will send its data in
    the same order that the state machine generated it.

    Currently, :class:`StrictFIFOLock` is identical to :class:`Lock`,
    but (a) this may not always be true in the future, especially if Trio ever
    implements `more sophisticated scheduling policies
    <https://github.com/python-trio/trio/issues/32>`__, and (b) the above code
    is relying on a pretty subtle property of its lock. Using a
    :class:`StrictFIFOLock` acts as an executable reminder that you're relying
    on this property.

    """


@attr.s(frozen=True)
class _ConditionStatistics:
    """Immutable snapshot returned by :meth:`Condition.statistics`."""

    tasks_waiting = attr.ib()
    lock_statistics = attr.ib()


@async_cm
class Condition(metaclass=Final):
    """A classic `condition variable
    <https://en.wikipedia.org/wiki/Monitor_(synchronization)>`__, similar to
    :class:`threading.Condition`.

    A :class:`Condition` object can be used as an async context manager to
    acquire the underlying lock; it blocks on entry but not on exit.

    Args:
      lock (Lock): the lock object to use. If given, must be a
          :class:`trio.Lock`. If None, a new :class:`Lock` will be allocated
          and used.

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = Lock()
        # Deliberately an exact type check rather than isinstance(): only a
        # plain trio.Lock is accepted.
        if type(lock) is not Lock:
            raise TypeError("lock must be a trio.Lock")
        self._lock = lock
        self._lot = trio.lowlevel.ParkingLot()

    def locked(self):
        """Check whether the underlying lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._lock.locked()

    def acquire_nowait(self):
        """Attempt to acquire the underlying lock, without blocking.

        Raises:
          WouldBlock: if the lock is currently held.

        """
        return self._lock.acquire_nowait()

    async def acquire(self):
        """Acquire the underlying lock, blocking if necessary."""
        await self._lock.acquire()

    def release(self):
        """Release the underlying lock."""
        self._lock.release()

    @enable_ki_protection
    async def wait(self):
        """Wait for another task to call :meth:`notify` or
        :meth:`notify_all`.

        When calling this method, you must hold the lock. It releases the lock
        while waiting, and then re-acquires it before waking up.

        There is a subtlety with how this method interacts with cancellation:
        when cancelled it will block to re-acquire the lock before raising
        :exc:`Cancelled`. This may cause cancellation to be less prompt than
        expected. The advantage is that it makes code like this work::

           async with condition:
               await condition.wait()

        If we didn't re-acquire the lock before waking up, and :meth:`wait`
        were cancelled here, then we'd crash in ``condition.__aexit__`` when
        we tried to release the lock we no longer held.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to wait")
        self.release()
        # NOTE: we go to sleep on self._lot, but we'll wake up on
        # self._lock._lot. That's all that's required to acquire a Lock.
        try:
            await self._lot.park()
        except BaseException:
            # Intentionally broad: whatever interrupted the wait (including
            # cancellation), take the lock back with cancellation shielded
            # before re-raising, so the caller still holds it on exit.
            with trio.CancelScope(shield=True):
                await self.acquire()
            raise

    def notify(self, n=1):
        """Wake one or more tasks that are blocked in :meth:`wait`.

        Args:
          n (int): The number of tasks to wake.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        # Move waiters from the condition's lot onto the lock's lot, so they
        # queue for the lock rather than running immediately.
        self._lot.repark(self._lock._lot, count=n)

    def notify_all(self):
        """Wake all tasks that are currently blocked in :meth:`wait`.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        self._lot.repark_all(self._lock._lot)

    def statistics(self):
        r"""Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this condition's
          :meth:`wait` method.
        * ``lock_statistics``: The result of calling the underlying
          :class:`Lock`\s :meth:`~Lock.statistics` method.

        """
        return _ConditionStatistics(
            tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics()
        )