1# -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
2#
3# Copyright (c) Twisted Matrix Laboratories.
4# See LICENSE for details.
5
6"""
7XMPP XML Streams
8
9Building blocks for setting up XML Streams, including helping classes for
10doing authentication on either client or server side, and working with XML
11Stanzas.
12"""
13
14from hashlib import sha1
15from zope.interface import directlyProvides, implements
16
17from twisted.internet import defer, protocol
18from twisted.internet.error import ConnectionLost
19from twisted.python import failure, log, randbytes
20from twisted.words.protocols.jabber import error, ijabber, jid
21from twisted.words.xish import domish, xmlstream
22from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
23from twisted.words.xish.xmlstream import STREAM_START_EVENT
24from twisted.words.xish.xmlstream import STREAM_END_EVENT
25from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT
26
# TLS support is optional. C{ssl} ends up as C{None} when Twisted's ssl
# module cannot be imported or reports that SSL is not supported, so the
# rest of this module only has to test C{ssl is None}.
try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
if ssl and not ssl.supported:
    ssl = None

# Event dispatched on the XmlStream once all initializers have been processed.
STREAM_AUTHD_EVENT = intern("//event/stream/authd")
# Event dispatched with the failure when stream initialization fails.
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")

# Namespace of the stream root element and other stream level elements.
NS_STREAMS = 'http://etherx.jabber.org/streams'
# Namespace of the STARTTLS stream feature and its negotiation elements.
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'

# Sentinel returned by an initializer to signal that it succeeded but the
# XML stream was reset, so processing of further initializers must halt.
Reset = object()
41
def hashPassword(sid, password):
    """
    Compute the hexadecimal SHA1 digest of a session identifier and password.

    Both values are concatenated (session identifier first), encoded as
    UTF-8 and then hashed.

    @param sid: The stream session identifier.
    @type sid: C{unicode}.
    @param password: The password to be hashed.
    @type password: C{unicode}.
    @return: The hexadecimal representation of the digest.
    @raise TypeError: If C{sid} or C{password} is not a C{unicode} object.
    """
    checks = (
        (sid, "The session identifier must be a unicode object"),
        (password, "The password must be a unicode object"),
    )
    for value, message in checks:
        if not isinstance(value, unicode):
            raise TypeError(message)

    combined = u"%s%s" % (sid, password)
    digest = sha1(combined.encode('utf-8'))
    return digest.hexdigest()
57
58
59
class Authenticator:
    """
    Base class holding the logic for initializing an XmlStream.

    Derive from this class to teach an XmlStream how to initialize and
    authenticate against a particular kind of stream host (such as clients,
    components, etc.).

    Rules:
      1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
         stream has been completely initialized.
      2. The Authenticator SHOULD reset all state information when
         L{associateWithStream} is called.
      3. The Authenticator SHOULD override L{streamStarted}, and start
         initialization there.

    @type xmlstream: L{XmlStream}
    @ivar xmlstream: The XmlStream that needs authentication. C{None} until
                     L{associateWithStream} has been called.

    @note: the term authenticator is historical. Authenticators perform
           all steps required to prepare the stream for the exchange
           of XML stanzas.
    """

    def __init__(self):
        self.xmlstream = None


    def connectionMade(self):
        """
        Hook called by the XmlStream once the underlying socket connection
        is in place.

        An initiating Authenticator can send its initial root element from
        here, while an accepting one can wait for the inbound root element
        of the peer. Use C{self.xmlstream.send()} to send any initial data.

        The base implementation does nothing.
        """


    def streamStarted(self, rootElement):
        """
        Hook called by the XmlStream when the stream has started.

        A stream has started as soon as the start tag of the root element
        has been received.

        The C{version} attribute of C{rootElement} is parsed as
        C{major.minor}. When the attribute is absent or cannot be parsed,
        C{0.0} is assumed (per RFC 3920 for the absent case). The minimum
        of that version and the version stored in L{xmlstream} is then
        stored back in L{xmlstream}.

        Extensions of this method can extract more information from the
        stream header and perform checks on them, optionally sending
        stream errors and closing the stream.

        @param rootElement: The received root element of the stream.
        """
        # Start from the fallback and only override it with a well-formed
        # version attribute.
        version = (0, 0)
        if rootElement.hasAttribute("version"):
            parts = rootElement["version"].split(".")
            try:
                version = (int(parts[0]), int(parts[1]))
            except (IndexError, ValueError):
                pass

        self.xmlstream.version = min(self.xmlstream.version, version)


    def associateWithStream(self, xmlstream):
        """
        Called by the XmlStreamFactory when a connection has been made
        to the requested peer, and an XmlStream object has been
        instantiated.

        The base implementation only keeps a reference to the new
        XmlStream.

        @type xmlstream: L{XmlStream}
        @param xmlstream: The XmlStream that will be passing events to this
                          Authenticator.
        """
        self.xmlstream = xmlstream
