"""
Unit tests for stem.descriptor.hidden_service for version 3.
"""

import base64
import functools
import unittest

import stem.client.datatype
import stem.descriptor
import stem.descriptor.hidden_service
import stem.prereq
import stem.util
import stem.util.str_tools

import test.require

from stem.descriptor.hidden_service import (
  IntroductionPointV3,
  HiddenServiceDescriptorV3,
  AuthorizedClient,
  OuterLayer,
  InnerLayer,
)

from test.unit.descriptor import (
  get_resource,
  base_expect_invalid_attr,
  base_expect_invalid_attr_for_text,
)

try:
  # added in python 2.7
  from collections import OrderedDict
except ImportError:
  from stem.util.ordereddict import OrderedDict

try:
  # added in python 3.3
  from unittest.mock import patch, Mock
except ImportError:
  from mock import patch, Mock

# decorators that skip tests when optional crypto support is unavailable

require_sha3 = test.require.needs(stem.prereq._is_sha3_available, 'requires sha3')
require_x25519 = test.require.needs(lambda: stem.descriptor.hidden_service.X25519_AVAILABLE, 'requires openssl x25519')

# helpers pre-bound to HiddenServiceDescriptorV3 with a default 'version' of 3

expect_invalid_attr = functools.partial(base_expect_invalid_attr, HiddenServiceDescriptorV3, 'version', 3)
expect_invalid_attr_for_text = functools.partial(base_expect_invalid_attr_for_text, HiddenServiceDescriptorV3, 'version', 3)

HS_ADDRESS = u'sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion'
HS_PUBKEY = b'\x92\xe6\x80\xfaWU.}HL\x9d*>\xdbF\xfb\xc0v\xe5N\xa9\x0bw\xbb\x84\xe3\xe6\xd5e}R\xa1'

EXPECTED_SIGNING_CERT = """\
-----BEGIN ED25519 CERT-----
AQgABl5/AZLmgPpXVS59SEydKj7bRvvAduVOqQt3u4Tj5tVlfVKhAQAgBABUhpfe
/Wd3p/M74DphsGcIMee/npQ9BTzkzCyTyVmDbykek2EciWaOTCVZJVyiKPErngfW
BDwQZ8rhp05oCqhhY3oFHqG9KS7HGzv9g2v1/PrVJMbkfpwu1YK4b3zIZAk=
-----END ED25519 CERT-----\
"""

EXPECTED_OUTER_LAYER = """\
desc-auth-type foo
desc-auth-ephemeral-key bar
auth-client JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw
auth-client 1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA
encrypted
-----BEGIN MESSAGE-----
malformed block
-----END MESSAGE-----\
"""

# fixtures are read once at import time and shared by the tests below

with open(get_resource('hidden_service_v3')) as descriptor_file:
  HS_DESC_STR = descriptor_file.read()

with open(get_resource('hidden_service_v3_outer_layer')) as outer_layer_file:
  OUTER_LAYER_STR = outer_layer_file.read()

with open(get_resource('hidden_service_v3_inner_layer')) as inner_layer_file:
  INNER_LAYER_STR = inner_layer_file.read()

with open(get_resource('hidden_service_v3_intro_point')) as intro_point_file:
  INTRO_POINT_STR = intro_point_file.read()


