1# -*- coding: utf-8 -*-
2
3#    Copyright (C) 2014 Yahoo! Inc. All Rights Reserved.
4#
5#    Licensed under the Apache License, Version 2.0 (the "License"); you may
6#    not use this file except in compliance with the License. You may obtain
7#    a copy of the License at
8#
9#         http://www.apache.org/licenses/LICENSE-2.0
10#
11#    Unless required by applicable law or agreed to in writing, software
12#    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
13#    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
14#    License for the specific language governing permissions and limitations
15#    under the License.
16
17import collections
18import contextlib
19import copy
20import logging
21
22from oslo_utils import reflection
23import six
24
# Module-level logger; Notifier.notify() uses it to report event types that
# are not allowed to trigger notifications and listeners that raised.
LOG = logging.getLogger(__name__)
26
27
class Listener(object):
    """Immutable helper that represents a notification listener/target.

    A listener bundles a target callback with the positional and keyword
    arguments it should be invoked with, plus an optional details filter
    that can veto an invocation before the target callback is reached.
    """

    def __init__(self, callback, args=None, kwargs=None, details_filter=None):
        """Initialize members

        :param callback: callback function
        :param details_filter: a callback that will be called before the
                               actual callback that can be used to discard
                               the event (thus avoiding the invocation of
                               the actual callback)
        :param args: non-keyworded arguments
        :type args: list/iterable/tuple
        :param kwargs: key-value pair arguments
        :type kwargs: dictionary
        """
        self._callback = callback
        self._details_filter = details_filter
        # Store positional arguments as a tuple (an existing tuple is kept
        # as-is, any other iterable is converted).
        if not args:
            self._args = ()
        elif isinstance(args, tuple):
            self._args = args
        else:
            self._args = tuple(args)
        # Keep a private copy so later changes made by the caller to the
        # provided dictionary are not visible to this listener.
        self._kwargs = kwargs.copy() if kwargs else {}

    @property
    def callback(self):
        """Callback (can not be none) to call with event + details."""
        return self._callback

    @property
    def details_filter(self):
        """Callback (may be none) to call to discard events + details."""
        return self._details_filter

    @property
    def kwargs(self):
        """Dictionary of keyword arguments to use in future calls.

        A copy is returned, so mutating it does not affect this listener.
        """
        return self._kwargs.copy()

    @property
    def args(self):
        """Tuple of positional arguments to use in future calls."""
        return self._args

    def __call__(self, event_type, details):
        """Activate the target callback with the given event + details.

        NOTE(harlowja): if a details filter callback exists and it returns
        a falsey value when called with the provided ``details``, then the
        target callback will **not** be called.

        :param event_type: event type passed as the first positional
                           argument of the target callback
        :param details: event details, passed to the details filter (if
                        any) and to the target callback as the ``details``
                        keyword argument
        """
        if self._details_filter is not None:
            if not self._details_filter(details):
                return
        # Work on a copy so that the stored keyword arguments never gain
        # the per-call ``details`` entry.
        kwargs = self._kwargs.copy()
        kwargs['details'] = details
        self._callback(event_type, *self._args, **kwargs)

    def __repr__(self):
        repr_msg = "%s object at 0x%x calling into '%r'" % (
            reflection.get_class_name(self, fully_qualified=False),
            id(self), self._callback)
        if self._details_filter is not None:
            repr_msg += " using details filter '%r'" % self._details_filter
        return "<%s>" % repr_msg

    def is_equivalent(self, callback, details_filter=None):
        """Check if the callback is same

        Both the target callback and the details filter have to match (as
        judged by ``reflection.is_same_callback``); a listener without a
        details filter only matches when no details filter is provided.

        :param callback: callback used for comparison
        :param details_filter: callback used for comparison
        :returns: false if not the same callback, otherwise true
        :rtype: boolean
        """
        if not reflection.is_same_callback(self._callback, callback):
            return False
        if details_filter is None:
            return self._details_filter is None
        if self._details_filter is None:
            return False
        return reflection.is_same_callback(self._details_filter,
                                           details_filter)

    def __eq__(self, other):
        if isinstance(other, Listener):
            return self.is_equivalent(other._callback,
                                      details_filter=other._details_filter)
        else:
            return NotImplemented

    def __ne__(self, other):
        # ``NotImplemented`` must be handed back untouched; negating it
        # would report that a listener is *not* unequal to an unrelated
        # object (and newer Python versions reject using it as a boolean).
        result = self.__eq__(other)
        if result is NotImplemented:
            return NotImplemented
        return not result
