1from __future__ import print_function 2 3from zope.interface import implementer, directlyProvides 4from zope.interface.verify import verifyClass 5from twisted.trial import unittest 6from twisted.test import proto_helpers 7from twisted.python.failure import Failure 8from twisted.internet import task, defer 9from twisted.internet.interfaces import IStreamClientEndpoint, IReactorCore 10 11import tempfile 12 13from ipaddress import IPv4Address 14 15from mock import patch, Mock 16 17from txtorcon import TorControlProtocol 18from txtorcon import TorProtocolError 19from txtorcon import TorState 20from txtorcon import Stream 21from txtorcon import Circuit 22from txtorcon import build_tor_connection 23from txtorcon import build_local_tor_connection 24from txtorcon import build_timeout_circuit 25from txtorcon import CircuitBuildTimedOutError 26from txtorcon.interface import IStreamAttacher 27from txtorcon.interface import ICircuitListener 28from txtorcon.interface import IStreamListener 29from txtorcon.interface import StreamListenerMixin 30from txtorcon.interface import CircuitListenerMixin 31from txtorcon.circuit import _get_circuit_attacher 32from txtorcon.circuit import _extract_reason 33 34try: 35 from .py3_torstate import TorStatePy3Tests # noqa 36except SyntaxError: 37 pass 38 39 40@implementer(ICircuitListener) 41class CircuitListener(object): 42 43 def __init__(self, expected): 44 "expect is a list of tuples: (event, {key:value, key1:value1, ..})" 45 self.expected = expected 46 47 def checker(self, state, circuit, arg=None): 48 if self.expected[0][0] != state: 49 raise RuntimeError( 50 'Expected event "%s" not "%s".' % 51 (self.expected[0][0], state) 52 ) 53 for (k, v) in self.expected[0][1].items(): 54 if k == 'arg': 55 if v != arg: 56 raise RuntimeError( 57 'Expected argument to have value "%s", not "%s"' % (arg, v) 58 ) 59 elif getattr(circuit, k) != v: 60 raise RuntimeError( 61 'Expected attribute "%s" to have value "%s", not "%s"' % 62 (k, v, getattr(circuit, k)) 63 ) 64 self.expected = self.expected[1:] 65 66 def circuit_new(self, circuit): 67 self.checker('new', circuit) 68 69 def circuit_launched(self, circuit): 70 self.checker('launched', circuit) 71 72 def circuit_extend(self, circuit, router): 73 self.checker('extend', circuit, router) 74 75 def circuit_built(self, circuit): 76 self.checker('built', circuit) 77 78 def circuit_closed(self, circuit, **kw): 79 self.checker('closed', circuit, **kw) 80 81 def circuit_failed(self, circuit, **kw): 82 self.checker('failed', circuit, **kw) 83 84 85@implementer(IStreamListener) 86class StreamListener(object): 87 88 def __init__(self, expected): 89 "expect is a list of tuples: (event, {key:value, key1:value1, ..})" 90 self.expected = expected 91 92 def checker(self, state, stream, arg=None): 93 if self.expected[0][0] != state: 94 raise RuntimeError( 95 'Expected event "%s" not "%s".' % (self.expected[0][0], state) 96 ) 97 for (k, v) in self.expected[0][1].items(): 98 if k == 'arg': 99 if v != arg: 100 raise RuntimeError( 101 'Expected argument to have value "%s", not "%s"' % 102 (arg, v) 103 ) 104 elif getattr(stream, k) != v: 105 raise RuntimeError( 106 'Expected attribute "%s" to have value "%s", not "%s"' % 107 (k, v, getattr(stream, k)) 108 ) 109 self.expected = self.expected[1:] 110 111 def stream_new(self, stream): 112 self.checker('new', stream) 113 114 def stream_succeeded(self, stream): 115 self.checker('succeeded', stream) 116 117 def stream_attach(self, stream, circuit): 118 self.checker('attach', stream, circuit) 119 120 def stream_closed(self, stream): 121 self.checker('closed', stream) 122 123 def stream_failed(self, stream, reason, remote_reason): 124 self.checker('failed', stream, reason) 125 126 127@implementer(IReactorCore) 128class FakeReactor: 129 130 def __init__(self, test): 131 self.test = test 132 133 def addSystemEventTrigger(self, *args): 134 self.test.assertEqual(args[0], 'before') 135 self.test.assertEqual(args[1], 'shutdown') 136 self.test.assertEqual(args[2], self.test.state.undo_attacher) 137 return 1 138 139 def removeSystemEventTrigger(self, id): 140 self.test.assertEqual(id, 1) 141 142 def connectTCP(self, *args, **kw): 143 """for testing build_tor_connection""" 144 raise RuntimeError('connectTCP: ' + str(args)) 145 146 def connectUNIX(self, *args, **kw): 147 """for testing build_tor_connection""" 148 raise RuntimeError('connectUNIX: ' + str(args)) 149 150 151class FakeCircuit(Circuit): 152 153 def __init__(self, id=-999): 154 self.streams = [] 155 self.id = id 156 self.state = 'BOGUS' 157 158 159@implementer(IStreamClientEndpoint) 160class FakeEndpoint: 161 162 def get_info_raw(self, keys): 163 ans = '\r\n'.join(map(lambda k: '%s=' % k, keys.split())) 164 return defer.succeed(ans) 165 166 def get_info_incremental(self, key, linecb): 167 linecb('%s=' % key) 168 return defer.succeed('') 169 170 def connect(self, protocol_factory): 171 self.proto = TorControlProtocol() 172 self.proto.transport = proto_helpers.StringTransport() 173 self.proto.get_info_raw = self.get_info_raw 174 self.proto.get_info_incremental = self.get_info_incremental 175 self.proto._set_valid_events( 176 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL' 177 ) 178 179 return defer.succeed(self.proto) 180 181 182@implementer(IStreamClientEndpoint) 183class FakeEndpointAnswers: 184 185 def __init__(self, answers): 186 self.answers = answers 187 # since we use pop() we need these to be "backwards" 188 self.answers.reverse() 189 190 def add_event_listener(self, *args): 191 return defer.succeed(None) 192 193 def get_info_raw(self, keys): 194 ans = '' 195 for k in keys.split(): 196 if len(self.answers) == 0: 197 raise TorProtocolError(551, "ran out of answers") 198 ans += '%s=%s\r\n' % (k, self.answers.pop()) 199 return ans[:-2] # don't want trailing \r\n 200 201 def get_info_incremental(self, key, linecb): 202 data = self.answers.pop().split('\n') 203 if len(data) == 1: 204 linecb('{}={}'.format(key, data[0])) 205 else: 206 linecb('{}='.format(key)) 207 for line in data: 208 linecb(line) 209 return defer.succeed('') 210 211 def connect(self, protocol_factory): 212 self.proto = TorControlProtocol() 213 self.proto.transport = proto_helpers.StringTransport() 214 self.proto.get_info_raw = self.get_info_raw 215 self.proto.get_info_incremental = self.get_info_incremental 216 self.proto.add_event_listener = self.add_event_listener 217 self.proto._set_valid_events( 218 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL' 219 ) 220 221 return defer.succeed(self.proto) 222 223 224class BootstrapTests(unittest.TestCase): 225 226 def confirm_proto(self, x): 227 self.assertTrue(isinstance(x, TorControlProtocol)) 228 self.assertTrue(x.post_bootstrap.called) 229 return x 230 231 def confirm_state(self, x): 232 self.assertIsInstance(x, TorState) 233 self.assertTrue(x.post_bootstrap.called) 234 return x 235 236 def confirm_consensus(self, x): 237 self.assertEqual(2, len(x.all_routers)) 238 self.assertIn('fake', x.routers) 239 self.assertIn('ekaf', x.routers) 240 return x 241 242 def test_build(self): 243 p = FakeEndpoint() 244 d = build_tor_connection(p, build_state=False) 245 d.addCallback(self.confirm_proto) 246 p.proto.post_bootstrap.callback(p.proto) 247 return d 248 249 def test_build_tcp(self): 250 d = build_tor_connection((FakeReactor(self), '127.0.0.1', 1234)) 251 d.addCallback(self.fail) 252 d.addErrback(lambda x: None) 253 return d 254 255 def test_build_unix(self): 256 tf = tempfile.NamedTemporaryFile() 257 d = build_tor_connection((FakeReactor(self), tf.name)) 258 d.addCallback(self.fail) 259 d.addErrback(lambda x: None) 260 return d 261 262 def test_build_unix_wrong_permissions(self): 263 self.assertRaises( 264 ValueError, 265 build_tor_connection, 266 (FakeReactor(self), 'a non-existant filename') 267 ) 268 269 def test_build_wrong_size_tuple(self): 270 self.assertRaises(TypeError, build_tor_connection, (1, 2, 3, 4)) 271 272 def test_build_wrong_args_entirely(self): 273 self.assertRaises( 274 TypeError, 275 build_tor_connection, 276 'incorrect argument' 277 ) 278 279 def confirm_pid(self, state): 280 self.assertEqual(state.tor_pid, 1234) 281 return state 282 283 def confirm_no_pid(self, state): 284 self.assertEqual(state.tor_pid, 0) 285 286 def test_build_with_answers(self): 287 p = FakeEndpointAnswers(['', # ns/all 288 '', # circuit-status 289 '', # stream-status 290 '', # address-mappings/all 291 '', # entry-guards 292 '1234' # PID 293 ]) 294 295 d = build_tor_connection(p, build_state=True) 296 d.addCallback(self.confirm_state).addErrback(self.fail) 297 d.addCallback(self.confirm_pid).addErrback(self.fail) 298 p.proto.post_bootstrap.callback(p.proto) 299 return d 300 301 def test_build_with_answers_ns(self): 302 fake_consensus = '\n'.join([ 303 'r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 11.11.11.11 443 80', 304 's Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof', 305 'r ekaf foooooooooooooooooooooooooo barbarbarbarbarbarbarbarbar 2011-11-11 16:30:00 22.22.22.22 443 80', 306 's Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof', 307 '', 308 ]) 309 p = FakeEndpointAnswers([fake_consensus, # ns/all 310 '', # circuit-status 311 '', # stream-status 312 '', # address-mappings/all 313 '', # entry-guards 314 '1234' # PID 315 ]) 316 317 d = build_tor_connection(p, build_state=True) 318 d.addCallback(self.confirm_state).addErrback(self.fail) 319 d.addCallback(self.confirm_pid).addErrback(self.fail) 320 d.addCallback(self.confirm_consensus).addErrback(self.fail) 321 p.proto.post_bootstrap.callback(p.proto) 322 return d 323 324 def test_build_with_answers_no_pid(self): 325 p = FakeEndpointAnswers(['', # ns/all 326 '', # circuit-status 327 '', # stream-status 328 '', # address-mappings/all 329 '' # entry-guards 330 ]) 331 332 d = build_tor_connection(p, build_state=True) 333 d.addCallback(self.confirm_state) 334 d.addCallback(self.confirm_no_pid) 335 p.proto.post_bootstrap.callback(p.proto) 336 return d 337 338 def test_build_with_answers_guards_unfound_entry(self): 339 p = FakeEndpointAnswers(['', # ns/all 340 '', # circuit-status 341 '', # stream-status 342 '', # address-mappings/all 343 '\n\nkerblam up\nOK\n' # entry-guards 344 ]) 345 346 d = build_tor_connection(p, build_state=True) 347 d.addCallback(self.confirm_state) 348 d.addCallback(self.confirm_no_pid) 349 p.proto.post_bootstrap.callback(p.proto) 350 return d 351 352 def test_build_local_unix(self): 353 reactor = FakeReactor(self) 354 d = build_local_tor_connection(reactor) 355 d.addErrback(lambda _: None) 356 return d 357 358 def test_build_local_tcp(self): 359 reactor = FakeReactor(self) 360 d = build_local_tor_connection(reactor, socket=None) 361 d.addErrback(lambda _: None) 362 return d 363 364 365class UtilTests(unittest.TestCase): 366 367 def test_extract_reason_no_reason(self): 368 reason = _extract_reason(dict()) 369 self.assertEqual("unknown", reason) 370 371 372class StateTests(unittest.TestCase): 373 374 def setUp(self): 375 self.protocol = TorControlProtocol() 376 self.state = TorState(self.protocol) 377 # avoid spew in trial logs; state prints this by default 378 self.state._attacher_error = lambda f: f 379 self.protocol.connectionMade = lambda: None 380 self.transport = proto_helpers.StringTransport() 381 self.protocol.makeConnection(self.transport) 382 383 def test_close_stream_with_attacher(self): 384 @implementer(IStreamAttacher) 385 class MyAttacher(object): 386 387 def __init__(self): 388 self.streams = [] 389 390 def attach_stream(self, stream, circuits): 391 self.streams.append(stream) 392 return None 393 394 attacher = MyAttacher() 395 self.state.set_attacher(attacher, FakeReactor(self)) 396 self.state._stream_update("76 CLOSED 0 www.example.com:0 REASON=DONE") 397 398 def test_attacher_error_handler(self): 399 # make sure error-handling "does something" that isn't blowing up 400 with patch('sys.stdout'): 401 TorState(self.protocol)._attacher_error(Failure(RuntimeError("quote"))) 402 403 def test_stream_update(self): 404 # we use a circuit ID of 0 so it doesn't try to look anything 405 # up but it's not really correct to have a SUCCEEDED w/o a 406 # valid circuit, I don't think 407 self.state._stream_update('1610 SUCCEEDED 0 74.125.224.243:80') 408 self.assertTrue(1610 in self.state.streams) 409 410 def test_single_streams(self): 411 self.state.circuits[496] = FakeCircuit(496) 412 self.state._stream_status( 413 'stream-status=123 SUCCEEDED 496 www.example.com:6667' 414 ) 415 self.assertEqual(len(self.state.streams), 1) 416 417 def test_multiple_streams(self): 418 self.state.circuits[496] = FakeCircuit(496) 419 self.state._stream_status( 420 '\r\n'.join([ 421 'stream-status=', 422 '123 SUCCEEDED 496 www.example.com:6667', 423 '124 SUCCEEDED 496 www.example.com:6667', 424 ]) 425 ) 426 self.assertEqual(len(self.state.streams), 2) 427 428 def send(self, line): 429 self.protocol.dataReceived(line.strip() + b"\r\n") 430 431 @defer.inlineCallbacks 432 def test_bootstrap_callback(self): 433 ''' 434 FIXME: something is still screwy with this; try throwing an 435 exception from TorState.bootstrap and we'll just hang... 436 ''' 437 438 from .test_torconfig import FakeControlProtocol 439 protocol = FakeControlProtocol( 440 [ 441 "ns/all=", # ns/all 442 "", # circuit-status 443 "", # stream-status 444 "", # address-mappings/all 445 "entry-guards=\r\n$0000000000000000000000000000000000000000=name up\r\n$1111111111111111111111111111111111111111=foo up\r\n$9999999999999999999999999999999999999999=eman unusable 2012-01-01 22:00:00\r\n", # entry-guards 446 "99999", # process/pid 447 "??", # ip-to-country/0.0.0.0 448 ] 449 ) 450 451 state = yield TorState.from_protocol(protocol) 452 453 self.assertEqual(len(state.entry_guards), 2) 454 self.assertTrue('$0000000000000000000000000000000000000000' in state.entry_guards) 455 self.assertTrue('$1111111111111111111111111111111111111111' in state.entry_guards) 456 self.assertEqual(len(state.unusable_entry_guards), 1) 457 self.assertTrue('$9999999999999999999999999999999999999999' in state.unusable_entry_guards[0]) 458 459 def test_bootstrap_existing_addresses(self): 460 ''' 461 FIXME: something is still screwy with this; try throwing an 462 exception from TorState.bootstrap and we'll just hang... 463 ''' 464 465 d = self.state.post_bootstrap 466 467 clock = task.Clock() 468 self.state.addrmap.scheduler = clock 469 470 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 471 self.state._bootstrap() 472 473 self.send(b"250+ns/all=") 474 self.send(b".") 475 self.send(b"250 OK") 476 477 self.send(b"250+circuit-status=") 478 self.send(b".") 479 self.send(b"250 OK") 480 481 self.send(b"250-stream-status=") 482 self.send(b"250 OK") 483 484 self.send(b"250+address-mappings/all=") 485 self.send(b'www.example.com 127.0.0.1 "2012-01-01 00:00:00"') 486 self.send(b'subdomain.example.com 10.0.0.0 "2012-01-01 00:01:02"') 487 self.send(b".") 488 self.send(b"250 OK") 489 490 for ignored in self.state.event_map.items(): 491 self.send(b"250 OK") 492 493 self.send(b"250-entry-guards=") 494 self.send(b"250 OK") 495 496 self.send(b"250 OK") 497 498 self.assertEqual(len(self.state.addrmap.addr), 4) 499 self.assertTrue('www.example.com' in self.state.addrmap.addr) 500 self.assertTrue('subdomain.example.com' in self.state.addrmap.addr) 501 self.assertTrue('10.0.0.0' in self.state.addrmap.addr) 502 self.assertTrue('127.0.0.1' in self.state.addrmap.addr) 503 self.assertEqual(IPv4Address(u'127.0.0.1'), self.state.addrmap.find('www.example.com').ip) 504 self.assertEqual('www.example.com', self.state.addrmap.find('127.0.0.1').name) 505 self.assertEqual(IPv4Address(u'10.0.0.0'), self.state.addrmap.find('subdomain.example.com').ip) 506 self.assertEqual('subdomain.example.com', self.state.addrmap.find('10.0.0.0').name) 507 508 return d 509 510 def test_bootstrap_single_existing_circuit(self): 511 ''' 512 test with exactly one circuit. should probably test with 2 as 513 well, since there was a bug with the handling of just one. 514 ''' 515 516 d = self.state.post_bootstrap 517 518 clock = task.Clock() 519 self.state.addrmap.scheduler = clock 520 521 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 522 self.state._bootstrap() 523 524 self.send(b"250+ns/all=") 525 self.send(b".") 526 self.send(b"250 OK") 527 528 self.send(b"250-circuit-status=123 BUILT PURPOSE=GENERAL") 529 self.send(b"250 OK") 530 531 self.send(b"250-stream-status=") 532 self.send(b"250 OK") 533 534 self.send(b"250+address-mappings/all=") 535 self.send(b".") 536 self.send(b"250 OK") 537 538 for ignored in self.state.event_map.items(): 539 self.send(b"250 OK") 540 541 self.send(b"250-entry-guards=") 542 self.send(b"250 OK") 543 544 self.send(b"250 OK") 545 546 self.assertTrue(self.state.find_circuit(123)) 547 self.assertEqual(len(self.state.circuits), 1) 548 549 return d 550 551 def test_unset_attacher(self): 552 553 @implementer(IStreamAttacher) 554 class MyAttacher(object): 555 556 def attach_stream(self, stream, circuits): 557 return None 558 559 fr = FakeReactor(self) 560 attacher = MyAttacher() 561 self.state.set_attacher(attacher, fr) 562 self.send(b"250 OK") 563 self.state.set_attacher(None, fr) 564 self.send(b"250 OK") 565 self.assertEqual( 566 self.transport.value(), 567 b'SETCONF __LeaveStreamsUnattached=1\r\nSETCONF' 568 b' __LeaveStreamsUnattached=0\r\n' 569 ) 570 571 def test_attacher_twice(self): 572 """ 573 It should be an error to set an attacher twice 574 """ 575 @implementer(IStreamAttacher) 576 class MyAttacher(object): 577 pass 578 579 attacher = MyAttacher() 580 self.state.set_attacher(attacher, FakeReactor(self)) 581 # attach the *same* instance twice; not an error 582 self.state.set_attacher(attacher, FakeReactor(self)) 583 with self.assertRaises(RuntimeError) as ctx: 584 self.state.set_attacher(MyAttacher(), FakeReactor(self)) 585 self.assertTrue( 586 "already have an attacher" in str(ctx.exception) 587 ) 588 589 @defer.inlineCallbacks 590 def _test_attacher_both_apis(self): 591 """ 592 similar to above, but first set_attacher is implicit via 593 Circuit.stream_via 594 """ 595 reactor = Mock() 596 directlyProvides(reactor, IReactorCore) 597 598 @implementer(IStreamAttacher) 599 class MyAttacher(object): 600 pass 601 602 circ = Circuit(self.state) 603 circ.state = 'BUILT' 604 605 # use the "preferred" API, which will set an attacher 606 factory = Mock() 607 proto = Mock() 608 proto.when_done = Mock(return_value=defer.succeed(None)) 609 factory.connect = Mock(return_value=defer.succeed(proto)) 610 ep = circ.stream_via(reactor, 'meejah.ca', 443, factory) 611 addr = Mock() 612 addr.host = '10.0.0.1' 613 addr.port = 1011 614 ep._target_endpoint._get_address = Mock(return_value=defer.succeed(addr)) 615 attacher = yield _get_circuit_attacher(reactor, self.state) 616 d = ep.connect('foo') 617 stream = Mock() 618 import ipaddress 619 stream.source_addr = ipaddress.IPv4Address(u'10.0.0.1') 620 stream.source_port = 1011 621 attacher.attach_stream(stream, []) 622 yield d 623 624 # ...now use the low-level API (should be an error) 625 with self.assertRaises(RuntimeError) as ctx: 626 self.state.set_attacher(MyAttacher(), FakeReactor(self)) 627 self.assertTrue( 628 "already have an attacher" in str(ctx.exception) 629 ) 630 631 def test_attacher(self): 632 @implementer(IStreamAttacher) 633 class MyAttacher(object): 634 635 def __init__(self): 636 self.streams = [] 637 self.answer = None 638 639 def attach_stream(self, stream, circuits): 640 self.streams.append(stream) 641 return self.answer 642 643 attacher = MyAttacher() 644 self.state.set_attacher(attacher, FakeReactor(self)) 645 events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL' 646 self.protocol._set_valid_events(events) 647 self.state._add_events() 648 for ignored in self.state.event_map.items(): 649 self.send(b"250 OK") 650 651 self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER") 652 self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE") 653 self.assertEqual(len(attacher.streams), 1) 654 self.assertEqual(attacher.streams[0].id, 1) 655 self.assertEqual(len(self.protocol.commands), 1) 656 self.assertEqual(self.protocol.commands[0][1], b'ATTACHSTREAM 1 0') 657 658 # we should totally ignore .exit URIs 659 attacher.streams = [] 660 self.send(b"650 STREAM 2 NEW 0 10.0.0.0.$E11D2B2269CC25E67CA6C9FB5843497539A74FD0.exit:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME") 661 self.assertEqual(len(attacher.streams), 0) 662 self.assertEqual(len(self.protocol.commands), 1) 663 664 # we should NOT ignore .onion URIs 665 attacher.streams = [] 666 self.send(b"650 STREAM 3 NEW 0 xxxxxxxxxxxxxxxx.onion:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME") 667 self.assertEqual(len(attacher.streams), 1) 668 self.assertEqual(len(self.protocol.commands), 2) 669 self.assertEqual(self.protocol.commands[1][1], b'ATTACHSTREAM 3 0') 670 671 # normal attach 672 circ = FakeCircuit(1) 673 circ.state = 'BUILT' 674 self.state.circuits[1] = circ 675 attacher.answer = circ 676 self.send(b"650 STREAM 4 NEW 0 xxxxxxxxxxxxxxxx.onion:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME") 677 self.assertEqual(len(attacher.streams), 2) 678 self.assertEqual(len(self.protocol.commands), 3) 679 self.assertEqual(self.protocol.commands[2][1], b'ATTACHSTREAM 4 1') 680 681 def test_attacher_defer(self): 682 @implementer(IStreamAttacher) 683 class MyAttacher(object): 684 685 def __init__(self, answer): 686 self.streams = [] 687 self.answer = answer 688 689 def attach_stream(self, stream, circuits): 690 self.streams.append(stream) 691 return defer.succeed(self.answer) 692 693 self.state.circuits[1] = FakeCircuit(1) 694 self.state.circuits[1].state = 'BUILT' 695 attacher = MyAttacher(self.state.circuits[1]) 696 self.state.set_attacher(attacher, FakeReactor(self)) 697 698 # boilerplate to finish enough set-up in the protocol so it 699 # works 700 events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL' 701 self.protocol._set_valid_events(events) 702 self.state._add_events() 703 for ignored in self.state.event_map.items(): 704 self.send(b"250 OK") 705 706 self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER") 707 self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE") 708 self.assertEqual(len(attacher.streams), 1) 709 self.assertEqual(attacher.streams[0].id, 1) 710 self.assertEqual(len(self.protocol.commands), 1) 711 self.assertEqual(self.protocol.commands[0][1], b'ATTACHSTREAM 1 1') 712 713 @defer.inlineCallbacks 714 def test_attacher_errors(self): 715 @implementer(IStreamAttacher) 716 class MyAttacher(object): 717 718 def __init__(self, answer): 719 self.streams = [] 720 self.answer = answer 721 722 def attach_stream(self, stream, circuits): 723 return self.answer 724 725 self.state.circuits[1] = FakeCircuit(1) 726 attacher = MyAttacher(FakeCircuit(2)) 727 self.state.set_attacher(attacher, FakeReactor(self)) 728 729 stream = Stream(self.state) 730 stream.id = 3 731 msg = '' 732 try: 733 yield self.state._maybe_attach(stream) 734 except Exception as e: 735 msg = str(e) 736 self.assertTrue('circuit unknown' in msg) 737 738 attacher.answer = self.state.circuits[1] 739 msg = '' 740 try: 741 yield self.state._maybe_attach(stream) 742 except Exception as e: 743 msg = str(e) 744 self.assertTrue('only attach to BUILT' in msg) 745 746 attacher.answer = 'not a Circuit instance' 747 msg = '' 748 try: 749 yield self.state._maybe_attach(stream) 750 except Exception as e: 751 msg = str(e) 752 self.assertTrue('Circuit instance' in msg) 753 754 def test_attacher_no_attach(self): 755 @implementer(IStreamAttacher) 756 class MyAttacher(object): 757 758 def __init__(self): 759 self.streams = [] 760 761 def attach_stream(self, stream, circuits): 762 self.streams.append(stream) 763 return TorState.DO_NOT_ATTACH 764 765 attacher = MyAttacher() 766 self.state.set_attacher(attacher, FakeReactor(self)) 767 events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL' 768 self.protocol._set_valid_events(events) 769 self.state._add_events() 770 for ignored in self.state.event_map.items(): 771 self.send(b"250 OK") 772 773 self.transport.clear() 774 self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER") 775 self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE") 776 self.assertEqual(len(attacher.streams), 1) 777 self.assertEqual(attacher.streams[0].id, 1) 778 self.assertEqual(self.transport.value(), b'') 779 780 def test_close_stream_with_id(self): 781 stream = Stream(self.state) 782 stream.id = 1 783 784 self.state.streams[1] = stream 785 self.state.close_stream(stream) 786 self.assertEqual(self.transport.value(), b'CLOSESTREAM 1 1\r\n') 787 788 def test_close_stream_with_stream(self): 789 stream = Stream(self.state) 790 stream.id = 1 791 792 self.state.streams[1] = stream 793 self.state.close_stream(stream.id) 794 self.assertEqual(self.transport.value(), b'CLOSESTREAM 1 1\r\n') 795 796 def test_close_stream_invalid_reason(self): 797 stream = Stream(self.state) 798 stream.id = 1 799 self.state.streams[1] = stream 800 self.assertRaises( 801 ValueError, 802 self.state.close_stream, 803 stream, 804 'FOO_INVALID_REASON' 805 ) 806 807 def test_close_circuit_with_id(self): 808 circuit = Circuit(self.state) 809 circuit.id = 1 810 811 self.state.circuits[1] = circuit 812 self.state.close_circuit(circuit.id) 813 self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1\r\n') 814 815 def test_close_circuit_with_circuit(self): 816 circuit = Circuit(self.state) 817 circuit.id = 1 818 819 self.state.circuits[1] = circuit 820 self.state.close_circuit(circuit) 821 self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1\r\n') 822 823 def test_close_circuit_with_flags(self): 824 circuit = Circuit(self.state) 825 circuit.id = 1 826 # try: 827 # self.state.close_circuit(circuit.id, IfUnused=True) 828 # self.assertTrue(False) 829 # except KeyError: 830 # pass 831 832 self.state.circuits[1] = circuit 833 self.state.close_circuit(circuit.id, IfUnused=True) 834 self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1 IfUnused\r\n') 835 836 def test_circuit_destroy(self): 837 self.state._circuit_update('365 LAUNCHED PURPOSE=GENERAL') 838 self.assertTrue(365 in self.state.circuits) 839 self.state._circuit_update('365 FAILED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT') 840 self.assertTrue(365 not in self.state.circuits) 841 842 def test_circuit_destroy_already(self): 843 self.state._circuit_update('365 LAUNCHED PURPOSE=GENERAL') 844 self.assertTrue(365 in self.state.circuits) 845 self.state._circuit_update('365 CLOSED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT') 846 self.assertTrue(365 not in self.state.circuits) 847 self.state._circuit_update('365 CLOSED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT') 848 self.assertTrue(365 not in self.state.circuits) 849 850 def test_circuit_listener(self): 851 events = 'CIRC STREAM ORCONN BW DEBUG INFO NOTICE WARN ERR NEWDESC ADDRMAP AUTHDIR_NEWDESCS DESCCHANGED NS STATUS_GENERAL STATUS_CLIENT STATUS_SERVER GUARD STREAM_BW CLIENTS_SEEN NEWCONSENSUS BUILDTIMEOUT_SET' 852 self.protocol._set_valid_events(events) 853 self.state._add_events() 854 for ignored in self.state.event_map.items(): 855 self.send(b"250 OK") 856 857 # we use this router later on in an EXTEND 858 self.state._update_network_status("""ns/all= 859r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0 860s Fast Guard Running Stable Unnamed Valid 861w Bandwidth=51500 862p reject 1-65535""") 863 864 expected = [('new', {'id': 456}), 865 ('launched', {}), 866 ('extend', {'id': 123}) 867 ] 868 listen = CircuitListener(expected) 869 # first add a Circuit before we listen 870 self.protocol.dataReceived(b"650 CIRC 123 LAUNCHED PURPOSE=GENERAL\r\n") 871 self.assertEqual(len(self.state.circuits), 1) 872 873 # make sure we get added to existing circuits 874 self.state.add_circuit_listener(listen) 875 first_circuit = list(self.state.circuits.values())[0] 876 self.assertTrue(listen in first_circuit.listeners) 877 878 # now add a Circuit after we started listening 879 self.protocol.dataReceived(b"650 CIRC 456 LAUNCHED PURPOSE=GENERAL\r\n") 880 self.assertEqual(len(self.state.circuits), 2) 881 self.assertTrue(listen in list(self.state.circuits.values())[0].listeners) 882 self.assertTrue(listen in list(self.state.circuits.values())[1].listeners) 883 884 # now update the first Circuit to ensure we're really, really 885 # listening 886 self.protocol.dataReceived(b"650 CIRC 123 EXTENDED $D82183B1C09E1D7795FF2D7116BAB5106AA3E60E~PPrivCom012 PURPOSE=GENERAL\r\n") 887 self.assertEqual(len(listen.expected), 0) 888 889 def test_router_from_id_invalid_key(self): 890 self.failUnlessRaises(KeyError, self.state.router_from_id, 'somethingcompletelydifferent..thatis42long') 891 892 def test_router_from_named_router(self): 893 r = self.state.router_from_id('$AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=foo') 894 self.assertEqual(r.id_hex, '$AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA') 895 self.assertEqual(r.unique_name, 'foo') 896 897 def confirm_router_state(self, x): 898 self.assertTrue('$624926802351575FF7E4E3D60EFA3BFB56E67E8A' in self.state.routers) 899 router = self.state.routers['$624926802351575FF7E4E3D60EFA3BFB56E67E8A'] 900 self.assertTrue('exit' in router.flags) 901 self.assertTrue('fast' in router.flags) 902 self.assertTrue('guard' in router.flags) 903 self.assertTrue('hsdir' in router.flags) 904 self.assertTrue('named' in router.flags) 905 self.assertTrue('running' in router.flags) 906 self.assertTrue('stable' in router.flags) 907 self.assertTrue('v2dir' in router.flags) 908 self.assertTrue('valid' in router.flags) 909 self.assertTrue('futureproof' in router.flags) 910 self.assertEqual(router.bandwidth, 518000) 911 self.assertTrue(router.accepts_port(43)) 912 self.assertTrue(router.accepts_port(53)) 913 self.assertTrue(not router.accepts_port(44)) 914 self.assertTrue(router.accepts_port(989)) 915 self.assertTrue(router.accepts_port(990)) 916 self.assertTrue(not router.accepts_port(991)) 917 self.assertTrue(not router.accepts_port(988)) 918 919 def test_router_with_ipv6_address(self): 920 self.state._update_network_status("""ns/all= 921r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0 922a [2001:0:0:0::0]:4321 923s Fast Guard Running Stable Named Valid 924w Bandwidth=51500 925p reject 1-65535""") 926 self.assertEqual(len(self.state.routers_by_name['PPrivCom012'][0].ip_v6), 1) 927 self.assertEqual(self.state.routers_by_name['PPrivCom012'][0].ip_v6[0], '[2001:0:0:0::0]:4321') 928 929 def test_invalid_routers(self): 930 try: 931 self.state._update_network_status('''ns/all= 932r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 933r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 934s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 935w Bandwidth=518000 936p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888 937.''') 938 self.fail() 939 940 except RuntimeError as e: 941 # self.assertTrue('Illegal state' in str(e)) 942 # flip back when we go back to Automat 943 self.assertTrue('Expected "s "' in str(e)) 944 945 def test_routers_no_policy(self): 946 """ 947 ensure we can parse a router descriptor which has no p line 948 """ 949 950 self.state._update_network_status('''ns/all= 951r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 952s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 953w Bandwidth=518000 954r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0 955s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 956w Bandwidth=518000 957p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888 958.''') 959 self.assertTrue('fake' in self.state.routers.keys()) 960 self.assertTrue('PPrivCom012' in self.state.routers.keys()) 961 962 def test_routers_no_bandwidth(self): 963 """ 964 ensure we can parse a router descriptor which has no w line 965 """ 966 967 self.state._update_network_status('''ns/all= 968r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 969s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 970r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0 971s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 972w Bandwidth=518000 973p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888 974.''') 975 self.assertTrue('fake' in self.state.routers.keys()) 976 self.assertTrue('PPrivCom012' in self.state.routers.keys()) 977 978 def test_router_factory(self): 979 self.state._update_network_status('''ns/all= 980r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 981s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 982w Bandwidth=518000 983p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888 984r fake YxxmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 985s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof 986w Bandwidth=543000 987p accept 43,53 988.''') 989 self.assertTrue('$624926802351575FF7E4E3D60EFA3BFB56E67E8A' in self.state.routers) 990 r = self.state.routers['$624926802351575FF7E4E3D60EFA3BFB56E67E8A'] 991 self.assertEqual(r.controller, self.state.protocol) 992 self.assertEqual(r.bandwidth, 518000) 993 self.assertEqual(len(self.state.routers_by_name['fake']), 2) 994 995 # now we do an update 996 self.state._update_network_status('''ns/all= 997r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80 998s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof Authority 999w Bandwidth=543000 1000p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888 1001.''') 1002 self.assertEqual(r.bandwidth, 543000) 1003 1004 def test_empty_stream_update(self): 1005 self.state._stream_update('''stream-status=''') 1006 1007 def test_addrmap(self): 1008 self.state._addr_map('example.com 127.0.0.1 "2012-01-01 00:00:00" EXPIRES=NEVER') 1009 1010 def test_double_newconsensus(self): 1011 """ 1012 The arrival of a second NEWCONSENSUS event causes parsing 1013 errors. 1014 """ 1015 1016 # bootstrap the TorState so we can send it a "real" 650 1017 # update 1018 1019 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 1020 self.state._bootstrap() 1021 1022 self.send(b"250+ns/all=") 1023 self.send(b".") 1024 self.send(b"250 OK") 1025 1026 self.send(b"250+circuit-status=") 1027 self.send(b".") 1028 self.send(b"250 OK") 1029 1030 self.send(b"250-stream-status=") 1031 self.send(b"250 OK") 1032 1033 self.send(b"250-address-mappings/all=") 1034 self.send(b'250 OK') 1035 1036 for ignored in self.state.event_map.items(): 1037 self.send(b"250 OK") 1038 1039 self.send(b"250-entry-guards=") 1040 self.send(b"250 OK") 1041 1042 self.send(b"250 OK") 1043 1044 # state is now bootstrapped, we can send our NEWCONSENSUS update 1045 1046 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1047r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1048s Fast Guard Running Stable Valid 1049w Bandwidth=166 1050p reject 1-65535 1051. 1052650 OK 1053'''.split(b'\n'))) 1054 1055 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1056r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1057s Fast Guard Running Stable Valid 1058w Bandwidth=166 1059p reject 1-65535 1060. 1061650 OK 1062'''.split(b'\n'))) 1063 1064 self.assertEqual(1, len(self.state.all_routers)) 1065 self.assertTrue('Unnamed' in self.state.routers) 1066 self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers) 1067 1068 def test_newconsensus_remove_routers(self): 1069 """ 1070 router removed from consensus is removed 1071 """ 1072 1073 # bootstrap the TorState so we can send it a "real" 650 1074 # update 1075 1076 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 1077 self.state._bootstrap() 1078 1079 self.send(b"250+ns/all=") 1080 self.send(b".") 1081 self.send(b"250 OK") 1082 1083 self.send(b"250+circuit-status=") 1084 self.send(b".") 1085 self.send(b"250 OK") 1086 1087 self.send(b"250-stream-status=") 1088 self.send(b"250 OK") 1089 1090 self.send(b"250-address-mappings/all=") 1091 self.send(b'250 OK') 1092 1093 for ignored in self.state.event_map.items(): 1094 self.send(b"250 OK") 1095 1096 self.send(b"250-entry-guards=") 1097 self.send(b"250 OK") 1098 1099 self.send(b"250 OK") 1100 1101 # state is now bootstrapped, we can send our NEWCONSENSUS update 1102 1103 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1104r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1105s Fast Guard Running Stable Valid 1106w Bandwidth=166 1107p reject 1-65535 1108r Foo ABJJJJUFz1lvQS0jq8nhTdRiXEk /zzzUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1109s Fast Guard Running Stable Valid 1110w Bandwidth=166 1111p reject 1-65535 1112. 1113650 OK 1114'''.split(b'\n'))) 1115 1116 self.assertEqual(2, len(self.state.all_routers)) 1117 self.assertTrue('Unnamed' in self.state.routers) 1118 self.assertTrue('Foo' in self.state.routers_by_name) 1119 self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers) 1120 self.assertTrue('$001249249505CF596F412D23ABC9E14DD4625C49' in self.state.routers) 1121 1122 # this is a different fingerprint, but same name 1123 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1124r Unnamed ABBBguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1125s Fast Guard Running Stable Valid 1126w Bandwidth=166 1127p reject 1-65535 1128. 1129650 OK 1130'''.split(b'\n'))) 1131 1132 self.assertEqual(1, len(self.state.all_routers)) 1133 self.assertIn('Unnamed', self.state.routers) 1134 self.assertIn('$00104182E505CF596F412D23ABC9E14DD4625C49', self.state.routers) 1135 self.assertNotIn('$00126582E505CF596F412D23ABC9E14DD4625C49', self.state.routers) 1136 self.assertNotIn('$00126582E505CF596F412D23ABC9E14DD4625C49', self.state.routers_by_hash) 1137 self.assertNotIn('$001249249505CF596F412D23ABC9E14DD4625C49', self.state.routers) 1138 self.assertNotIn('$001249249505CF596F412D23ABC9E14DD4625C49', self.state.routers_by_hash) 1139 self.assertNotIn('Foo', self.state.routers_by_name) 1140 1141 def test_NEWCONSENSUS_ends_with_OK_on_w(self): 1142 """ 1143 The arrival of a second NEWCONSENSUS event causes parsing 1144 errors. 1145 """ 1146 1147 # bootstrap the TorState so we can send it a "real" 650 1148 # update 1149 1150 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 1151 self.state._bootstrap() 1152 1153 self.send(b"250+ns/all=") 1154 self.send(b".") 1155 self.send(b"250 OK") 1156 1157 self.send(b"250+circuit-status=") 1158 self.send(b".") 1159 self.send(b"250 OK") 1160 1161 self.send(b"250-stream-status=") 1162 self.send(b"250 OK") 1163 1164 self.send(b"250-address-mappings/all=") 1165 self.send(b"250 OK") 1166 1167 for ignored in self.state.event_map.items(): 1168 self.send(b"250 OK") 1169 1170 self.send(b"250-entry-guards=") 1171 self.send(b"250 OK") 1172 1173 self.send(b"250 OK") 1174 1175 # state is now bootstrapped, we can send our NEWCONSENSUS update 1176 1177 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1178r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1179s Fast Guard Running Stable Valid 1180w Bandwidth=166 1181. 1182650 OK 1183'''.split(b'\n'))) 1184 1185 self.assertTrue('Unnamed' in self.state.routers) 1186 self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers) 1187 1188 def test_NEWCONSENSUS_ends_with_OK_on_s(self): 1189 """ 1190 The arrival of a second NEWCONSENSUS event causes parsing 1191 errors. 1192 """ 1193 1194 # bootstrap the TorState so we can send it a "real" 650 1195 # update 1196 1197 self.protocol._set_valid_events(' '.join(self.state.event_map.keys())) 1198 self.state._bootstrap() 1199 1200 self.send(b"250+ns/all=") 1201 self.send(b".") 1202 self.send(b"250 OK") 1203 1204 self.send(b"250+circuit-status=") 1205 self.send(b".") 1206 self.send(b"250 OK") 1207 1208 self.send(b"250-stream-status=") 1209 self.send(b"250 OK") 1210 1211 self.send(b"250-address-mappings/all=") 1212 self.send(b"250 OK") 1213 1214 for ignored in self.state.event_map.items(): 1215 self.send(b"250 OK") 1216 1217 self.send(b"250-entry-guards=") 1218 self.send(b"250 OK") 1219 1220 self.send(b"250 OK") 1221 1222 # state is now bootstrapped, we can send our NEWCONSENSUS update 1223 1224 self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS 1225r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0 1226s Fast Guard Running Stable Valid 1227. 1228650 OK 1229'''.split(b'\n'))) 1230 1231 self.assertTrue('Unnamed' in self.state.routers) 1232 self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers) 1233 1234 def test_stream_create(self): 1235 self.state._stream_update('1610 NEW 0 1.2.3.4:56') 1236 self.assertTrue(1610 in self.state.streams) 1237 1238 def test_stream_destroy(self): 1239 self.state._stream_update('1610 NEW 0 1.2.3.4:56') 1240 self.assertTrue(1610 in self.state.streams) 1241 self.state._stream_update("1610 FAILED 0 www.example.com:0 REASON=DONE REMOTE_REASON=FAILED") 1242 self.assertTrue(1610 not in self.state.streams) 1243 1244 def test_stream_detach(self): 1245 circ = FakeCircuit(1) 1246 circ.state = 'BUILT' 1247 self.state.circuits[1] = circ 1248 1249 self.state._stream_update('1610 NEW 0 1.2.3.4:56') 1250 self.assertTrue(1610 in self.state.streams) 1251 self.state._stream_update("1610 SUCCEEDED 1 4.3.2.1:80") 1252 self.assertEqual(self.state.streams[1610].circuit, circ) 1253 1254 self.state._stream_update("1610 DETACHED 0 www.example.com:0 REASON=DONE REMOTE_REASON=FAILED") 1255 self.assertEqual(self.state.streams[1610].circuit, None) 1256 1257 def test_stream_listener(self): 1258 self.protocol._set_valid_events('CIRC STREAM ORCONN BW DEBUG INFO NOTICE WARN ERR NEWDESC ADDRMAP AUTHDIR_NEWDESCS DESCCHANGED NS STATUS_GENERAL STATUS_CLIENT STATUS_SERVER GUARD STREAM_BW CLIENTS_SEEN NEWCONSENSUS BUILDTIMEOUT_SET') 1259 self.state._add_events() 1260 for ignored in self.state.event_map.items(): 1261 self.send(b"250 OK") 1262 1263 expected = [('new', {}), 1264 ] 1265 listen = StreamListener(expected) 1266 self.send(b"650 STREAM 77 NEW 0 www.yahoo.cn:80 SOURCE_ADDR=127.0.0.1:54315 PURPOSE=USER") 1267 self.state.add_stream_listener(listen) 1268 1269 self.assertEqual(1, len(self.state.streams.values())) 1270 self.assertTrue(listen in list(self.state.streams.values())[0].listeners) 1271 self.assertEqual(len(self.state.streams), 1) 1272 self.assertEqual(len(listen.expected), 1) 1273 1274 self.send(b"650 STREAM 78 NEW 0 www.yahoo.cn:80 SOURCE_ADDR=127.0.0.1:54315 PURPOSE=USER") 1275 self.assertEqual(len(self.state.streams), 2) 1276 self.assertEqual(len(listen.expected), 0) 1277 1278 def test_build_circuit(self): 1279 class FakeRouter: 1280 def __init__(self, i): 1281 self.id_hex = i 1282 self.flags = [] 1283 1284 path = [] 1285 for x in range(3): 1286 path.append(FakeRouter("$%040d" % x)) 1287 # can't just check flags for guard status, need to know if 1288 # it's in the running Tor's notion of Entry Guards 1289 path[0].flags = ['guard'] 1290 1291 self.state.build_circuit(path, using_guards=True) 1292 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n') 1293 # should have gotten a warning about this not being an entry 1294 # guard 1295 self.assertEqual(len(self.flushWarnings()), 1) 1296 1297 def test_build_circuit_no_routers(self): 1298 self.state.build_circuit() 1299 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0\r\n') 1300 1301 def test_build_circuit_unfound_router(self): 1302 self.state.build_circuit(routers=[b'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'], using_guards=False) 1303 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\r\n') 1304 1305 def circuit_callback(self, circ): 1306 self.assertTrue(isinstance(circ, Circuit)) 1307 self.assertEqual(circ.id, 1234) 1308 1309 def test_build_circuit_final_callback(self): 1310 class FakeRouter: 1311 def __init__(self, i): 1312 self.id_hex = i 1313 self.flags = [] 1314 1315 path = [] 1316 for x in range(3): 1317 path.append(FakeRouter("$%040d" % x)) 1318 # can't just check flags for guard status, need to know if 1319 # it's in the running Tor's notion of Entry Guards 1320 path[0].flags = ['guard'] 1321 1322 # FIXME TODO we should verify we get a circuit_new event for 1323 # this circuit 1324 1325 d = self.state.build_circuit(path, using_guards=True) 1326 d.addCallback(self.circuit_callback) 1327 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n') 1328 self.send(b"250 EXTENDED 1234") 1329 # should have gotten a warning about this not being an entry 1330 # guard 1331 self.assertEqual(len(self.flushWarnings()), 1) 1332 return d 1333 1334 def test_build_circuit_error(self): 1335 """ 1336 tests that we check the callback properly 1337 """ 1338 1339 try: 1340 self.state._find_circuit_after_extend("FOO 1234") 1341 self.assertTrue(False) 1342 except RuntimeError as e: 1343 self.assertTrue('Expected EXTENDED' in str(e)) 1344 1345 def test_listener_mixins(self): 1346 self.assertTrue(verifyClass(IStreamListener, StreamListenerMixin)) 1347 self.assertTrue(verifyClass(ICircuitListener, CircuitListenerMixin)) 1348 1349 def test_build_circuit_timedout(self): 1350 class FakeRouter: 1351 def __init__(self, i): 1352 self.id_hex = i 1353 self.flags = [] 1354 1355 path = [] 1356 for x in range(3): 1357 path.append(FakeRouter("$%040d" % x)) 1358 # can't just check flags for guard status, need to know if 1359 # it's in the running Tor's notion of Entry Guards 1360 path[0].flags = ['guard'] 1361 1362 # FIXME TODO we should verify we get a circuit_new event for 1363 # this circuit 1364 timeout = 10 1365 clock = task.Clock() 1366 1367 d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False) 1368 clock.advance(10) 1369 1370 def check_for_timeout_error(f): 1371 self.assertTrue(isinstance(f.type(), CircuitBuildTimedOutError)) 1372 d.addErrback(check_for_timeout_error) 1373 return d 1374 1375 @defer.inlineCallbacks 1376 def test_build_circuit_cancelled(self): 1377 class FakeRouter: 1378 def __init__(self, i): 1379 self.id_hex = i 1380 self.flags = [] 1381 1382 path = [] 1383 for x in range(3): 1384 path.append(FakeRouter("$%040d" % x)) 1385 # can't just check flags for guard status, need to know if 1386 # it's in the running Tor's notion of Entry Guards 1387 path[0].flags = ['guard'] 1388 1389 class FakeCircuit: 1390 close_called = False 1391 1392 def when_built(self): 1393 return defer.Deferred() 1394 1395 def close(self): 1396 self.close_called = True 1397 return defer.succeed(None) 1398 1399 circ = FakeCircuit() 1400 1401 def _build(*args, **kw): 1402 return defer.succeed(circ) 1403 self.state.build_circuit = _build 1404 1405 timeout = 10 1406 clock = task.Clock() 1407 1408 # we want this circuit to get to BUILT, but *then* we call 1409 # .cancel() on the deferred -- in which case, the circuit must 1410 # be closed 1411 d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False) 1412 clock.advance(1) 1413 d.cancel() 1414 1415 with self.assertRaises(CircuitBuildTimedOutError): 1416 yield d 1417 self.assertTrue(circ.close_called) 1418 1419 def test_build_circuit_timeout_after_progress(self): 1420 """ 1421 Similar to above but we timeout after Tor has ack'd our 1422 circuit-creation attempt, but before reaching BUILT. 1423 """ 1424 class FakeRouter: 1425 def __init__(self, i): 1426 self.id_hex = i 1427 self.flags = [] 1428 1429 class FakeCircuit(Circuit): 1430 def close(self): 1431 return defer.succeed(None) 1432 1433 path = [] 1434 for x in range(3): 1435 path.append(FakeRouter("$%040d" % x)) 1436 1437 def fake_queue(cmd): 1438 self.assertTrue(cmd.startswith('EXTENDCIRCUIT 0')) 1439 return defer.succeed("EXTENDED 1234") 1440 1441 queue_command = patch.object(self.protocol, 'queue_command', fake_queue) 1442 circuit_factory = patch.object(self.state, 'circuit_factory', FakeCircuit) 1443 with queue_command, circuit_factory: 1444 timeout = 10 1445 clock = task.Clock() 1446 1447 d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False) 1448 clock.advance(timeout + 1) 1449 1450 def check_for_timeout_error(f): 1451 self.assertTrue(isinstance(f.type(), CircuitBuildTimedOutError)) 1452 d.addErrback(check_for_timeout_error) 1453 return d 1454 1455 def test_build_circuit_not_timedout(self): 1456 class FakeRouter: 1457 def __init__(self, i): 1458 self.id_hex = i 1459 self.flags = [] 1460 1461 path = [] 1462 for x in range(3): 1463 path.append(FakeRouter("$%040d" % x)) 1464 path[0].flags = ['guard'] 1465 1466 timeout = 10 1467 clock = task.Clock() 1468 d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=True) 1469 d.addCallback(self.circuit_callback) 1470 1471 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n') 1472 self.send(b"250 EXTENDED 1234") 1473 # we can't just .send(b'650 CIRC 1234 BUILT') this because we 1474 # didn't fully hook up the protocol to the state, e.g. via 1475 # post_bootstrap etc. 1476 self.state.circuits[1234].update(['1234', 'BUILT']) 1477 # should have gotten a warning about this not being an entry 1478 # guard 1479 self.assertEqual(len(self.flushWarnings()), 1) 1480 return d 1481 1482 def test_build_circuit_failure(self): 1483 class FakeRouter: 1484 def __init__(self, i): 1485 self.id_hex = i 1486 self.flags = [] 1487 1488 path = [] 1489 for x in range(3): 1490 path.append(FakeRouter("$%040d" % x)) 1491 path[0].flags = ['guard'] 1492 1493 timeout = 10 1494 clock = task.Clock() 1495 d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=True) 1496 d.addCallback(self.circuit_callback) 1497 1498 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n') 1499 self.send(b"250 EXTENDED 1234") 1500 # we can't just .send(b'650 CIRC 1234 BUILT') this because we 1501 # didn't fully hook up the protocol to the state, e.g. via 1502 # post_bootstrap etc. 1503 self.state.circuits[1234].update(['1234', 'FAILED', 'REASON=TIMEOUT']) 1504 1505 def check_reason(fail): 1506 self.assertEqual(fail.value.reason, 'TIMEOUT') 1507 d.addErrback(check_reason) 1508 1509 return d 1510 1511 def test_build_circuit_with_purpose(self): 1512 class FakeRouter: 1513 def __init__(self, i): 1514 self.id_hex = i 1515 self.flags = [] 1516 1517 path = [] 1518 for x in range(3): 1519 path.append(FakeRouter("$%040d" % x)) 1520 path[0].flags = ['guard'] 1521 1522 d = self.state.build_circuit(path, using_guards=True, purpose="general") 1523 d.addCallback(self.circuit_callback) 1524 1525 self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002 purpose=general\r\n') 1526 self.send(b"250 EXTENDED 1234") 1527 # we can't just .send(b'650 CIRC 1234 BUILT') this because we 1528 # didn't fully hook up the protocol to the state, e.g. via 1529 # post_bootstrap etc. 1530 self.state.circuits[1234].update(['1234', 'FAILED', 'REASON=TIMEOUT']) 1531 1532 def check_reason(fail): 1533 self.assertEqual(fail.value.reason, 'TIMEOUT') 1534 d.addErrback(check_reason) 1535 1536 return d 1537 1538 1539class ComposibleListenerTests(unittest.TestCase): 1540 1541 def setUp(self): 1542 self.protocol = TorControlProtocol() 1543 self.state = TorState(self.protocol) 1544 1545 def test_circuit_new(self): 1546 listener_calls = [] 1547 1548 @self.state.on_circuit_new 1549 def _(circ): 1550 listener_calls.append(circ) 1551 1552 c = self.state._maybe_create_circuit("42") 1553 c.update(["42", "LAUNCHED"]) 1554 1555 self.assertEqual(len(listener_calls), 1) 1556 self.assertEqual(listener_calls[0].id, 42) 1557 1558 def test_circuit_launched(self): 1559 listener_calls = [] 1560 1561 @self.state.on_circuit_launched 1562 def _(circ): 1563 listener_calls.append(circ) 1564 1565 c = self.state._maybe_create_circuit("42") 1566 c.update(["42", "LAUNCHED"]) 1567 1568 self.assertEqual(len(listener_calls), 1) 1569 self.assertEqual(listener_calls[0].id, 42) 1570 1571 def test_circuit_extend(self): 1572 listener_calls = [] 1573 1574 @self.state.on_circuit_extend 1575 def _(circ, router): 1576 listener_calls.append(circ) 1577 1578 c = self.state._maybe_create_circuit("42") 1579 c.update(["42", "LAUNCHED"]) 1580 c.update(["42", "BUILDING", "$deadbeef,$1ee7"]) 1581 1582 # we get two calls because we've now extended to 2 relays 1583 self.assertEqual(len(listener_calls), 2) 1584 self.assertEqual(listener_calls[0].id, 42) 1585 self.assertEqual(listener_calls[1].id, 42) 1586 1587 def test_circuit_built(self): 1588 listener_calls = [] 1589 1590 @self.state.on_circuit_built 1591 def _(circ): 1592 listener_calls.append(circ) 1593 1594 c = self.state._maybe_create_circuit("42") 1595 c.update(["42", "LAUNCHED"]) 1596 c.update(["42", "BUILT"]) 1597 1598 self.assertEqual(len(listener_calls), 1) 1599 self.assertEqual(listener_calls[0].id, 42) 1600 1601 def test_circuit_closed(self): 1602 listener_calls = [] 1603 1604 @self.state.on_circuit_closed 1605 def _(circ): 1606 listener_calls.append(circ) 1607 1608 c = self.state._maybe_create_circuit("42") 1609 c.update(["42", "LAUNCHED"]) 1610 c.update(["42", "CLOSED"]) 1611 1612 self.assertEqual(len(listener_calls), 1) 1613 self.assertEqual(listener_calls[0].id, 42) 1614 1615 def test_circuit_failed(self): 1616 listener_calls = [] 1617 1618 @self.state.on_circuit_failed 1619 def _(circ): 1620 listener_calls.append(circ) 1621 1622 c = self.state._maybe_create_circuit("42") 1623 c.update(["42", "LAUNCHED"]) 1624 c.update(["42", "FAILED"]) 1625 1626 self.assertEqual(len(listener_calls), 1) 1627 self.assertEqual(listener_calls[0].id, 42) 1628 1629 def test_stream_events(self): 1630 """ 1631 one for the philosophers: is this 'one, but big' test better / 1632 easier to read than the N circuit tests above? 1633 """ 1634 listener_calls = [] # list of 2-tuples 1635 1636 @self.state.on_stream_new 1637 def _(stream): 1638 listener_calls.append(("new", stream.id)) 1639 1640 @self.state.on_stream_succeeded 1641 def _(stream): 1642 listener_calls.append(("succeeded", stream.id)) 1643 1644 @self.state.on_stream_attach 1645 def _(stream, circ): 1646 listener_calls.append(("attach", stream.id)) 1647 1648 @self.state.on_stream_detach 1649 def _(stream): 1650 listener_calls.append(("detach", stream.id)) 1651 1652 @self.state.on_stream_closed 1653 def _(stream): 1654 listener_calls.append(("closed", stream.id)) 1655 1656 @self.state.on_stream_failed 1657 def _(stream): 1658 listener_calls.append(("failed", stream.id)) 1659 1660 circ = self.state._maybe_create_circuit("42") 1661 circ.update(["42", "LAUNCHED"]) 1662 1663 self.state._stream_update("1234 NEW 0 meejah.ca:80") 1664 self.state._stream_update("1234 SUCCEEDED 42") 1665 self.state._stream_update("1234 DETACHED 0") 1666 self.state._stream_update("1234 CLOSED 0") 1667 self.state._stream_update("1234 FAILED 0") 1668 1669 self.assertEqual( 1670 listener_calls, 1671 [ 1672 ("new", 1234), 1673 ("succeeded", 1234), 1674 ("attach", 1234), 1675 ("detach", 1234), 1676 ("closed", 1234), 1677 ("failed", 1234), 1678 ] 1679 ) 1680