class TestHiddenServiceDescriptorV3(unittest.TestCase):
  """
  Parsing, decryption, and creation tests for HiddenServiceDescriptorV3 and
  its OuterLayer, InnerLayer, and IntroductionPointV3 components.
  """

  def test_real_descriptor(self):
    """
    Parse a descriptor for a testing hidden service from asn...

      sltib6sxkuxh2scmtuvd5w2g7pahnzkovefxpo4e4ptnkzl5kkq5h2ad.onion
    """

    with open(get_resource('hidden_service_v3'), 'rb') as descriptor_file:
      desc = next(stem.descriptor.parse_file(descriptor_file, 'hidden-service-descriptor-3 1.0', validate = True))

    self.assertEqual(3, desc.version)
    self.assertEqual(180, desc.lifetime)
    self.assertEqual(EXPECTED_SIGNING_CERT, str(desc.signing_cert))
    self.assertEqual(42, desc.revision_counter)
    self.assertTrue('eaH8VdaTKS' in desc.superencrypted)
    self.assertEqual('aglChCQF+lbzKgyxJJTpYGVShV/GMDRJ4+cRGCp+a2y/yX/tLSh7hzqI7rVZrUoGj74Xr1CLMYO3fXYCS+DPDQ', desc.signature)

  @require_sha3
  @test.require.ed25519_support
  def test_decryption(self):
    """
    Decrypt our descriptor and validate its content.
    """

    desc = HiddenServiceDescriptorV3.from_str(HS_DESC_STR)
    inner_layer = desc.decrypt(HS_ADDRESS)

    self.assertEqual(INNER_LAYER_STR, str(inner_layer))
    self.assertEqual(OUTER_LAYER_STR.rstrip('\x00'), str(inner_layer.outer))

  def test_outer_layer(self):
    """
    Parse the outer layer of our test descriptor.
    """

    desc = OuterLayer(OUTER_LAYER_STR)

    self.assertEqual('x25519', desc.auth_type)
    self.assertEqual('WjZCU9sV1oxkxaPcd7/YozeZgq0lEs6DhWyrdYRNJR4=', desc.ephemeral_key)
    self.assertTrue('BsRYMH/No+LgetIFv' in desc.encrypted)

    client = desc.clients['D0Bz0OlEMCg']

    self.assertEqual(16, len(desc.clients))
    self.assertEqual('D0Bz0OlEMCg', client.id)
    self.assertEqual('or3nS3ScSPYfLJuP9osGiQ', client.iv)
    self.assertEqual('B40RdIWhw7kdA7lt3KJPvQ', client.cookie)

  def test_inner_layer(self):
    """
    Parse the inner layer of our test descriptor.
    """

    desc = InnerLayer(INNER_LAYER_STR)

    self.assertEqual([2], desc.formats)
    self.assertEqual(['ed25519'], desc.intro_auth)
    self.assertEqual(True, desc.is_single_service)
    self.assertEqual(4, len(desc.introduction_points))

    intro_point = desc.introduction_points[0]

    self.assertEqual(2, len(intro_point.link_specifiers))

    link_specifier = intro_point.link_specifiers[0]
    self.assertEqual(stem.client.datatype.LinkByFingerprint, type(link_specifier))
    self.assertEqual('CCCCCCCCCCCCCCCCCCCC', link_specifier.fingerprint)

    link_specifier = intro_point.link_specifiers[1]
    self.assertEqual(stem.client.datatype.LinkByIPv4, type(link_specifier))
    self.assertEqual('1.2.3.4', link_specifier.address)
    self.assertEqual(9001, link_specifier.port)

    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
    self.assertTrue('ID2l9EFNrp' in intro_point.auth_key_cert.to_base64())
    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)

    # Substring check on the encoded cert. Previously the cert was passed as
    # assertTrue's 'msg' argument, so the assertion could never fail.

    self.assertTrue('ZvjPt5IfeQ' in intro_point.enc_key_cert.to_base64())
    self.assertEqual(None, intro_point.legacy_key_raw)
    self.assertEqual(None, intro_point.legacy_key_cert)

  @test.require.ed25519_support
  def test_required_fields(self):
    """
    Check that we require the mandatory fields.
    """

    line_to_attr = {
      'hs-descriptor': 'version',
      'descriptor-lifetime': 'lifetime',
      'descriptor-signing-key-cert': 'signing_cert',
      'revision-counter': 'revision_counter',
      'superencrypted': 'superencrypted',
      'signature': 'signature',
    }

    for line in stem.descriptor.hidden_service.REQUIRED_V3_FIELDS:
      desc_text = HiddenServiceDescriptorV3.content(exclude = (line,))
      expect_invalid_attr_for_text(self, desc_text, line_to_attr[line], None)

  @test.require.ed25519_support
  def test_invalid_version(self):
    """
    Checks that our version field expects a numeric value.
    """

    test_values = (
      '',
      '-10',
      'hello',
    )

    for test_value in test_values:
      expect_invalid_attr(self, {'hs-descriptor': test_value}, 'version')

  @test.require.ed25519_support
  def test_invalid_lifetime(self):
    """
    Checks that our lifetime field expects a numeric value.
    """

    test_values = (
      '',
      '-10',
      'hello',
    )

    for test_value in test_values:
      expect_invalid_attr(self, {'descriptor-lifetime': test_value}, 'lifetime')

  @test.require.ed25519_support
  def test_invalid_revision_counter(self):
    """
    Checks that our revision counter field expects a numeric value.
    """

    test_values = (
      '',
      '-10',
      'hello',
    )

    for test_value in test_values:
      expect_invalid_attr(self, {'revision-counter': test_value}, 'revision_counter')

  @require_sha3
  def test_address_from_identity_key(self):
    """
    Derive the onion address from our identity public key.
    """

    self.assertEqual(HS_ADDRESS, HiddenServiceDescriptorV3.address_from_identity_key(HS_PUBKEY))

  @require_sha3
  def test_identity_key_from_address(self):
    """
    Recover the identity public key from our onion address, and check the
    errors raised for a malformed address and for a bad checksum.
    """

    self.assertEqual(HS_PUBKEY, HiddenServiceDescriptorV3.identity_key_from_address(HS_ADDRESS))
    self.assertRaisesWith(ValueError, "'boom.onion' isn't a valid hidden service v3 address", HiddenServiceDescriptorV3.identity_key_from_address, 'boom')
    self.assertRaisesWith(ValueError, 'Bad checksum (expected def7 but was 842e)', HiddenServiceDescriptorV3.identity_key_from_address, '5' * 56)

  def test_intro_point_parse(self):
    """
    Parse a v3 introduction point.
    """

    intro_point = IntroductionPointV3.parse(INTRO_POINT_STR)

    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
    self.assertTrue('0Acq8QW8O7O' in intro_point.auth_key_cert.to_base64())
    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)

    # Substring check on the encoded cert. Previously the encoded cert was
    # passed as assertTrue's 'msg' argument, so the assertion could never fail.

    self.assertTrue('4807i5' in intro_point.enc_key_cert.to_base64())
    self.assertTrue('JAoGBAMO3' in intro_point.legacy_key_raw)
    self.assertTrue('Ln1ITJ0qP' in intro_point.legacy_key_cert)

  def test_intro_point_encode(self):
    """
    Encode an introduction point back into a string.
    """

    intro_point = IntroductionPointV3.parse(INTRO_POINT_STR)
    self.assertEqual(INTRO_POINT_STR.rstrip(), intro_point.encode())

  @require_x25519
  @test.require.ed25519_support
  def test_intro_point_crypto(self):
    """
    Retrieve IntroductionPointV3 cryptographic materials.
    """

    def base64_key(key):
      # base64 encode the key's public bytes, as a unicode string so it can be
      # compared with the *_raw attributes

      pubkey = stem.util._pubkey_bytes(key)
      pubkey_b64 = base64.b64encode(pubkey)
      return stem.util.str_tools._to_unicode(pubkey_b64)

    from cryptography.hazmat.backends.openssl.x25519 import X25519PublicKey

    intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]

    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.onion_key_raw)
    self.assertEqual('AAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAAA=', intro_point.enc_key_raw)

    self.assertTrue(isinstance(intro_point.onion_key(), X25519PublicKey))
    self.assertTrue(isinstance(intro_point.enc_key(), X25519PublicKey))

    self.assertEqual(intro_point.onion_key_raw, base64_key(intro_point.onion_key()))
    self.assertEqual(intro_point.enc_key_raw, base64_key(intro_point.enc_key()))

    self.assertEqual(None, intro_point.legacy_key_raw)
    self.assertEqual(None, intro_point.legacy_key())

  @patch('stem.prereq.is_crypto_available', Mock(return_value = False))
  def test_intro_point_crypto_without_prereq(self):
    """
    Fetch cryptographic materials when the module is unavailable.
    """

    intro_point = InnerLayer(INNER_LAYER_STR).introduction_points[0]
    self.assertRaisesWith(ImportError, 'cryptography module unavailable', intro_point.onion_key)

  @test.require.ed25519_support
  def test_intro_point_creation(self):
    """
    Create an introduction point, encode it, then re-parse.
    """

    intro_point = IntroductionPointV3.create_for_address('1.1.1.1', 9001)

    self.assertEqual(1, len(intro_point.link_specifiers))
    self.assertEqual(stem.client.datatype.LinkByIPv4, type(intro_point.link_specifiers[0]))
    self.assertEqual('1.1.1.1', intro_point.link_specifiers[0].address)
    self.assertEqual(9001, intro_point.link_specifiers[0].port)

    reparsed = IntroductionPointV3.parse(intro_point.encode())
    self.assertEqual(intro_point, reparsed)

  @test.require.ed25519_support
  def test_inner_layer_creation(self):
    """
    Internal layer creation.
    """

    # minimal layer

    self.assertEqual(b'create2-formats 2', InnerLayer.content())
    self.assertEqual([2], InnerLayer.create().formats)

    # specify their only mandatory parameter (formats)

    self.assertEqual(b'create2-formats 1 2 3', InnerLayer.content({'create2-formats': '1 2 3'}))
    self.assertEqual([1, 2, 3], InnerLayer.create({'create2-formats': '1 2 3'}).formats)

    # include optional parameters

    desc = InnerLayer.create(OrderedDict((
      ('intro-auth-required', 'ed25519'),
      ('single-onion-service', ''),
    )))

    self.assertEqual([2], desc.formats)
    self.assertEqual(['ed25519'], desc.intro_auth)
    self.assertEqual(True, desc.is_single_service)
    self.assertEqual([], desc.introduction_points)

    # include introduction points

    desc = InnerLayer.create(introduction_points = [
      IntroductionPointV3.create_for_address('1.1.1.1', 9001),
      IntroductionPointV3.create_for_address('2.2.2.2', 9001),
      IntroductionPointV3.create_for_address('3.3.3.3', 9001),
    ])

    self.assertEqual(3, len(desc.introduction_points))
    self.assertEqual('1.1.1.1', desc.introduction_points[0].link_specifiers[0].address)

    self.assertTrue(InnerLayer.content(introduction_points = [
      IntroductionPointV3.create_for_address('1.1.1.1', 9001),
    ]).startswith(b'create2-formats 2\nintroduction-point AQAGAQEBASMp'))

  @test.require.ed25519_support
  def test_outer_layer_creation(self):
    """
    Outer layer creation.
    """

    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    # minimal layer

    self.assertTrue(OuterLayer.content().startswith(b'desc-auth-type x25519\ndesc-auth-ephemeral-key '))
    self.assertEqual('x25519', OuterLayer.create().auth_type)

    # specify the parameters

    desc = OuterLayer.create({
      'desc-auth-type': 'foo',
      'desc-auth-ephemeral-key': 'bar',
      'auth-client': [
        'JNil86N07AA epkaL79NtajmgME/egi8oA qosYH4rXisxda3X7p9b6fw',
        '1D8VBAh9hdM 6K/uO3sRqBp6URrKC7GB6Q ElwRj5+6SN9kb8bRhiiQvA',
      ],
      'encrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
    })

    self.assertEqual('foo', desc.auth_type)
    self.assertEqual('bar', desc.ephemeral_key)
    self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.encrypted)

    self.assertEqual({
      '1D8VBAh9hdM': AuthorizedClient(id = b'1D8VBAh9hdM', iv = b'6K/uO3sRqBp6URrKC7GB6Q', cookie = b'ElwRj5+6SN9kb8bRhiiQvA'),
      'JNil86N07AA': AuthorizedClient(id = b'JNil86N07AA', iv = b'epkaL79NtajmgME/egi8oA', cookie = b'qosYH4rXisxda3X7p9b6fw'),
    }, desc.clients)

    self.assertEqual(EXPECTED_OUTER_LAYER, str(desc))

    # create an inner layer then decrypt it

    revision_counter = 5
    blinded_key = stem.util._pubkey_bytes(Ed25519PrivateKey.generate())
    subcredential = HiddenServiceDescriptorV3._subcredential(Ed25519PrivateKey.generate(), blinded_key)

    outer_layer = OuterLayer.create(
      inner_layer = InnerLayer.create(
        introduction_points = [
          IntroductionPointV3.create_for_address('1.1.1.1', 9001),
        ]
      ),
      revision_counter = revision_counter,
      subcredential = subcredential,
      blinded_key = blinded_key,
    )

    inner_layer = InnerLayer._decrypt(outer_layer, revision_counter, subcredential, blinded_key)

    self.assertEqual(1, len(inner_layer.introduction_points))
    self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)

  @test.require.ed25519_support
  def test_descriptor_creation(self):
    """
    HiddenServiceDescriptorV3 creation.
    """

    # minimal descriptor

    self.assertTrue(HiddenServiceDescriptorV3.content().startswith(b'hs-descriptor 3\ndescriptor-lifetime 180\n'))
    self.assertEqual(180, HiddenServiceDescriptorV3.create().lifetime)

    # specify the parameters

    desc = HiddenServiceDescriptorV3.create({
      'hs-descriptor': '4',
      'descriptor-lifetime': '123',
      'descriptor-signing-key-cert': '\n-----BEGIN ED25519 CERT-----\nmalformed block\n-----END ED25519 CERT-----',
      'revision-counter': '5',
      'superencrypted': '\n-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----',
      'signature': 'abcde',
    }, validate = False)

    self.assertEqual(4, desc.version)
    self.assertEqual(123, desc.lifetime)
    self.assertEqual(None, desc.signing_cert)  # malformed cert dropped because validation is disabled
    self.assertEqual(5, desc.revision_counter)
    self.assertEqual('-----BEGIN MESSAGE-----\nmalformed block\n-----END MESSAGE-----', desc.superencrypted)
    self.assertEqual('abcde', desc.signature)

    # include introduction points

    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    identity_key = Ed25519PrivateKey.generate()
    onion_address = HiddenServiceDescriptorV3.address_from_identity_key(identity_key)

    desc = HiddenServiceDescriptorV3.create(
      identity_key = identity_key,
      inner_layer = InnerLayer.create(introduction_points = [
        IntroductionPointV3.create_for_address('1.1.1.1', 9001),
        IntroductionPointV3.create_for_address('2.2.2.2', 9001),
        IntroductionPointV3.create_for_address('3.3.3.3', 9001),
      ]),
    )

    inner_layer = desc.decrypt(onion_address)
    self.assertEqual(3, len(inner_layer.introduction_points))
    self.assertEqual('1.1.1.1', inner_layer.introduction_points[0].link_specifiers[0].address)

  @test.require.ed25519_support
  def test_blinding(self):
    """
    Create a descriptor with key blinding.
    """

    from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey

    expected_blinded_key = b'\xb5\xefEA\xfaI\x1a\xd8*p\xcd\x97\x01\x90O\xa8p\xd3\x10\x16\x8e-\x19\xab+\x92\xbc\xf6\xe7\x92\xc2k'

    # fixed identity key and nonce so the blinded key is deterministic

    desc = HiddenServiceDescriptorV3.create(
      identity_key = Ed25519PrivateKey.from_private_bytes(b'a' * 32),
      blinding_nonce = b'a' * 32,
    )

    self.assertEqual(64, len(desc.signing_cert.signature))
    self.assertEqual(expected_blinded_key, desc.signing_cert.signing_key())