1# -*- test-case-name: foolscap.test.test_pb -*-
2
3import os.path, weakref, binascii, re
4import six
5from warnings import warn
6from zope.interface import implementer
7from twisted.internet import (reactor, defer, protocol, error, interfaces,
8                              endpoints)
9from twisted.application import service
10from twisted.python.failure import Failure
11from twisted.python.deprecate import deprecated
12from twisted.python.versions import Version
13
14from foolscap import ipb, base32, negotiate, broker, eventual, storage
15from foolscap import connection, util, info
16from foolscap.connections import tcp
17from foolscap.referenceable import SturdyRef
18from .furl import BadFURLError
19from foolscap.tokens import PBError, BananaError, WrongTubIdError, \
20     WrongNameError, NoLocationError
21from foolscap.reconnector import Reconnector
22from foolscap.logging import log as flog
23from foolscap.logging import log
24from foolscap.logging import publish as flog_publish
25from foolscap.logging.log import UNUSUAL
26
27from foolscap import crypto
28
class Listener(protocol.ServerFactory, service.Service):
    """I am responsible for a single listening port, which connects to a
    single Tub. I listen on an Endpoint, and can be constructed with either
    the Endpoint, or a string (which I will pass to serverFromString())."""
    # this also serves as the ServerFactory

    def __init__(self, tub, endpoint_or_description, _test_options=None,
                 negotiationClass=negotiate.Negotiation):
        """Bind this Listener to a Tub and a server endpoint.

        @param tub: the L{Tub} on whose behalf connections are accepted
        @param endpoint_or_description: an IStreamServerEndpoint provider,
               or a string handed to endpoints.serverFromString()
        @param _test_options: optional dict of negotiation test options;
               when omitted, a new empty dict is created for this Listener
        @param negotiationClass: factory used by buildProtocol() to create
               the protocol for each inbound connection

        @raise TypeError: if endpoint_or_description is neither an endpoint
               nor a string
        """
        assert isinstance(tub, Tub)
        self._tub = tub

        if interfaces.IStreamServerEndpoint.providedBy(endpoint_or_description):
            self._ep = endpoint_or_description
        elif isinstance(endpoint_or_description, str):
            self._ep = endpoints.serverFromString(reactor,
                                                  endpoint_or_description)
        else:
            raise TypeError("I require an endpoint, or a string description that can be turned into one")
        # the listening port; remains None until listen() has fired
        self._lp = None

        # Use a fresh dict per Listener: a literal {} default would be one
        # object shared (and mutable) across every Listener created without
        # explicit options.
        if _test_options is None:
            _test_options = {}
        self._test_options = _test_options
        self._negotiationClass = negotiationClass
        self._redirects = {}  # maps tubID to redirect location

    def startService(self):
        """Start listening on the endpoint; the resulting port is recorded
        in self._lp once the endpoint's Deferred fires."""
        service.Service.startService(self)
        d = self._ep.listen(self)
        def _listening(lp):
            self._lp = lp
        d.addCallback(_listening)

    def stopService(self):
        """Stop listening. Returns whatever stopListening() returns when a
        port is active, otherwise None."""
        service.Service.stopService(self)
        if self._lp:
            return self._lp.stopListening()

    @deprecated(Version("Foolscap", 0, 12, 0),
                # "please use .."
                "pre-allocated port numbers")
    def getPortnum(self):
        """When this Listener was created with a port string of '0' or
        'tcp:0' (meaning 'please allocate me something'), and if the Listener
        is active (it is attached to a Tub which is in the 'running' state),
        this method will return the port number that was allocated. This is
        useful for the following pattern::

            t = Tub()
            l = t.listenOn('tcp:0')
            t.setLocation('localhost:%d' % l.getPortnum())
        """
        assert self._lp
        return self._lp.getHost().port

    def __repr__(self):
        return ("<Listener at 0x%x on %s with tub %s>" %
                (abs(id(self)), str(self._ep), str(self._tub.tubID)))

    def addRedirect(self, tubID, location):
        """Record that lookups for tubID should also report 'location'
        (returned as the second element of lookupTubID())."""
        assert tubID is not None
        self._redirects[tubID] = location

    def removeRedirect(self, tubID):
        """Forget a redirect added by addRedirect(). Raises KeyError if no
        redirect is registered for tubID."""
        del self._redirects[tubID]

    def buildProtocol(self, addr):
        """Return a Broker attached to me (as the service provider).
        """
        lp = log.msg("%s accepting connection from %s" % (self, addr),
                     addr=(addr.host, addr.port),
                     facility="foolscap.listener")
        proto = self._negotiationClass(logparent=lp)
        ci = info.ConnectionInfo()
        ci._set_listener_description(self._describe())
        ci._set_listener_status("negotiating")
        proto.initServer(self, ci)
        proto.factory = self
        return proto

    def lookupTubID(self, tubID):
        """Return a (tub, redirect) pair for tubID: 'tub' is my Tub when the
        ID matches (else None), 'redirect' is the registered redirect
        location (else None)."""
        tubID = six.ensure_str(tubID)
        tub = None
        if tubID == self._tub.tubID:
            tub = self._tub
        return (tub, self._redirects.get(tubID))

    def _describe(self):
        """Return a short description of this Listener, including the host
        address once a listening port exists."""
        desc = "Listener"
        if self._lp:
            desc += " on %s" % str(self._lp.getHost())
        return desc
