1# -*- coding: utf-8 -*- 2 3# Copyright (C) 2014 Yahoo! Inc. All Rights Reserved. 4# 5# Licensed under the Apache License, Version 2.0 (the "License"); you may 6# not use this file except in compliance with the License. You may obtain 7# a copy of the License at 8# 9# http://www.apache.org/licenses/LICENSE-2.0 10# 11# Unless required by applicable law or agreed to in writing, software 12# distributed under the License is distributed on an "AS IS" BASIS, WITHOUT 13# WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the 14# License for the specific language governing permissions and limitations 15# under the License. 16 17import collections 18import contextlib 19import copy 20import logging 21 22from oslo_utils import reflection 23import six 24 25LOG = logging.getLogger(__name__) 26 27 28class Listener(object): 29 """Immutable helper that represents a notification listener/target.""" 30 31 def __init__(self, callback, args=None, kwargs=None, details_filter=None): 32 """Initialize members 33 34 :param callback: callback function 35 :param details_filter: a callback that will be called before the 36 actual callback that can be used to discard 37 the event (thus avoiding the invocation of 38 the actual callback) 39 :param args: non-keyworded arguments 40 :type args: list/iterable/tuple 41 :param kwargs: key-value pair arguments 42 :type kwargs: dictionary 43 """ 44 self._callback = callback 45 self._details_filter = details_filter 46 if not args: 47 self._args = () 48 else: 49 if not isinstance(args, tuple): 50 self._args = tuple(args) 51 else: 52 self._args = args 53 if not kwargs: 54 self._kwargs = {} 55 else: 56 self._kwargs = kwargs.copy() 57 58 @property 59 def callback(self): 60 """Callback (can not be none) to call with event + details.""" 61 return self._callback 62 63 @property 64 def details_filter(self): 65 """Callback (may be none) to call to discard events + details.""" 66 return self._details_filter 67 68 @property 69 def kwargs(self): 70 """Dictionary of keyword arguments to use in future calls.""" 71 return self._kwargs.copy() 72 73 @property 74 def args(self): 75 """Tuple of positional arguments to use in future calls.""" 76 return self._args 77 78 def __call__(self, event_type, details): 79 """Activate the target callback with the given event + details. 80 81 NOTE(harlowja): if a details filter callback exists and it returns 82 a falsey value when called with the provided ``details``, then the 83 target callback will **not** be called. 84 """ 85 if self._details_filter is not None: 86 if not self._details_filter(details): 87 return 88 kwargs = self._kwargs.copy() 89 kwargs['details'] = details 90 self._callback(event_type, *self._args, **kwargs) 91 92 def __repr__(self): 93 repr_msg = "%s object at 0x%x calling into '%r'" % ( 94 reflection.get_class_name(self, fully_qualified=False), 95 id(self), self._callback) 96 if self._details_filter is not None: 97 repr_msg += " using details filter '%r'" % self._details_filter 98 return "<%s>" % repr_msg 99 100 def is_equivalent(self, callback, details_filter=None): 101 """Check if the callback is same 102 103 :param callback: callback used for comparison 104 :param details_filter: callback used for comparison 105 :returns: false if not the same callback, otherwise true 106 :rtype: boolean 107 """ 108 if not reflection.is_same_callback(self._callback, callback): 109 return False 110 if details_filter is not None: 111 if self._details_filter is None: 112 return False 113 else: 114 return reflection.is_same_callback(self._details_filter, 115 details_filter) 116 else: 117 return self._details_filter is None 118 119 def __eq__(self, other): 120 if isinstance(other, Listener): 121 return self.is_equivalent(other._callback, 122 details_filter=other._details_filter) 123 else: 124 return NotImplemented 125 126 def __ne__(self, other): 127 return not self.__eq__(other) 128 129 130class Notifier(object): 131 """A notification (`pub/sub`_ *like*) helper class. 132 133 It is intended to be used to subscribe to notifications of events 134 occurring as well as allow a entity to post said notifications to any 135 associated subscribers without having either entity care about how this 136 notification occurs. 137 138 **Not** thread-safe when a single notifier is mutated at the same 139 time by multiple threads. For example having multiple threads call 140 into :py:meth:`.register` or :py:meth:`.reset` at the same time could 141 potentially end badly. It is thread-safe when 142 only :py:meth:`.notify` calls or other read-only actions (like calling 143 into :py:meth:`.is_registered`) are occurring at the same time. 144 145 .. _pub/sub: http://en.wikipedia.org/wiki/Publish%E2%80%93subscribe_pattern 146 """ 147 148 #: Keys that can *not* be used in callbacks arguments 149 RESERVED_KEYS = ('details',) 150 151 #: Kleene star constant that is used to receive all notifications 152 ANY = '*' 153 154 #: Events which can *not* be used to trigger notifications 155 _DISALLOWED_NOTIFICATION_EVENTS = set([ANY]) 156 157 def __init__(self): 158 self._topics = collections.defaultdict(list) 159 160 def __len__(self): 161 """Returns how many callbacks are registered. 162 163 :returns: count of how many callbacks are registered 164 :rtype: number 165 """ 166 count = 0 167 for (_event_type, listeners) in six.iteritems(self._topics): 168 count += len(listeners) 169 return count 170 171 def is_registered(self, event_type, callback, details_filter=None): 172 """Check if a callback is registered. 173 174 :returns: checks if the callback is registered 175 :rtype: boolean 176 """ 177 for listener in self._topics.get(event_type, []): 178 if listener.is_equivalent(callback, details_filter=details_filter): 179 return True 180 return False 181 182 def reset(self): 183 """Forget all previously registered callbacks.""" 184 self._topics.clear() 185 186 def notify(self, event_type, details): 187 """Notify about event occurrence. 188 189 All callbacks registered to receive notifications about given 190 event type will be called. If the provided event type can not be 191 used to emit notifications (this is checked via 192 the :meth:`.can_be_registered` method) then it will silently be 193 dropped (notification failures are not allowed to cause or 194 raise exceptions). 195 196 :param event_type: event type that occurred 197 :param details: additional event details *dictionary* passed to 198 callback keyword argument with the same name 199 :type details: dictionary 200 """ 201 if not self.can_trigger_notification(event_type): 202 LOG.debug("Event type '%s' is not allowed to trigger" 203 " notifications", event_type) 204 return 205 listeners = list(self._topics.get(self.ANY, [])) 206 listeners.extend(self._topics.get(event_type, [])) 207 if not listeners: 208 return 209 if not details: 210 details = {} 211 for listener in listeners: 212 try: 213 listener(event_type, details.copy()) 214 except Exception: 215 LOG.warning("Failure calling listener %s to notify about event" 216 " %s, details: %s", listener, event_type, 217 details, exc_info=True) 218 219 def register(self, event_type, callback, 220 args=None, kwargs=None, details_filter=None): 221 """Register a callback to be called when event of a given type occurs. 222 223 Callback will be called with provided ``args`` and ``kwargs`` and 224 when event type occurs (or on any event if ``event_type`` equals to 225 :attr:`.ANY`). It will also get additional keyword argument, 226 ``details``, that will hold event details provided to the 227 :meth:`.notify` method (if a details filter callback is provided then 228 the target callback will *only* be triggered if the details filter 229 callback returns a truthy value). 230 231 :param event_type: event type input 232 :param callback: function callback to be registered. 233 :param args: non-keyworded arguments 234 :type args: list 235 :param kwargs: key-value pair arguments 236 :type kwargs: dictionary 237 """ 238 if not six.callable(callback): 239 raise ValueError("Event callback must be callable") 240 if details_filter is not None: 241 if not six.callable(details_filter): 242 raise ValueError("Details filter must be callable") 243 if not self.can_be_registered(event_type): 244 raise ValueError("Disallowed event type '%s' can not have a" 245 " callback registered" % event_type) 246 if self.is_registered(event_type, callback, 247 details_filter=details_filter): 248 raise ValueError("Event callback already registered with" 249 " equivalent details filter") 250 if kwargs: 251 for k in self.RESERVED_KEYS: 252 if k in kwargs: 253 raise KeyError("Reserved key '%s' not allowed in " 254 "kwargs" % k) 255 self._topics[event_type].append( 256 Listener(callback, 257 args=args, kwargs=kwargs, 258 details_filter=details_filter)) 259 260 def deregister(self, event_type, callback, details_filter=None): 261 """Remove a single listener bound to event ``event_type``. 262 263 :param event_type: deregister listener bound to event_type 264 """ 265 if event_type not in self._topics: 266 return False 267 for i, listener in enumerate(self._topics.get(event_type, [])): 268 if listener.is_equivalent(callback, details_filter=details_filter): 269 self._topics[event_type].pop(i) 270 return True 271 return False 272 273 def deregister_event(self, event_type): 274 """Remove a group of listeners bound to event ``event_type``. 275 276 :param event_type: deregister listeners bound to event_type 277 """ 278 return len(self._topics.pop(event_type, [])) 279 280 def copy(self): 281 c = copy.copy(self) 282 c._topics = collections.defaultdict(list) 283 for (event_type, listeners) in six.iteritems(self._topics): 284 c._topics[event_type] = listeners[:] 285 return c 286 287 def listeners_iter(self): 288 """Return an iterator over the mapping of event => listeners bound. 289 290 NOTE(harlowja): Each listener in the yielded (event, listeners) 291 tuple is an instance of the :py:class:`~.Listener` type, which 292 itself wraps a provided callback (and its details filter 293 callback, if any). 294 """ 295 for event_type, listeners in six.iteritems(self._topics): 296 if listeners: 297 yield (event_type, listeners) 298 299 def can_be_registered(self, event_type): 300 """Checks if the event can be registered/subscribed to.""" 301 return True 302 303 def can_trigger_notification(self, event_type): 304 """Checks if the event can trigger a notification. 305 306 :param event_type: event that needs to be verified 307 :returns: whether the event can trigger a notification 308 :rtype: boolean 309 """ 310 if event_type in self._DISALLOWED_NOTIFICATION_EVENTS: 311 return False 312 else: 313 return True 314 315 316class RestrictedNotifier(Notifier): 317 """A notification class that restricts events registered/triggered. 318 319 NOTE(harlowja): This class unlike :class:`.Notifier` restricts and 320 disallows registering callbacks for event types that are not declared 321 when constructing the notifier. 322 """ 323 324 def __init__(self, watchable_events, allow_any=True): 325 super(RestrictedNotifier, self).__init__() 326 self._watchable_events = frozenset(watchable_events) 327 self._allow_any = allow_any 328 329 def events_iter(self): 330 """Returns iterator of events that can be registered/subscribed to. 331 332 NOTE(harlowja): does not include back the ``ANY`` event type as that 333 meta-type is not a specific event but is a capture-all that does not 334 imply the same meaning as specific event types. 335 """ 336 for event_type in self._watchable_events: 337 yield event_type 338 339 def can_be_registered(self, event_type): 340 """Checks if the event can be registered/subscribed to. 341 342 :param event_type: event that needs to be verified 343 :returns: whether the event can be registered/subscribed to 344 :rtype: boolean 345 """ 346 return (event_type in self._watchable_events or 347 (event_type == self.ANY and self._allow_any)) 348 349 350@contextlib.contextmanager 351def register_deregister(notifier, event_type, callback=None, 352 args=None, kwargs=None, details_filter=None): 353 """Context manager that registers a callback, then deregisters on exit. 354 355 NOTE(harlowja): if the callback is none, then this registers nothing, which 356 is different from the behavior of the ``register`` method 357 which will *not* accept none as it is not callable... 358 """ 359 if callback is None: 360 yield 361 else: 362 notifier.register(event_type, callback, 363 args=args, kwargs=kwargs, 364 details_filter=details_filter) 365 try: 366 yield 367 finally: 368 notifier.deregister(event_type, callback, 369 details_filter=details_filter) 370