1"""
2Provide the Topic class.
3
4:copyright: Copyright since 2006 by Oliver Schoenborn, all rights reserved.
5:license: BSD, see LICENSE_BSD_Simple.txt for details.
6"""
7
8
9from weakref import ref as weakref
10
11from .listener import (
12    Listener,
13    ListenerValidator,
14)
15
16from .topicutils import (
17    ALL_TOPICS,
18    stringize,
19    tupleize,
20    validateName,
21    smartDedent,
22)
23
24from .topicexc import (
25    TopicDefnError,
26    TopicNameError,
27    ExcHandlerError,
28)
29
30from .publishermixin import PublisherMixin
31
32from .topicargspec import (
33    ArgsInfo,
34    ArgSpecGiven,
35    topicArgsFromCallable,
36    SenderMissingReqdMsgDataError,
37    SenderUnknownMsgDataError,
38    MessageDataSpecError,
39)
40
41from .. import py2and3
42
43
44class Topic(PublisherMixin):
45    """
46    Represent topics in pubsub. Contains information about a topic,
47    including topic's message data specification (MDS), the list of
48    subscribed listeners, docstring for the topic. It allows Python-like
49    access to subtopics (e.g. A.B is subtopic B of topic A).
50    """
51
52    def __init__(self, treeConfig, nameTuple, description,
53        msgArgsInfo, parent=None):
54        """Create a topic. Should only be called by TopicManager via its
55        getOrCreateTopic() method (which gets called in several places
56        in pubsub, such as sendMessage, subscribe, and newTopic).
57
58        :param treeConfig: topic tree configuration settings
59        :param nameTuple: topic name, in tuple format (no dots)
60        :param description: "docstring" for topic
61        :param ArgsInfo msgArgsInfo: object that defines MDS for topic
62        :param parent: parent of topic
63
64        :raises ValueError: invalid topic name
65        """
66        if parent is None:
67            if nameTuple != (ALL_TOPICS,):
68                msg = 'Only one topic, named %s, can be root of topic tree'
69                raise ValueError(msg % 'pub.ALL_TOPICS')
70        else:
71            validateName(nameTuple)
72        self.__tupleName = nameTuple
73
74        self.__handlingUncaughtListenerExc = False
75        self._treeConfig = treeConfig
76        PublisherMixin.__init__(self)
77
78        self.__validator = None
79        # Registered listeners were originally kept in a Python list; however
80        # a few methods require lookup of the Listener for the given callable,
81        # which is an O(n) operation. A set() could have been more suitable but
82        # there is no way of retrieving an element from a set without iterating
83        # over the set, again an O(n) operation. A dict() is ok too. Because
84        # Listener.__eq__(callable) returns true if the Listener instance wraps
85        # the given callable, and because Listener.__hash__ produces the hash
86        # value of the wrapped callable, calling dict[callable] on a
87        # dict(Listener -> Listener) mapping will be O(1) in most cases:
88        # the dict will take the callables hash, find the list of Listeners that
89        # have that hash, and then iterate over that inner list to find the
90        # Listener instance which satisfies Listener == callable, and will return
91        # the Listener.
92        self.__listeners = dict()
93
94        # specification:
95        self.__description  = None
96        self.setDescription(description)
97        self.__msgArgs = msgArgsInfo
98        if msgArgsInfo.isComplete():
99            self.__finalize()
100        else:
101            assert not self._treeConfig.raiseOnTopicUnspecified
102
103        # now that we know the args are fine, we can link to parent
104        self.__parentTopic = None
105        self.__subTopics = {}
106        if parent is None:
107            assert self.hasMDS()
108        else:
109            self.__parentTopic = weakref(parent)
110            assert self.__msgArgs.parentAI() is parent._getListenerSpec()
111            parent.__adoptSubtopic( self )
112
113    def setDescription(self, desc):
114        """Set the 'docstring' of topic"""
115        self.__description = desc
116
117    def getDescription(self):
118        """Return the 'docstring' of topic"""
119        if self.__description is None:
120            return None
121        return smartDedent(self.__description)
122
123    def setMsgArgSpec(self, argsDocs, required=()):
124        """Specify the message data for topic messages.
125        :param argsDocs: a dictionary of keyword names (message data name) and data 'docstring'; cannot be None
126        :param required: a list of those keyword names, appearing in argsDocs,
127        which are required (all others are assumed optional)
128
129        Can only be called if this info has not been already set at construction
130        or in a previous call.
131        :raise RuntimeError: if MDS already set at construction or previous call."""
132        assert self.__parentTopic is not None # for root of tree, this method never called!
133        if argsDocs is None:
134            raise ValueError('Cannot set listener spec to None')
135
136        if self.__msgArgs is None or not self.__msgArgs.isComplete():
137            try:
138                specGiven = ArgSpecGiven(argsDocs, required)
139                self.__msgArgs = ArgsInfo(self.__tupleName, specGiven,
140                    self.__parentTopic()._getListenerSpec())
141            except MessageDataSpecError:
142                # discard the lower part of the stack trace
143                exc = py2and3.getexcobj()
144                raise exc
145            self.__finalize()
146
147        else:
148            raise RuntimeError('Not allowed to call this: msg spec already set!')
149
150    def getArgs(self):
151        """Returns a pair (reqdArgs, optArgs) where reqdArgs is tuple
152        of names of required message arguments, optArgs is tuple
153        of names for optional arguments. If topic args not specified
154        yet, returns (None, None)."""
155        sendable = self.__msgArgs.isComplete()
156        assert sendable == self.hasMDS()
157        if sendable:
158            return (self.__msgArgs.allRequired ,
159                    self.__msgArgs.allOptional)
160        return None, None
161
162    def getArgDescriptions(self):
163        """Get a map of keyword names to docstrings: documents each MDS element. """
164        return self.__msgArgs.getArgsDocs()
165
166    def setArgDescriptions(self, **docs):
167        """Set the docstring for each MDS datum."""
168        self.__msgArgs.setArgsDocs(docs)
169
170    def hasMDS(self):
171        """Return true if this topic has a message data specification (MDS)."""
172        return self.__validator is not None
173
174    def filterMsgArgs(self, msgKwargs, check=False):
175        """Get the MDS docstrings for each of the spedified kwargs."""
176        filteredArgs = self.__msgArgs.filterArgs(msgKwargs)
177        # if no check of args yet, do it now:
178        if check:
179            self.__msgArgs.check(filteredArgs)
180        return filteredArgs
181
182    def isAll(self):
183        """Returns true if this topic is the 'all topics' topic. All root
184        topics behave as though they are child of that topic. """
185        return self.__tupleName == (ALL_TOPICS,)
186
187    def isRoot(self):
188        """Returns true if this is a "root" topic, false otherwise. A
189        root topic is a topic whose name contains no dots and which
190        has pub.ALL_TOPICS as parent."""
191        parent = self.getParent()
192        if parent:
193            return parent.isAll()
194        assert self.isAll()
195        return False
196
197    def getName(self):
198        """Return dotted form of full topic name"""
199        return stringize(self.__tupleName)
200
201    def getNameTuple(self):
202        """Return tuple form of full topic name"""
203        return self.__tupleName
204
205    def getNodeName(self):
206        """Return the last part of the topic name (has no dots)"""
207        name = self.__tupleName[-1]
208        return name
209
210    def getParent(self):
211        """Get Topic object that is parent of self (i.e. self is a subtopic
212        of parent). Return none if self is the "all topics" topic."""
213        if self.__parentTopic is None:
214            return None
215        return self.__parentTopic()
216
217    def hasSubtopic(self, name=None):
218        """Return true only if name is a subtopic of self. If name not
219        specified, return true only if self has at least one subtopic."""
220        if name is None:
221            return len(self.__subTopics) > 0
222
223        return name in self.__subTopics
224
225    def getSubtopic(self, relName):
226        """Get the specified subtopic object. The relName can be a valid
227        subtopic name, a dotted-name string, or a tuple. """
228        if not relName:
229            raise ValueError("getSubtopic() arg can't be empty")
230        topicTuple = tupleize(relName)
231        assert topicTuple
232
233        topicObj = self
234        for topicName in topicTuple:
235            child = topicObj.__subTopics.get(topicName)
236            if child is None:
237                msg = 'Topic "%s" doesn\'t have "%s" as subtopic' % (topicObj.getName(), topicName)
238                raise TopicNameError(relName, msg)
239            topicObj = child
240
241        return topicObj
242
243    def getSubtopics(self):
244        """Get a list of Topic instances that are subtopics of self."""
245        return py2and3.values(self.__subTopics)
246
247    def getNumListeners(self):
248        """Return number of listeners currently subscribed to topic. This is
249        different from number of listeners that will get notified since more
250        general topics up the topic tree may have listeners."""
251        return len(self.__listeners)
252
253    def hasListener(self, listener):
254        """Return true if listener is subscribed to this topic."""
255        return listener in self.__listeners
256
257    def hasListeners(self):
258        """Return true if there are any listeners subscribed to
259        this topic, false otherwise."""
260        return bool(self.__listeners)
261
262    def getListeners(self):
263        """Get a copy of list of listeners subscribed to this topic. Safe to iterate over while listeners
264        get un/subscribed from this topics (such as while sending a message)."""
265        return py2and3.keys(self.__listeners)
266
267    def getListenersIter(self):
268        """Get an iterator over listeners subscribed to this topic. Do not use if listeners can be
269        un/subscribed while iterating. """
270        return py2and3.iterkeys(self.__listeners)
271
272    def validate(self, listener):
273        """Checks whether listener could be subscribed to this topic:
274        if yes, just returns; if not, raises ListenerMismatchError.
275        Note that method raises TopicDefnError if self not
276        hasMDS()."""
277        if not self.hasMDS():
278            raise TopicDefnError(self.__tupleName)
279        return self.__validator.validate(listener)
280
281    def isValid(self, listener):
282        """Return True only if listener could be subscribed to this topic,
283        otherwise returns False. Note that method raises TopicDefnError
284        if self not hasMDS()."""
285        if not self.hasMDS():
286            raise TopicDefnError(self.__tupleName)
287        return self.__validator.isValid(listener)
288
289    def subscribe(self, listener):
290        """Subscribe listener to this topic. Returns a pair
291        (pub.Listener, success). The success is true only if listener
292        was not already subscribed and is now subscribed. """
293        if listener in self.__listeners:
294            assert self.hasMDS()
295            subdLisnr, newSub = self.__listeners[listener], False
296
297        else:
298            if self.__validator is None:
299                args, reqd = topicArgsFromCallable(listener)
300                self.setMsgArgSpec(args, reqd)
301            argsInfo = self.__validator.validate(listener)
302            weakListener = Listener(
303                listener, argsInfo, onDead=self.__onDeadListener)
304            self.__listeners[weakListener] = weakListener
305            subdLisnr, newSub = weakListener, True
306
307        # notify of subscription
308        self._treeConfig.notificationMgr.notifySubscribe(subdLisnr, self, newSub)
309
310        return subdLisnr, newSub
311
312    def unsubscribe(self, listener):
313        """Unsubscribe the specified listener from this topic. Returns
314        the pub.Listener object associated with the listener that was
315        unsubscribed, or None if the specified listener was not
316        subscribed to this topic.  Note that this method calls
317        ``notifyUnsubscribe(listener, self)`` on all registered notification
318        handlers (see pub.addNotificationHandler)."""
319        unsubdLisnr = self.__listeners.pop(listener, None)
320        if unsubdLisnr is None:
321            return None
322
323        unsubdLisnr._unlinkFromTopic_()
324        assert listener == unsubdLisnr.getCallable()
325
326        # notify of unsubscription
327        self._treeConfig.notificationMgr.notifyUnsubscribe(unsubdLisnr, self)
328
329        return unsubdLisnr
330
331    def unsubscribeAllListeners(self, filter=None):
332        """Clears list of subscribed listeners. If filter is given, it must
333        be a function that takes a listener and returns true if the listener
334        should be unsubscribed. Returns the list of Listener for listeners
335        that were unsubscribed."""
336        unsubd = []
337        if filter is None:
338            for listener in self.__listeners:
339                listener._unlinkFromTopic_()
340            unsubd = py2and3.keys(self.__listeners)
341            self.__listeners = {}
342        else:
343            unsubd = []
344            for listener in py2and3.keys(self.__listeners):
345                if filter(listener):
346                    unsubd.append(listener)
347                    listener._unlinkFromTopic_()
348                    del self.__listeners[listener]
349
350        # send notification regarding all listeners actually unsubscribed
351        notificationMgr = self._treeConfig.notificationMgr
352        for unsubdLisnr in unsubd:
353            notificationMgr.notifyUnsubscribe(unsubdLisnr, self)
354
355        return unsubd
356
357    #############################################################
358    #
359    # Impementation
360    #
361    #############################################################
362
363    def _getListenerSpec(self):
364        """Only to be called by pubsub package"""
365        return self.__msgArgs
366
367    def _publish(self, data):
368        """This sends message to listeners of parent topics as well.
369        If an exception is raised in a listener, the publish is
370        aborted, except if there is a handler (see
371        pub.setListenerExcHandler)."""
372        self._treeConfig.notificationMgr.notifySend('pre', self)
373
374        # send to ourself
375        iterState = self._mix_prePublish(data)
376        self.__sendMessage(data, self, iterState)
377
378        # send up the chain
379        topicObj = self.getParent()
380        while topicObj is not None:
381            if topicObj.hasListeners():
382                iterState = self._mix_prePublish(data, topicObj, iterState)
383                self.__sendMessage(data, topicObj, iterState)
384
385            # done for this topic, continue up branch to parent towards root
386            topicObj = topicObj.getParent()
387
388        self._treeConfig.notificationMgr.notifySend('post', self)
389
390    def __sendMessage(self, data, topicObj, iterState):
391        # now send message data to each listener for current topic;
392        # use list of listeners rather than iterator, so that if listeners added/removed during
393        # send loop, no runtime exception:
394        for listener in topicObj.getListeners():
395            try:
396                self._treeConfig.notificationMgr.notifySend('in', topicObj, pubListener=listener)
397                self._mix_callListener(listener, data, iterState)
398
399            except Exception:
400                # if exception handling is on, handle, otherwise re-raise
401                handler = self._treeConfig.listenerExcHandler
402                if handler is None or self.__handlingUncaughtListenerExc:
403                    raise
404
405                # try handling the exception so we can continue the send:
406                try:
407                    self.__handlingUncaughtListenerExc = True
408                    handler( listener.name(), topicObj )
409                    self.__handlingUncaughtListenerExc = False
410                except Exception:
411                    exc = py2and3.getexcobj()
412                    #print 'exception raised', exc
413                    self.__handlingUncaughtListenerExc = False
414                    raise ExcHandlerError(listener.name(), topicObj, exc)
415
416    def __finalize(self):
417        """Finalize the topic specification, which currently means
418        creating the listener validator for this topic. This allows
419        calls to subscribe() to validate that listener adheres to
420        topic's message data specification (MDS)."""
421        assert self.__msgArgs.isComplete()
422        assert not self.hasMDS()
423
424        # must make sure can adopt a validator
425        required = self.__msgArgs.allRequired
426        optional = self.__msgArgs.allOptional
427        self.__validator = ListenerValidator(required, list(optional) )
428        assert not self.__listeners
429
430    def _undefineSelf_(self, topicsMap):
431        """Called by topic manager when deleting a topic."""
432        if self.__parentTopic is not None:
433            self.__parentTopic().__abandonSubtopic(self.__tupleName[-1])
434        self.__undefineBranch(topicsMap)
435
436    def __undefineBranch(self, topicsMap):
437        """Unsubscribe all our listeners, remove all subtopics from self,
438        then detach from parent. Parent is not notified, because method
439        assumes it has been called by parent"""
440        #print 'Remove %s listeners (%s)' % (self.getName(), self.getNumListeners())
441        self.unsubscribeAllListeners()
442        self.__parentTopic = None
443
444        for subName, subObj in py2and3.iteritems(self.__subTopics):
445            assert isinstance(subObj, Topic)
446            #print 'Unlinking %s from parent' % subObj.getName()
447            subObj.__undefineBranch(topicsMap)
448
449        self.__subTopics = {}
450        del topicsMap[self.getName()]
451
452    def __adoptSubtopic(self, topicObj):
453        """Add topicObj as child topic."""
454        assert topicObj.__parentTopic() is self
455        attrName = topicObj.getNodeName()
456        self.__subTopics[attrName] = topicObj
457
458    def __abandonSubtopic(self, name):
459        """The given subtopic becomes orphan (no parent)."""
460        topicObj = self.__subTopics.pop(name)
461        assert topicObj.__parentTopic() is self
462
463    def __onDeadListener(self, weakListener):
464        """One of our subscribed listeners has died, so remove it and notify"""
465        pubListener = self.__listeners.pop(weakListener)
466        # notify:
467        self._treeConfig.notificationMgr.notifyDeadListener(pubListener, self)
468
469    def __str__(self):
470        return "%s(%s)" % (self.getName(), self.getNumListeners())
471
472
473