118
def generateSwissnumber(bits):
    """Return a base32-encoded random string built from ``bits // 8`` bytes
    of os.urandom() output."""
    byte_count = bits // 8
    entropy = os.urandom(byte_count)
    return base32.encode(entropy)
122
123@implementer(ipb.ITub)
124class Tub(service.MultiService):
125    """I am a presence in the PB universe, also known as a Tub.
126
127    I am a Service (in the twisted.application.service.Service sense),
128    so you either need to call my startService() method before using me,
129    or setServiceParent() me to a running service.
130
131    This is the primary entry point for all PB-using applications, both
132    clients and servers.
133
134    I am known to the outside world by a base URL, which may include
135    authentication information (a yURL). This is my 'TubID'.
136
137    I contain Referenceables, and manage RemoteReferences to Referenceables
138    that live in other Tubs.
139
140
141    @param certData: if provided, use it as a certificate rather than
142                     generating a new one. This is a PEM-encoded
143                     private/public keypair, as returned by Tub.getCertData()
144
145    @param certFile: if provided, the Tub will store its certificate in
146                     this file. If the file does not exist when the Tub is
147                     created, the Tub will generate a new certificate and
148                     store it here. If the file does exist, the certificate
149                     will be loaded from this file.
150
151                     The simplest way to use the Tub is to choose a long-term
152                     location for the certificate, use certFile= to tell the
153                     Tub about it, and then let the Tub manage its own
154                     certificate.
155
156                     You may provide certData, or certFile, (or neither), but
157                     not both.
158
159    @param _test_options: a dictionary of options that can influence
                          connection negotiation. Currently
161                          defined keys are:
162                          - debug_slow: if True, wait half a second between
163                                        each negotiation response
164
165    @ivar brokers: maps TubIDs to L{Broker} instances
166
167    @ivar referenceToName: maps Referenceable to a name
168    @ivar nameToReference: maps name to Referenceable
169
170    @type tubID: string
171    @ivar tubID: a global identifier for this Tub, possibly including
172                 authentication information, hash of SSL certificate
173
174    """
175
176    unsafeTracebacks = True # TODO: better way to enable this
177    logLocalFailures = False
178    logRemoteFailures = False
179    debugBanana = False
180    NAMEBITS = 160 # length of swissnumber for each reference
181    TUBIDBITS = 16 # length of non-crypto tubID
182    negotiationClass = negotiate.Negotiation
183    brokerClass = broker.Broker
184    keepaliveTimeout = 4*60 # ping when connection has been idle this long
185    disconnectTimeout = None # disconnect after this much idle time
186    tubID = None
187
188    def __init__(self, certData=None, certFile=None, _test_options={}):
189        service.MultiService.__init__(self)
190        self.setup(_test_options)
191        if certFile:
192            self.setupEncryptionFile(certFile)
193        else:
194            self.setupEncryption(certData)
195
196    def __repr__(self):
197        return "<Tub id=%s>" % self.tubID
198
199    def setupEncryptionFile(self, certFile):
200        try:
201            certData = open(certFile, "rb").read()
202        except EnvironmentError:
203            certData = None
204        self.setupEncryption(certData)
205
206        if certData is None:
207            f = open(certFile, "wb")
208            f.write(self.getCertData())
209            f.close()
210
211    def setupEncryption(self, certData):
212        if certData:
213            cert = crypto.loadCertificate(certData)
214        else:
215            cert = self.createCertificate()
216        self.myCertificate = cert
217        self.tubID = crypto.digest32(cert.digest("sha1"))
218
219    def make_incarnation(self):
220        unique = six.ensure_str(binascii.b2a_hex(os.urandom(8)))
221        # TODO: it'd be nice to have a sequential component, so incarnations
222        # could be ordered, but it requires disk space
223        sequential = None
224        self.incarnation = (unique, sequential)
225        self.incarnation_string = unique
226
    def getIncarnationString(self):
        """Return the incarnation string chosen by make_incarnation()."""
        return self.incarnation_string
229
    def setup(self, _test_options):
        """Initialise all per-Tub bookkeeping state.

        Called from __init__ before encryption is configured, so self.tubID
        is not yet set by this point.

        @param _test_options: dict of test options, stored as-is
        """
        self._test_options = _test_options
        self.logger = flog.theLogger
        self.listeners = []
        self.locationHints = []

        # duplicate-connection management
        self.make_incarnation()

        # the master_table records the master-seqnum we used for the last
        # established connection with the given tubid. It only contains
        # entries for which we were the master.
        self.master_table = {} # k:tubid, v:seqnum
        # the slave_table records the (master-IR,master-seqnum) pair for the
        # last established connection with the given tubid. It only contains
        # entries for which we were the slave.
        self.slave_table = {} # k:tubid, v:(master-IR,seqnum)

        # local Referenceables. The two lookup tables are weak; objects are
        # kept alive by strongReferences (see registerReference).
        self.nameToReference = weakref.WeakValueDictionary()
        self.referenceToName = weakref.WeakKeyDictionary()
        self.strongReferences = []
        self.nameLookupHandlers = []

        # remote stuff. Most of these use a TubRef as a dictionary key
        self.tubConnectors = {} # maps TubRef to a TubConnector
        self.waitingForBrokers = {} # maps TubRef to list of Deferreds
        self.brokers = {} # maps TubRef to a Broker that connects to them
        self.reconnectors = []

        # connection-hint handlers, keyed by hint type
        self._connectionHandlers = {"tcp": tcp.default()}
        self._activeConnectors = []

        self._pending_getReferences = [] # list of (d, furl) pairs

        # logport state, filled in lazily by the logport methods
        self._logport = None
        self._logport_furl = None
        self._logport_furlfile = None

        self._log_gatherer_furls = []
        self._log_gatherer_furlfile = None
        self._log_gatherer_connectors = {} # maps furl to reconnector

        # defaults for the options adjustable through setOption()
        self._handle_old_duplicate_connections = False
        self._expose_remote_exception_types = True
        self.accept_gifts = True