144
145
146
class ConnectAuthenticator(Authenticator):
    """
    Authenticator for initiating entities.

    @cvar namespace: Default namespace URI that is set on the L{XmlStream}
                     when the connection is made.
    @ivar otherHost: Name of the peer. It is converted to a JID and stored
                     as the C{otherEntity} of the L{XmlStream} when the
                     connection is made.
    """

    namespace = None

    def __init__(self, otherHost):
        # Chain to the base initializer so that the xmlstream attribute
        # exists (as None) before the authenticator is associated with a
        # stream, as documented on Authenticator.
        Authenticator.__init__(self)
        self.otherHost = otherHost


    def connectionMade(self):
        """
        Called by the XmlStream when the underlying connection is in place.

        Sets the default namespace and the peer entity on the stream and
        sends the stream header.
        """
        self.xmlstream.namespace = self.namespace
        self.xmlstream.otherEntity = jid.internJID(self.otherHost)
        self.xmlstream.sendHeader()


    def initializeStream(self):
        """
        Perform stream initialization procedures.

        An L{XmlStream} holds a list of initializer objects in its
        C{initializers} attribute. This method calls these initializers in
        order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
        been successfully processed. Otherwise it dispatches the
        C{INIT_FAILED_EVENT} event with the failure.

        Initializers may return the special L{Reset} object to halt the
        initialization processing. It signals that the current initializer was
        successfully processed, but that the XML Stream has been reset. An
        example is the TLSInitiatingInitializer.
        """

        def remove_first(result):
            # The initializer at the head of the list succeeded, so drop it
            # and pass its result on unchanged.
            self.xmlstream.initializers.pop(0)

            return result

        def do_next(result):
            """
            Take the first initializer and process it.

            On success, the initializer is removed from the list and
            then next initializer will be tried.
            """

            if result is Reset:
                # The stream was reset; processing resumes when the new
                # stream header comes in.
                return None

            try:
                init = self.xmlstream.initializers[0]
            except IndexError:
                # Nothing left to do: the stream is fully initialized.
                self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
                return None
            else:
                d = defer.maybeDeferred(init.initialize)
                d.addCallback(remove_first)
                d.addCallback(do_next)
                return d

        d = defer.succeed(None)
        d.addCallback(do_next)
        d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        This extends L{Authenticator.streamStarted} to extract further stream
        headers from C{rootElement}, optionally wait for stream features being
        received and then call C{initializeStream}.

        @param rootElement: The received root element of the stream.
        """

        Authenticator.streamStarted(self, rootElement)

        self.xmlstream.sid = rootElement.getAttribute("id")

        if rootElement.hasAttribute("from"):
            self.xmlstream.otherEntity = jid.internJID(rootElement["from"])

        # Setup observer for stream features, if applicable
        if self.xmlstream.version >= (1, 0):
            def onFeatures(element):
                # Index the advertized features by (uri, name) for lookup
                # by the initializers.
                features = {}
                for feature in element.elements():
                    features[(feature.uri, feature.name)] = feature

                self.xmlstream.features = features
                self.initializeStream()

            self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
                                                  NS_STREAMS,
                                              onFeatures)
        else:
            # Streams older than 1.0 have no features element to wait for.
            self.initializeStream()
243
244
245
class ListenAuthenticator(Authenticator):
    """
    Authenticator for receiving entities.

    @cvar namespace: Default namespace URI; C{None} on this class.
    """

    namespace = None

    def associateWithStream(self, xmlstream):
        """
        Called by the XmlStreamFactory when a connection has been made.

        Extends L{Authenticator.associateWithStream} by marking the
        L{XmlStream} as non-initiating.
        """
        Authenticator.associateWithStream(self, xmlstream)
        self.xmlstream.initiating = False


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        Extends L{Authenticator.streamStarted} by copying more information
        from the stream header in C{rootElement} to the L{XmlStream}: the
        default namespace, the addressed entity (if a C{to} attribute is
        present) and the locally declared prefixes. A fresh random session
        identifier is generated as well.
        """
        Authenticator.streamStarted(self, rootElement)

        xs = self.xmlstream
        xs.namespace = rootElement.defaultUri

        if rootElement.hasAttribute("to"):
            xs.thisEntity = jid.internJID(rootElement["to"])

        # The root element maps prefix to URI, the stream wants URI to prefix.
        xs.prefixes = dict(
            (uri, prefix)
            for prefix, uri in rootElement.localPrefixes.iteritems())

        xs.sid = unicode(randbytes.secureRandom(8).encode('hex'))
