# -*- test-case-name: twisted.words.test.test_jabberxmlstream -*-
#
# Copyright (c) Twisted Matrix Laboratories.
# See LICENSE for details.

"""
XMPP XML Streams

Building blocks for setting up XML Streams, including helping classes for
doing authentication on either client or server side, and working with XML
Stanzas.
"""

from hashlib import sha1
from zope.interface import directlyProvides, implements

from twisted.internet import defer, protocol
from twisted.internet.error import ConnectionLost
from twisted.python import failure, log, randbytes
from twisted.words.protocols.jabber import error, ijabber, jid
from twisted.words.xish import domish, xmlstream
from twisted.words.xish.xmlstream import STREAM_CONNECTED_EVENT
from twisted.words.xish.xmlstream import STREAM_START_EVENT
from twisted.words.xish.xmlstream import STREAM_END_EVENT
from twisted.words.xish.xmlstream import STREAM_ERROR_EVENT

# TLS support is optional: fall back to C{None} when the ssl module cannot be
# imported or reports itself as unsupported.
try:
    from twisted.internet import ssl
except ImportError:
    ssl = None
if ssl and not ssl.supported:
    ssl = None

STREAM_AUTHD_EVENT = intern("//event/stream/authd")
INIT_FAILED_EVENT = intern("//event/xmpp/initfailed")

NS_STREAMS = 'http://etherx.jabber.org/streams'
NS_XMPP_TLS = 'urn:ietf:params:xml:ns:xmpp-tls'

# Sentinel returned by initializers to signal that the stream has been reset
# and initialization processing must halt (see
# ConnectAuthenticator.initializeStream).
Reset = object()

def hashPassword(sid, password):
    """
    Create a SHA1-digest string of a session identifier and password.

    @param sid: The stream session identifier.
    @type sid: C{unicode}.
    @param password: The password to be hashed.
    @type password: C{unicode}.
    @return: The hexadecimal SHA1 digest of the UTF-8 encoding of C{sid}
        immediately followed by C{password}.
    @rtype: C{str}
    @raise TypeError: if C{sid} or C{password} is not a C{unicode} object.
    """
    if not isinstance(sid, unicode):
        raise TypeError("The session identifier must be a unicode object")
    if not isinstance(password, unicode):
        raise TypeError("The password must be a unicode object")
    # Named 'combined' rather than 'input' to avoid shadowing the builtin.
    combined = u"%s%s" % (sid, password)
    return sha1(combined.encode('utf-8')).hexdigest()



