1# -*- test-case-name: foolscap.test.test_reconnector -*-
2
3import random
4import time
5from twisted.internet import reactor
6from twisted.python import log
7from foolscap.tokens import NegotiationError, RemoteNegotiationError
8
9class ReconnectionInfo:
10    def __init__(self):
11        self.state = "unstarted"
12        self.connectionInfo = None
13        self.lastAttempt = None
14        self.nextAttempt = None
15
16    def _set_state(self, state):
17        self.state = state # unstarted, connecting, connected, waiting
18    def _set_connection_info(self, connectionInfo):
19        self.connectionInfo = connectionInfo
20    def _set_last_attempt(self, when):
21        self.lastAttempt = when
22    def _set_next_attempt(self, when):
23        self.nextAttempt = when
24
25
26class Reconnector(object):
27    """Establish (and maintain) a connection to a given PBURL.
28
29    I establish a connection to the PBURL and run a callback to inform the
30    caller about the newly-available RemoteReference. If the connection is
31    lost, I schedule a reconnection attempt for the near future. If that one
32    fails, I keep trying at longer and longer intervals (exponential
33    backoff).
34
35    My constructor accepts a callback which will be fired each time a
36    connection attempt succeeds. This callback is run with the new
37    RemoteReference and any additional args/kwargs provided to me. The
38    callback should then use rref.notifyOnDisconnect() to get a message when
39    the connection goes away. At some point after it goes away, the
40    Reconnector will reconnect.
41
42    When you no longer want to maintain this connection, call my
43    stopConnecting() method. I promise to not invoke your callback after
44    you've called stopConnecting(), even if there was already a connection
45    attempt in progress. If you had an active connection before calling
46    stopConnecting(), you will still have access to it, until it breaks on
47    its own. (I will not attempt to break existing connections, I will merely
48    stop trying to create new ones).
49    """
50
51    # adapted from twisted.internet.protocol.ReconnectingClientFactory
52    maxDelay = 3600
53    initialDelay = 1.0
54    # Note: These highly sensitive factors have been precisely measured by
55    # the National Institute of Science and Technology.  Take extreme care
56    # in altering them, or you may damage your Internet!
57    factor = 2.7182818284590451 # (math.e)
58    # Phi = 1.6180339887498948 # (Phi is acceptable for use as a
59    # factor if e is too large for your application.)
60    jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole
61    verbose = False
62
63    def __init__(self, url, cb, args, kwargs):
64        self._url = url
65        self._active = False
66        self._observer = (cb, args, kwargs)
67        self._delay = self.initialDelay
68        self._timer = None
69        self._tub = None
70        self._last_failure = None
71        self._reconnectionInfo = ReconnectionInfo()
72
73    def startConnecting(self, tub):
74        self._tub = tub
75        if self.verbose:
76            log.msg("Reconnector starting for %s" % self._url)
77        self._active = True
78        self._connect()
79
80    def stopConnecting(self):
81        if self.verbose:
82            log.msg("Reconnector stopping for %s" % self._url)
83        self._active = False
84        if self._timer:
85            self._timer.cancel()
86            self._timer = False
87        if self._tub:
88            self._tub._removeReconnector(self)
89
90    def reset(self):
91        """Reset the connection timer and try again very soon."""
92        self._delay = self.initialDelay
93        if self._timer:
94            self._timer.reset(1.0)
95
96    def getDelayUntilNextAttempt(self):
97        if not self._timer:
98            return None
99        return self._timer.getTime() - time.time()
100
101    def getLastFailure(self):
102        return self._last_failure
103
104    def getReconnectionInfo(self):
105        return self._reconnectionInfo
106
107    def _connect(self):
108        self._reconnectionInfo._set_state("connecting")
109        self._reconnectionInfo._set_last_attempt(time.time())
110        d = self._tub.getReference(self._url)
111        ci = self._tub.getConnectionInfoForFURL(self._url)
112        self._reconnectionInfo._set_connection_info(ci)
113        d.addCallbacks(self._connected, self._failed)
114
115    def _connected(self, rref):
116        if not self._active:
117            return
118        self._reconnectionInfo._set_state("connected")
119        ci = self._tub.getConnectionInfoForFURL(self._url)
120        self._reconnectionInfo._set_connection_info(ci)
121        self._last_failure = None
122        rref.notifyOnDisconnect(self._disconnected)
123        cb, args, kwargs = self._observer
124        cb(rref, *args, **kwargs)
125
126    def _failed(self, f):
127        self._last_failure = f
128        ci = getattr(f, "_connectionInfo", None)
129        if ci:
130            self._reconnectionInfo._set_connection_info(ci)
131
132        # I'd like to quietly handle "normal" problems (basically TCP
133        # failures and NegotiationErrors that result from the target either
134        # not speaking Foolscap or not hosting the Tub that we want), but not
135        # hide coding errors or version mismatches.
136        log_it = self.verbose
137
138        # log certain unusual errors, even without self.verbose, to help
139        # people figure out why their reconnectors aren't connecting, since
140        # the usual getReference errback chain isn't active. This doesn't
141        # include ConnectError (which is a parent class of
142        # ConnectionRefusedError), so it won't fire if we just had a bad
143        # host/port, since the way we use connection hints will provoke that
144        # all the time.
145        if f.check(RemoteNegotiationError, NegotiationError):
146            log_it = True
147        if log_it:
148            log.msg("Reconnector._failed (furl=%s): %s" % (self._url, f))
149        if not self._active:
150            return
151        self._delay = min(self._delay * self.factor, self.maxDelay)
152        if self.jitter:
153            self._delay = random.normalvariate(self._delay,
154                                               self._delay * self.jitter)
155        self._retry()
156
157    def _disconnected(self):
158        self._delay = self.initialDelay
159        self._retry()
160
161    def _retry(self):
162        if not self._active:
163            return
164        if self.verbose:
165            log.msg("Reconnector scheduling retry in %ds for %s" %
166                    (self._delay, self._url))
167        self._reconnectionInfo._set_state("waiting")
168        self._reconnectionInfo._set_next_attempt(time.time() + self._delay)
169        self._timer = reactor.callLater(self._delay, self._timer_expired)
170
171    def _timer_expired(self):
172        self._timer = None
173        self._connect()
174
175