276
277    def setOption(self, name, value):
278        name = six.ensure_str(name)
279        if name == "logLocalFailures":
280            # log (with log.err) any exceptions that occur during the
281            # execution of a local Referenceable's method, which is invoked
282            # on behalf of a remote caller. These exceptions are reported to
283            # the remote caller through their callRemote's Deferred as usual:
284            # this option enables logging on the callee's side (i.e. our
285            # side) as well.
286            #
287            # TODO: This does not yet include Violations which were raised
288            # because the inbound callRemote had arguments that didn't meet
289            # our specifications. But it should.
290            self.logLocalFailures = bool(value)
291        elif name == "logRemoteFailures":
292            # log (with log.err) any exceptions that occur during the
293            # execution of a remote Referenceabe's method, invoked on behalf
294            # of a local RemoteReference.callRemote(). These exceptions are
295            # reported to our local caller through the usual Deferred.errback
296            # mechanism: this enables logging on the caller's side (i.e. our
297            # side) as well.
298            self.logRemoteFailures = bool(value)
299        elif name == "keepaliveTimeout":
300            self.keepaliveTimeout = int(value)
301        elif name == "disconnectTimeout":
302            self.disconnectTimeout = int(value)
303        elif name == "logport-furlfile":
304            self.setLogPortFURLFile(value)
305        elif name == "log-gatherer-furl":
306            self.setLogGathererFURL(value)
307        elif name == "log-gatherer-furlfile":
308            self.setLogGathererFURLFile(value)
309        elif name == "bridge-twisted-logs":
310            assert value is not False, "cannot unbridge twisted logs"
311            if value is True:
312                return flog.bridgeLogsFromTwisted(self.tubID)
313            else:
314                # for tests, bridge logs from a specific twisted LogPublisher
315                return flog.bridgeLogsFromTwisted(self.tubID,
316                                                  twisted_logger=value)
317        elif name == "handle-old-duplicate-connections":
318            if value is True:
319                value = 60
320            self._handle_old_duplicate_connections = int(value)
321        elif name == "expose-remote-exception-types":
322            self._expose_remote_exception_types = bool(value)
323        elif name == "accept-gifts":
324            self.accept_gifts = bool(value)
325        else:
326            raise KeyError("unknown option name '%s'" % name)
327
    def removeAllConnectionHintHandlers(self):
        """Discard every registered connection-hint handler, including the
        "tcp" handler installed by setup()."""
        self._connectionHandlers = {}
330
    def addConnectionHintHandler(self, hint_type, handler):
        """Register 'handler' for connection hints of type 'hint_type',
        replacing any handler already registered for that type.

        @param hint_type: hint type name (normalised with six.ensure_str)
        @param handler: must provide ipb.IConnectionHintHandler
        """
        assert ipb.IConnectionHintHandler.providedBy(handler)
        self._connectionHandlers[six.ensure_str(hint_type)] = handler
334
335    def setLogGathererFURL(self, gatherer_furl_or_furls):
336        assert not self._log_gatherer_furls
337        if isinstance(gatherer_furl_or_furls, (type(b""), type(u""))):
338            self._log_gatherer_furls.append(gatherer_furl_or_furls)
339        else:
340            self._log_gatherer_furls.extend(gatherer_furl_or_furls)
341        self._maybeConnectToGatherer()
342
    def setLogGathererFURLFile(self, gatherer_furlfile):
        """Remember the path of a file listing log-gatherer FURLs (one per
        line), then attempt to connect. May only be set once."""
        assert not self._log_gatherer_furlfile
        self._log_gatherer_furlfile = gatherer_furlfile
        self._maybeConnectToGatherer()
347
348    def _maybeConnectToGatherer(self):
349        if not self.locationHints:
350            return
351        furls = []
352        if self._log_gatherer_furls:
353            furls.extend(self._log_gatherer_furls)
354        if self._log_gatherer_furlfile:
355            try:
356                # allow multiple lines
357                for line in open(self._log_gatherer_furlfile, "r").readlines():
358                    furl = line.strip()
359                    if furl:
360                        furls.append(furl)
361            except EnvironmentError:
362                pass
363        for f in furls:
364            if f in self._log_gatherer_connectors:
365                continue
366            connector = self.connectTo(f, self._log_gatherer_connected)
367            self._log_gatherer_connectors[f] = connector
368
369    def _log_gatherer_connected(self, rref):
370        # we want the logport's furl to be nailed down now, so we'll use the
371        # right (persistent) name even if the user never calls
372        # tub.getLogPortFURL() directly.
373        ignored = self.getLogPortFURL()
374        del ignored
375        tubID = six.ensure_binary(self.tubID)
376        rref.callRemoteOnly('logport', tubID, self.getLogPort())
377
378
379    def getLogPort(self):
380        if not self.locationHints:
381            raise NoLocationError
382        return self._maybeCreateLogPort()
383
384    def _maybeCreateLogPort(self):
385        if not self._logport:
386            self._logport = flog_publish.LogPublisher(self.logger)
387        return self._logport
388
    def setLogPortFURLFile(self, furlfile):
        """Remember the file in which the logport FURL should be stored, and
        create that file right away if a location is already known."""
        self._logport_furlfile = furlfile
        self._maybeCreateLogPortFURLFile()
