"""Multiple-producer-multiple-consumer signal-dispatching.

``dispatcher`` is the core of Louie, providing the primary API and the
core logic for the system.

Internal attributes:

- ``WEAKREF_TYPES``: Tuple of types/classes which represent weak
  references to receivers, and thus must be dereferenced on retrieval
  to retrieve the callable object

- ``connections``::

    { senderkey (id) : { signal : [receivers...] } }

- ``senders``: Used for cleaning up sender references on sender
  deletion::

    { senderkey (id) : weakref(sender) }

- ``senders_back``: Used for cleaning up receiver references on receiver
  deletion::

    { receiverkey (id) : [senderkey (id)...] }

- ``plugins``: List of installed plugins.  Each plugin is asked
  ``is_live(receiver)`` when receivers are resolved and
  ``wrap_receiver(receiver)`` before a receiver is called.
"""

import weakref

from louie import error, robustapply, saferef
from louie.sender import Anonymous, Any
from louie.signal import All

# Support for statistics.
if __debug__:
    import os

    connects = 0
    disconnects = 0
    sends = 0

    def print_stats():
        """Print the connect/disconnect/send counters to stdout."""
        print(
            "\n"
            f"Louie connects: {connects}\n"
            f"Louie disconnects: {disconnects}\n"
            f"Louie sends: {sends}\n"
            "\n"
        )

    if "PYDISPATCH_STATS" in os.environ:
        import atexit

        atexit.register(print_stats)


WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)


connections = {}
senders = {}
senders_back = {}
plugins = []


def reset():
    """Reset the state of Louie.

    Useful during unit testing.  Should be avoided otherwise.

    The routing tables and the plugin list are rebound to fresh, empty
    containers (the previous objects are not cleared in place).
    """
    global connections, senders, senders_back, plugins
    connections = {}
    senders = {}
    senders_back = {}
    plugins = []


