# -*- test-case-name: foolscap.test.test_tub -*-

from __future__ import print_function
import os.path
from twisted.trial import unittest
from twisted.internet import defer
from twisted.application import service
from twisted.python import log, failure
from twisted.test.proto_helpers import StringTransport

from foolscap.api import Tub, SturdyRef, Referenceable
from foolscap.furl import encode_furl
from foolscap.referenceable import RemoteReference
from foolscap.eventual import eventually, fireEventually, flushEventualQueue
from foolscap.util import allocate_tcp_port
from foolscap.test.common import (HelperTarget, TargetMixin, ShouldFailMixin,
                                  StallMixin, MakeTubsMixin)
from foolscap.tokens import (WrongTubIdError, PBError, NoLocationHintsError,
                             NoLocationError)

# create this data with:
#  t = Tub()
#  print(t.getCertData())
CERT_TUBID = "kyc7sslzzyl4evmk7imxrdfcdzvq7qjk"
CERT_DATA = """\
-----BEGIN CERTIFICATE-----
MIIBnjCCAQcCAgCEMA0GCSqGSIb3DQEBBAUAMBcxFTATBgNVBAMUDG5ld3BiX3Ro
aW5neTAeFw0wOTA1MTkwMTEyMDNaFw0xMDA1MTkwMTEyMDNaMBcxFTATBgNVBAMU
DG5ld3BiX3RoaW5neTCBnzANBgkqhkiG9w0BAQEFAAOBjQAwgYkCgYEA0xVspHI+
YPkkBAposW5G3CBA8fa8kqBeBoqIiGfJq7uDrT4MYqe96DOs6ehd/1MTtbvK0mhd
4BDOurMS/+rBdMDAcfZlM4PMq+aqNRLBobFHrVH+H6h7v3V7grEOeZkSSvJbJdXT
xTKRu7AQrKXXAMHpOfMWfyZYDzYxKm4TY00CAwEAATANBgkqhkiG9w0BAQQFAAOB
gQA2HfwiApHoIc50eq/KO8tQqXC1PLTnb3Q8wy5OK5PZuBPlafloBRjRw8I14tfq
2puvr61rQt6AEjXGrhhndg5d8KIvY6LzZT4AHFQ0L4iL8zJ/GAHSBVY88Q1r2PyD
Dy8XFzPuxEo3WRzL2ncaFcPbYzsLFQBmwJaav725VFbTbg==
-----END CERTIFICATE-----
-----BEGIN RSA PRIVATE KEY-----
MIICXAIBAAKBgQDTFWykcj5g+SQECmixbkbcIEDx9rySoF4GioiIZ8mru4OtPgxi
p73oM6zp6F3/UxO1u8rSaF3gEM66sxL/6sF0wMBx9mUzg8yr5qo1EsGhsUetUf4f
qHu/dXuCsQ55mRJK8lsl1dPFMpG7sBCspdcAwek58xZ/JlgPNjEqbhNjTQIDAQAB
AoGAah63Q+V7nt0iUjW5dJpwXXKJtBvLqhudqcQz5//lz8Sx6oLrTx3tx7NTFzWP
LDHkEtWanjWCHIfWpt4oiyjGoLWwon32wfgahEiDBKpmY61by/xo4RSDAzm5Oogu
E4WGIPtpduc+GZf5C0m7zwhP0fC57MGfAX/xyctx6z7qzzECQQD2tJwvfkdSk+5f
qvg7iUnP5mLcjKGjHFL8s9sIQysyjpwXloBgIWztuJdp5vFt0ojV+8NKUFxtmBmf
yYpWPHe3AkEA2wlBCtzafGYNCSGiHfa/94M4Duf0dAua3hBQ9+Ld3ZD3KgBU5ZMC
qRbm5ul8CKFmuwKGE//TWnX6JYbur6VVGwJAesCZKiR6FoOWyzFFvFHuUSzAKh8r
Wf6A6E4RgQXy24AL+Myg6bQYAByl8kLABDYKcfaIUFS1+K4CqffdBlWl9wJAX3Ii
46blljuqBoafbEsvz51gei5deYvtCkM15S742ynmamkGlZuAF0qhh5HKuMAMUgWB
g4mBAfRS8rNfoy56bQJBAMShPEINsuumVaUnrEQg6g/misPPycO4MIEm5G1hHvli
uXVWwCwZgjHHsG5+jhGheZjvKXl+RS71Z6dQjwOYkng=
-----END RSA PRIVATE KEY-----
"""


