"""Tests for ``txtorcon.router``: hex-id helpers, ``Router`` and Onionoo lookups."""

import json
from datetime import datetime
from mock import Mock

from twisted.trial import unittest
from twisted.internet import defer
from twisted.python.failure import Failure
from twisted.web.client import ResponseDone

from txtorcon.router import Router, hexIdFromHash, hashFromHexId


# Fixture values shared by most tests below.
_ID_HASH = "AHhuQ8zFQJdT8l42Axxc6m6kNwI"
_OR_HASH = "MAANkj30tnFvmoh7FsjVFr+cmcs"
_MODIFIED = "2011-12-16 15:11:34"
_FINGERPRINT = "00786E43CCC5409753F25E36031C5CEA6EA43702"
_ID_HEX = "$" + _FINGERPRINT


def _make_router(controller, ip="77.183.225.114"):
    """Return a ``Router`` for *controller*, updated with the standard fixture.

    :param controller: object handed to the ``Router`` constructor.
    :param ip: IP address string passed to ``Router.update``.
    """
    router = Router(controller)
    router.update(
        "foo",
        _ID_HASH,
        _OR_HASH,
        _MODIFIED,
        ip,
        "24051", "24052",
    )
    return router


def _agent_with_response(code, relays=None):
    """Build a mock agent whose ``request()`` yields a canned response.

    :param code: value assigned to the response's ``code`` attribute.
    :param relays: optional list placed under the ``"relays"`` key of a JSON
        body that is fed to whatever protocol is passed to ``deliverBody``.
        When ``None``, ``deliverBody`` is left as a plain auto-created mock.
    """
    resp = Mock()
    resp.code = code

    if relays is not None:
        def feed_response(protocol):
            body = json.dumps({"relays": relays}).encode()
            protocol.dataReceived(body)
            protocol.connectionLost(Failure(ResponseDone()))
        resp.deliverBody = Mock(side_effect=feed_response)

    agent = Mock()
    agent.request = Mock(return_value=defer.succeed(resp))
    return agent


class FakeController(object):
    """Minimal controller stand-in that answers every ``get_info_raw`` call."""

    def get_info_raw(self, i):
        """Return an already-fired Deferred with a fixed ip-to-country reply."""
        return defer.succeed('250-ip-to-country/something=XX\r\n250 OK')


class UtilityTests(unittest.TestCase):
    """Tests for the module-level hash/hex-id conversion helpers."""

    def test_hex_converters(self):
        """``hexIdFromHash`` and ``hashFromHexId`` convert in both directions."""
        self.assertEqual(hexIdFromHash(_ID_HASH), _ID_HEX)
        self.assertEqual(hashFromHexId(_ID_HEX), _ID_HASH)
        # should work with or without leading $
        self.assertEqual(
            hexIdFromHash(hashFromHexId(_FINGERPRINT)),
            _ID_HEX
        )


