1 2""" 3Infrastructure code for testing. Implements incoming and outgoing xml/xmpp 4streams 5""" 6 7import servicetest 8from servicetest import Event, EventPattern 9import twisted 10from twisted.words.xish import domish, xpath, xmlstream 11from twisted.internet.protocol import Factory, ClientFactory 12from twisted.internet import reactor 13 14from ipv6 import listenTCP6, connectTCP6 15 16NS_STREAMS = 'http://etherx.jabber.org/streams' 17 18def make_stream_event(type, stanza): 19 event = servicetest.Event(type, stanza=stanza) 20 if stanza.hasAttribute("to"): 21 event.to = stanza.getAttribute("to") 22 else: 23 event.to = None 24 25 if stanza.hasAttribute("from"): 26 event.from_ = stanza.getAttribute("from") 27 else: 28 event.from_ = None 29 30 event.name = event.to 31 event.remote_name = event.from_ 32 33 return event 34 35def make_iq_event(iq): 36 event = make_stream_event('stream-iq', iq) 37 event.iq_type = iq.getAttribute("type") 38 event.iq_id = iq.getAttribute("id") 39 query = iq.firstChildElement() 40 41 if query: 42 event.query = query 43 event.query_ns = query.uri 44 event.query_name = query.name 45 46 if query.getAttribute("node"): 47 event.query_node = query.getAttribute("node") 48 else: 49 event.query = None 50 51 return event 52 53def make_presence_event(stanza): 54 event = make_stream_event('stream-presence', stanza) 55 event.presence_type = stanza.getAttribute('type') 56 57 statuses = xpath.queryForNodes('/presence/status', stanza) 58 59 if statuses: 60 event.presence_status = str(statuses[0]) 61 62 return event 63 64def make_message_event(stanza): 65 event = make_stream_event('stream-message', stanza) 66 event.message_type = stanza.getAttribute('type') 67 return event 68 69class BaseXmlStream(xmlstream.XmlStream): 70 prefixes = { NS_STREAMS: 'stream' } 71 version = "1.0" 72 namespace = 'jabber:client' 73 74 def __init__(self, event_function, name = None, remote_name = None): 75 xmlstream.XmlStream.__init__(self) 76 77 self.name = name 78 self.remote_name = remote_name 79 self.event_func = event_function 80 81 self.event_function = event_function 82 self.addObserver(xmlstream.STREAM_START_EVENT, 83 lambda *args: self.event(Event('stream-opened'))) 84 self.addObserver('//features', lambda x: self.event( 85 make_stream_event('stream-features', x))) 86 self.addObserver('//iq', lambda x: self.event( 87 make_iq_event(x))) 88 self.addObserver('//message', lambda x: self.event( 89 make_message_event(x))) 90 self.addObserver('//presence', lambda x: self.event( 91 make_presence_event(x))) 92 93 def send_header(self): 94 root = domish.Element((NS_STREAMS, 'stream'), 'jabber:client') 95 if self.name is not None: 96 root['from'] = self.name 97 if self.remote_name is not None: 98 root['to'] = self.remote_name 99 root['version'] = self.version 100 self.send(root.toXml(closeElement = 0, prefixes=self.prefixes)) 101 102 def event(self, e): 103 e.connection = self 104 self.event_function(e) 105 106 def send(self, obj): 107 if domish.IElement.providedBy(obj): 108 if self.name != None: 109 obj["from"] = self.name 110 if self.remote_name != None: 111 obj["to"] = self.remote_name 112 obj = obj.toXml(prefixes=self.prefixes) 113 114 xmlstream.XmlStream.send(self, obj) 115 116 117class IncomingXmppStream(BaseXmlStream): 118 def __init__(self, event_func, name): 119 BaseXmlStream.__init__(self, event_func, name, None) 120 121 def onDocumentStart(self, rootElement): 122 # Use the fact that it's always salut that connects, so it sends a 123 # proper opening 124 assert rootElement.name == "stream" 125 assert rootElement.uri == NS_STREAMS 126 127 assert rootElement.hasAttribute("from") 128 assert rootElement.hasAttribute("to") 129 if self.name is not None: 130 assert rootElement["to"] == self.name, self.name 131 132 assert rootElement.hasAttribute("version") 133 assert rootElement["version"] == "1.0" 134 135 self.remote_name = rootElement["from"] 136 self.send_header() 137 self.send_features() 138 BaseXmlStream.onDocumentStart(self, rootElement) 139 140 def send_features(self): 141 features = domish.Element((NS_STREAMS, 'features')) 142 self.send(features) 143 144class IncomingXmppFactory(Factory): 145 def buildProtocol(self, addr): 146 p = self.protocol() 147 p.factory = self 148 e = Event('incoming-connection', listener = self) 149 p.event(e) 150 return p 151 152def setup_stream_listener(queue, name, port = 0, protocol = None): 153 if protocol == None: 154 protocol = IncomingXmppStream 155 156 factory = IncomingXmppFactory() 157 factory.protocol = lambda *args: protocol(queue.append, name) 158 port = reactor.listenTCP(port, factory) 159 160 return (factory, port.getHost().port) 161 162def setup_stream_listener6(queue, name, port = 0, protocol = None): 163 if protocol == None: 164 protocol = IncomingXmppStream 165 166 factory = IncomingXmppFactory() 167 factory.protocol = lambda *args: protocol(queue.append, name) 168 port = listenTCP6(port, factory) 169 170 return (factory, port.getHost().port) 171 172class OutgoingXmppStream(BaseXmlStream): 173 def __init__(self, event_function, name, remote_name): 174 BaseXmlStream.__init__(self, event_function, name, remote_name) 175 self.addObserver(xmlstream.STREAM_CONNECTED_EVENT, self.connected) 176 177 def connected (self, stream): 178 e = Event('connection-result', succeeded = True) 179 self.event(e) 180 181 self.send_header() 182 183class OutgoingXmppiChatStream(OutgoingXmppStream): 184 def __init__(self, event_function, name, remote_name): 185 # set name and remote_name as None as iChat doesn't send 'to' and 186 # 'from' attributes. 187 OutgoingXmppStream.__init__(self, event_function, None, None) 188 189class IncomingXmppiChatStream(IncomingXmppStream): 190 def __init__(self, event_func, name): 191 # set name to None as iChat doesn't send 'from' attribute. 192 IncomingXmppStream.__init__(self, event_func, None) 193 194class OutgoingXmppFactory(ClientFactory): 195 def __init__(self, event_function): 196 self.event_func = event_function 197 198 def clientConnectionFailed(self, connector, reason): 199 ClientFactory.clientConnectionFailed(self, connector, reason) 200 e = Event('connection-result', succeeded = False, reason = reason) 201 self.event_func(e) 202 203def connect_to_stream(queue, name, remote_name, host, port, protocol = None): 204 if protocol == None: 205 protocol = OutgoingXmppStream 206 207 p = protocol(queue.append, name, remote_name) 208 209 factory = OutgoingXmppFactory(queue.append) 210 factory.protocol = lambda *args: p 211 reactor.connectTCP(host, port, factory) 212 213 return p 214 215def connect_to_stream6(queue, name, remote_name, host, port, protocol = None): 216 if protocol == None: 217 protocol = OutgoingXmppStream 218 219 p = protocol(queue.append, name, remote_name) 220 221 factory = OutgoingXmppFactory(queue.append) 222 factory.protocol = lambda *args: p 223 connectTCP6(reactor, host, port, factory) 224 225 return p 226 227if __name__ == '__main__': 228 def run_test(): 229 q = servicetest.IteratingEventQueue() 230 # Set verboseness if needed for debugging 231 #q.verbose = True 232 233 (listener, port) = setup_stream_listener(q, "incoming") 234 outbound = connect_to_stream(q, "outgoing", 235 "incoming", "localhost", port) 236 237 inbound = q.expect('incoming-connection', 238 listener = listener).connection 239 240 # inbound stream is opened first, then outbounds stream is opened and 241 # receive features 242 q.expect('stream-opened', connection = inbound) 243 q.expect('stream-opened', connection = outbound) 244 q.expect('stream-features', connection = outbound) 245 246 247 message = domish.Element(('','message')) 248 message.addElement('body', content="test123") 249 outbound.send(message) 250 251 e = q.expect('stream-message', connection=inbound) 252 253 # twisting twisted 254 reactor.stop() 255 256 reactor.callLater(0.1, run_test) 257 reactor.run() 258