"""
Unit tests for the topic manager of wx.lib.pubsub.

:copyright: Copyright 2006-2009 by Oliver Schoenborn, all rights reserved.
:license: BSD, see LICENSE.txt for details.

"""

import unittest
from unittests import wtc

from wx.lib.pubsub.pub import (
    ALL_TOPICS,
    MessageDataSpecError,
    TopicTreeTraverser,
    TopicNameError,
    TopicDefnError
    )

from wx.lib.pubsub.core import ITopicDefnProvider

from wx.lib.pubsub.core.topicmgr import \
    ArgSpecGiven

from wx.lib.pubsub.core.topicutils import validateName

from wx.lib.pubsub.utils.topictreeprinter import \
    printTreeDocs, ITopicTreeVisitor


class lib_pubsub_TopicMgr0_Basic(wtc.PubsubTestCase):
    """
    Only tests TopicMgr methods. This must use some query methods on
    topic objects to validate that TopicMgr did its job properly.
    """

    def failTopicName(self, name):
        """Check that validateName() rejects ``name`` with TopicNameError."""
        self.assertRaises(TopicNameError, validateName, name)

    def test10_GoodTopicNames(self):
        #
        # Test that valid topic names are accepted by pubsub
        #

        # validateName() signals a bad name by raising, so simply
        # returning is the success condition here.
        validateName('test.asdf')
        validateName('test.a')
        validateName('test.a.b')

    def test10_BadTopicNames(self):
        #
        # Test that invalid topic names are rejected by pubsub
        #

        # parts of topic name are 'empty'
        self.failTopicName('')
        self.failTopicName(('',))
        self.failTopicName(('test', 'asdf', ''))
        self.failTopicName(('test', 'a', None))

        # parts of topic name have invalid char
        self.failTopicName(('test', 'a', 'b', '_'))
        self.failTopicName(('(aa',))

        # the name reserved for the root of all topics cannot be used
        self.failTopicName((ALL_TOPICS,))