class Authenticator:
    """
    Base class for business logic of initializing an XmlStream

    Subclass this object to enable an XmlStream to initialize and authenticate
    to different types of stream hosts (such as clients, components, etc.).

    Rules:
      1. The Authenticator MUST dispatch a L{STREAM_AUTHD_EVENT} when the
         stream has been completely initialized.
      2. The Authenticator SHOULD reset all state information when
         L{associateWithStream} is called.
      3. The Authenticator SHOULD override L{streamStarted}, and start
         initialization there.

    @type xmlstream: L{XmlStream}
    @ivar xmlstream: The XmlStream that needs authentication

    @note: the term authenticator is historical. Authenticators perform
           all steps required to prepare the stream for the exchange
           of XML stanzas.
    """

    def __init__(self):
        self.xmlstream = None


    def connectionMade(self):
        """
        Called by the XmlStream when the underlying socket connection is
        in place.

        This allows the Authenticator to send an initial root element, if it's
        connecting, or wait for an inbound root from the peer if it's accepting
        the connection.

        Subclasses can use self.xmlstream.send() to send any initial data to
        the peer.
        """


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        A stream is considered to have started when the start tag of the root
        element has been received.

        This examines C{rootElement} to see if there is a version attribute.
        If absent, C{0.0} is assumed per RFC 3920. Subsequently, the
        minimum of the version from the received stream header and the
        value stored in L{xmlstream} is taken and put back in L{xmlstream}.

        Extensions of this method can extract more information from the
        stream header and perform checks on them, optionally sending
        stream errors and closing the stream.

        @param rootElement: The received root element (stream header).
        """
        if rootElement.hasAttribute("version"):
            version = rootElement["version"].split(".")
            try:
                version = (int(version[0]), int(version[1]))
            except (IndexError, ValueError):
                # A malformed version (missing minor part or non-numeric)
                # is treated the same as an absent one.
                version = (0, 0)
        else:
            version = (0, 0)

        self.xmlstream.version = min(self.xmlstream.version, version)


    def associateWithStream(self, xmlstream):
        """
        Called by the XmlStreamFactory when a connection has been made
        to the requested peer, and an XmlStream object has been
        instantiated.

        The default implementation just saves a handle to the new
        XmlStream.

        @type xmlstream: L{XmlStream}
        @param xmlstream: The XmlStream that will be passing events to this
                          Authenticator.

        """
        self.xmlstream = xmlstream



class ConnectAuthenticator(Authenticator):
    """
    Authenticator for initiating entities.

    @cvar namespace: default namespace URI that is set on the stream when the
        connection is made.
    @ivar otherHost: the host name of the peer; it is interned as a JID and
        stored as the stream's C{otherEntity} when the connection is made.
    """

    namespace = None

    def __init__(self, otherHost):
        # Chain to the base class so that the documented C{xmlstream}
        # attribute exists (as None) before association with a stream.
        Authenticator.__init__(self)
        self.otherHost = otherHost


    def connectionMade(self):
        """
        Set up the stream's namespace and peer entity and send the stream
        header.
        """
        self.xmlstream.namespace = self.namespace
        self.xmlstream.otherEntity = jid.internJID(self.otherHost)
        self.xmlstream.sendHeader()


    def initializeStream(self):
        """
        Perform stream initialization procedures.

        An L{XmlStream} holds a list of initializer objects in its
        C{initializers} attribute. This method calls these initializers in
        order and dispatches the C{STREAM_AUTHD_EVENT} event when the list has
        been successfully processed. Otherwise it dispatches the
        C{INIT_FAILED_EVENT} event with the failure.

        Initializers may return the special L{Reset} object to halt the
        initialization processing. It signals that the current initializer was
        successfully processed, but that the XML Stream has been reset. An
        example is the TLSInitiatingInitializer.
        """

        def remove_first(result):
            # The initializer at the head of the list succeeded; drop it and
            # pass its result on unchanged so do_next can inspect it.
            self.xmlstream.initializers.pop(0)

            return result

        def do_next(result):
            """
            Take the first initializer and process it.

            On success, the initializer is removed from the list and
            the next initializer will be tried.
            """

            if result is Reset:
                # The stream was reset; stop here without dispatching
                # STREAM_AUTHD_EVENT.
                return None

            try:
                init = self.xmlstream.initializers[0]
            except IndexError:
                # No initializers left: initialization is complete.
                self.xmlstream.dispatch(self.xmlstream, STREAM_AUTHD_EVENT)
                return None
            else:
                d = defer.maybeDeferred(init.initialize)
                d.addCallback(remove_first)
                d.addCallback(do_next)
                return d

        d = defer.succeed(None)
        d.addCallback(do_next)
        d.addErrback(self.xmlstream.dispatch, INIT_FAILED_EVENT)


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        This extends L{Authenticator.streamStarted} to extract further stream
        headers from C{rootElement}, optionally wait for stream features being
        received and then call C{initializeStream}.
        """

        Authenticator.streamStarted(self, rootElement)

        self.xmlstream.sid = rootElement.getAttribute("id")

        if rootElement.hasAttribute("from"):
            self.xmlstream.otherEntity = jid.internJID(rootElement["from"])

        # Setup observer for stream features, if applicable
        if self.xmlstream.version >= (1, 0):
            def onFeatures(element):
                # Index the advertised features by (namespace URI, name).
                features = {}
                for feature in element.elements():
                    features[(feature.uri, feature.name)] = feature

                self.xmlstream.features = features
                self.initializeStream()

            self.xmlstream.addOnetimeObserver('/features[@xmlns="%s"]' %
                                                  NS_STREAMS,
                                              onFeatures)
        else:
            self.initializeStream()



