1from aiohttp.frozenlist import FrozenList 2 3__all__ = ("Signal",) 4 5 6class Signal(FrozenList): 7 """Coroutine-based signal implementation. 8 9 To connect a callback to a signal, use any list method. 10 11 Signals are fired using the send() coroutine, which takes named 12 arguments. 13 """ 14 15 __slots__ = ("_owner",) 16 17 def __init__(self, owner): 18 super().__init__() 19 self._owner = owner 20 21 def __repr__(self): 22 return "<Signal owner={}, frozen={}, {!r}>".format( 23 self._owner, self.frozen, list(self) 24 ) 25 26 async def send(self, *args, **kwargs): 27 """ 28 Sends data to all registered receivers. 29 """ 30 if not self.frozen: 31 raise RuntimeError("Cannot send non-frozen signal.") 32 33 for receiver in self: 34 await receiver(*args, **kwargs) # type: ignore 35