128
129
class Notifier(object):
    """A notification (`pub/sub`_ *like*) helper class.

    It is intended to be used to subscribe to notifications of events
    occurring as well as allow an entity to post said notifications to any
    associated subscribers without having either entity care about how this
    notification occurs.

    **Not** thread-safe when a single notifier is mutated at the same
    time by multiple threads. For example having multiple threads call
    into :py:meth:`.register` or :py:meth:`.reset` at the same time could
    potentially end badly. It is thread-safe when
    only :py:meth:`.notify` calls or other read-only actions (like calling
    into :py:meth:`.is_registered`) are occurring at the same time.

    .. _pub/sub: http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern
    """

    #: Keys that can *not* be used in callbacks arguments
    RESERVED_KEYS = ('details',)

    #: Kleene star constant that is used to receive all notifications
    ANY = '*'

    #: Events which can *not* be used to trigger notifications
    _DISALLOWED_NOTIFICATION_EVENTS = set([ANY])

    def __init__(self):
        # Maps event type => list of listeners (in registration order).
        self._topics = collections.defaultdict(list)

    def __len__(self):
        """Returns how many callbacks are registered.

        :returns: count of how many callbacks are registered
        :rtype: number
        """
        return sum(len(listeners) for listeners in self._topics.values())

    def is_registered(self, event_type, callback, details_filter=None):
        """Check if a callback is registered.

        :param event_type: event type the callback would be bound to
        :param callback: callback to look for
        :param details_filter: details filter the callback was (or would
                               be) registered with
        :returns: whether an equivalent listener is bound to the event type
        :rtype: boolean
        """
        # NOTE: ``get`` is used (instead of indexing) so that this read-only
        # lookup never inserts a new key into the underlying defaultdict.
        return any(
            listener.is_equivalent(callback, details_filter=details_filter)
            for listener in self._topics.get(event_type, ()))

    def reset(self):
        """Forget all previously registered callbacks."""
        self._topics.clear()

    def notify(self, event_type, details):
        """Notify about event occurrence.

        All callbacks registered to receive notifications about given
        event type will be called (callbacks registered for :attr:`.ANY`
        are called first). If the provided event type can not be
        used to emit notifications (this is checked via
        the :meth:`.can_trigger_notification` method) then it will silently
        be dropped (notification failures are not allowed to cause or
        raise exceptions).

        :param event_type: event type that occurred
        :param details: additional event details *dictionary* passed to
                        callback keyword argument with the same name
        :type details: dictionary
        """
        if not self.can_trigger_notification(event_type):
            LOG.debug("Event type '%s' is not allowed to trigger"
                      " notifications", event_type)
            return
        # Build a private list of targets so that listeners which register
        # or deregister while being notified do not disturb this iteration.
        listeners = list(self._topics.get(self.ANY, []))
        listeners.extend(self._topics.get(event_type, []))
        if not listeners:
            return
        if not details:
            details = {}
        for listener in listeners:
            try:
                # Each listener gets its own (shallow) copy of the details.
                listener(event_type, details.copy())
            except Exception:
                # A failing listener must not stop the remaining listeners
                # from being notified; log it and carry on.
                LOG.warning("Failure calling listener %s to notify about event"
                            " %s, details: %s", listener, event_type,
                            details, exc_info=True)

    def register(self, event_type, callback,
                 args=None, kwargs=None, details_filter=None):
        """Register a callback to be called when event of a given type occurs.

        Callback will be called with provided ``args`` and ``kwargs`` and
        when event type occurs (or on any event if ``event_type`` equals to
        :attr:`.ANY`). It will also get additional keyword argument,
        ``details``, that will hold event details provided to the
        :meth:`.notify` method (if a details filter callback is provided then
        the target callback will *only* be triggered if the details filter
        callback returns a truthy value).

        :param event_type: event type input
        :param callback: function callback to be registered.
        :param args: non-keyworded arguments
        :type args: list
        :param kwargs: key-value pair arguments
        :type kwargs: dictionary
        :param details_filter: optional callback used to discard events
                               before the target callback is called
        :raises ValueError: if the callback or the details filter is not
                            callable, if the event type can not be
                            registered, or if an equivalent callback (with
                            an equivalent details filter) is already
                            registered for the event type
        :raises KeyError: if ``kwargs`` contains one of the
                          :attr:`.RESERVED_KEYS`
        """
        if not callable(callback):
            raise ValueError("Event callback must be callable")
        if details_filter is not None:
            if not callable(details_filter):
                raise ValueError("Details filter must be callable")
        if not self.can_be_registered(event_type):
            raise ValueError("Disallowed event type '%s' can not have a"
                             " callback registered" % event_type)
        if self.is_registered(event_type, callback,
                              details_filter=details_filter):
            raise ValueError("Event callback already registered with"
                             " equivalent details filter")
        if kwargs:
            for k in self.RESERVED_KEYS:
                if k in kwargs:
                    raise KeyError("Reserved key '%s' not allowed in "
                                   "kwargs" % k)
        self._topics[event_type].append(
            Listener(callback,
                     args=args, kwargs=kwargs,
                     details_filter=details_filter))

    def deregister(self, event_type, callback, details_filter=None):
        """Remove a single listener bound to event ``event_type``.

        :param event_type: deregister listener bound to event_type
        :param callback: callback of the listener to remove
        :param details_filter: details filter of the listener to remove
        :returns: true if a listener was removed, otherwise false
        :rtype: boolean
        """
        if event_type not in self._topics:
            return False
        listeners = self._topics[event_type]
        for i, listener in enumerate(listeners):
            if listener.is_equivalent(callback, details_filter=details_filter):
                # Only the first equivalent listener is removed.
                listeners.pop(i)
                return True
        return False

    def deregister_event(self, event_type):
        """Remove a group of listeners bound to event ``event_type``.

        :param event_type: deregister listeners bound to event_type
        :returns: how many listeners were removed
        :rtype: number
        """
        return len(self._topics.pop(event_type, []))

    def copy(self):
        """Clone this notifier.

        The clone gets its own event => listeners mapping (and its own
        listener lists), so registering or deregistering on one notifier
        does not affect the other; the listener objects themselves are
        shared.

        :returns: a new notifier with the same listeners bound
        """
        c = copy.copy(self)
        c._topics = collections.defaultdict(list)
        for event_type, listeners in self._topics.items():
            c._topics[event_type] = list(listeners)
        return c

    def listeners_iter(self):
        """Return an iterator over the mapping of event => listeners bound.

        Event types that have no listeners bound are skipped.

        NOTE(harlowja): Each listener in the yielded (event, listeners)
        tuple is an instance of the :py:class:`~.Listener`  type, which
        itself wraps a provided callback (and its details filter
        callback, if any).
        """
        for event_type, listeners in self._topics.items():
            if listeners:
                yield (event_type, listeners)

    def can_be_registered(self, event_type):
        """Checks if the event can be registered/subscribed to.

        This base implementation accepts every event type; it is the hook
        consulted by :meth:`.register` and may be overridden to restrict
        which event types can be subscribed to.

        :param event_type: event that needs to be verified
        :returns: whether the event can be registered/subscribed to
        :rtype: boolean
        """
        return True

    def can_trigger_notification(self, event_type):
        """Checks if the event can trigger a notification.

        :param event_type: event that needs to be verified
        :returns: whether the event can trigger a notification
        :rtype: boolean
        """
        return event_type not in self._DISALLOWED_NOTIFICATION_EVENTS
