1# -*- test-case-name: twisted.test.test_factories,twisted.internet.test.test_protocol -*-
2# Copyright (c) Twisted Matrix Laboratories.
3# See LICENSE for details.
4
5"""
6Standard implementations of Twisted protocol-related interfaces.
7
8Start here if you are looking to write a new protocol implementation for
9Twisted.  The Protocol class contains some introductory material.
10"""
11
12from __future__ import division, absolute_import
13
14import random
15from zope.interface import implementer
16
17from twisted.python import log, failure, components
18from twisted.internet import interfaces, error, defer
19
20
21@implementer(interfaces.IProtocolFactory, interfaces.ILoggingContext)
22class Factory:
23    """
24    This is a factory which produces protocols.
25
26    By default, buildProtocol will create a protocol of the class given in
27    self.protocol.
28    """
29
30    # put a subclass of Protocol here:
31    protocol = None
32
33    numPorts = 0
34    noisy = True
35
36    @classmethod
37    def forProtocol(cls, protocol, *args, **kwargs):
38        """
39        Create a factory for the given protocol.
40
41        It sets the C{protocol} attribute and returns the constructed factory
42        instance.
43
44        @param protocol: A L{Protocol} subclass
45
46        @param args: Positional arguments for the factory.
47
48        @param kwargs: Keyword arguments for the factory.
49
50        @return: A L{Factory} instance wired up to C{protocol}.
51        """
52        factory = cls(*args, **kwargs)
53        factory.protocol = protocol
54        return factory
55
56
57    def logPrefix(self):
58        """
59        Describe this factory for log messages.
60        """
61        return self.__class__.__name__
62
63
64    def doStart(self):
65        """Make sure startFactory is called.
66
67        Users should not call this function themselves!
68        """
69        if not self.numPorts:
70            if self.noisy:
71                log.msg("Starting factory %r" % self)
72            self.startFactory()
73        self.numPorts = self.numPorts + 1
74
75    def doStop(self):
76        """Make sure stopFactory is called.
77
78        Users should not call this function themselves!
79        """
80        if self.numPorts == 0:
81            # this shouldn't happen, but does sometimes and this is better
82            # than blowing up in assert as we did previously.
83            return
84        self.numPorts = self.numPorts - 1
85        if not self.numPorts:
86            if self.noisy:
87                log.msg("Stopping factory %r" % self)
88            self.stopFactory()
89
90    def startFactory(self):
91        """This will be called before I begin listening on a Port or Connector.
92
93        It will only be called once, even if the factory is connected
94        to multiple ports.
95
96        This can be used to perform 'unserialization' tasks that
97        are best put off until things are actually running, such
98        as connecting to a database, opening files, etcetera.
99        """
100
101    def stopFactory(self):
102        """This will be called before I stop listening on all Ports/Connectors.
103
104        This can be overridden to perform 'shutdown' tasks such as disconnecting
105        database connections, closing files, etc.
106
107        It will be called, for example, before an application shuts down,
108        if it was connected to a port. User code should not call this function
109        directly.
110        """
111
112    def buildProtocol(self, addr):
113        """Create an instance of a subclass of Protocol.
114
115        The returned instance will handle input on an incoming server
116        connection, and an attribute \"factory\" pointing to the creating
117        factory.
118
119        Override this method to alter how Protocol instances get created.
120
121        @param addr: an object implementing L{twisted.internet.interfaces.IAddress}
122        """
123        p = self.protocol()
124        p.factory = self
125        return p
126
127
128class ClientFactory(Factory):
129    """A Protocol factory for clients.
130
131    This can be used together with the various connectXXX methods in
132    reactors.
133    """
134
135    def startedConnecting(self, connector):
136        """Called when a connection has been started.
137
138        You can call connector.stopConnecting() to stop the connection attempt.
139
140        @param connector: a Connector object.
141        """
142
143    def clientConnectionFailed(self, connector, reason):
144        """Called when a connection has failed to connect.
145
146        It may be useful to call connector.connect() - this will reconnect.
147
148        @type reason: L{twisted.python.failure.Failure}
149        """
150
151    def clientConnectionLost(self, connector, reason):
152        """Called when an established connection is lost.
153
154        It may be useful to call connector.connect() - this will reconnect.
155
156        @type reason: L{twisted.python.failure.Failure}
157        """
158
159
160class _InstanceFactory(ClientFactory):
161    """
162    Factory used by ClientCreator.
163
164    @ivar deferred: The L{Deferred} which represents this connection attempt and
165        which will be fired when it succeeds or fails.
166
167    @ivar pending: After a connection attempt succeeds or fails, a delayed call
168        which will fire the L{Deferred} representing this connection attempt.
169    """
170
171    noisy = False
172    pending = None
173
174    def __init__(self, reactor, instance, deferred):
175        self.reactor = reactor
176        self.instance = instance
177        self.deferred = deferred
178
179
180    def __repr__(self):
181        return "<ClientCreator factory: %r>" % (self.instance, )
182
183
184    def buildProtocol(self, addr):
185        """
186        Return the pre-constructed protocol instance and arrange to fire the
187        waiting L{Deferred} to indicate success establishing the connection.
188        """
189        self.pending = self.reactor.callLater(
190            0, self.fire, self.deferred.callback, self.instance)
191        self.deferred = None
192        return self.instance
193
194
195    def clientConnectionFailed(self, connector, reason):
196        """
197        Arrange to fire the waiting L{Deferred} with the given failure to
198        indicate the connection could not be established.
199        """
200        self.pending = self.reactor.callLater(
201            0, self.fire, self.deferred.errback, reason)
202        self.deferred = None
203
204
205    def fire(self, func, value):
206        """
207        Clear C{self.pending} to avoid a reference cycle and then invoke func
208        with the value.
209        """
210        self.pending = None
211        func(value)
212
213
214
215class ClientCreator:
216    """
217    Client connections that do not require a factory.
218
219    The various connect* methods create a protocol instance using the given
220    protocol class and arguments, and connect it, returning a Deferred of the
221    resulting protocol instance.
222
223    Useful for cases when we don't really need a factory.  Mainly this
224    is when there is no shared state between protocol instances, and no need
225    to reconnect.
226
227    The C{connectTCP}, C{connectUNIX}, and C{connectSSL} methods each return a
228    L{Deferred} which will fire with an instance of the protocol class passed to
229    L{ClientCreator.__init__}.  These Deferred can be cancelled to abort the
230    connection attempt (in a very unlikely case, cancelling the Deferred may not
231    prevent the protocol from being instantiated and connected to a transport;
232    if this happens, it will be disconnected immediately afterwards and the
233    Deferred will still errback with L{CancelledError}).
234    """
235
236    def __init__(self, reactor, protocolClass, *args, **kwargs):
237        self.reactor = reactor
238        self.protocolClass = protocolClass
239        self.args = args
240        self.kwargs = kwargs
241
242
243    def _connect(self, method, *args, **kwargs):
244        """
245        Initiate a connection attempt.
246
247        @param method: A callable which will actually start the connection
248            attempt.  For example, C{reactor.connectTCP}.
249
250        @param *args: Positional arguments to pass to C{method}, excluding the
251            factory.
252
253        @param **kwargs: Keyword arguments to pass to C{method}.
254
255        @return: A L{Deferred} which fires with an instance of the protocol
256            class passed to this L{ClientCreator}'s initializer or fails if the
257            connection cannot be set up for some reason.
258        """
259        def cancelConnect(deferred):
260            connector.disconnect()
261            if f.pending is not None:
262                f.pending.cancel()
263        d = defer.Deferred(cancelConnect)
264        f = _InstanceFactory(
265            self.reactor, self.protocolClass(*self.args, **self.kwargs), d)
266        connector = method(factory=f, *args, **kwargs)
267        return d
268
269
270    def connectTCP(self, host, port, timeout=30, bindAddress=None):
271        """
272        Connect to a TCP server.
273
274        The parameters are all the same as to L{IReactorTCP.connectTCP} except
275        that the factory parameter is omitted.
276
277        @return: A L{Deferred} which fires with an instance of the protocol
278            class passed to this L{ClientCreator}'s initializer or fails if the
279            connection cannot be set up for some reason.
280        """
281        return self._connect(
282            self.reactor.connectTCP, host, port, timeout=timeout,
283            bindAddress=bindAddress)
284
285
286    def connectUNIX(self, address, timeout=30, checkPID=False):
287        """
288        Connect to a Unix socket.
289
290        The parameters are all the same as to L{IReactorUNIX.connectUNIX} except
291        that the factory parameter is omitted.
292
293        @return: A L{Deferred} which fires with an instance of the protocol
294            class passed to this L{ClientCreator}'s initializer or fails if the
295            connection cannot be set up for some reason.
296        """
297        return self._connect(
298            self.reactor.connectUNIX, address, timeout=timeout,
299            checkPID=checkPID)
300
301
302    def connectSSL(self, host, port, contextFactory, timeout=30, bindAddress=None):
303        """
304        Connect to an SSL server.
305
306        The parameters are all the same as to L{IReactorSSL.connectSSL} except
307        that the factory parameter is omitted.
308
309        @return: A L{Deferred} which fires with an instance of the protocol
310            class passed to this L{ClientCreator}'s initializer or fails if the
311            connection cannot be set up for some reason.
312        """
313        return self._connect(
314            self.reactor.connectSSL, host, port,
315            contextFactory=contextFactory, timeout=timeout,
316            bindAddress=bindAddress)
317
318
319
320class ReconnectingClientFactory(ClientFactory):
321    """
322    Factory which auto-reconnects clients with an exponential back-off.
323
324    Note that clients should call my resetDelay method after they have
325    connected successfully.
326
327    @ivar maxDelay: Maximum number of seconds between connection attempts.
328    @ivar initialDelay: Delay for the first reconnection attempt.
329    @ivar factor: A multiplicitive factor by which the delay grows
330    @ivar jitter: Percentage of randomness to introduce into the delay length
331        to prevent stampeding.
332    @ivar clock: The clock used to schedule reconnection. It's mainly useful to
333        be parametrized in tests. If the factory is serialized, this attribute
334        will not be serialized, and the default value (the reactor) will be
335        restored when deserialized.
336    @type clock: L{IReactorTime}
337    @ivar maxRetries: Maximum number of consecutive unsuccessful connection
338        attempts, after which no further connection attempts will be made. If
339        this is not explicitly set, no maximum is applied.
340    """
341    maxDelay = 3600
342    initialDelay = 1.0
343    # Note: These highly sensitive factors have been precisely measured by
344    # the National Institute of Science and Technology.  Take extreme care
345    # in altering them, or you may damage your Internet!
346    # (Seriously: <http://physics.nist.gov/cuu/Constants/index.html>)
347    factor = 2.7182818284590451 # (math.e)
348    # Phi = 1.6180339887498948 # (Phi is acceptable for use as a
349    # factor if e is too large for your application.)
350    jitter = 0.11962656472 # molar Planck constant times c, joule meter/mole
351
352    delay = initialDelay
353    retries = 0
354    maxRetries = None
355    _callID = None
356    connector = None
357    clock = None
358
359    continueTrying = 1
360
361
362    def clientConnectionFailed(self, connector, reason):
363        if self.continueTrying:
364            self.connector = connector
365            self.retry()
366
367
368    def clientConnectionLost(self, connector, unused_reason):
369        if self.continueTrying:
370            self.connector = connector
371            self.retry()
372
373
374    def retry(self, connector=None):
375        """
376        Have this connector connect again, after a suitable delay.
377        """
378        if not self.continueTrying:
379            if self.noisy:
380                log.msg("Abandoning %s on explicit request" % (connector,))
381            return
382
383        if connector is None:
384            if self.connector is None:
385                raise ValueError("no connector to retry")
386            else:
387                connector = self.connector
388
389        self.retries += 1
390        if self.maxRetries is not None and (self.retries > self.maxRetries):
391            if self.noisy:
392                log.msg("Abandoning %s after %d retries." %
393                        (connector, self.retries))
394            return
395
396        self.delay = min(self.delay * self.factor, self.maxDelay)
397        if self.jitter:
398            self.delay = random.normalvariate(self.delay,
399                                              self.delay * self.jitter)
400
401        if self.noisy:
402            log.msg("%s will retry in %d seconds" % (connector, self.delay,))
403
404        def reconnector():
405            self._callID = None
406            connector.connect()
407        if self.clock is None:
408            from twisted.internet import reactor
409            self.clock = reactor
410        self._callID = self.clock.callLater(self.delay, reconnector)
411
412
413    def stopTrying(self):
414        """
415        Put a stop to any attempt to reconnect in progress.
416        """
417        # ??? Is this function really stopFactory?
418        if self._callID:
419            self._callID.cancel()
420            self._callID = None
421        self.continueTrying = 0
422        if self.connector:
423            try:
424                self.connector.stopConnecting()
425            except error.NotConnectingError:
426                pass
427
428
429    def resetDelay(self):
430        """
431        Call this method after a successful connection: it resets the delay and
432        the retry counter.
433        """
434        self.delay = self.initialDelay
435        self.retries = 0
436        self._callID = None
437        self.continueTrying = 1
438
439
440    def __getstate__(self):
441        """
442        Remove all of the state which is mutated by connection attempts and
443        failures, returning just the state which describes how reconnections
444        should be attempted.  This will make the unserialized instance
445        behave just as this one did when it was first instantiated.
446        """
447        state = self.__dict__.copy()
448        for key in ['connector', 'retries', 'delay',
449                    'continueTrying', '_callID', 'clock']:
450            if key in state:
451                del state[key]
452        return state
453
454
455
456class ServerFactory(Factory):
457    """Subclass this to indicate that your protocol.Factory is only usable for servers.
458    """
459
460
461
462class BaseProtocol:
463    """
464    This is the abstract superclass of all protocols.
465
466    Some methods have helpful default implementations here so that they can
467    easily be shared, but otherwise the direct subclasses of this class are more
468    interesting, L{Protocol} and L{ProcessProtocol}.
469    """
470    connected = 0
471    transport = None
472
473    def makeConnection(self, transport):
474        """Make a connection to a transport and a server.
475
476        This sets the 'transport' attribute of this Protocol, and calls the
477        connectionMade() callback.
478        """
479        self.connected = 1
480        self.transport = transport
481        self.connectionMade()
482
483    def connectionMade(self):
484        """Called when a connection is made.
485
486        This may be considered the initializer of the protocol, because
487        it is called when the connection is completed.  For clients,
488        this is called once the connection to the server has been
489        established; for servers, this is called after an accept() call
490        stops blocking and a socket has been received.  If you need to
491        send any greeting or initial message, do it here.
492        """
493
494connectionDone=failure.Failure(error.ConnectionDone())
495connectionDone.cleanFailure()
496
497
498@implementer(interfaces.IProtocol, interfaces.ILoggingContext)
499class Protocol(BaseProtocol):
500    """
501    This is the base class for streaming connection-oriented protocols.
502
503    If you are going to write a new connection-oriented protocol for Twisted,
504    start here.  Any protocol implementation, either client or server, should
505    be a subclass of this class.
506
507    The API is quite simple.  Implement L{dataReceived} to handle both
508    event-based and synchronous input; output can be sent through the
509    'transport' attribute, which is to be an instance that implements
510    L{twisted.internet.interfaces.ITransport}.  Override C{connectionLost} to be
511    notified when the connection ends.
512
513    Some subclasses exist already to help you write common types of protocols:
514    see the L{twisted.protocols.basic} module for a few of them.
515    """
516
517    def logPrefix(self):
518        """
519        Return a prefix matching the class name, to identify log messages
520        related to this protocol instance.
521        """
522        return self.__class__.__name__
523
524
525    def dataReceived(self, data):
526        """Called whenever data is received.
527
528        Use this method to translate to a higher-level message.  Usually, some
529        callback will be made upon the receipt of each complete protocol
530        message.
531
532        @param data: a string of indeterminate length.  Please keep in mind
533            that you will probably need to buffer some data, as partial
534            (or multiple) protocol messages may be received!  I recommend
535            that unit tests for protocols call through to this method with
536            differing chunk sizes, down to one byte at a time.
537        """
538
539    def connectionLost(self, reason=connectionDone):
540        """Called when the connection is shut down.
541
542        Clear any circular references here, and any external references
543        to this Protocol.  The connection has been closed.
544
545        @type reason: L{twisted.python.failure.Failure}
546        """
547
548
549@implementer(interfaces.IConsumer)
550class ProtocolToConsumerAdapter(components.Adapter):
551
552    def write(self, data):
553        self.original.dataReceived(data)
554
555    def registerProducer(self, producer, streaming):
556        pass
557
558    def unregisterProducer(self):
559        pass
560
561components.registerAdapter(ProtocolToConsumerAdapter, interfaces.IProtocol,
562                           interfaces.IConsumer)
563
564@implementer(interfaces.IProtocol)
565class ConsumerToProtocolAdapter(components.Adapter):
566
567    def dataReceived(self, data):
568        self.original.write(data)
569
570    def connectionLost(self, reason):
571        pass
572
573    def makeConnection(self, transport):
574        pass
575
576    def connectionMade(self):
577        pass
578
579components.registerAdapter(ConsumerToProtocolAdapter, interfaces.IConsumer,
580                           interfaces.IProtocol)
581
582@implementer(interfaces.IProcessProtocol)
583class ProcessProtocol(BaseProtocol):
584    """
585    Base process protocol implementation which does simple dispatching for
586    stdin, stdout, and stderr file descriptors.
587    """
588
589    def childDataReceived(self, childFD, data):
590        if childFD == 1:
591            self.outReceived(data)
592        elif childFD == 2:
593            self.errReceived(data)
594
595
596    def outReceived(self, data):
597        """
598        Some data was received from stdout.
599        """
600
601
602    def errReceived(self, data):
603        """
604        Some data was received from stderr.
605        """
606
607
608    def childConnectionLost(self, childFD):
609        if childFD == 0:
610            self.inConnectionLost()
611        elif childFD == 1:
612            self.outConnectionLost()
613        elif childFD == 2:
614            self.errConnectionLost()
615
616
617    def inConnectionLost(self):
618        """
619        This will be called when stdin is closed.
620        """
621
622
623    def outConnectionLost(self):
624        """
625        This will be called when stdout is closed.
626        """
627
628
629    def errConnectionLost(self):
630        """
631        This will be called when stderr is closed.
632        """
633
634
635    def processExited(self, reason):
636        """
637        This will be called when the subprocess exits.
638
639        @type reason: L{twisted.python.failure.Failure}
640        """
641
642
643    def processEnded(self, reason):
644        """
645        Called when the child process exits and all file descriptors
646        associated with it have been closed.
647
648        @type reason: L{twisted.python.failure.Failure}
649        """
650
651
652
653class AbstractDatagramProtocol:
654    """
655    Abstract protocol for datagram-oriented transports, e.g. IP, ICMP, ARP, UDP.
656    """
657
658    transport = None
659    numPorts = 0
660    noisy = True
661
662    def __getstate__(self):
663        d = self.__dict__.copy()
664        d['transport'] = None
665        return d
666
667    def doStart(self):
668        """Make sure startProtocol is called.
669
670        This will be called by makeConnection(), users should not call it.
671        """
672        if not self.numPorts:
673            if self.noisy:
674                log.msg("Starting protocol %s" % self)
675            self.startProtocol()
676        self.numPorts = self.numPorts + 1
677
678    def doStop(self):
679        """Make sure stopProtocol is called.
680
681        This will be called by the port, users should not call it.
682        """
683        assert self.numPorts > 0
684        self.numPorts = self.numPorts - 1
685        self.transport = None
686        if not self.numPorts:
687            if self.noisy:
688                log.msg("Stopping protocol %s" % self)
689            self.stopProtocol()
690
691    def startProtocol(self):
692        """Called when a transport is connected to this protocol.
693
694        Will only be called once, even if multiple ports are connected.
695        """
696
697    def stopProtocol(self):
698        """Called when the transport is disconnected.
699
700        Will only be called once, after all ports are disconnected.
701        """
702
703    def makeConnection(self, transport):
704        """Make a connection to a transport and a server.
705
706        This sets the 'transport' attribute of this DatagramProtocol, and calls the
707        doStart() callback.
708        """
709        assert self.transport == None
710        self.transport = transport
711        self.doStart()
712
713    def datagramReceived(self, datagram, addr):
714        """Called when a datagram is received.
715
716        @param datagram: the string received from the transport.
717        @param addr: tuple of source of datagram.
718        """
719
720
721@implementer(interfaces.ILoggingContext)
722class DatagramProtocol(AbstractDatagramProtocol):
723    """
724    Protocol for datagram-oriented transport, e.g. UDP.
725
726    @type transport: C{NoneType} or
727        L{IUDPTransport<twisted.internet.interfaces.IUDPTransport>} provider
728    @ivar transport: The transport with which this protocol is associated,
729        if it is associated with one.
730    """
731
732    def logPrefix(self):
733        """
734        Return a prefix matching the class name, to identify log messages
735        related to this protocol instance.
736        """
737        return self.__class__.__name__
738
739
740    def connectionRefused(self):
741        """Called due to error from write in connected mode.
742
743        Note this is a result of ICMP message generated by *previous*
744        write.
745        """
746
747
748class ConnectedDatagramProtocol(DatagramProtocol):
749    """Protocol for connected datagram-oriented transport.
750
751    No longer necessary for UDP.
752    """
753
754    def datagramReceived(self, datagram):
755        """Called when a datagram is received.
756
757        @param datagram: the string received from the transport.
758        """
759
760    def connectionFailed(self, failure):
761        """Called if connecting failed.
762
763        Usually this will be due to a DNS lookup failure.
764        """
765
766
767
768@implementer(interfaces.ITransport)
769class FileWrapper:
770    """A wrapper around a file-like object to make it behave as a Transport.
771
772    This doesn't actually stream the file to the attached protocol,
773    and is thus useful mainly as a utility for debugging protocols.
774    """
775
776    closed = 0
777    disconnecting = 0
778    producer = None
779    streamingProducer = 0
780
781    def __init__(self, file):
782        self.file = file
783
784    def write(self, data):
785        try:
786            self.file.write(data)
787        except:
788            self.handleException()
789        # self._checkProducer()
790
791    def _checkProducer(self):
792        # Cheating; this is called at "idle" times to allow producers to be
793        # found and dealt with
794        if self.producer:
795            self.producer.resumeProducing()
796
797    def registerProducer(self, producer, streaming):
798        """From abstract.FileDescriptor
799        """
800        self.producer = producer
801        self.streamingProducer = streaming
802        if not streaming:
803            producer.resumeProducing()
804
805    def unregisterProducer(self):
806        self.producer = None
807
808    def stopConsuming(self):
809        self.unregisterProducer()
810        self.loseConnection()
811
812    def writeSequence(self, iovec):
813        self.write("".join(iovec))
814
815    def loseConnection(self):
816        self.closed = 1
817        try:
818            self.file.close()
819        except (IOError, OSError):
820            self.handleException()
821
822    def getPeer(self):
823        # XXX: According to ITransport, this should return an IAddress!
824        return 'file', 'file'
825
826    def getHost(self):
827        # XXX: According to ITransport, this should return an IAddress!
828        return 'file'
829
830    def handleException(self):
831        pass
832
833    def resumeProducing(self):
834        # Never sends data anyways
835        pass
836
837    def pauseProducing(self):
838        # Never sends data anyways
839        pass
840
841    def stopProducing(self):
842        self.loseConnection()
843
844
845__all__ = ["Factory", "ClientFactory", "ReconnectingClientFactory", "connectionDone",
846           "Protocol", "ProcessProtocol", "FileWrapper", "ServerFactory",
847           "AbstractDatagramProtocol", "DatagramProtocol", "ConnectedDatagramProtocol",
848           "ClientCreator"]
849