class lib_pubsub_TopicMgr1_GetOrCreate_NoDefnProv(wtc.PubsubTestCase):
    """
    Only tests TopicMgr methods. This must use some query methods on
    topic objects to validate that TopicMgr did its job properly.
    """

    def test10_NoProtoListener(self):
        #
        # Test the getOrCreateTopic without proto listener
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        def verifyNonSendable(topicObj, nameTuple, parent):
            """Any non-sendable topic will satisfy these conditions:"""
            self.assertEqual(0, topicMgr.hasTopicDefinition(nameTuple))
            self.assertFalse(topicObj.hasMDS())
            self.assertEqual(topicObj.getListeners(), [])
            self.assertEqual(topicObj.getNameTuple(), nameTuple)
            self.assertEqual(topicObj.getNumListeners(), 0)
            self.assertIs(topicObj.getParent(), parent)
            self.assertEqual(topicObj.getNodeName(),
                             topicObj.getNameTuple()[-1])

            def foobar():
                pass

            self.assertFalse(topicObj.hasListener(foobar))
            self.assertFalse(topicObj.hasListeners())
            self.assertFalse(topicObj.hasSubtopic('asdfafs'))
            self.assertFalse(topicObj.isAll())
            # without a message data spec no listener can be validated
            self.assertRaises(TopicDefnError, topicObj.isValid, foobar)
            self.assertRaises(TopicDefnError, topicObj.validate, foobar)
            # check that getTopic and getOrCreateTopic won't create again:
            self.assertIs(topicMgr.getOrCreateTopic(nameTuple), topicObj)
            self.assertIs(topicMgr.getTopic(nameTuple), topicObj)

        # test with a root topic
        rootName = 'GetOrCreate_NoProtoListener'
        tName = rootName
        # verify doesn't exist yet
        self.assertIsNone(topicMgr.getTopic(tName, True))
        # ok create it, unsendable
        rootTopic = topicMgr.getOrCreateTopic(tName)
        verifyNonSendable(rootTopic, (rootName,), topicMgr.getRootAllTopics())
        DESC_NO_SPEC = 'UNDOCUMENTED: created without spec'
        self.assertEqual(rootTopic.getDescription(), DESC_NO_SPEC)
        self.assertTrue(rootTopic.isRoot())
        self.assertEqual(rootTopic.getSubtopics(), [])
        self.assertFalse(rootTopic.isAll())
        self.assertFalse(rootTopic.hasSubtopic())

        # test with a subtopic
        tName1 = (rootName, 'stB')
        tName2 = tName1 + ('sstC',)
        self.assertIsNone(topicMgr.getTopic(tName1, True))
        self.assertIsNone(topicMgr.getTopic(tName2, True))
        subsubTopic = topicMgr.getOrCreateTopic(tName2)
        # verify that parent was created implicitly
        subTopic = topicMgr.getTopic(tName1)
        verifyNonSendable(subTopic, tName1, rootTopic)
        verifyNonSendable(subsubTopic, tName2, subTopic)
        self.assertEqual(subsubTopic.getDescription(), DESC_NO_SPEC)
        DESC_PARENT_NO_SPEC = 'UNDOCUMENTED: created as parent without specification'
        self.assertEqual(subTopic.getDescription(), DESC_PARENT_NO_SPEC)
        self.assertEqual(rootTopic.getSubtopics(), [subTopic])
        self.assertTrue(rootTopic.hasSubtopic())
        self.assertEqual(subTopic.getSubtopics(), [subsubTopic])
        self.assertTrue(subTopic.hasSubtopic())
        self.assertEqual(subsubTopic.getSubtopics(), [])
        self.assertFalse(subsubTopic.hasSubtopic())

        # check that getTopic raises expected exception when undefined topic:
        tName = 'Undefined'
        self.assertRaises(TopicNameError, topicMgr.getTopic, tName)
        tName = rootName + '.Undefined'
        self.assertRaises(TopicNameError, topicMgr.getTopic, tName)

    def test20_WithProtoListener(self):
        #
        # Test the getOrCreateTopic with proto listener
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        rootName = 'GetOrCreate_WithProtoListener'
        tName = rootName
        # verify doesn't exist yet
        self.assertIsNone(topicMgr.getTopic(tName, True))

        def protoListener(arg1, arg2=None):
            pass

        # ok create it, sendable
        rootTopic = topicMgr.getOrCreateTopic(tName, protoListener)
        # check that getTopic and getOrCreateTopic won't create again:
        self.assertIs(topicMgr.getOrCreateTopic(tName), rootTopic)
        self.assertIs(topicMgr.getTopic(tName), rootTopic)
        self.assertTrue(rootTopic.hasMDS())
        self.assertTrue(topicMgr.hasTopicDefinition(tName))
        expectDesc = 'UNDOCUMENTED: created from protoListener "protoListener" in module'
        self.assertTrue(rootTopic.getDescription().startswith(expectDesc))

        # check that topic created can discern between good and bad listener
        self.assertTrue(rootTopic.isValid(protoListener))

        def badListener1():
            pass  # missing required arg

        def badListener2(arg2):
            pass  # opt arg is required

        def badListener3(arg1, arg3):
            pass  # extra required arg

        self.assertFalse(rootTopic.isValid(badListener1))
        self.assertFalse(rootTopic.isValid(badListener2))
        self.assertFalse(rootTopic.isValid(badListener3))

        # verify that missing parent created is not sendable, child is
        def protoListener2(arg1, arg2=None):
            pass

        tName = (tName, 'stA', 'sstB')
        subsubTopic = topicMgr.getOrCreateTopic(tName, protoListener2)
        # the intermediate topic must exist even though it was never
        # requested explicitly
        subTopic = topicMgr.getTopic(tName[:-1])
        self.assertIsNotNone(subTopic)
        self.assertFalse(topicMgr.hasTopicDefinition(tName[:-1]))
        self.assertTrue(topicMgr.hasTopicDefinition(tName))
        self.assertTrue(subsubTopic.isValid(protoListener2))