392
393    def _maybeCreateLogPortFURLFile(self):
394        if not self._logport_furlfile:
395            return
396        if not self.locationHints:
397            return
398        # getLogPortFURL() creates the logport-furlfile as a side-effect
399        ignored = self.getLogPortFURL()
400        del ignored
401
402    def getLogPortFURL(self):
403        if not self.locationHints:
404            raise NoLocationError
405        if self._logport_furl:
406            return self._logport_furl
407        furlfile = self._logport_furlfile
408        # the Tub must be running and configured (setLocation) by now
409        self._logport_furl = self.registerReference(self.getLogPort(),
410                                                    furlFile=furlfile)
411        return self._logport_furl
412
413
414    def log(self, *args, **kwargs):
415        kwargs['tubID'] = self.tubID
416        return log.msg(*args, **kwargs)
417
    def createCertificate(self):
        """Return a new certificate from crypto.createCertificate()."""
        return crypto.createCertificate()
420
    def getCertData(self):
        """Return my certificate as PEM data, coerced to bytes."""
        # the bytes returned by this method can be used as the certData=
        # argument to create a new Tub with the same identity. TODO: actually
        # test this, I don't know if dump/keypair.newCertificate is the right
        # pair of methods.
        return six.ensure_binary(self.myCertificate.dumpPEM())
427
428    def setLocation(self, *hints):
429        """Tell this service what its location is: a host:port description of
430        how to reach it from the outside world. You need to use this because
431        the Tub can't do it without help. If you do a
432        C{s.listenOn('tcp:1234')}, and the host is known as
433        C{foo.example.com}, then it would be appropriate to do::
434
435            s.setLocation('foo.example.com:1234')
436
437        You must set the location before you can register any references.
438
439        Tubs can have multiple location hints, just provide multiple
440        arguments. """
441
442        if self.locationHints:
443            raise PBError("Tub.setLocation() can only be called once")
444        self.locationHints = [six.ensure_str(hint) for hint in hints]
445        self._maybeCreateLogPortFURLFile()
446        self._maybeConnectToGatherer()
447
448    @deprecated(Version("Foolscap", 0, 12, 0),
449                # "please use .."
450                "user-provided hostnames")
451    def setLocationAutomatically(self, *extra_addresses):
452        """Determine one of this host's publically-visible IP addresses and
453        use it to set our location. This uses whatever source address would
454        be used to get to a well-known public host (A.ROOT-SERVERS.NET),
455        which is effectively the interface on which a default route lives.
456        This is neither very pretty (IP address instead of hostname) nor
457        guaranteed to work (it may very well be a 192.168 'private' address),
458        but for publically-visible hosts this will probably produce a useable
459        FURL.
460
461        This method returns a Deferred that will fire once the location is
462        actually established. Calls to registerReference() must be put off
463        until the location has been set. And of course, you must call
464        listenOn() before calling setLocationAutomatically()."""
465
466        # first, make sure the reactor is actually running, by using the
467        # eventual-send queue
468        d = eventual.fireEventually()
469
470        def _reactor_running(res):
471            assert self.running
472            # we can't use get_local_ip_for until the reactor is running
473            return util.get_local_ip_for()
474        d.addCallback(_reactor_running)
475
476        def _got_local_ip(local_address):
477            local_addresses = set(extra_addresses)
478            if local_address:
479                local_addresses.add(local_address)
480            local_addresses.add("127.0.0.1")
481            locations = set()
482            for l in self.getListeners():
483                portnum = l.getPortnum()
484                for addr in local_addresses:
485                    locations.add("%s:%d" % (addr, portnum))
486            locations = list(locations)
487            locations.sort()
488            assert len(locations) >= 1
489            location = ",".join(locations)
490            self.setLocation(location)
491        d.addCallback(_got_local_ip)
492        return d
493
494    def listenOn(self, what, _test_options={}):
495        """Start listening for connections.
496
497        @type  what: string
498        @param what: a L{twisted.internet.endpoints.serverFromString} -style
499                     description
500        @param _test_options: a dictionary of options that can influence
501                              connection negotiation before the target Tub
502                              has been determined
503
504        @return: The Listener object that was created. This can be used to
505        stop listening later on."""
506
507        if isinstance(what, (six.binary_type, six.text_type)):
508            what = six.ensure_str(what)
509
510        if what in ("0", "tcp:0"):
511            warningString = ("Tub.listenOn('tcp:0') was deprecated "
512                             "in Foolscap 0.12.0; please use pre-allocated "
513                             "port numbers instead")
514            warn(warningString, DeprecationWarning, stacklevel=2)
515
516        if isinstance(what, six.string_types) and re.search(r"^\d+$", what):
517            warn("Tub.listenOn('12345') was deprecated "
518                 "in Foolscap 0.12.0; please use qualified endpoint "
519                 "descriptions like 'tcp:12345'",
520                 DeprecationWarning, stacklevel=2)
521            what = "tcp:%s" % what
522
523        l = Listener(self, what, _test_options, self.negotiationClass)
524        self.listeners.append(l)
525        l.setServiceParent(self)
526        return l
527
    def stopListeningOn(self, l):
        """Stop the given Listener and forget about it.

        @param l: a Listener previously returned by listenOn(); raises
                  ValueError if it is not in self.listeners
        """
        # this returns a Deferred when the port is shut down
        self.listeners.remove(l)
        return l.disownServiceParent()
