1"""Multiple-producer-multiple-consumer signal-dispatching.
2
3``dispatcher`` is the core of Louie, providing the primary API and the
4core logic for the system.
5
6Internal attributes:
7
8- ``WEAKREF_TYPES``: Tuple of types/classes which represent weak
9  references to receivers, and thus must be dereferenced on retrieval
10  to retrieve the callable object
11
12- ``connections``::
13
14    { senderkey (id) : { signal : [receivers...] } }
15
16- ``senders``: Used for cleaning up sender references on sender
17  deletion::
18
19    { senderkey (id) : weakref(sender) }
20
21- ``senders_back``: Used for cleaning up receiver references on receiver
22  deletion::
23
24    { receiverkey (id) : [senderkey (id)...] }
25"""
26
27import weakref
28
29from louie import error, robustapply, saferef
30from louie.sender import Anonymous, Any
31from louie.signal import All
32
# Optional call statistics; only maintained while ``__debug__`` is true
# (i.e. the interpreter is not running with -O).
if __debug__:
    import os

    # Module-level counters, incremented elsewhere in this module.
    connects = disconnects = sends = 0

    def print_stats():
        """Print the current connect/disconnect/send counters to stdout."""
        report = (
            "\n"
            f"Louie connects: {connects}\n"
            f"Louie disconnects: {disconnects}\n"
            f"Louie sends: {sends}\n"
            "\n"
        )
        print(report)

    # Dump the counters at interpreter exit when the environment variable
    # PYDISPATCH_STATS is set (to any value).
    if os.environ.get("PYDISPATCH_STATS") is not None:
        import atexit

        atexit.register(print_stats)
54
55
# Types treated as weak references to receivers; ``live_receivers`` calls
# instances of these to obtain the underlying callable.
WEAKREF_TYPES = (weakref.ReferenceType, saferef.BoundMethodWeakref)


# Routing table: { senderkey (id) : { signal : [receivers...] } }
connections = {}
# Sender cleanup table: { senderkey (id) : weakref(sender) }
senders = {}
# Receiver cleanup table: { receiverkey (id) : [senderkey (id)...] }
senders_back = {}
# Installed plugin objects; consulted via ``is_live`` when filtering
# receivers and via ``wrap_receiver`` when sending.
plugins = []
63
64
def reset():
    """Discard every registered connection and plugin.

    Rebinds the module-level ``connections``, ``senders``,
    ``senders_back`` and ``plugins`` tables to fresh, empty containers
    (the previous container objects are not cleared in place).

    Useful during unit testing.  Should be avoided otherwise.
    """
    global connections, senders, senders_back, plugins
    connections, senders, senders_back = {}, {}, {}
    plugins = []
