1# -*- test-case-name: foolscap.test.test_tub -*-
2
3from __future__ import print_function
4import os.path
5from twisted.trial import unittest
6from twisted.internet import defer
7from twisted.application import service
8from twisted.python import log, failure
9from twisted.test.proto_helpers import StringTransport
10
11from foolscap.api import Tub, SturdyRef, Referenceable
12from foolscap.furl import encode_furl
13from foolscap.referenceable import RemoteReference
14from foolscap.eventual import eventually, fireEventually, flushEventualQueue
15from foolscap.util import allocate_tcp_port
16from foolscap.test.common import HelperTarget, TargetMixin, ShouldFailMixin, \
17     StallMixin, MakeTubsMixin
18from foolscap.tokens import WrongTubIdError, PBError, NoLocationHintsError, \
19    NoLocationError
20
# Fixed certificate fixture used by TestCertFile.test_tubid, which asserts
# that Tub(certData=CERT_DATA).getTubID() == CERT_TUBID. The two constants
# must therefore be regenerated together.
#
# Create this data with:
#  t = Tub()
#  print(t.getCertData())
# NOTE(review): CERT_TUBID presumably came from t.getTubID() on that same
# Tub -- confirm when regenerating.
CERT_TUBID = "kyc7sslzzyl4evmk7imxrdfcdzvq7qjk"
# PEM text holding a certificate followed by its RSA private key. This key
# is embedded in the test source, so treat it as a test-only credential.
CERT_DATA = """\
-----BEGIN CERTIFICATE-----
MIIBnjCCAQcCAgCEMA0GCSqGSIb3DQEBBAUAMBcxFTATBgNVBAMUDG5ld3BiX3Ro
aW5neTAeFw0wOTA1MTkwMTEyMDNaFw0xMDA1MTkwMTEyMDNaMBcxFTATBgNVBAMU
DG5ld3BiX3RoaW5neTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA0xVspHI+
YPkkBAposW5G3CBA8fa8kqBeBoqIiGfJq7uDrT4MYqe96DOs6ehd/1MTtbvK0mhd
4BDOurMS/+rBdMDAcfZlM4PMq+aqNRLBobFHrVH+H6h7v3V7grEOeZkSSvJbJdXT
xTKRu7AQrKXXAMHpOfMWfyZYDzYxKm4TY00CAwEAATANBgkqhkiG9w0BAQQFAAOB
gQA2HfwiApHoIc50eq/KO8tQqXC1PLTnb3Q8wy5OK5PZuBPlafloBRjRw8I14tfq
2puvr61rQt6AEjXGrhhndg5d8KIvY6LzZT4AHFQ0L4iL8zJ/GAHSBVY88Q1r2PyD
Dy8XFzPuxEo3WRzL2ncaFcPbYzsLFQBmwJaav725VFbTbg==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDTFWykcj5g+SQECmixbkbcIEDx9rySoF4GioiIZ8mru4OtPgxi
p73oM6zp6F3/UxO1u8rSaF3gEM66sxL/6sF0wMBx9mUzg8yr5qo1EsGhsUetUf4f
qHu/dXuCsQ55mRJK8lsl1dPFMpG7sBCspdcAwek58xZ/JlgPNjEqbhNjTQIDAQAB
AoGAah63Q+V7nt0iUjW5dJpwXXKJtBvLqhudqcQz5//lz8Sx6oLrTx3tx7NTFzWP
LDHkEtWanjWCHIfWpt4oiyjGoLWwon32wfgahEiDBKpmY61by/xo4RSDAzm5Oogu
E4WGIPtpduc+GZf5C0m7zwhP0fC57MGfAX/xyctx6z7qzzECQQD2tJwvfkdSk+5f
qvg7iUnP5mLcjKGjHFL8s9sIQysyjpwXloBgIWztuJdp5vFt0ojV+8NKUFxtmBmf
yYpWPHe3AkEA2wlBCtzafGYNCSGiHfa/94M4Duf0dAua3hBQ9+Ld3ZD3KgBU5ZMC
qRbm5ul8CKFmuwKGE//TWnX6JYbur6VVGwJAesCZKiR6FoOWyzFFvFHuUSzAKh8r
Wf6A6E4RgQXy24AL+Myg6bQYAByl8kLABDYKcfaIUFS1+K4CqffdBlWl9wJAX3Ii
46blljuqBoafbEsvz51gei5deYvtCkM15S742ynmamkGlZuAF0qhh5HKuMAMUgWB
g4mBAfRS8rNfoy56bQJBAMShPEINsuumVaUnrEQg6g/misPPycO4MIEm5G1hHvli
uXVWwCwZgjHHsG5+jhGheZjvKXl+RS71Z6dQjwOYkng=
-----END RSA PRIVATE KEY-----
"""
53
class TestCertFile(unittest.TestCase):
    """Check how a Tub generates, exports, and re-imports its certificate."""

    def test_generate(self):
        """A fresh Tub reports cert data with a certificate and a key."""
        t = Tub()
        certdata = t.getCertData()
        # assertIn shows both operands on failure, unlike assertTrue(x in y)
        self.assertIn(b"BEGIN CERTIFICATE", certdata)
        self.assertIn(b"PRIVATE KEY", certdata)

    def test_certdata(self):
        """Cert data exported from one Tub round-trips through certData=."""
        t1 = Tub()
        data1 = t1.getCertData()
        t2 = Tub(certData=data1)
        data2 = t2.getCertData()
        # assertEqual prints the differing values if the round-trip breaks
        self.assertEqual(data1, data2)

    def test_certfile(self):
        """Two Tubs given the same certFile path report the same cert data."""
        fn = "test_tub.TestCertFile.certfile"
        t1 = Tub(certFile=fn)
        # constructing the first Tub must leave the file on disk
        self.assertTrue(os.path.exists(fn))
        data1 = t1.getCertData()

        t2 = Tub(certFile=fn)
        data2 = t2.getCertData()
        self.assertEqual(data1, data2)

    def test_tubid(self):
        """A Tub built from the CERT_DATA fixture has the CERT_TUBID id."""
        t = Tub(certData=CERT_DATA)
        self.assertEqual(t.getTubID(), CERT_TUBID)
