"""Unit tests for ``docker.auth``.

Covers repository-name resolution, auth-config lookup, loading of config
files from disk and credential-store handling.
"""
import base64
import json
import os
import os.path
import random
import shutil
import tempfile
import unittest
from unittest import mock

import pytest
from docker import auth, credentials, errors


class RegressionTest(unittest.TestCase):
    """Regression checks named after the issue number they guard."""

    def test_803_urlsafe_encode(self):
        auth_data = {
            'username': 'root',
            'password': 'GR?XGR?XGR?XGR?X'
        }
        encoded = auth.encode_header(auth_data)
        # The header must contain '_' and never '/'.
        assert b'/' not in encoded
        assert b'_' in encoded


class ResolveRepositoryNameTest(unittest.TestCase):
    """Checks the ``(registry, repository)`` pair returned by
    ``auth.resolve_repository_name`` for various image names."""

    def test_resolve_repository_name_hub_library_image(self):
        assert auth.resolve_repository_name('image') == (
            'docker.io', 'image'
        )

    def test_resolve_repository_name_dotted_hub_library_image(self):
        assert auth.resolve_repository_name('image.valid') == (
            'docker.io', 'image.valid'
        )

    def test_resolve_repository_name_hub_image(self):
        assert auth.resolve_repository_name('username/image') == (
            'docker.io', 'username/image'
        )

    def test_explicit_hub_index_library_image(self):
        assert auth.resolve_repository_name('docker.io/image') == (
            'docker.io', 'image'
        )

    def test_explicit_legacy_hub_index_library_image(self):
        assert auth.resolve_repository_name('index.docker.io/image') == (
            'docker.io', 'image'
        )

    def test_resolve_repository_name_private_registry(self):
        assert auth.resolve_repository_name('my.registry.net/image') == (
            'my.registry.net', 'image'
        )

    def test_resolve_repository_name_private_registry_with_port(self):
        assert auth.resolve_repository_name('my.registry.net:5000/image') == (
            'my.registry.net:5000', 'image'
        )

    def test_resolve_repository_name_private_registry_with_username(self):
        assert auth.resolve_repository_name(
            'my.registry.net/username/image'
        ) == ('my.registry.net', 'username/image')

    def test_resolve_repository_name_no_dots_but_port(self):
        assert auth.resolve_repository_name('hostname:5000/image') == (
            'hostname:5000', 'image'
        )

    def test_resolve_repository_name_no_dots_but_port_and_username(self):
        assert auth.resolve_repository_name(
            'hostname:5000/username/image'
        ) == ('hostname:5000', 'username/image')

    def test_resolve_repository_name_localhost(self):
        assert auth.resolve_repository_name('localhost/image') == (
            'localhost', 'image'
        )

    def test_resolve_repository_name_localhost_with_username(self):
        assert auth.resolve_repository_name('localhost/username/image') == (
            'localhost', 'username/image'
        )

    def test_invalid_index_name(self):
        with pytest.raises(errors.InvalidRepository):
            auth.resolve_repository_name('-gecko.com/image')


def encode_auth(auth_info):
    """Return the base64 encoding of ``b'<username>:<password>'``.

    Args:
        auth_info: mapping that may contain ``'username'`` and
            ``'password'`` string entries; a missing entry is treated as
            the empty string.

    Returns:
        The base64-encoded credentials as ``bytes``.
    """
    username = auth_info.get('username', '').encode('utf-8')
    password = auth_info.get('password', '').encode('utf-8')
    return base64.b64encode(username + b':' + password)