class TestCertFile(unittest.TestCase):
    """Tests for creating a Tub from generated, in-memory and on-disk certs."""

    def test_generate(self):
        # a fresh Tub reports cert data holding both a certificate and a key
        t = Tub()
        certdata = t.getCertData()
        self.assertIn(b"BEGIN CERTIFICATE", certdata)
        self.assertIn(b"PRIVATE KEY", certdata)

    def test_certdata(self):
        # cert data fed back in through certData= is reported unchanged
        t1 = Tub()
        data1 = t1.getCertData()
        t2 = Tub(certData=data1)
        data2 = t2.getCertData()
        self.assertEqual(data1, data2)

    def test_certfile(self):
        # the first Tub creates the certFile; a second Tub given the same
        # filename must report the same cert data
        fn = "test_tub.TestCertFile.certfile"
        t1 = Tub(certFile=fn)
        self.assertTrue(os.path.exists(fn))
        data1 = t1.getCertData()

        t2 = Tub(certFile=fn)
        data2 = t2.getCertData()
        self.assertEqual(data1, data2)

    def test_tubid(self):
        # the canned certificate above must map to its recorded tubid
        t = Tub(certData=CERT_DATA)
        self.assertEqual(t.getTubID(), CERT_TUBID)


class SetLocation(unittest.TestCase):
    """Tests for Tub.setLocation and for Tubs that never get a location."""

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(flushEventualQueue)
        return d

    def test_set_location(self):
        t = Tub()
        portnum = allocate_tcp_port()
        t.listenOn("tcp:%d:interface=127.0.0.1" % portnum)
        t.setServiceParent(self.s)
        t.setLocation("127.0.0.1:12345")
        # setLocation may only be called once
        self.assertRaises(PBError, t.setLocation, "127.0.0.1:12345")

    def test_unreachable(self):
        t = Tub()
        t.setServiceParent(self.s)
        # we call neither .listenOn nor .setLocation
        self.assertEqual(t.locationHints, [])
        self.assertRaises(NoLocationError,
                          t.registerReference, Referenceable())


class FurlFile(unittest.TestCase):
    """Tests for the furlFile= argument of Tub.registerReference."""

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(flushEventualQueue)
        return d

    def test_furlfile(self):
        # a second Tub built from the same certFile and the same furlFile
        # must hand out the same FURL as the first one did
        cfn = "test_tub.FurlFile.test_furlfile.certfile"
        t1 = Tub(certFile=cfn)
        t1.setServiceParent(self.s)
        portnum = allocate_tcp_port()
        port1 = "tcp:%d:interface=127.0.0.1" % portnum
        t1.listenOn(port1)
        t1.setLocation("127.0.0.1:%d" % portnum)
        r1 = Referenceable()
        ffn = "test_tub.FurlFile.test_furlfile.furlfile"
        furl1 = t1.registerReference(r1, furlFile=ffn)
        d = defer.maybeDeferred(t1.disownServiceParent)

        self.assertTrue(os.path.exists(ffn))
        # read through a context manager so the handle is closed promptly
        with open(ffn, "r") as f:
            saved_furl = f.read().strip()
        self.assertEqual(furl1, saved_furl)

        def _take2(res):
            t2 = Tub(certFile=cfn)
            t2.setServiceParent(self.s)
            t2.listenOn(port1)
            t2.setLocation("127.0.0.1:%d" % portnum)
            r2 = Referenceable()
            furl2 = t2.registerReference(r2, furlFile=ffn)
            self.assertEqual(furl1, furl2)
            return t2.disownServiceParent()
        d.addCallback(_take2)
        return d

    def test_tubid_check(self):
        # a furlFile written by one Tub must be rejected (WrongTubIdError)
        # when a Tub with a different key tries to reuse it
        t1 = Tub()  # gets a new key
        t1.setServiceParent(self.s)
        portnum = allocate_tcp_port()
        port1 = "tcp:%d:interface=127.0.0.1" % portnum
        t1.listenOn(port1)
        t1.setLocation("127.0.0.1:%d" % portnum)
        r1 = Referenceable()
        ffn = "test_tub.FurlFile.test_tubid_check.furlfile"
        furl1 = t1.registerReference(r1, furlFile=ffn)
        d = defer.maybeDeferred(t1.disownServiceParent)

        self.assertTrue(os.path.exists(ffn))
        # read through a context manager so the handle is closed promptly
        with open(ffn, "r") as f:
            saved_furl = f.read().strip()
        self.assertEqual(furl1, saved_furl)

        def _take2(res):
            t2 = Tub()  # gets a different key
            t2.setServiceParent(self.s)
            t2.listenOn(port1)
            t2.setLocation("127.0.0.1:%d" % portnum)
            r2 = Referenceable()
            self.assertRaises(WrongTubIdError,
                              t2.registerReference, r2, furlFile=ffn)
            return t2.disownServiceParent()
        d.addCallback(_take2)
        return d


