1# -*- test-case-name: foolscap.test.test_reconnector -*- 2 3import random 4import time 5from twisted.internet import reactor 6from twisted.python import log 7from foolscap.tokens import NegotiationError, RemoteNegotiationError 8 9class ReconnectionInfo: 10 def __init__(self): 11 self.state = "unstarted" 12 self.connectionInfo = None 13 self.lastAttempt = None 14 self.nextAttempt = None 15 16 def _set_state(self, state): 17 self.state = state # unstarted, connecting, connected, waiting 18 def _set_connection_info(self, connectionInfo): 19 self.connectionInfo = connectionInfo 20 def _set_last_attempt(self, when): 21 self.lastAttempt = when 22 def _set_next_attempt(self, when): 23 self.nextAttempt = when 24 25 26class Reconnector(object): 27 """Establish (and maintain) a connection to a given PBURL. 28 29 I establish a connection to the PBURL and run a callback to inform the 30 caller about the newly-available RemoteReference. If the connection is 31 lost, I schedule a reconnection attempt for the near future. If that one 32 fails, I keep trying at longer and longer intervals (exponential 33 backoff). 34 35 My constructor accepts a callback which will be fired each time a 36 connection attempt succeeds. This callback is run with the new 37 RemoteReference and any additional args/kwargs provided to me. The 38 callback should then use rref.notifyOnDisconnect() to get a message when 39 the connection goes away. At some point after it goes away, the 40 Reconnector will reconnect. 41 42 When you no longer want to maintain this connection, call my 43 stopConnecting() method. I promise to not invoke your callback after 44 you've called stopConnecting(), even if there was already a connection 45 attempt in progress. If you had an active connection before calling 46 stopConnecting(), you will still have access to it, until it breaks on 47 its own. (I will not attempt to break existing connections, I will merely 48 stop trying to create new ones). 49 """ 50 51 # adapted from twisted.internet.protocol.ReconnectingClientFactory 52 maxDelay = 3600 53 initialDelay = 1.0 54 # Note: These highly sensitive factors have been precisely measured by 55 # the National Institute of Science and Technology. Take extreme care 56 # in altering them, or you may damage your Internet! 57 factor = 2.7182818284590451 # (math.e) 58 # Phi = 1.6180339887498948 # (Phi is acceptable for use as a 59 # factor if e is too large for your application.) 60 jitter = 0.11962656492 # molar Planck constant times c, Joule meter/mole 61 verbose = False 62 63 def __init__(self, url, cb, args, kwargs): 64 self._url = url 65 self._active = False 66 self._observer = (cb, args, kwargs) 67 self._delay = self.initialDelay 68 self._timer = None 69 self._tub = None 70 self._last_failure = None 71 self._reconnectionInfo = ReconnectionInfo() 72 73 def startConnecting(self, tub): 74 self._tub = tub 75 if self.verbose: 76 log.msg("Reconnector starting for %s" % self._url) 77 self._active = True 78 self._connect() 79 80 def stopConnecting(self): 81 if self.verbose: 82 log.msg("Reconnector stopping for %s" % self._url) 83 self._active = False 84 if self._timer: 85 self._timer.cancel() 86 self._timer = False 87 if self._tub: 88 self._tub._removeReconnector(self) 89 90 def reset(self): 91 """Reset the connection timer and try again very soon.""" 92 self._delay = self.initialDelay 93 if self._timer: 94 self._timer.reset(1.0) 95 96 def getDelayUntilNextAttempt(self): 97 if not self._timer: 98 return None 99 return self._timer.getTime() - time.time() 100 101 def getLastFailure(self): 102 return self._last_failure 103 104 def getReconnectionInfo(self): 105 return self._reconnectionInfo 106 107 def _connect(self): 108 self._reconnectionInfo._set_state("connecting") 109 self._reconnectionInfo._set_last_attempt(time.time()) 110 d = self._tub.getReference(self._url) 111 ci = self._tub.getConnectionInfoForFURL(self._url) 112 self._reconnectionInfo._set_connection_info(ci) 113 d.addCallbacks(self._connected, self._failed) 114 115 def _connected(self, rref): 116 if not self._active: 117 return 118 self._reconnectionInfo._set_state("connected") 119 ci = self._tub.getConnectionInfoForFURL(self._url) 120 self._reconnectionInfo._set_connection_info(ci) 121 self._last_failure = None 122 rref.notifyOnDisconnect(self._disconnected) 123 cb, args, kwargs = self._observer 124 cb(rref, *args, **kwargs) 125 126 def _failed(self, f): 127 self._last_failure = f 128 ci = getattr(f, "_connectionInfo", None) 129 if ci: 130 self._reconnectionInfo._set_connection_info(ci) 131 132 # I'd like to quietly handle "normal" problems (basically TCP 133 # failures and NegotiationErrors that result from the target either 134 # not speaking Foolscap or not hosting the Tub that we want), but not 135 # hide coding errors or version mismatches. 136 log_it = self.verbose 137 138 # log certain unusual errors, even without self.verbose, to help 139 # people figure out why their reconnectors aren't connecting, since 140 # the usual getReference errback chain isn't active. This doesn't 141 # include ConnectError (which is a parent class of 142 # ConnectionRefusedError), so it won't fire if we just had a bad 143 # host/port, since the way we use connection hints will provoke that 144 # all the time. 145 if f.check(RemoteNegotiationError, NegotiationError): 146 log_it = True 147 if log_it: 148 log.msg("Reconnector._failed (furl=%s): %s" % (self._url, f)) 149 if not self._active: 150 return 151 self._delay = min(self._delay * self.factor, self.maxDelay) 152 if self.jitter: 153 self._delay = random.normalvariate(self._delay, 154 self._delay * self.jitter) 155 self._retry() 156 157 def _disconnected(self): 158 self._delay = self.initialDelay 159 self._retry() 160 161 def _retry(self): 162 if not self._active: 163 return 164 if self.verbose: 165 log.msg("Reconnector scheduling retry in %ds for %s" % 166 (self._delay, self._url)) 167 self._reconnectionInfo._set_state("waiting") 168 self._reconnectionInfo._set_next_attempt(time.time() + self._delay) 169 self._timer = reactor.callLater(self._delay, self._timer_expired) 170 171 def _timer_expired(self): 172 self._timer = None 173 self._connect() 174 175