1""" 2Provide the Topic class. 3 4:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved. 5:license: BSD, see LICENSE_BSD_Simple.txt for details. 6""" 7 8 9from weakref import ref as weakref 10 11from .listener import ( 12 Listener, 13 ListenerValidator, 14) 15 16from .topicutils import ( 17 ALL_TOPICS, 18 stringize, 19 tupleize, 20 validateName, 21 smartDedent, 22) 23 24from .topicexc import ( 25 TopicDefnError, 26 TopicNameError, 27 ExcHandlerError, 28) 29 30from .publishermixin import PublisherMixin 31 32from .topicargspec import ( 33 ArgsInfo, 34 ArgSpecGiven, 35 topicArgsFromCallable, 36 SenderMissingReqdMsgDataError, 37 SenderUnknownMsgDataError, 38 MessageDataSpecError, 39) 40 41from .. import py2and3 42 43 44class Topic(PublisherMixin): 45 """ 46 Represent topics in pubsub. Contains information about a topic, 47 including topic's message data specification (MDS), the list of 48 subscribed listeners, docstring for the topic. It allows Python-like 49 access to subtopics (e.g. A.B is subtopic B of topic A). 50 """ 51 52 def __init__(self, treeConfig, nameTuple, description, 53 msgArgsInfo, parent=None): 54 """Create a topic. Should only be called by TopicManager via its 55 getOrCreateTopic() method (which gets called in several places 56 in pubsub, such as sendMessage, subscribe, and newTopic). 57 58 :param treeConfig: topic tree configuration settings 59 :param nameTuple: topic name, in tuple format (no dots) 60 :param description: "docstring" for topic 61 :param ArgsInfo msgArgsInfo: object that defines MDS for topic 62 :param parent: parent of topic 63 64 :raises ValueError: invalid topic name 65 """ 66 if parent is None: 67 if nameTuple != (ALL_TOPICS,): 68 msg = 'Only one topic, named %s, can be root of topic tree' 69 raise ValueError(msg % 'pub.ALL_TOPICS') 70 else: 71 validateName(nameTuple) 72 self.__tupleName = nameTuple 73 74 self.__handlingUncaughtListenerExc = False 75 self._treeConfig = treeConfig 76 PublisherMixin.__init__(self) 77 78 self.__validator = None 79 # Registered listeners were originally kept in a Python list; however 80 # a few methods require lookup of the Listener for the given callable, 81 # which is an O(n) operation. A set() could have been more suitable but 82 # there is no way of retrieving an element from a set without iterating 83 # over the set, again an O(n) operation. A dict() is ok too. Because 84 # Listener.__eq__(callable) returns true if the Listener instance wraps 85 # the given callable, and because Listener.__hash__ produces the hash 86 # value of the wrapped callable, calling dict[callable] on a 87 # dict(Listener -> Listener) mapping will be O(1) in most cases: 88 # the dict will take the callables hash, find the list of Listeners that 89 # have that hash, and then iterate over that inner list to find the 90 # Listener instance which satisfies Listener == callable, and will return 91 # the Listener. 92 self.__listeners = dict() 93 94 # specification: 95 self.__description = None 96 self.setDescription(description) 97 self.__msgArgs = msgArgsInfo 98 if msgArgsInfo.isComplete(): 99 self.__finalize() 100 else: 101 assert not self._treeConfig.raiseOnTopicUnspecified 102 103 # now that we know the args are fine, we can link to parent 104 self.__parentTopic = None 105 self.__subTopics = {} 106 if parent is None: 107 assert self.hasMDS() 108 else: 109 self.__parentTopic = weakref(parent) 110 assert self.__msgArgs.parentAI() is parent._getListenerSpec() 111 parent.__adoptSubtopic( self ) 112 113 def setDescription(self, desc): 114 """Set the 'docstring' of topic""" 115 self.__description = desc 116 117 def getDescription(self): 118 """Return the 'docstring' of topic""" 119 if self.__description is None: 120 return None 121 return smartDedent(self.__description) 122 123 def setMsgArgSpec(self, argsDocs, required=()): 124 """Specify the message data for topic messages. 125 :param argsDocs: a dictionary of keyword names (message data name) and data 'docstring'; cannot be None 126 :param required: a list of those keyword names, appearing in argsDocs, 127 which are required (all others are assumed optional) 128 129 Can only be called if this info has not been already set at construction 130 or in a previous call. 131 :raise RuntimeError: if MDS already set at construction or previous call.""" 132 assert self.__parentTopic is not None # for root of tree, this method never called! 133 if argsDocs is None: 134 raise ValueError('Cannot set listener spec to None') 135 136 if self.__msgArgs is None or not self.__msgArgs.isComplete(): 137 try: 138 specGiven = ArgSpecGiven(argsDocs, required) 139 self.__msgArgs = ArgsInfo(self.__tupleName, specGiven, 140 self.__parentTopic()._getListenerSpec()) 141 except MessageDataSpecError: 142 # discard the lower part of the stack trace 143 exc = py2and3.getexcobj() 144 raise exc 145 self.__finalize() 146 147 else: 148 raise RuntimeError('Not allowed to call this: msg spec already set!') 149 150 def getArgs(self): 151 """Returns a pair (reqdArgs, optArgs) where reqdArgs is tuple 152 of names of required message arguments, optArgs is tuple 153 of names for optional arguments. If topic args not specified 154 yet, returns (None, None).""" 155 sendable = self.__msgArgs.isComplete() 156 assert sendable == self.hasMDS() 157 if sendable: 158 return (self.__msgArgs.allRequired , 159 self.__msgArgs.allOptional) 160 return None, None 161 162 def getArgDescriptions(self): 163 """Get a map of keyword names to docstrings: documents each MDS element. """ 164 return self.__msgArgs.getArgsDocs() 165 166 def setArgDescriptions(self, **docs): 167 """Set the docstring for each MDS datum.""" 168 self.__msgArgs.setArgsDocs(docs) 169 170 def hasMDS(self): 171 """Return true if this topic has a message data specification (MDS).""" 172 return self.__validator is not None 173 174 def filterMsgArgs(self, msgKwargs, check=False): 175 """Get the MDS docstrings for each of the spedified kwargs.""" 176 filteredArgs = self.__msgArgs.filterArgs(msgKwargs) 177 # if no check of args yet, do it now: 178 if check: 179 self.__msgArgs.check(filteredArgs) 180 return filteredArgs 181 182 def isAll(self): 183 """Returns true if this topic is the 'all topics' topic. All root 184 topics behave as though they are child of that topic. """ 185 return self.__tupleName == (ALL_TOPICS,) 186 187 def isRoot(self): 188 """Returns true if this is a "root" topic, false otherwise. A 189 root topic is a topic whose name contains no dots and which 190 has pub.ALL_TOPICS as parent.""" 191 parent = self.getParent() 192 if parent: 193 return parent.isAll() 194 assert self.isAll() 195 return False 196 197 def getName(self): 198 """Return dotted form of full topic name""" 199 return stringize(self.__tupleName) 200 201 def getNameTuple(self): 202 """Return tuple form of full topic name""" 203 return self.__tupleName 204 205 def getNodeName(self): 206 """Return the last part of the topic name (has no dots)""" 207 name = self.__tupleName[-1] 208 return name 209 210 def getParent(self): 211 """Get Topic object that is parent of self (i.e. self is a subtopic 212 of parent). Return none if self is the "all topics" topic.""" 213 if self.__parentTopic is None: 214 return None 215 return self.__parentTopic() 216 217 def hasSubtopic(self, name=None): 218 """Return true only if name is a subtopic of self. If name not 219 specified, return true only if self has at least one subtopic.""" 220 if name is None: 221 return len(self.__subTopics) > 0 222 223 return name in self.__subTopics 224 225 def getSubtopic(self, relName): 226 """Get the specified subtopic object. The relName can be a valid 227 subtopic name, a dotted-name string, or a tuple. """ 228 if not relName: 229 raise ValueError("getSubtopic() arg can't be empty") 230 topicTuple = tupleize(relName) 231 assert topicTuple 232 233 topicObj = self 234 for topicName in topicTuple: 235 child = topicObj.__subTopics.get(topicName) 236 if child is None: 237 msg = 'Topic "%s" doesn\'t have "%s" as subtopic' % (topicObj.getName(), topicName) 238 raise TopicNameError(relName, msg) 239 topicObj = child 240 241 return topicObj 242 243 def getSubtopics(self): 244 """Get a list of Topic instances that are subtopics of self.""" 245 return py2and3.values(self.__subTopics) 246 247 def getNumListeners(self): 248 """Return number of listeners currently subscribed to topic. This is 249 different from number of listeners that will get notified since more 250 general topics up the topic tree may have listeners.""" 251 return len(self.__listeners) 252 253 def hasListener(self, listener): 254 """Return true if listener is subscribed to this topic.""" 255 return listener in self.__listeners 256 257 def hasListeners(self): 258 """Return true if there are any listeners subscribed to 259 this topic, false otherwise.""" 260 return bool(self.__listeners) 261 262 def getListeners(self): 263 """Get a copy of list of listeners subscribed to this topic. Safe to iterate over while listeners 264 get un/subscribed from this topics (such as while sending a message).""" 265 return py2and3.keys(self.__listeners) 266 267 def getListenersIter(self): 268 """Get an iterator over listeners subscribed to this topic. Do not use if listeners can be 269 un/subscribed while iterating. """ 270 return py2and3.iterkeys(self.__listeners) 271 272 def validate(self, listener): 273 """Checks whether listener could be subscribed to this topic: 274 if yes, just returns; if not, raises ListenerMismatchError. 275 Note that method raises TopicDefnError if self not 276 hasMDS().""" 277 if not self.hasMDS(): 278 raise TopicDefnError(self.__tupleName) 279 return self.__validator.validate(listener) 280 281 def isValid(self, listener): 282 """Return True only if listener could be subscribed to this topic, 283 otherwise returns False. Note that method raises TopicDefnError 284 if self not hasMDS().""" 285 if not self.hasMDS(): 286 raise TopicDefnError(self.__tupleName) 287 return self.__validator.isValid(listener) 288 289 def subscribe(self, listener): 290 """Subscribe listener to this topic. Returns a pair 291 (pub.Listener, success). The success is true only if listener 292 was not already subscribed and is now subscribed. """ 293 if listener in self.__listeners: 294 assert self.hasMDS() 295 subdLisnr, newSub = self.__listeners[listener], False 296 297 else: 298 if self.__validator is None: 299 args, reqd = topicArgsFromCallable(listener) 300 self.setMsgArgSpec(args, reqd) 301 argsInfo = self.__validator.validate(listener) 302 weakListener = Listener( 303 listener, argsInfo, onDead=self.__onDeadListener) 304 self.__listeners[weakListener] = weakListener 305 subdLisnr, newSub = weakListener, True 306 307 # notify of subscription 308 self._treeConfig.notificationMgr.notifySubscribe(subdLisnr, self, newSub) 309 310 return subdLisnr, newSub 311 312 def unsubscribe(self, listener): 313 """Unsubscribe the specified listener from this topic. Returns 314 the pub.Listener object associated with the listener that was 315 unsubscribed, or None if the specified listener was not 316 subscribed to this topic. Note that this method calls 317 ``notifyUnsubscribe(listener, self)`` on all registered notification 318 handlers (see pub.addNotificationHandler).""" 319 unsubdLisnr = self.__listeners.pop(listener, None) 320 if unsubdLisnr is None: 321 return None 322 323 unsubdLisnr._unlinkFromTopic_() 324 assert listener == unsubdLisnr.getCallable() 325 326 # notify of unsubscription 327 self._treeConfig.notificationMgr.notifyUnsubscribe(unsubdLisnr, self) 328 329 return unsubdLisnr 330 331 def unsubscribeAllListeners(self, filter=None): 332 """Clears list of subscribed listeners. If filter is given, it must 333 be a function that takes a listener and returns true if the listener 334 should be unsubscribed. Returns the list of Listener for listeners 335 that were unsubscribed.""" 336 unsubd = [] 337 if filter is None: 338 for listener in self.__listeners: 339 listener._unlinkFromTopic_() 340 unsubd = py2and3.keys(self.__listeners) 341 self.__listeners = {} 342 else: 343 unsubd = [] 344 for listener in py2and3.keys(self.__listeners): 345 if filter(listener): 346 unsubd.append(listener) 347 listener._unlinkFromTopic_() 348 del self.__listeners[listener] 349 350 # send notification regarding all listeners actually unsubscribed 351 notificationMgr = self._treeConfig.notificationMgr 352 for unsubdLisnr in unsubd: 353 notificationMgr.notifyUnsubscribe(unsubdLisnr, self) 354 355 return unsubd 356 357 ############################################################# 358 # 359 # Impementation 360 # 361 ############################################################# 362 363 def _getListenerSpec(self): 364 """Only to be called by pubsub package""" 365 return self.__msgArgs 366 367 def _publish(self, data): 368 """This sends message to listeners of parent topics as well. 369 If an exception is raised in a listener, the publish is 370 aborted, except if there is a handler (see 371 pub.setListenerExcHandler).""" 372 self._treeConfig.notificationMgr.notifySend('pre', self) 373 374 # send to ourself 375 iterState = self._mix_prePublish(data) 376 self.__sendMessage(data, self, iterState) 377 378 # send up the chain 379 topicObj = self.getParent() 380 while topicObj is not None: 381 if topicObj.hasListeners(): 382 iterState = self._mix_prePublish(data, topicObj, iterState) 383 self.__sendMessage(data, topicObj, iterState) 384 385 # done for this topic, continue up branch to parent towards root 386 topicObj = topicObj.getParent() 387 388 self._treeConfig.notificationMgr.notifySend('post', self) 389 390 def __sendMessage(self, data, topicObj, iterState): 391 # now send message data to each listener for current topic; 392 # use list of listeners rather than iterator, so that if listeners added/removed during 393 # send loop, no runtime exception: 394 for listener in topicObj.getListeners(): 395 try: 396 self._treeConfig.notificationMgr.notifySend('in', topicObj, pubListener=listener) 397 self._mix_callListener(listener, data, iterState) 398 399 except Exception: 400 # if exception handling is on, handle, otherwise re-raise 401 handler = self._treeConfig.listenerExcHandler 402 if handler is None or self.__handlingUncaughtListenerExc: 403 raise 404 405 # try handling the exception so we can continue the send: 406 try: 407 self.__handlingUncaughtListenerExc = True 408 handler( listener.name(), topicObj ) 409 self.__handlingUncaughtListenerExc = False 410 except Exception: 411 exc = py2and3.getexcobj() 412 #print 'exception raised', exc 413 self.__handlingUncaughtListenerExc = False 414 raise ExcHandlerError(listener.name(), topicObj, exc) 415 416 def __finalize(self): 417 """Finalize the topic specification, which currently means 418 creating the listener validator for this topic. This allows 419 calls to subscribe() to validate that listener adheres to 420 topic's message data specification (MDS).""" 421 assert self.__msgArgs.isComplete() 422 assert not self.hasMDS() 423 424 # must make sure can adopt a validator 425 required = self.__msgArgs.allRequired 426 optional = self.__msgArgs.allOptional 427 self.__validator = ListenerValidator(required, list(optional) ) 428 assert not self.__listeners 429 430 def _undefineSelf_(self, topicsMap): 431 """Called by topic manager when deleting a topic.""" 432 if self.__parentTopic is not None: 433 self.__parentTopic().__abandonSubtopic(self.__tupleName[-1]) 434 self.__undefineBranch(topicsMap) 435 436 def __undefineBranch(self, topicsMap): 437 """Unsubscribe all our listeners, remove all subtopics from self, 438 then detach from parent. Parent is not notified, because method 439 assumes it has been called by parent""" 440 #print 'Remove %s listeners (%s)' % (self.getName(), self.getNumListeners()) 441 self.unsubscribeAllListeners() 442 self.__parentTopic = None 443 444 for subName, subObj in py2and3.iteritems(self.__subTopics): 445 assert isinstance(subObj, Topic) 446 #print 'Unlinking %s from parent' % subObj.getName() 447 subObj.__undefineBranch(topicsMap) 448 449 self.__subTopics = {} 450 del topicsMap[self.getName()] 451 452 def __adoptSubtopic(self, topicObj): 453 """Add topicObj as child topic.""" 454 assert topicObj.__parentTopic() is self 455 attrName = topicObj.getNodeName() 456 self.__subTopics[attrName] = topicObj 457 458 def __abandonSubtopic(self, name): 459 """The given subtopic becomes orphan (no parent).""" 460 topicObj = self.__subTopics.pop(name) 461 assert topicObj.__parentTopic() is self 462 463 def __onDeadListener(self, weakListener): 464 """One of our subscribed listeners has died, so remove it and notify""" 465 pubListener = self.__listeners.pop(weakListener) 466 # notify: 467 self._treeConfig.notificationMgr.notifyDeadListener(pubListener, self) 468 469 def __str__(self): 470 return "%s(%s)" % (self.getName(), self.getNumListeners()) 471 472 473