class ListenAuthenticator(Authenticator):
    """
    Authenticator for receiving entities.
    """

    namespace = None

    def associateWithStream(self, xmlstream):
        """
        Called by the XmlStreamFactory when a connection has been made.

        Extend L{Authenticator.associateWithStream} to set the L{XmlStream}
        to be non-initiating.
        """
        Authenticator.associateWithStream(self, xmlstream)
        self.xmlstream.initiating = False


    def streamStarted(self, rootElement):
        """
        Called by the XmlStream when the stream has started.

        This extends L{Authenticator.streamStarted} to extract further
        information from the stream headers from C{rootElement}.
        """
        Authenticator.streamStarted(self, rootElement)

        self.xmlstream.namespace = rootElement.defaultUri

        if rootElement.hasAttribute("to"):
            self.xmlstream.thisEntity = jid.internJID(rootElement["to"])

        # The stream keeps a URI -> prefix map, the inverse of the
        # prefix -> URI map found on the received root element.
        self.xmlstream.prefixes = {}
        for prefix, uri in rootElement.localPrefixes.iteritems():
            self.xmlstream.prefixes[uri] = prefix

        # Generate a fresh session identifier from 8 random bytes,
        # hex-encoded.
        self.xmlstream.sid = unicode(randbytes.secureRandom(8).encode('hex'))



class FeatureNotAdvertized(Exception):
    """
    Exception indicating a stream feature was not advertized, while required by
    the initiating entity.
    """



class BaseFeatureInitiatingInitializer(object):
    """
    Base class for initializers with a stream feature.

    This assumes the associated XmlStream represents the initiating entity
    of the connection.

    @cvar feature: tuple of (uri, name) of the stream feature root element.
    @type feature: tuple of (C{str}, C{str})
    @ivar required: whether the stream feature is required to be advertized
                    by the receiving entity.
    @type required: C{bool}
    """

    implements(ijabber.IInitiatingInitializer)

    feature = None
    required = False

    def __init__(self, xs):
        self.xmlstream = xs


    def initialize(self):
        """
        Initiate the initialization.

        Checks if the receiving entity advertizes the stream feature. If it
        does, the initialization is started. If it is not advertized, and the
        C{required} instance variable is C{True}, it raises
        L{FeatureNotAdvertized}. Otherwise, the initialization silently
        succeeds.
        """

        if self.feature in self.xmlstream.features:
            return self.start()
        elif self.required:
            raise FeatureNotAdvertized
        else:
            return None


    def start(self):
        """
        Start the actual initialization.

        May return a deferred for asynchronous initialization.
        """



class TLSError(Exception):
    """
    TLS base exception.
    """



class TLSFailed(TLSError):
    """
    Exception indicating failed TLS negotiation
    """



class TLSRequired(TLSError):
    """
    Exception indicating required TLS negotiation.

    This exception is raised when the receiving entity requires TLS
    negotiation and the initiating does not desire to negotiate TLS.
    """



class TLSNotSupported(TLSError):
    """
    Exception indicating missing TLS support.

    This exception is raised when the initiating entity wants and requires to
    negotiate TLS when the OpenSSL library is not available.
    """



class TLSInitiatingInitializer(BaseFeatureInitiatingInitializer):
    """
    TLS stream initializer for the initiating entity.

    It is strongly required to include this initializer in the list of
    initializers for an XMPP stream. By default it will try to negotiate TLS.
    An XMPP server may indicate that TLS is required. If TLS is not desired,
    set the C{wanted} attribute to False instead of removing it from the list
    of initializers, so a proper exception L{TLSRequired} can be raised.

    @cvar wanted: indicates if TLS negotiation is wanted.
    @type wanted: C{bool}
    """

    feature = (NS_XMPP_TLS, 'starttls')
    wanted = True
    _deferred = None

    def onProceed(self, obj):
        """
        Proceed with TLS negotiation and reset the XML stream.
        """

        # Only one of the two one-time observers will fire; remove the other.
        self.xmlstream.removeObserver('/failure', self.onFailure)
        ctx = ssl.CertificateOptions()
        self.xmlstream.transport.startTLS(ctx)
        self.xmlstream.reset()
        self.xmlstream.sendHeader()
        self._deferred.callback(Reset)


    def onFailure(self, obj):
        """
        Handle a failure response by errbacking with L{TLSFailed}.
        """
        self.xmlstream.removeObserver('/proceed', self.onProceed)
        self._deferred.errback(TLSFailed())


    def start(self):
        """
        Start TLS negotiation.

        This checks if the receiving entity requires TLS, the SSL library is
        available and uses the C{required} and C{wanted} instance variables to
        determine what to do in the various different cases.

        For example, if the SSL library is not available, and wanted and
        required by the user, it raises an exception. However if it is not
        required by both parties, initialization silently succeeds, moving
        on to the next step.
        """
        if self.wanted:
            if ssl is None:
                if self.required:
                    return defer.fail(TLSNotSupported())
                else:
                    return defer.succeed(None)
            # TLS is wanted and available: fall through to the negotiation.
        elif self.xmlstream.features[self.feature].required:
            return defer.fail(TLSRequired())
        else:
            return defer.succeed(None)

        self._deferred = defer.Deferred()
        self.xmlstream.addOnetimeObserver("/proceed", self.onProceed)
        self.xmlstream.addOnetimeObserver("/failure", self.onFailure)
        self.xmlstream.send(domish.Element((NS_XMPP_TLS, "starttls")))
        return self._deferred