81
class SetLocation(unittest.TestCase):
    """Tests for Tub.setLocation and for Tubs that never get a location."""

    def setUp(self):
        # every Tub in these tests is parented to this running service
        parent = service.MultiService()
        parent.startService()
        self.s = parent

    def tearDown(self):
        # stop the parent service, then drain the eventual-send queue
        stopped = self.s.stopService()
        stopped.addCallback(flushEventualQueue)
        return stopped

    def test_set_location(self):
        """Calling setLocation a second time raises PBError."""
        tub = Tub()
        port = allocate_tcp_port()
        tub.listenOn("tcp:%d:interface=127.0.0.1" % port)
        tub.setServiceParent(self.s)
        location = "127.0.0.1:12345"
        tub.setLocation(location)
        # setLocation may only be called once
        self.assertRaises(PBError, tub.setLocation, location)

    def test_unreachable(self):
        """A Tub with no location cannot register references."""
        tub = Tub()
        tub.setServiceParent(self.s)
        # neither .listenOn nor .setLocation has been called on this Tub
        self.assertEqual(tub.locationHints, [])
        target = Referenceable()
        self.assertRaises(NoLocationError, tub.registerReference, target)
109
110
111
class FurlFile(unittest.TestCase):
    """Exercise the furlFile= argument of Tub.registerReference."""

    def setUp(self):
        # parent service for every Tub created by these tests
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(flushEventualQueue)
        return d

    def _read_furl_file(self, fn):
        """Return the stripped contents of *fn*, closing the file promptly."""
        # a context manager avoids leaking the handle (and the
        # ResourceWarning a bare open().read() produces on Python 3)
        with open(fn, "r") as f:
            return f.read().strip()

    def test_furlfile(self):
        """A second Tub with the same cert and furl file gets the same FURL."""
        cfn = "test_tub.FurlFile.test_furlfile.certfile"
        t1 = Tub(certFile=cfn)
        t1.setServiceParent(self.s)
        portnum = allocate_tcp_port()
        port1 = "tcp:%d:interface=127.0.0.1" % portnum
        t1.listenOn(port1)
        t1.setLocation("127.0.0.1:%d" % portnum)
        r1 = Referenceable()
        ffn = "test_tub.FurlFile.test_furlfile.furlfile"
        furl1 = t1.registerReference(r1, furlFile=ffn)
        # detach the first Tub before the second one reuses the same port
        d = defer.maybeDeferred(t1.disownServiceParent)

        # registerReference must have written the FURL it returned
        self.assertTrue(os.path.exists(ffn))
        self.assertEqual(furl1, self._read_furl_file(ffn))

        def _take2(res):
            # same certFile and furlFile as before, different Referenceable
            t2 = Tub(certFile=cfn)
            t2.setServiceParent(self.s)
            t2.listenOn(port1)
            t2.setLocation("127.0.0.1:%d" % portnum)
            r2 = Referenceable()
            furl2 = t2.registerReference(r2, furlFile=ffn)
            self.assertEqual(furl1, furl2)
            return t2.disownServiceParent()
        d.addCallback(_take2)
        return d

    def test_tubid_check(self):
        """A furl file written by a different Tub raises WrongTubIdError."""
        t1 = Tub() # gets a new key
        t1.setServiceParent(self.s)
        portnum = allocate_tcp_port()
        port1 = "tcp:%d:interface=127.0.0.1" % portnum
        t1.listenOn(port1)
        t1.setLocation("127.0.0.1:%d" % portnum)
        r1 = Referenceable()
        ffn = "test_tub.FurlFile.test_tubid_check.furlfile"
        furl1 = t1.registerReference(r1, furlFile=ffn)
        d = defer.maybeDeferred(t1.disownServiceParent)

        self.assertTrue(os.path.exists(ffn))
        self.assertEqual(furl1, self._read_furl_file(ffn))

        def _take2(res):
            t2 = Tub() # gets a different key
            t2.setServiceParent(self.s)
            t2.listenOn(port1)
            t2.setLocation("127.0.0.1:%d" % portnum)
            r2 = Referenceable()
            # the furl file on disk was written by t1, not by t2
            self.assertRaises(WrongTubIdError,
                              t2.registerReference, r2, furlFile=ffn)
            return t2.disownServiceParent()
        d.addCallback(_take2)
        return d