75
76
def connect(receiver, signal=All, sender=Any, weak=True):
    """Connect ``receiver`` to ``sender`` for ``signal``.

    - ``receiver``: A callable Python object which is to receive
      messages/signals/events.  Receivers must be hashable objects.

      If weak is ``True``, then receiver must be weak-referencable (more
      precisely ``saferef.safe_ref()`` must be able to create a
      reference to the receiver).

      Receivers are fairly flexible in their specification, as the
      machinery in the ``robustapply`` module takes care of most of the
      details regarding figuring out appropriate subsets of the sent
      arguments to apply to a given receiver.

      Note: If ``receiver`` is itself a weak reference (a callable), it
      will be de-referenced by the system's machinery, so *generally*
      weak references are not suitable as receivers, though some use
      might be found for the facility whereby a higher-level library
      passes in pre-weakrefed receiver references.

    - ``signal``: The signal to which the receiver should respond.

      If ``All``, receiver will receive all signals from the indicated
      sender (which might be ``Any``, but is not necessarily ``Any``).

      Otherwise must be a hashable Python object other than ``None``
      (``DispatcherTypeError`` raised on ``None``).

    - ``sender``: The sender to which the receiver should respond.

      If ``Any``, receiver will receive the indicated signals from any
      sender.

      If ``Anonymous``, receiver will only receive indicated signals
      from ``send``/``send_exact`` which do not specify a sender, or
      specify ``Anonymous`` explicitly as the sender.

      Otherwise can be any python object.

    - ``weak``: Whether to use weak references to the receiver.

      By default, the module will attempt to use weak references to
      the receiver objects.  If this parameter is ``False``, then strong
      references will be used.

    Connecting the same receiver again for the same signal and sender
    replaces the earlier registration instead of duplicating it.

    Returns ``None``, may raise ``DispatcherTypeError``.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            f"Signal cannot be None (receiver={receiver!r} sender={sender!r})"
        )
    if weak:
        receiver = saferef.safe_ref(receiver, on_delete=_remove_receiver)
    senderkey = id(sender)
    signals = connections.setdefault(senderkey, {})
    # Keep track of senders for cleanup.
    # Is Anonymous something we want to clean up?
    # Identity checks are used on purpose: a membership test against a
    # tuple would call the sender's ``__eq__``, which may be expensive,
    # return a non-boolean, or raise for arbitrary sender objects.
    if sender is not None and sender is not Anonymous and sender is not Any:

        def remove(_ref, senderkey=senderkey):
            # Weakref callback: receives the dead reference, which is not
            # needed because the sender key is bound as a default.
            _remove_sender(senderkey=senderkey)

        # Skip objects that can not be weakly referenced, which means
        # they won't be automatically cleaned up, but that's too bad.
        try:
            senders[senderkey] = weakref.ref(sender, remove)
        except TypeError:
            pass
    receiver_id = id(receiver)
    # get current set, remove any current references to
    # this receiver in the set, including back-references
    if signal in signals:
        receivers = signals[signal]
        _remove_old_back_refs(senderkey, signal, receiver, receivers)
    else:
        receivers = signals[signal] = []
    # Record the back reference receiver -> sender.  The broad handler
    # is kept from the original implementation so that bookkeeping
    # trouble never prevents the connection itself from being made.
    try:
        current = senders_back.get(receiver_id)
        if current is None:
            senders_back[receiver_id] = current = []
        if senderkey not in current:
            current.append(senderkey)
    except Exception:
        pass
    receivers.append(receiver)
    # Update stats.
    if __debug__:
        global connects
        connects += 1
172
173
def disconnect(receiver, signal=All, sender=Any, weak=True):
    """Disconnect ``receiver`` from ``sender`` for ``signal``.

    - ``receiver``: The registered receiver to disconnect.

    - ``signal``: The registered signal to disconnect.

    - ``sender``: The registered sender to disconnect.

    - ``weak``: The weakref state to disconnect.  When true, the
      receiver is wrapped with ``saferef.safe_ref()`` before it is
      looked up in the routing table.

    ``disconnect`` reverses the process of ``connect``, the semantics for
    the individual elements are logically equivalent to a tuple of
    ``(receiver, signal, sender, weak)`` used as a key to be deleted
    from the internal routing tables.  (The actual process is slightly
    more complex but the semantics are basically the same).

    Note: Using ``disconnect`` is not required to cleanup routing when
    an object is deleted; the framework will remove routes for deleted
    objects automatically.  It's only necessary to disconnect if you
    want to stop routing to a live object.

    Returns ``None``.  Raises ``DispatcherTypeError`` if ``signal`` is
    ``None``, and ``DispatcherKeyError`` if there is no routing entry
    for the sender/signal pair or the receiver is not registered in it.
    """
    if signal is None:
        raise error.DispatcherTypeError(
            f"Signal cannot be None (receiver={receiver!r} sender={sender!r})"
        )
    if weak:
        receiver = saferef.safe_ref(receiver)
    senderkey = id(sender)
    try:
        receivers = connections[senderkey][signal]
    except KeyError:
        raise error.DispatcherKeyError(
            f"No receivers found for signal {signal!r} from sender {sender!r}"
        ) from None
    # ``_remove_old_back_refs`` reports a missing receiver through its
    # return value rather than by raising, so the presence check has to
    # be made here for the documented ``DispatcherKeyError`` to be raised.
    if receiver not in receivers:
        raise error.DispatcherKeyError(
            f"No connection to receiver {receiver!r} "
            f"for signal {signal!r} from sender {sender!r}"
        )
    # also removes from receivers
    _remove_old_back_refs(senderkey, signal, receiver, receivers)
    _cleanup_connections(senderkey, signal)
    # Update stats.
    if __debug__:
        global disconnects
        disconnects += 1
226
227
def get_receivers(sender=Any, signal=All):
    """Return the raw receiver list registered for ``sender``/``signal``.

    The lookup goes straight to the ``connections`` table using
    ``id(sender)`` and ``signal``; no wildcard (``Any``/``All``)
    expansion is performed.

    The result is either the list stored in the table or, when nothing
    is registered, a new empty list.  Treat it as a read-only iterable
    or truth value; do not append to it.

    Normally you would use ``live_receivers(get_receivers(...))`` to
    obtain the actual receiver objects.
    """
    signals = connections.get(id(sender))
    if signals is None:
        return []
    return signals.get(signal, [])
246
247
def live_receivers(receivers):
    """Yield the resolved, live receivers found in ``receivers``.

    Entries that are instances of ``WEAKREF_TYPES`` are called to obtain
    their target; entries whose target is ``None`` are skipped.  A
    receiver is yielded only if every installed plugin's ``is_live``
    returns a true value for it.
    """
    for candidate in receivers:
        # Dereference weak references; everything else is used as is.
        target = candidate() if isinstance(candidate, WEAKREF_TYPES) else candidate
        if target is None:
            continue
        # Stops at the first plugin that reports the receiver as dead.
        if all(plugin.is_live(target) for plugin in plugins):
            yield target
269
270
def get_all_receivers(sender=Any, signal=All):
    """Yield every registered receiver entry that matches ``sender``/``signal``.

    Four registrations are consulted, in this order: exact sender and
    signal, exact sender with ``All``, ``Any`` sender with the signal,
    and ``Any`` sender with ``All``.  Each entry is produced at most
    once.  Entries are yielded as stored (they are not dereferenced);
    pass the result to ``live_receivers`` to resolve them.
    """
    seen = set()
    sources = (
        # *this* signal from *this* sender
        get_receivers(sender, signal),
        # *all* signals from *this* sender
        get_receivers(sender, All),
        # *this* signal from *any* sender
        get_receivers(Any, signal),
        # *all* signals from *any* sender
        get_receivers(Any, All),
    )
    for source in sources:
        # Iterate over a snapshot so that a receiver calling disconnect()
        # (or anything else that edits the list) cannot disturb the loop.
        for entry in list(source):
            if not entry:
                # filter out dead instance-method weakrefs
                continue
            try:
                if entry not in seen:
                    seen.add(entry)
                    yield entry
            except TypeError:
                # dead weakrefs raise TypeError on hash...
                pass
301
302
def send(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers.

    - ``signal``: (Hashable) signal value; see ``connect`` for details.

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments handed to *every* receiver.
      A receiver that cannot accept them may raise ``TypeError``.  They
      are applied before the named arguments, so use them with care.

    - ``named``: Named arguments, handed to ``robustapply.robust_apply``
      together with ``signal`` and ``sender`` so that each receiver is
      given only those it accepts.

    Each live receiver is first passed through ``wrap_receiver`` of
    every installed plugin.

    Return a list of tuple pairs ``[(receiver, response), ...]`` where
    ``receiver`` is the (possibly plugin-wrapped) callable.

    If any receiver raises an error, the error propagates back through
    ``send`` and terminates the dispatch loop, so the remaining
    receivers are not called.
    """
    results = []
    for target in live_receivers(get_all_receivers(sender, signal)):
        # Let every installed plugin wrap the receiver in turn.
        wrapped = target
        for plugin in plugins:
            wrapped = plugin.wrap_receiver(wrapped)
        outcome = robustapply.robust_apply(
            wrapped, target, signal=signal, sender=sender, *arguments, **named
        )
        results.append((wrapped, outcome))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return results