532
533    def getListeners(self):
534        """Return the set of Listener objects that allow the outside world to
535        connect to this Tub."""
536        return self.listeners[:]
537
    def getTubID(self):
        """Return my tubID."""
        return self.tubID
    def getShortTubID(self):
        """Return the first four characters of my tubID."""
        return self.tubID[:4]
542
543    def getConnectionInfoForFURL(self, furl):
544        try:
545            tubref = SturdyRef(furl).getTubRef()
546        except (ValueError, BadFURLError):
547            return None # unparseable FURL
548        return self._getConnectionInfoForTubRef(tubref)
549
550    def _getConnectionInfoForTubRef(self, tubref):
551        if tubref in self.brokers:
552            return self.brokers[tubref].getConnectionInfo()
553        if tubref in self.tubConnectors:
554            return self.tubConnectors[tubref].getConnectionInfo()
555        return None # currently have no established or in-progress connection
556
    def connectorStarted(self, c):
        """Record 'c' as an active connector; only valid while the Tub is
        running."""
        assert self.running
        # TODO: why a list? shouldn't there only ever be one TubConnector?
        self._activeConnectors.append(c)
561    def connectorFinished(self, c):
562        if c in self._activeConnectors:
563            self._activeConnectors.remove(c)
564
565    def startService(self):
566        service.MultiService.startService(self)
567        for d,sturdy in self._pending_getReferences:
568            d1 = eventual.fireEventually(sturdy)
569            d1.addCallback(self.getReference)
570            d1.addBoth(lambda res, d=d: d.callback(res))
571        del self._pending_getReferences
572        for rc in self.reconnectors:
573            eventual.eventually(rc.startConnecting, self)
574
    def _tubsAreNotRestartable(self, *args, **kwargs):
        """Replacement for startService() installed by stopService():
        always raises RuntimeError."""
        raise RuntimeError("Sorry, but Tubs cannot be restarted.")
    def _tubHasBeenShutDown(self, *args, **kwargs):
        """Replacement for getReference() and connectTo() installed by
        stopService(): always raises RuntimeError."""
        raise RuntimeError("Sorry, but this Tub has been shut down.")
579
    def stopService(self):
        """Shut the Tub down for good.

        Stops all reconnectors and active connectors, shuts down every
        Broker, then stops the child services.

        @return: a Deferred that fires after all Brokers have reported
                 connection loss, MultiService.stopService() has been
                 called, and one further eventual-send turn has passed
        """
        # note that once you stopService a Tub, I cannot be restarted. (at
        # least this code is not designed to make that possible.. it might be
        # doable in the future).
        assert self.running
        # shadow the entry points on this instance so later use fails loudly
        self.startService = self._tubsAreNotRestartable
        self.getReference = self._tubHasBeenShutDown
        self.connectTo = self._tubHasBeenShutDown

        # Tell everything to shut down now. We assume that it will stop
        # twitching by the next tick, so Trial unit tests won't complain
        # about a dirty reactor. We wait on a few things that might not
        # behave.
        dl = []
        # iterate over copies: the loop bodies may modify these collections
        # -- TODO confirm
        for rc in list(self.reconnectors):
            rc.stopConnecting()
        del self.reconnectors
        for c in list(self._activeConnectors):
            c.shutdown()
        why = Failure(error.ConnectionDone("Tub.stopService was called"))
        for b in list(self.brokers.values()):
            # one Deferred per Broker, fired when its connection is lost
            broker_disconnected = defer.Deferred()
            dl.append(broker_disconnected)
            b._notifyOnConnectionLost(
                lambda d=broker_disconnected: d.callback(None)
            )
            b.shutdown(why, fireDisconnectWatchers=False)

        d = defer.DeferredList(dl)
        d.addCallback(lambda _: service.MultiService.stopService(self))
        d.addCallback(eventual.fireEventually)
        return d
612
    def generateSwissnumber(self, bits):
        """Return a new random name by delegating to the module-level
        generateSwissnumber(bits)."""
        return generateSwissnumber(bits)
