1"""
2Unit tests for stem.descriptor.hidden_service for version 3.
3"""
4
5import base64
6import functools
7import unittest
8
9import stem.client.datatype
10import stem.descriptor
11import stem.descriptor.hidden_service
12import stem.prereq
13
14import test.require
15
16from stem.descriptor.hidden_service import (
17  IntroductionPointV3,
18  HiddenServiceDescriptorV3,
19  AuthorizedClient,
20  OuterLayer,
21  InnerLayer,
22)
23
24from test.unit.descriptor import (
25  get_resource,
26  base_expect_invalid_attr,
27  base_expect_invalid_attr_for_text,
28)
29
30try:
31  # added in python 2.7
32  from collections import OrderedDict
33except ImportError:
34  from stem.util.ordereddict import OrderedDict
35
36try:
37  # added in python 3.3
38  from unittest.mock import patch, Mock
39except ImportError:
40  from mock import patch, Mock
41
42require_sha3 = test.require.needs(stem.prereq._is_sha3_available, 'requires sha3')
43require_x25519 = test.require.needs(lambda: stem.descriptor.hidden_service.X25519_AVAILABLE, 'requires openssl x5509')
44
45expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptorV3, 'version', 3)
46expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptorV3, 'version', 3)
47
48HS_ADDRESS = u'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
49HS_PUBKEY = b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1'
50
51EXPECTED_SIGNING_CERT = """\
52-----BEGIN ED25519 CERT-----
53AQgABl5/AZLmgPpXVS59SEydKj7bRvvAduVOqQt3u4Tj5tVlfVKhAQAgBABUhpfe
54/Wd3p/M74DphsGcIMee/npQ9BTzkzCyTyVmDbykek2EciWaOTCVZJVyiKPErngfW
55BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk=
56-----END ED25519 CERT-----\
57"""
58
59EXPECTED_OUTER_LAYER = """\
60desc-auth-type foo
61desc-auth-ephemeral-key bar
62auth-client JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw
63auth-client 1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA
64encrypted
65-----BEGIN MESSAGE-----
66malformed block
67-----END MESSAGE-----\
68"""
69
70with open(get_resource('hidden_service_v3')) as descriptor_file:
71  HS_DESC_STR = descriptor_file.read()
72
73with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file:
74  OUTER_LAYER_STR = outer_layer_file.read()
75
76with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file:
77  INNER_LAYER_STR = inner_layer_file.read()
78
79with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file:
80  INTRO_POINT_STR = intro_point_file.read()
81
82
83class TestHiddenServiceDescriptorV3(unittest.TestCase):
84  def test_real_descriptor(self):
85    """
86    Parse a descriptor for a testing hidden service from asn...
87
88      sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion
89    """
90
91    with open(get_resource('hidden_service_v3'), 'rb') as descriptor_file:
92      desc = next(stem.descriptor.parse_file(descriptor_file, 'hidden-service-descriptor-3 1.0', validate = True))
93
94    self.assertEqual(3, desc.version)
95    self.assertEqual(180, desc.lifetime)
96    self.assertEqual(EXPECTED_SIGNING_CERT, str(desc.signing_cert))
97    self.assertEqual(42, desc.revision_counter)
98    self.assertTrue('eaH8VdaTKS' in desc.superencrypted)
99    self.assertEqual('aglChCQF+lbzKgyxJJTpYGVShV/GMDRJ4+cRGCp+a2y/yX/tLSh7hzqI7rVZrUoGj74Xr1CLMYO3fXYCS+DPDQ', desc.signature)
100
101  @require_sha3
102  @test.require.ed25519_support
103  def test_decryption(self):
104    """
105    Decrypt our descriptor and validate its content.
106    """
107
108    desc = HiddenServiceDescriptorV3.from_str(HS_DESC_STR)
109    inner_layer = desc.decrypt(HS_ADDRESS)
110
111    self.assertEqual(INNER_LAYER_STR, str(inner_layer))
112    self.assertEqual(OUTER_LAYER_STR.rstrip('\x00'), str(inner_layer.outer))
113
114  def test_outer_layer(self):
115    """
116    Parse the outer layer of our test descriptor.
117    """
118
119    desc = OuterLayer(OUTER_LAYER_STR)
120
121    self.assertEqual('x25519', desc.auth_type)
122    self.assertEqual('WjZCU9sV1oxkxaPcd7/YozeZgq0lEs6DhWyrdYRNJR4=', desc.ephemeral_key)
123    self.assertTrue('BsRYMH/No+LgetIFv' in desc.encrypted)
124
125    client = desc.clients['D0Bz0OlEMCg']
126
127    self.assertEqual(16, len(desc.clients))
128    self.assertEqual('D0Bz0OlEMCg', client.id)
129    self.assertEqual('or3nS3ScSPYfLJuP9osGiQ', client.iv)
130    self.assertEqual('B40RdIWhw7kdA7lt3KJPvQ', client.cookie)
131
132  def test_inner_layer(self):
133    """
134    Parse the inner layer of our test descriptor.
135    """
136
137    desc = InnerLayer(INNER_LAYER_STR)
138
139    self.assertEqual([2], desc.formats)
140    self.assertEqual(['ed25519'], desc.intro_auth)
141    self.assertEqual(True, desc.is_single_service)
142    self.assertEqual(4, len(desc.introduction_points))
143
144    intro_point = desc.introduction_points[0]
145
146    self.assertEqual(2, len(intro_point.link_specifiers))
147
148    link_specifier = intro_point.link_specifiers[0]
149    self.assertEqual(stem.client.datatype.LinkByFingerprint, type(link_specifier))
150    self.assertEqual('CCCCCCCCCCCCCCCCCCCC', link_specifier.fingerprint)
151
152    link_specifier = intro_point.link_specifiers[1]
153    self.assertEqual(stem.client.datatype.LinkByIPv4, type(link_specifier))
154    self.assertEqual('1.2.3.4', link_specifier.address)
155    self.assertEqual(9001, link_specifier.port)
156
157    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
158    self.assertTrue('ID2l9EFNrp' in intro_point.auth_key_cert.to_base64())
159    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)
160    self.assertTrue('ZvjPt5IfeQ', intro_point.enc_key_cert)
161    self.assertEqual(None, intro_point.legacy_key_raw)
162    self.assertEqual(None, intro_point.legacy_key_cert)
163
164  @test.require.ed25519_support
165  def test_required_fields(self):
166    """
167    Check that we require the mandatory fields.
168    """
169
170    line_to_attr = {
171      'hs-descriptor': 'version',
172      'descriptor-lifetime': 'lifetime',
173      'descriptor-signing-key-cert': 'signing_cert',
174      'revision-counter': 'revision_counter',
175      'superencrypted': 'superencrypted',
176      'signature': 'signature',
177    }
178
179    for line in stem.descriptor.hidden_service.REQUIRED_V3_FIELDS:
180      desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
181      expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)
182
183  @test.require.ed25519_support
184  def test_invalid_version(self):
185    """
186    Checks that our version field expects a numeric value.
187    """
188
189    test_values = (
190      '',
191      '-10',
192      'hello',
193    )
194
195    for test_value in test_values:
196      expect_invalid_attr(self, {'hs-descriptor': test_value}, 'version')
197
198  @test.require.ed25519_support
199  def test_invalid_lifetime(self):
200    """
201    Checks that our lifetime field expects a numeric value.
202    """
203
204    test_values = (
205      '',
206      '-10',
207      'hello',
208    )
209
210    for test_value in test_values:
211      expect_invalid_attr(self, {'descriptor-lifetime': test_value}, 'lifetime')
212
213  @test.require.ed25519_support
214  def test_invalid_revision_counter(self):
215    """
216    Checks that our revision counter field expects a numeric value.
217    """
218
219    test_values = (
220      '',
221      '-10',
222      'hello',
223    )
224
225    for test_value in test_values:
226      expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')
227
228  @require_sha3
229  def test_address_from_identity_key(self):
230    self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_identity_key(HS_PUBKEY))
231
232  @require_sha3
233  def test_identity_key_from_address(self):
234    self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.identity_key_from_address(HS_ADDRESS))
235    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.identity_key_from_address, 'boom')
236    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.identity_key_from_address, '5' * 56)
237
238  def test_intro_point_parse(self):
239    """
240    Parse a v3 introduction point.
241    """
242
243    intro_point = IntroductionPointV3.parse(INTRO_POINT_STR)
244
245    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
246    self.assertTrue('0Acq8QW8O7O' in intro_point.auth_key_cert.to_base64())
247    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)
248    self.assertTrue('4807i5', intro_point.enc_key_cert.to_base64())
249    self.assertTrue('JAoGBAMO3' in intro_point.legacy_key_raw)
250    self.assertTrue('Ln1ITJ0qP' in intro_point.legacy_key_cert)
251
252  def test_intro_point_encode(self):
253    """
254    Encode an introduction point back into a string.
255    """
256
257    intro_point = IntroductionPointV3.parse(INTRO_POINT_STR)
258    self.assertEqual(INTRO_POINT_STR.rstrip(), intro_point.encode())
259
260  @require_x25519
261  @test.require.ed25519_support
262  def test_intro_point_crypto(self):
263    """
264    Retrieve IntroductionPointV3 cryptographic materials.
265    """
266
267    def base64_key(key):
268      pubkey = stem.util._pubkey_bytes(key)
269      pubkey_b64 = base64.b64encode(pubkey)
270      return stem.util.str_tools._to_unicode(pubkey_b64)
271
272    from cryptography.hazmat.backends.openssl.x25519 import X25519PublicKey
273
274    intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
275
276    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
277    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)
278
279    self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey))
280    self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))
281
282    self.assertEqual(intro_point.onion_key_raw, base64_key(intro_point.onion_key()))
283    self.assertEqual(intro_point.enc_key_raw, base64_key(intro_point.enc_key()))
284
285    self.assertEqual(None, intro_point.legacy_key_raw)
286    self.assertEqual(None, intro_point.legacy_key())
287
288  @patch('stem.prereq.is_crypto_available', Mock(return_value = False))
289  def test_intro_point_crypto_without_prereq(self):
290    """
291    Fetch cryptographic materials when the module is unavailable.
292    """
293
294    intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
295    self.assertRaisesWith(ImportError, 'cryptography module unavailable', intro_point.onion_key)
296
297  @test.require.ed25519_support
298  def test_intro_point_creation(self):
299    """
300    Create an introduction point, encode it, then re-parse.
301    """
302
303    intro_point = IntroductionPointV3.create_for_address('1.1.1.1', 9001)
304
305    self.assertEqual(1, len(intro_point.link_specifiers))
306    self.assertEqual(stem.client.datatype.LinkByIPv4, type(intro_point.link_specifiers[0]))
307    self.assertEqual('1.1.1.1', intro_point.link_specifiers[0].address)
308    self.assertEqual(9001, intro_point.link_specifiers[0].port)
309
310    reparsed = IntroductionPointV3.parse(intro_point.encode())
311    self.assertEqual(intro_point, reparsed)
312
313  @test.require.ed25519_support
314  def test_inner_layer_creation(self):
315    """
316    Internal layer creation.
317    """
318
319    # minimal layer
320
321    self.assertEqual(b'create2-formats 2', InnerLayer.content())
322    self.assertEqual([2], InnerLayer.create().formats)
323
324    # specify their only mandatory parameter (formats)
325
326    self.assertEqual(b'create2-formats 1 2 3', InnerLayer.content({'create2-formats': '1 2 3'}))
327    self.assertEqual([1, 2, 3], InnerLayer.create({'create2-formats': '1 2 3'}).formats)
328
329    # include optional parameters
330
331    desc = InnerLayer.create(OrderedDict((
332      ('intro-auth-required', 'ed25519'),
333      ('single-onion-service', ''),
334    )))
335
336    self.assertEqual([2], desc.formats)
337    self.assertEqual(['ed25519'], desc.intro_auth)
338    self.assertEqual(True, desc.is_single_service)
339    self.assertEqual([], desc.introduction_points)
340
341    # include introduction points
342
343    desc = InnerLayer.create(introduction_points = [
344      IntroductionPointV3.create_for_address('1.1.1.1', 9001),
345      IntroductionPointV3.create_for_address('2.2.2.2', 9001),
346      IntroductionPointV3.create_for_address('3.3.3.3', 9001),
347    ])
348
349    self.assertEqual(3, len(desc.introduction_points))
350    self.assertEqual('1.1.1.1', desc.introduction_points[0].link_specifiers[0].address)
351
352    self.assertTrue(InnerLayer.content(introduction_points = [
353      IntroductionPointV3.create_for_address('1.1.1.1', 9001),
354    ]).startswith(b'create2-formats 2\nintroduction-point AQAGAQEBASMp'))
355
356  @test.require.ed25519_support
357  def test_outer_layer_creation(self):
358    """
359    Outer layer creation.
360    """
361
362    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
363
364    # minimal layer
365
366    self.assertTrue(OuterLayer.content().startswith(b'desc-auth-type x25519\ndesc-auth-ephemeral-key '))
367    self.assertEqual('x25519', OuterLayer.create().auth_type)
368
369    # specify the parameters
370
371    desc = OuterLayer.create({
372      'desc-auth-type': 'foo',
373      'desc-auth-ephemeral-key': 'bar',
374      'auth-client': [
375        'JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw',
376        '1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA',
377      ],
378      'encrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
379    })
380
381    self.assertEqual('foo', desc.auth_type)
382    self.assertEqual('bar', desc.ephemeral_key)
383    self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.encrypted)
384
385    self.assertEqual({
386      '1D8VBAh9hdM': AuthorizedClient(id = b'1D8VBAh9hdM', iv = b'6K/uO3sRqBp6URrKC7GB6Q', cookie = b'ElwRj5+6SN9kb8bRhiiQvA'),
387      'JNil86N07AA': AuthorizedClient(id = b'JNil86N07AA', iv = b'epkaL79NtajmgME/egi8oA', cookie = b'qosYH4rXisxda3X7p9b6fw'),
388    }, desc.clients)
389
390    self.assertEqual(EXPECTED_OUTER_LAYER, str(desc))
391
392    # create an inner layer then decrypt it
393
394    revision_counter = 5
395    blinded_key = stem.util._pubkey_bytes(Ed25519PrivateKey.generate())
396    subcredential = HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key)
397
398    outer_layer = OuterLayer.create(
399      inner_layer = InnerLayer.create(
400        introduction_points = [
401          IntroductionPointV3.create_for_address('1.1.1.1', 9001),
402        ]
403      ),
404      revision_counter = revision_counter,
405      subcredential = subcredential,
406      blinded_key = blinded_key,
407    )
408
409    inner_layer = InnerLayer._decrypt(outer_layer, revision_counter, subcredential, blinded_key)
410
411    self.assertEqual(1, len(inner_layer.introduction_points))
412    self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
413
414  @test.require.ed25519_support
415  def test_descriptor_creation(self):
416    """
417    HiddenServiceDescriptorV3 creation.
418    """
419
420    # minimal descriptor
421
422    self.assertTrue(HiddenServiceDescriptorV3.content().startswith(b'hs-descriptor 3\ndescriptor-lifetime 180\n'))
423    self.assertEqual(180, HiddenServiceDescriptorV3.create().lifetime)
424
425    # specify the parameters
426
427    desc = HiddenServiceDescriptorV3.create({
428      'hs-descriptor': '4',
429      'descriptor-lifetime': '123',
430      'descriptor-signing-key-cert': '\n-----BEGIN ED25519 CERT-----\nmalformed block\n-----END ED25519 CERT-----',
431      'revision-counter': '5',
432      'superencrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
433      'signature': 'abcde',
434    }, validate = False)
435
436    self.assertEqual(4, desc.version)
437    self.assertEqual(123, desc.lifetime)
438    self.assertEqual(None, desc.signing_cert)  # malformed cert dropped because validation is disabled
439    self.assertEqual(5, desc.revision_counter)
440    self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.superencrypted)
441    self.assertEqual('abcde', desc.signature)
442
443    # include introduction points
444
445    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
446
447    identity_key = Ed25519PrivateKey.generate()
448    onion_address = HiddenServiceDescriptorV3.address_from_identity_key(identity_key)
449
450    desc = HiddenServiceDescriptorV3.create(
451      identity_key = identity_key,
452      inner_layer = InnerLayer.create(introduction_points = [
453        IntroductionPointV3.create_for_address('1.1.1.1', 9001),
454        IntroductionPointV3.create_for_address('2.2.2.2', 9001),
455        IntroductionPointV3.create_for_address('3.3.3.3', 9001),
456      ]),
457    )
458
459    inner_layer = desc.decrypt(onion_address)
460    self.assertEqual(3, len(inner_layer.introduction_points))
461    self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)
462
463  @test.require.ed25519_support
464  def test_blinding(self):
465    """
466    Create a descriptor with key blinding.
467    """
468
469    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
470
471    expected_blinded_key = b'\xb5\xefEA\xfaI\x1a\xd8*p\xcd\x97\x01\x90O\xa8p\xd3\x10\x16\x8e-\x19\xab+\x92\xbc\xf6\xe7\x92\xc2k'
472
473    desc = HiddenServiceDescriptorV3.create(
474      identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32),
475      blinding_nonce = b'a' * 32,
476    )
477
478    self.assertEqual(64, len(desc.signing_cert.signature))
479    self.assertEqual(expected_blinded_key, desc.signing_cert.signing_key())
480