import math

import attr

import trio

from . import _core
from ._core import enable_ki_protection, ParkingLot
from ._util import Final


@attr.s(frozen=True)
class _EventStatistics:
    tasks_waiting = attr.ib()


@attr.s(repr=False, eq=False, hash=False, slots=True)
class Event(metaclass=Final):
    """A waitable boolean value useful for inter-task synchronization,
    inspired by :class:`threading.Event`.

    An event object has an internal boolean flag, representing whether
    the event has happened yet. The flag is initially False, and the
    :meth:`wait` method waits until the flag is True. If the flag is
    already True, then :meth:`wait` returns immediately. (If the event has
    already happened, there's nothing to wait for.) The :meth:`set` method
    sets the flag to True, and wakes up any waiters.

    This behavior is useful because it helps avoid race conditions and
    lost wakeups: it doesn't matter whether :meth:`set` gets called just
    before or after :meth:`wait`. If you want a lower-level wakeup
    primitive that doesn't have this protection, consider :class:`Condition`
    or :class:`trio.lowlevel.ParkingLot`.

    .. note:: Unlike `threading.Event`, `trio.Event` has no
       `~threading.Event.clear` method. In Trio, once an `Event` has happened,
       it cannot un-happen. If you need to represent a series of events,
       consider creating a new `Event` object for each one (they're cheap!),
       or other synchronization methods like :ref:`channels <channels>` or
       `trio.lowlevel.ParkingLot`.
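
    For example, one task can pause until another task signals that
    something has happened (an illustrative sketch; the function names here
    are arbitrary)::

       async def waiter(event):
           await event.wait()  # blocks until event.set() is called
           print("the event happened")

       async def main():
           event = trio.Event()
           async with trio.open_nursery() as nursery:
               nursery.start_soon(waiter, event)
               event.set()  # wakes up the waiter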

    """

    _tasks = attr.ib(factory=set, init=False)
    _flag = attr.ib(default=False, init=False)

    def is_set(self):
        """Return the current value of the internal flag."""
        return self._flag

    @enable_ki_protection
    def set(self):
        """Set the internal flag value to True, and wake any waiting tasks."""
        if not self._flag:
            self._flag = True
            for task in self._tasks:
                _core.reschedule(task)
            self._tasks.clear()

    async def wait(self):
        """Block until the internal flag value becomes True.

        If it's already True, then this method returns immediately.

        """
        if self._flag:
            await trio.lowlevel.checkpoint()
        else:
            task = _core.current_task()
            self._tasks.add(task)

            def abort_fn(_):
                self._tasks.remove(task)
                return _core.Abort.SUCCEEDED

            await _core.wait_task_rescheduled(abort_fn)

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this event's
          :meth:`wait` method.

        """
        return _EventStatistics(tasks_waiting=len(self._tasks))


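# Class decorator that adds async context manager support to a synchronization
# primitive: ``__aenter__`` awaits the class's own acquire() and ``__aexit__``
# calls release(), so instances can be used as ``async with obj: ...``.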
def async_cm(cls):
    @enable_ki_protection
    async def __aenter__(self):
        await self.acquire()

    __aenter__.__qualname__ = cls.__qualname__ + ".__aenter__"
    cls.__aenter__ = __aenter__

    @enable_ki_protection
    async def __aexit__(self, *args):
        self.release()

    __aexit__.__qualname__ = cls.__qualname__ + ".__aexit__"
    cls.__aexit__ = __aexit__
    return cls


@attr.s(frozen=True)
class _CapacityLimiterStatistics:
    borrowed_tokens = attr.ib()
    total_tokens = attr.ib()
    borrowers = attr.ib()
    tasks_waiting = attr.ib()


@async_cm
class CapacityLimiter(metaclass=Final):
    """An object for controlling access to a resource with limited capacity.

    Sometimes you need to put a limit on how many tasks can do something at
    the same time. For example, you might want to use some threads to run
    multiple blocking I/O operations in parallel... but if you use too many
    threads at once, then your system can become overloaded and it'll actually
    make things slower. One popular solution is to impose a policy like "run
    up to 40 threads at the same time, but no more". But how do you implement
    a policy like this?

    That's what :class:`CapacityLimiter` is for. You can think of a
    :class:`CapacityLimiter` object as a sack that starts out holding some fixed
    number of tokens::

       limit = trio.CapacityLimiter(40)

    Then tasks can come along and borrow a token out of the sack::

       # Borrow a token:
       async with limit:
           # We are holding a token!
           await perform_expensive_operation()
       # Exiting the 'async with' block puts the token back into the sack

    And crucially, if you try to borrow a token but the sack is empty, then
    you have to wait for another task to finish what it's doing and put its
    token back first before you can take it and continue.

    Another way to think of it: a :class:`CapacityLimiter` is like a sofa with a
    fixed number of seats, and if they're all taken then you have to wait for
    someone to get up before you can sit down.

    By default, :func:`trio.to_thread.run_sync` uses a
    :class:`CapacityLimiter` to limit the number of threads running at once;
    see `trio.to_thread.current_default_thread_limiter` for details.

    If you're familiar with semaphores, then you can think of this as a
    restricted semaphore that's specialized for one common use case, with
    additional error checking. For a more traditional semaphore, see
    :class:`Semaphore`.

    .. note::

       Don't confuse this with the `"leaky bucket"
       <https://en.wikipedia.org/wiki/Leaky_bucket>`__ or `"token bucket"
       <https://en.wikipedia.org/wiki/Token_bucket>`__ algorithms used to
       limit bandwidth usage on networks. The basic idea of using tokens to
       track a resource limit is similar, but this is a very simple sack where
       tokens aren't automatically created or destroyed over time; they're
       just borrowed and then put back.

    """

    def __init__(self, total_tokens):
        self._lot = ParkingLot()
        self._borrowers = set()
        # Maps each task waiting to acquire -> the borrower it's acquiring a
        # token on behalf of (see acquire_on_behalf_of)
        self._pending_borrowers = {}
        # invoke the property setter for validation
        self.total_tokens = total_tokens
        assert self._total_tokens == total_tokens

    def __repr__(self):
        return "<trio.CapacityLimiter at {:#x}, {}/{} with {} waiting>".format(
            id(self), len(self._borrowers), self._total_tokens, len(self._lot)
        )

    @property
    def total_tokens(self):
        """The total capacity available.

        You can change :attr:`total_tokens` by assigning to this attribute. If
        you make it larger, then the appropriate number of waiting tasks will
        be woken immediately to take the new tokens. If you decrease
        total_tokens below the number of tasks that are currently using the
        resource, then all current tasks will be allowed to finish as normal,
        but no new tasks will be allowed in until the total number of tasks
        drops below the new total_tokens.
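
        For example, to raise the limit at runtime (an illustrative
        sketch)::

           limit = trio.CapacityLimiter(10)
           limit.total_tokens += 20  # wakes up to 20 waiting tasks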

        """
        return self._total_tokens

    @total_tokens.setter
    def total_tokens(self, new_total_tokens):
        if not isinstance(new_total_tokens, int) and new_total_tokens != math.inf:
            raise TypeError("total_tokens must be an int or math.inf")
        if new_total_tokens < 1:
            raise ValueError("total_tokens must be >= 1")
        self._total_tokens = new_total_tokens
        self._wake_waiters()

    def _wake_waiters(self):
        available = self._total_tokens - len(self._borrowers)
        for woken in self._lot.unpark(count=available):
            self._borrowers.add(self._pending_borrowers.pop(woken))

    @property
    def borrowed_tokens(self):
        """The amount of capacity that's currently in use."""
        return len(self._borrowers)

    @property
    def available_tokens(self):
        """The amount of capacity that's available to use."""
        return self.total_tokens - self.borrowed_tokens

    @enable_ki_protection
    def acquire_nowait(self):
        """Borrow a token from the sack, without blocking.

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if the current task already holds one of this sack's
              tokens.
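
        For example, to give up immediately instead of waiting when the
        limiter is at capacity (an illustrative sketch; ``do_limited_work``
        is a hypothetical function)::

           try:
               limit.acquire_nowait()
           except trio.WouldBlock:
               return  # at capacity; skip the work instead of blocking
           try:
               do_limited_work()
           finally:
               limit.release()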

        """
        self.acquire_on_behalf_of_nowait(trio.lowlevel.current_task())

    @enable_ki_protection
    def acquire_on_behalf_of_nowait(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, without
        blocking.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token. This is used by
             :func:`trio.to_thread.run_sync` to allow threads to "hold
             tokens", with the intention in the future of using it to `allow
             deadlock detection and other useful things
             <https://github.com/python-trio/trio/issues/182>`__.

        Raises:
          WouldBlock: if no tokens are available.
          RuntimeError: if ``borrower`` already holds one of this sack's
              tokens.

        """
        if borrower in self._borrowers:
            raise RuntimeError(
                "this borrower is already holding one of this "
                "CapacityLimiter's tokens"
            )
        if len(self._borrowers) < self._total_tokens and not self._lot:
            self._borrowers.add(borrower)
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Borrow a token from the sack, blocking if necessary.

        Raises:
          RuntimeError: if the current task already holds one of this sack's
              tokens.

        """
        await self.acquire_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    async def acquire_on_behalf_of(self, borrower):
        """Borrow a token from the sack on behalf of ``borrower``, blocking if
        necessary.

        Args:
          borrower: A :class:`trio.lowlevel.Task` or arbitrary opaque object
             used to record who is borrowing this token; see
             :meth:`acquire_on_behalf_of_nowait` for details.

        Raises:
          RuntimeError: if ``borrower`` already holds one of this sack's
             tokens.
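
        For example, a supervising task might hold a token on behalf of some
        other entity, represented here by an opaque placeholder object (an
        illustrative sketch)::

           placeholder = object()
           await limit.acquire_on_behalf_of(placeholder)
           try:
               ...  # do the capacity-limited work
           finally:
               limit.release_on_behalf_of(placeholder)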

        """
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_on_behalf_of_nowait(borrower)
        except trio.WouldBlock:
            task = trio.lowlevel.current_task()
            self._pending_borrowers[task] = borrower
            try:
                await self._lot.park()
            except trio.Cancelled:
                self._pending_borrowers.pop(task)
                raise
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Put a token back into the sack.

        Raises:
          RuntimeError: if the current task has not acquired one of this
              sack's tokens.

        """
        self.release_on_behalf_of(trio.lowlevel.current_task())

    @enable_ki_protection
    def release_on_behalf_of(self, borrower):
        """Put a token back into the sack on behalf of ``borrower``.

        Raises:
          RuntimeError: if the given borrower has not acquired one of this
              sack's tokens.

        """
        if borrower not in self._borrowers:
            raise RuntimeError(
                "this borrower isn't holding any of this CapacityLimiter's tokens"
            )
        self._borrowers.remove(borrower)
        self._wake_waiters()

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``borrowed_tokens``: The number of tokens currently borrowed from
          the sack.
        * ``total_tokens``: The total number of tokens in the sack. Usually
          this will be larger than ``borrowed_tokens``, but it's possible for
          it to be smaller if :attr:`total_tokens` was recently decreased.
        * ``borrowers``: A list of all tasks or other entities that currently
          hold a token.
        * ``tasks_waiting``: The number of tasks blocked on this
          :class:`CapacityLimiter`'s :meth:`acquire` or
          :meth:`acquire_on_behalf_of` methods.

        """
        return _CapacityLimiterStatistics(
            borrowed_tokens=len(self._borrowers),
            total_tokens=self._total_tokens,
            # Use a list instead of a frozenset just in case we start to allow
            # one borrower to hold multiple tokens in the future
            borrowers=list(self._borrowers),
            tasks_waiting=len(self._lot),
        )


@async_cm
class Semaphore(metaclass=Final):
    """A `semaphore <https://en.wikipedia.org/wiki/Semaphore_(programming)>`__.

    A semaphore holds an integer value, which can be incremented by
    calling :meth:`release` and decremented by calling :meth:`acquire` – but
    the value is never allowed to drop below zero. If the value is zero, then
    :meth:`acquire` will block until someone calls :meth:`release`.

    If you're looking for a :class:`Semaphore` to limit the number of tasks
    that can access some resource simultaneously, then consider using a
    :class:`CapacityLimiter` instead.

    This object's interface is similar to, but different from, that of
    :class:`threading.Semaphore`.

    A :class:`Semaphore` object can be used as an async context manager; it
    blocks on entry but not on exit.
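
    For example, to allow at most three tasks at a time into a critical
    region (an illustrative sketch; ``do_something`` is a hypothetical
    function)::

       sem = trio.Semaphore(3)

       async def worker():
           async with sem:  # decrements on entry, increments on exit
               await do_something()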

    Args:
      initial_value (int): A non-negative integer giving the semaphore's
        initial value.
      max_value (int or None): If given, makes this a "bounded" semaphore that
        raises an error if the value is about to exceed the given
        ``max_value``.

    """

    def __init__(self, initial_value, *, max_value=None):
        if not isinstance(initial_value, int):
            raise TypeError("initial_value must be an int")
        if initial_value < 0:
            raise ValueError("initial_value must be >= 0")
        if max_value is not None:
            if not isinstance(max_value, int):
                raise TypeError("max_value must be None or an int")
            if max_value < initial_value:
                raise ValueError("max_value must be >= initial_value")

        # Invariants:
        # bool(self._lot) implies self._value == 0
        # (or equivalently: self._value > 0 implies not self._lot)
        self._lot = trio.lowlevel.ParkingLot()
        self._value = initial_value
        self._max_value = max_value

    def __repr__(self):
        if self._max_value is None:
            max_value_str = ""
        else:
            max_value_str = ", max_value={}".format(self._max_value)
        return "<trio.Semaphore({}{}) at {:#x}>".format(
            self._value, max_value_str, id(self)
        )

    @property
    def value(self):
        """The current value of the semaphore."""
        return self._value

    @property
    def max_value(self):
        """The maximum allowed value. May be None to indicate no limit."""
        return self._max_value

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to decrement the semaphore value, without blocking.

        Raises:
          WouldBlock: if the value is zero.

        """
        if self._value > 0:
            assert not self._lot
            self._value -= 1
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Decrement the semaphore value, blocking if necessary to avoid
        letting it drop below zero.

        """
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Increment the semaphore value, possibly waking a task blocked in
        :meth:`acquire`.

        Raises:
          ValueError: if incrementing the value would cause it to exceed
              :attr:`max_value`.

        """
        if self._lot:
            assert self._value == 0
            self._lot.unpark(count=1)
        else:
            if self._max_value is not None and self._value == self._max_value:
                raise ValueError("semaphore released too many times")
            self._value += 1

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this semaphore's
          :meth:`acquire` method.

        """
        return self._lot.statistics()


@attr.s(frozen=True)
class _LockStatistics:
    locked = attr.ib()
    owner = attr.ib()
    tasks_waiting = attr.ib()


@async_cm
@attr.s(eq=False, hash=False, repr=False)
class _LockImpl:
    _lot = attr.ib(factory=ParkingLot, init=False)
    _owner = attr.ib(default=None, init=False)

    def __repr__(self):
        if self.locked():
            s1 = "locked"
            s2 = " with {} waiters".format(len(self._lot))
        else:
            s1 = "unlocked"
            s2 = ""
        return "<{} {} object at {:#x}{}>".format(
            s1, self.__class__.__name__, id(self), s2
        )

    def locked(self):
        """Check whether the lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._owner is not None

    @enable_ki_protection
    def acquire_nowait(self):
        """Attempt to acquire the lock, without blocking.

        Raises:
          WouldBlock: if the lock is held.
          RuntimeError: if the current task already holds the lock.

        """

        task = trio.lowlevel.current_task()
        if self._owner is task:
            raise RuntimeError("attempt to re-acquire an already held Lock")
        elif self._owner is None and not self._lot:
            # No-one owns it
            self._owner = task
        else:
            raise trio.WouldBlock

    @enable_ki_protection
    async def acquire(self):
        """Acquire the lock, blocking if necessary."""
        await trio.lowlevel.checkpoint_if_cancelled()
        try:
            self.acquire_nowait()
        except trio.WouldBlock:
            # NOTE: it's important that the contended acquire path is just
            # "_lot.park()", because that's how Condition.wait() acquires the
            # lock as well.
            await self._lot.park()
        else:
            await trio.lowlevel.cancel_shielded_checkpoint()

    @enable_ki_protection
    def release(self):
        """Release the lock.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        task = trio.lowlevel.current_task()
        if task is not self._owner:
            raise RuntimeError("can't release a Lock you don't own")
        if self._lot:
            (self._owner,) = self._lot.unpark(count=1)
        else:
            self._owner = None

    def statistics(self):
        """Return an object containing debugging information.

        Currently the following fields are defined:

        * ``locked``: boolean indicating whether the lock is held.
        * ``owner``: the :class:`trio.lowlevel.Task` currently holding the lock,
          or None if the lock is not held.
        * ``tasks_waiting``: The number of tasks blocked on this lock's
          :meth:`acquire` method.

        """
        return _LockStatistics(
            locked=self.locked(), owner=self._owner, tasks_waiting=len(self._lot)
        )


class Lock(_LockImpl, metaclass=Final):
    """A classic `mutex
    <https://en.wikipedia.org/wiki/Lock_(computer_science)>`__.

    This is a non-reentrant, single-owner lock. Unlike
    :class:`threading.Lock`, only the owner of the lock is allowed to release
    it.

    A :class:`Lock` object can be used as an async context manager; it
    blocks on entry but not on exit.
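
    For example, to serialize updates to some shared state (an illustrative
    sketch; ``shared`` stands in for your own data structure)::

       lock = trio.Lock()

       async def add_one(shared):
           async with lock:  # only one task at a time runs this block
               shared.counter += 1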

    """


class StrictFIFOLock(_LockImpl, metaclass=Final):
    r"""A variant of :class:`Lock` where tasks are guaranteed to acquire the
    lock in strict first-come-first-served order.

    An example of when this is useful is if you're implementing something like
    :class:`trio.SSLStream` or an HTTP/2 server using `h2
    <https://hyper-h2.readthedocs.io/>`__, where you have multiple concurrent
    tasks that are interacting with a shared state machine, and at
    unpredictable moments the state machine requests that a chunk of data be
    sent over the network. (For example, when using h2, simply reading incoming
    data can occasionally `create outgoing data to send
    <https://http2.github.io/http2-spec/#PING>`__.) The challenge is to make
    sure that these chunks are sent in the correct order, without being
    garbled.

    One option would be to use a regular :class:`Lock`, and wrap it around
    every interaction with the state machine::

        # This approach is sometimes workable but often sub-optimal; see below
        async with lock:
            state_machine.do_something()
            if state_machine.has_data_to_send():
                await conn.sendall(state_machine.get_data_to_send())

    But this can be problematic. If you're using h2 then *usually* reading
    incoming data doesn't create the need to send any data, so we don't want
    to force every task that tries to read from the network to sit and wait
    a potentially long time for ``sendall`` to finish. And in some situations
    this could even potentially cause a deadlock, if the remote peer is
    waiting for you to read some data before it accepts the data you're
    sending.

    :class:`StrictFIFOLock` provides an alternative. We can rewrite our
    example like::

        # Note: no awaits between when we start using the state machine and
        # when we block to take the lock!
        state_machine.do_something()
        if state_machine.has_data_to_send():
            # Notice that we fetch the data to send out of the state machine
            # *before* sleeping, so that other tasks won't see it.
            chunk = state_machine.get_data_to_send()
            async with strict_fifo_lock:
                await conn.sendall(chunk)

    First we do all our interaction with the state machine in a single
    scheduling quantum (notice there are no ``await``\s in there), so it's
    automatically atomic with respect to other tasks. And then if and only if
    we have data to send, we get in line to send it – and
    :class:`StrictFIFOLock` guarantees that each task will send its data in
    the same order that the state machine generated it.

    Currently, :class:`StrictFIFOLock` is identical to :class:`Lock`,
    but (a) this may not always be true in the future, especially if Trio ever
    implements `more sophisticated scheduling policies
    <https://github.com/python-trio/trio/issues/32>`__, and (b) the above code
    is relying on a pretty subtle property of its lock. Using a
    :class:`StrictFIFOLock` acts as an executable reminder that you're relying
    on this property.

    """


@attr.s(frozen=True)
class _ConditionStatistics:
    tasks_waiting = attr.ib()
    lock_statistics = attr.ib()


@async_cm
class Condition(metaclass=Final):
    """A classic `condition variable
    <https://en.wikipedia.org/wiki/Monitor_(synchronization)>`__, similar to
    :class:`threading.Condition`.

    A :class:`Condition` object can be used as an async context manager to
    acquire the underlying lock; it blocks on entry but not on exit.

    Args:
      lock (Lock): the lock object to use. If given, must be a
          :class:`trio.Lock`. If None, a new :class:`Lock` will be allocated
          and used.
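
    A typical wait/notify round trip looks like this (an illustrative
    sketch; ``state`` is an arbitrary shared object)::

       state = {"ready": False}
       cond = trio.Condition()

       async def consumer():
           async with cond:
               while not state["ready"]:  # re-check after every wakeup
                   await cond.wait()

       async def producer():
           async with cond:
               state["ready"] = True
               cond.notify()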

    """

    def __init__(self, lock=None):
        if lock is None:
            lock = Lock()
        if type(lock) is not Lock:
            raise TypeError("lock must be a trio.Lock")
        self._lock = lock
        self._lot = trio.lowlevel.ParkingLot()

    def locked(self):
        """Check whether the underlying lock is currently held.

        Returns:
          bool: True if the lock is held, False otherwise.

        """
        return self._lock.locked()

    def acquire_nowait(self):
        """Attempt to acquire the underlying lock, without blocking.

        Raises:
          WouldBlock: if the lock is currently held.

        """
        return self._lock.acquire_nowait()

    async def acquire(self):
        """Acquire the underlying lock, blocking if necessary."""
        await self._lock.acquire()

    def release(self):
        """Release the underlying lock."""
        self._lock.release()

    @enable_ki_protection
    async def wait(self):
        """Wait for another task to call :meth:`notify` or
        :meth:`notify_all`.

        When calling this method, you must hold the lock. It releases the lock
        while waiting, and then re-acquires it before waking up.

        There is a subtlety with how this method interacts with cancellation:
        when cancelled it will block to re-acquire the lock before raising
        :exc:`Cancelled`. This may cause cancellation to be less prompt than
        expected. The advantage is that it makes code like this work::

           async with condition:
               await condition.wait()

        If we didn't re-acquire the lock before waking up, and :meth:`wait`
        were cancelled here, then we'd crash in ``condition.__aexit__`` when
        we tried to release the lock we no longer held.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to wait")
        self.release()
        # NOTE: we go to sleep on self._lot, but we'll wake up on
        # self._lock._lot. That's all that's required to acquire a Lock.
        try:
            await self._lot.park()
        except BaseException:
            with trio.CancelScope(shield=True):
                await self.acquire()
            raise

    def notify(self, n=1):
        """Wake one or more tasks that are blocked in :meth:`wait`.

        Args:
          n (int): The number of tasks to wake.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        self._lot.repark(self._lock._lot, count=n)

    def notify_all(self):
        """Wake all tasks that are currently blocked in :meth:`wait`.

        Raises:
          RuntimeError: if the calling task does not hold the lock.

        """
        if trio.lowlevel.current_task() is not self._lock._owner:
            raise RuntimeError("must hold the lock to notify")
        self._lot.repark_all(self._lock._lot)

    def statistics(self):
        r"""Return an object containing debugging information.

        Currently the following fields are defined:

        * ``tasks_waiting``: The number of tasks blocked on this condition's
          :meth:`wait` method.
        * ``lock_statistics``: The result of calling the underlying
          :class:`Lock`'s :meth:`~Lock.statistics` method.

        """
        return _ConditionStatistics(
            tasks_waiting=len(self._lot), lock_statistics=self._lock.statistics()
        )