1from zope.interface import implementer
2from twisted.trial import unittest
3from twisted.internet import defer, reactor
4from twisted.application import service
5from foolscap import info, reconnector, ipb, util
6from foolscap.api import Tub
7from foolscap.connections import tcp
8from foolscap.test.common import (certData_low, certData_high, Target)
9
class Info(unittest.TestCase):
    """Checks the attributes exposed by ``info.ConnectionInfo``."""

    def test_stages(self):
        """Drive a ConnectionInfo from fresh, to connected, to lost."""
        ci = info.ConnectionInfo()

        def expect(**wanted):
            # Compare each named public attribute of ci against its
            # expected value.
            for attr_name, value in wanted.items():
                self.assertEqual(getattr(ci, attr_name), value)

        # A freshly created object reports nothing at all.
        expect(connected=False,
               connectorStatuses={},
               connectionHandlers={},
               establishedAt=None,
               winningHint=None,
               listenerStatus=(None, None),
               lostAt=None)

        # One hint is being worked on by the "tcp" handler.
        ci._describe_connection_handler("hint1", "tcp")
        ci._set_connection_status("hint1", "working")
        expect(connectorStatuses={"hint1": "working"},
               connectionHandlers={"hint1": "tcp"})

        # That hint wins and the connection is established.
        ci._set_connection_status("hint1", "successful")
        ci._set_winning_hint("hint1")
        ci._set_established_at(10.0)
        ci._set_connected(True)
        expect(connected=True,
               connectorStatuses={"hint1": "successful"},
               connectionHandlers={"hint1": "tcp"},
               establishedAt=10.0,
               winningHint="hint1",
               listenerStatus=(None, None),
               lostAt=None)

        # The connection is then dropped.
        ci._set_connected(False)
        ci._set_lost_at(15.0)
        expect(connected=False, lostAt=15.0)
45
@implementer(ipb.IConnectionHintHandler)
class Handler:
    """Connection-hint handler stub for tests.

    It counts how often it is consulted, can be told to raise instead of
    answering (by assigning an exception to ``_err``), and hands back a
    Deferred (``_d``) that the test fires by hand.
    """

    def __init__(self):
        # No injected failure until a test assigns one.
        self._err = None
        # Placeholder; replaced on every hint_to_endpoint() call.
        self._d = defer.Deferred()
        # accepted: calls that got past the _err check.
        self.accepted = 0
        # asked: every call to hint_to_endpoint(), accepted or not.
        self.asked = 0

    def hint_to_endpoint(self, hint, reactor, update_status):
        """Record the request and return a Deferred the test controls.

        The ``update_status`` callable is saved as ``_update_status`` so a
        test can push further status strings later. If ``_err`` is set it
        is raised before the hint is counted as accepted; otherwise the
        status "resolving hint" is reported and ``self._d`` is returned.
        ``hint`` and ``reactor`` are not used here.
        """
        self.asked += 1
        # A new Deferred per request, so each one can be fired separately.
        self._d = defer.Deferred()
        self._update_status = update_status
        if self._err:
            raise self._err
        self.accepted += 1
        update_status("resolving hint")
        return self._d
63
def discard_status(status):
    """Status callback that ignores the status it is given."""
    return None