class XmlStream(xmlstream.XmlStream):
    """
    XMPP XML Stream protocol handler.

    @ivar version: XML stream version as a tuple (major, minor). Initially,
                   this is set to the minimally supported version. Upon
                   receiving the stream header of the peer, it is set to the
                   minimum of that value and the version on the received
                   header.
    @type version: (C{int}, C{int})
    @ivar namespace: default namespace URI for stream
    @type namespace: C{unicode}
    @ivar thisEntity: JID of this entity
    @type thisEntity: L{JID}
    @ivar otherEntity: JID of the peer entity
    @type otherEntity: L{JID}
    @ivar sid: session identifier
    @type sid: C{unicode}
    @ivar initiating: True if this is the initiating stream
    @type initiating: C{bool}
    @ivar features: map of (uri, name) to stream features element received from
                    the receiving entity.
    @type features: C{dict} of (C{unicode}, C{unicode}) to L{domish.Element}.
    @ivar prefixes: map of URI to prefixes that are to appear on stream
                    header.
    @type prefixes: C{dict} of C{unicode} to C{unicode}
    @ivar initializers: list of stream initializer objects
    @type initializers: C{list} of objects that provide L{IInitializer}
    @ivar authenticator: associated authenticator that uses C{initializers} to
                         initialize the XML stream.
    """

    version = (1, 0)
    namespace = 'invalid'
    thisEntity = None
    otherEntity = None
    sid = None
    initiating = True

    _headerSent = False # True if the stream header has been sent

    def __init__(self, authenticator):
        xmlstream.XmlStream.__init__(self)

        self.prefixes = {NS_STREAMS: 'stream'}
        self.authenticator = authenticator
        self.initializers = []
        self.features = {}

        # Reset the authenticator
        authenticator.associateWithStream(self)


    def _callLater(self, *args, **kwargs):
        """
        Schedule a call with the reactor; arguments are passed through to
        C{reactor.callLater}.
        """
        # Imported here rather than at module level, so the reactor is only
        # looked up when a call is actually scheduled.
        from twisted.internet import reactor
        return reactor.callLater(*args, **kwargs)


    def reset(self):
        """
        Reset XML Stream.

        Resets the XML Parser for incoming data. This is to be used after
        successfully negotiating a new layer, e.g. TLS and SASL. Note that
        registered event observers will continue to be in place.
        """
        self._headerSent = False
        self._initializeStream()


    def onStreamError(self, errelem):
        """
        Called when a stream:error element has been received.

        Dispatches a L{STREAM_ERROR_EVENT} event with the error element to
        allow for cleanup actions and drops the connection.

        @param errelem: The received error element.
        @type errelem: L{domish.Element}
        """
        self.dispatch(failure.Failure(error.exceptionFromStreamError(errelem)),
                      STREAM_ERROR_EVENT)
        self.transport.loseConnection()


    def sendHeader(self):
        """
        Send stream header.
        """
        # set up optional extra namespaces
        localPrefixes = {}
        for uri, prefix in self.prefixes.iteritems():
            if uri != NS_STREAMS:
                localPrefixes[prefix] = uri

        rootElement = domish.Element((NS_STREAMS, 'stream'), self.namespace,
                                     localPrefixes=localPrefixes)

        if self.otherEntity:
            rootElement['to'] = self.otherEntity.userhost()

        if self.thisEntity:
            rootElement['from'] = self.thisEntity.userhost()

        # Only the receiving entity puts the session identifier on the header.
        if not self.initiating and self.sid:
            rootElement['id'] = self.sid

        if self.version >= (1, 0):
            rootElement['version'] = "%d.%d" % self.version

        # The root element is deliberately left open (closeElement=0); it is
        # closed by sendFooter.
        self.send(rootElement.toXml(prefixes=self.prefixes, closeElement=0))
        self._headerSent = True


    def sendFooter(self):
        """
        Send stream footer.
        """
        self.send('</stream:stream>')


    def sendStreamError(self, streamError):
        """
        Send stream level error.

        If we are the receiving entity, and haven't sent the header yet,
        we send one first.

        After sending the stream error, the stream is closed and the transport
        connection dropped.

        @param streamError: stream error instance
        @type streamError: L{error.StreamError}
        """
        if not self._headerSent and not self.initiating:
            self.sendHeader()

        # An initiating entity that has not sent its header yet sends
        # nothing and only drops the connection.
        if self._headerSent:
            self.send(streamError.getElement())
            self.sendFooter()

        self.transport.loseConnection()


    def send(self, obj):
        """
        Send data over the stream.

        This overrides L{xmlstream.Xmlstream.send} to use the default namespace
        of the stream header when serializing L{domish.IElement}s. It is
        assumed that if you pass an object that provides L{domish.IElement},
        it represents a direct child of the stream's root element.

        @param obj: an object providing L{domish.IElement}, or already
            serialized data that is passed on unchanged.
        """
        if domish.IElement.providedBy(obj):
            obj = obj.toXml(prefixes=self.prefixes,
                            defaultUri=self.namespace,
                            prefixesInScope=self.prefixes.values())

        xmlstream.XmlStream.send(self, obj)


    def connectionMade(self):
        """
        Called when a connection is made.

        Notifies the authenticator when a connection has been made.
        """
        xmlstream.XmlStream.connectionMade(self)
        self.authenticator.connectionMade()


    def onDocumentStart(self, rootElement):
        """
        Called when the stream header has been received.

        Registers a one-time observer for stream errors and then hands the
        root element to the authenticator's C{streamStarted} method, which
        is responsible for extracting information (such as the C{id} and
        C{version} attributes) from the stream header.

        @param rootElement: The root element.
        @type rootElement: L{domish.Element}
        """
        xmlstream.XmlStream.onDocumentStart(self, rootElement)

        # Setup observer for stream errors
        self.addOnetimeObserver("/error[@xmlns='%s']" % NS_STREAMS,
                                self.onStreamError)

        self.authenticator.streamStarted(rootElement)