615
616    def buildURL(self, name):
617        # TODO: IPv6 dotted-quad addresses have colons, but need to have
618        # host:port
619        hints = ",".join(self.locationHints)
620        return "pb://" + self.tubID + "@" + hints + "/" + name
621        #hints = b",".join(self.locationHints)
622        #return b"pb://" + self.tubID + b"@" + hints + b"/" + name
623
624    def registerReference(self, ref, name=None, furlFile=None):
625        """Make a Referenceable available to the outside world. A URL is
626        returned which can be used to access this object. This registration
627        will remain in effect (and the Tub will retain a reference to the
628        object to keep it meaningful) until explicitly unregistered, or the
629        Tub is shut down.
630
631        @type  name: string (optional)
632        @param name: if provided, the object will be registered with this
633                     name. If not, a random (unguessable) string will be
634                     used.
635
636        @param furlFile: if provided, get the name from this file (if
637                         it exists), and write the new FURL to this file.
638                         If 'name=' is also provided, it is used for the
639                         name, but the FURL is still written to this file.
640
641        @rtype: string
642        @return: the URL which points to this object. This URL can be passed
643        to Tub.getReference() in any Tub on any host which can reach this
644        one.
645        """
646
647        if not self.locationHints:
648            raise NoLocationError("you must setLocation() before "
649                                  "you can registerReference()")
650        oldfurl = None
651        if furlFile:
652            try:
653                oldfurl = open(furlFile, "r").read().strip()
654            except EnvironmentError:
655                pass
656        if oldfurl:
657            sr = SturdyRef(oldfurl)
658            if name is None:
659                name = sr.name
660            if self.tubID != sr.tubID:
661                raise WrongTubIdError("I cannot keep using the old FURL from %s"
662                                      " because it does not have the same"
663                                      " TubID as I do (%s)" %
664                                      (furlFile, self.tubID))
665            if name != sr.name:
666                raise WrongNameError("I cannot keep using the old FURL from %s"
667                                     " because you called registerReference"
668                                     " with a new name (%s)" %
669                                     (furlFile, name))
670        name = self._assignName(ref, name)
671        assert name
672        if ref not in self.strongReferences:
673            self.strongReferences.append(ref)
674        furl = self.buildURL(name)
675        if furlFile:
676            need_to_chmod = not os.path.exists(furlFile)
677            f = open(furlFile, "w")
678            f.write(furl + "\n")
679            f.close()
680            if need_to_chmod:
681                # XXX: open-to-chmod race here
682                os.chmod(furlFile, 0o600)
683        return furl
684
685    # this is called by either registerReference or by
686    # getOrCreateURLForReference
687    def _assignName(self, ref, preferred_name=None):
688        """Make a Referenceable available to the outside world, but do not
689        retain a strong reference to it. If we must create a new name, use
690        preferred_name. If that is None, use a random unguessable name.
691        """
692        if not self.locationHints:
693            # without a location, there is no point in giving it a name
694            return None
695        if ref in self.referenceToName:
696            return self.referenceToName[ref]
697        name = preferred_name
698        if not name:
699            name = self.generateSwissnumber(self.NAMEBITS)
700        self.referenceToName[ref] = name
701        self.nameToReference[name] = ref
702        return name
703
704    def getReferenceForName(self, name):
705        if name in self.nameToReference:
706            return self.nameToReference[name]
707        for lookup in self.nameLookupHandlers:
708            ref = lookup(name)
709            if ref:
710                if ref not in self.referenceToName:
711                    self.referenceToName[ref] = name
712                return ref
713        # don't reveal the full swissnum
714        hint = name[:2]
715        raise KeyError("unable to find reference for name starting with '%s'"
716                       % hint)
717
    def getReferenceForURL(self, url):
        """Return the local Referenceable named by 'url', which must point
        at this Tub. Raises KeyError (from getReferenceForName) if the name
        is unknown."""
        # TODO: who should this be used by?
        sturdy = SturdyRef(url)
        assert sturdy.tubID == self.tubID
        return self.getReferenceForName(sturdy.name)
723
724    def getOrCreateURLForReference(self, ref):
725        """Return the global URL for the reference, if there is one, or None
726        if there is not."""
727        name = self._assignName(ref)
728        if name:
729            return self.buildURL(name)
730        return None
731
    def revokeReference(self, ref):
        """Placeholder: currently does nothing. Called by unregisterURL()
        and unregisterReference()."""
        # TODO
        pass
735
736    def unregisterURL(self, url):
737        sturdy = SturdyRef(url)
738        name = sturdy.name
739        ref = self.nameToReference[name]
740        del self.nameToReference[name]
741        del self.referenceToName[ref]
742        self.revokeReference(ref)
743
    def unregisterReference(self, ref):
        """Unregister 'ref': remove it from both name tables and release
        the strong reference the Tub held on it.

        @raise KeyError: if 'ref' has no registered name
        """
        name = self.referenceToName[ref]
        # NOTE(review): the name is round-tripped through buildURL() and
        # SturdyRef before use; presumably this normalises it -- confirm
        url = self.buildURL(name)
        sturdy = SturdyRef(url)
        name = sturdy.name
        del self.nameToReference[name]
        del self.referenceToName[ref]
        if ref in self.strongReferences:
            self.strongReferences.remove(ref)
        self.revokeReference(ref)
