1from __future__ import print_function
2
3from zope.interface import implementer, directlyProvides
4from zope.interface.verify import verifyClass
5from twisted.trial import unittest
6from twisted.test import proto_helpers
7from twisted.python.failure import Failure
8from twisted.internet import task, defer
9from twisted.internet.interfaces import IStreamClientEndpoint, IReactorCore
10
11import tempfile
12
13from ipaddress import IPv4Address
14
15from mock import patch, Mock
16
17from txtorcon import TorControlProtocol
18from txtorcon import TorProtocolError
19from txtorcon import TorState
20from txtorcon import Stream
21from txtorcon import Circuit
22from txtorcon import build_tor_connection
23from txtorcon import build_local_tor_connection
24from txtorcon import build_timeout_circuit
25from txtorcon import CircuitBuildTimedOutError
26from txtorcon.interface import IStreamAttacher
27from txtorcon.interface import ICircuitListener
28from txtorcon.interface import IStreamListener
29from txtorcon.interface import StreamListenerMixin
30from txtorcon.interface import CircuitListenerMixin
31from txtorcon.circuit import _get_circuit_attacher
32from txtorcon.circuit import _extract_reason
33
34try:
35    from .py3_torstate import TorStatePy3Tests  # noqa
36except SyntaxError:
37    pass
38
39
40@implementer(ICircuitListener)
41class CircuitListener(object):
42
43    def __init__(self, expected):
44        "expect is a list of tuples: (event, {key:value, key1:value1, ..})"
45        self.expected = expected
46
47    def checker(self, state, circuit, arg=None):
48        if self.expected[0][0] != state:
49            raise RuntimeError(
50                'Expected event "%s" not "%s".' %
51                (self.expected[0][0], state)
52            )
53        for (k, v) in self.expected[0][1].items():
54            if k == 'arg':
55                if v != arg:
56                    raise RuntimeError(
57                        'Expected argument to have value "%s", not "%s"' % (arg, v)
58                    )
59            elif getattr(circuit, k) != v:
60                raise RuntimeError(
61                    'Expected attribute "%s" to have value "%s", not "%s"' %
62                    (k, v, getattr(circuit, k))
63                )
64        self.expected = self.expected[1:]
65
66    def circuit_new(self, circuit):
67        self.checker('new', circuit)
68
69    def circuit_launched(self, circuit):
70        self.checker('launched', circuit)
71
72    def circuit_extend(self, circuit, router):
73        self.checker('extend', circuit, router)
74
75    def circuit_built(self, circuit):
76        self.checker('built', circuit)
77
78    def circuit_closed(self, circuit, **kw):
79        self.checker('closed', circuit, **kw)
80
81    def circuit_failed(self, circuit, **kw):
82        self.checker('failed', circuit, **kw)
83
84
85@implementer(IStreamListener)
86class StreamListener(object):
87
88    def __init__(self, expected):
89        "expect is a list of tuples: (event, {key:value, key1:value1, ..})"
90        self.expected = expected
91
92    def checker(self, state, stream, arg=None):
93        if self.expected[0][0] != state:
94            raise RuntimeError(
95                'Expected event "%s" not "%s".' % (self.expected[0][0], state)
96            )
97        for (k, v) in self.expected[0][1].items():
98            if k == 'arg':
99                if v != arg:
100                    raise RuntimeError(
101                        'Expected argument to have value "%s", not "%s"' %
102                        (arg, v)
103                    )
104            elif getattr(stream, k) != v:
105                raise RuntimeError(
106                    'Expected attribute "%s" to have value "%s", not "%s"' %
107                    (k, v, getattr(stream, k))
108                )
109        self.expected = self.expected[1:]
110
111    def stream_new(self, stream):
112        self.checker('new', stream)
113
114    def stream_succeeded(self, stream):
115        self.checker('succeeded', stream)
116
117    def stream_attach(self, stream, circuit):
118        self.checker('attach', stream, circuit)
119
120    def stream_closed(self, stream):
121        self.checker('closed', stream)
122
123    def stream_failed(self, stream, reason, remote_reason):
124        self.checker('failed', stream, reason)
125
126
127@implementer(IReactorCore)
128class FakeReactor:
129
130    def __init__(self, test):
131        self.test = test
132
133    def addSystemEventTrigger(self, *args):
134        self.test.assertEqual(args[0], 'before')
135        self.test.assertEqual(args[1], 'shutdown')
136        self.test.assertEqual(args[2], self.test.state.undo_attacher)
137        return 1
138
139    def removeSystemEventTrigger(self, id):
140        self.test.assertEqual(id, 1)
141
142    def connectTCP(self, *args, **kw):
143        """for testing build_tor_connection"""
144        raise RuntimeError('connectTCP: ' + str(args))
145
146    def connectUNIX(self, *args, **kw):
147        """for testing build_tor_connection"""
148        raise RuntimeError('connectUNIX: ' + str(args))
149
150
151class FakeCircuit(Circuit):
152
153    def __init__(self, id=-999):
154        self.streams = []
155        self.id = id
156        self.state = 'BOGUS'
157
158
159@implementer(IStreamClientEndpoint)
160class FakeEndpoint:
161
162    def get_info_raw(self, keys):
163        ans = '\r\n'.join(map(lambda k: '%s=' % k, keys.split()))
164        return defer.succeed(ans)
165
166    def get_info_incremental(self, key, linecb):
167        linecb('%s=' % key)
168        return defer.succeed('')
169
170    def connect(self, protocol_factory):
171        self.proto = TorControlProtocol()
172        self.proto.transport = proto_helpers.StringTransport()
173        self.proto.get_info_raw = self.get_info_raw
174        self.proto.get_info_incremental = self.get_info_incremental
175        self.proto._set_valid_events(
176            'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL'
177        )
178
179        return defer.succeed(self.proto)
180
181
182@implementer(IStreamClientEndpoint)
183class FakeEndpointAnswers:
184
185    def __init__(self, answers):
186        self.answers = answers
187        # since we use pop() we need these to be "backwards"
188        self.answers.reverse()
189
190    def add_event_listener(self, *args):
191        return defer.succeed(None)
192
193    def get_info_raw(self, keys):
194        ans = ''
195        for k in keys.split():
196            if len(self.answers) == 0:
197                raise TorProtocolError(551, "ran out of answers")
198            ans += '%s=%s\r\n' % (k, self.answers.pop())
199        return ans[:-2]                 # don't want trailing \r\n
200
201    def get_info_incremental(self, key, linecb):
202        data = self.answers.pop().split('\n')
203        if len(data) == 1:
204            linecb('{}={}'.format(key, data[0]))
205        else:
206            linecb('{}='.format(key))
207            for line in data:
208                linecb(line)
209        return defer.succeed('')
210
211    def connect(self, protocol_factory):
212        self.proto = TorControlProtocol()
213        self.proto.transport = proto_helpers.StringTransport()
214        self.proto.get_info_raw = self.get_info_raw
215        self.proto.get_info_incremental = self.get_info_incremental
216        self.proto.add_event_listener = self.add_event_listener
217        self.proto._set_valid_events(
218            'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL'
219        )
220
221        return defer.succeed(self.proto)
222
223
224class BootstrapTests(unittest.TestCase):
225
226    def confirm_proto(self, x):
227        self.assertTrue(isinstance(x, TorControlProtocol))
228        self.assertTrue(x.post_bootstrap.called)
229        return x
230
231    def confirm_state(self, x):
232        self.assertIsInstance(x, TorState)
233        self.assertTrue(x.post_bootstrap.called)
234        return x
235
236    def confirm_consensus(self, x):
237        self.assertEqual(2, len(x.all_routers))
238        self.assertIn('fake', x.routers)
239        self.assertIn('ekaf', x.routers)
240        return x
241
242    def test_build(self):
243        p = FakeEndpoint()
244        d = build_tor_connection(p, build_state=False)
245        d.addCallback(self.confirm_proto)
246        p.proto.post_bootstrap.callback(p.proto)
247        return d
248
249    def test_build_tcp(self):
250        d = build_tor_connection((FakeReactor(self), '127.0.0.1', 1234))
251        d.addCallback(self.fail)
252        d.addErrback(lambda x: None)
253        return d
254
255    def test_build_unix(self):
256        tf = tempfile.NamedTemporaryFile()
257        d = build_tor_connection((FakeReactor(self), tf.name))
258        d.addCallback(self.fail)
259        d.addErrback(lambda x: None)
260        return d
261
262    def test_build_unix_wrong_permissions(self):
263        self.assertRaises(
264            ValueError,
265            build_tor_connection,
266            (FakeReactor(self), 'a non-existant filename')
267        )
268
269    def test_build_wrong_size_tuple(self):
270        self.assertRaises(TypeError, build_tor_connection, (1, 2, 3, 4))
271
272    def test_build_wrong_args_entirely(self):
273        self.assertRaises(
274            TypeError,
275            build_tor_connection,
276            'incorrect argument'
277        )
278
279    def confirm_pid(self, state):
280        self.assertEqual(state.tor_pid, 1234)
281        return state
282
283    def confirm_no_pid(self, state):
284        self.assertEqual(state.tor_pid, 0)
285
286    def test_build_with_answers(self):
287        p = FakeEndpointAnswers(['',     # ns/all
288                                 '',     # circuit-status
289                                 '',     # stream-status
290                                 '',     # address-mappings/all
291                                 '',     # entry-guards
292                                 '1234'  # PID
293                                 ])
294
295        d = build_tor_connection(p, build_state=True)
296        d.addCallback(self.confirm_state).addErrback(self.fail)
297        d.addCallback(self.confirm_pid).addErrback(self.fail)
298        p.proto.post_bootstrap.callback(p.proto)
299        return d
300
301    def test_build_with_answers_ns(self):
302        fake_consensus = '\n'.join([
303            'r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 11.11.11.11 443 80',
304            's Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof',
305            'r ekaf foooooooooooooooooooooooooo barbarbarbarbarbarbarbarbar 2011-11-11 16:30:00 22.22.22.22 443 80',
306            's Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof',
307            '',
308        ])
309        p = FakeEndpointAnswers([fake_consensus,     # ns/all
310                                 '',     # circuit-status
311                                 '',     # stream-status
312                                 '',     # address-mappings/all
313                                 '',     # entry-guards
314                                 '1234'  # PID
315                                 ])
316
317        d = build_tor_connection(p, build_state=True)
318        d.addCallback(self.confirm_state).addErrback(self.fail)
319        d.addCallback(self.confirm_pid).addErrback(self.fail)
320        d.addCallback(self.confirm_consensus).addErrback(self.fail)
321        p.proto.post_bootstrap.callback(p.proto)
322        return d
323
324    def test_build_with_answers_no_pid(self):
325        p = FakeEndpointAnswers(['',    # ns/all
326                                 '',    # circuit-status
327                                 '',    # stream-status
328                                 '',    # address-mappings/all
329                                 ''     # entry-guards
330                                 ])
331
332        d = build_tor_connection(p, build_state=True)
333        d.addCallback(self.confirm_state)
334        d.addCallback(self.confirm_no_pid)
335        p.proto.post_bootstrap.callback(p.proto)
336        return d
337
338    def test_build_with_answers_guards_unfound_entry(self):
339        p = FakeEndpointAnswers(['',    # ns/all
340                                 '',    # circuit-status
341                                 '',    # stream-status
342                                 '',    # address-mappings/all
343                                 '\n\nkerblam up\nOK\n'     # entry-guards
344                                 ])
345
346        d = build_tor_connection(p, build_state=True)
347        d.addCallback(self.confirm_state)
348        d.addCallback(self.confirm_no_pid)
349        p.proto.post_bootstrap.callback(p.proto)
350        return d
351
352    def test_build_local_unix(self):
353        reactor = FakeReactor(self)
354        d = build_local_tor_connection(reactor)
355        d.addErrback(lambda _: None)
356        return d
357
358    def test_build_local_tcp(self):
359        reactor = FakeReactor(self)
360        d = build_local_tor_connection(reactor, socket=None)
361        d.addErrback(lambda _: None)
362        return d
363
364
365class UtilTests(unittest.TestCase):
366
367    def test_extract_reason_no_reason(self):
368        reason = _extract_reason(dict())
369        self.assertEqual("unknown", reason)
370
371
372class StateTests(unittest.TestCase):
373
374    def setUp(self):
375        self.protocol = TorControlProtocol()
376        self.state = TorState(self.protocol)
377        # avoid spew in trial logs; state prints this by default
378        self.state._attacher_error = lambda f: f
379        self.protocol.connectionMade = lambda: None
380        self.transport = proto_helpers.StringTransport()
381        self.protocol.makeConnection(self.transport)
382
383    def test_close_stream_with_attacher(self):
384        @implementer(IStreamAttacher)
385        class MyAttacher(object):
386
387            def __init__(self):
388                self.streams = []
389
390            def attach_stream(self, stream, circuits):
391                self.streams.append(stream)
392                return None
393
394        attacher = MyAttacher()
395        self.state.set_attacher(attacher, FakeReactor(self))
396        self.state._stream_update("76 CLOSED 0 www.example.com:0 REASON=DONE")
397
398    def test_attacher_error_handler(self):
399        # make sure error-handling "does something" that isn't blowing up
400        with patch('sys.stdout'):
401            TorState(self.protocol)._attacher_error(Failure(RuntimeError("quote")))
402
403    def test_stream_update(self):
404        # we use a circuit ID of 0 so it doesn't try to look anything
405        # up but it's not really correct to have a SUCCEEDED w/o a
406        # valid circuit, I don't think
407        self.state._stream_update('1610 SUCCEEDED 0 74.125.224.243:80')
408        self.assertTrue(1610 in self.state.streams)
409
410    def test_single_streams(self):
411        self.state.circuits[496] = FakeCircuit(496)
412        self.state._stream_status(
413            'stream-status=123 SUCCEEDED 496 www.example.com:6667'
414        )
415        self.assertEqual(len(self.state.streams), 1)
416
417    def test_multiple_streams(self):
418        self.state.circuits[496] = FakeCircuit(496)
419        self.state._stream_status(
420            '\r\n'.join([
421                'stream-status=',
422                '123 SUCCEEDED 496 www.example.com:6667',
423                '124 SUCCEEDED 496 www.example.com:6667',
424            ])
425        )
426        self.assertEqual(len(self.state.streams), 2)
427
428    def send(self, line):
429        self.protocol.dataReceived(line.strip() + b"\r\n")
430
431    @defer.inlineCallbacks
432    def test_bootstrap_callback(self):
433        '''
434        FIXME: something is still screwy with this; try throwing an
435        exception from TorState.bootstrap and we'll just hang...
436        '''
437
438        from .test_torconfig import FakeControlProtocol
439        protocol = FakeControlProtocol(
440            [
441                "ns/all=",  # ns/all
442                "",  # circuit-status
443                "",  # stream-status
444                "",  # address-mappings/all
445                "entry-guards=\r\n$0000000000000000000000000000000000000000=name up\r\n$1111111111111111111111111111111111111111=foo up\r\n$9999999999999999999999999999999999999999=eman unusable 2012-01-01 22:00:00\r\n",  # entry-guards
446                "99999",  # process/pid
447                "??",  # ip-to-country/0.0.0.0
448            ]
449        )
450
451        state = yield TorState.from_protocol(protocol)
452
453        self.assertEqual(len(state.entry_guards), 2)
454        self.assertTrue('$0000000000000000000000000000000000000000' in state.entry_guards)
455        self.assertTrue('$1111111111111111111111111111111111111111' in state.entry_guards)
456        self.assertEqual(len(state.unusable_entry_guards), 1)
457        self.assertTrue('$9999999999999999999999999999999999999999' in state.unusable_entry_guards[0])
458
459    def test_bootstrap_existing_addresses(self):
460        '''
461        FIXME: something is still screwy with this; try throwing an
462        exception from TorState.bootstrap and we'll just hang...
463        '''
464
465        d = self.state.post_bootstrap
466
467        clock = task.Clock()
468        self.state.addrmap.scheduler = clock
469
470        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
471        self.state._bootstrap()
472
473        self.send(b"250+ns/all=")
474        self.send(b".")
475        self.send(b"250 OK")
476
477        self.send(b"250+circuit-status=")
478        self.send(b".")
479        self.send(b"250 OK")
480
481        self.send(b"250-stream-status=")
482        self.send(b"250 OK")
483
484        self.send(b"250+address-mappings/all=")
485        self.send(b'www.example.com 127.0.0.1 "2012-01-01 00:00:00"')
486        self.send(b'subdomain.example.com 10.0.0.0 "2012-01-01 00:01:02"')
487        self.send(b".")
488        self.send(b"250 OK")
489
490        for ignored in self.state.event_map.items():
491            self.send(b"250 OK")
492
493        self.send(b"250-entry-guards=")
494        self.send(b"250 OK")
495
496        self.send(b"250 OK")
497
498        self.assertEqual(len(self.state.addrmap.addr), 4)
499        self.assertTrue('www.example.com' in self.state.addrmap.addr)
500        self.assertTrue('subdomain.example.com' in self.state.addrmap.addr)
501        self.assertTrue('10.0.0.0' in self.state.addrmap.addr)
502        self.assertTrue('127.0.0.1' in self.state.addrmap.addr)
503        self.assertEqual(IPv4Address(u'127.0.0.1'), self.state.addrmap.find('www.example.com').ip)
504        self.assertEqual('www.example.com', self.state.addrmap.find('127.0.0.1').name)
505        self.assertEqual(IPv4Address(u'10.0.0.0'), self.state.addrmap.find('subdomain.example.com').ip)
506        self.assertEqual('subdomain.example.com', self.state.addrmap.find('10.0.0.0').name)
507
508        return d
509
510    def test_bootstrap_single_existing_circuit(self):
511        '''
512        test with exactly one circuit. should probably test with 2 as
513        well, since there was a bug with the handling of just one.
514        '''
515
516        d = self.state.post_bootstrap
517
518        clock = task.Clock()
519        self.state.addrmap.scheduler = clock
520
521        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
522        self.state._bootstrap()
523
524        self.send(b"250+ns/all=")
525        self.send(b".")
526        self.send(b"250 OK")
527
528        self.send(b"250-circuit-status=123 BUILT PURPOSE=GENERAL")
529        self.send(b"250 OK")
530
531        self.send(b"250-stream-status=")
532        self.send(b"250 OK")
533
534        self.send(b"250+address-mappings/all=")
535        self.send(b".")
536        self.send(b"250 OK")
537
538        for ignored in self.state.event_map.items():
539            self.send(b"250 OK")
540
541        self.send(b"250-entry-guards=")
542        self.send(b"250 OK")
543
544        self.send(b"250 OK")
545
546        self.assertTrue(self.state.find_circuit(123))
547        self.assertEqual(len(self.state.circuits), 1)
548
549        return d
550
551    def test_unset_attacher(self):
552
553        @implementer(IStreamAttacher)
554        class MyAttacher(object):
555
556            def attach_stream(self, stream, circuits):
557                return None
558
559        fr = FakeReactor(self)
560        attacher = MyAttacher()
561        self.state.set_attacher(attacher, fr)
562        self.send(b"250 OK")
563        self.state.set_attacher(None, fr)
564        self.send(b"250 OK")
565        self.assertEqual(
566            self.transport.value(),
567            b'SETCONF __LeaveStreamsUnattached=1\r\nSETCONF'
568            b' __LeaveStreamsUnattached=0\r\n'
569        )
570
571    def test_attacher_twice(self):
572        """
573        It should be an error to set an attacher twice
574        """
575        @implementer(IStreamAttacher)
576        class MyAttacher(object):
577            pass
578
579        attacher = MyAttacher()
580        self.state.set_attacher(attacher, FakeReactor(self))
581        # attach the *same* instance twice; not an error
582        self.state.set_attacher(attacher, FakeReactor(self))
583        with self.assertRaises(RuntimeError) as ctx:
584            self.state.set_attacher(MyAttacher(), FakeReactor(self))
585        self.assertTrue(
586            "already have an attacher" in str(ctx.exception)
587        )
588
589    @defer.inlineCallbacks
590    def _test_attacher_both_apis(self):
591        """
592        similar to above, but first set_attacher is implicit via
593        Circuit.stream_via
594        """
595        reactor = Mock()
596        directlyProvides(reactor, IReactorCore)
597
598        @implementer(IStreamAttacher)
599        class MyAttacher(object):
600            pass
601
602        circ = Circuit(self.state)
603        circ.state = 'BUILT'
604
605        # use the "preferred" API, which will set an attacher
606        factory = Mock()
607        proto = Mock()
608        proto.when_done = Mock(return_value=defer.succeed(None))
609        factory.connect = Mock(return_value=defer.succeed(proto))
610        ep = circ.stream_via(reactor, 'meejah.ca', 443, factory)
611        addr = Mock()
612        addr.host = '10.0.0.1'
613        addr.port = 1011
614        ep._target_endpoint._get_address = Mock(return_value=defer.succeed(addr))
615        attacher = yield _get_circuit_attacher(reactor, self.state)
616        d = ep.connect('foo')
617        stream = Mock()
618        import ipaddress
619        stream.source_addr = ipaddress.IPv4Address(u'10.0.0.1')
620        stream.source_port = 1011
621        attacher.attach_stream(stream, [])
622        yield d
623
624        # ...now use the low-level API (should be an error)
625        with self.assertRaises(RuntimeError) as ctx:
626            self.state.set_attacher(MyAttacher(), FakeReactor(self))
627        self.assertTrue(
628            "already have an attacher" in str(ctx.exception)
629        )
630
631    def test_attacher(self):
632        @implementer(IStreamAttacher)
633        class MyAttacher(object):
634
635            def __init__(self):
636                self.streams = []
637                self.answer = None
638
639            def attach_stream(self, stream, circuits):
640                self.streams.append(stream)
641                return self.answer
642
643        attacher = MyAttacher()
644        self.state.set_attacher(attacher, FakeReactor(self))
645        events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL'
646        self.protocol._set_valid_events(events)
647        self.state._add_events()
648        for ignored in self.state.event_map.items():
649            self.send(b"250 OK")
650
651        self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER")
652        self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE")
653        self.assertEqual(len(attacher.streams), 1)
654        self.assertEqual(attacher.streams[0].id, 1)
655        self.assertEqual(len(self.protocol.commands), 1)
656        self.assertEqual(self.protocol.commands[0][1], b'ATTACHSTREAM 1 0')
657
658        # we should totally ignore .exit URIs
659        attacher.streams = []
660        self.send(b"650 STREAM 2 NEW 0 10.0.0.0.$E11D2B2269CC25E67CA6C9FB5843497539A74FD0.exit:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME")
661        self.assertEqual(len(attacher.streams), 0)
662        self.assertEqual(len(self.protocol.commands), 1)
663
664        # we should NOT ignore .onion URIs
665        attacher.streams = []
666        self.send(b"650 STREAM 3 NEW 0 xxxxxxxxxxxxxxxx.onion:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME")
667        self.assertEqual(len(attacher.streams), 1)
668        self.assertEqual(len(self.protocol.commands), 2)
669        self.assertEqual(self.protocol.commands[1][1], b'ATTACHSTREAM 3 0')
670
671        # normal attach
672        circ = FakeCircuit(1)
673        circ.state = 'BUILT'
674        self.state.circuits[1] = circ
675        attacher.answer = circ
676        self.send(b"650 STREAM 4 NEW 0 xxxxxxxxxxxxxxxx.onion:80 SOURCE_ADDR=127.0.0.1:12345 PURPOSE=TIME")
677        self.assertEqual(len(attacher.streams), 2)
678        self.assertEqual(len(self.protocol.commands), 3)
679        self.assertEqual(self.protocol.commands[2][1], b'ATTACHSTREAM 4 1')
680
681    def test_attacher_defer(self):
682        @implementer(IStreamAttacher)
683        class MyAttacher(object):
684
685            def __init__(self, answer):
686                self.streams = []
687                self.answer = answer
688
689            def attach_stream(self, stream, circuits):
690                self.streams.append(stream)
691                return defer.succeed(self.answer)
692
693        self.state.circuits[1] = FakeCircuit(1)
694        self.state.circuits[1].state = 'BUILT'
695        attacher = MyAttacher(self.state.circuits[1])
696        self.state.set_attacher(attacher, FakeReactor(self))
697
698        # boilerplate to finish enough set-up in the protocol so it
699        # works
700        events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL'
701        self.protocol._set_valid_events(events)
702        self.state._add_events()
703        for ignored in self.state.event_map.items():
704            self.send(b"250 OK")
705
706        self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER")
707        self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE")
708        self.assertEqual(len(attacher.streams), 1)
709        self.assertEqual(attacher.streams[0].id, 1)
710        self.assertEqual(len(self.protocol.commands), 1)
711        self.assertEqual(self.protocol.commands[0][1], b'ATTACHSTREAM 1 1')
712
713    @defer.inlineCallbacks
714    def test_attacher_errors(self):
715        @implementer(IStreamAttacher)
716        class MyAttacher(object):
717
718            def __init__(self, answer):
719                self.streams = []
720                self.answer = answer
721
722            def attach_stream(self, stream, circuits):
723                return self.answer
724
725        self.state.circuits[1] = FakeCircuit(1)
726        attacher = MyAttacher(FakeCircuit(2))
727        self.state.set_attacher(attacher, FakeReactor(self))
728
729        stream = Stream(self.state)
730        stream.id = 3
731        msg = ''
732        try:
733            yield self.state._maybe_attach(stream)
734        except Exception as e:
735            msg = str(e)
736        self.assertTrue('circuit unknown' in msg)
737
738        attacher.answer = self.state.circuits[1]
739        msg = ''
740        try:
741            yield self.state._maybe_attach(stream)
742        except Exception as e:
743            msg = str(e)
744        self.assertTrue('only attach to BUILT' in msg)
745
746        attacher.answer = 'not a Circuit instance'
747        msg = ''
748        try:
749            yield self.state._maybe_attach(stream)
750        except Exception as e:
751            msg = str(e)
752        self.assertTrue('Circuit instance' in msg)
753
754    def test_attacher_no_attach(self):
755        @implementer(IStreamAttacher)
756        class MyAttacher(object):
757
758            def __init__(self):
759                self.streams = []
760
761            def attach_stream(self, stream, circuits):
762                self.streams.append(stream)
763                return TorState.DO_NOT_ATTACH
764
765        attacher = MyAttacher()
766        self.state.set_attacher(attacher, FakeReactor(self))
767        events = 'GUARD STREAM CIRC NS NEWCONSENSUS ORCONN NEWDESC ADDRMAP STATUS_GENERAL'
768        self.protocol._set_valid_events(events)
769        self.state._add_events()
770        for ignored in self.state.event_map.items():
771            self.send(b"250 OK")
772
773        self.transport.clear()
774        self.send(b"650 STREAM 1 NEW 0 ca.yahoo.com:80 SOURCE_ADDR=127.0.0.1:54327 PURPOSE=USER")
775        self.send(b"650 STREAM 1 REMAP 0 87.248.112.181:80 SOURCE=CACHE")
776        self.assertEqual(len(attacher.streams), 1)
777        self.assertEqual(attacher.streams[0].id, 1)
778        self.assertEqual(self.transport.value(), b'')
779
780    def test_close_stream_with_id(self):
781        stream = Stream(self.state)
782        stream.id = 1
783
784        self.state.streams[1] = stream
785        self.state.close_stream(stream)
786        self.assertEqual(self.transport.value(), b'CLOSESTREAM 1 1\r\n')
787
788    def test_close_stream_with_stream(self):
789        stream = Stream(self.state)
790        stream.id = 1
791
792        self.state.streams[1] = stream
793        self.state.close_stream(stream.id)
794        self.assertEqual(self.transport.value(), b'CLOSESTREAM 1 1\r\n')
795
796    def test_close_stream_invalid_reason(self):
797        stream = Stream(self.state)
798        stream.id = 1
799        self.state.streams[1] = stream
800        self.assertRaises(
801            ValueError,
802            self.state.close_stream,
803            stream,
804            'FOO_INVALID_REASON'
805        )
806
807    def test_close_circuit_with_id(self):
808        circuit = Circuit(self.state)
809        circuit.id = 1
810
811        self.state.circuits[1] = circuit
812        self.state.close_circuit(circuit.id)
813        self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1\r\n')
814
815    def test_close_circuit_with_circuit(self):
816        circuit = Circuit(self.state)
817        circuit.id = 1
818
819        self.state.circuits[1] = circuit
820        self.state.close_circuit(circuit)
821        self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1\r\n')
822
823    def test_close_circuit_with_flags(self):
824        circuit = Circuit(self.state)
825        circuit.id = 1
826        # try:
827        #     self.state.close_circuit(circuit.id, IfUnused=True)
828        #     self.assertTrue(False)
829        # except KeyError:
830        #     pass
831
832        self.state.circuits[1] = circuit
833        self.state.close_circuit(circuit.id, IfUnused=True)
834        self.assertEqual(self.transport.value(), b'CLOSECIRCUIT 1 IfUnused\r\n')
835
836    def test_circuit_destroy(self):
837        self.state._circuit_update('365 LAUNCHED PURPOSE=GENERAL')
838        self.assertTrue(365 in self.state.circuits)
839        self.state._circuit_update('365 FAILED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT')
840        self.assertTrue(365 not in self.state.circuits)
841
842    def test_circuit_destroy_already(self):
843        self.state._circuit_update('365 LAUNCHED PURPOSE=GENERAL')
844        self.assertTrue(365 in self.state.circuits)
845        self.state._circuit_update('365 CLOSED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT')
846        self.assertTrue(365 not in self.state.circuits)
847        self.state._circuit_update('365 CLOSED $E11D2B2269CC25E67CA6C9FB5843497539A74FD0=eris,$50DD343021E509EB3A5A7FD0D8A4F8364AFBDCB5=venus,$253DFF1838A2B7782BE7735F74E50090D46CA1BC=chomsky PURPOSE=GENERAL REASON=TIMEOUT')
848        self.assertTrue(365 not in self.state.circuits)
849
850    def test_circuit_listener(self):
851        events = 'CIRC STREAM ORCONN BW DEBUG INFO NOTICE WARN ERR NEWDESC ADDRMAP AUTHDIR_NEWDESCS DESCCHANGED NS STATUS_GENERAL STATUS_CLIENT STATUS_SERVER GUARD STREAM_BW CLIENTS_SEEN NEWCONSENSUS BUILDTIMEOUT_SET'
852        self.protocol._set_valid_events(events)
853        self.state._add_events()
854        for ignored in self.state.event_map.items():
855            self.send(b"250 OK")
856
857        # we use this router later on in an EXTEND
858        self.state._update_network_status("""ns/all=
859r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0
860s Fast Guard Running Stable Unnamed Valid
861w Bandwidth=51500
862p reject 1-65535""")
863
864        expected = [('new', {'id': 456}),
865                    ('launched', {}),
866                    ('extend', {'id': 123})
867                    ]
868        listen = CircuitListener(expected)
869        # first add a Circuit before we listen
870        self.protocol.dataReceived(b"650 CIRC 123 LAUNCHED PURPOSE=GENERAL\r\n")
871        self.assertEqual(len(self.state.circuits), 1)
872
873        # make sure we get added to existing circuits
874        self.state.add_circuit_listener(listen)
875        first_circuit = list(self.state.circuits.values())[0]
876        self.assertTrue(listen in first_circuit.listeners)
877
878        # now add a Circuit after we started listening
879        self.protocol.dataReceived(b"650 CIRC 456 LAUNCHED PURPOSE=GENERAL\r\n")
880        self.assertEqual(len(self.state.circuits), 2)
881        self.assertTrue(listen in list(self.state.circuits.values())[0].listeners)
882        self.assertTrue(listen in list(self.state.circuits.values())[1].listeners)
883
884        # now update the first Circuit to ensure we're really, really
885        # listening
886        self.protocol.dataReceived(b"650 CIRC 123 EXTENDED $D82183B1C09E1D7795FF2D7116BAB5106AA3E60E~PPrivCom012 PURPOSE=GENERAL\r\n")
887        self.assertEqual(len(listen.expected), 0)
888
889    def test_router_from_id_invalid_key(self):
890        self.failUnlessRaises(KeyError, self.state.router_from_id, 'somethingcompletelydifferent..thatis42long')
891
892    def test_router_from_named_router(self):
893        r = self.state.router_from_id('$AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=foo')
894        self.assertEqual(r.id_hex, '$AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA')
895        self.assertEqual(r.unique_name, 'foo')
896
897    def confirm_router_state(self, x):
898        self.assertTrue('$624926802351575FF7E4E3D60EFA3BFB56E67E8A' in self.state.routers)
899        router = self.state.routers['$624926802351575FF7E4E3D60EFA3BFB56E67E8A']
900        self.assertTrue('exit' in router.flags)
901        self.assertTrue('fast' in router.flags)
902        self.assertTrue('guard' in router.flags)
903        self.assertTrue('hsdir' in router.flags)
904        self.assertTrue('named' in router.flags)
905        self.assertTrue('running' in router.flags)
906        self.assertTrue('stable' in router.flags)
907        self.assertTrue('v2dir' in router.flags)
908        self.assertTrue('valid' in router.flags)
909        self.assertTrue('futureproof' in router.flags)
910        self.assertEqual(router.bandwidth, 518000)
911        self.assertTrue(router.accepts_port(43))
912        self.assertTrue(router.accepts_port(53))
913        self.assertTrue(not router.accepts_port(44))
914        self.assertTrue(router.accepts_port(989))
915        self.assertTrue(router.accepts_port(990))
916        self.assertTrue(not router.accepts_port(991))
917        self.assertTrue(not router.accepts_port(988))
918
919    def test_router_with_ipv6_address(self):
920        self.state._update_network_status("""ns/all=
921r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0
922a [2001:0:0:0::0]:4321
923s Fast Guard Running Stable Named Valid
924w Bandwidth=51500
925p reject 1-65535""")
926        self.assertEqual(len(self.state.routers_by_name['PPrivCom012'][0].ip_v6), 1)
927        self.assertEqual(self.state.routers_by_name['PPrivCom012'][0].ip_v6[0], '[2001:0:0:0::0]:4321')
928
929    def test_invalid_routers(self):
930        try:
931            self.state._update_network_status('''ns/all=
932r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
933r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
934s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
935w Bandwidth=518000
936p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888
937.''')
938            self.fail()
939
940        except RuntimeError as e:
941            # self.assertTrue('Illegal state' in str(e))
942            # flip back when we go back to Automat
943            self.assertTrue('Expected "s "' in str(e))
944
945    def test_routers_no_policy(self):
946        """
947        ensure we can parse a router descriptor which has no p line
948        """
949
950        self.state._update_network_status('''ns/all=
951r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
952s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
953w Bandwidth=518000
954r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0
955s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
956w Bandwidth=518000
957p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888
958.''')
959        self.assertTrue('fake' in self.state.routers.keys())
960        self.assertTrue('PPrivCom012' in self.state.routers.keys())
961
962    def test_routers_no_bandwidth(self):
963        """
964        ensure we can parse a router descriptor which has no w line
965        """
966
967        self.state._update_network_status('''ns/all=
968r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
969s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
970r PPrivCom012 2CGDscCeHXeV/y1xFrq1EGqj5g4 QX7NVLwx7pwCuk6s8sxB4rdaCKI 2011-12-20 08:34:19 84.19.178.6 9001 0
971s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
972w Bandwidth=518000
973p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888
974.''')
975        self.assertTrue('fake' in self.state.routers.keys())
976        self.assertTrue('PPrivCom012' in self.state.routers.keys())
977
978    def test_router_factory(self):
979        self.state._update_network_status('''ns/all=
980r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
981s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
982w Bandwidth=518000
983p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888
984r fake YxxmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
985s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof
986w Bandwidth=543000
987p accept 43,53
988.''')
989        self.assertTrue('$624926802351575FF7E4E3D60EFA3BFB56E67E8A' in self.state.routers)
990        r = self.state.routers['$624926802351575FF7E4E3D60EFA3BFB56E67E8A']
991        self.assertEqual(r.controller, self.state.protocol)
992        self.assertEqual(r.bandwidth, 518000)
993        self.assertEqual(len(self.state.routers_by_name['fake']), 2)
994
995        # now we do an update
996        self.state._update_network_status('''ns/all=
997r fake YkkmgCNRV1/35OPWDvo7+1bmfoo tanLV/4ZfzpYQW0xtGFqAa46foo 2011-12-12 16:29:16 12.45.56.78 443 80
998s Exit Fast Guard HSDir Named Running Stable V2Dir Valid FutureProof Authority
999w Bandwidth=543000
1000p accept 43,53,79-81,110,143,194,220,443,953,989-990,993,995,1194,1293,1723,1863,2082-2083,2086-2087,2095-2096,3128,4321,5050,5190,5222-5223,6679,6697,7771,8000,8008,8080-8081,8090,8118,8123,8181,8300,8443,8888
1001.''')
1002        self.assertEqual(r.bandwidth, 543000)
1003
1004    def test_empty_stream_update(self):
1005        self.state._stream_update('''stream-status=''')
1006
1007    def test_addrmap(self):
1008        self.state._addr_map('example.com 127.0.0.1 "2012-01-01 00:00:00" EXPIRES=NEVER')
1009
1010    def test_double_newconsensus(self):
1011        """
1012        The arrival of a second NEWCONSENSUS event causes parsing
1013        errors.
1014        """
1015
1016        # bootstrap the TorState so we can send it a "real" 650
1017        # update
1018
1019        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
1020        self.state._bootstrap()
1021
1022        self.send(b"250+ns/all=")
1023        self.send(b".")
1024        self.send(b"250 OK")
1025
1026        self.send(b"250+circuit-status=")
1027        self.send(b".")
1028        self.send(b"250 OK")
1029
1030        self.send(b"250-stream-status=")
1031        self.send(b"250 OK")
1032
1033        self.send(b"250-address-mappings/all=")
1034        self.send(b'250 OK')
1035
1036        for ignored in self.state.event_map.items():
1037            self.send(b"250 OK")
1038
1039        self.send(b"250-entry-guards=")
1040        self.send(b"250 OK")
1041
1042        self.send(b"250 OK")
1043
1044        # state is now bootstrapped, we can send our NEWCONSENSUS update
1045
1046        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1047r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1048s Fast Guard Running Stable Valid
1049w Bandwidth=166
1050p reject 1-65535
1051.
1052650 OK
1053'''.split(b'\n')))
1054
1055        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1056r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1057s Fast Guard Running Stable Valid
1058w Bandwidth=166
1059p reject 1-65535
1060.
1061650 OK
1062'''.split(b'\n')))
1063
1064        self.assertEqual(1, len(self.state.all_routers))
1065        self.assertTrue('Unnamed' in self.state.routers)
1066        self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers)
1067
1068    def test_newconsensus_remove_routers(self):
1069        """
1070        router removed from consensus is removed
1071        """
1072
1073        # bootstrap the TorState so we can send it a "real" 650
1074        # update
1075
1076        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
1077        self.state._bootstrap()
1078
1079        self.send(b"250+ns/all=")
1080        self.send(b".")
1081        self.send(b"250 OK")
1082
1083        self.send(b"250+circuit-status=")
1084        self.send(b".")
1085        self.send(b"250 OK")
1086
1087        self.send(b"250-stream-status=")
1088        self.send(b"250 OK")
1089
1090        self.send(b"250-address-mappings/all=")
1091        self.send(b'250 OK')
1092
1093        for ignored in self.state.event_map.items():
1094            self.send(b"250 OK")
1095
1096        self.send(b"250-entry-guards=")
1097        self.send(b"250 OK")
1098
1099        self.send(b"250 OK")
1100
1101        # state is now bootstrapped, we can send our NEWCONSENSUS update
1102
1103        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1104r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1105s Fast Guard Running Stable Valid
1106w Bandwidth=166
1107p reject 1-65535
1108r Foo ABJJJJUFz1lvQS0jq8nhTdRiXEk /zzzUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1109s Fast Guard Running Stable Valid
1110w Bandwidth=166
1111p reject 1-65535
1112.
1113650 OK
1114'''.split(b'\n')))
1115
1116        self.assertEqual(2, len(self.state.all_routers))
1117        self.assertTrue('Unnamed' in self.state.routers)
1118        self.assertTrue('Foo' in self.state.routers_by_name)
1119        self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers)
1120        self.assertTrue('$001249249505CF596F412D23ABC9E14DD4625C49' in self.state.routers)
1121
1122        # this is a different fingerprint, but same name
1123        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1124r Unnamed ABBBguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1125s Fast Guard Running Stable Valid
1126w Bandwidth=166
1127p reject 1-65535
1128.
1129650 OK
1130'''.split(b'\n')))
1131
1132        self.assertEqual(1, len(self.state.all_routers))
1133        self.assertIn('Unnamed', self.state.routers)
1134        self.assertIn('$00104182E505CF596F412D23ABC9E14DD4625C49', self.state.routers)
1135        self.assertNotIn('$00126582E505CF596F412D23ABC9E14DD4625C49', self.state.routers)
1136        self.assertNotIn('$00126582E505CF596F412D23ABC9E14DD4625C49', self.state.routers_by_hash)
1137        self.assertNotIn('$001249249505CF596F412D23ABC9E14DD4625C49', self.state.routers)
1138        self.assertNotIn('$001249249505CF596F412D23ABC9E14DD4625C49', self.state.routers_by_hash)
1139        self.assertNotIn('Foo', self.state.routers_by_name)
1140
1141    def test_NEWCONSENSUS_ends_with_OK_on_w(self):
1142        """
1143        The arrival of a second NEWCONSENSUS event causes parsing
1144        errors.
1145        """
1146
1147        # bootstrap the TorState so we can send it a "real" 650
1148        # update
1149
1150        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
1151        self.state._bootstrap()
1152
1153        self.send(b"250+ns/all=")
1154        self.send(b".")
1155        self.send(b"250 OK")
1156
1157        self.send(b"250+circuit-status=")
1158        self.send(b".")
1159        self.send(b"250 OK")
1160
1161        self.send(b"250-stream-status=")
1162        self.send(b"250 OK")
1163
1164        self.send(b"250-address-mappings/all=")
1165        self.send(b"250 OK")
1166
1167        for ignored in self.state.event_map.items():
1168            self.send(b"250 OK")
1169
1170        self.send(b"250-entry-guards=")
1171        self.send(b"250 OK")
1172
1173        self.send(b"250 OK")
1174
1175        # state is now bootstrapped, we can send our NEWCONSENSUS update
1176
1177        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1178r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1179s Fast Guard Running Stable Valid
1180w Bandwidth=166
1181.
1182650 OK
1183'''.split(b'\n')))
1184
1185        self.assertTrue('Unnamed' in self.state.routers)
1186        self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers)
1187
1188    def test_NEWCONSENSUS_ends_with_OK_on_s(self):
1189        """
1190        The arrival of a second NEWCONSENSUS event causes parsing
1191        errors.
1192        """
1193
1194        # bootstrap the TorState so we can send it a "real" 650
1195        # update
1196
1197        self.protocol._set_valid_events(' '.join(self.state.event_map.keys()))
1198        self.state._bootstrap()
1199
1200        self.send(b"250+ns/all=")
1201        self.send(b".")
1202        self.send(b"250 OK")
1203
1204        self.send(b"250+circuit-status=")
1205        self.send(b".")
1206        self.send(b"250 OK")
1207
1208        self.send(b"250-stream-status=")
1209        self.send(b"250 OK")
1210
1211        self.send(b"250-address-mappings/all=")
1212        self.send(b"250 OK")
1213
1214        for ignored in self.state.event_map.items():
1215            self.send(b"250 OK")
1216
1217        self.send(b"250-entry-guards=")
1218        self.send(b"250 OK")
1219
1220        self.send(b"250 OK")
1221
1222        # state is now bootstrapped, we can send our NEWCONSENSUS update
1223
1224        self.protocol.dataReceived(b'\r\n'.join(b'''650+NEWCONSENSUS
1225r Unnamed ABJlguUFz1lvQS0jq8nhTdRiXEk /zIVUg1tKMUeyUBoyimzorbQN9E 2012-05-23 01:10:22 219.94.255.254 9001 0
1226s Fast Guard Running Stable Valid
1227.
1228650 OK
1229'''.split(b'\n')))
1230
1231        self.assertTrue('Unnamed' in self.state.routers)
1232        self.assertTrue('$00126582E505CF596F412D23ABC9E14DD4625C49' in self.state.routers)
1233
1234    def test_stream_create(self):
1235        self.state._stream_update('1610 NEW 0 1.2.3.4:56')
1236        self.assertTrue(1610 in self.state.streams)
1237
1238    def test_stream_destroy(self):
1239        self.state._stream_update('1610 NEW 0 1.2.3.4:56')
1240        self.assertTrue(1610 in self.state.streams)
1241        self.state._stream_update("1610 FAILED 0 www.example.com:0 REASON=DONE REMOTE_REASON=FAILED")
1242        self.assertTrue(1610 not in self.state.streams)
1243
1244    def test_stream_detach(self):
1245        circ = FakeCircuit(1)
1246        circ.state = 'BUILT'
1247        self.state.circuits[1] = circ
1248
1249        self.state._stream_update('1610 NEW 0 1.2.3.4:56')
1250        self.assertTrue(1610 in self.state.streams)
1251        self.state._stream_update("1610 SUCCEEDED 1 4.3.2.1:80")
1252        self.assertEqual(self.state.streams[1610].circuit, circ)
1253
1254        self.state._stream_update("1610 DETACHED 0 www.example.com:0 REASON=DONE REMOTE_REASON=FAILED")
1255        self.assertEqual(self.state.streams[1610].circuit, None)
1256
1257    def test_stream_listener(self):
1258        self.protocol._set_valid_events('CIRC STREAM ORCONN BW DEBUG INFO NOTICE WARN ERR NEWDESC ADDRMAP AUTHDIR_NEWDESCS DESCCHANGED NS STATUS_GENERAL STATUS_CLIENT STATUS_SERVER GUARD STREAM_BW CLIENTS_SEEN NEWCONSENSUS BUILDTIMEOUT_SET')
1259        self.state._add_events()
1260        for ignored in self.state.event_map.items():
1261            self.send(b"250 OK")
1262
1263        expected = [('new', {}),
1264                    ]
1265        listen = StreamListener(expected)
1266        self.send(b"650 STREAM 77 NEW 0 www.yahoo.cn:80 SOURCE_ADDR=127.0.0.1:54315 PURPOSE=USER")
1267        self.state.add_stream_listener(listen)
1268
1269        self.assertEqual(1, len(self.state.streams.values()))
1270        self.assertTrue(listen in list(self.state.streams.values())[0].listeners)
1271        self.assertEqual(len(self.state.streams), 1)
1272        self.assertEqual(len(listen.expected), 1)
1273
1274        self.send(b"650 STREAM 78 NEW 0 www.yahoo.cn:80 SOURCE_ADDR=127.0.0.1:54315 PURPOSE=USER")
1275        self.assertEqual(len(self.state.streams), 2)
1276        self.assertEqual(len(listen.expected), 0)
1277
1278    def test_build_circuit(self):
1279        class FakeRouter:
1280            def __init__(self, i):
1281                self.id_hex = i
1282                self.flags = []
1283
1284        path = []
1285        for x in range(3):
1286            path.append(FakeRouter("$%040d" % x))
1287        # can't just check flags for guard status, need to know if
1288        # it's in the running Tor's notion of Entry Guards
1289        path[0].flags = ['guard']
1290
1291        self.state.build_circuit(path, using_guards=True)
1292        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n')
1293        # should have gotten a warning about this not being an entry
1294        # guard
1295        self.assertEqual(len(self.flushWarnings()), 1)
1296
1297    def test_build_circuit_no_routers(self):
1298        self.state.build_circuit()
1299        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0\r\n')
1300
1301    def test_build_circuit_unfound_router(self):
1302        self.state.build_circuit(routers=[b'AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA'], using_guards=False)
1303        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA\r\n')
1304
1305    def circuit_callback(self, circ):
1306        self.assertTrue(isinstance(circ, Circuit))
1307        self.assertEqual(circ.id, 1234)
1308
1309    def test_build_circuit_final_callback(self):
1310        class FakeRouter:
1311            def __init__(self, i):
1312                self.id_hex = i
1313                self.flags = []
1314
1315        path = []
1316        for x in range(3):
1317            path.append(FakeRouter("$%040d" % x))
1318        # can't just check flags for guard status, need to know if
1319        # it's in the running Tor's notion of Entry Guards
1320        path[0].flags = ['guard']
1321
1322        # FIXME TODO we should verify we get a circuit_new event for
1323        # this circuit
1324
1325        d = self.state.build_circuit(path, using_guards=True)
1326        d.addCallback(self.circuit_callback)
1327        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n')
1328        self.send(b"250 EXTENDED 1234")
1329        # should have gotten a warning about this not being an entry
1330        # guard
1331        self.assertEqual(len(self.flushWarnings()), 1)
1332        return d
1333
1334    def test_build_circuit_error(self):
1335        """
1336        tests that we check the callback properly
1337        """
1338
1339        try:
1340            self.state._find_circuit_after_extend("FOO 1234")
1341            self.assertTrue(False)
1342        except RuntimeError as e:
1343            self.assertTrue('Expected EXTENDED' in str(e))
1344
1345    def test_listener_mixins(self):
1346        self.assertTrue(verifyClass(IStreamListener, StreamListenerMixin))
1347        self.assertTrue(verifyClass(ICircuitListener, CircuitListenerMixin))
1348
1349    def test_build_circuit_timedout(self):
1350        class FakeRouter:
1351            def __init__(self, i):
1352                self.id_hex = i
1353                self.flags = []
1354
1355        path = []
1356        for x in range(3):
1357            path.append(FakeRouter("$%040d" % x))
1358        # can't just check flags for guard status, need to know if
1359        # it's in the running Tor's notion of Entry Guards
1360        path[0].flags = ['guard']
1361
1362        # FIXME TODO we should verify we get a circuit_new event for
1363        # this circuit
1364        timeout = 10
1365        clock = task.Clock()
1366
1367        d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False)
1368        clock.advance(10)
1369
1370        def check_for_timeout_error(f):
1371            self.assertTrue(isinstance(f.type(), CircuitBuildTimedOutError))
1372        d.addErrback(check_for_timeout_error)
1373        return d
1374
1375    @defer.inlineCallbacks
1376    def test_build_circuit_cancelled(self):
1377        class FakeRouter:
1378            def __init__(self, i):
1379                self.id_hex = i
1380                self.flags = []
1381
1382        path = []
1383        for x in range(3):
1384            path.append(FakeRouter("$%040d" % x))
1385        # can't just check flags for guard status, need to know if
1386        # it's in the running Tor's notion of Entry Guards
1387        path[0].flags = ['guard']
1388
1389        class FakeCircuit:
1390            close_called = False
1391
1392            def when_built(self):
1393                return defer.Deferred()
1394
1395            def close(self):
1396                self.close_called = True
1397                return defer.succeed(None)
1398
1399        circ = FakeCircuit()
1400
1401        def _build(*args, **kw):
1402            return defer.succeed(circ)
1403        self.state.build_circuit = _build
1404
1405        timeout = 10
1406        clock = task.Clock()
1407
1408        # we want this circuit to get to BUILT, but *then* we call
1409        # .cancel() on the deferred -- in which case, the circuit must
1410        # be closed
1411        d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False)
1412        clock.advance(1)
1413        d.cancel()
1414
1415        with self.assertRaises(CircuitBuildTimedOutError):
1416            yield d
1417        self.assertTrue(circ.close_called)
1418
1419    def test_build_circuit_timeout_after_progress(self):
1420        """
1421        Similar to above but we timeout after Tor has ack'd our
1422        circuit-creation attempt, but before reaching BUILT.
1423        """
1424        class FakeRouter:
1425            def __init__(self, i):
1426                self.id_hex = i
1427                self.flags = []
1428
1429        class FakeCircuit(Circuit):
1430            def close(self):
1431                return defer.succeed(None)
1432
1433        path = []
1434        for x in range(3):
1435            path.append(FakeRouter("$%040d" % x))
1436
1437        def fake_queue(cmd):
1438            self.assertTrue(cmd.startswith('EXTENDCIRCUIT 0'))
1439            return defer.succeed("EXTENDED 1234")
1440
1441        queue_command = patch.object(self.protocol, 'queue_command', fake_queue)
1442        circuit_factory = patch.object(self.state, 'circuit_factory', FakeCircuit)
1443        with queue_command, circuit_factory:
1444            timeout = 10
1445            clock = task.Clock()
1446
1447            d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=False)
1448            clock.advance(timeout + 1)
1449
1450            def check_for_timeout_error(f):
1451                self.assertTrue(isinstance(f.type(), CircuitBuildTimedOutError))
1452            d.addErrback(check_for_timeout_error)
1453        return d
1454
1455    def test_build_circuit_not_timedout(self):
1456        class FakeRouter:
1457            def __init__(self, i):
1458                self.id_hex = i
1459                self.flags = []
1460
1461        path = []
1462        for x in range(3):
1463            path.append(FakeRouter("$%040d" % x))
1464        path[0].flags = ['guard']
1465
1466        timeout = 10
1467        clock = task.Clock()
1468        d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=True)
1469        d.addCallback(self.circuit_callback)
1470
1471        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n')
1472        self.send(b"250 EXTENDED 1234")
1473        # we can't just .send(b'650 CIRC 1234 BUILT') this because we
1474        # didn't fully hook up the protocol to the state, e.g. via
1475        # post_bootstrap etc.
1476        self.state.circuits[1234].update(['1234', 'BUILT'])
1477        # should have gotten a warning about this not being an entry
1478        # guard
1479        self.assertEqual(len(self.flushWarnings()), 1)
1480        return d
1481
1482    def test_build_circuit_failure(self):
1483        class FakeRouter:
1484            def __init__(self, i):
1485                self.id_hex = i
1486                self.flags = []
1487
1488        path = []
1489        for x in range(3):
1490            path.append(FakeRouter("$%040d" % x))
1491        path[0].flags = ['guard']
1492
1493        timeout = 10
1494        clock = task.Clock()
1495        d = build_timeout_circuit(self.state, clock, path, timeout, using_guards=True)
1496        d.addCallback(self.circuit_callback)
1497
1498        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002\r\n')
1499        self.send(b"250 EXTENDED 1234")
1500        # we can't just .send(b'650 CIRC 1234 BUILT') this because we
1501        # didn't fully hook up the protocol to the state, e.g. via
1502        # post_bootstrap etc.
1503        self.state.circuits[1234].update(['1234', 'FAILED', 'REASON=TIMEOUT'])
1504
1505        def check_reason(fail):
1506            self.assertEqual(fail.value.reason, 'TIMEOUT')
1507        d.addErrback(check_reason)
1508
1509        return d
1510
1511    def test_build_circuit_with_purpose(self):
1512        class FakeRouter:
1513            def __init__(self, i):
1514                self.id_hex = i
1515                self.flags = []
1516
1517        path = []
1518        for x in range(3):
1519            path.append(FakeRouter("$%040d" % x))
1520        path[0].flags = ['guard']
1521
1522        d = self.state.build_circuit(path, using_guards=True, purpose="general")
1523        d.addCallback(self.circuit_callback)
1524
1525        self.assertEqual(self.transport.value(), b'EXTENDCIRCUIT 0 0000000000000000000000000000000000000000,0000000000000000000000000000000000000001,0000000000000000000000000000000000000002 purpose=general\r\n')
1526        self.send(b"250 EXTENDED 1234")
1527        # we can't just .send(b'650 CIRC 1234 BUILT') this because we
1528        # didn't fully hook up the protocol to the state, e.g. via
1529        # post_bootstrap etc.
1530        self.state.circuits[1234].update(['1234', 'FAILED', 'REASON=TIMEOUT'])
1531
1532        def check_reason(fail):
1533            self.assertEqual(fail.value.reason, 'TIMEOUT')
1534        d.addErrback(check_reason)
1535
1536        return d
1537
1538
1539class ComposibleListenerTests(unittest.TestCase):
1540
1541    def setUp(self):
1542        self.protocol = TorControlProtocol()
1543        self.state = TorState(self.protocol)
1544
1545    def test_circuit_new(self):
1546        listener_calls = []
1547
1548        @self.state.on_circuit_new
1549        def _(circ):
1550            listener_calls.append(circ)
1551
1552        c = self.state._maybe_create_circuit("42")
1553        c.update(["42", "LAUNCHED"])
1554
1555        self.assertEqual(len(listener_calls), 1)
1556        self.assertEqual(listener_calls[0].id, 42)
1557
1558    def test_circuit_launched(self):
1559        listener_calls = []
1560
1561        @self.state.on_circuit_launched
1562        def _(circ):
1563            listener_calls.append(circ)
1564
1565        c = self.state._maybe_create_circuit("42")
1566        c.update(["42", "LAUNCHED"])
1567
1568        self.assertEqual(len(listener_calls), 1)
1569        self.assertEqual(listener_calls[0].id, 42)
1570
1571    def test_circuit_extend(self):
1572        listener_calls = []
1573
1574        @self.state.on_circuit_extend
1575        def _(circ, router):
1576            listener_calls.append(circ)
1577
1578        c = self.state._maybe_create_circuit("42")
1579        c.update(["42", "LAUNCHED"])
1580        c.update(["42", "BUILDING", "$deadbeef,$1ee7"])
1581
1582        # we get two calls because we've now extended to 2 relays
1583        self.assertEqual(len(listener_calls), 2)
1584        self.assertEqual(listener_calls[0].id, 42)
1585        self.assertEqual(listener_calls[1].id, 42)
1586
1587    def test_circuit_built(self):
1588        listener_calls = []
1589
1590        @self.state.on_circuit_built
1591        def _(circ):
1592            listener_calls.append(circ)
1593
1594        c = self.state._maybe_create_circuit("42")
1595        c.update(["42", "LAUNCHED"])
1596        c.update(["42", "BUILT"])
1597
1598        self.assertEqual(len(listener_calls), 1)
1599        self.assertEqual(listener_calls[0].id, 42)
1600
1601    def test_circuit_closed(self):
1602        listener_calls = []
1603
1604        @self.state.on_circuit_closed
1605        def _(circ):
1606            listener_calls.append(circ)
1607
1608        c = self.state._maybe_create_circuit("42")
1609        c.update(["42", "LAUNCHED"])
1610        c.update(["42", "CLOSED"])
1611
1612        self.assertEqual(len(listener_calls), 1)
1613        self.assertEqual(listener_calls[0].id, 42)
1614
1615    def test_circuit_failed(self):
1616        listener_calls = []
1617
1618        @self.state.on_circuit_failed
1619        def _(circ):
1620            listener_calls.append(circ)
1621
1622        c = self.state._maybe_create_circuit("42")
1623        c.update(["42", "LAUNCHED"])
1624        c.update(["42", "FAILED"])
1625
1626        self.assertEqual(len(listener_calls), 1)
1627        self.assertEqual(listener_calls[0].id, 42)
1628
1629    def test_stream_events(self):
1630        """
1631        one for the philosophers: is this 'one, but big' test better /
1632        easier to read than the N circuit tests above?
1633        """
1634        listener_calls = []  # list of 2-tuples
1635
1636        @self.state.on_stream_new
1637        def _(stream):
1638            listener_calls.append(("new", stream.id))
1639
1640        @self.state.on_stream_succeeded
1641        def _(stream):
1642            listener_calls.append(("succeeded", stream.id))
1643
1644        @self.state.on_stream_attach
1645        def _(stream, circ):
1646            listener_calls.append(("attach", stream.id))
1647
1648        @self.state.on_stream_detach
1649        def _(stream):
1650            listener_calls.append(("detach", stream.id))
1651
1652        @self.state.on_stream_closed
1653        def _(stream):
1654            listener_calls.append(("closed", stream.id))
1655
1656        @self.state.on_stream_failed
1657        def _(stream):
1658            listener_calls.append(("failed", stream.id))
1659
1660        circ = self.state._maybe_create_circuit("42")
1661        circ.update(["42", "LAUNCHED"])
1662
1663        self.state._stream_update("1234 NEW 0 meejah.ca:80")
1664        self.state._stream_update("1234 SUCCEEDED 42")
1665        self.state._stream_update("1234 DETACHED 0")
1666        self.state._stream_update("1234 CLOSED 0")
1667        self.state._stream_update("1234 FAILED 0")
1668
1669        self.assertEqual(
1670            listener_calls,
1671            [
1672                ("new", 1234),
1673                ("succeeded", 1234),
1674                ("attach", 1234),
1675                ("detach", 1234),
1676                ("closed", 1234),
1677                ("failed", 1234),
1678            ]
1679        )
1680