283
284
285
class FeatureNotAdvertized(Exception):
    """
    Raised when the initiating entity requires a stream feature that was
    not advertized.
    """
291
292
293
class BaseFeatureInitiatingInitializer(object):
    """
    Base class for initializers that are tied to a stream feature.

    The associated XmlStream is assumed to represent the initiating entity
    of the connection.

    @cvar feature: tuple of (uri, name) of the stream feature root element.
    @type feature: tuple of (C{str}, C{str})
    @ivar required: whether the stream feature is required to be advertized
                    by the receiving entity.
    @type required: C{bool}
    @ivar xmlstream: the XML stream this initializer operates on.
    """

    implements(ijabber.IInitiatingInitializer)

    feature = None
    required = False

    def __init__(self, xs):
        self.xmlstream = xs


    def initialize(self):
        """
        Initiate the initialization.

        When the receiving entity advertizes the stream feature, the result
        of L{start} is returned. When it does not, L{FeatureNotAdvertized}
        is raised if the C{required} instance variable is true; otherwise
        the initialization silently succeeds with C{None}.
        """

        if self.feature not in self.xmlstream.features:
            if self.required:
                raise FeatureNotAdvertized
            return None

        return self.start()


    def start(self):
        """
        Start the actual initialization.

        May return a deferred for asynchronous initialization. The base
        implementation does nothing.
        """
342
343
344
class TLSError(Exception):
    """
    Base class of the TLS related exceptions in this module.
    """
349
350
351
class TLSFailed(TLSError):
    """
    Raised when TLS negotiation has failed.
    """
356
357
358
class TLSRequired(TLSError):
    """
    Raised when TLS negotiation is required but not desired.

    This is the case when the receiving entity requires TLS negotiation
    while the initiating entity does not want to negotiate TLS.
    """
366
367
368
class TLSNotSupported(TLSError):
    """
    Raised when TLS support is missing.

    This is the case when the initiating entity wants and requires TLS
    negotiation, but the OpenSSL library is not available.
    """