class XmlStreamFactory(xmlstream.XmlStreamFactory):
    """
    Factory for Jabber XmlStream objects as a reconnecting client.

    Note that this differs from L{xmlstream.XmlStreamFactory} in that
    it generates Jabber specific L{XmlStream} instances that have
    authenticators.
    """

    protocol = XmlStream

    def __init__(self, authenticator):
        xmlstream.XmlStreamFactory.__init__(self, authenticator)
        self.authenticator = authenticator



class XmlStreamServerFactory(xmlstream.BootstrapMixin,
                             protocol.ServerFactory):
    """
    Factory for Jabber XmlStream objects as a server.

    @since: 8.2.
    @ivar authenticatorFactory: Factory callable that takes no arguments, to
                                create a fresh authenticator to be associated
                                with the XmlStream.
    """

    protocol = XmlStream

    def __init__(self, authenticatorFactory):
        xmlstream.BootstrapMixin.__init__(self)
        self.authenticatorFactory = authenticatorFactory


    def buildProtocol(self, addr):
        """
        Create an instance of XmlStream.

        A new authenticator instance will be created and passed to the new
        XmlStream. Registered bootstrap event observers are installed as well.
        """
        authenticator = self.authenticatorFactory()
        xs = self.protocol(authenticator)
        xs.factory = self
        self.installBootstraps(xs)
        return xs



class TimeoutError(Exception):
    """
    Exception raised when no IQ response has been received before the
    configured timeout.
    """