352
353
def send_minimal(signal=All, sender=Anonymous, *arguments, **named):
    """Like ``send``, but without the ``signal`` and ``sender`` arguments.

    Receivers are selected exactly as in ``send``; only ``arguments``
    and ``named`` are forwarded to ``robustapply.robust_apply``.

    Return a list of tuple pairs ``[(receiver, response), ...]``.
    """
    results = []
    for target in live_receivers(get_all_receivers(sender, signal)):
        # Let every installed plugin wrap the receiver in turn.
        wrapped = target
        for plugin in plugins:
            wrapped = plugin.wrap_receiver(wrapped)
        outcome = robustapply.robust_apply(wrapped, target, *arguments, **named)
        results.append((wrapped, outcome))
    # Update stats.
    if __debug__:
        global sends
        sends += 1
    return results
372
373
def send_exact(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` only to receivers registered for exact message.

    ``send_exact`` allows for avoiding ``Any``/``Anonymous`` registered
    handlers, sending only to those receivers explicitly registered
    for a particular signal on a particular sender.

    Each live receiver is first passed through ``wrap_receiver`` of
    every installed plugin, then called through
    ``robustapply.robust_apply`` with ``signal``, ``sender``,
    ``arguments`` and ``named``.

    Return a list of tuple pairs ``[(receiver, response), ...]``.
    Errors raised by a receiver propagate to the caller.
    """
    responses = []
    for receiver in live_receivers(get_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        original = receiver
        for plugin in plugins:
            receiver = plugin.wrap_receiver(receiver)
        response = robustapply.robust_apply(
            receiver, original, signal=signal, sender=sender, *arguments, **named
        )
        responses.append((receiver, response))
    # Update stats, as ``send`` and ``send_minimal`` do, so that the
    # reported number of sends covers every dispatch entry point.
    if __debug__:
        global sends
        sends += 1
    return responses
392
393
def send_robust(signal=All, sender=Anonymous, *arguments, **named):
    """Send ``signal`` from ``sender`` to all connected receivers,
    catching errors.

    - ``signal``: (Hashable) signal value; see ``connect`` for details.

    - ``sender``: The sender of the signal.

      If ``Any``, only receivers registered for ``Any`` will receive the
      message.

      If ``Anonymous``, only receivers registered to receive messages
      from ``Anonymous`` or ``Any`` will receive the message.

      Otherwise can be any Python object (normally one registered with
      a connect if you actually want something to occur).

    - ``arguments``: Positional arguments which will be passed to *all*
      receivers.  Note also that arguments are applied before named
      arguments, so they should be used with care.

    - ``named``: Named arguments, handed to ``robustapply.robust_apply``
      together with ``signal`` and ``sender`` so that each receiver is
      given only those it accepts.

    Return a list of tuple pairs ``[(receiver, response), ... ]``

    If any receiver raises an error (specifically, any subclass of
    ``Exception``), the error instance is returned as the result for
    that receiver and dispatch continues with the next receiver.
    Errors raised while a plugin wraps a receiver are not caught.
    """
    # Call each receiver with whatever arguments it can accept.
    # Return a list of tuple pairs [(receiver, response), ... ].
    responses = []
    for receiver in live_receivers(get_all_receivers(sender, signal)):
        # Wrap receiver using installed plugins.
        original = receiver
        for plugin in plugins:
            receiver = plugin.wrap_receiver(receiver)
        try:
            response = robustapply.robust_apply(
                receiver, original, signal=signal, sender=sender, *arguments, **named
            )
        except Exception as err:
            responses.append((receiver, err))
        else:
            responses.append((receiver, response))
    # Update stats, as ``send`` and ``send_minimal`` do, so that the
    # reported number of sends covers every dispatch entry point.
    if __debug__:
        global sends
        sends += 1
    return responses
443
444
def _remove_receiver(receiver):
    """Remove ``receiver`` from connections.

    Passed to ``saferef.safe_ref`` as the ``on_delete`` callback in
    ``connect``.  Every sender recorded for ``id(receiver)`` in
    ``senders_back`` is visited, the receiver is removed from each of
    that sender's signal lists, emptied entries are cleaned up, and the
    ``senders_back`` record itself is dropped.

    Returns ``False`` when ``senders_back`` is empty or otherwise falsy,
    ``None`` in all other cases.
    """
    if not senders_back:
        # Nothing to clean up.  NOTE(review): the original comment says
        # the mapping is replaced with None during module cleanup.
        return False
    back_key = id(receiver)
    for senderkey in senders_back.get(back_key, ()):
        try:
            # Snapshot the keys: cleanup below may delete signals.
            signal_keys = list(connections[senderkey].keys())
        except KeyError:
            continue
        for signal in signal_keys:
            try:
                registered = connections[senderkey][signal]
            except KeyError:
                # The sender or signal was already removed by an earlier
                # cleanup pass.
                pass
            else:
                try:
                    registered.remove(receiver)
                except Exception:
                    pass
            _cleanup_connections(senderkey, signal)
    senders_back.pop(back_key, None)
472
473
def _cleanup_connections(senderkey, signal):
    """Drop ``signal`` for ``senderkey`` if it has no receivers left.

    When that leaves the sender without any signals, the sender is
    removed as well via ``_remove_sender``.  Does nothing if the
    sender/signal pair is not present in ``connections``.
    """
    try:
        remaining = connections[senderkey][signal]
    except Exception:
        return
    if remaining:
        # Still has connected receivers; keep the entry.
        return
    try:
        signals = connections[senderkey]
    except KeyError:
        return
    # No more connected receivers. Therefore, remove the signal.
    del signals[signal]
    if not signals:
        # No more signal connections. Therefore, remove the sender.
        _remove_sender(senderkey)
493
494
def _remove_sender(senderkey):
    """Remove ``senderkey`` from connections.

    Clears the back references pointing at the sender, then deletes its
    entries from ``connections`` and ``senders``.  Missing entries are
    ignored.
    """
    # Back references are found by walking connections[senderkey], so
    # they have to be removed before that entry is deleted.
    _remove_back_refs(senderkey)
    connections.pop(senderkey, None)
    # Senderkey will only be in senders dictionary if sender
    # could be weakly referenced.
    try:
        del senders[senderkey]
    except Exception:
        pass
508
509
def _remove_back_refs(senderkey):
    """Remove all back-references to this ``senderkey``.

    Calls ``_kill_back_ref`` for every receiver registered under any
    signal of the sender.  Does nothing when the sender has no entry in
    ``connections``.
    """
    if senderkey not in connections:
        return
    # Work on a snapshot of the per-signal receiver lists.
    for registered in list(connections[senderkey].values()):
        for receiver in registered:
            _kill_back_ref(receiver, senderkey)
520
521
def _remove_old_back_refs(senderkey, signal, receiver, receivers):
    """Kill old ``senders_back`` references from ``receiver``.

    This guards against multiple registration of the same receiver for
    a given signal and sender leaking memory as old back reference
    records build up.

    Also removes old receiver instance from receivers.

    - ``senderkey``: ``id()`` of the sender owning ``receivers``.

    - ``signal``: The signal under which ``receivers`` is registered.

    - ``receiver``: The receiver to look for (compared by equality).

    - ``receivers``: The receiver list for ``senderkey``/``signal``; it
      is modified in place.

    Returns ``True`` if the receiver was found and its back reference to
    ``senderkey`` was removed.  Returns ``False`` if the receiver was
    not in ``receivers``, or if the very same receiver object is still
    registered for another signal of this sender, in which case the
    back reference must be kept so that later cleanup can still find it.
    """
    try:
        index = receivers.index(receiver)
    except ValueError:
        return False
    old_receiver = receivers.pop(index)
    # ``connections`` is keyed by sender key, so the other signals of
    # this sender are found under ``senderkey`` (not under ``signal``).
    signals = connections.get(senderkey, {})
    still_connected = any(
        rec is old_receiver
        for sig, recs in list(signals.items())
        if sig != signal
        for rec in recs
    )
    if still_connected:
        return False
    _kill_back_ref(old_receiver, senderkey)
    return True
552
553
def _kill_back_ref(receiver, senderkey):
    """Remove the back reference from ``receiver`` to ``senderkey``.

    Every occurrence of ``senderkey`` is removed from the
    ``senders_back`` record for ``id(receiver)``; the record itself is
    deleted once it is empty.  Always returns ``True``.
    """
    receiverkey = id(receiver)
    # An empty tuple stands in for a missing record, so the steps below
    # become no-ops in that case.
    back_refs = senders_back.get(receiverkey, ())
    while senderkey in back_refs:
        try:
            back_refs.remove(senderkey)
        except Exception:
            break
    if not back_refs:
        senders_back.pop(receiverkey, None)
    return True
570