376
377
378
class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
    """
    TLS stream initializer for the initiating entity.

    It is strongly required to include this initializer in the list of
    initializers for an XMPP stream. By default it will try to negotiate TLS.
    An XMPP server may indicate that TLS is required. If TLS is not desired,
    set the C{wanted} attribute to False instead of removing it from the list
    of initializers, so a proper exception L{TLSRequired} can be raised.

    @cvar wanted: indicates if TLS negotiation is wanted.
    @type wanted: C{bool}
    @ivar _deferred: deferred returned by L{start} while negotiation is in
                     progress; fired by L{onProceed} or L{onFailure}.
    """

    feature = (NS_XMPP_TLS, 'starttls')
    wanted = True
    _deferred = None

    def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.

        The pending deferred is fired with L{Reset} after TLS has been
        started on the transport and a new stream header has been sent.
        """

        xs = self.xmlstream
        xs.removeObserver('/failure', self.onFailure)
        options = ssl.CertificateOptions()
        xs.transport.startTLS(options)
        xs.reset()
        xs.sendHeader()
        self._deferred.callback(Reset)


    def onFailure(self, obj):
        """
        Handle the receiving entity refusing TLS negotiation.

        Stops waiting for the proceed element and fails the pending deferred
        with L{TLSFailed}.
        """
        self.xmlstream.removeObserver('/proceed', self.onProceed)
        self._deferred.errback(TLSFailed())


    def start(self):
        """
        Start TLS negotiation.

        This checks if the receiving entity requires TLS, the SSL library is
        available and uses the C{required} and C{wanted} instance variables to
        determine what to do in the various different cases.

        For example, if the SSL library is not available, and wanted and
        required by the user, it raises an exception. However if it is not
        required by both parties, initialization silently succeeds, moving
        on to the next step.

        @return: a deferred that fires with L{Reset} when TLS was negotiated,
                 with C{None} when negotiation was skipped, or fails with
                 a L{TLSError} subclass.
        """
        if not self.wanted:
            # We do not want TLS; that is only acceptable when the peer does
            # not insist on it.
            if self.xmlstream.features[self.feature].required:
                return defer.fail(TLSRequired())
            return defer.succeed(None)

        if ssl is None:
            # We want TLS but cannot do it locally.
            if self.required:
                return defer.fail(TLSNotSupported())
            return defer.succeed(None)

        pending = defer.Deferred()
        self._deferred = pending
        self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
        self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
        self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
        return pending
446
447
448
class XmlStream(xmlstream.XmlStream):
    """
    XMPP XML Stream protocol handler.

    @ivar version: XML stream version as a tuple (major, minor). Initially,
                   this is set to the minimally supported version. Upon
                   receiving the stream header of the peer, it is set to the
                   minimum of that value and the version on the received
                   header.
    @type version: (C{int}, C{int})
    @ivar namespace: default namespace URI for stream
    @type namespace: C{unicode}
    @ivar thisEntity: JID of this entity
    @type thisEntity: L{JID}
    @ivar otherEntity: JID of the peer entity
    @type otherEntity: L{JID}
    @ivar sid: session identifier
    @type sid: C{unicode}
    @ivar initiating: True if this is the initiating stream
    @type initiating: C{bool}
    @ivar features: map of (uri, name) to stream features element received from
                    the receiving entity.
    @type features: C{dict} of (C{unicode}, C{unicode}) to L{domish.Element}.
    @ivar prefixes: map of URI to prefixes that are to appear on stream
                    header.
    @type prefixes: C{dict} of C{unicode} to C{unicode}
    @ivar initializers: list of stream initializer objects
    @type initializers: C{list} of objects that provide L{IInitializer}
    @ivar authenticator: associated authenticator that uses C{initializers} to
                         initialize the XML stream.
    """

    version = (1, 0)
    namespace = 'invalid'
    thisEntity = None
    otherEntity = None
    sid = None
    initiating = True

    # Whether the stream header has been sent since the last reset.
    _headerSent = False

    def __init__(self, authenticator):
        """
        @param authenticator: the authenticator that initializes this
                              stream. It gets associated with this stream.
        """
        xmlstream.XmlStream.__init__(self)

        self.features = {}
        self.initializers = []
        self.prefixes = {NS_STREAMS: 'stream'}
        self.authenticator = authenticator

        # Reset the authenticator
        authenticator.associateWithStream(self)


    def _callLater(self, *args, **kwargs):
        """
        Schedule a call with the reactor's C{callLater}.

        The reactor is imported at call time rather than at module level.
        """
        from twisted.internet import reactor
        return reactor.callLater(*args, **kwargs)


    def reset(self):
        """
        Reset XML Stream.

        Resets the XML Parser for incoming data and forgets that a stream
        header was sent. This is to be used after successfully negotiating
        a new layer, e.g. TLS and SASL. Note that registered event observers
        will continue to be in place.
        """
        self._headerSent = False
        self._initializeStream()


    def onStreamError(self, errelem):
        """
        Called when a stream:error element has been received.

        Dispatches a L{STREAM_ERROR_EVENT} event with a failure built from
        the error element to allow for cleanup actions, and then drops the
        connection.

        @param errelem: The received error element.
        @type errelem: L{domish.Element}
        """
        streamFailure = failure.Failure(
            error.exceptionFromStreamError(errelem))
        self.dispatch(streamFailure, STREAM_ERROR_EVENT)
        self.transport.loseConnection()


    def sendHeader(self):
        """
        Send stream header.

        The header carries the extra namespace prefixes, the addressing
        (when known), the session identifier (receiving side only) and the
        stream version (for 1.0 and up).
        """
        # Every prefix but the one of the streams namespace is declared as
        # an optional extra namespace on the root element.
        localPrefixes = dict(
            (prefix, uri)
            for uri, prefix in self.prefixes.iteritems()
            if uri != NS_STREAMS)

        rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
                                     localPrefixes=localPrefixes)

        addressing = (('to', self.otherEntity), ('from', self.thisEntity))
        for attribute, entity in addressing:
            if entity:
                rootElement[attribute] = entity.userhost()

        if self.sid and not self.initiating:
            rootElement['id'] = self.sid

        if self.version >= (1, 0):
            rootElement['version'] = "%d.%d" % self.version

        header = rootElement.toXml(prefixes=self.prefixes, closeElement=0)
        self.send(header)
        self._headerSent = True


    def sendFooter(self):
        """
        Send stream footer, closing the root element.
        """
        self.send('</stream:stream>')


    def sendStreamError(self, streamError):
        """
        Send stream level error.

        If we are the receiving entity, and haven't sent the header yet,
        we send one first. The error and the stream footer are only sent
        when a header has been sent.

        After that, the transport connection is dropped.

        @param streamError: stream error instance
        @type streamError: L{error.StreamError}
        """
        if not (self._headerSent or self.initiating):
            self.sendHeader()

        if self._headerSent:
            self.send(streamError.getElement())
            self.sendFooter()

        self.transport.loseConnection()


    def send(self, obj):
        """
        Send data over the stream.

        This overrides L{xmlstream.Xmlstream.send} to use the default namespace
        of the stream header when serializing L{domish.IElement}s. It is
        assumed that if you pass an object that provides L{domish.IElement},
        it represents a direct child of the stream's root element.

        @param obj: the element or already serialized data to send.
        """
        data = obj
        if domish.IElement.providedBy(obj):
            # Serialize in the context of the root element, so namespaces
            # declared on the header are not repeated.
            data = obj.toXml(prefixes=self.prefixes,
                             defaultUri=self.namespace,
                             prefixesInScope=self.prefixes.values())

        xmlstream.XmlStream.send(self, data)


    def connectionMade(self):
        """
        Called when a connection is made.

        Notifies the authenticator, after the base class has handled the
        new connection.
        """
        xmlstream.XmlStream.connectionMade(self)
        self.authenticator.connectionMade()


    def onDocumentStart(self, rootElement):
        """
        Called when the stream header has been received.

        After the base class has processed the root element, a one-time
        observer for stream errors is registered, with L{onStreamError} as
        the handler. Finally, the authenticator's C{streamStarted} method
        is called with the root element, leaving further examination of
        the stream header to the authenticator.

        @param rootElement: The root element.
        @type rootElement: L{domish.Element}
        """
        xmlstream.XmlStream.onDocumentStart(self, rootElement)

        # Setup observer for stream errors
        errorPath = "/error[@xmlns='%s']" % NS_STREAMS
        self.addOnetimeObserver(errorPath, self.onStreamError)

        self.authenticator.streamStarted(rootElement)
648
649
650
class XmlStreamFactory(xmlstream.XmlStreamFactory):
    """
    Factory for Jabber XmlStream objects as a reconnecting client.

    Unlike L{xmlstream.XmlStreamFactory}, this factory produces the Jabber
    specific L{XmlStream} instances, which have authenticators.

    @ivar authenticator: the authenticator this factory was created with.
    """

    protocol = XmlStream

    def __init__(self, authenticator):
        """
        @param authenticator: the authenticator, passed on to the base
                              factory and kept on this factory.
        """
        xmlstream.XmlStreamFactory.__init__(self, authenticator)
        self.authenticator = authenticator
665
666
667
class XmlStreamServerFactory(xmlstream.BootstrapMixin,
                             protocol.ServerFactory):
    """
    Factory for Jabber XmlStream objects as a server.

    @since: 8.2.
    @ivar authenticatorFactory: Factory callable that takes no arguments, to
                                create a fresh authenticator to be associated
                                with the XmlStream.
    """

    protocol = XmlStream

    def __init__(self, authenticatorFactory):
        """
        @param authenticatorFactory: callable without arguments that returns
                                     a new authenticator.
        """
        xmlstream.BootstrapMixin.__init__(self)
        self.authenticatorFactory = authenticatorFactory


    def buildProtocol(self, addr):
        """
        Create an instance of XmlStream.

        Each XmlStream gets its own, newly created authenticator. The
        registered bootstrap event observers are installed on the new
        stream as well.

        @param addr: address of the peer; not used here.
        @return: the new XmlStream, with this factory set as its C{factory}.
        """
        stream = self.protocol(self.authenticatorFactory())
        stream.factory = self
        self.installBootstraps(stream)
        return stream
698
699
700
class TimeoutError(Exception):
    """
    Raised when the configured timeout expires before an IQ response has
    been received.
    """
706
707
708
def upgradeWithIQResponseTracker(xs):
    """
    Enhances an XmlStream for iq response tracking.

    This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
    response is an error iq stanza, the deferred has its errback invoked with a
    failure that holds a L{StanzaException<error.StanzaException>} that is
    easier to examine.

    @param xs: the XML stream to upgrade. It gets the C{iqDeferreds} and
               C{iqDefaultTimeout} attributes and observers for iq responses
               and the end of the stream.
    """
    def callback(iq):
        """
        Handle iq response by firing associated deferred.

        Responses that were already handled, or for which no deferred is
        waiting, are left alone.
        """
        if getattr(iq, 'handled', False):
            return

        try:
            d = xs.iqDeferreds.pop(iq["id"])
        except KeyError:
            return

        iq.handled = True
        if iq['type'] == 'error':
            d.errback(error.exceptionFromStanza(iq))
        else:
            d.callback(iq)


    def disconnected(_):
        """
        Make sure deferreds do not linger on after disconnect.

        This errbacks all deferreds of iq's for which no response has been
        received with a L{ConnectionLost} failure. Otherwise, the deferreds
        will never be fired.
        """
        # Swap in an empty map first, then fail what was pending.
        pending, xs.iqDeferreds = xs.iqDeferreds, {}
        for d in pending.itervalues():
            d.errback(ConnectionLost())

    xs.iqDeferreds = {}
    # Keep a default timeout that was already configured on the stream.
    xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
    xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
    for responsePath in ('/iq[@type="result"]', '/iq[@type="error"]'):
        xs.addObserver(responsePath, callback)
    directlyProvides(xs, ijabber.IIQResponseTracker)
757
758
759
class IQ(domish.Element):
    """
    Wrapper for an iq stanza.

    Iq stanzas are used for communications with a request-response behaviour.
    Each iq request is associated with an XML stream and has its own unique id
    to be able to track the response.

    @ivar timeout: if set, a timeout period after which the deferred returned
                   by C{send} will have its errback called with a
                   L{TimeoutError} failure.
    @type timeout: C{float}
    """

    timeout = None

    def __init__(self, xmlstream, stanzaType="set"):
        """
        @type xmlstream: L{xmlstream.XmlStream}
        @param xmlstream: XmlStream to use for transmission of this IQ

        @type stanzaType: C{str}
        @param stanzaType: IQ type identifier ('get' or 'set')
        """
        domish.Element.__init__(self, (None, "iq"))
        self.addUniqueId()
        self["type"] = stanzaType
        self._xmlstream = xmlstream


    def send(self, to=None):
        """
        Send out this iq.

        Returns a deferred that is fired when an iq response with the same id
        is received. Result responses will be passed to the deferred callback.
        Error responses will be transformed into a
        L{StanzaError<error.StanzaError>} and result in the errback of the
        deferred being invoked.

        When this iq's C{timeout}, or else the C{iqDefaultTimeout} of the
        stream, is set, the deferred fails with L{TimeoutError} if no
        response came in before the timeout expired.

        @param to: optional value for the C{to} attribute of this iq.
        @rtype: L{defer.Deferred}
        """
        if to is not None:
            self["to"] = to

        stream = self._xmlstream
        if not ijabber.IIQResponseTracker.providedBy(stream):
            upgradeWithIQResponseTracker(stream)

        response = defer.Deferred()
        stream.iqDeferreds[self['id']] = response

        timeout = self.timeout or stream.iqDefaultTimeout
        if timeout is not None:
            def onTimeout():
                # Stop tracking this iq before reporting the timeout.
                del self._xmlstream.iqDeferreds[self['id']]
                response.errback(TimeoutError("IQ timed out"))

            delayedCall = stream._callLater(timeout, onTimeout)

            def cancelTimeout(result):
                # A result of any kind makes the timeout moot.
                if delayedCall.active():
                    delayedCall.cancel()

                return result

            response.addBoth(cancelTimeout)

        stream.send(self)
        return response
829
830
831
def toResponse(stanza, stanzaType=None):
    """
    Create a response stanza from another stanza.

    The new, empty response stanza has the same name as the original. The
    addressing attributes of the original are swapped and its id is copied.
    Optionally, the stanza type of the response can be specified. Attributes
    without a (non-empty) value are left out.

    @param stanza: the original stanza
    @type stanza: L{domish.Element}
    @param stanzaType: optional response stanza type
    @type stanzaType: C{str}
    @return: the response stanza.
    @rtype: L{domish.Element}
    """

    # Attribute of the response paired with the value it should get; note
    # that 'to' and 'from' trade places.
    attributes = (
        ('to', stanza.getAttribute('from')),
        ('from', stanza.getAttribute('to')),
        ('id', stanza.getAttribute('id')),
        ('type', stanzaType),
    )

    response = domish.Element((None, stanza.name))
    for name, value in attributes:
        if value:
            response[name] = value

    return response
863
864
865
class XMPPHandler(object):
    """
    XMPP protocol handler.

    Classes derived from this class implement (part of) one or more XMPP
    extension protocols, and are referred to as a subprotocol implementation.

    @ivar parent: the object this handler is registered with, or C{None}.
    @ivar xmlstream: the current XML stream, or C{None} when there is none.
    """

    implements(ijabber.IXMPPHandler)

    def __init__(self):
        self.parent = None
        self.xmlstream = None


    def setHandlerParent(self, parent):
        """
        Remember C{parent} and register this handler with it.
        """
        self.parent = parent
        parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent and forget it.

        The C{parent} argument is not consulted; the parent that was set
        with L{setHandlerParent} is used.
        """
        current = self.parent
        current.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and call L{connectionMade}.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Called after a connection has been established.

        Override this to perform work before the stream is initialized.
        """


    def connectionInitialized(self):
        """
        The XML stream has been initialized.

        Override this to perform work after stream initialization, e.g. to
        set up observers and start exchanging XML stanzas.
        """


    def connectionLost(self, reason):
        """
        The XML stream has been closed.

        The reference to the XML stream is dropped. This method can be
        extended to inspect the C{reason} argument and act on it.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data over the managed XML stream.

        The data is handed to the C{send} method of the parent.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)