class RouterTests(unittest.TestCase):
    """Tests for ``Router`` attributes, flags, policy and location lookups."""

    def test_ctor(self):
        """A freshly-updated router exposes id_hex, modified and empty policy."""
        router = _make_router(object())
        self.assertEqual(router.id_hex, _ID_HEX)

        # we assert this twice to cover the cached + uncached cases
        self.assertIsInstance(router.modified, datetime)
        self.assertIsInstance(router.modified, datetime)
        self.assertEqual(router.policy, '')

    def test_unique_name(self):
        """``unique_name`` is the hex id until the 'Named' flag is set."""
        router = _make_router(object())
        self.assertEqual(router.id_hex, _ID_HEX)
        self.assertEqual(router.unique_name, _ID_HEX)

        router.flags = ['Named']
        self.assertEqual(router.unique_name, "foo")

    def test_flags(self):
        """Flags assigned as a list including 'Named' make the name unique."""
        router = _make_router(object())
        router.flags = "Exit Fast Named Running V2Dir Valid".split()
        self.assertEqual(router.name_is_unique, True)

    def test_flags_from_string(self):
        """Flags assigned as a single space-separated string are accepted too."""
        router = _make_router(object())
        router.flags = "Exit Fast Named Running V2Dir Valid"
        self.assertEqual(router.name_is_unique, True)

    def test_policy_accept(self):
        """An 'accept' policy admits only the listed ports and ranges."""
        router = _make_router(object())
        router.policy = "accept 25,128-256".split()
        self.assertTrue(router.accepts_port(25))
        # Upper bound is 257 so that port 256, the last port of the range,
        # is checked as well (test_policy_reject checks its range the same
        # way with range(500, 601)).
        for port in range(128, 257):
            self.assertTrue(router.accepts_port(port))
        self.assertTrue(not router.accepts_port(26))
        self.assertEqual(router.policy, 'accept 25,128-256')

    def test_policy_reject(self):
        """A 'reject' policy admits ports outside the listed ranges."""
        router = _make_router(object())
        router.policy = "reject 500-600,655,7766".split()
        for port in range(1, 500):
            self.assertTrue(router.accepts_port(port))
        for port in range(500, 601):
            self.assertTrue(not router.accepts_port(port))

        self.assertEqual(router.policy, 'reject 500-600,655,7766')

    def test_countrycode(self):
        """``location.countrycode`` reflects the controller's reply."""
        class CountryCodeController(object):
            def get_info_raw(self, i):
                return defer.succeed(
                    '250-ip-to-country/127.1.2.3=ZZ\r\n250 OK'
                )

        router = _make_router(CountryCodeController(), ip="127.1.2.3")

        self.assertEqual(router.location.countrycode, 'ZZ')

    @defer.inlineCallbacks
    def test_get_location_private(self):
        """Calling ``get_location`` twice yields the same country code."""

        class CountryCodeController(object):
            def get_info_raw(self, i):
                return defer.succeed(
                    '250-ip-to-country/192.168.0.1=ZZ\r\n250 OK'
                )

        r = Router(CountryCodeController())
        r.update('routername', 'deadbeef', 'orhash', 'modified', '192.168.0.1', '', '')
        loc0 = yield r.get_location()
        loc1 = yield r.get_location()

        self.assertEqual(loc0.countrycode, 'ZZ')
        self.assertEqual(loc1.countrycode, 'ZZ')

    @defer.inlineCallbacks
    def test_get_location_something(self):
        """``get_location`` yields a location with a non-None country code."""

        class CountryCodeController(object):
            def get_info_raw(self, i):
                return defer.succeed(
                    '250-ip-to-country/8.8.8.8=US\r\n250 OK'
                )

        r = Router(CountryCodeController())
        r.update('routername', 'deadbeef', 'orhash', 'modified', '8.8.8.8', '', '')
        loc = yield r.get_location()

        self.assertNotEqual(loc.countrycode, None)

    @defer.inlineCallbacks
    def test_get_location_unknown(self):
        """A router that was never updated has no country code.

        The controller raises if queried, so the test also fails if the
        router tries to look anything up.
        """

        class CountryCodeController(object):
            def get_info_raw(self, i):
                raise RuntimeError("shouldn't happen")

        r = Router(CountryCodeController())
        loc = yield r.get_location()

        self.assertEqual(loc.countrycode, None)

    def test_policy_error(self):
        """Assigning an unrecognised policy raises a descriptive error."""
        router = Router(object())
        # assertRaises (rather than try / self.fail() / except Exception)
        # so that a missing exception is reported as exactly that instead of
        # being caught by the handler and surfacing as a substring mismatch.
        with self.assertRaises(Exception) as ctx:
            router.policy = 'foo 123'
        self.assertIn("Don't understand", str(ctx.exception))

    def test_policy_not_set_error(self):
        """``accepts_port`` raises when no policy has been set."""
        router = Router(object())
        with self.assertRaises(Exception) as ctx:
            router.accepts_port(123)
        self.assertIn("policy", str(ctx.exception))

    def test_repr(self):
        """``repr()`` of an updated, flagged router does not raise."""
        router = _make_router(FakeController(), ip="1.2.3.4")
        router.flags = ['Named']
        repr(router)

    def test_repr_no_update(self):
        """``repr()`` of a router that was never updated does not raise."""
        router = Router(FakeController())
        repr(router)


class OnionOOTests(unittest.TestCase):
    """Tests for ``Router.get_onionoo_details`` using a mocked web agent."""

    def setUp(self):
        self.router = _make_router(FakeController(), ip="1.2.3.4")

    @defer.inlineCallbacks
    def test_onionoo_get_fails(self):
        """A response with code 500 produces a 'Failed to lookup' error."""
        agent = _agent_with_response(500)

        with self.assertRaises(Exception) as ctx:
            yield self.router.get_onionoo_details(agent)

        self.assertIn("Failed to lookup", str(ctx.exception))

    @defer.inlineCallbacks
    def test_onionoo_success(self):
        """A single relay with the router's fingerprint is returned as data."""
        agent = _agent_with_response(200, relays=[
            {"fingerprint": _FINGERPRINT},
        ])

        data = yield self.router.get_onionoo_details(agent)

        self.assertIn('fingerprint', data)
        self.assertEqual(data['fingerprint'], _FINGERPRINT)

    @defer.inlineCallbacks
    def test_onionoo_too_many_answers(self):
        """More than one relay in the reply is reported as an error."""
        agent = _agent_with_response(200, relays=[
            {"fingerprint": _FINGERPRINT},
            {"fingerprint": "boom"},
        ])

        with self.assertRaises(Exception) as ctx:
            yield self.router.get_onionoo_details(agent)

        self.assertIn("multiple relays for", str(ctx.exception))

    @defer.inlineCallbacks
    def test_onionoo_wrong_fingerprint(self):
        """A reply for a different fingerprint is reported as an error."""
        agent = _agent_with_response(200, relays=[
            {"fingerprint": "boom"},
        ])

        with self.assertRaises(Exception) as ctx:
            yield self.router.get_onionoo_details(agent)

        self.assertIn(" but got data for ", str(ctx.exception))