314
315
class RestrictedNotifier(Notifier):
    """A notifier that only accepts a fixed, pre-declared set of events.

    NOTE(harlowja): unlike :class:`.Notifier` this class refuses to have
    callbacks registered for event types that were not declared when the
    notifier was constructed.
    """

    def __init__(self, watchable_events, allow_any=True):
        """Initialize members

        :param watchable_events: event types that may be subscribed to
        :type watchable_events: iterable
        :param allow_any: whether the ``ANY`` meta event type may be
                          subscribed to as well
        """
        super(RestrictedNotifier, self).__init__()
        self._allow_any = allow_any
        self._watchable_events = frozenset(watchable_events)

    def events_iter(self):
        """Returns iterator of events that can be registered/subscribed to.

        NOTE(harlowja): the ``ANY`` event type is not included, as that
        meta-type is a capture-all and not a specific event type.
        """
        return (event_type for event_type in self._watchable_events)

    def can_be_registered(self, event_type):
        """Checks if the event can be registered/subscribed to.

        :param event_type: event that needs to be verified
        :returns: whether the event can be registered/subscribed to
        :rtype: boolean
        """
        if event_type in self._watchable_events:
            return True
        # Not a declared event; only the capture-all type may still pass.
        return event_type == self.ANY and self._allow_any
348
349
@contextlib.contextmanager
def register_deregister(notifier, event_type, callback=None,
                        args=None, kwargs=None, details_filter=None):
    """Context manager that keeps a callback registered while it is active.

    The callback is registered on ``notifier`` on entry and deregistered
    again on exit (also when the managed block raises).

    NOTE(harlowja): if the callback is none, then this registers nothing, which
                    is different from the behavior of the ``register`` method
                    which will *not* accept none as it is not callable...

    :param notifier: object providing ``register`` and ``deregister``
    :param event_type: event type to bind the callback to
    :param callback: callback to register (nothing happens when none)
    :param args: non-keyworded arguments forwarded to ``register``
    :param kwargs: key-value pair arguments forwarded to ``register``
    :param details_filter: details filter used for both the registration
                           and the deregistration
    """
    if callback is None:
        # Nothing to register; just run the managed block.
        yield
        return
    notifier.register(event_type, callback, args=args, kwargs=kwargs,
                      details_filter=details_filter)
    try:
        yield
    finally:
        notifier.deregister(event_type, callback,
                            details_filter=details_filter)
370