177
class QueuedStartup(TargetMixin, MakeTubsMixin, unittest.TestCase):
    """Requests made on a not-yet-started Tub are queued until it starts."""
    # calling getReference and connectTo before the Tub has started should
    # put off network activity until the Tub is started.

    def setUp(self):
        TargetMixin.setUp(self)
        (self.tubB,) = self.makeTubs(1)

        # three targets registered on tubB; the tests fetch them by FURL
        self.barry = HelperTarget("barry")
        self.barry_url = self.tubB.registerReference(self.barry)

        self.bill = HelperTarget("bill")
        self.bill_url = self.tubB.registerReference(self.bill)

        self.bob = HelperTarget("bob")
        self.bob_url = self.tubB.registerReference(self.bob)

    def tearDown(self):
        d = TargetMixin.tearDown(self)
        def _more(res):
            # stop every service the tests appended to self.services
            return defer.DeferredList([s.stopService() for s in self.services])
        d.addCallback(_more)
        d.addCallback(flushEventualQueue)
        return d

    def test_queued_getref(self):
        """getReference on an unstarted Tub resolves once the Tub starts."""
        t1 = Tub()
        d1 = t1.getReference(self.barry_url)
        d2 = t1.getReference(self.bill_url)
        def _check(res):
            ((barry_success, barry_rref),
             (bill_success, bill_rref)) = res
            self.assertTrue(barry_success)
            self.assertTrue(bill_success)
            self.assertTrue(isinstance(barry_rref, RemoteReference))
            self.assertTrue(isinstance(bill_rref, RemoteReference))
            # the two FURLs must yield distinct references. This used to
            # compare barry_rref against the bill_success boolean, which
            # could never be equal, so the assertion checked nothing.
            self.assertFalse(barry_rref == bill_rref)
        dl = defer.DeferredList([d1, d2])
        dl.addCallback(_check)
        self.services.append(t1)
        # start the Tub only after both requests have been queued
        eventually(t1.startService)
        return dl

    def test_queued_reconnector(self):
        """connectTo on an unstarted Tub connects once the Tub starts."""
        t1 = Tub()
        bill_connections = []
        barry_connections = []
        t1.connectTo(self.bill_url, bill_connections.append)
        t1.connectTo(self.barry_url, barry_connections.append)
        def _check():
            # done once each callback has received at least one reference
            return bool(bill_connections) and bool(barry_connections)
        d = self.poll(_check)
        def _validate(res):
            self.assertTrue(isinstance(bill_connections[0], RemoteReference))
            self.assertTrue(isinstance(barry_connections[0], RemoteReference))
            self.assertFalse(bill_connections[0] == barry_connections[0])
        d.addCallback(_validate)
        self.services.append(t1)
        eventually(t1.startService)
        return d