class lib_pubsub_TopicMgr2_GetOrCreate_DefnProv(wtc.PubsubTestCase):
    """
    Test TopicManager when one or more definition providers
    can provide for some topic definitions.
    """

    def test10_DefnProvider(self):
        #
        # Test the addition and clearing of definition providers
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        class DefnProvider(ITopicDefnProvider):
            pass

        dp1 = DefnProvider()
        dp2 = DefnProvider()

        # adding a provider that is already registered must not
        # change the count
        topicMgr.addDefnProvider(dp1)
        self.assertEqual(1, topicMgr.getNumDefnProviders())
        topicMgr.addDefnProvider(dp1)
        self.assertEqual(1, topicMgr.getNumDefnProviders())
        topicMgr.addDefnProvider(dp2)
        self.assertEqual(2, topicMgr.getNumDefnProviders())
        topicMgr.addDefnProvider(dp2)
        self.assertEqual(2, topicMgr.getNumDefnProviders())
        topicMgr.addDefnProvider(dp1)
        self.assertEqual(2, topicMgr.getNumDefnProviders())

        topicMgr.clearDefnProviders()
        self.assertEqual(0, topicMgr.getNumDefnProviders())
        topicMgr.addDefnProvider(dp1)
        self.assertEqual(1, topicMgr.getNumDefnProviders())
        # leave the topic manager without providers
        topicMgr.clearDefnProviders()

    def test20_UseProvider(self):
        #
        # Test the use of definition providers for topics. We create
        # two so we can check that more than one can work together.
        # One provides good definitions, one provides some with errors.
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        class TableDefnProvider(ITopicDefnProvider):
            """
            Serve definitions out of ``self.defns``, a dict that maps a
            topic name tuple to the arguments given to
            ArgSpecGiven.setAll(). Subclasses fill in the table.
            """
            def getDefn(self, topicNameTuple):
                if topicNameTuple not in self.defns:
                    return None, None
                defn = ArgSpecGiven()
                defn.setAll( * self.defns[topicNameTuple] )
                desc = '%s desc' % '.'.join(topicNameTuple)
                return desc, defn

        class DefnProvider(TableDefnProvider):
            """
            Provide definitions for a root topic, subtopic, and
            one subtopic whose parent is not defined here. It is easier
            to use sub-only definitions.
            """
            def __init__(self):
                self.defns = {
                    ('a',) : (dict(arg1='arg1 desc', arg2='arg2 desc'),
                              ('arg1',) ),
                    ('a', 'b') : (dict(arg1='arg1 desc', arg2='arg2 desc',
                                       arg3='arg3 desc', arg4='arg2 desc'),
                                  ('arg1', 'arg3',) ),
                    # parent doesn't have defn
                    ('a', 'c', 'd') : (
                        dict(arg1='arg1 desc', arg2='arg2 desc',
                             arg3='arg3 desc', arg4='arg4 desc',
                             arg5='arg5 desc', arg6='arg6 desc'),
                        ('arg1', 'arg3', 'arg5',)),
                    }

        class DefnProviderErr(TableDefnProvider):
            """
            Provide some definitions that have wrong arg spec. It is
            easier to use the 'all-spec' for definitions, which provides
            an opportunity for a different method of ArgSpecGiven.
            """
            def __init__(self):
                self.defns = {
                    ('a', 'err1') : (# missing arg2
                                     dict(arg1=''),
                                     ('arg1',) ),
                    ('a', 'err2') : (# missing arg1
                                     dict(arg2=''), ),
                    ('a', 'err3') : (# arg1 is no longer required
                                     dict(arg1='', arg2=''), ),
                    }

        topicMgr.addDefnProvider( DefnProvider() )
        topicMgr.addDefnProvider( DefnProviderErr() )

        # create some topics that will use defn provider
        topic = topicMgr.getOrCreateTopic('a')
        self.assertEqual(topic.getDescription(), 'a desc')
        self.assertTrue(topic.hasMDS())
        topic = topicMgr.getOrCreateTopic('a.b')
        self.assertEqual(topic.getDescription(), 'a.b desc')
        self.assertTrue(topic.hasMDS())
        topic = topicMgr.getOrCreateTopic('a.c.d')
        self.assertEqual(topic.getDescription(), 'a.c.d desc')
        self.assertTrue(topic.hasMDS())
        self.assertFalse(topicMgr.hasTopicDefinition('a.c'))
        # the parent was created implicitly, without a definition ...
        parent = topicMgr.getTopic('a.c')
        self.assertFalse(parent.hasMDS())

        # ... and gets one once a proto listener is supplied for it
        def protoListener(arg1, arg3, arg2=None, arg4=None):
            pass

        parent = topicMgr.getOrCreateTopic('a.c', protoListener)
        self.assertTrue(parent.hasMDS())
        self.assertTrue(topic.hasMDS())

        # now the erroneous ones:
        def checkRaises(topicName, expectMsg):
            """
            Creating ``topicName`` must fail with MessageDataSpecError,
            on a first and on a repeated attempt, and the error text
            must contain ``expectMsg``.
            """
            self.assertRaises(MessageDataSpecError, topicMgr.getOrCreateTopic,
                              topicName)
            with self.assertRaises(MessageDataSpecError) as caught:
                topicMgr.getOrCreateTopic(topicName)
            self.assertIn(expectMsg, str(caught.exception))

        checkRaises('a.err1', 'Params [arg1] missing inherited [arg2] for topic "a.err1"')
        checkRaises('a.err2', 'Params [arg2] missing inherited [arg1] for topic "a.err2"')
        checkRaises('a.err3', 'Params [] missing inherited [arg1] for topic "a.err3" required args')

    def test30_DelTopic(self):
        #
        # Test topic deletion
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        topicMgr.getOrCreateTopic('delTopic.b.c.d.e')
        self.assertIsNotNone(topicMgr.getTopic('delTopic.b.c.d.e'))
        self.assertTrue(topicMgr.getTopic('delTopic.b.c.d').hasSubtopic('e'))

        self.assertTrue(topicMgr.getTopic('delTopic.b').hasSubtopic('c'))
        topicMgr.delTopic('delTopic.b.c')
        # the deleted topic and everything below it must be gone
        self.assertFalse(topicMgr.getTopic('delTopic.b').hasSubtopic('c'))
        self.assertIsNone(topicMgr.getTopic('delTopic.b.c.d.e', okIfNone=True))
        self.assertIsNone(topicMgr.getTopic('delTopic.b.c.d', okIfNone=True))
        self.assertIsNone(topicMgr.getTopic('delTopic.b.c', okIfNone=True))