class QueuedStartup(TargetMixin, MakeTubsMixin, unittest.TestCase):
    # calling getReference and connectTo before the Tub has started should
    # put off network activity until the Tub is started.

    def setUp(self):
        TargetMixin.setUp(self)
        (self.tubB,) = self.makeTubs(1)

        self.barry = HelperTarget("barry")
        self.barry_url = self.tubB.registerReference(self.barry)

        self.bill = HelperTarget("bill")
        self.bill_url = self.tubB.registerReference(self.bill)

        self.bob = HelperTarget("bob")
        self.bob_url = self.tubB.registerReference(self.bob)

    def tearDown(self):
        d = TargetMixin.tearDown(self)

        def _more(res):
            return defer.DeferredList([s.stopService() for s in self.services])
        d.addCallback(_more)
        d.addCallback(flushEventualQueue)
        return d

    def test_queued_getref(self):
        # both getReference calls are issued before t1 is started; they
        # should resolve once startService runs on a later eventual turn
        t1 = Tub()
        d1 = t1.getReference(self.barry_url)
        d2 = t1.getReference(self.bill_url)

        def _check(res):
            ((barry_success, barry_rref),
             (bill_success, bill_rref)) = res
            self.assertTrue(barry_success)
            self.assertTrue(bill_success)
            self.assertIsInstance(barry_rref, RemoteReference)
            self.assertIsInstance(bill_rref, RemoteReference)
            # the two furls name different targets, so the references must
            # differ (this used to compare against bill_success, a boolean,
            # which made the check vacuous)
            self.assertFalse(barry_rref == bill_rref)
        dl = defer.DeferredList([d1, d2])
        dl.addCallback(_check)
        self.services.append(t1)
        eventually(t1.startService)
        return dl

    def test_queued_reconnector(self):
        # both connectTo calls are issued before t1 is started; poll until
        # each callback list has received at least one connection
        t1 = Tub()
        bill_connections = []
        barry_connections = []
        t1.connectTo(self.bill_url, bill_connections.append)
        t1.connectTo(self.barry_url, barry_connections.append)

        def _check():
            return bool(bill_connections) and bool(barry_connections)
        d = self.poll(_check)

        def _validate(res):
            self.assertIsInstance(bill_connections[0], RemoteReference)
            self.assertIsInstance(barry_connections[0], RemoteReference)
            self.assertFalse(bill_connections[0] == barry_connections[0])
        d.addCallback(_validate)
        self.services.append(t1)
        eventually(t1.startService)
        return d


