1from zope.interface import implementer 2from twisted.trial import unittest 3from twisted.internet import defer, reactor 4from twisted.application import service 5from foolscap import info, reconnector, ipb, util 6from foolscap.api import Tub 7from foolscap.connections import tcp 8from foolscap.test.common import (certData_low, certData_high, Target) 9 10class Info(unittest.TestCase): 11 def test_stages(self): 12 ci = info.ConnectionInfo() 13 14 self.assertEqual(ci.connected, False) 15 self.assertEqual(ci.connectorStatuses, {}) 16 self.assertEqual(ci.connectionHandlers, {}) 17 self.assertEqual(ci.establishedAt, None) 18 self.assertEqual(ci.winningHint, None) 19 self.assertEqual(ci.listenerStatus, (None, None)) 20 self.assertEqual(ci.lostAt, None) 21 22 ci._describe_connection_handler("hint1", "tcp") 23 ci._set_connection_status("hint1", "working") 24 self.assertEqual(ci.connectorStatuses, {"hint1": "working"}) 25 self.assertEqual(ci.connectionHandlers, {"hint1": "tcp"}) 26 27 ci._set_connection_status("hint1", "successful") 28 ci._set_winning_hint("hint1") 29 ci._set_established_at(10.0) 30 ci._set_connected(True) 31 32 self.assertEqual(ci.connected, True) 33 self.assertEqual(ci.connectorStatuses, {"hint1": "successful"}) 34 self.assertEqual(ci.connectionHandlers, {"hint1": "tcp"}) 35 self.assertEqual(ci.establishedAt, 10.0) 36 self.assertEqual(ci.winningHint, "hint1") 37 self.assertEqual(ci.listenerStatus, (None, None)) 38 self.assertEqual(ci.lostAt, None) 39 40 ci._set_connected(False) 41 ci._set_lost_at(15.0) 42 43 self.assertEqual(ci.connected, False) 44 self.assertEqual(ci.lostAt, 15.0) 45 46@implementer(ipb.IConnectionHintHandler) 47class Handler: 48 def __init__(self): 49 self.asked = 0 50 self.accepted = 0 51 self._d = defer.Deferred() 52 self._err = None 53 54 def hint_to_endpoint(self, hint, reactor, update_status): 55 self.asked += 1 56 self._update_status = update_status 57 self._d = defer.Deferred() 58 if self._err: 59 raise self._err 60 self.accepted += 1 61 update_status("resolving hint") 62 return self._d 63 64def discard_status(status): 65 pass 66 67class Connect(unittest.TestCase): 68 def setUp(self): 69 self.s = service.MultiService() 70 self.s.startService() 71 72 def tearDown(self): 73 return self.s.stopService() 74 75 def makeTub(self, hint_type, listener_test_options={}, 76 extra_hint=None): 77 tubA = Tub(certData=certData_low) 78 tubA.setServiceParent(self.s) 79 tubB = Tub(certData=certData_high) 80 tubB.setServiceParent(self.s) 81 self._tubA, self._tubB = tubA, tubB 82 portnum = util.allocate_tcp_port() 83 self._portnum = portnum 84 port = "tcp:%d:interface=127.0.0.1" % portnum 85 hint = "%s:127.0.0.1:%d" % (hint_type, portnum) 86 if extra_hint: 87 hint = hint + "," + extra_hint 88 tubA.listenOn(port, _test_options=listener_test_options) 89 tubA.setLocation(hint) 90 self._target = Target() 91 furl = tubA.registerReference(self._target) 92 return furl, tubB, hint 93 94 @defer.inlineCallbacks 95 def testInfo(self): 96 def tubA_sendHello_pause(d2): 97 ci = tubB.getConnectionInfoForFURL(furl) 98 self.assertEqual(ci.connectorStatuses, {hint: "negotiating"}) 99 d2.callback(None) 100 test_options = { 101 "debug_pause_sendHello": tubA_sendHello_pause, 102 } 103 furl, tubB, hint = self.makeTub("tcp", test_options) 104 h = Handler() 105 tubB.removeAllConnectionHintHandlers() 106 tubB.addConnectionHintHandler("tcp", h) 107 d = tubB.getReference(furl) 108 ci = tubB.getConnectionInfoForFURL(furl) 109 self.assertEqual(ci.connectorStatuses, {hint: "resolving hint"}) 110 h._d.callback(tcp.DefaultTCP().hint_to_endpoint(hint, reactor, 111 discard_status)) 112 ci = tubB.getConnectionInfoForFURL(furl) 113 self.assertEqual(ci.connectorStatuses, {hint: "connecting"}) 114 # we use debug_pause_sendHello to catch "negotiating" here, then wait 115 rref = yield d 116 self.assertEqual(h.asked, 1) 117 self.assertEqual(h.accepted, 1) 118 ci = tubB.getConnectionInfoForFURL(furl) 119 self.assertEqual(ci.connectorStatuses, {hint: "successful"}) 120 del rref 121 122 def testNoHandler(self): 123 furl, tubB, hint = self.makeTub("missing", extra_hint="slow:foo") 124 missing_hint, extra = hint.split(",") 125 tubB.removeAllConnectionHintHandlers() 126 h = Handler() 127 tubB.addConnectionHintHandler("slow", h) 128 d = tubB.getReference(furl) 129 del d # XXX 130 ci = tubB.getConnectionInfoForFURL(furl) 131 cs = ci.connectorStatuses 132 self.assertEqual(cs["slow:foo"], "resolving hint") 133 self.assertEqual(cs[missing_hint], "bad hint: no handler registered") 134 h._update_status("phase2") 135 ci = tubB.getConnectionInfoForFURL(furl) 136 cs = ci.connectorStatuses 137 self.assertEqual(cs["slow:foo"], "phase2") 138 139 @defer.inlineCallbacks 140 def testListener(self): 141 furl, tubB, hint = self.makeTub("tcp") 142 rref1 = yield tubB.getReference(furl) 143 yield rref1.callRemote("free", Target()) 144 rref2 = self._target.calls[0][0][0] 145 ci = rref2.getConnectionInfo() 146 self.assertEqual(ci.connectorStatuses, {}) 147 (listener, status) = ci.listenerStatus 148 self.assertEqual(status, "successful") 149 # Twisted-18.4.0 used the first one. Twisted-18.7.0 switched to 150 # attrs, which stringifies as the second one. 151 self.assertIn(listener, 152 ("Listener on IPv4Address(TCP, '127.0.0.1', %d)" % self._portnum, 153 "Listener on IPv4Address(type='TCP', host='127.0.0.1', port=%d)" 154 % self._portnum)) 155 156 @defer.inlineCallbacks 157 def testLoopback(self): 158 furl, tubB, hint = self.makeTub("tcp") 159 rref1 = yield self._tubA.getReference(furl) 160 ci = rref1.getConnectionInfo() 161 self.assertEqual(ci.connectorStatuses, {"loopback": "connected"}) 162 self.assertEqual(ci.listenerStatus, (None, None)) 163 164 165class Reconnection(unittest.TestCase): 166 def test_stages(self): 167 ri = reconnector.ReconnectionInfo() 168 169 self.assertEqual(ri.state, "unstarted") 170 self.assertEqual(ri.connectionInfo, None) 171 self.assertEqual(ri.lastAttempt, None) 172 self.assertEqual(ri.nextAttempt, None) 173 174 ci = object() 175 ri._set_state("connecting") 176 ri._set_connection_info(ci) 177 ri._set_last_attempt(10.0) 178 179 self.assertEqual(ri.state, "connecting") 180 self.assertEqual(ri.connectionInfo, ci) 181 self.assertEqual(ri.lastAttempt, 10.0) 182 self.assertEqual(ri.nextAttempt, None) 183 184 ri._set_state("connected") 185 186 self.assertEqual(ri.state, "connected") 187 188 ri._set_state("waiting") 189 ri._set_connection_info(None) 190 ri._set_next_attempt(20.0) 191 192 self.assertEqual(ri.state, "waiting") 193 self.assertEqual(ri.connectionInfo, None) 194 self.assertEqual(ri.lastAttempt, 10.0) 195 self.assertEqual(ri.nextAttempt, 20.0) 196