# -*- test-case-name: twisted.words.test.test_xmlstream -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
XML Stream processing.

An XML Stream is defined as a connection over which two XML documents are
exchanged during the lifetime of the connection, one for each direction. The
unit of interaction is a direct child element of the root element (stanza).

The most prominent use of XML Streams is Jabber, but this module is generically
usable. See Twisted Words for Jabber specific protocol support.

Maintainer: Ralph Meijer
"""

from twisted.python import failure
from twisted.internet import protocol
from twisted.words.xish import domish, utility

# Event names dispatched by XmlStream for connection life-cycle changes.
# NOTE: C{intern} (here) and C{unicode} (in L{XmlStream.send}) are used as
# builtins, i.e. this module is written against Python 2.
STREAM_CONNECTED_EVENT = intern("//event/stream/connected")
STREAM_START_EVENT = intern("//event/stream/start")
STREAM_END_EVENT = intern("//event/stream/end")
STREAM_ERROR_EVENT = intern("//event/stream/error")

class XmlStream(protocol.Protocol, utility.EventDispatcher):
    """
    Generic streaming XML protocol handler.

    Incoming data is fed to an XML parser, and the resulting document and
    element callbacks are turned into dispatched events. Observers for
    incoming stanzas are registered through the L{utility.EventDispatcher}
    API, using XPath-like expressions that are matched against each stanza.

    @ivar stream: The parser for the current connection, or C{None} while
        there is no connection.
    @ivar rawDataInFn: Optional callable that is passed every chunk of
        received data before it is parsed.
    @ivar rawDataOutFn: Optional callable that is passed every chunk of
        outgoing data before it is written to the transport.
    """

    def __init__(self):
        utility.EventDispatcher.__init__(self)
        # Raw traffic hooks are disabled until explicitly set.
        self.rawDataInFn = None
        self.rawDataOutFn = None
        # The parser only exists for the duration of a connection.
        self.stream = None

    def _initializeStream(self):
        """
        Create a new element stream parser and route its document start,
        element and document end callbacks to this protocol's handlers.
        """
        parser = domish.elementStream()
        parser.DocumentStartEvent = self.onDocumentStart
        parser.ElementEvent = self.onElement
        parser.DocumentEndEvent = self.onDocumentEnd
        self.stream = parser

    # ----------------------------------------------------------------
    #
    # Protocol events
    #
    # ----------------------------------------------------------------

    def connectionMade(self):
        """
        Called when a connection is made.

        Prepares the XML parser first, then dispatches this protocol
        instance with the L{STREAM_CONNECTED_EVENT} event to signal that the
        connection has been established.
        """
        self._initializeStream()
        self.dispatch(self, STREAM_CONNECTED_EVENT)

    def dataReceived(self, data):
        """
        Called whenever data is received.

        If C{rawDataInFn} is set, it is called with the data first. The data
        is then handed to the XML parser, which may in turn call the DOM
        event handlers. When the parser raises L{domish.ParserError}, a
        L{failure.Failure} wrapping that error is dispatched with the
        L{STREAM_ERROR_EVENT} event to allow for cleanup actions, after which
        the connection is dropped.

        @param data: The received chunk of data.
        """
        try:
            inputHook = self.rawDataInFn
            if inputHook:
                inputHook(data)
            self.stream.parse(data)
        except domish.ParserError:
            # Failure() without arguments captures the exception that is
            # currently being handled.
            parseFailure = failure.Failure()
            self.dispatch(parseFailure, STREAM_ERROR_EVENT)
            self.transport.loseConnection()

    def connectionLost(self, reason):
        """
        Called when the connection is shut down.

        Dispatches C{reason} with the L{STREAM_END_EVENT} event and then
        discards the parser.

        @param reason: The reason the connection was lost; passed on to the
            observers as-is.
        """
        # Observers run before the parser is dropped.
        self.dispatch(reason, STREAM_END_EVENT)
        self.stream = None

    # ----------------------------------------------------------------
    #
    # DOM events
    #
    # ----------------------------------------------------------------

    def onDocumentStart(self, rootElement):
        """
        Called whenever the start tag of a root element has been received.

        Dispatches this protocol instance with the L{STREAM_START_EVENT}
        event. C{rootElement} itself is not passed to the observers.

        @param rootElement: The root element as delivered by the parser.
        """
        self.dispatch(self, STREAM_START_EVENT)

    def onElement(self, element):
        """
        Called whenever a direct child element of the root element has been
        received.

        Dispatches the received element to the registered observers.

        @param element: The received stanza.
        """
        self.dispatch(element)

    def onDocumentEnd(self):
        """
        Called whenever the end tag of the root element has been received.

        Closes the connection. This causes C{connectionLost} being called.
        """
        self.transport.loseConnection()

    def setDispatchFn(self, fn):
        """
        Set another function to handle elements.

        @param fn: Callable that replaces C{onElement} as the parser's
            element handler.
        """
        self.stream.ElementEvent = fn

    def resetDispatchFn(self):
        """
        Restore the default function (C{onElement}) to handle elements.
        """
        self.stream.ElementEvent = self.onElement

    def send(self, obj):
        """
        Send data over the stream.

        Sends the given C{obj} over the connection. C{obj} may be an instance
        of L{domish.Element}, C{unicode} or C{str}. The first two will be
        properly serialized and/or encoded. C{str} objects must be in UTF-8
        encoding.

        Note: because it is easy to make mistakes in maintaining a properly
        encoded C{str} object, it is advised to use C{unicode} objects
        everywhere when dealing with XML Streams.

        If C{rawDataOutFn} is set, it is called with the final data just
        before that data is written to the transport.

        @param obj: Object to be sent over the stream.
        @type obj: L{domish.Element}, C{unicode} or C{str}
        """
        # Elements are serialized first; the result then goes through the
        # same encoding step as any other text.
        data = obj.toXml() if domish.IElement.providedBy(obj) else obj

        if isinstance(data, unicode):
            data = data.encode('utf-8')

        outputHook = self.rawDataOutFn
        if outputHook:
            outputHook(data)

        self.transport.write(data)



class BootstrapMixin(object):
    """
    XmlStream factory mixin to install bootstrap event observers.

    This mixin is for factories providing
    L{IProtocolFactory<twisted.internet.interfaces.IProtocolFactory>} to make
    sure bootstrap event observers are set up on protocols, before incoming
    data is processed. Such protocols typically derive from
    L{utility.EventDispatcher}, like L{XmlStream}.

    You can set up bootstrap event observers using C{addBootstrap}. The
    C{event} and C{fn} parameters correspond with the C{event} and
    C{observerfn} arguments to L{utility.EventDispatcher.addObserver}.

    @since: 8.2.
    @ivar bootstraps: The list of registered bootstrap event observers, kept
        as C{(event, fn)} tuples.
    @type bootstraps: C{list}
    """

    def __init__(self):
        self.bootstraps = []


    def installBootstraps(self, dispatcher):
        """
        Install registered bootstrap observers.

        Every registered C{(event, fn)} pair is added to C{dispatcher}, in
        registration order.

        @param dispatcher: Event dispatcher to add the observers to.
        @type dispatcher: L{utility.EventDispatcher}
        """
        for bootstrapEvent, observer in self.bootstraps:
            dispatcher.addObserver(bootstrapEvent, observer)


    def addBootstrap(self, event, fn):
        """
        Add a bootstrap event handler.

        @param event: The event to register an observer for.
        @type event: C{str} or L{xpath.XPathQuery}
        @param fn: The observer callable to be registered.
        """
        registration = (event, fn)
        self.bootstraps.append(registration)


    def removeBootstrap(self, event, fn):
        """
        Remove a bootstrap event handler.

        @param event: The event the observer is registered for.
        @type event: C{str} or L{xpath.XPathQuery}
        @param fn: The registered observer callable.
        @raise ValueError: If the C{(event, fn)} pair has not been registered.
        """
        registration = (event, fn)
        self.bootstraps.remove(registration)



class XmlStreamFactoryMixin(BootstrapMixin):
    """
    XmlStream factory mixin that takes care of event handlers.

    All positional and keyword arguments passed to create this factory are
    passed on as-is to the protocol.

    @ivar args: Positional arguments passed to the protocol upon instantiation.
    @type args: C{tuple}.
    @ivar kwargs: Keyword arguments passed to the protocol upon instantiation.
    @type kwargs: C{dict}.
    """

    def __init__(self, *args, **kwargs):
        BootstrapMixin.__init__(self)
        self.args = args
        self.kwargs = kwargs


    def buildProtocol(self, addr):
        """
        Create an instance of the factory's protocol.

        The protocol is created with the stored C{args} and C{kwargs}, gets
        this factory assigned as its C{factory} attribute and has the
        bootstrap event observers registered before it is returned.

        @param addr: The address of the connection; not used here.
        @return: The newly created protocol instance.
        """
        newStream = self.protocol(*self.args, **self.kwargs)
        newStream.factory = self
        self.installBootstraps(newStream)
        return newStream



class XmlStreamFactory(XmlStreamFactoryMixin,
                       protocol.ReconnectingClientFactory):
    """
    Factory for XmlStream protocol objects as a reconnection client.
    """

    protocol = XmlStream

    def buildProtocol(self, addr):
        """
        Create a protocol instance.

        Overrides L{XmlStreamFactoryMixin.buildProtocol} to work with
        a L{ReconnectingClientFactory}. As this is called upon having a
        connection established, we are resetting the delay for reconnection
        attempts when the connection is lost again.

        @param addr: The address of the connection; passed on to
            L{XmlStreamFactoryMixin.buildProtocol}.
        @return: The newly created protocol instance.
        """
        self.resetDelay()
        newStream = XmlStreamFactoryMixin.buildProtocol(self, addr)
        return newStream