def connect(receiver, signal=All, sender=Any, weak=True):
    """Connect ``receiver`` to ``sender`` for ``signal``.

    - ``receiver``: A callable Python object which is to receive
      messages/signals/events.  Receivers must be hashable objects.

      If weak is ``True``, then receiver must be weak-referencable (more
      precisely ``saferef.safe_ref()`` must be able to create a
      reference to the receiver).

      Receivers are fairly flexible in their specification, as the
      machinery in the ``robustapply`` module takes care of most of the
      details regarding figuring out appropriate subsets of the sent
      arguments to apply to a given receiver.

      Note: If ``receiver`` is itself a weak reference (a callable), it
      will be de-referenced by the system's machinery, so *generally*
      weak references are not suitable as receivers, though some use
      might be found for the facility whereby a higher-level library
      passes in pre-weakrefed receiver references.

    - ``signal``: The signal to which the receiver should respond.

      If ``All``, receiver will receive all signals from the indicated
      sender (which might also be ``All``, but is not necessarily
      ``All``).

      Otherwise must be a hashable Python object other than ``None``
      (``DispatcherError`` raised on ``None``).

    - ``sender``: The sender to which the receiver should respond.

      If ``Any``, receiver will receive the indicated signals from any
      sender.

      If ``Anonymous``, receiver will only receive indicated signals
      from ``send``/``send_exact`` which do not specify a sender, or
      specify ``Anonymous`` explicitly as the sender.

      Otherwise can be any python object.

    - ``weak``: Whether to use weak references to the receiver.

      By default, the module will attempt to use weak references to
      the receiver objects.  If this parameter is ``False``, then strong
      references will be used.

    Returns ``None``, may raise ``DispatcherTypeError``.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            f"Signal cannot be None (receiver={receiver!r} sender={sender!r})"
        )
    if weak:
        receiver = saferef.safe_ref(receiver, on_delete=_remove_receiver)
    senderkey = id(sender)
    signals = connections.setdefault(senderkey, {})
    # Keep track of senders for cleanup.
    # Is Anonymous something we want to clean up?
    # Senders are keyed by identity, so compare by identity as well; an
    # equality test would invoke arbitrary ``__eq__`` implementations.
    if sender is not None and sender is not Anonymous and sender is not Any:

        def remove(_ref, senderkey=senderkey):
            # weakref callback: the sender has been garbage collected.
            _remove_sender(senderkey=senderkey)

        # Skip objects that can not be weakly referenced, which means
        # they won't be automatically cleaned up, but that's too bad.
        try:
            senders[senderkey] = weakref.ref(sender, remove)
        except TypeError:
            pass
    receiver_id = id(receiver)
    # Get current list, remove any current references to this receiver
    # in the list, including back-references.
    if signal in signals:
        receivers = signals[signal]
        _remove_old_back_refs(senderkey, signal, receiver, receivers)
    else:
        receivers = signals[signal] = []
    # Record the back-reference receiver -> sender (once per sender).
    back_refs = senders_back.setdefault(receiver_id, [])
    if senderkey not in back_refs:
        back_refs.append(senderkey)
    receivers.append(receiver)
    # Update stats.
    if __debug__:
        global connects
        connects += 1


def disconnect(receiver, signal=All, sender=Any, weak=True):
    """Disconnect ``receiver`` from ``sender`` for ``signal``.

    - ``receiver``: The registered receiver to disconnect.

    - ``signal``: The registered signal to disconnect.

    - ``sender``: The registered sender to disconnect.

    - ``weak``: The weakref state to disconnect.

    ``disconnect`` reverses the process of ``connect``, the semantics for
    the individual elements are logically equivalent to a tuple of
    ``(receiver, signal, sender, weak)`` used as a key to be deleted
    from the internal routing tables.  (The actual process is slightly
    more complex but the semantics are basically the same).

    Note: Using ``disconnect`` is not required to cleanup routing when
    an object is deleted; the framework will remove routes for deleted
    objects automatically.  It's only necessary to disconnect if you
    want to stop routing to a live object.

    Returns ``None``, may raise ``DispatcherTypeError`` or
    ``DispatcherKeyError``.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            f"Signal cannot be None (receiver={receiver!r} sender={sender!r})"
        )
    if weak:
        receiver = saferef.safe_ref(receiver)
    senderkey = id(sender)
    try:
        signals = connections[senderkey]
        receivers = signals[signal]
    except KeyError:
        raise error.DispatcherKeyError(
            f"No receivers found for signal {signal!r} from sender {sender!r}"
        )
    # NOTE(review): ``_remove_old_back_refs`` reports a missing receiver
    # by returning ``False`` rather than raising ``ValueError``, so in
    # practice disconnecting a receiver that is not in ``receivers`` is
    # silently ignored.  Kept as is because callers may rely on it.
    try:
        # also removes from receivers
        _remove_old_back_refs(senderkey, signal, receiver, receivers)
    except ValueError:
        raise error.DispatcherKeyError(
            f"No connection to receiver {receiver!r} "
            f"for signal {signal!r} from sender {sender!r}"
        )
    _cleanup_connections(senderkey, signal)
    # Update stats.
    if __debug__:
        global disconnects
        disconnects += 1


def get_receivers(sender=Any, signal=All):
    """Get list of receivers from global tables.

    This function allows you to retrieve the raw list of receivers
    from the connections table for the given sender and signal pair.

    Note: There is no guarantee that this is the actual list stored in
    the connections table, so the value should be treated as a simple
    iterable/truth value rather than, for instance a list to which you
    might append new records.

    Normally you would use ``live_receivers(get_receivers(...))`` to
    retrieve the actual receiver objects as an iterable object.
    """
    try:
        return connections[id(sender)][signal]
    except KeyError:
        # Unknown sender or unknown signal: nothing is connected.
        return []


def live_receivers(receivers):
    """Filter sequence of receivers to get resolved, live receivers.

    This is a generator which will iterate over the passed sequence,
    checking for weak references and resolving them, then returning
    all live receivers.  A receiver is live when it resolved to
    something other than ``None`` and every installed plugin's
    ``is_live`` accepts it.
    """
    for receiver in receivers:
        if isinstance(receiver, WEAKREF_TYPES):
            # Dereference the weak reference.
            receiver = receiver()
        if receiver is None:
            continue
        # Check installed plugins to make sure this receiver is live.
        if all(plugin.is_live(receiver) for plugin in plugins):
            yield receiver