240
241
class NameLookup(TargetMixin, MakeTubsMixin, unittest.TestCase):
    """Tests for Tub.registerNameLookupHandler and its unregister twin."""

    def setUp(self):
        TargetMixin.setUp(self)
        tubs = self.makeTubs(2)
        self.tubA, self.tubB = tubs
        self.url_on_b = self.tubB.registerReference(Referenceable())
        # names each handler was asked about, and the tables they answer from
        self.lookups = []
        self.lookups2 = []
        self.names = {}
        self.names2 = {}

    def tearDown(self):
        d = TargetMixin.tearDown(self)

        def _stop_services(_ignored):
            stopping = [svc.stopService() for svc in self.services]
            return defer.DeferredList(stopping)

        d.addCallback(_stop_services)
        d.addCallback(flushEventualQueue)
        return d

    def lookup(self, name):
        """First handler: record the query and answer from self.names."""
        self.lookups.append(name)
        return self.names.get(name)

    def lookup2(self, name):
        """Second handler: record the query and answer from self.names2."""
        self.lookups2.append(name)
        return self.names2.get(name)

    def _forget_lookups(self):
        # rebind both records to fresh lists before the next stage
        self.lookups = []
        self.lookups2 = []

    def testNameLookup(self):
        target1 = HelperTarget()
        target2 = HelperTarget()
        self.names["foo"] = target1
        self.names2["bar"] = target2
        self.names2["baz"] = target2
        self.tubB.registerNameLookupHandler(self.lookup)
        self.tubB.registerNameLookupHandler(self.lookup2)
        # build a sturdyref for the same tub, then swap in a name that was
        # never registered, so only the lookup handlers can resolve it
        sref = SturdyRef(self.url_on_b)
        sref.name = "foo"

        def _after_foo(rref):
            self.assertTrue(isinstance(rref, RemoteReference))
            self.assertEqual(self.lookups, ["foo"])
            # the first handler answered, so the second was never consulted
            self.assertEqual(self.lookups2, [])
            self._forget_lookups()
            sref.name = "bar"
            return self.tubA.getReference(sref)

        def _after_bar(rref):
            self.assertTrue(isinstance(rref, RemoteReference))
            # the first handler had no answer, so the second was asked too
            self.assertEqual(self.lookups, ["bar"])
            self.assertEqual(self.lookups2, ["bar"])
            self._forget_lookups()
            # loopback references must go through the handlers as well
            return self.tubB.getReference(sref)

        def _after_loopback(rref):
            self.assertTrue(isinstance(rref, RemoteReference))
            self.assertEqual(self.lookups, ["bar"])
            self.assertEqual(self.lookups2, ["bar"])
            self._forget_lookups()
            # finally, check that a handler can be removed again
            self.tubB.unregisterNameLookupHandler(self.lookup)
            sref.name = "baz"
            return self.tubA.getReference(sref)

        def _after_baz(rref):
            self.assertTrue(isinstance(rref, RemoteReference))
            self.assertEqual(self.lookups, [])
            self.assertEqual(self.lookups2, ["baz"])
            self._forget_lookups()

        d = self.tubA.getReference(sref)
        d.addCallback(_after_foo)
        d.addCallback(_after_bar)
        d.addCallback(_after_loopback)
        d.addCallback(_after_baz)
        return d
325
class Shutdown(unittest.TestCase, ShouldFailMixin):
    """Behaviour of a Tub during and after stopService."""

    def test_doublestop(self):
        """A stopped Tub refuses to restart or to make new connections."""
        tub = Tub()
        tub.startService()

        def _try_restart(_ignored):
            return self.shouldFail(RuntimeError,
                                   "test_doublestop_startService",
                                   "Sorry, but Tubs cannot be restarted",
                                   tub.startService)

        def _try_getReference(_ignored):
            return self.shouldFail(RuntimeError,
                                   "test_doublestop_getReference",
                                   "Sorry, but this Tub has been shut down",
                                   tub.getReference, "furl")

        def _try_connectTo(_ignored):
            return self.shouldFail(RuntimeError,
                                   "test_doublestop_connectTo",
                                   "Sorry, but this Tub has been shut down",
                                   tub.connectTo, "furl", None)

        d = tub.stopService()
        d.addCallback(_try_restart)
        d.addCallback(_try_getReference)
        d.addCallback(_try_connectTo)
        return d


    def test_wait_for_brokers(self):
        """
        L{Tub.stopService} returns a L{Deferred} which does not fire until
        every L{Broker} attached to that L{Tub} has lost its connection.
        """
        tub = Tub()
        tub.startService()

        another_tub = Tub()
        another_tub.startService()

        brokers = [tub.brokerClass(None) for _ in range(3)]
        for index, broker in enumerate(brokers):
            broker.makeConnection(StringTransport())
            furl = encode_furl(another_tub.tubID, [], str(index))
            # odd-numbered brokers are attached as clients
            tub.brokerAttached(SturdyRef(furl), broker,
                               isClient=(index % 2)==1)

        stopping = tub.stopService()

        def _drop_connections(_ignored):
            # the brokers are still connected, so shutdown must be pending
            self.assertNoResult(stopping)
            for broker in brokers:
                reason = failure.Failure(Exception("Connection lost"))
                broker.connectionLost(reason)
            return flushEventualQueue()

        def _shutdown_complete(_ignored):
            self.successResultOf(stopping)

        d = flushEventualQueue()
        d.addCallback(_drop_connections)
        d.addCallback(_shutdown_complete)
        return d