754
755    def registerNameLookupHandler(self, lookup):
756        """Add a function to help convert names to Referenceables.
757
758        When remote systems pass a FURL to their Tub.getReference(), our Tub
759        will be asked to locate a Referenceable for the name inside that
760        furl. The normal mechanism for this is to look at the table
761        maintained by registerReference() and unregisterReference(). If the
762        name does not exist in that table, other 'lookup handler' functions
763        are given a chance. Each lookup handler is asked in turn, and the
764        first which returns a non-None value wins.
765
766        This may be useful for cases where the furl represents an object that
767        lives on disk, or is generated on demand: rather than creating all
768        possible Referenceables at startup, the lookup handler can create or
769        retrieve the objects only when someone asks for them.
770
771        Note that constructing the FURLs of these objects may be non-trivial.
772        It is safe to create an object, use tub.registerReference in one
773        invocation of a program to obtain (and publish) the furl, parse the
774        furl to extract the name, save the contents of the object on disk,
775        then in a later invocation of the program use a lookup handler to
776        retrieve the object from disk. This approach means the objects that
777        are created in a given invocation stick around (inside
778        tub.strongReferences) for the rest of that invocation. An alternatve
779        approach is to create the object but *not* use tub.registerReference,
780        but in that case you have to construct the FURL yourself, and the Tub
781        does not currently provide any support for doing this robustly.
782
783        @param lookup: a callable which accepts a name (as a string) and
784                       returns either a Referenceable or None. Note that
785                       these strings should not contain a slash, a question
786                       mark, or an ampersand, as these are reserved in the
787                       FURL for later expansion (to add parameters beyond the
788                       object name)
789        """
790        self.nameLookupHandlers.append(lookup)
791
792    def unregisterNameLookupHandler(self, lookup):
793        self.nameLookupHandlers.remove(lookup)
794
795    def getReference(self, sturdyOrURL):
796        """Acquire a RemoteReference for the given SturdyRef/URL.
797
798        The Tub must be running (i.e. Tub.startService()) when this is
799        invoked. Future releases may relax this requirement.
800
801        @return: a Deferred that fires with the RemoteReference. Any failures
802        are returned asynchronously.
803        """
804
805        return defer.maybeDeferred(self._getReference, sturdyOrURL)
806
807    def _getReference(self, sturdyOrURL):
808        if isinstance(sturdyOrURL, SturdyRef):
809            sturdy = sturdyOrURL
810        else:
811            sturdyOrURL = six.ensure_str(sturdyOrURL)
812            sturdy = SturdyRef(sturdyOrURL)
813
814        if not self.running:
815            # queue their request for service once the Tub actually starts
816            log.msg("Tub.getReference(%s) queued until Tub.startService called"
817                    % sturdy, facility="foolscap.tub")
818            d = defer.Deferred()
819            self._pending_getReferences.append((d, sturdy))
820            return d
821
822        name = sturdy.name
823        d = self.getBrokerForTubRef(sturdy.getTubRef())
824        d.addCallback(lambda b: b.getYourReferenceByName(name))
825        return d
826
827    def connectTo(self, _furl, _cb, *args, **kwargs):
828        """Establish (and maintain) a connection to a given PBURL.
829
830        I establish a connection to the PBURL and run a callback to inform
831        the caller about the newly-available RemoteReference. If the
832        connection is lost, I schedule a reconnection attempt for the near
833        future. If that one fails, I keep trying at longer and longer
834        intervals (exponential backoff).
835
836        I accept a callback which will be fired each time a connection
837        attempt succeeds. This callback is run with the new RemoteReference
838        and any additional args/kwargs provided to me. The callback should
839        then use rref.notifyOnDisconnect() to get a message when the
840        connection goes away. At some point after it goes away, the
841        Reconnector will reconnect.
842
843        The Tub must be running (i.e. Tub.startService()) when this is
844        invoked. Future releases may relax this requirement.
845
846        I return a Reconnector object. When you no longer want to maintain
847        this connection, call the stopConnecting() method on the Reconnector.
848        I promise to not invoke your callback after you've called
849        stopConnecting(), even if there was already a connection attempt in
850        progress. If you had an active connection before calling
851        stopConnecting(), you will still have access to it, until it breaks
852        on its own. (I will not attempt to break existing connections, I will
853        merely stop trying to create new ones). All my Reconnector objects
854        will be shut down when the Tub is stopped.
855
856        Usage::
857
858         def _got_ref(rref, arg1, arg2):
859             rref.callRemote('hello again')
860             # etc
861         rc = tub.connectTo(_got_ref, 'arg1', 'arg2')
862         ...
863         rc.stopConnecting() # later
864        """
865
866        rc = Reconnector(_furl, _cb, args, kwargs)
867        if self.running:
868            rc.startConnecting(self)
869        else:
870            self.log("Tub.connectTo(%s) queued until Tub.startService called"
871                     % _furl, level=UNUSUAL)
872        self.reconnectors.append(rc)
873        return rc
874
875    def serialize(self, obj):
876        b = broker.StorageBroker(None)
877        b.setTub(self)
878        d = storage.serialize(obj, banana=b)
879        return d
880
881    def unserialize(self, data):
882        b = broker.StorageBroker(None)
883        b.setTub(self)
884        d = storage.unserialize(data, banana=b)
885        assert isinstance(d, defer.Deferred)
886        return d
887
888    # beyond here are internal methods, not for use by application code
889
890    # _removeReconnector is called by the Reconnector
891    def _removeReconnector(self, rc):
892        self.reconnectors.remove(rc)
893
894    def getBrokerForTubRef(self, tubref):
895        if tubref in self.brokers:
896            return defer.succeed(self.brokers[tubref])
897        if tubref.getTubID() == self.tubID:
898            b = self._createLoopbackBroker(tubref)
899            # _createLoopbackBroker will call brokerAttached, which will add
900            # it to self.brokers
901            # TODO: stash this in self.brokers, so we don't create multiples
902            return defer.succeed(b)
903
904        d = defer.Deferred()
905        if tubref not in self.waitingForBrokers:
906            self.waitingForBrokers[tubref] = []
907        self.waitingForBrokers[tubref].append(d)
908
909        if tubref not in self.tubConnectors:
910            # the TubConnector will call our brokerAttached when it finishes
911            # negotiation, which will fire waitingForBrokers[tubref].
912            c = connection.TubConnector(self, tubref, self._connectionHandlers)
913            self.tubConnectors[tubref] = c
914            c.connect()
915
916        return d
917
918    def _createLoopbackBroker(self, tubref):
919        t1,t2 = broker.LoopbackTransport(), broker.LoopbackTransport()
920        t1.setPeer(t2); t2.setPeer(t1)
921        n = negotiate.Negotiation()
922        params = n.loopbackDecision()
923        ci = info.ConnectionInfo()
924        b1 = self.brokerClass(tubref, params, connectionInfo=ci)
925        b2 = self.brokerClass(tubref, params)
926        # we treat b1 as "our" broker, and b2 as "theirs", and we pretend
927        # that b2 has just connected to us. We keep track of b1, and b2 keeps
928        # track of us.
929        b1.setTub(self)
930        b2.setTub(self)
931        t1.protocol = b1; t2.protocol = b2
932        b1.makeConnection(t1); b2.makeConnection(t2)
933        ci._set_connected(True)
934        ci._set_winning_hint("loopback")
935        ci._set_connection_status("loopback", "connected")
936        ci._set_established_at(b1.creation_timestamp)
937        self.brokerAttached(tubref, b1, False)
938        return b1
939
940    def connectionFailed(self, tubref, why):
941        # we previously initiated an outbound TubConnector to this tubref, but
942        # it was unable to establish a connection. 'why' is the most useful
943        # Failure that occurred (i.e. it is a NegotiationError if we made it
944        # that far, otherwise it's a ConnectionFailed).
945
946        if tubref in self.tubConnectors:
947            del self.tubConnectors[tubref]
948        if tubref in self.brokers:
949            # oh, but fortunately an inbound connection must have succeeded.
950            # Nevermind.
951            return
952
953        # inform hopeful Broker-waiters that they aren't getting one
954        if tubref in self.waitingForBrokers:
955            waiting = self.waitingForBrokers[tubref]
956            del self.waitingForBrokers[tubref]
957            for d in waiting:
958                d.errback(why)
959
    def brokerAttached(self, tubref, broker, isClient):
        """Record a newly-established Broker for C{tubref} and wake up
        everyone who was waiting for one.

        Any TubConnector held for the tubref is discarded (and shut down
        first when isClient is false). The broker is then stored in
        self.brokers, and each Deferred in self.waitingForBrokers[tubref] is
        scheduled, via eventual.eventually, to fire with the broker.

        @param tubref: identifies the Tub at the other end; must be truthy
        @param broker: the Broker to register. Inside this method the
                       parameter shadows the module-level 'broker' import.
        @param isClient: false when the connection is an inbound one (see
                         the comments below); _createLoopbackBroker also
                         passes False
        @raise BananaError: if self.brokers already has an entry for tubref
        """
        # NOTE: these checks are asserts, so they are skipped under
        # 'python -O'
        assert self.running
        assert tubref

        if tubref in self.tubConnectors:
            # we initiated an outbound connection to this tubref
            if not isClient:
                # however, the connection we got was from an inbound
                # connection. The completed (inbound) connection wins, so
                # abandon the outbound TubConnector
                self.tubConnectors[tubref].shutdown()

            # we don't need the TubConnector any more
            del self.tubConnectors[tubref]

        if tubref in self.brokers:
            # this shouldn't happen: acceptDecision is supposed to drop any
            # existing old connection first.
            self.log("ERROR: unexpected duplicate connection from %s" % tubref)
            raise BananaError("unexpected duplicate connection")
        self.brokers[tubref] = broker

        # now inform everyone who's been waiting on it
        if tubref in self.waitingForBrokers:
            for d in self.waitingForBrokers[tubref]:
                # the callbacks are handed to eventual.eventually rather
                # than being run here
                eventual.eventually(d.callback, broker)
            del self.waitingForBrokers[tubref]
987
988    def brokerDetached(self, broker, why):
989        # a loopback connection will produce two Brokers that both use the
990        # same tubref. Both will shut down about the same time. Make sure
991        # this doesn't confuse us.
992
993        # the Broker will have already severed all active references
994        for tubref in list(self.brokers.keys()):
995            if self.brokers[tubref] is broker:
996                del self.brokers[tubref]
997
998    def debug_listBrokers(self):
999        # return a list of (tubref, inbound, outbound) tuples. The tubref
1000        # tells you which broker this is, 'inbound' is a list of
1001        # InboundDelivery objects (one per outstanding inbound message), and
1002        # 'outbound' is a list of PendingRequest objects (one per message
1003        # that's waiting on a remote broker to complete).
1004        output = []
1005        all_brokers = list(self.brokers.items())
1006        for tubref,_broker in all_brokers:
1007            inbound = _broker.inboundDeliveryQueue[:]
1008            outbound = [pr
1009                        for (reqID, pr) in
1010                        sorted(_broker.waitingForAnswers.items()) ]
1011            output.append( (str(tubref), inbound, outbound) )
1012        return output
1013