def upgradeWithIQResponseTracker(xs):
    """
    Enhances an XmlStream for iq response tracking.

    This makes an L{XmlStream} object provide L{IIQResponseTracker}. When a
    response is an error iq stanza, the deferred has its errback invoked with a
    failure that holds a L{StanzaException<error.StanzaException>} that is
    easier to examine.

    @param xs: The XML stream to upgrade. It gains an C{iqDeferreds} dict
        and, when not already present, an C{iqDefaultTimeout} attribute set
        to C{None}.
    """
    def callback(iq):
        """
        Handle iq response by firing associated deferred.
        """
        if getattr(iq, 'handled', False):
            return

        try:
            d = xs.iqDeferreds[iq["id"]]
        except KeyError:
            # Not a response to a request tracked here; leave it alone.
            pass
        else:
            del xs.iqDeferreds[iq["id"]]
            iq.handled = True
            if iq['type'] == 'error':
                d.errback(error.exceptionFromStanza(iq))
            else:
                d.callback(iq)


    def disconnected(_):
        """
        Make sure deferreds do not linger on after disconnect.

        This errbacks all deferreds of iq's for which no response has been
        received with a L{ConnectionLost} failure. Otherwise, the deferreds
        will never be fired.
        """
        # Swap in an empty dict before firing, so errbacks observe a clean
        # tracker and the dict being iterated is not modified.
        iqDeferreds = xs.iqDeferreds
        xs.iqDeferreds = {}
        for d in iqDeferreds.itervalues():
            d.errback(ConnectionLost())

    xs.iqDeferreds = {}
    xs.iqDefaultTimeout = getattr(xs, 'iqDefaultTimeout', None)
    xs.addObserver(xmlstream.STREAM_END_EVENT, disconnected)
    xs.addObserver('/iq[@type="result"]', callback)
    xs.addObserver('/iq[@type="error"]', callback)
    directlyProvides(xs, ijabber.IIQResponseTracker)



class IQ(domish.Element):
    """
    Wrapper for an iq stanza.

    Iq stanzas are used for communications with a request-response behaviour.
    Each iq request is associated with an XML stream and has its own unique id
    to be able to track the response.

    @ivar timeout: if set, a timeout period after which the deferred returned
                   by C{send} will have its errback called with a
                   L{TimeoutError} failure.
    @type timeout: C{float}
    """

    timeout = None

    def __init__(self, xmlstream, stanzaType="set"):
        """
        @type xmlstream: L{xmlstream.XmlStream}
        @param xmlstream: XmlStream to use for transmission of this IQ

        @type stanzaType: C{str}
        @param stanzaType: IQ type identifier ('get' or 'set')
        """
        domish.Element.__init__(self, (None, "iq"))
        self.addUniqueId()
        self["type"] = stanzaType
        self._xmlstream = xmlstream


    def send(self, to=None):
        """
        Send out this iq.

        Returns a deferred that is fired when an iq response with the same id
        is received. Result responses will be passed to the deferred callback.
        Error responses will be transformed into a
        L{StanzaError<error.StanzaError>} and result in the errback of the
        deferred being invoked.

        @param to: optional value for the stanza's C{to} attribute.
        @rtype: L{defer.Deferred}
        """
        if to is not None:
            self["to"] = to

        # Lazily add response tracking to the stream on first use.
        if not ijabber.IIQResponseTracker.providedBy(self._xmlstream):
            upgradeWithIQResponseTracker(self._xmlstream)

        d = defer.Deferred()
        self._xmlstream.iqDeferreds[self['id']] = d

        # A falsy per-request timeout (None or 0) falls back to the stream's
        # default timeout.
        timeout = self.timeout or self._xmlstream.iqDefaultTimeout
        if timeout is not None:
            def onTimeout():
                del self._xmlstream.iqDeferreds[self['id']]
                d.errback(TimeoutError("IQ timed out"))

            call = self._xmlstream._callLater(timeout, onTimeout)

            def cancelTimeout(result):
                # Runs for both results and failures; the delayed call is
                # only cancelled while it is still pending.
                if call.active():
                    call.cancel()

                return result

            d.addBoth(cancelTimeout)

        self._xmlstream.send(self)
        return d



