1# -*- test-case-name: twisted.test.test_factories,twisted.internet.test.test_protocol -*- 2# Copyright (c) Twisted Matrix Laboratories. 3# See LICENSE for details. 4 5""" 6Standard implementations of Twisted protocol-related interfaces. 7 8Start here if you are looking to write a new protocol implementation for 9Twisted. The Protocol class contains some introductory material. 10""" 11 12from __future__ import division, absolute_import 13 14import random 15from zope.interface import implementer 16 17from twisted.python import log, failure, components 18from twisted.internet import interfaces, error, defer 19 20 21@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext) 22class Factory: 23 """ 24 This is a factory which produces protocols. 25 26 By default, buildProtocol will create a protocol of the class given in 27 self.protocol. 28 """ 29 30 # put a subclass of Protocol here: 31 protocol = None 32 33 numPorts = 0 34 noisy = True 35 36 @classmethod 37 def forProtocol(cls, protocol, *args, **kwargs): 38 """ 39 Create a factory for the given protocol. 40 41 It sets the C{protocol} attribute and returns the constructed factory 42 instance. 43 44 @param protocol: A L{Protocol} subclass 45 46 @param args: Positional arguments for the factory. 47 48 @param kwargs: Keyword arguments for the factory. 49 50 @return: A L{Factory} instance wired up to C{protocol}. 51 """ 52 factory = cls(*args, **kwargs) 53 factory.protocol = protocol 54 return factory 55 56 57 def logPrefix(self): 58 """ 59 Describe this factory for log messages. 60 """ 61 return self.__class__.__name__ 62 63 64 def doStart(self): 65 """Make sure startFactory is called. 66 67 Users should not call this function themselves! 68 """ 69 if not self.numPorts: 70 if self.noisy: 71 log.msg("Starting factory %r" % self) 72 self.startFactory() 73 self.numPorts = self.numPorts + 1 74 75 def doStop(self): 76 """Make sure stopFactory is called. 77 78 Users should not call this function themselves! 79 """ 80 if self.numPorts == 0: 81 # this shouldn't happen, but does sometimes and this is better 82 # than blowing up in assert as we did previously. 83 return 84 self.numPorts = self.numPorts - 1 85 if not self.numPorts: 86 if self.noisy: 87 log.msg("Stopping factory %r" % self) 88 self.stopFactory() 89 90 def startFactory(self): 91 """This will be called before I begin listening on a Port or Connector. 92 93 It will only be called once, even if the factory is connected 94 to multiple ports. 95 96 This can be used to perform 'unserialization' tasks that 97 are best put off until things are actually running, such 98 as connecting to a database, opening files, etcetera. 99 """ 100 101 def stopFactory(self): 102 """This will be called before I stop listening on all Ports/Connectors. 103 104 This can be overridden to perform 'shutdown' tasks such as disconnecting 105 database connections, closing files, etc. 106 107 It will be called, for example, before an application shuts down, 108 if it was connected to a port. User code should not call this function 109 directly. 110 """ 111 112 def buildProtocol(self, addr): 113 """Create an instance of a subclass of Protocol. 114 115 The returned instance will handle input on an incoming server 116 connection, and an attribute \"factory\" pointing to the creating 117 factory. 118 119 Override this method to alter how Protocol instances get created. 120 121 @param addr: an object implementing L{twisted.internet.interfaces.IAddress} 122 """ 123 p = self.protocol() 124 p.factory = self 125 return p 126 127 128class ClientFactory(Factory): 129 """A Protocol factory for clients. 130 131 This can be used together with the various connectXXX methods in 132 reactors. 133 """ 134 135 def startedConnecting(self, connector): 136 """Called when a connection has been started. 137 138 You can call connector.stopConnecting() to stop the connection attempt. 139 140 @param connector: a Connector object. 141 """ 142 143 def clientConnectionFailed(self, connector, reason): 144 """Called when a connection has failed to connect. 145 146 It may be useful to call connector.connect() - this will reconnect. 147 148 @type reason: L{twisted.python.failure.Failure} 149 """ 150 151 def clientConnectionLost(self, connector, reason): 152 """Called when an established connection is lost. 153 154 It may be useful to call connector.connect() - this will reconnect. 155 156 @type reason: L{twisted.python.failure.Failure} 157 """ 158 159 160class _InstanceFactory(ClientFactory): 161 """ 162 Factory used by ClientCreator. 163 164 @ivar deferred: The L{Deferred} which represents this connection attempt and 165 which will be fired when it succeeds or fails. 166 167 @ivar pending: After a connection attempt succeeds or fails, a delayed call 168 which will fire the L{Deferred} representing this connection attempt. 169 """ 170 171 noisy = False 172 pending = None 173 174 def __init__(self, reactor, instance, deferred): 175 self.reactor = reactor 176 self.instance = instance 177 self.deferred = deferred 178 179 180 def __repr__(self): 181 return "<ClientCreator factory: %r>" % (self.instance, ) 182 183 184 def buildProtocol(self, addr): 185 """ 186 Return the pre-constructed protocol instance and arrange to fire the 187 waiting L{Deferred} to indicate success establishing the connection. 188 """ 189 self.pending = self.reactor.callLater( 190 0, self.fire, self.deferred.callback, self.instance) 191 self.deferred = None 192 return self.instance 193 194 195 def clientConnectionFailed(self, connector, reason): 196 """ 197 Arrange to fire the waiting L{Deferred} with the given failure to 198 indicate the connection could not be established. 199 """ 200 self.pending = self.reactor.callLater( 201 0, self.fire, self.deferred.errback, reason) 202 self.deferred = None 203 204 205 def fire(self, func, value): 206 """ 207 Clear C{self.pending} to avoid a reference cycle and then invoke func 208 with the value. 209 """ 210 self.pending = None 211 func(value) 212 213 214 215class ClientCreator: 216 """ 217 Client connections that do not require a factory. 218 219 The various connect* methods create a protocol instance using the given 220 protocol class and arguments, and connect it, returning a Deferred of the 221 resulting protocol instance. 222 223 Useful for cases when we don't really need a factory. Mainly this 224 is when there is no shared state between protocol instances, and no need 225 to reconnect. 226 227 The C{connectTCP}, C{connectUNIX}, and C{connectSSL} methods each return a 228 L{Deferred} which will fire with an instance of the protocol class passed to 229 L{ClientCreator.__init__}. These Deferred can be cancelled to abort the 230 connection attempt (in a very unlikely case, cancelling the Deferred may not 231 prevent the protocol from being instantiated and connected to a transport; 232 if this happens, it will be disconnected immediately afterwards and the 233 Deferred will still errback with L{CancelledError}). 234 """ 235 236 def __init__(self, reactor, protocolClass, *args, **kwargs): 237 self.reactor = reactor 238 self.protocolClass = protocolClass 239 self.args = args 240 self.kwargs = kwargs 241 242 243 def _connect(self, method, *args, **kwargs): 244 """ 245 Initiate a connection attempt. 246 247 @param method: A callable which will actually start the connection 248 attempt. For example, C{reactor.connectTCP}. 249 250 @param *args: Positional arguments to pass to C{method}, excluding the 251 factory. 252 253 @param **kwargs: Keyword arguments to pass to C{method}. 254 255 @return: A L{Deferred} which fires with an instance of the protocol 256 class passed to this L{ClientCreator}'s initializer or fails if the 257 connection cannot be set up for some reason. 258 """ 259 def cancelConnect(deferred): 260 connector.disconnect() 261 if f.pending is not None: 262 f.pending.cancel() 263 d = defer.Deferred(cancelConnect) 264 f = _InstanceFactory( 265 self.reactor, self.protocolClass(*self.args, **self.kwargs), d) 266 connector = method(factory=f, *args, **kwargs) 267 return d 268 269 270 def connectTCP(self, host, port, timeout=30, bindAddress=None): 271 """ 272 Connect to a TCP server. 273 274 The parameters are all the same as to L{IReactorTCP.connectTCP} except 275 that the factory parameter is omitted. 276 277 @return: A L{Deferred} which fires with an instance of the protocol 278 class passed to this L{ClientCreator}'s initializer or fails if the 279 connection cannot be set up for some reason. 280 """ 281 return self._connect( 282 self.reactor.connectTCP, host, port, timeout=timeout, 283 bindAddress=bindAddress) 284 285 286 def connectUNIX(self, address, timeout=30, checkPID=False): 287 """ 288 Connect to a Unix socket. 289 290 The parameters are all the same as to L{IReactorUNIX.connectUNIX} except 291 that the factory parameter is omitted. 292 293 @return: A L{Deferred} which fires with an instance of the protocol 294 class passed to this L{ClientCreator}'s initializer or fails if the 295 connection cannot be set up for some reason. 296 """ 297 return self._connect( 298 self.reactor.connectUNIX, address, timeout=timeout, 299 checkPID=checkPID) 300 301 302 def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None): 303 """ 304 Connect to an SSL server. 305 306 The parameters are all the same as to L{IReactorSSL.connectSSL} except 307 that the factory parameter is omitted. 308 309 @return: A L{Deferred} which fires with an instance of the protocol 310 class passed to this L{ClientCreator}'s initializer or fails if the 311 connection cannot be set up for some reason. 312 """ 313 return self._connect( 314 self.reactor.connectSSL, host, port, 315 contextFactory=contextFactory, timeout=timeout, 316 bindAddress=bindAddress) 317 318 319 320class ReconnectingClientFactory(ClientFactory): 321 """ 322 Factory which auto-reconnects clients with an exponential back-off. 323 324 Note that clients should call my resetDelay method after they have 325 connected successfully. 326 327 @ivar maxDelay: Maximum number of seconds between connection attempts. 328 @ivar initialDelay: Delay for the first reconnection attempt. 329 @ivar factor: A multiplicitive factor by which the delay grows 330 @ivar jitter: Percentage of randomness to introduce into the delay length 331 to prevent stampeding. 332 @ivar clock: The clock used to schedule reconnection. It's mainly useful to 333 be parametrized in tests. If the factory is serialized, this attribute 334 will not be serialized, and the default value (the reactor) will be 335 restored when deserialized. 336 @type clock: L{IReactorTime} 337 @ivar maxRetries: Maximum number of consecutive unsuccessful connection 338 attempts, after which no further connection attempts will be made. If 339 this is not explicitly set, no maximum is applied. 340 """ 341 maxDelay = 3600 342 initialDelay = 1.0 343 # Note: These highly sensitive factors have been precisely measured by 344 # the National Institute of Science and Technology. Take extreme care 345 # in altering them, or you may damage your Internet! 346 # (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>) 347 factor = 2.7182818284590451 # (math.e) 348 # Phi = 1.6180339887498948 # (Phi is acceptable for use as a 349 # factor if e is too large for your application.) 350 jitter = 0.11962656472 # molar Planck constant times c, joule meter/mole 351 352 delay = initialDelay 353 retries = 0 354 maxRetries = None 355 _callID = None 356 connector = None 357 clock = None 358 359 continueTrying = 1 360 361 362 def clientConnectionFailed(self, connector, reason): 363 if self.continueTrying: 364 self.connector = connector 365 self.retry() 366 367 368 def clientConnectionLost(self, connector, unused_reason): 369 if self.continueTrying: 370 self.connector = connector 371 self.retry() 372 373 374 def retry(self, connector=None): 375 """ 376 Have this connector connect again, after a suitable delay. 377 """ 378 if not self.continueTrying: 379 if self.noisy: 380 log.msg("Abandoning %s on explicit request" % (connector,)) 381 return 382 383 if connector is None: 384 if self.connector is None: 385 raise ValueError("no connector to retry") 386 else: 387 connector = self.connector 388 389 self.retries += 1 390 if self.maxRetries is not None and (self.retries > self.maxRetries): 391 if self.noisy: 392 log.msg("Abandoning %s after %d retries." % 393 (connector, self.retries)) 394 return 395 396 self.delay = min(self.delay * self.factor, self.maxDelay) 397 if self.jitter: 398 self.delay = random.normalvariate(self.delay, 399 self.delay * self.jitter) 400 401 if self.noisy: 402 log.msg("%s will retry in %d seconds" % (connector, self.delay,)) 403 404 def reconnector(): 405 self._callID = None 406 connector.connect() 407 if self.clock is None: 408 from twisted.internet import reactor 409 self.clock = reactor 410 self._callID = self.clock.callLater(self.delay, reconnector) 411 412 413 def stopTrying(self): 414 """ 415 Put a stop to any attempt to reconnect in progress. 416 """ 417 # ??? Is this function really stopFactory? 418 if self._callID: 419 self._callID.cancel() 420 self._callID = None 421 self.continueTrying = 0 422 if self.connector: 423 try: 424 self.connector.stopConnecting() 425 except error.NotConnectingError: 426 pass 427 428 429 def resetDelay(self): 430 """ 431 Call this method after a successful connection: it resets the delay and 432 the retry counter. 433 """ 434 self.delay = self.initialDelay 435 self.retries = 0 436 self._callID = None 437 self.continueTrying = 1 438 439 440 def __getstate__(self): 441 """ 442 Remove all of the state which is mutated by connection attempts and 443 failures, returning just the state which describes how reconnections 444 should be attempted. This will make the unserialized instance 445 behave just as this one did when it was first instantiated. 446 """ 447 state = self.__dict__.copy() 448 for key in ['connector', 'retries', 'delay', 449 'continueTrying', '_callID', 'clock']: 450 if key in state: 451 del state[key] 452 return state 453 454 455 456class ServerFactory(Factory): 457 """Subclass this to indicate that your protocol.Factory is only usable for servers. 458 """ 459 460 461 462class BaseProtocol: 463 """ 464 This is the abstract superclass of all protocols. 465 466 Some methods have helpful default implementations here so that they can 467 easily be shared, but otherwise the direct subclasses of this class are more 468 interesting, L{Protocol} and L{ProcessProtocol}. 469 """ 470 connected = 0 471 transport = None 472 473 def makeConnection(self, transport): 474 """Make a connection to a transport and a server. 475 476 This sets the 'transport' attribute of this Protocol, and calls the 477 connectionMade() callback. 478 """ 479 self.connected = 1 480 self.transport = transport 481 self.connectionMade() 482 483 def connectionMade(self): 484 """Called when a connection is made. 485 486 This may be considered the initializer of the protocol, because 487 it is called when the connection is completed. For clients, 488 this is called once the connection to the server has been 489 established; for servers, this is called after an accept() call 490 stops blocking and a socket has been received. If you need to 491 send any greeting or initial message, do it here. 492 """ 493 494connectionDone=failure.Failure(error.ConnectionDone()) 495connectionDone.cleanFailure() 496 497 498@implementer(interfaces.IProtocol, interfaces.ILoggingContext) 499class Protocol(BaseProtocol): 500 """ 501 This is the base class for streaming connection-oriented protocols. 502 503 If you are going to write a new connection-oriented protocol for Twisted, 504 start here. Any protocol implementation, either client or server, should 505 be a subclass of this class. 506 507 The API is quite simple. Implement L{dataReceived} to handle both 508 event-based and synchronous input; output can be sent through the 509 'transport' attribute, which is to be an instance that implements 510 L{twisted.internet.interfaces.ITransport}. Override C{connectionLost} to be 511 notified when the connection ends. 512 513 Some subclasses exist already to help you write common types of protocols: 514 see the L{twisted.protocols.basic} module for a few of them. 515 """ 516 517 def logPrefix(self): 518 """ 519 Return a prefix matching the class name, to identify log messages 520 related to this protocol instance. 521 """ 522 return self.__class__.__name__ 523 524 525 def dataReceived(self, data): 526 """Called whenever data is received. 527 528 Use this method to translate to a higher-level message. Usually, some 529 callback will be made upon the receipt of each complete protocol 530 message. 531 532 @param data: a string of indeterminate length. Please keep in mind 533 that you will probably need to buffer some data, as partial 534 (or multiple) protocol messages may be received! I recommend 535 that unit tests for protocols call through to this method with 536 differing chunk sizes, down to one byte at a time. 537 """ 538 539 def connectionLost(self, reason=connectionDone): 540 """Called when the connection is shut down. 541 542 Clear any circular references here, and any external references 543 to this Protocol. The connection has been closed. 544 545 @type reason: L{twisted.python.failure.Failure} 546 """ 547 548 549@implementer(interfaces.IConsumer) 550class ProtocolToConsumerAdapter(components.Adapter): 551 552 def write(self, data): 553 self.original.dataReceived(data) 554 555 def registerProducer(self, producer, streaming): 556 pass 557 558 def unregisterProducer(self): 559 pass 560 561components.registerAdapter(ProtocolToConsumerAdapter, interfaces.IProtocol, 562 interfaces.IConsumer) 563 564@implementer(interfaces.IProtocol) 565class ConsumerToProtocolAdapter(components.Adapter): 566 567 def dataReceived(self, data): 568 self.original.write(data) 569 570 def connectionLost(self, reason): 571 pass 572 573 def makeConnection(self, transport): 574 pass 575 576 def connectionMade(self): 577 pass 578 579components.registerAdapter(ConsumerToProtocolAdapter, interfaces.IConsumer, 580 interfaces.IProtocol) 581 582@implementer(interfaces.IProcessProtocol) 583class ProcessProtocol(BaseProtocol): 584 """ 585 Base process protocol implementation which does simple dispatching for 586 stdin, stdout, and stderr file descriptors. 587 """ 588 589 def childDataReceived(self, childFD, data): 590 if childFD == 1: 591 self.outReceived(data) 592 elif childFD == 2: 593 self.errReceived(data) 594 595 596 def outReceived(self, data): 597 """ 598 Some data was received from stdout. 599 """ 600 601 602 def errReceived(self, data): 603 """ 604 Some data was received from stderr. 605 """ 606 607 608 def childConnectionLost(self, childFD): 609 if childFD == 0: 610 self.inConnectionLost() 611 elif childFD == 1: 612 self.outConnectionLost() 613 elif childFD == 2: 614 self.errConnectionLost() 615 616 617 def inConnectionLost(self): 618 """ 619 This will be called when stdin is closed. 620 """ 621 622 623 def outConnectionLost(self): 624 """ 625 This will be called when stdout is closed. 626 """ 627 628 629 def errConnectionLost(self): 630 """ 631 This will be called when stderr is closed. 632 """ 633 634 635 def processExited(self, reason): 636 """ 637 This will be called when the subprocess exits. 638 639 @type reason: L{twisted.python.failure.Failure} 640 """ 641 642 643 def processEnded(self, reason): 644 """ 645 Called when the child process exits and all file descriptors 646 associated with it have been closed. 647 648 @type reason: L{twisted.python.failure.Failure} 649 """ 650 651 652 653class AbstractDatagramProtocol: 654 """ 655 Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP. 656 """ 657 658 transport = None 659 numPorts = 0 660 noisy = True 661 662 def __getstate__(self): 663 d = self.__dict__.copy() 664 d['transport'] = None 665 return d 666 667 def doStart(self): 668 """Make sure startProtocol is called. 669 670 This will be called by makeConnection(), users should not call it. 671 """ 672 if not self.numPorts: 673 if self.noisy: 674 log.msg("Starting protocol %s" % self) 675 self.startProtocol() 676 self.numPorts = self.numPorts + 1 677 678 def doStop(self): 679 """Make sure stopProtocol is called. 680 681 This will be called by the port, users should not call it. 682 """ 683 assert self.numPorts > 0 684 self.numPorts = self.numPorts - 1 685 self.transport = None 686 if not self.numPorts: 687 if self.noisy: 688 log.msg("Stopping protocol %s" % self) 689 self.stopProtocol() 690 691 def startProtocol(self): 692 """Called when a transport is connected to this protocol. 693 694 Will only be called once, even if multiple ports are connected. 695 """ 696 697 def stopProtocol(self): 698 """Called when the transport is disconnected. 699 700 Will only be called once, after all ports are disconnected. 701 """ 702 703 def makeConnection(self, transport): 704 """Make a connection to a transport and a server. 705 706 This sets the 'transport' attribute of this DatagramProtocol, and calls the 707 doStart() callback. 708 """ 709 assert self.transport == None 710 self.transport = transport 711 self.doStart() 712 713 def datagramReceived(self, datagram, addr): 714 """Called when a datagram is received. 715 716 @param datagram: the string received from the transport. 717 @param addr: tuple of source of datagram. 718 """ 719 720 721@implementer(interfaces.ILoggingContext) 722class DatagramProtocol(AbstractDatagramProtocol): 723 """ 724 Protocol for datagram-oriented transport, e.g. UDP. 725 726 @type transport: C{NoneType} or 727 L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider 728 @ivar transport: The transport with which this protocol is associated, 729 if it is associated with one. 730 """ 731 732 def logPrefix(self): 733 """ 734 Return a prefix matching the class name, to identify log messages 735 related to this protocol instance. 736 """ 737 return self.__class__.__name__ 738 739 740 def connectionRefused(self): 741 """Called due to error from write in connected mode. 742 743 Note this is a result of ICMP message generated by *previous* 744 write. 745 """ 746 747 748class ConnectedDatagramProtocol(DatagramProtocol): 749 """Protocol for connected datagram-oriented transport. 750 751 No longer necessary for UDP. 752 """ 753 754 def datagramReceived(self, datagram): 755 """Called when a datagram is received. 756 757 @param datagram: the string received from the transport. 758 """ 759 760 def connectionFailed(self, failure): 761 """Called if connecting failed. 762 763 Usually this will be due to a DNS lookup failure. 764 """ 765 766 767 768@implementer(interfaces.ITransport) 769class FileWrapper: 770 """A wrapper around a file-like object to make it behave as a Transport. 771 772 This doesn't actually stream the file to the attached protocol, 773 and is thus useful mainly as a utility for debugging protocols. 774 """ 775 776 closed = 0 777 disconnecting = 0 778 producer = None 779 streamingProducer = 0 780 781 def __init__(self, file): 782 self.file = file 783 784 def write(self, data): 785 try: 786 self.file.write(data) 787 except: 788 self.handleException() 789 # self._checkProducer() 790 791 def _checkProducer(self): 792 # Cheating; this is called at "idle" times to allow producers to be 793 # found and dealt with 794 if self.producer: 795 self.producer.resumeProducing() 796 797 def registerProducer(self, producer, streaming): 798 """From abstract.FileDescriptor 799 """ 800 self.producer = producer 801 self.streamingProducer = streaming 802 if not streaming: 803 producer.resumeProducing() 804 805 def unregisterProducer(self): 806 self.producer = None 807 808 def stopConsuming(self): 809 self.unregisterProducer() 810 self.loseConnection() 811 812 def writeSequence(self, iovec): 813 self.write("".join(iovec)) 814 815 def loseConnection(self): 816 self.closed = 1 817 try: 818 self.file.close() 819 except (IOError, OSError): 820 self.handleException() 821 822 def getPeer(self): 823 # XXX: According to ITransport, this should return an IAddress! 824 return 'file', 'file' 825 826 def getHost(self): 827 # XXX: According to ITransport, this should return an IAddress! 828 return 'file' 829 830 def handleException(self): 831 pass 832 833 def resumeProducing(self): 834 # Never sends data anyways 835 pass 836 837 def pauseProducing(self): 838 # Never sends data anyways 839 pass 840 841 def stopProducing(self): 842 self.loseConnection() 843 844 845__all__ = ["Factory", "ClientFactory", "ReconnectingClientFactory", "connectionDone", 846 "Protocol", "ProcessProtocol", "FileWrapper", "ServerFactory", 847 "AbstractDatagramProtocol", "DatagramProtocol", "ConnectedDatagramProtocol", 848 "ClientCreator"] 849