class ResolveAuthTest(unittest.TestCase):
    """Checks ``auth.resolve_authconfig`` against a config holding three
    registries (hub index, a private registry and a legacy URL)."""

    index_config = {'auth': encode_auth({'username': 'indexuser'})}
    private_config = {'auth': encode_auth({'username': 'privateuser'})}
    legacy_config = {'auth': encode_auth({'username': 'legacyauth'})}

    auth_config = auth.AuthConfig({
        'auths': auth.parse_auth({
            'https://index.docker.io/v1/': index_config,
            'my.registry.net': private_config,
            'http://legacy.registry.url/v1/': legacy_config,
        })
    })

    def test_resolve_authconfig_hostname_only(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'my.registry.net'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_no_protocol(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'my.registry.net/v1/'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_no_path(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'http://my.registry.net'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_trailing_slash(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'http://my.registry.net/'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_wrong_secure_proto(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'https://my.registry.net'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'http://index.docker.io'
        )['username'] == 'indexuser'

    def test_resolve_authconfig_path_wrong_proto(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'https://my.registry.net/v1/'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_default_registry(self):
        assert auth.resolve_authconfig(
            self.auth_config
        )['username'] == 'indexuser'

    def test_resolve_authconfig_default_explicit_none(self):
        assert auth.resolve_authconfig(
            self.auth_config, None
        )['username'] == 'indexuser'

    def test_resolve_authconfig_fully_explicit(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'http://my.registry.net/v1/'
        )['username'] == 'privateuser'

    def test_resolve_authconfig_legacy_config(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'legacy.registry.url'
        )['username'] == 'legacyauth'

    def test_resolve_authconfig_no_match(self):
        assert auth.resolve_authconfig(
            self.auth_config, 'does.not.exist'
        ) is None

    def test_resolve_registry_and_auth_library_image(self):
        image = 'image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        )['username'] == 'indexuser'

    def test_resolve_registry_and_auth_hub_image(self):
        image = 'username/image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        )['username'] == 'indexuser'

    def test_resolve_registry_and_auth_explicit_hub(self):
        image = 'docker.io/username/image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        )['username'] == 'indexuser'

    def test_resolve_registry_and_auth_explicit_legacy_hub(self):
        image = 'index.docker.io/username/image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        )['username'] == 'indexuser'

    def test_resolve_registry_and_auth_private_registry(self):
        image = 'my.registry.net/image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        )['username'] == 'privateuser'

    def test_resolve_registry_and_auth_unauthenticated_registry(self):
        image = 'other.registry.net/image'
        assert auth.resolve_authconfig(
            self.auth_config, auth.resolve_repository_name(image)[0]
        ) is None

    def test_resolve_auth_with_empty_credstore_and_auth_dict(self):
        auth_config = auth.AuthConfig({
            'auths': auth.parse_auth({
                'https://index.docker.io/v1/': self.index_config,
            }),
            'credsStore': 'blackbox'
        })
        # The credential-store lookup is patched to return nothing, so the
        # result has to come from the 'auths' section.
        with mock.patch(
            'docker.auth.AuthConfig._resolve_authconfig_credstore'
        ) as m:
            m.return_value = None
            assert auth.resolve_authconfig(
                auth_config, None
            )['username'] == 'indexuser'


class LoadConfigTest(unittest.TestCase):
    """Checks ``auth.load_config`` against config files written to a
    temporary directory."""

    def _make_temp_folder(self):
        """Create a temporary directory that is removed when the test ends."""
        folder = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, folder)
        return folder

    @staticmethod
    def _write_json(path, config):
        """Serialize ``config`` as JSON into the file at ``path``."""
        with open(path, 'w') as f:
            json.dump(config, f)

    def test_load_config_no_file(self):
        folder = self._make_temp_folder()
        cfg = auth.load_config(folder)
        assert cfg is not None

    def test_load_legacy_config(self):
        folder = self._make_temp_folder()
        cfg_path = os.path.join(folder, '.dockercfg')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        # Non-JSON "key = value" file format.
        with open(cfg_path, 'w') as f:
            f.write(f'auth = {auth_}\n')
            f.write('email = sakuya@scarlet.net')

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_NAME] is not None
        cfg = cfg.auths[auth.INDEX_NAME]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == 'sakuya@scarlet.net'
        # NOTE(review): other tests in this class check the lowercase
        # 'auth' key; confirm the capitalised key here is intentional.
        assert cfg.get('Auth') is None

    def test_load_json_config(self):
        folder = self._make_temp_folder()
        cfg_path = os.path.join(folder, '.dockercfg')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        email = 'sakuya@scarlet.net'
        self._write_json(
            cfg_path, {auth.INDEX_URL: {'auth': auth_, 'email': email}}
        )

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_URL] is not None
        cfg = cfg.auths[auth.INDEX_URL]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == email
        # NOTE(review): other tests in this class check the lowercase
        # 'auth' key; confirm the capitalised key here is intentional.
        assert cfg.get('Auth') is None

    def test_load_modern_json_config(self):
        folder = self._make_temp_folder()
        cfg_path = os.path.join(folder, 'config.json')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        email = 'sakuya@scarlet.net'
        self._write_json(cfg_path, {
            'auths': {
                auth.INDEX_URL: {
                    'auth': auth_, 'email': email
                }
            }
        })

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_URL] is not None
        cfg = cfg.auths[auth.INDEX_URL]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == email

    def test_load_config_with_random_name(self):
        folder = self._make_temp_folder()

        dockercfg_path = os.path.join(
            folder, f'.{random.randrange(100000)}.dockercfg'
        )
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        config = {
            registry: {
                'auth': auth_,
                'email': 'sakuya@scarlet.net'
            }
        }
        self._write_json(dockercfg_path, config)

        cfg = auth.load_config(dockercfg_path).auths
        assert registry in cfg
        assert cfg[registry] is not None
        cfg = cfg[registry]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == 'sakuya@scarlet.net'
        assert cfg.get('auth') is None

    def test_load_config_custom_config_env(self):
        folder = self._make_temp_folder()

        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        config = {
            registry: {
                'auth': auth_,
                'email': 'sakuya@scarlet.net'
            }
        }
        self._write_json(dockercfg_path, config)

        # No explicit path is given; only DOCKER_CONFIG points at the folder.
        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None).auths
            assert registry in cfg
            assert cfg[registry] is not None
            cfg = cfg[registry]
            assert cfg['username'] == 'sakuya'
            assert cfg['password'] == 'izayoi'
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_custom_config_env_with_auths(self):
        folder = self._make_temp_folder()

        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        config = {
            'auths': {
                registry: {
                    'auth': auth_,
                    'email': 'sakuya@scarlet.net'
                }
            }
        }
        self._write_json(dockercfg_path, config)

        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None)
            assert registry in cfg.auths
            cfg = cfg.auths[registry]
            assert cfg['username'] == 'sakuya'
            assert cfg['password'] == 'izayoi'
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_custom_config_env_utf8(self):
        folder = self._make_temp_folder()

        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        # Credentials containing a two-byte UTF-8 sequence.
        auth_ = base64.b64encode(
            b'sakuya\xc3\xa6:izayoi\xc3\xa6').decode('ascii')
        config = {
            'auths': {
                registry: {
                    'auth': auth_,
                    'email': 'sakuya@scarlet.net'
                }
            }
        }
        self._write_json(dockercfg_path, config)

        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None)
            assert registry in cfg.auths
            cfg = cfg.auths[registry]
            assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8')
            assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8')
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_unknown_keys(self):
        folder = self._make_temp_folder()
        dockercfg_path = os.path.join(folder, 'config.json')
        config = {
            'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
        }
        self._write_json(dockercfg_path, config)

        cfg = auth.load_config(dockercfg_path)
        assert dict(cfg) == {'auths': {}}

    def test_load_config_invalid_auth_dict(self):
        folder = self._make_temp_folder()
        dockercfg_path = os.path.join(folder, 'config.json')
        config = {
            'auths': {
                'scarlet.net': {'sakuya': 'izayoi'}
            }
        }
        self._write_json(dockercfg_path, config)

        cfg = auth.load_config(dockercfg_path)
        assert dict(cfg) == {'auths': {'scarlet.net': {}}}

    def test_load_config_identity_token(self):
        folder = self._make_temp_folder()
        registry = 'scarlet.net'
        token = '1ce1cebb-503e-7043-11aa-7feb8bd4a1ce'
        dockercfg_path = os.path.join(folder, 'config.json')
        auth_entry = encode_auth({'username': 'sakuya'}).decode('ascii')
        config = {
            'auths': {
                registry: {
                    'auth': auth_entry,
                    'identitytoken': token
                }
            }
        }
        self._write_json(dockercfg_path, config)

        cfg = auth.load_config(dockercfg_path)
        assert registry in cfg.auths
        cfg = cfg.auths[registry]
        assert 'IdentityToken' in cfg
        assert cfg['IdentityToken'] == token


