1import json
2from datetime import datetime
3from mock import Mock
4
5from twisted.trial import unittest
6from twisted.internet import defer
7from twisted.python.failure import Failure
8from twisted.web.client import ResponseDone
9
10from txtorcon.router import Router, hexIdFromHash, hashFromHexId
11
12
13class FakeController(object):
14    def get_info_raw(self, i):
15        return defer.succeed('250-ip-to-country/something=XX\r\n250 OK')
16
17
18class UtilityTests(unittest.TestCase):
19
20    def test_hex_converters(self):
21        self.assertEqual(
22            hexIdFromHash('AHhuQ8zFQJdT8l42Axxc6m6kNwI'),
23            '$00786E43CCC5409753F25E36031C5CEA6EA43702'
24        )
25        self.assertEqual(
26            hashFromHexId('$00786E43CCC5409753F25E36031C5CEA6EA43702'),
27            'AHhuQ8zFQJdT8l42Axxc6m6kNwI'
28        )
29        # should work with or without leading $
30        self.assertEqual(
31            hexIdFromHash(hashFromHexId('00786E43CCC5409753F25E36031C5CEA6EA43702')),
32            '$00786E43CCC5409753F25E36031C5CEA6EA43702'
33        )
34
35
36class RouterTests(unittest.TestCase):
37
38    def test_ctor(self):
39        controller = object()
40        router = Router(controller)
41        router.update("foo",
42                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
43                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
44                      "2011-12-16 15:11:34",
45                      "77.183.225.114",
46                      "24051", "24052")
47        self.assertEqual(
48            router.id_hex,
49            "$00786E43CCC5409753F25E36031C5CEA6EA43702"
50        )
51
52        # we assert this twice to cover the cached + uncached cases
53        self.assertTrue(isinstance(router.modified, datetime))
54        self.assertTrue(isinstance(router.modified, datetime))
55        self.assertEqual(router.policy, '')
56
57    def test_unique_name(self):
58        controller = object()
59        router = Router(controller)
60        router.update("foo",
61                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
62                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
63                      "2011-12-16 15:11:34",
64                      "77.183.225.114",
65                      "24051", "24052")
66        self.assertEqual(
67            router.id_hex,
68            "$00786E43CCC5409753F25E36031C5CEA6EA43702"
69        )
70        self.assertEqual(
71            router.unique_name,
72            "$00786E43CCC5409753F25E36031C5CEA6EA43702"
73        )
74        router.flags = ['Named']
75        self.assertEqual(router.unique_name, "foo")
76
77    def test_flags(self):
78        controller = object()
79        router = Router(controller)
80        router.update("foo",
81                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
82                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
83                      "2011-12-16 15:11:34",
84                      "77.183.225.114",
85                      "24051", "24052")
86        router.flags = "Exit Fast Named Running V2Dir Valid".split()
87        self.assertEqual(router.name_is_unique, True)
88
89    def test_flags_from_string(self):
90        controller = object()
91        router = Router(controller)
92        router.update("foo",
93                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
94                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
95                      "2011-12-16 15:11:34",
96                      "77.183.225.114",
97                      "24051", "24052")
98        router.flags = "Exit Fast Named Running V2Dir Valid"
99        self.assertEqual(router.name_is_unique, True)
100
101    def test_policy_accept(self):
102        controller = object()
103        router = Router(controller)
104        router.update("foo",
105                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
106                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
107                      "2011-12-16 15:11:34",
108                      "77.183.225.114",
109                      "24051", "24052")
110        router.policy = "accept 25,128-256".split()
111        self.assertTrue(router.accepts_port(25))
112        for x in range(128, 256):
113            self.assertTrue(router.accepts_port(x))
114        self.assertTrue(not router.accepts_port(26))
115        self.assertEqual(router.policy, 'accept 25,128-256')
116
117    def test_policy_reject(self):
118        controller = object()
119        router = Router(controller)
120        router.update("foo",
121                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
122                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
123                      "2011-12-16 15:11:34",
124                      "77.183.225.114",
125                      "24051", "24052")
126        router.policy = "reject 500-600,655,7766".split()
127        for x in range(1, 500):
128            self.assertTrue(router.accepts_port(x))
129        for x in range(500, 601):
130            self.assertTrue(not router.accepts_port(x))
131
132        self.assertEqual(router.policy, 'reject 500-600,655,7766')
133
134    def test_countrycode(self):
135        class CountryCodeController(object):
136            def get_info_raw(self, i):
137                return defer.succeed(
138                    '250-ip-to-country/127.1.2.3=ZZ\r\n250 OK'
139                )
140        controller = CountryCodeController()
141        router = Router(controller)
142        router.update("foo",
143                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
144                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
145                      "2011-12-16 15:11:34",
146                      "127.1.2.3",
147                      "24051", "24052")
148
149        self.assertEqual(router.location.countrycode, 'ZZ')
150
151    @defer.inlineCallbacks
152    def test_get_location_private(self):
153
154        class CountryCodeController(object):
155            def get_info_raw(self, i):
156                return defer.succeed(
157                    '250-ip-to-country/192.168.0.1=ZZ\r\n250 OK'
158                )
159        controller = CountryCodeController()
160        r = Router(controller)
161        r.update('routername', 'deadbeef', 'orhash', 'modified', '192.168.0.1', '', '')
162        loc0 = yield r.get_location()
163        loc1 = yield r.get_location()
164
165        self.assertEqual(loc0.countrycode, 'ZZ')
166        self.assertEqual(loc1.countrycode, 'ZZ')
167
168    @defer.inlineCallbacks
169    def test_get_location_something(self):
170
171        class CountryCodeController(object):
172            def get_info_raw(self, i):
173                return defer.succeed(
174                    '250-ip-to-country/8.8.8.8=US\r\n250 OK'
175                )
176        controller = CountryCodeController()
177        r = Router(controller)
178        r.update('routername', 'deadbeef', 'orhash', 'modified', '8.8.8.8', '', '')
179        loc = yield r.get_location()
180
181        self.assertNotEqual(loc.countrycode, None)
182
183    @defer.inlineCallbacks
184    def test_get_location_unknown(self):
185
186        class CountryCodeController(object):
187            def get_info_raw(self, i):
188                raise RuntimeError("shouldn't happen")
189        controller = CountryCodeController()
190
191        r = Router(controller)
192        loc = yield r.get_location()
193
194        self.assertEqual(loc.countrycode, None)
195
196    def test_policy_error(self):
197        router = Router(object())
198        try:
199            router.policy = 'foo 123'
200            self.fail()
201        except Exception as e:
202            self.assertTrue("Don't understand" in str(e))
203
204    def test_policy_not_set_error(self):
205        router = Router(object())
206        try:
207            router.accepts_port(123)
208            self.fail()
209        except Exception as e:
210            self.assertTrue("policy" in str(e))
211
212    def test_repr(self):
213        router = Router(FakeController())
214        router.update("foo",
215                      "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
216                      "MAANkj30tnFvmoh7FsjVFr+cmcs",
217                      "2011-12-16 15:11:34",
218                      "1.2.3.4",
219                      "24051", "24052")
220        router.flags = ['Named']
221        repr(router)
222
223    def test_repr_no_update(self):
224        router = Router(FakeController())
225        repr(router)
226
227
228class OnionOOTests(unittest.TestCase):
229
230    def setUp(self):
231        self.router = Router(FakeController())
232        self.router.update(
233            "foo",
234            "AHhuQ8zFQJdT8l42Axxc6m6kNwI",
235            "MAANkj30tnFvmoh7FsjVFr+cmcs",
236            "2011-12-16 15:11:34",
237            "1.2.3.4",
238            "24051", "24052"
239        )
240
241    @defer.inlineCallbacks
242    def test_onionoo_get_fails(self):
243        agent = Mock()
244        resp = Mock()
245        resp.code = 500
246        agent.request = Mock(return_value=defer.succeed(resp))
247
248        with self.assertRaises(Exception) as ctx:
249            yield self.router.get_onionoo_details(agent)
250        self.assertTrue(
251            "Failed to lookup" in str(ctx.exception)
252        )
253
254    @defer.inlineCallbacks
255    def test_onionoo_success(self):
256        agent = Mock()
257        resp = Mock()
258        resp.code = 200
259
260        def feed_response(protocol):
261            config = {
262                "relays": [
263                    {
264                        "fingerprint": "00786E43CCC5409753F25E36031C5CEA6EA43702",
265                    },
266                ]
267            }
268            protocol.dataReceived(json.dumps(config).encode())
269            protocol.connectionLost(Failure(ResponseDone()))
270        resp.deliverBody = Mock(side_effect=feed_response)
271        agent.request = Mock(return_value=defer.succeed(resp))
272
273        data = yield self.router.get_onionoo_details(agent)
274
275        self.assertTrue('fingerprint' in data)
276        self.assertTrue(data['fingerprint'] == "00786E43CCC5409753F25E36031C5CEA6EA43702")
277
278    @defer.inlineCallbacks
279    def test_onionoo_too_many_answers(self):
280        agent = Mock()
281        resp = Mock()
282        resp.code = 200
283
284        def feed_response(protocol):
285            config = {
286                "relays": [
287                    {
288                        "fingerprint": "00786E43CCC5409753F25E36031C5CEA6EA43702",
289                    },
290                    {
291                        "fingerprint": "boom",
292                    }
293                ]
294            }
295            protocol.dataReceived(json.dumps(config).encode())
296            protocol.connectionLost(Failure(ResponseDone()))
297        resp.deliverBody = Mock(side_effect=feed_response)
298        agent.request = Mock(return_value=defer.succeed(resp))
299
300        with self.assertRaises(Exception) as ctx:
301            yield self.router.get_onionoo_details(agent)
302
303        self.assertTrue(
304            "multiple relays for" in str(ctx.exception)
305        )
306
307    @defer.inlineCallbacks
308    def test_onionoo_wrong_fingerprint(self):
309        agent = Mock()
310        resp = Mock()
311        resp.code = 200
312
313        def feed_response(protocol):
314            config = {
315                "relays": [
316                    {
317                        "fingerprint": "boom",
318                    },
319                ]
320            }
321            protocol.dataReceived(json.dumps(config).encode())
322            protocol.connectionLost(Failure(ResponseDone()))
323        resp.deliverBody = Mock(side_effect=feed_response)
324        agent.request = Mock(return_value=defer.succeed(resp))
325
326        with self.assertRaises(Exception) as ctx:
327            yield self.router.get_onionoo_details(agent)
328
329        self.assertTrue(
330            " but got data for " in str(ctx.exception)
331        )
332