1# -*- test-case-name: foolscap.test.test_pb -*- 2 3import os.path, weakref, binascii, re 4import six 5from warnings import warn 6from zope.interface import implementer 7from twisted.internet import (reactor, defer, protocol, error, interfaces, 8 endpoints) 9from twisted.application import service 10from twisted.python.failure import Failure 11from twisted.python.deprecate import deprecated 12from twisted.python.versions import Version 13 14from foolscap import ipb, base32, negotiate, broker, eventual, storage 15from foolscap import connection, util, info 16from foolscap.connections import tcp 17from foolscap.referenceable import SturdyRef 18from .furl import BadFURLError 19from foolscap.tokens import PBError, BananaError, WrongTubIdError, \ 20 WrongNameError, NoLocationError 21from foolscap.reconnector import Reconnector 22from foolscap.logging import log as flog 23from foolscap.logging import log 24from foolscap.logging import publish as flog_publish 25from foolscap.logging.log import UNUSUAL 26 27from foolscap import crypto 28 29class Listener(protocol.ServerFactory, service.Service): 30 """I am responsible for a single listening port, which connects to a 31 single Tub. I listen on an Endpoint, and can be constructed with either 32 the Endpoint, or a string (which I will pass to serverFromString()).""" 33 # this also serves as the ServerFactory 34 35 def __init__(self, tub, endpoint_or_description, _test_options={}, 36 negotiationClass=negotiate.Negotiation): 37 assert isinstance(tub, Tub) 38 self._tub = tub 39 40 if interfaces.IStreamServerEndpoint.providedBy(endpoint_or_description): 41 self._ep = endpoint_or_description 42 elif isinstance(endpoint_or_description, str): 43 self._ep = endpoints.serverFromString(reactor, 44 endpoint_or_description) 45 else: 46 raise TypeError("I require an endpoint, or a string description that can be turned into one") 47 self._lp = None 48 49 self._test_options = _test_options 50 self._negotiationClass = negotiationClass 51 self._redirects = {} 52 53 def startService(self): 54 service.Service.startService(self) 55 d = self._ep.listen(self) 56 def _listening(lp): 57 self._lp = lp 58 d.addCallback(_listening) 59 60 def stopService(self): 61 service.Service.stopService(self) 62 if self._lp: 63 return self._lp.stopListening() 64 65 @deprecated(Version("Foolscap", 0, 12, 0), 66 # "please use .." 67 "pre-allocated port numbers") 68 def getPortnum(self): 69 """When this Listener was created with a port string of '0' or 70 'tcp:0' (meaning 'please allocate me something'), and if the Listener 71 is active (it is attached to a Tub which is in the 'running' state), 72 this method will return the port number that was allocated. This is 73 useful for the following pattern:: 74 75 t = Tub() 76 l = t.listenOn('tcp:0') 77 t.setLocation('localhost:%d' % l.getPortnum()) 78 """ 79 assert self._lp 80 return self._lp.getHost().port 81 82 def __repr__(self): 83 return ("<Listener at 0x%x on %s with tub %s>" % 84 (abs(id(self)), str(self._ep), str(self._tub.tubID))) 85 86 def addRedirect(self, tubID, location): 87 assert tubID is not None 88 self._redirects[tubID] = location 89 def removeRedirect(self, tubID): 90 del self._redirects[tubID] 91 92 def buildProtocol(self, addr): 93 """Return a Broker attached to me (as the service provider). 94 """ 95 lp = log.msg("%s accepting connection from %s" % (self, addr), 96 addr=(addr.host, addr.port), 97 facility="foolscap.listener") 98 proto = self._negotiationClass(logparent=lp) 99 ci = info.ConnectionInfo() 100 ci._set_listener_description(self._describe()) 101 ci._set_listener_status("negotiating") 102 proto.initServer(self, ci) 103 proto.factory = self 104 return proto 105 106 def lookupTubID(self, tubID): 107 tubID = six.ensure_str(tubID) 108 tub = None 109 if tubID == self._tub.tubID: 110 tub = self._tub 111 return (tub, self._redirects.get(tubID)) 112 113 def _describe(self): 114 desc = "Listener" 115 if self._lp: 116 desc += " on %s" % str(self._lp.getHost()) 117 return desc 118 119def generateSwissnumber(bits): 120 bytes = os.urandom(bits//8) 121 return base32.encode(bytes) 122 123@implementer(ipb.ITub) 124class Tub(service.MultiService): 125 """I am a presence in the PB universe, also known as a Tub. 126 127 I am a Service (in the twisted.application.service.Service sense), 128 so you either need to call my startService() method before using me, 129 or setServiceParent() me to a running service. 130 131 This is the primary entry point for all PB-using applications, both 132 clients and servers. 133 134 I am known to the outside world by a base URL, which may include 135 authentication information (a yURL). This is my 'TubID'. 136 137 I contain Referenceables, and manage RemoteReferences to Referenceables 138 that live in other Tubs. 139 140 141 @param certData: if provided, use it as a certificate rather than 142 generating a new one. This is a PEM-encoded 143 private/public keypair, as returned by Tub.getCertData() 144 145 @param certFile: if provided, the Tub will store its certificate in 146 this file. If the file does not exist when the Tub is 147 created, the Tub will generate a new certificate and 148 store it here. If the file does exist, the certificate 149 will be loaded from this file. 150 151 The simplest way to use the Tub is to choose a long-term 152 location for the certificate, use certFile= to tell the 153 Tub about it, and then let the Tub manage its own 154 certificate. 155 156 You may provide certData, or certFile, (or neither), but 157 not both. 158 159 @param _test_options: a dictionary of options that can influence 160 connection connection negotiation. Currently 161 defined keys are: 162 - debug_slow: if True, wait half a second between 163 each negotiation response 164 165 @ivar brokers: maps TubIDs to L{Broker} instances 166 167 @ivar referenceToName: maps Referenceable to a name 168 @ivar nameToReference: maps name to Referenceable 169 170 @type tubID: string 171 @ivar tubID: a global identifier for this Tub, possibly including 172 authentication information, hash of SSL certificate 173 174 """ 175 176 unsafeTracebacks = True # TODO: better way to enable this 177 logLocalFailures = False 178 logRemoteFailures = False 179 debugBanana = False 180 NAMEBITS = 160 # length of swissnumber for each reference 181 TUBIDBITS = 16 # length of non-crypto tubID 182 negotiationClass = negotiate.Negotiation 183 brokerClass = broker.Broker 184 keepaliveTimeout = 4*60 # ping when connection has been idle this long 185 disconnectTimeout = None # disconnect after this much idle time 186 tubID = None 187 188 def __init__(self, certData=None, certFile=None, _test_options={}): 189 service.MultiService.__init__(self) 190 self.setup(_test_options) 191 if certFile: 192 self.setupEncryptionFile(certFile) 193 else: 194 self.setupEncryption(certData) 195 196 def __repr__(self): 197 return "<Tub id=%s>" % self.tubID 198 199 def setupEncryptionFile(self, certFile): 200 try: 201 certData = open(certFile, "rb").read() 202 except EnvironmentError: 203 certData = None 204 self.setupEncryption(certData) 205 206 if certData is None: 207 f = open(certFile, "wb") 208 f.write(self.getCertData()) 209 f.close() 210 211 def setupEncryption(self, certData): 212 if certData: 213 cert = crypto.loadCertificate(certData) 214 else: 215 cert = self.createCertificate() 216 self.myCertificate = cert 217 self.tubID = crypto.digest32(cert.digest("sha1")) 218 219 def make_incarnation(self): 220 unique = six.ensure_str(binascii.b2a_hex(os.urandom(8))) 221 # TODO: it'd be nice to have a sequential component, so incarnations 222 # could be ordered, but it requires disk space 223 sequential = None 224 self.incarnation = (unique, sequential) 225 self.incarnation_string = unique 226 227 def getIncarnationString(self): 228 return self.incarnation_string 229 230 def setup(self, _test_options): 231 self._test_options = _test_options 232 self.logger = flog.theLogger 233 self.listeners = [] 234 self.locationHints = [] 235 236 # duplicate-connection management 237 self.make_incarnation() 238 239 # the master_table records the master-seqnum we used for the last 240 # established connection with the given tubid. It only contains 241 # entries for which we were the master. 242 self.master_table = {} # k:tubid, v:seqnum 243 # the slave_table records the (master-IR,master-seqnum) pair for the 244 # last established connection with the given tubid. It only contains 245 # entries for which we were the slave. 246 self.slave_table = {} # k:tubid, v:(master-IR,seqnum) 247 248 # local Referenceables 249 self.nameToReference = weakref.WeakValueDictionary() 250 self.referenceToName = weakref.WeakKeyDictionary() 251 self.strongReferences = [] 252 self.nameLookupHandlers = [] 253 254 # remote stuff. Most of these use a TubRef as a dictionary key 255 self.tubConnectors = {} # maps TubRef to a TubConnector 256 self.waitingForBrokers = {} # maps TubRef to list of Deferreds 257 self.brokers = {} # maps TubRef to a Broker that connects to them 258 self.reconnectors = [] 259 260 self._connectionHandlers = {"tcp": tcp.default()} 261 self._activeConnectors = [] 262 263 self._pending_getReferences = [] # list of (d, furl) pairs 264 265 self._logport = None 266 self._logport_furl = None 267 self._logport_furlfile = None 268 269 self._log_gatherer_furls = [] 270 self._log_gatherer_furlfile = None 271 self._log_gatherer_connectors = {} # maps furl to reconnector 272 273 self._handle_old_duplicate_connections = False 274 self._expose_remote_exception_types = True 275 self.accept_gifts = True 276 277 def setOption(self, name, value): 278 name = six.ensure_str(name) 279 if name == "logLocalFailures": 280 # log (with log.err) any exceptions that occur during the 281 # execution of a local Referenceable's method, which is invoked 282 # on behalf of a remote caller. These exceptions are reported to 283 # the remote caller through their callRemote's Deferred as usual: 284 # this option enables logging on the callee's side (i.e. our 285 # side) as well. 286 # 287 # TODO: This does not yet include Violations which were raised 288 # because the inbound callRemote had arguments that didn't meet 289 # our specifications. But it should. 290 self.logLocalFailures = bool(value) 291 elif name == "logRemoteFailures": 292 # log (with log.err) any exceptions that occur during the 293 # execution of a remote Referenceabe's method, invoked on behalf 294 # of a local RemoteReference.callRemote(). These exceptions are 295 # reported to our local caller through the usual Deferred.errback 296 # mechanism: this enables logging on the caller's side (i.e. our 297 # side) as well. 298 self.logRemoteFailures = bool(value) 299 elif name == "keepaliveTimeout": 300 self.keepaliveTimeout = int(value) 301 elif name == "disconnectTimeout": 302 self.disconnectTimeout = int(value) 303 elif name == "logport-furlfile": 304 self.setLogPortFURLFile(value) 305 elif name == "log-gatherer-furl": 306 self.setLogGathererFURL(value) 307 elif name == "log-gatherer-furlfile": 308 self.setLogGathererFURLFile(value) 309 elif name == "bridge-twisted-logs": 310 assert value is not False, "cannot unbridge twisted logs" 311 if value is True: 312 return flog.bridgeLogsFromTwisted(self.tubID) 313 else: 314 # for tests, bridge logs from a specific twisted LogPublisher 315 return flog.bridgeLogsFromTwisted(self.tubID, 316 twisted_logger=value) 317 elif name == "handle-old-duplicate-connections": 318 if value is True: 319 value = 60 320 self._handle_old_duplicate_connections = int(value) 321 elif name == "expose-remote-exception-types": 322 self._expose_remote_exception_types = bool(value) 323 elif name == "accept-gifts": 324 self.accept_gifts = bool(value) 325 else: 326 raise KeyError("unknown option name '%s'" % name) 327 328 def removeAllConnectionHintHandlers(self): 329 self._connectionHandlers = {} 330 331 def addConnectionHintHandler(self, hint_type, handler): 332 assert ipb.IConnectionHintHandler.providedBy(handler) 333 self._connectionHandlers[six.ensure_str(hint_type)] = handler 334 335 def setLogGathererFURL(self, gatherer_furl_or_furls): 336 assert not self._log_gatherer_furls 337 if isinstance(gatherer_furl_or_furls, (type(b""), type(u""))): 338 self._log_gatherer_furls.append(gatherer_furl_or_furls) 339 else: 340 self._log_gatherer_furls.extend(gatherer_furl_or_furls) 341 self._maybeConnectToGatherer() 342 343 def setLogGathererFURLFile(self, gatherer_furlfile): 344 assert not self._log_gatherer_furlfile 345 self._log_gatherer_furlfile = gatherer_furlfile 346 self._maybeConnectToGatherer() 347 348 def _maybeConnectToGatherer(self): 349 if not self.locationHints: 350 return 351 furls = [] 352 if self._log_gatherer_furls: 353 furls.extend(self._log_gatherer_furls) 354 if self._log_gatherer_furlfile: 355 try: 356 # allow multiple lines 357 for line in open(self._log_gatherer_furlfile, "r").readlines(): 358 furl = line.strip() 359 if furl: 360 furls.append(furl) 361 except EnvironmentError: 362 pass 363 for f in furls: 364 if f in self._log_gatherer_connectors: 365 continue 366 connector = self.connectTo(f, self._log_gatherer_connected) 367 self._log_gatherer_connectors[f] = connector 368 369 def _log_gatherer_connected(self, rref): 370 # we want the logport's furl to be nailed down now, so we'll use the 371 # right (persistent) name even if the user never calls 372 # tub.getLogPortFURL() directly. 373 ignored = self.getLogPortFURL() 374 del ignored 375 tubID = six.ensure_binary(self.tubID) 376 rref.callRemoteOnly('logport', tubID, self.getLogPort()) 377 378 379 def getLogPort(self): 380 if not self.locationHints: 381 raise NoLocationError 382 return self._maybeCreateLogPort() 383 384 def _maybeCreateLogPort(self): 385 if not self._logport: 386 self._logport = flog_publish.LogPublisher(self.logger) 387 return self._logport 388 389 def setLogPortFURLFile(self, furlfile): 390 self._logport_furlfile = furlfile 391 self._maybeCreateLogPortFURLFile() 392 393 def _maybeCreateLogPortFURLFile(self): 394 if not self._logport_furlfile: 395 return 396 if not self.locationHints: 397 return 398 # getLogPortFURL() creates the logport-furlfile as a side-effect 399 ignored = self.getLogPortFURL() 400 del ignored 401 402 def getLogPortFURL(self): 403 if not self.locationHints: 404 raise NoLocationError 405 if self._logport_furl: 406 return self._logport_furl 407 furlfile = self._logport_furlfile 408 # the Tub must be running and configured (setLocation) by now 409 self._logport_furl = self.registerReference(self.getLogPort(), 410 furlFile=furlfile) 411 return self._logport_furl 412 413 414 def log(self, *args, **kwargs): 415 kwargs['tubID'] = self.tubID 416 return log.msg(*args, **kwargs) 417 418 def createCertificate(self): 419 return crypto.createCertificate() 420 421 def getCertData(self): 422 # the bytes returned by this method can be used as the certData= 423 # argument to create a new Tub with the same identity. TODO: actually 424 # test this, I don't know if dump/keypair.newCertificate is the right 425 # pair of methods. 426 return six.ensure_binary(self.myCertificate.dumpPEM()) 427 428 def setLocation(self, *hints): 429 """Tell this service what its location is: a host:port description of 430 how to reach it from the outside world. You need to use this because 431 the Tub can't do it without help. If you do a 432 C{s.listenOn('tcp:1234')}, and the host is known as 433 C{foo.example.com}, then it would be appropriate to do:: 434 435 s.setLocation('foo.example.com:1234') 436 437 You must set the location before you can register any references. 438 439 Tubs can have multiple location hints, just provide multiple 440 arguments. """ 441 442 if self.locationHints: 443 raise PBError("Tub.setLocation() can only be called once") 444 self.locationHints = [six.ensure_str(hint) for hint in hints] 445 self._maybeCreateLogPortFURLFile() 446 self._maybeConnectToGatherer() 447 448 @deprecated(Version("Foolscap", 0, 12, 0), 449 # "please use .." 450 "user-provided hostnames") 451 def setLocationAutomatically(self, *extra_addresses): 452 """Determine one of this host's publically-visible IP addresses and 453 use it to set our location. This uses whatever source address would 454 be used to get to a well-known public host (A.ROOT-SERVERS.NET), 455 which is effectively the interface on which a default route lives. 456 This is neither very pretty (IP address instead of hostname) nor 457 guaranteed to work (it may very well be a 192.168 'private' address), 458 but for publically-visible hosts this will probably produce a useable 459 FURL. 460 461 This method returns a Deferred that will fire once the location is 462 actually established. Calls to registerReference() must be put off 463 until the location has been set. And of course, you must call 464 listenOn() before calling setLocationAutomatically().""" 465 466 # first, make sure the reactor is actually running, by using the 467 # eventual-send queue 468 d = eventual.fireEventually() 469 470 def _reactor_running(res): 471 assert self.running 472 # we can't use get_local_ip_for until the reactor is running 473 return util.get_local_ip_for() 474 d.addCallback(_reactor_running) 475 476 def _got_local_ip(local_address): 477 local_addresses = set(extra_addresses) 478 if local_address: 479 local_addresses.add(local_address) 480 local_addresses.add("127.0.0.1") 481 locations = set() 482 for l in self.getListeners(): 483 portnum = l.getPortnum() 484 for addr in local_addresses: 485 locations.add("%s:%d" % (addr, portnum)) 486 locations = list(locations) 487 locations.sort() 488 assert len(locations) >= 1 489 location = ",".join(locations) 490 self.setLocation(location) 491 d.addCallback(_got_local_ip) 492 return d 493 494 def listenOn(self, what, _test_options={}): 495 """Start listening for connections. 496 497 @type what: string 498 @param what: a L{twisted.internet.endpoints.serverFromString} -style 499 description 500 @param _test_options: a dictionary of options that can influence 501 connection negotiation before the target Tub 502 has been determined 503 504 @return: The Listener object that was created. This can be used to 505 stop listening later on.""" 506 507 if isinstance(what, (six.binary_type, six.text_type)): 508 what = six.ensure_str(what) 509 510 if what in ("0", "tcp:0"): 511 warningString = ("Tub.listenOn('tcp:0') was deprecated " 512 "in Foolscap 0.12.0; please use pre-allocated " 513 "port numbers instead") 514 warn(warningString, DeprecationWarning, stacklevel=2) 515 516 if isinstance(what, six.string_types) and re.search(r"^\d+$", what): 517 warn("Tub.listenOn('12345') was deprecated " 518 "in Foolscap 0.12.0; please use qualified endpoint " 519 "descriptions like 'tcp:12345'", 520 DeprecationWarning, stacklevel=2) 521 what = "tcp:%s" % what 522 523 l = Listener(self, what, _test_options, self.negotiationClass) 524 self.listeners.append(l) 525 l.setServiceParent(self) 526 return l 527 528 def stopListeningOn(self, l): 529 # this returns a Deferred when the port is shut down 530 self.listeners.remove(l) 531 return l.disownServiceParent() 532 533 def getListeners(self): 534 """Return the set of Listener objects that allow the outside world to 535 connect to this Tub.""" 536 return self.listeners[:] 537 538 def getTubID(self): 539 return self.tubID 540 def getShortTubID(self): 541 return self.tubID[:4] 542 543 def getConnectionInfoForFURL(self, furl): 544 try: 545 tubref = SturdyRef(furl).getTubRef() 546 except (ValueError, BadFURLError): 547 return None # unparseable FURL 548 return self._getConnectionInfoForTubRef(tubref) 549 550 def _getConnectionInfoForTubRef(self, tubref): 551 if tubref in self.brokers: 552 return self.brokers[tubref].getConnectionInfo() 553 if tubref in self.tubConnectors: 554 return self.tubConnectors[tubref].getConnectionInfo() 555 return None # currently have no established or in-progress connection 556 557 def connectorStarted(self, c): 558 assert self.running 559 # TODO: why a list? shouldn't there only ever be one TubConnector? 560 self._activeConnectors.append(c) 561 def connectorFinished(self, c): 562 if c in self._activeConnectors: 563 self._activeConnectors.remove(c) 564 565 def startService(self): 566 service.MultiService.startService(self) 567 for d,sturdy in self._pending_getReferences: 568 d1 = eventual.fireEventually(sturdy) 569 d1.addCallback(self.getReference) 570 d1.addBoth(lambda res, d=d: d.callback(res)) 571 del self._pending_getReferences 572 for rc in self.reconnectors: 573 eventual.eventually(rc.startConnecting, self) 574 575 def _tubsAreNotRestartable(self, *args, **kwargs): 576 raise RuntimeError("Sorry, but Tubs cannot be restarted.") 577 def _tubHasBeenShutDown(self, *args, **kwargs): 578 raise RuntimeError("Sorry, but this Tub has been shut down.") 579 580 def stopService(self): 581 # note that once you stopService a Tub, I cannot be restarted. (at 582 # least this code is not designed to make that possible.. it might be 583 # doable in the future). 584 assert self.running 585 self.startService = self._tubsAreNotRestartable 586 self.getReference = self._tubHasBeenShutDown 587 self.connectTo = self._tubHasBeenShutDown 588 589 # Tell everything to shut down now. We assume that it will stop 590 # twitching by the next tick, so Trial unit tests won't complain 591 # about a dirty reactor. We wait on a few things that might not 592 # behave. 593 dl = [] 594 for rc in list(self.reconnectors): 595 rc.stopConnecting() 596 del self.reconnectors 597 for c in list(self._activeConnectors): 598 c.shutdown() 599 why = Failure(error.ConnectionDone("Tub.stopService was called")) 600 for b in list(self.brokers.values()): 601 broker_disconnected = defer.Deferred() 602 dl.append(broker_disconnected) 603 b._notifyOnConnectionLost( 604 lambda d=broker_disconnected: d.callback(None) 605 ) 606 b.shutdown(why, fireDisconnectWatchers=False) 607 608 d = defer.DeferredList(dl) 609 d.addCallback(lambda _: service.MultiService.stopService(self)) 610 d.addCallback(eventual.fireEventually) 611 return d 612 613 def generateSwissnumber(self, bits): 614 return generateSwissnumber(bits) 615 616 def buildURL(self, name): 617 # TODO: IPv6 dotted-quad addresses have colons, but need to have 618 # host:port 619 hints = ",".join(self.locationHints) 620 return "pb://" + self.tubID + "@" + hints + "/" + name 621 #hints = b",".join(self.locationHints) 622 #return b"pb://" + self.tubID + b"@" + hints + b"/" + name 623 624 def registerReference(self, ref, name=None, furlFile=None): 625 """Make a Referenceable available to the outside world. A URL is 626 returned which can be used to access this object. This registration 627 will remain in effect (and the Tub will retain a reference to the 628 object to keep it meaningful) until explicitly unregistered, or the 629 Tub is shut down. 630 631 @type name: string (optional) 632 @param name: if provided, the object will be registered with this 633 name. If not, a random (unguessable) string will be 634 used. 635 636 @param furlFile: if provided, get the name from this file (if 637 it exists), and write the new FURL to this file. 638 If 'name=' is also provided, it is used for the 639 name, but the FURL is still written to this file. 640 641 @rtype: string 642 @return: the URL which points to this object. This URL can be passed 643 to Tub.getReference() in any Tub on any host which can reach this 644 one. 645 """ 646 647 if not self.locationHints: 648 raise NoLocationError("you must setLocation() before " 649 "you can registerReference()") 650 oldfurl = None 651 if furlFile: 652 try: 653 oldfurl = open(furlFile, "r").read().strip() 654 except EnvironmentError: 655 pass 656 if oldfurl: 657 sr = SturdyRef(oldfurl) 658 if name is None: 659 name = sr.name 660 if self.tubID != sr.tubID: 661 raise WrongTubIdError("I cannot keep using the old FURL from %s" 662 " because it does not have the same" 663 " TubID as I do (%s)" % 664 (furlFile, self.tubID)) 665 if name != sr.name: 666 raise WrongNameError("I cannot keep using the old FURL from %s" 667 " because you called registerReference" 668 " with a new name (%s)" % 669 (furlFile, name)) 670 name = self._assignName(ref, name) 671 assert name 672 if ref not in self.strongReferences: 673 self.strongReferences.append(ref) 674 furl = self.buildURL(name) 675 if furlFile: 676 need_to_chmod = not os.path.exists(furlFile) 677 f = open(furlFile, "w") 678 f.write(furl + "\n") 679 f.close() 680 if need_to_chmod: 681 # XXX: open-to-chmod race here 682 os.chmod(furlFile, 0o600) 683 return furl 684 685 # this is called by either registerReference or by 686 # getOrCreateURLForReference 687 def _assignName(self, ref, preferred_name=None): 688 """Make a Referenceable available to the outside world, but do not 689 retain a strong reference to it. If we must create a new name, use 690 preferred_name. If that is None, use a random unguessable name. 691 """ 692 if not self.locationHints: 693 # without a location, there is no point in giving it a name 694 return None 695 if ref in self.referenceToName: 696 return self.referenceToName[ref] 697 name = preferred_name 698 if not name: 699 name = self.generateSwissnumber(self.NAMEBITS) 700 self.referenceToName[ref] = name 701 self.nameToReference[name] = ref 702 return name 703 704 def getReferenceForName(self, name): 705 if name in self.nameToReference: 706 return self.nameToReference[name] 707 for lookup in self.nameLookupHandlers: 708 ref = lookup(name) 709 if ref: 710 if ref not in self.referenceToName: 711 self.referenceToName[ref] = name 712 return ref 713 # don't reveal the full swissnum 714 hint = name[:2] 715 raise KeyError("unable to find reference for name starting with '%s'" 716 % hint) 717 718 def getReferenceForURL(self, url): 719 # TODO: who should this be used by? 720 sturdy = SturdyRef(url) 721 assert sturdy.tubID == self.tubID 722 return self.getReferenceForName(sturdy.name) 723 724 def getOrCreateURLForReference(self, ref): 725 """Return the global URL for the reference, if there is one, or None 726 if there is not.""" 727 name = self._assignName(ref) 728 if name: 729 return self.buildURL(name) 730 return None 731 732 def revokeReference(self, ref): 733 # TODO 734 pass 735 736 def unregisterURL(self, url): 737 sturdy = SturdyRef(url) 738 name = sturdy.name 739 ref = self.nameToReference[name] 740 del self.nameToReference[name] 741 del self.referenceToName[ref] 742 self.revokeReference(ref) 743 744 def unregisterReference(self, ref): 745 name = self.referenceToName[ref] 746 url = self.buildURL(name) 747 sturdy = SturdyRef(url) 748 name = sturdy.name 749 del self.nameToReference[name] 750 del self.referenceToName[ref] 751 if ref in self.strongReferences: 752 self.strongReferences.remove(ref) 753 self.revokeReference(ref) 754 755 def registerNameLookupHandler(self, lookup): 756 """Add a function to help convert names to Referenceables. 757 758 When remote systems pass a FURL to their Tub.getReference(), our Tub 759 will be asked to locate a Referenceable for the name inside that 760 furl. The normal mechanism for this is to look at the table 761 maintained by registerReference() and unregisterReference(). If the 762 name does not exist in that table, other 'lookup handler' functions 763 are given a chance. Each lookup handler is asked in turn, and the 764 first which returns a non-None value wins. 765 766 This may be useful for cases where the furl represents an object that 767 lives on disk, or is generated on demand: rather than creating all 768 possible Referenceables at startup, the lookup handler can create or 769 retrieve the objects only when someone asks for them. 770 771 Note that constructing the FURLs of these objects may be non-trivial. 772 It is safe to create an object, use tub.registerReference in one 773 invocation of a program to obtain (and publish) the furl, parse the 774 furl to extract the name, save the contents of the object on disk, 775 then in a later invocation of the program use a lookup handler to 776 retrieve the object from disk. This approach means the objects that 777 are created in a given invocation stick around (inside 778 tub.strongReferences) for the rest of that invocation. An alternatve 779 approach is to create the object but *not* use tub.registerReference, 780 but in that case you have to construct the FURL yourself, and the Tub 781 does not currently provide any support for doing this robustly. 782 783 @param lookup: a callable which accepts a name (as a string) and 784 returns either a Referenceable or None. Note that 785 these strings should not contain a slash, a question 786 mark, or an ampersand, as these are reserved in the 787 FURL for later expansion (to add parameters beyond the 788 object name) 789 """ 790 self.nameLookupHandlers.append(lookup) 791 792 def unregisterNameLookupHandler(self, lookup): 793 self.nameLookupHandlers.remove(lookup) 794 795 def getReference(self, sturdyOrURL): 796 """Acquire a RemoteReference for the given SturdyRef/URL. 797 798 The Tub must be running (i.e. Tub.startService()) when this is 799 invoked. Future releases may relax this requirement. 800 801 @return: a Deferred that fires with the RemoteReference. Any failures 802 are returned asynchronously. 803 """ 804 805 return defer.maybeDeferred(self._getReference, sturdyOrURL) 806 807 def _getReference(self, sturdyOrURL): 808 if isinstance(sturdyOrURL, SturdyRef): 809 sturdy = sturdyOrURL 810 else: 811 sturdyOrURL = six.ensure_str(sturdyOrURL) 812 sturdy = SturdyRef(sturdyOrURL) 813 814 if not self.running: 815 # queue their request for service once the Tub actually starts 816 log.msg("Tub.getReference(%s) queued until Tub.startService called" 817 % sturdy, facility="foolscap.tub") 818 d = defer.Deferred() 819 self._pending_getReferences.append((d, sturdy)) 820 return d 821 822 name = sturdy.name 823 d = self.getBrokerForTubRef(sturdy.getTubRef()) 824 d.addCallback(lambda b: b.getYourReferenceByName(name)) 825 return d 826 827 def connectTo(self, _furl, _cb, *args, **kwargs): 828 """Establish (and maintain) a connection to a given PBURL. 829 830 I establish a connection to the PBURL and run a callback to inform 831 the caller about the newly-available RemoteReference. If the 832 connection is lost, I schedule a reconnection attempt for the near 833 future. If that one fails, I keep trying at longer and longer 834 intervals (exponential backoff). 835 836 I accept a callback which will be fired each time a connection 837 attempt succeeds. This callback is run with the new RemoteReference 838 and any additional args/kwargs provided to me. The callback should 839 then use rref.notifyOnDisconnect() to get a message when the 840 connection goes away. At some point after it goes away, the 841 Reconnector will reconnect. 842 843 The Tub must be running (i.e. Tub.startService()) when this is 844 invoked. Future releases may relax this requirement. 845 846 I return a Reconnector object. When you no longer want to maintain 847 this connection, call the stopConnecting() method on the Reconnector. 848 I promise to not invoke your callback after you've called 849 stopConnecting(), even if there was already a connection attempt in 850 progress. If you had an active connection before calling 851 stopConnecting(), you will still have access to it, until it breaks 852 on its own. (I will not attempt to break existing connections, I will 853 merely stop trying to create new ones). All my Reconnector objects 854 will be shut down when the Tub is stopped. 855 856 Usage:: 857 858 def _got_ref(rref, arg1, arg2): 859 rref.callRemote('hello again') 860 # etc 861 rc = tub.connectTo(_got_ref, 'arg1', 'arg2') 862 ... 863 rc.stopConnecting() # later 864 """ 865 866 rc = Reconnector(_furl, _cb, args, kwargs) 867 if self.running: 868 rc.startConnecting(self) 869 else: 870 self.log("Tub.connectTo(%s) queued until Tub.startService called" 871 % _furl, level=UNUSUAL) 872 self.reconnectors.append(rc) 873 return rc 874 875 def serialize(self, obj): 876 b = broker.StorageBroker(None) 877 b.setTub(self) 878 d = storage.serialize(obj, banana=b) 879 return d 880 881 def unserialize(self, data): 882 b = broker.StorageBroker(None) 883 b.setTub(self) 884 d = storage.unserialize(data, banana=b) 885 assert isinstance(d, defer.Deferred) 886 return d 887 888 # beyond here are internal methods, not for use by application code 889 890 # _removeReconnector is called by the Reconnector 891 def _removeReconnector(self, rc): 892 self.reconnectors.remove(rc) 893 894 def getBrokerForTubRef(self, tubref): 895 if tubref in self.brokers: 896 return defer.succeed(self.brokers[tubref]) 897 if tubref.getTubID() == self.tubID: 898 b = self._createLoopbackBroker(tubref) 899 # _createLoopbackBroker will call brokerAttached, which will add 900 # it to self.brokers 901 # TODO: stash this in self.brokers, so we don't create multiples 902 return defer.succeed(b) 903 904 d = defer.Deferred() 905 if tubref not in self.waitingForBrokers: 906 self.waitingForBrokers[tubref] = [] 907 self.waitingForBrokers[tubref].append(d) 908 909 if tubref not in self.tubConnectors: 910 # the TubConnector will call our brokerAttached when it finishes 911 # negotiation, which will fire waitingForBrokers[tubref]. 912 c = connection.TubConnector(self, tubref, self._connectionHandlers) 913 self.tubConnectors[tubref] = c 914 c.connect() 915 916 return d 917 918 def _createLoopbackBroker(self, tubref): 919 t1,t2 = broker.LoopbackTransport(), broker.LoopbackTransport() 920 t1.setPeer(t2); t2.setPeer(t1) 921 n = negotiate.Negotiation() 922 params = n.loopbackDecision() 923 ci = info.ConnectionInfo() 924 b1 = self.brokerClass(tubref, params, connectionInfo=ci) 925 b2 = self.brokerClass(tubref, params) 926 # we treat b1 as "our" broker, and b2 as "theirs", and we pretend 927 # that b2 has just connected to us. We keep track of b1, and b2 keeps 928 # track of us. 929 b1.setTub(self) 930 b2.setTub(self) 931 t1.protocol = b1; t2.protocol = b2 932 b1.makeConnection(t1); b2.makeConnection(t2) 933 ci._set_connected(True) 934 ci._set_winning_hint("loopback") 935 ci._set_connection_status("loopback", "connected") 936 ci._set_established_at(b1.creation_timestamp) 937 self.brokerAttached(tubref, b1, False) 938 return b1 939 940 def connectionFailed(self, tubref, why): 941 # we previously initiated an outbound TubConnector to this tubref, but 942 # it was unable to establish a connection. 'why' is the most useful 943 # Failure that occurred (i.e. it is a NegotiationError if we made it 944 # that far, otherwise it's a ConnectionFailed). 945 946 if tubref in self.tubConnectors: 947 del self.tubConnectors[tubref] 948 if tubref in self.brokers: 949 # oh, but fortunately an inbound connection must have succeeded. 950 # Nevermind. 951 return 952 953 # inform hopeful Broker-waiters that they aren't getting one 954 if tubref in self.waitingForBrokers: 955 waiting = self.waitingForBrokers[tubref] 956 del self.waitingForBrokers[tubref] 957 for d in waiting: 958 d.errback(why) 959 960 def brokerAttached(self, tubref, broker, isClient): 961 assert self.running 962 assert tubref 963 964 if tubref in self.tubConnectors: 965 # we initiated an outbound connection to this tubref 966 if not isClient: 967 # however, the connection we got was from an inbound 968 # connection. The completed (inbound) connection wins, so 969 # abandon the outbound TubConnector 970 self.tubConnectors[tubref].shutdown() 971 972 # we don't need the TubConnector any more 973 del self.tubConnectors[tubref] 974 975 if tubref in self.brokers: 976 # this shouldn't happen: acceptDecision is supposed to drop any 977 # existing old connection first. 978 self.log("ERROR: unexpected duplicate connection from %s" % tubref) 979 raise BananaError("unexpected duplicate connection") 980 self.brokers[tubref] = broker 981 982 # now inform everyone who's been waiting on it 983 if tubref in self.waitingForBrokers: 984 for d in self.waitingForBrokers[tubref]: 985 eventual.eventually(d.callback, broker) 986 del self.waitingForBrokers[tubref] 987 988 def brokerDetached(self, broker, why): 989 # a loopback connection will produce two Brokers that both use the 990 # same tubref. Both will shut down about the same time. Make sure 991 # this doesn't confuse us. 992 993 # the Broker will have already severed all active references 994 for tubref in list(self.brokers.keys()): 995 if self.brokers[tubref] is broker: 996 del self.brokers[tubref] 997 998 def debug_listBrokers(self): 999 # return a list of (tubref, inbound, outbound) tuples. The tubref 1000 # tells you which broker this is, 'inbound' is a list of 1001 # InboundDelivery objects (one per outstanding inbound message), and 1002 # 'outbound' is a list of PendingRequest objects (one per message 1003 # that's waiting on a remote broker to complete). 1004 output = [] 1005 all_brokers = list(self.brokers.items()) 1006 for tubref,_broker in all_brokers: 1007 inbound = _broker.inboundDeliveryQueue[:] 1008 outbound = [pr 1009 for (reqID, pr) in 1010 sorted(_broker.waitingForAnswers.items()) ] 1011 output.append( (str(tubref), inbound, outbound) ) 1012 return output 1013