939
940
941
class XMPPHandlerCollection(object):
    """
    Collection of XMPP subprotocol handlers.

    Subprotocol handlers can be grouped with this class. As it is not an
    L{XMPPHandler} itself, collections cannot be nested.

    @ivar handlers: List of protocol handlers.
    @type handlers: C{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(ijabber.IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Iterate over the handlers, in the order they were added.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Add protocol handler to the end of the list of handlers.

        Protocol handlers are expected to provide L{ijabber.IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Remove protocol handler from the list of handlers.
        """
        self.handlers.remove(handler)
981
982
983
class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    Handlers may add or remove handlers (including themselves) from within
    the notifications sent by this manager; each notification round works on
    a snapshot of the handlers registered when the round started.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar factory: the factory passed to the initializer.
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: C{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: C{list}
    """

    logTraffic = False

    def __init__(self, factory):
        """
        Set up the manager state and hook into the factory's stream events.

        @param factory: The factory on which bootstrap observers are
                        registered for the connected, authenticated,
                        initialization-failed and stream-end events. It is
                        kept in C{self.factory}.
        """
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
        factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        The handler is brought up to speed with the state of the managed
        stream. When an XML stream is currently connected, the handler's
        C{makeConnection} is called with it. When that stream has also been
        initialized, the handler's C{connectionInitialized} is called as
        well.

        @param handler: The protocol handler to add.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # A handler added after the connection was made but before the
        # stream was initialized must still receive the stream here:
        # L{_authd} will call its connectionInitialized later on, and
        # L{_connected} has already run for this stream.
        if self.xmlstream is not None:
            handler.makeConnection(self.xmlstream)
            if self._initialized:
                handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.

        @param xs: The XML stream that has just been connected.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: a handler may add or remove handlers from
        # within makeConnection, and mutating the list being iterated would
        # skip or repeat handlers.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.

        @param xs: The XML stream that has just been initialized.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify the handlers registered at this point. A handler added from
        # within a notification is already initialized by L{addHandler}, so
        # the snapshot keeps it from being notified a second time.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        This implementation does nothing.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method.

        @param reason: Passed on unchanged to each handler's
                       C{connectionLost}.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify a snapshot of the handlers, so that a handler removing
        # itself (or adding others) while reacting to the loss does not cause
        # other handlers to be skipped or wrongly notified.
        for e in list(self):
            e.connectionLost(reason)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)
1124
1125
1126
# Public names of this module, i.e. what a star-import of it provides.
__all__ = [
    'Authenticator',
    'BaseFeatureInitiatingInitializer',
    'ConnectAuthenticator',
    'FeatureNotAdvertized',
    'INIT_FAILED_EVENT',
    'IQ',
    'ListenAuthenticator',
    'NS_STREAMS',
    'NS_XMPP_TLS',
    'Reset',
    'STREAM_AUTHD_EVENT',
    'STREAM_CONNECTED_EVENT',
    'STREAM_END_EVENT',
    'STREAM_ERROR_EVENT',
    'STREAM_START_EVENT',
    'StreamManager',
    'TLSError',
    'TLSFailed',
    'TLSInitiatingInitializer',
    'TLSNotSupported',
    'TLSRequired',
    'TimeoutError',
    'XMPPHandler',
    'XMPPHandlerCollection',
    'XmlStream',
    'XmlStreamFactory',
    'XmlStreamServerFactory',
    'hashPassword',
    'toResponse',
    'upgradeWithIQResponseTracker',
]
1137