1# -*- test-case-name: twisted.test.test_factories,twisted.internet.test.test_protocol -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Standard implementations of Twisted protocol-related interfaces.
7
8Start here if you are looking to write a new protocol implementation for
9Twisted.  The Protocol class contains some introductory material.
10"""
11
12
13import random
14from typing import Callable, Optional
15
16from zope.interface import implementer
17
18from twisted.internet import defer, error, interfaces
19from twisted.internet.interfaces import IAddress, ITransport
20from twisted.logger import _loggerFor
21from twisted.python import components, failure, log
22
23
@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
class Factory:
    """
    A factory whose job is to produce protocols.

    Unless L{buildProtocol} is overridden, every protocol is created by
    calling whatever is stored in C{self.protocol}.

    @ivar protocol: The zero-argument callable which L{buildProtocol} invokes
        to create a protocol; L{None} until one is assigned.
    @ivar numPorts: Number of L{doStart} calls that have not yet been matched
        by a L{doStop} call.
    @ivar noisy: When true, L{doStart} and L{doStop} log the first start and
        the final stop of this factory.
    """

    protocol: "Optional[Callable[[], Protocol]]" = None

    numPorts = 0
    noisy = True

    @classmethod
    def forProtocol(cls, protocol, *args, **kwargs):
        """
        Build a factory of this class which produces C{protocol}.

        @param protocol: A L{Protocol} subclass; it is stored as the
            C{protocol} attribute of the new factory.

        @param args: Positional arguments passed to the factory initializer.

        @param kwargs: Keyword arguments passed to the factory initializer.

        @return: The newly constructed L{Factory} instance, wired up to
            C{protocol}.
        """
        instance = cls(*args, **kwargs)
        instance.protocol = protocol
        return instance

    def logPrefix(self):
        """
        Return the name of this factory's class, for use in log messages.
        """
        return self.__class__.__name__

    def doStart(self):
        """
        Call L{startFactory} if this is the first outstanding start, then
        count the start.

        Users should not call this function themselves!
        """
        firstStart = not self.numPorts
        if firstStart:
            if self.noisy:
                _loggerFor(self).info("Starting factory {factory!r}", factory=self)
            self.startFactory()
        self.numPorts += 1

    def doStop(self):
        """
        Count a stop and call L{stopFactory} once the last outstanding start
        has been balanced.

        Users should not call this function themselves!
        """
        if self.numPorts == 0:
            # An unbalanced stop shouldn't happen, but sometimes does;
            # ignoring it is better than blowing up in an assert as was done
            # previously.
            return

        self.numPorts -= 1
        if self.numPorts:
            # Something is still using this factory.
            return

        if self.noisy:
            _loggerFor(self).info("Stopping factory {factory!r}", factory=self)
        self.stopFactory()

    def startFactory(self):
        """
        Hook called before I begin listening on a Port or Connector.

        It is called only once, even if the factory is connected to
        multiple ports.

        Use it for 'unserialization' tasks that are best put off until
        things are actually running, such as connecting to a database,
        opening files, etcetera.
        """

    def stopFactory(self):
        """
        Hook called before I stop listening on all Ports/Connectors.

        Override it to perform 'shutdown' tasks such as disconnecting
        database connections, closing files, etc.

        It will be called, for example, before an application shuts down,
        if it was connected to a port. User code should not call this
        function directly.
        """

    def buildProtocol(self, addr: IAddress) -> "Optional[Protocol]":
        """
        Create an instance of a subclass of Protocol.

        The instance is created by calling C{self.protocol} with no
        arguments, and its C{factory} attribute is pointed at this factory
        before it is returned.  The returned instance will handle input on
        an incoming server connection.

        Alternatively, an override may return L{None} to immediately close
        the new connection.

        Override this method to alter how Protocol instances get created.

        @param addr: an object implementing L{IAddress}; this default
            implementation does not use it.

        @return: The new protocol instance.
        """
        assert self.protocol is not None
        proto = self.protocol()
        proto.factory = self
        return proto
135
136
class ClientFactory(Factory):
    """
    A Protocol factory for clients.

    This can be used together with the various connectXXX methods in
    reactors.

    All three notification methods below are no-op hooks in this class;
    subclasses override the ones they care about.
    """

    def startedConnecting(self, connector):
        """
        Called when a connection has been started.

        You can call connector.stopConnecting() to stop the connection attempt.

        The default implementation does nothing.

        @param connector: a Connector object.
        """

    def clientConnectionFailed(self, connector, reason):
        """
        Called when a connection has failed to connect.

        It may be useful to call connector.connect() - this will reconnect.

        The default implementation does nothing.

        @param connector: a Connector object.

        @param reason: why the connection attempt failed.
        @type reason: L{twisted.python.failure.Failure}
        """

    def clientConnectionLost(self, connector, reason):
        """
        Called when an established connection is lost.

        It may be useful to call connector.connect() - this will reconnect.

        The default implementation does nothing.

        @param connector: a Connector object.

        @param reason: why the connection was lost.
        @type reason: L{twisted.python.failure.Failure}
        """
171
172
class _InstanceFactory(ClientFactory):
    """
    Factory used by ClientCreator; it hands out one pre-built protocol.

    @ivar reactor: The object whose C{callLater} is used to schedule firing
        of the L{Deferred}.

    @ivar instance: The pre-constructed protocol returned by
        L{buildProtocol}.

    @ivar deferred: The L{Deferred} which represents this connection attempt and
        which will be fired when it succeeds or fails.  It is reset to
        L{None} as soon as firing has been scheduled.

    @ivar pending: After a connection attempt succeeds or fails, a delayed call
        which will fire the L{Deferred} representing this connection attempt.
    """

    noisy = False
    pending = None

    def __init__(self, reactor, instance, deferred):
        self.reactor = reactor
        self.instance = instance
        self.deferred = deferred

    def __repr__(self) -> str:
        return f"<ClientCreator factory: {self.instance!r}>"

    def _fireLater(self, func, value):
        """
        Schedule C{self.fire(func, value)} with a zero delay, remember the
        delayed call in C{self.pending} and drop the reference to the
        waiting L{Deferred}.
        """
        self.pending = self.reactor.callLater(0, self.fire, func, value)
        self.deferred = None

    def buildProtocol(self, addr):
        """
        Hand back the pre-constructed protocol instance, after arranging for
        the waiting L{Deferred} to be called back with that instance to
        signal that the connection was established.
        """
        self._fireLater(self.deferred.callback, self.instance)
        return self.instance

    def clientConnectionFailed(self, connector, reason):
        """
        Arrange for the waiting L{Deferred} to be errbacked with C{reason},
        signalling that the connection could not be established.
        """
        self._fireLater(self.deferred.errback, reason)

    def fire(self, func, value):
        """
        Forget the delayed call stored in C{self.pending} (avoiding a
        reference cycle), then call C{func} with C{value}.
        """
        self.pending = None
        func(value)
223
224
class ClientCreator:
    """
    Client connections that do not require a factory.

    Each connect* method instantiates the protocol class given to the
    initializer (with the stored arguments), starts connecting it, and
    returns a Deferred of the resulting protocol instance.

    Useful for cases when we don't really need a factory.  Mainly this
    is when there is no shared state between protocol instances, and no need
    to reconnect.

    The C{connectTCP}, C{connectUNIX}, and C{connectSSL} methods each return a
    L{Deferred} which will fire with an instance of the protocol class passed to
    L{ClientCreator.__init__}.  These Deferred can be cancelled to abort the
    connection attempt (in a very unlikely case, cancelling the Deferred may not
    prevent the protocol from being instantiated and connected to a transport;
    if this happens, it will be disconnected immediately afterwards and the
    Deferred will still errback with L{CancelledError}).
    """

    def __init__(self, reactor, protocolClass, *args, **kwargs):
        """
        @param reactor: The reactor whose connect* methods are used.

        @param protocolClass: Callable invoked once per connection attempt to
            build the protocol.

        @param args: Positional arguments for C{protocolClass}.

        @param kwargs: Keyword arguments for C{protocolClass}.
        """
        self.reactor = reactor
        self.protocolClass = protocolClass
        self.args, self.kwargs = args, kwargs

    def _connect(self, method, *args, **kwargs):
        """
        Start one connection attempt.

        @param method: A callable which will actually start the connection
            attempt.  For example, C{reactor.connectTCP}.  It is additionally
            passed the internal factory as the C{factory} keyword argument.

        @param args: Positional arguments to pass to C{method}, excluding the
            factory.

        @param kwargs: Keyword arguments to pass to C{method}.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """

        def cancelConnect(deferred):
            # Abort the attempt itself, then make sure an already scheduled
            # firing of the Deferred does not happen.
            connector.disconnect()
            delayedCall = factory.pending
            if delayedCall is not None:
                delayedCall.cancel()

        result = defer.Deferred(cancelConnect)
        protocol = self.protocolClass(*self.args, **self.kwargs)
        factory = _InstanceFactory(self.reactor, protocol, result)
        connector = method(*args, factory=factory, **kwargs)
        return result

    def connectTCP(self, host, port, timeout=30, bindAddress=None):
        """
        Connect to a TCP server.

        The parameters are all the same as to L{IReactorTCP.connectTCP} except
        that the factory parameter is omitted.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        options = {"timeout": timeout, "bindAddress": bindAddress}
        return self._connect(self.reactor.connectTCP, host, port, **options)

    def connectUNIX(self, address, timeout=30, checkPID=False):
        """
        Connect to a Unix socket.

        The parameters are all the same as to L{IReactorUNIX.connectUNIX} except
        that the factory parameter is omitted.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        options = {"timeout": timeout, "checkPID": checkPID}
        return self._connect(self.reactor.connectUNIX, address, **options)

    def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None):
        """
        Connect to an SSL server.

        The parameters are all the same as to L{IReactorSSL.connectSSL} except
        that the factory parameter is omitted.

        @return: A L{Deferred} which fires with an instance of the protocol
            class passed to this L{ClientCreator}'s initializer or fails if the
            connection cannot be set up for some reason.
        """
        options = {
            "contextFactory": contextFactory,
            "timeout": timeout,
            "bindAddress": bindAddress,
        }
        return self._connect(self.reactor.connectSSL, host, port, **options)
334
335
class ReconnectingClientFactory(ClientFactory):
    """
    Factory which auto-reconnects clients with an exponential back-off.

    Note that clients should call my resetDelay method after they have
    connected successfully.

    @ivar maxDelay: Maximum number of seconds between connection attempts.
        The cap is applied before jitter is added.
    @ivar initialDelay: Delay for the first reconnection attempt.
    @ivar factor: A multiplicative factor by which the delay grows
    @ivar jitter: Percentage of randomness to introduce into the delay length
        to prevent stampeding.
    @ivar clock: The clock used to schedule reconnection. It's mainly useful to
        be parametrized in tests. If the factory is serialized, this attribute
        will not be serialized, and the default value (the reactor) will be
        restored when deserialized.
    @type clock: L{IReactorTime}
    @ivar maxRetries: Maximum number of consecutive unsuccessful connection
        attempts, after which no further connection attempts will be made. If
        this is not explicitly set, no maximum is applied.
    @ivar delay: The most recently computed reconnection delay, in seconds.
    @ivar retries: Number of reconnections requested through L{retry} since
        the last call to L{resetDelay}.
    @ivar continueTrying: Flag which, when false, makes L{retry} and the
        connection-failure hooks give up instead of reconnecting.
    @ivar connector: The connector most recently passed to
        L{clientConnectionFailed} or L{clientConnectionLost}, if any.
    """

    maxDelay = 3600
    initialDelay = 1.0
    # Note: These highly sensitive factors have been precisely measured by
    # the National Institute of Standards and Technology.  Take extreme care
    # in altering them, or you may damage your Internet!
    # (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>)
    factor = 2.7182818284590451  # (math.e)
    # Phi = 1.6180339887498948 # (Phi is acceptable for use as a
    # factor if e is too large for your application.)

    # This is the value of the molar Planck constant times c, joule
    # meter/mole.  The value is attributable to
    # https://physics.nist.gov/cgi-bin/cuu/Value?nahc|search_for=molar+planck+constant+times+c
    jitter = 0.119626565582

    delay = initialDelay
    retries = 0
    maxRetries = None
    _callID = None
    connector = None
    clock = None

    continueTrying = 1

    def clientConnectionFailed(self, connector, reason):
        """
        Remember C{connector} and schedule a reconnection, unless
        reconnecting has been switched off.

        @param connector: The connector whose attempt failed.
        @param reason: Why the attempt failed; not used here.
        """
        if self.continueTrying:
            self.connector = connector
            self.retry()

    def clientConnectionLost(self, connector, unused_reason):
        """
        Remember C{connector} and schedule a reconnection, unless
        reconnecting has been switched off.

        @param connector: The connector whose connection was lost.
        @param unused_reason: Why the connection was lost; not used here.
        """
        if self.continueTrying:
            self.connector = connector
            self.retry()

    def retry(self, connector=None):
        """
        Have this connector connect again, after a suitable delay.

        @param connector: The connector to reconnect.  If L{None}, the
            connector stored in C{self.connector} is used.

        @raise ValueError: If reconnecting is still enabled but no connector
            was given and none has been stored.
        """
        # Resolve the fallback first so that every log message below names
        # the connector actually concerned instead of "None".
        if connector is None:
            connector = self.connector

        if not self.continueTrying:
            if self.noisy:
                log.msg(f"Abandoning {connector} on explicit request")
            return

        if connector is None:
            raise ValueError("no connector to retry")

        self.retries += 1
        if self.maxRetries is not None and (self.retries > self.maxRetries):
            if self.noisy:
                log.msg("Abandoning %s after %d retries." % (connector, self.retries))
            return

        # Exponential back-off, capped at maxDelay.  The jittered value is
        # stored back into self.delay, so the next retry grows from it.
        self.delay = min(self.delay * self.factor, self.maxDelay)
        if self.jitter:
            self.delay = random.normalvariate(self.delay, self.delay * self.jitter)

        if self.noisy:
            log.msg(
                "%s will retry in %d seconds"
                % (
                    connector,
                    self.delay,
                )
            )

        def reconnector():
            # The delayed call has fired; forget it before reconnecting.
            self._callID = None
            connector.connect()

        if self.clock is None:
            # Imported lazily so that merely importing this module does not
            # pull in the global reactor.
            from twisted.internet import reactor

            self.clock = reactor
        self._callID = self.clock.callLater(self.delay, reconnector)

    def stopTrying(self):
        """
        Put a stop to any attempt to reconnect in progress.

        Any scheduled reconnection is cancelled, further retries are
        disabled, and the stored connector (if any) is asked to stop
        connecting; L{error.NotConnectingError} from that request is
        ignored.
        """
        # ??? Is this function really stopFactory?
        if self._callID:
            self._callID.cancel()
            self._callID = None
        self.continueTrying = 0
        if self.connector:
            try:
                self.connector.stopConnecting()
            except error.NotConnectingError:
                pass

    def resetDelay(self):
        """
        Call this method after a successful connection: it resets the delay and
        the retry counter.

        It also forgets any stored delayed call (without cancelling it) and
        re-enables reconnecting.
        """
        self.delay = self.initialDelay
        self.retries = 0
        self._callID = None
        self.continueTrying = 1

    def __getstate__(self):
        """
        Remove all of the state which is mutated by connection attempts and
        failures, returning just the state which describes how reconnections
        should be attempted.  This will make the unserialized instance
        behave just as this one did when it was first instantiated.

        @return: A copy of the instance dictionary without the transient
            keys.
        """
        state = self.__dict__.copy()
        transient = (
            "connector",
            "retries",
            "delay",
            "continueTrying",
            "_callID",
            "clock",
        )
        for key in transient:
            state.pop(key, None)
        return state
480
481
class ServerFactory(Factory):
    """
    Subclass this to indicate that your protocol.Factory is only usable for servers.

    This class adds no attributes or methods of its own to L{Factory}.
    """
486
487
class BaseProtocol:
    """
    The abstract superclass of all protocols.

    A few methods get helpful default implementations here so that they can
    be shared easily; the direct subclasses of this class, L{Protocol} and
    L{ProcessProtocol}, are the more interesting ones.

    @ivar connected: 0 until L{makeConnection} has been called, 1 afterwards.
    @ivar transport: The transport given to L{makeConnection}, or L{None}
        before that.
    """

    connected = 0
    transport: Optional[ITransport] = None

    def makeConnection(self, transport):
        """
        Make a connection to a transport and a server.

        Marks this protocol as connected, stores C{transport} as the
        'transport' attribute, and then calls the connectionMade() callback.

        @param transport: The transport this protocol is now attached to.
        """
        self.connected, self.transport = 1, transport
        self.connectionMade()

    def connectionMade(self):
        """
        Called when a connection is made.

        This may be considered the initializer of the protocol, because
        it is called when the connection is completed.  For clients,
        this is called once the connection to the server has been
        established; for servers, this is called after an accept() call
        stops blocking and a socket has been received.  If you need to
        send any greeting or initial message, do it here.

        The default implementation does nothing.
        """
522
523
# Module-level Failure wrapping an error.ConnectionDone instance.  It is
# created once at import time and used as the default ``reason`` argument of
# Protocol.connectionLost.
connectionDone = failure.Failure(error.ConnectionDone())
# NOTE(review): cleanFailure() presumably drops traceback/frame references so
# that this long-lived object does not keep them alive -- confirm against the
# Failure documentation.
connectionDone.cleanFailure()
526
527
@implementer(interfaces.IProtocol, interfaces.ILoggingContext)
class Protocol(BaseProtocol):
    """
    The base class for streaming connection-oriented protocols.

    If you are going to write a new connection-oriented protocol for Twisted,
    start here.  Any protocol implementation, either client or server, should
    be a subclass of this class.

    The API is quite simple.  Implement L{dataReceived} to handle both
    event-based and synchronous input; output can be sent through the
    'transport' attribute, which is to be an instance that implements
    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
    notified when the connection ends.

    Some subclasses exist already to help you write common types of protocols:
    see the L{twisted.protocols.basic} module for a few of them.

    @ivar factory: The L{Factory} associated with this protocol, or L{None}
        if none has been assigned.
    """

    factory: Optional[Factory] = None

    def logPrefix(self):
        """
        Return the name of this protocol's class, so that log messages
        related to this protocol instance can be identified.
        """
        className = self.__class__.__name__
        return className

    def dataReceived(self, data: bytes):
        """
        Called whenever data is received.

        Use this method to translate to a higher-level message.  Usually, some
        callback will be made upon the receipt of each complete protocol
        message.

        The default implementation does nothing.

        @param data: bytes of indeterminate length.  Please keep in mind
            that you will probably need to buffer some data, as partial
            (or multiple) protocol messages may be received!  I recommend
            that unit tests for protocols call through to this method with
            differing chunk sizes, down to one byte at a time.
        """

    def connectionLost(self, reason: failure.Failure = connectionDone):
        """
        Called when the connection is shut down.

        Clear any circular references here, and any external references
        to this Protocol.  The connection has been closed.

        The default implementation does nothing.

        @param reason: Why the connection was lost; defaults to the
            module-level C{connectionDone} failure.
        @type reason: L{twisted.python.failure.Failure}
        """
580
581
@implementer(interfaces.IConsumer)
class ProtocolToConsumerAdapter(components.Adapter):
    """
    Adapter offering consumer-style methods on top of a protocol.

    The adapted protocol is reached through C{self.original} (presumably
    provided by L{components.Adapter}).
    """

    def write(self, data: bytes):
        """
        Forward C{data} to the C{dataReceived} method of the adapted
        protocol.
        """
        protocol = self.original
        protocol.dataReceived(data)

    def registerProducer(self, producer, streaming):
        """
        Accept the call and do nothing; the producer is not recorded.
        """

    def unregisterProducer(self):
        """
        Accept the call and do nothing.
        """
592
593
# Register ProtocolToConsumerAdapter with the components registry, with
# IProtocol as the source interface and IConsumer as the target interface.
components.registerAdapter(
    ProtocolToConsumerAdapter, interfaces.IProtocol, interfaces.IConsumer
)
597
598
@implementer(interfaces.IProtocol)
class ConsumerToProtocolAdapter(components.Adapter):
    """
    Adapter offering protocol-style methods on top of a consumer.

    The adapted consumer is reached through C{self.original} (presumably
    provided by L{components.Adapter}).
    """

    def dataReceived(self, data: bytes):
        """
        Forward C{data} to the C{write} method of the adapted consumer.
        """
        consumer = self.original
        consumer.write(data)

    def connectionLost(self, reason: failure.Failure):
        """
        Accept the notification and do nothing.
        """

    def makeConnection(self, transport):
        """
        Accept the call and do nothing; the transport is not recorded.
        """

    def connectionMade(self):
        """
        Accept the notification and do nothing.
        """
612
613
# Register ConsumerToProtocolAdapter with the components registry, with
# IConsumer as the source interface and IProtocol as the target interface.
components.registerAdapter(
    ConsumerToProtocolAdapter, interfaces.IConsumer, interfaces.IProtocol
)
617
618
@implementer(interfaces.IProcessProtocol)
class ProcessProtocol(BaseProtocol):
    """
    Base process protocol implementation which does simple dispatching for
    stdin, stdout, and stderr file descriptors.
    """

    def childDataReceived(self, childFD: int, data: bytes):
        """
        Route data from a child file descriptor to the matching hook.

        Descriptor 1 goes to L{outReceived} and descriptor 2 to
        L{errReceived}; data from any other descriptor is ignored.

        @param childFD: The child's file descriptor the data came from.
        @param data: The data that was received.
        """
        if childFD == 1:
            self.outReceived(data)
            return
        if childFD == 2:
            self.errReceived(data)

    def outReceived(self, data: bytes):
        """
        Some data was received from stdout.

        The default implementation does nothing.
        """

    def errReceived(self, data: bytes):
        """
        Some data was received from stderr.

        The default implementation does nothing.
        """

    def childConnectionLost(self, childFD: int):
        """
        Route the closing of a child file descriptor to the matching hook.

        Descriptor 0 goes to L{inConnectionLost}, 1 to L{outConnectionLost}
        and 2 to L{errConnectionLost}; any other descriptor is ignored.

        @param childFD: The child's file descriptor which was closed.
        """
        if childFD == 0:
            self.inConnectionLost()
            return
        if childFD == 1:
            self.outConnectionLost()
            return
        if childFD == 2:
            self.errConnectionLost()

    def inConnectionLost(self):
        """
        This will be called when stdin is closed.

        The default implementation does nothing.
        """

    def outConnectionLost(self):
        """
        This will be called when stdout is closed.

        The default implementation does nothing.
        """

    def errConnectionLost(self):
        """
        This will be called when stderr is closed.

        The default implementation does nothing.
        """

    def processExited(self, reason: failure.Failure):
        """
        This will be called when the subprocess exits.

        The default implementation does nothing.

        @type reason: L{twisted.python.failure.Failure}
        """

    def processEnded(self, reason: failure.Failure):
        """
        Called when the child process exits and all file descriptors
        associated with it have been closed.

        The default implementation does nothing.

        @type reason: L{twisted.python.failure.Failure}
        """
679
680
class AbstractDatagramProtocol:
    """
    Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP,
    UDP.

    @ivar transport: The transport passed to L{makeConnection}, or L{None}
        when no transport is attached.
    @ivar numPorts: Number of L{doStart} calls that have not yet been matched
        by a L{doStop} call.
    @ivar noisy: When true, the first start and the final stop are logged.
    """

    transport = None
    numPorts = 0
    noisy = True

    def __getstate__(self):
        """
        Return a copy of the instance state with C{transport} set to
        L{None}, so that the transport itself is never serialized.
        """
        state = self.__dict__.copy()
        state["transport"] = None
        return state

    def doStart(self):
        """
        Make sure startProtocol is called.

        This will be called by makeConnection(), users should not call it.
        """
        if not self.numPorts:
            if self.noisy:
                log.msg("Starting protocol %s" % self)
            self.startProtocol()
        self.numPorts += 1

    def doStop(self):
        """
        Make sure stopProtocol is called.

        This will be called by the port, users should not call it.

        The transport reference is dropped on every call; L{stopProtocol}
        runs only once the last outstanding start has been balanced.
        """
        assert self.numPorts > 0
        self.numPorts -= 1
        self.transport = None
        if not self.numPorts:
            if self.noisy:
                log.msg("Stopping protocol %s" % self)
            self.stopProtocol()

    def startProtocol(self):
        """
        Called when a transport is connected to this protocol.

        Will only be called once, even if multiple ports are connected.
        """

    def stopProtocol(self):
        """
        Called when the transport is disconnected.

        Will only be called once, after all ports are disconnected.
        """

    def makeConnection(self, transport):
        """
        Make a connection to a transport and a server.

        This sets the 'transport' attribute of this DatagramProtocol, and calls the
        doStart() callback.

        @param transport: The transport to attach; there must not already be
            one attached.
        """
        # Identity, not equality: a transport object whose __eq__ happens to
        # accept None must not make this protocol look unconnected.
        assert self.transport is None
        self.transport = transport
        self.doStart()

    def datagramReceived(self, datagram: bytes, addr):
        """
        Called when a datagram is received.

        The default implementation does nothing.

        @param datagram: the bytes received from the transport.
        @param addr: tuple of source of datagram.
        """
754
755
@implementer(interfaces.ILoggingContext)
class DatagramProtocol(AbstractDatagramProtocol):
    """
    Protocol for datagram-oriented transport, e.g. UDP.

    @type transport: L{None} or
        L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider
    @ivar transport: The transport with which this protocol is associated,
        if it is associated with one.
    """

    def logPrefix(self):
        """
        Return the name of this protocol's class, so that log messages
        related to this protocol instance can be identified.
        """
        className = self.__class__.__name__
        return className

    def connectionRefused(self):
        """
        Called due to error from write in connected mode.

        Note this is a result of ICMP message generated by *previous*
        write.

        The default implementation does nothing.
        """
781
782
class ConnectedDatagramProtocol(DatagramProtocol):
    """
    Protocol for connected datagram-oriented transport.

    No longer necessary for UDP.
    """

    def datagramReceived(self, datagram):
        """
        Called when a datagram is received.

        Unlike L{AbstractDatagramProtocol.datagramReceived}, this override
        takes no C{addr} argument.  The default implementation does nothing.

        @param datagram: the data received from the transport.
        """

    def connectionFailed(self, failure: failure.Failure):
        """
        Called if connecting failed.

        Usually this will be due to a DNS lookup failure.

        The default implementation does nothing.

        @param failure: why connecting failed.  Note that inside this method
            the parameter name shadows the module-level C{failure} module.
        """
803
804
@implementer(interfaces.ITransport)
class FileWrapper:
    """
    A wrapper around a file-like object to make it behave as a Transport.

    This doesn't actually stream the file to the attached protocol,
    and is thus useful mainly as a utility for debugging protocols.

    @ivar file: The wrapped file-like object.
    @ivar closed: 0 until L{loseConnection} has been called, 1 afterwards.
    @ivar disconnecting: Always 0 as far as this class is concerned; it is
        never modified here.
    @ivar producer: The producer given to L{registerProducer}, or L{None}.
    @ivar streamingProducer: The C{streaming} flag given to
        L{registerProducer}.
    """

    closed = 0
    disconnecting = 0
    producer = None
    streamingProducer = 0

    def __init__(self, file):
        """
        @param file: The file-like object to write to; it must provide
            C{write} and C{close}.
        """
        self.file = file

    def write(self, data: bytes):
        """
        Write C{data} to the wrapped file.

        Ordinary errors raised by the file are routed to
        L{handleException} instead of propagating.

        @param data: The bytes to write.
        """
        try:
            self.file.write(data)
        except Exception:
            # Deliberately not BaseException: KeyboardInterrupt, SystemExit
            # and GeneratorExit must be allowed to propagate.
            self.handleException()

    def _checkProducer(self):
        """
        Ask the registered producer, if any, to resume producing.
        """
        # Cheating; this is called at "idle" times to allow producers to be
        # found and dealt with
        if self.producer:
            self.producer.resumeProducing()

    def registerProducer(self, producer, streaming):
        """
        From abstract.FileDescriptor

        Remember C{producer} and the C{streaming} flag; a non-streaming
        producer is asked to resume producing right away.
        """
        self.producer = producer
        self.streamingProducer = streaming
        if not streaming:
            producer.resumeProducing()

    def unregisterProducer(self):
        """
        Forget the registered producer.
        """
        self.producer = None

    def stopConsuming(self):
        """
        Forget the registered producer and close the wrapped file.
        """
        self.unregisterProducer()
        self.loseConnection()

    def writeSequence(self, iovec):
        """
        Write the concatenation of the byte strings in C{iovec}.

        @param iovec: An iterable of bytes objects.
        """
        self.write(b"".join(iovec))

    def loseConnection(self):
        """
        Mark this transport as closed and close the wrapped file.

        An L{OSError} raised while closing is routed to L{handleException}.
        """
        self.closed = 1
        try:
            self.file.close()
        except OSError:
            self.handleException()

    def getPeer(self):
        """
        Return a placeholder peer address, the tuple C{("file", "file")}.
        """
        # FIXME: https://twistedmatrix.com/trac/ticket/7820
        # According to ITransport, this should return an IAddress!
        return "file", "file"

    def getHost(self):
        """
        Return a placeholder host address, the string C{"file"}.
        """
        # FIXME: https://twistedmatrix.com/trac/ticket/7820
        # According to ITransport, this should return an IAddress!
        return "file"

    def handleException(self):
        """
        Hook called when writing to or closing the file fails.

        The default implementation ignores the error.
        """
        pass

    def resumeProducing(self):
        """
        Do nothing.
        """
        # Never sends data anyways
        pass

    def pauseProducing(self):
        """
        Do nothing.
        """
        # Never sends data anyways
        pass

    def stopProducing(self):
        """
        Close the wrapped file, as L{loseConnection} does.
        """
        self.loseConnection()
883
884
# Names exported by ``from <this module> import *``.  BaseProtocol, the two
# consumer/protocol adapter classes and the private _InstanceFactory are
# defined in this module but are not listed here.
__all__ = [
    "Factory",
    "ClientFactory",
    "ReconnectingClientFactory",
    "connectionDone",
    "Protocol",
    "ProcessProtocol",
    "FileWrapper",
    "ServerFactory",
    "AbstractDatagramProtocol",
    "DatagramProtocol",
    "ConnectedDatagramProtocol",
    "ClientCreator",
]
899