class lib_pubsub_TopicMgr3_TreeTraverser(wtc.PubsubTestCase):
    """
    Test printing and traversal of the topic tree.
    """

    # Backslashes are doubled because '\-' is not a valid escape sequence
    # in a normal string literal; the value still starts each entry with a
    # single backslash. The backslash after the opening quotes only
    # suppresses the first newline.
    # NOTE(review): indentation is assumed to be 4 spaces per tree level
    # (what printTreeDocs is expected to emit by default) -- confirm.
    expectedOutput = '''\
\\-- Topic "a2"
    \\-- Topic "a"
        \\-- Topic "a"
        \\-- Topic "b"
    \\-- Topic "b"
        \\-- Topic "a"
        \\-- Topic "b"'''

    def test1(self):
        #
        # Test printing of topic tree
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        root = topicMgr.getOrCreateTopic('a2')
        topicMgr.getOrCreateTopic('a2.a.a')
        topicMgr.getOrCreateTopic('a2.a.b')
        topicMgr.getOrCreateTopic('a2.b.a')
        topicMgr.getOrCreateTopic('a2.b.b')

        from six import StringIO
        output = StringIO()
        printTreeDocs(rootTopic=root, width=70, fileObj=output)
        self.assertEqual(output.getvalue(), self.expectedOutput)

    def test2(self):
        #
        # Test traversing with and without filtering, breadth and depth
        #
        topicMgr = self.pub.getDefaultTopicMgr()

        class MyTraverser(ITopicTreeVisitor):
            """
            Visitor that records which callbacks get called, as a string
            with one digit per call, and the node names of the topics
            visited.
            """
            def __init__(self, pub):
                self.traverser = pub.TopicTreeTraverser(self)
                self.calls = ''
                self.topics = []

            def traverse(self, rootTopic, **kwargs):
                self.traverser.traverse(rootTopic, **kwargs)

            def __append(self, val):
                self.calls = self.calls + str(val)

            def _startTraversal(self):
                self.__append(1)

            def _accept(self, topicObj):
                self.__append(2)
                # only accept topics at root or second level tree, or if
                # the node name is 'A'
                return len(topicObj.getNameTuple()) <= 2 or topicObj.getNodeName() == 'A'

            def _onTopic(self, topicObj):
                self.__append(3)
                self.topics.append(topicObj.getNodeName())

            def _startChildren(self):
                self.__append(4)

            def _endChildren(self):
                self.__append(5)

            def _doneTraversal(self):
                self.__append(6)

        root = topicMgr.getOrCreateTopic('traversal')
        topicMgr.getOrCreateTopic('traversal.a.A')
        topicMgr.getOrCreateTopic('traversal.a.B.foo')
        topicMgr.getOrCreateTopic('traversal.b.C')
        topicMgr.getOrCreateTopic('traversal.b.D.bar')

        def exe(expectCalls, expectTopics, **kwargs):
            """Traverse from ``root`` and compare the recorded trace."""
            traverser = MyTraverser(self.pub)
            traverser.traverse(root, **kwargs)
            self.assertEqual(traverser.topics, expectTopics)
            self.assertEqual(traverser.calls, expectCalls)

        exe(expectCalls = '13434345343455534345343455556',
            expectTopics = ['traversal', 'a', 'A', 'B', 'foo', 'b', 'C', 'D', 'bar'],
            onlyFiltered = False)
        exe(expectCalls = '13433543354335454354543545456',
            expectTopics = ['traversal', 'a', 'b', 'A', 'B', 'C', 'D', 'foo', 'bar'],
            how = TopicTreeTraverser.BREADTH, onlyFiltered = False)
        exe(expectCalls = '123423423452523422556',
            expectTopics = ['traversal', 'a', 'A', 'b'])
        exe(expectCalls = '123423235423254225456',
            expectTopics = ['traversal', 'a', 'b', 'A'],
            how = TopicTreeTraverser.BREADTH)


#---------------------------------------------------------------------------


if __name__ == '__main__':
    unittest.main()