66
class Connect(unittest.TestCase):
    """Checks the ConnectionInfo reported by real Tubs while connecting."""

    def setUp(self):
        # Parent service for every Tub created by makeTub().
        self.s = service.MultiService()
        self.s.startService()

    def tearDown(self):
        # Hand the result of stopService() back to the test runner.
        return self.s.stopService()

    def makeTub(self, hint_type, listener_test_options=None,
                extra_hint=None):
        """Create a listening Tub (A) and a client Tub (B).

        Tub A listens on a freshly allocated port on 127.0.0.1 and
        advertises the location "<hint_type>:127.0.0.1:<port>", with
        ``extra_hint`` appended after a comma when given. A ``Target`` is
        registered on Tub A and kept as ``self._target``.

        :param hint_type: the type prefix used in the advertised hint
        :param listener_test_options: dict passed to ``tubA.listenOn`` as
            ``_test_options``; a fresh empty dict is used when omitted
        :param extra_hint: optional additional hint string
        :returns: tuple ``(furl, tubB, hint)``
        """
        # Build a new dict per call rather than sharing one mutable
        # default object across all invocations.
        if listener_test_options is None:
            listener_test_options = {}
        tubA = Tub(certData=certData_low)
        tubA.setServiceParent(self.s)
        tubB = Tub(certData=certData_high)
        tubB.setServiceParent(self.s)
        self._tubA, self._tubB = tubA, tubB
        portnum = util.allocate_tcp_port()
        self._portnum = portnum
        port = "tcp:%d:interface=127.0.0.1" % portnum
        hint = "%s:127.0.0.1:%d" % (hint_type, portnum)
        if extra_hint:
            hint = hint + "," + extra_hint
        tubA.listenOn(port, _test_options=listener_test_options)
        tubA.setLocation(hint)
        self._target = Target()
        furl = tubA.registerReference(self._target)
        return furl, tubB, hint

    @defer.inlineCallbacks
    def testInfo(self):
        """The hint status moves through "resolving hint", "connecting",
        "negotiating" and finally "successful"."""
        def tubA_sendHello_pause(d2):
            # Installed on Tub A's listener via the debug_pause_sendHello
            # test option. tubB, furl and hint are bound further down,
            # before this can run.
            ci = tubB.getConnectionInfoForFURL(furl)
            self.assertEqual(ci.connectorStatuses, {hint: "negotiating"})
            # NOTE(review): presumably firing d2 lets negotiation resume.
            d2.callback(None)
        test_options = {
            "debug_pause_sendHello": tubA_sendHello_pause,
            }
        furl, tubB, hint = self.makeTub("tcp", test_options)
        h = Handler()
        # Make our stub the only handler for "tcp" hints.
        tubB.removeAllConnectionHintHandlers()
        tubB.addConnectionHintHandler("tcp", h)
        d = tubB.getReference(furl)
        ci = tubB.getConnectionInfoForFURL(furl)
        self.assertEqual(ci.connectorStatuses, {hint: "resolving hint"})
        # Resolve the stub's Deferred with what the real TCP handler
        # produces for this hint.
        h._d.callback(tcp.DefaultTCP().hint_to_endpoint(hint, reactor,
                                                        discard_status))
        ci = tubB.getConnectionInfoForFURL(furl)
        self.assertEqual(ci.connectorStatuses, {hint: "connecting"})
        # we use debug_pause_sendHello to catch "negotiating" here, then wait
        rref = yield d
        self.assertEqual(h.asked, 1)
        self.assertEqual(h.accepted, 1)
        ci = tubB.getConnectionInfoForFURL(furl)
        self.assertEqual(ci.connectorStatuses, {hint: "successful"})
        del rref

    def testNoHandler(self):
        """A hint type without a handler is reported as a bad hint, while
        a handled hint shows whatever status its handler last pushed."""
        furl, tubB, hint = self.makeTub("missing", extra_hint="slow:foo")
        # Exactly two hints are expected; only the first is needed by name.
        missing_hint, _extra = hint.split(",")
        tubB.removeAllConnectionHintHandlers()
        h = Handler()
        tubB.addConnectionHintHandler("slow", h)
        d = tubB.getReference(furl)
        del d # XXX
        ci = tubB.getConnectionInfoForFURL(furl)
        cs = ci.connectorStatuses
        self.assertEqual(cs["slow:foo"], "resolving hint")
        self.assertEqual(cs[missing_hint], "bad hint: no handler registered")
        # Status updates pushed later by the handler must show up too.
        h._update_status("phase2")
        ci = tubB.getConnectionInfoForFURL(furl)
        cs = ci.connectorStatuses
        self.assertEqual(cs["slow:foo"], "phase2")

    @defer.inlineCallbacks
    def testListener(self):
        """The listening side reports a listener status and no connector
        statuses."""
        furl, tubB, hint = self.makeTub("tcp")
        rref1 = yield tubB.getReference(furl)
        yield rref1.callRemote("free", Target())
        # NOTE(review): assumes Target records each call's arguments in
        # .calls, so this is Tub A's reference to the object sent above.
        rref2 = self._target.calls[0][0][0]
        ci = rref2.getConnectionInfo()
        self.assertEqual(ci.connectorStatuses, {})
        (listener, status) = ci.listenerStatus
        self.assertEqual(status, "successful")
        # Twisted-18.4.0 used the first one. Twisted-18.7.0 switched to
        # attrs, which stringifies as the second one.
        self.assertIn(listener,
                      ("Listener on IPv4Address(TCP, '127.0.0.1', %d)" % self._portnum,
                       "Listener on IPv4Address(type='TCP', host='127.0.0.1', port=%d)"
                       % self._portnum))

    @defer.inlineCallbacks
    def testLoopback(self):
        """A Tub fetching its own FURL reports a "loopback" connector and
        no listener."""
        furl, tubB, hint = self.makeTub("tcp")
        rref1 = yield self._tubA.getReference(furl)
        ci = rref1.getConnectionInfo()
        self.assertEqual(ci.connectorStatuses, {"loopback": "connected"})
        self.assertEqual(ci.listenerStatus, (None, None))
163
164
class Reconnection(unittest.TestCase):
    """Checks the attributes exposed by ``reconnector.ReconnectionInfo``."""

    def test_stages(self):
        """Drive a ReconnectionInfo through unstarted, connecting,
        connected and waiting."""
        ri = reconnector.ReconnectionInfo()

        def expect(**wanted):
            # Compare each named public attribute of ri against its
            # expected value.
            for attr_name, value in wanted.items():
                self.assertEqual(getattr(ri, attr_name), value)

        # Nothing has been attempted yet.
        expect(state="unstarted",
               connectionInfo=None,
               lastAttempt=None,
               nextAttempt=None)

        # An opaque stand-in for a connection-info object.
        conn_info = object()
        ri._set_state("connecting")
        ri._set_connection_info(conn_info)
        ri._set_last_attempt(10.0)
        expect(state="connecting",
               connectionInfo=conn_info,
               lastAttempt=10.0,
               nextAttempt=None)

        ri._set_state("connected")
        expect(state="connected")

        # Back to waiting: the connection info is cleared and a retry is
        # scheduled, while the last attempt time is left unchanged.
        ri._set_state("waiting")
        ri._set_connection_info(None)
        ri._set_next_attempt(20.0)
        expect(state="waiting",
               connectionInfo=None,
               lastAttempt=10.0,
               nextAttempt=20.0)
196