def _credential(server, username, password):
    """Build one credential entry in the shape the CredstoreTest assertions
    compare against ``get_all_credentials()``."""
    return {
        'Username': username,
        'Password': password,
        'ServerAddress': server,
    }


def _default_store_credentials(extra=None):
    """Return the entries expected for the store filled in
    ``CredstoreTest.setUp``.

    Each stored server is expected twice: under its full URL and under its
    bare hostname, both carrying the full URL as ``ServerAddress``.

    Args:
        extra: optional mapping of additional or overriding entries merged
            on top of the defaults.

    Returns:
        A new dict on every call, so callers may mutate it freely.
    """
    expected = {
        'https://gensokyo.jp/v2': _credential(
            'https://gensokyo.jp/v2', 'sakuya', 'izayoi'),
        'gensokyo.jp': _credential(
            'https://gensokyo.jp/v2', 'sakuya', 'izayoi'),
        'https://default.com/v2': _credential(
            'https://default.com/v2', 'user', 'hunter2'),
        'default.com': _credential(
            'https://default.com/v2', 'user', 'hunter2'),
    }
    if extra:
        expected.update(extra)
    return expected


class CredstoreTest(unittest.TestCase):
    """Checks credential-store selection and ``get_all_credentials`` using
    an in-memory store in place of a real credential helper."""

    def setUp(self):
        self.authconfig = auth.AuthConfig({'credsStore': 'default'})
        self.default_store = InMemoryStore('default')
        # Register the fake under the name configured as 'credsStore'.
        self.authconfig._stores['default'] = self.default_store
        self.default_store.store(
            'https://gensokyo.jp/v2', 'sakuya', 'izayoi',
        )
        self.default_store.store(
            'https://default.com/v2', 'user', 'hunter2',
        )

    def test_get_credential_store(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
            'credsStore': 'blackbox',
        })

        assert auth_config.get_credential_store('registry1.io') == 'truesecret'
        assert auth_config.get_credential_store('registry2.io') == 'powerlock'
        assert auth_config.get_credential_store('registry3.io') == 'blackbox'

    def test_get_credential_store_no_default(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
        })
        assert auth_config.get_credential_store('registry2.io') == 'powerlock'
        assert auth_config.get_credential_store('registry3.io') is None

    def test_get_credential_store_default_index(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'https://index.docker.io/v1/': 'powerlock'
            },
            'credsStore': 'truesecret'
        })

        assert auth_config.get_credential_store(None) == 'powerlock'
        assert auth_config.get_credential_store('docker.io') == 'powerlock'
        assert auth_config.get_credential_store('images.io') == 'truesecret'

    def test_get_credential_store_with_plain_dict(self):
        auth_config = {
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
            'credsStore': 'blackbox',
        }

        assert auth.get_credential_store(
            auth_config, 'registry1.io'
        ) == 'truesecret'
        assert auth.get_credential_store(
            auth_config, 'registry2.io'
        ) == 'powerlock'
        assert auth.get_credential_store(
            auth_config, 'registry3.io'
        ) == 'blackbox'

    def test_get_all_credentials_credstore_only(self):
        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials()
        )

    def test_get_all_credentials_with_empty_credhelper(self):
        self.authconfig['credHelpers'] = {
            'registry1.io': 'truesecret',
        }
        self.authconfig._stores['truesecret'] = InMemoryStore()
        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials({'registry1.io': None})
        )

    def test_get_all_credentials_with_credhelpers_only(self):
        del self.authconfig['credsStore']
        assert self.authconfig.get_all_credentials() == {}

        self.authconfig['credHelpers'] = {
            'https://gensokyo.jp/v2': 'default',
            'https://default.com/v2': 'default',
        }

        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials()
        )

    def test_get_all_credentials_with_auths_entries(self):
        self.authconfig.add_auth('registry1.io', {
            'ServerAddress': 'registry1.io',
            'Username': 'reimu',
            'Password': 'hakurei',
        })

        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials({
                'registry1.io': _credential(
                    'registry1.io', 'reimu', 'hakurei'),
            })
        )

    def test_get_all_credentials_with_empty_auths_entry(self):
        self.authconfig.add_auth('default.com', {})

        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials()
        )

    def test_get_all_credentials_credstore_overrides_auth_entry(self):
        self.authconfig.add_auth('default.com', {
            'Username': 'shouldnotsee',
            'Password': 'thisentry',
            'ServerAddress': 'https://default.com/v2',
        })

        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials()
        )

    def test_get_all_credentials_helpers_override_default(self):
        self.authconfig['credHelpers'] = {
            'https://default.com/v2': 'truesecret',
        }
        truesecret = InMemoryStore('truesecret')
        truesecret.store('https://default.com/v2', 'reimu', 'hakurei')
        self.authconfig._stores['truesecret'] = truesecret

        # Both default.com keys are expected to carry the helper's entry.
        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials({
                'https://default.com/v2': _credential(
                    'https://default.com/v2', 'reimu', 'hakurei'),
                'default.com': _credential(
                    'https://default.com/v2', 'reimu', 'hakurei'),
            })
        )

    def test_get_all_credentials_3_sources(self):
        self.authconfig['credHelpers'] = {
            'registry1.io': 'truesecret',
        }
        truesecret = InMemoryStore('truesecret')
        truesecret.store('registry1.io', 'reimu', 'hakurei')
        self.authconfig._stores['truesecret'] = truesecret
        self.authconfig.add_auth('registry2.io', {
            'ServerAddress': 'registry2.io',
            'Username': 'reimu',
            'Password': 'hakurei',
        })

        assert self.authconfig.get_all_credentials() == (
            _default_store_credentials({
                'registry1.io': _credential(
                    'registry1.io', 'reimu', 'hakurei'),
                'registry2.io': _credential(
                    'registry2.io', 'reimu', 'hakurei'),
            })
        )


class InMemoryStore(credentials.Store):
    """Dict-backed replacement for ``credentials.Store`` used by
    ``CredstoreTest``."""

    def __init__(self, *args, **kwargs):
        # Arguments are accepted but ignored.
        # NOTE(review): the base-class initializer is not called, presumably
        # so that no real credential helper program is needed - confirm.
        self.__store = {}

    def get(self, server):
        """Return the stored entry for ``server``.

        Raises:
            credentials.errors.CredentialsNotFound: if nothing was stored
                for ``server``.
        """
        try:
            return self.__store[server]
        except KeyError as err:
            # Chain the lookup failure so the traceback shows the cause.
            raise credentials.errors.CredentialsNotFound() from err

    def store(self, server, username, secret):
        """Save (or overwrite) the entry for ``server``."""
        self.__store[server] = {
            'ServerURL': server,
            'Username': username,
            'Secret': secret,
        }

    def list(self):
        """Return a mapping of each stored server to its username."""
        return {
            server: entry['Username']
            for server, entry in self.__store.items()
        }

    def erase(self, server):
        """Remove the entry for ``server``; ``KeyError`` if it is absent."""
        del self.__store[server]