381
382
class Receiver(Referenceable):
    """Remote target whose first method shuts down the Tub it was given."""

    def __init__(self, tub):
        self.done_d = defer.Deferred()
        self.tub = tub

    def remote_one(self):
        # stop the Tub, wait one eventual-send turn, then fire done_d with
        # whatever result (or failure) came out of the shutdown
        stopped = self.tub.stopService()
        stopped.addBoth(fireEventually)
        stopped.addBoth(self.done_d.callback)

    def remote_two(self):
        # not expected to run: print the complaint and log it as an error
        msg = "Receiver.remote_two: I shouldn't be called"
        print(msg)
        log.err(failure.Failure(ValueError(msg)))
396
class CancelPendingDeliveries(StallMixin, MakeTubsMixin, unittest.TestCase):
    """Deliveries still pending when a Tub stops must be dropped."""

    def setUp(self):
        tubs = self.makeTubs(2)
        self.tubA, self.tubB = tubs

    def tearDown(self):
        # only stop the Tubs that the test left running
        pending = [defer.succeed(None)]
        for tub in (self.tubA, self.tubB):
            if tub.running:
                pending.append(defer.maybeDeferred(tub.stopService))
        done = defer.DeferredList(pending)
        done.addCallback(flushEventualQueue)
        return done

    def test_cancel_pending_deliveries(self):
        # Stopping a Tub should discard any deliveries that were still
        # pending. TubA sends remote_one and remote_two (hopefully in a
        # single chunk). TubB reacts to remote_one by shutting itself
        # down, so remote_two ought to be dropped. The original bug was
        # that remote_two produced an unhandled error on the TubB side.

        receiver = Receiver(self.tubB)
        furl = self.tubB.registerReference(receiver)

        def _send_both(rref):
            # send back-to-back so both land in the same hunk
            for methname in ("one", "two"):
                rref.callRemoteOnly(methname)
            return receiver.done_d

        d = self.tubA.getReference(furl)
        d.addCallback(_send_both)
        # give remote_two time to do its log.err before the next test runs
        d.addCallback(self.stall, 1.0)
        return d
430
class BadLocationFURL(unittest.TestCase):
    """getReference on a FURL with no usable hints must errback, not raise."""

    def setUp(self):
        parent = service.MultiService()
        parent.startService()
        self.s = parent

    def tearDown(self):
        stopped = self.s.stopService()
        stopped.addCallback(flushEventualQueue)
        return stopped

    def _getref_fails_without_hints(self, location):
        # Shared body of both tests: tubB advertises `location`, tubA then
        # asks for a reference registered on tubB.
        tubA = Tub()
        tubA.setServiceParent(self.s)
        tubB = Tub()
        tubB.setServiceParent(self.s)

        tubB.setLocation(location)
        target = Receiver(tubB)
        furl = tubB.registerReference(target)
        # the buggy behavior is that this call raises synchronously ...
        d = tubA.getReference(furl)
        # ... when it should hand back a Deferred
        self.assertTrue(isinstance(d, defer.Deferred))

        def _expect_failure(res):
            self.assertTrue(isinstance(res, failure.Failure), res)
            self.assertTrue(res.check(NoLocationHintsError), res)

        d.addBoth(_expect_failure)
        return d

    def test_empty_location(self):
        # bug #129: a FURL with no location hints made Tub.getReference()
        # raise synchronously instead of returning an errback'ed Deferred.
        #
        # Passing "" is a hack to get a FURL with empty location hints; the
        # proper way to make a Tub unreachable is never to call
        # .setLocation() at all.
        return self._getref_fails_without_hints("")

    def test_future(self):
        # "future:stuff" is treated as a location hint format from the
        # future, which we are supposed to ignore; that leaves no hints.
        return self._getref_fails_without_hints("future:stuff")
486