class NameLookup(TargetMixin, MakeTubsMixin, unittest.TestCase):

    # test registerNameLookupHandler

    def setUp(self):
        TargetMixin.setUp(self)
        self.tubA, self.tubB = self.makeTubs(2)
        self.url_on_b = self.tubB.registerReference(Referenceable())
        self.lookups = []
        self.lookups2 = []
        self.names = {}
        self.names2 = {}

    def tearDown(self):
        d = TargetMixin.tearDown(self)

        def _more(res):
            return defer.DeferredList([s.stopService() for s in self.services])
        d.addCallback(_more)
        d.addCallback(flushEventualQueue)
        return d

    def lookup(self, name):
        # first handler: record the name, then resolve it from self.names
        self.lookups.append(name)
        return self.names.get(name, None)

    def lookup2(self, name):
        # second handler: record the name, then resolve it from self.names2
        self.lookups2.append(name)
        return self.names2.get(name, None)

    def _reset_lookups(self):
        # start each phase of testNameLookup with empty lookup records
        self.lookups = []
        self.lookups2 = []

    def testNameLookup(self):
        t1 = HelperTarget()
        t2 = HelperTarget()
        self.names["foo"] = t1
        self.names2["bar"] = t2
        self.names2["baz"] = t2
        self.tubB.registerNameLookupHandler(self.lookup)
        self.tubB.registerNameLookupHandler(self.lookup2)
        # hack up a new furl pointing at the same tub but with a name that
        # hasn't been registered.
        s = SturdyRef(self.url_on_b)
        s.name = "foo"

        d = self.tubA.getReference(s)

        def _check(res):
            self.assertIsInstance(res, RemoteReference)
            self.assertEqual(self.lookups, ["foo"])
            # the first lookup should short-circuit the process
            self.assertEqual(self.lookups2, [])
            self._reset_lookups()
            s.name = "bar"
            return self.tubA.getReference(s)
        d.addCallback(_check)

        def _check2(res):
            self.assertIsInstance(res, RemoteReference)
            # if the first lookup fails, the second handler should be asked
            self.assertEqual(self.lookups, ["bar"])
            self.assertEqual(self.lookups2, ["bar"])
            self._reset_lookups()
            # make sure that loopbacks use this too
            return self.tubB.getReference(s)
        d.addCallback(_check2)

        def _check3(res):
            self.assertIsInstance(res, RemoteReference)
            self.assertEqual(self.lookups, ["bar"])
            self.assertEqual(self.lookups2, ["bar"])
            self._reset_lookups()
            # and make sure we can de-register handlers
            self.tubB.unregisterNameLookupHandler(self.lookup)
            s.name = "baz"
            return self.tubA.getReference(s)
        d.addCallback(_check3)

        def _check4(res):
            self.assertIsInstance(res, RemoteReference)
            self.assertEqual(self.lookups, [])
            self.assertEqual(self.lookups2, ["baz"])
            self._reset_lookups()
        d.addCallback(_check4)

        return d


class Shutdown(unittest.TestCase, ShouldFailMixin):
    """Tests for Tub behaviour during and after stopService."""

    def test_doublestop(self):
        # once stopped, a Tub must refuse startService, getReference and
        # connectTo, each with a RuntimeError
        tub = Tub()
        tub.startService()
        d = tub.stopService()
        d.addCallback(lambda res:
                      self.shouldFail(RuntimeError,
                                      "test_doublestop_startService",
                                      "Sorry, but Tubs cannot be restarted",
                                      tub.startService))
        d.addCallback(lambda res:
                      self.shouldFail(RuntimeError,
                                      "test_doublestop_getReference",
                                      "Sorry, but this Tub has been shut down",
                                      tub.getReference, "furl"))
        d.addCallback(lambda res:
                      self.shouldFail(RuntimeError,
                                      "test_doublestop_connectTo",
                                      "Sorry, but this Tub has been shut down",
                                      tub.connectTo, "furl", None))
        return d

    def test_wait_for_brokers(self):
        """
        The L{Deferred} returned by L{Tub.stopService} fires only after the
        L{Broker} connections belonging to the L{Tub} have disconnected.
        """
        tub = Tub()
        tub.startService()

        another_tub = Tub()
        another_tub.startService()

        brokers = [tub.brokerClass(None) for _ in range(3)]
        for n, b in enumerate(brokers):
            b.makeConnection(StringTransport())
            ref = SturdyRef(encode_furl(another_tub.tubID, [], str(n)))
            # odd-numbered brokers are attached as clients, even ones not
            tub.brokerAttached(ref, b, isClient=(n % 2) == 1)

        stopping = tub.stopService()
        d = flushEventualQueue()

        def event(ignored):
            # the brokers are still connected, so stopService must not
            # have fired yet
            self.assertNoResult(stopping)
            for b in brokers:
                b.connectionLost(failure.Failure(Exception("Connection lost")))
            return flushEventualQueue()
        d.addCallback(event)

        def connectionsLost(ignored):
            self.successResultOf(stopping)
        d.addCallback(connectionsLost)

        return d