def toResponse(stanza, stanzaType=None):
    """
    Create a response stanza from another stanza.

    This takes the addressing and id attributes from a stanza to create a (new,
    empty) response stanza. The addressing attributes are swapped and the id
    copied. Optionally, the stanza type of the response can be specified.

    @param stanza: the original stanza
    @type stanza: L{domish.Element}
    @param stanzaType: optional response stanza type
    @type stanzaType: C{str}
    @return: the response stanza.
    @rtype: L{domish.Element}
    """

    toAddr = stanza.getAttribute('from')
    fromAddr = stanza.getAttribute('to')
    stanzaID = stanza.getAttribute('id')

    # Attributes that are absent or empty on the original are left off the
    # response.
    response = domish.Element((None, stanza.name))
    if toAddr:
        response['to'] = toAddr
    if fromAddr:
        response['from'] = fromAddr
    if stanzaID:
        response['id'] = stanzaID
    if stanzaType:
        response['type'] = stanzaType

    return response



class XMPPHandler(object):
    """
    XMPP protocol handler.

    Classes derived from this class implement (part of) one or more XMPP
    extension protocols, and are referred to as a subprotocol implementation.

    @ivar parent: the handler collection this handler has been added to, or
        C{None}.
    @ivar xmlstream: the XML stream currently in use, or C{None}.
    """

    implements(ijabber.IXMPPHandler)

    def __init__(self):
        self.parent = None
        self.xmlstream = None


    def setHandlerParent(self, parent):
        """
        Set C{parent} as this handler's parent and register with it.
        """
        self.parent = parent
        self.parent.addHandler(self)


    def disownHandlerParent(self, parent):
        """
        Unregister this handler from its current parent.

        @note: C{parent} is not consulted; the handler is removed from the
            parent that is currently set.
        """
        self.parent.removeHandler(self)
        self.parent = None


    def makeConnection(self, xs):
        """
        Store the XML stream and call L{connectionMade}.
        """
        self.xmlstream = xs
        self.connectionMade()


    def connectionMade(self):
        """
        Called after a connection has been established.

        Can be overridden to perform work before stream initialization.
        """


    def connectionInitialized(self):
        """
        The XML stream has been initialized.

        Can be overridden to perform work after stream initialization, e.g. to
        set up observers and start exchanging XML stanzas.
        """


    def connectionLost(self, reason):
        """
        The XML stream has been closed.

        This method can be extended to inspect the C{reason} argument and
        act on it.
        """
        self.xmlstream = None


    def send(self, obj):
        """
        Send data over the managed XML stream.

        @note: The stream manager maintains a queue for data sent using this
               method when there is no current initialized XML stream. This
               data is then sent as soon as a new stream has been established
               and initialized. Subsequently, L{connectionInitialized} will be
               called again. If this queueing is not desired, use C{send} on
               C{self.xmlstream}.

        @param obj: data to be sent over the XML stream. This is usually an
                    object providing L{domish.IElement}, or serialized XML. See
                    L{xmlstream.XmlStream} for details.
        """
        self.parent.send(obj)



class XMPPHandlerCollection(object):
    """
    Collection of XMPP subprotocol handlers.

    This allows for grouping of subprotocol handlers, but is not an
    L{XMPPHandler} itself, so this is not recursive.

    @ivar handlers: List of protocol handlers.
    @type handlers: C{list} of objects providing
                      L{IXMPPHandler}
    """

    implements(ijabber.IXMPPHandlerCollection)

    def __init__(self):
        self.handlers = []


    def __iter__(self):
        """
        Act as a container for handlers.
        """
        return iter(self.handlers)


    def addHandler(self, handler):
        """
        Add protocol handler.

        Protocol handlers are expected to provide L{ijabber.IXMPPHandler}.
        """
        self.handlers.append(handler)


    def removeHandler(self, handler):
        """
        Remove protocol handler.
        """
        self.handlers.remove(handler)