def get_all_receivers(sender=Any, signal=All):
    """Get list of all receivers from global tables.

    This gets all receivers which should receive the given signal from
    sender, each receiver should be produced only once by the
    resulting generator.
    """
    yielded = set()
    for receivers in (
        # Get receivers that receive *this* signal from *this* sender.
        get_receivers(sender, signal),
        # Add receivers that receive *all* signals from *this* sender.
        get_receivers(sender, All),
        # Add receivers that receive *this* signal from *any* sender.
        get_receivers(Any, signal),
        # Add receivers that receive *all* signals from *any* sender.
        get_receivers(Any, All),
    ):
        # Make a copy of each list so it's immutable within the context
        # of this function, even if a receiver calls disconnect() or any
        # other function that changes a list of receivers.
        for receiver in list(receivers):
            if receiver:  # filter out dead instance-method weakrefs
                try:
                    if receiver not in yielded:
                        yielded.add(receiver)
                        yield receiver
                except TypeError:
                    # dead weakrefs raise TypeError on hash...
                    pass


def _wrap_with_plugins(receiver):
    """Return ``receiver`` wrapped by every installed plugin, in order."""
    for plugin in plugins:
        receiver = plugin.wrap_receiver(receiver)
    return receiver


def send(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers.

    - ``signal``: (Hashable) signal value; see ``connect`` for details.

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments which will be passed to *all*
      receivers.  Note that this may raise ``TypeError`` if the receivers
      do not allow the particular arguments.  Note also that arguments
      are applied before named arguments, so they should be used with
      care.

    - ``named``: Named arguments which will be filtered according to the
      parameters of the receivers to only provide those acceptable to
      the receiver.

    Return a list of tuple pairs ``[(receiver, response), ...]``

    If any receiver raises an error, the error propagates back through
    send, terminating the dispatch loop, so it is quite possible to
    not have all receivers called if a receiver raises an error.
    """
    # Call each receiver with whatever arguments it can accept.
    # Return a list of tuple pairs [(receiver, response), ... ].
    responses = []
    for original in live_receivers(get_all_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        receiver = _wrap_with_plugins(original)
        response = robustapply.robust_apply(
            receiver, original, *arguments, signal=signal, sender=sender, **named
        )
        responses.append((receiver, response))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return responses


def send_minimal(signal=All, sender=Anonymous, *arguments, **named):
    """Like ``send``, but does not attach ``signal`` and ``sender``
    arguments to the call to the receiver."""
    # Call each receiver with whatever arguments it can accept.
    # Return a list of tuple pairs [(receiver, response), ... ].
    responses = []
    for original in live_receivers(get_all_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        receiver = _wrap_with_plugins(original)
        response = robustapply.robust_apply(receiver, original, *arguments, **named)
        responses.append((receiver, response))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return responses


def send_exact(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` only to receivers registered for exact message.

    ``send_exact`` allows for avoiding ``Any``/``Anonymous`` registered
    handlers, sending only to those receivers explicitly registered
    for a particular signal on a particular sender.

    Return a list of tuple pairs ``[(receiver, response), ...]``
    """
    responses = []
    for original in live_receivers(get_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        receiver = _wrap_with_plugins(original)
        response = robustapply.robust_apply(
            receiver, original, *arguments, signal=signal, sender=sender, **named
        )
        responses.append((receiver, response))
    return responses


def send_robust(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers catching
    errors

    - ``signal``: (Hashable) signal value, see connect for details

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments which will be passed to *all*
      receivers.  Note that this may raise ``TypeError`` if the receivers
      do not allow the particular arguments.  Note also that arguments
      are applied before named arguments, so they should be used with
      care.

    - ``named``: Named arguments which will be filtered according to the
      parameters of the receivers to only provide those acceptable to
      the receiver.

    Return a list of tuple pairs ``[(receiver, response), ... ]``

    If any receiver raises an error (specifically, any subclass of
    ``Exception``), the error instance is returned as the result for
    that receiver.
    """
    # Call each receiver with whatever arguments it can accept.
    # Return a list of tuple pairs [(receiver, response), ... ].
    responses = []
    for original in live_receivers(get_all_receivers(sender, signal)):
        receiver = _wrap_with_plugins(original)
        try:
            response = robustapply.robust_apply(
                receiver, original, *arguments, signal=signal, sender=sender, **named
            )
        except Exception as err:
            # Report the failure in place of a response and keep going.
            responses.append((receiver, err))
        else:
            responses.append((receiver, response))
    return responses


def _remove_receiver(receiver):
    """Remove ``receiver`` from connections.

    Passed to ``saferef.safe_ref`` as the ``on_delete`` callback by
    ``connect``.  Returns ``False`` when there are no back-references
    at all, otherwise ``None``.
    """
    if not senders_back:
        # During module cleanup the mapping will be replaced with None.
        return False
    back_key = id(receiver)
    # Iterate over a snapshot: the clean-up below may edit the tables.
    for senderkey in tuple(senders_back.get(back_key, ())):
        try:
            signals = list(connections[senderkey])
        except KeyError:
            continue
        for signal in signals:
            try:
                receivers = connections[senderkey][signal]
            except KeyError:
                pass
            else:
                try:
                    receivers.remove(receiver)
                except Exception:
                    # Best effort: this may run from a weakref callback,
                    # and comparing against arbitrary receivers must not
                    # abort the clean-up.
                    pass
            _cleanup_connections(senderkey, signal)
    senders_back.pop(back_key, None)


def _cleanup_connections(senderkey, signal):
    """Delete empty signals for ``senderkey``.  Delete ``senderkey`` if
    empty."""
    try:
        signals = connections[senderkey]
        receivers = signals[signal]
    except (KeyError, TypeError):
        # Nothing registered for this sender/signal pair.
        return
    if receivers:
        return
    # No more connected receivers.  Therefore, remove the signal.
    del signals[signal]
    if not signals:
        # No more signal connections.  Therefore, remove the sender.
        _remove_sender(senderkey)


def _remove_sender(senderkey):
    """Remove ``senderkey`` from connections."""
    _remove_back_refs(senderkey)
    connections.pop(senderkey, None)
    # Senderkey will only be in senders dictionary if sender
    # could be weakly referenced.
    try:
        del senders[senderkey]
    except Exception:
        pass


def _remove_back_refs(senderkey):
    """Remove all back-references to this ``senderkey``."""
    signals = connections.get(senderkey)
    if signals is None:
        return
    for receivers in list(signals.values()):
        for receiver in receivers:
            _kill_back_ref(receiver, senderkey)


def _remove_old_back_refs(senderkey, signal, receiver, receivers):
    """Kill old ``senders_back`` references from ``receiver``.

    This guards against multiple registration of the same receiver for
    a given signal and sender leaking memory as old back reference
    records build up.

    Also removes old receiver instance from receivers.

    Returns ``True`` when the back-reference was removed, ``False``
    when ``receiver`` was not in ``receivers`` or when the removed
    receiver object is still registered for another signal of the same
    sender (in which case the back-reference must be kept).
    """
    try:
        index = receivers.index(receiver)
    except ValueError:
        return False
    old_receiver = receivers.pop(index)
    # ``connections`` is keyed by sender id, so the other signals of this
    # sender live under ``senderkey`` (not under ``signal``).
    signals = connections.get(senderkey, {})
    still_connected = any(
        rec is old_receiver
        for sig, recs in list(signals.items())
        if sig != signal
        for rec in recs
    )
    if still_connected:
        return False
    _kill_back_ref(old_receiver, senderkey)
    return True


def _kill_back_ref(receiver, senderkey):
    """Do actual removal of back reference from ``receiver`` to
    ``senderkey``.

    Always returns ``True``.
    """
    receiverkey = id(receiver)
    back_refs = senders_back.get(receiverkey)
    if back_refs is not None:
        # Drop every occurrence of senderkey, keeping the same list object.
        back_refs[:] = [key for key in back_refs if key != senderkey]
    if not back_refs:
        senders_back.pop(receiverkey, None)
    return True