class Receiver(Referenceable):
    """Remote target whose remote_one stops the Tub it was given.

    done_d fires, one eventual turn later, with the outcome of stopService.
    """

    def __init__(self, tub):
        self.tub = tub
        self.done_d = defer.Deferred()

    def remote_one(self):
        d = self.tub.stopService()
        # pass the result (or failure) through the eventual queue before
        # handing it to done_d
        d.addBoth(fireEventually)
        d.addBoth(self.done_d.callback)

    def remote_two(self):
        # being called at all is the failure: report it loudly
        msg = "Receiver.remote_two: I shouldn't be called"
        print(msg)
        f = failure.Failure(ValueError(msg))
        log.err(f)


class CancelPendingDeliveries(StallMixin, MakeTubsMixin, unittest.TestCase):
    """Test that stopping a Tub discards deliveries still pending on it."""

    def setUp(self):
        self.tubA, self.tubB = self.makeTubs(2)

    def tearDown(self):
        # only stop the Tubs that are still running; the test itself stops
        # tubB via Receiver.remote_one
        dl = [defer.succeed(None)]
        if self.tubA.running:
            dl.append(defer.maybeDeferred(self.tubA.stopService))
        if self.tubB.running:
            dl.append(defer.maybeDeferred(self.tubB.stopService))
        d = defer.DeferredList(dl)
        d.addCallback(flushEventualQueue)
        return d

    def test_cancel_pending_deliveries(self):
        # when a Tub is stopped, any deliveries that were pending should be
        # discarded. TubA sends remote_one+remote_two (and we hope they
        # arrive in the same chunk). TubB responds to remote_one by shutting
        # down. remote_two should be discarded. The bug was that remote_two
        # would cause an unhandled error on the TubB side.

        r = Receiver(self.tubB)
        furl = self.tubB.registerReference(r)
        d = self.tubA.getReference(furl)

        def _go(rref):
            # we want these two to get sent and received in the same hunk
            rref.callRemoteOnly("one")
            rref.callRemoteOnly("two")
            return r.done_d
        d.addCallback(_go)
        # let remote_two do its log.err before we move on to the next test
        d.addCallback(self.stall, 1.0)
        return d


class BadLocationFURL(unittest.TestCase):
    """Tests that FURLs with no usable location hints fail asynchronously."""

    def setUp(self):
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        d = self.s.stopService()
        d.addCallback(flushEventualQueue)
        return d

    def test_empty_location(self):
        # bug #129: a FURL with no location hints causes a synchronous
        # exception in Tub.getReference(), instead of an errback'ed Deferred.

        tubA = Tub()
        tubA.setServiceParent(self.s)
        tubB = Tub()
        tubB.setServiceParent(self.s)

        # This is a hack to get a FURL with empty location hints. The correct
        # way to make a Tub unreachable is to not call .setLocation() at all.
        tubB.setLocation("")
        r = Receiver(tubB)
        furl = tubB.registerReference(r)
        # the buggy behavior is that the following call raises an exception
        d = tubA.getReference(furl)
        # whereas it ought to return a Deferred
        self.assertIsInstance(d, defer.Deferred)

        def _check(f):
            self.assertTrue(isinstance(f, failure.Failure), f)
            self.assertTrue(f.check(NoLocationHintsError), f)
        d.addBoth(_check)
        return d

    def test_future(self):
        tubA = Tub()
        tubA.setServiceParent(self.s)
        tubB = Tub()
        tubB.setServiceParent(self.s)

        # "future:stuff" is interpreted as a "location hint format from the
        # future", which we're supposed to ignore, and are thus left with no
        # hints
        tubB.setLocation("future:stuff")
        r = Receiver(tubB)
        furl = tubB.registerReference(r)
        # the buggy behavior is that the following call raises an exception
        d = tubA.getReference(furl)
        # whereas it ought to return a Deferred
        self.assertIsInstance(d, defer.Deferred)

        def _check(f):
            self.assertTrue(isinstance(f, failure.Failure), f)
            self.assertTrue(f.check(NoLocationHintsError), f)
        d.addBoth(_check)
        return d