class StreamManager(XMPPHandlerCollection):
    """
    Business logic representing a managed XMPP connection.

    This maintains a single XMPP connection and provides facilities for packet
    routing and transmission. Business logic modules are objects providing
    L{ijabber.IXMPPHandler} (like subclasses of L{XMPPHandler}), and added
    using L{addHandler}.

    @ivar xmlstream: currently managed XML stream
    @type xmlstream: L{XmlStream}
    @ivar logTraffic: if true, log all traffic.
    @type logTraffic: C{bool}
    @ivar _initialized: Whether the stream represented by L{xmlstream} has
                        been initialized. This is used when caching outgoing
                        stanzas.
    @type _initialized: C{bool}
    @ivar _packetQueue: internal buffer of unsent data. See L{send} for details.
    @type _packetQueue: C{list}
    """

    logTraffic = False

    def __init__(self, factory):
        XMPPHandlerCollection.__init__(self)
        self.xmlstream = None
        self._packetQueue = []
        self._initialized = False

        factory.addBootstrap(STREAM_CONNECTED_EVENT, self._connected)
        factory.addBootstrap(STREAM_AUTHD_EVENT, self._authd)
        factory.addBootstrap(INIT_FAILED_EVENT, self.initializationFailed)
        factory.addBootstrap(STREAM_END_EVENT, self._disconnected)
        self.factory = factory


    def addHandler(self, handler):
        """
        Add protocol handler.

        When an XML stream has already been established and initialized, the
        handler's C{makeConnection} and C{connectionInitialized} will be
        called to get it up to speed.
        """
        XMPPHandlerCollection.addHandler(self, handler)

        # get protocol handler up to speed when a connection has already
        # been established
        if self.xmlstream and self._initialized:
            handler.makeConnection(self.xmlstream)
            handler.connectionInitialized()


    def _connected(self, xs):
        """
        Called when the transport connection has been established.

        Here we optionally set up traffic logging (depending on L{logTraffic})
        and call each handler's C{makeConnection} method with the L{XmlStream}
        instance.
        """
        def logDataIn(buf):
            log.msg("RECV: %r" % buf)

        def logDataOut(buf):
            log.msg("SEND: %r" % buf)

        if self.logTraffic:
            xs.rawDataInFn = logDataIn
            xs.rawDataOutFn = logDataOut

        self.xmlstream = xs

        # Iterate over a snapshot: a handler may add or remove handlers from
        # within its callback, which must not disturb this loop.
        for e in list(self):
            e.makeConnection(xs)


    def _authd(self, xs):
        """
        Called when the stream has been initialized.

        Send out cached stanzas and call each handler's
        C{connectionInitialized} method.
        """
        # Flush all pending packets
        for p in self._packetQueue:
            xs.send(p)
        self._packetQueue = []
        self._initialized = True

        # Notify all handlers. Iterate over a snapshot: a handler added from
        # within a callback is brought up to speed by addHandler (the stream
        # is initialized by now) and must not be notified a second time here.
        for e in list(self):
            e.connectionInitialized()


    def initializationFailed(self, reason):
        """
        Called when stream initialization has failed.

        Stream initialization has halted, with the reason indicated by
        C{reason}. It may be retried by calling the authenticator's
        C{initializeStream}. See the respective authenticators for details.

        @param reason: A failure instance indicating why stream initialization
                       failed.
        @type reason: L{failure.Failure}
        """


    def _disconnected(self, reason):
        """
        Called when the stream has been closed.

        From this point on, the manager doesn't interact with the
        L{XmlStream} anymore and notifies each handler that the connection
        was lost by calling its C{connectionLost} method.
        """
        self.xmlstream = None
        self._initialized = False

        # Notify all handlers. Iterate over a snapshot so that a handler
        # removing itself (or others) on connection loss does not cause
        # other handlers to be skipped.
        for e in list(self):
            e.connectionLost(reason)


    def send(self, obj):
        """
        Send data over the XML stream.

        When there is no established XML stream, the data is queued and sent
        out when a new XML stream has been established and initialized.

        @param obj: data to be sent over the XML stream. See
                    L{xmlstream.XmlStream.send} for details.
        """
        if self._initialized:
            self.xmlstream.send(obj)
        else:
            self._packetQueue.append(obj)



__all__ = ['Authenticator', 'BaseFeatureInitiatingInitializer',
           'ConnectAuthenticator', 'FeatureNotAdvertized',
           'INIT_FAILED_EVENT', 'IQ', 'ListenAuthenticator', 'NS_STREAMS',
           'NS_XMPP_TLS', 'Reset', 'STREAM_AUTHD_EVENT',
           'STREAM_CONNECTED_EVENT', 'STREAM_END_EVENT', 'STREAM_ERROR_EVENT',
           'STREAM_START_EVENT', 'StreamManager', 'TLSError', 'TLSFailed',
           'TLSInitiatingInitializer', 'TLSNotSupported', 'TLSRequired',
           'TimeoutError', 'XMPPHandler', 'XMPPHandlerCollection', 'XmlStream',
           'XmlStreamFactory', 'XmlStreamServerFactory', 'hashPassword',
           'toResponse', 'upgradeWithIQResponseTracker']