1import base64
2import json
3import os
4import os.path
5import random
6import shutil
7import tempfile
8import unittest
9
10from docker import auth, credentials, errors
11import pytest
12
# ``mock`` is part of the standard library's ``unittest`` package; the former
# try/except attempted the very same import in both branches, so no fallback
# is needed.
from unittest import mock
17
18
class RegressionTest(unittest.TestCase):
    """Regression checks tied to previously reported issues."""

    def test_803_urlsafe_encode(self):
        # The encoded header must contain '_' and must never contain '/'.
        payload = dict(username='root', password='GR?XGR?XGR?XGR?X')
        header = auth.encode_header(payload)
        assert b'/' not in header
        assert b'_' in header
28
29
class ResolveRepositoryNameTest(unittest.TestCase):
    """Checks how image references split into ``(registry, repository)``."""

    def test_resolve_repository_name_hub_library_image(self):
        resolved = auth.resolve_repository_name('image')
        assert resolved == ('docker.io', 'image')

    def test_resolve_repository_name_dotted_hub_library_image(self):
        resolved = auth.resolve_repository_name('image.valid')
        assert resolved == ('docker.io', 'image.valid')

    def test_resolve_repository_name_hub_image(self):
        resolved = auth.resolve_repository_name('username/image')
        assert resolved == ('docker.io', 'username/image')

    def test_explicit_hub_index_library_image(self):
        resolved = auth.resolve_repository_name('docker.io/image')
        assert resolved == ('docker.io', 'image')

    def test_explicit_legacy_hub_index_library_image(self):
        resolved = auth.resolve_repository_name('index.docker.io/image')
        assert resolved == ('docker.io', 'image')

    def test_resolve_repository_name_private_registry(self):
        resolved = auth.resolve_repository_name('my.registry.net/image')
        assert resolved == ('my.registry.net', 'image')

    def test_resolve_repository_name_private_registry_with_port(self):
        resolved = auth.resolve_repository_name('my.registry.net:5000/image')
        assert resolved == ('my.registry.net:5000', 'image')

    def test_resolve_repository_name_private_registry_with_username(self):
        reference = 'my.registry.net/username/image'
        resolved = auth.resolve_repository_name(reference)
        assert resolved == ('my.registry.net', 'username/image')

    def test_resolve_repository_name_no_dots_but_port(self):
        resolved = auth.resolve_repository_name('hostname:5000/image')
        assert resolved == ('hostname:5000', 'image')

    def test_resolve_repository_name_no_dots_but_port_and_username(self):
        reference = 'hostname:5000/username/image'
        resolved = auth.resolve_repository_name(reference)
        assert resolved == ('hostname:5000', 'username/image')

    def test_resolve_repository_name_localhost(self):
        resolved = auth.resolve_repository_name('localhost/image')
        assert resolved == ('localhost', 'image')

    def test_resolve_repository_name_localhost_with_username(self):
        resolved = auth.resolve_repository_name('localhost/username/image')
        assert resolved == ('localhost', 'username/image')

    def test_invalid_index_name(self):
        # This reference is expected to be rejected with InvalidRepository.
        bad_reference = '-gecko.com/image'
        with pytest.raises(errors.InvalidRepository):
            auth.resolve_repository_name(bad_reference)
94
95
def encode_auth(auth_info):
    """Return the base64 encoding of ``username:password`` as bytes.

    Both fields are looked up in *auth_info* with ``.get`` and fall back to
    an empty string when missing; each is UTF-8 encoded before being joined
    with a colon.
    """
    user = auth_info.get('username', '').encode('utf-8')
    secret = auth_info.get('password', '').encode('utf-8')
    return base64.b64encode(b':'.join((user, secret)))
100
101
class ResolveAuthTest(unittest.TestCase):
    """Checks ``auth.resolve_authconfig`` lookups against a fixed config."""

    # Raw entries; each one carries only an encoded username.
    index_config = {'auth': encode_auth({'username': 'indexuser'})}
    private_config = {'auth': encode_auth({'username': 'privateuser'})}
    legacy_config = {'auth': encode_auth({'username': 'legacyauth'})}

    # Config shared by the tests below, keyed by three registry spellings.
    auth_config = auth.AuthConfig({
        'auths': auth.parse_auth({
            'https://index.docker.io/v1/': index_config,
            'my.registry.net': private_config,
            'http://legacy.registry.url/v1/': legacy_config,
        })
    })

    def _resolve(self, registry):
        """Resolve *registry* against the shared class-level config."""
        return auth.resolve_authconfig(self.auth_config, registry)

    def _resolve_for_image(self, image):
        """Resolve credentials for the registry component of *image*."""
        registry = auth.resolve_repository_name(image)[0]
        return auth.resolve_authconfig(self.auth_config, registry)

    def test_resolve_authconfig_hostname_only(self):
        entry = self._resolve('my.registry.net')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_no_protocol(self):
        entry = self._resolve('my.registry.net/v1/')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_no_path(self):
        entry = self._resolve('http://my.registry.net')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_trailing_slash(self):
        entry = self._resolve('http://my.registry.net/')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_wrong_secure_proto(self):
        entry = self._resolve('https://my.registry.net')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_no_path_wrong_insecure_proto(self):
        entry = self._resolve('http://index.docker.io')
        assert entry['username'] == 'indexuser'

    def test_resolve_authconfig_path_wrong_proto(self):
        entry = self._resolve('https://my.registry.net/v1/')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_default_registry(self):
        # Deliberately called without a registry argument at all.
        entry = auth.resolve_authconfig(self.auth_config)
        assert entry['username'] == 'indexuser'

    def test_resolve_authconfig_default_explicit_none(self):
        entry = self._resolve(None)
        assert entry['username'] == 'indexuser'

    def test_resolve_authconfig_fully_explicit(self):
        entry = self._resolve('http://my.registry.net/v1/')
        assert entry['username'] == 'privateuser'

    def test_resolve_authconfig_legacy_config(self):
        entry = self._resolve('legacy.registry.url')
        assert entry['username'] == 'legacyauth'

    def test_resolve_authconfig_no_match(self):
        assert self._resolve('does.not.exist') is None

    def test_resolve_registry_and_auth_library_image(self):
        entry = self._resolve_for_image('image')
        assert entry['username'] == 'indexuser'

    def test_resolve_registry_and_auth_hub_image(self):
        entry = self._resolve_for_image('username/image')
        assert entry['username'] == 'indexuser'

    def test_resolve_registry_and_auth_explicit_hub(self):
        entry = self._resolve_for_image('docker.io/username/image')
        assert entry['username'] == 'indexuser'

    def test_resolve_registry_and_auth_explicit_legacy_hub(self):
        entry = self._resolve_for_image('index.docker.io/username/image')
        assert entry['username'] == 'indexuser'

    def test_resolve_registry_and_auth_private_registry(self):
        entry = self._resolve_for_image('my.registry.net/image')
        assert entry['username'] == 'privateuser'

    def test_resolve_registry_and_auth_unauthenticated_registry(self):
        assert self._resolve_for_image('other.registry.net/image') is None

    def test_resolve_auth_with_empty_credstore_and_auth_dict(self):
        auth_config = auth.AuthConfig({
            'auths': auth.parse_auth({
                'https://index.docker.io/v1/': self.index_config,
            }),
            'credsStore': 'blackbox'
        })
        target = 'docker.auth.AuthConfig._resolve_authconfig_credstore'
        with mock.patch(target) as credstore_lookup:
            # The store lookup yields nothing, yet the 'auths' entry is
            # still expected to be returned.
            credstore_lookup.return_value = None
            entry = auth.resolve_authconfig(auth_config, None)
            assert entry['username'] == 'indexuser'
225
226
class LoadConfigTest(unittest.TestCase):
    """Exercises ``auth.load_config`` against config files written to disk.

    Every test works inside its own temporary directory, which is removed
    again through ``addCleanup``.
    """

    def _temp_folder(self):
        """Create a temporary directory that is deleted after the test."""
        folder = tempfile.mkdtemp()
        self.addCleanup(shutil.rmtree, folder)
        return folder

    @staticmethod
    def _write_json(path, payload):
        """Serialise *payload* as JSON into the file at *path*."""
        with open(path, 'w') as f:
            json.dump(payload, f)

    def test_load_config_no_file(self):
        # The path handed over is an empty directory, not a config file.
        cfg = auth.load_config(self._temp_folder())
        assert cfg is not None

    def test_load_legacy_config(self):
        cfg_path = os.path.join(self._temp_folder(), '.dockercfg')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        # Legacy layout: plain "key = value" lines instead of JSON.
        with open(cfg_path, 'w') as f:
            f.write(f'auth = {auth_}\n')
            f.write('email = sakuya@scarlet.net')

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_NAME] is not None
        cfg = cfg.auths[auth.INDEX_NAME]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == 'sakuya@scarlet.net'
        # NOTE(review): the key here is capitalised, unlike the 'auth' key
        # written to the file above -- confirm whether 'auth' was intended.
        assert cfg.get('Auth') is None

    def test_load_json_config(self):
        cfg_path = os.path.join(self._temp_folder(), '.dockercfg')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        email = 'sakuya@scarlet.net'
        self._write_json(
            cfg_path, {auth.INDEX_URL: {'auth': auth_, 'email': email}}
        )

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_URL] is not None
        cfg = cfg.auths[auth.INDEX_URL]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == email
        # Check the key that was actually written ('auth'); the previous
        # 'Auth' spelling could never match it.
        assert cfg.get('auth') is None

    def test_load_modern_json_config(self):
        cfg_path = os.path.join(self._temp_folder(), 'config.json')
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        email = 'sakuya@scarlet.net'
        self._write_json(cfg_path, {
            'auths': {
                auth.INDEX_URL: {
                    'auth': auth_, 'email': email
                }
            }
        })

        cfg = auth.load_config(cfg_path)
        assert auth.resolve_authconfig(cfg) is not None
        assert cfg.auths[auth.INDEX_URL] is not None
        cfg = cfg.auths[auth.INDEX_URL]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == email

    def test_load_config_with_random_name(self):
        folder = self._temp_folder()
        # The file name embeds a random number; it is passed explicitly to
        # load_config below.
        dockercfg_path = os.path.join(
            folder, f'.{random.randrange(100000)}.dockercfg'
        )
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        self._write_json(dockercfg_path, {
            registry: {
                'auth': auth_,
                'email': 'sakuya@scarlet.net'
            }
        })

        cfg = auth.load_config(dockercfg_path).auths
        assert registry in cfg
        assert cfg[registry] is not None
        cfg = cfg[registry]
        assert cfg['username'] == 'sakuya'
        assert cfg['password'] == 'izayoi'
        assert cfg['email'] == 'sakuya@scarlet.net'
        assert cfg.get('auth') is None

    def test_load_config_custom_config_env(self):
        folder = self._temp_folder()
        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        self._write_json(dockercfg_path, {
            registry: {
                'auth': auth_,
                'email': 'sakuya@scarlet.net'
            }
        })

        # No path is given; the folder is supplied through DOCKER_CONFIG.
        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None).auths
            assert registry in cfg
            assert cfg[registry] is not None
            cfg = cfg[registry]
            assert cfg['username'] == 'sakuya'
            assert cfg['password'] == 'izayoi'
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_custom_config_env_with_auths(self):
        folder = self._temp_folder()
        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        auth_ = base64.b64encode(b'sakuya:izayoi').decode('ascii')
        self._write_json(dockercfg_path, {
            'auths': {
                registry: {
                    'auth': auth_,
                    'email': 'sakuya@scarlet.net'
                }
            }
        })

        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None)
            assert registry in cfg.auths
            cfg = cfg.auths[registry]
            assert cfg['username'] == 'sakuya'
            assert cfg['password'] == 'izayoi'
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_custom_config_env_utf8(self):
        folder = self._temp_folder()
        dockercfg_path = os.path.join(folder, 'config.json')
        registry = 'https://your.private.registry.io'
        # Username and password each end in a two-byte UTF-8 sequence.
        auth_ = base64.b64encode(
            b'sakuya\xc3\xa6:izayoi\xc3\xa6').decode('ascii')
        self._write_json(dockercfg_path, {
            'auths': {
                registry: {
                    'auth': auth_,
                    'email': 'sakuya@scarlet.net'
                }
            }
        })

        with mock.patch.dict(os.environ, {'DOCKER_CONFIG': folder}):
            cfg = auth.load_config(None)
            assert registry in cfg.auths
            cfg = cfg.auths[registry]
            assert cfg['username'] == b'sakuya\xc3\xa6'.decode('utf8')
            assert cfg['password'] == b'izayoi\xc3\xa6'.decode('utf8')
            assert cfg['email'] == 'sakuya@scarlet.net'
            assert cfg.get('auth') is None

    def test_load_config_unknown_keys(self):
        dockercfg_path = os.path.join(self._temp_folder(), 'config.json')
        self._write_json(dockercfg_path, {
            'detachKeys': 'ctrl-q, ctrl-u, ctrl-i'
        })

        cfg = auth.load_config(dockercfg_path)
        assert dict(cfg) == {'auths': {}}

    def test_load_config_invalid_auth_dict(self):
        dockercfg_path = os.path.join(self._temp_folder(), 'config.json')
        self._write_json(dockercfg_path, {
            'auths': {
                'scarlet.net': {'sakuya': 'izayoi'}
            }
        })

        cfg = auth.load_config(dockercfg_path)
        assert dict(cfg) == {'auths': {'scarlet.net': {}}}

    def test_load_config_identity_token(self):
        folder = self._temp_folder()
        registry = 'scarlet.net'
        token = '1ce1cebb-503e-7043-11aa-7feb8bd4a1ce'
        dockercfg_path = os.path.join(folder, 'config.json')
        auth_entry = encode_auth({'username': 'sakuya'}).decode('ascii')
        self._write_json(dockercfg_path, {
            'auths': {
                registry: {
                    'auth': auth_entry,
                    'identitytoken': token
                }
            }
        })

        cfg = auth.load_config(dockercfg_path)
        assert registry in cfg.auths
        cfg = cfg.auths[registry]
        # Written as 'identitytoken', expected back as 'IdentityToken'.
        assert 'IdentityToken' in cfg
        assert cfg['IdentityToken'] == token
456
457
class CredstoreTest(unittest.TestCase):
    """Tests credential-store selection and ``get_all_credentials``.

    ``setUp`` installs an in-memory store named ``'default'`` holding two
    servers. The expected result for that store is produced by
    ``_default_store_credentials`` so each test spells out only what it
    adds or replaces.
    """

    def setUp(self):
        self.authconfig = auth.AuthConfig({'credsStore': 'default'})
        self.default_store = InMemoryStore('default')
        self.authconfig._stores['default'] = self.default_store
        self.default_store.store(
            'https://gensokyo.jp/v2', 'sakuya', 'izayoi',
        )
        self.default_store.store(
            'https://default.com/v2', 'user', 'hunter2',
        )

    @staticmethod
    def _entry(server, username, password):
        """Build one expected credential record as a new dict."""
        return {
            'Username': username,
            'Password': password,
            'ServerAddress': server,
        }

    def _default_store_credentials(self):
        """Return the entries expected from the store filled in ``setUp``.

        Each stored server is listed twice: once under its full URL and once
        under its bare hostname. A new dict (with new inner dicts) is built
        on every call so tests can modify it freely.
        """
        expected = {}
        for hostname, server, username, password in (
            ('gensokyo.jp', 'https://gensokyo.jp/v2', 'sakuya', 'izayoi'),
            ('default.com', 'https://default.com/v2', 'user', 'hunter2'),
        ):
            expected[server] = self._entry(server, username, password)
            expected[hostname] = self._entry(server, username, password)
        return expected

    def test_get_credential_store(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
            'credsStore': 'blackbox',
        })

        assert auth_config.get_credential_store('registry1.io') == 'truesecret'
        assert auth_config.get_credential_store('registry2.io') == 'powerlock'
        # No helper is listed for this registry, so 'credsStore' is expected.
        assert auth_config.get_credential_store('registry3.io') == 'blackbox'

    def test_get_credential_store_no_default(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
        })
        assert auth_config.get_credential_store('registry2.io') == 'powerlock'
        assert auth_config.get_credential_store('registry3.io') is None

    def test_get_credential_store_default_index(self):
        auth_config = auth.AuthConfig({
            'credHelpers': {
                'https://index.docker.io/v1/': 'powerlock'
            },
            'credsStore': 'truesecret'
        })

        assert auth_config.get_credential_store(None) == 'powerlock'
        assert auth_config.get_credential_store('docker.io') == 'powerlock'
        assert auth_config.get_credential_store('images.io') == 'truesecret'

    def test_get_credential_store_with_plain_dict(self):
        # Same lookups as above, but through the module-level function with
        # a plain dict instead of an AuthConfig.
        auth_config = {
            'credHelpers': {
                'registry1.io': 'truesecret',
                'registry2.io': 'powerlock'
            },
            'credsStore': 'blackbox',
        }

        assert auth.get_credential_store(
            auth_config, 'registry1.io'
        ) == 'truesecret'
        assert auth.get_credential_store(
            auth_config, 'registry2.io'
        ) == 'powerlock'
        assert auth.get_credential_store(
            auth_config, 'registry3.io'
        ) == 'blackbox'

    def test_get_all_credentials_credstore_only(self):
        expected = self._default_store_credentials()
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_with_empty_credhelper(self):
        self.authconfig['credHelpers'] = {
            'registry1.io': 'truesecret',
        }
        self.authconfig._stores['truesecret'] = InMemoryStore()

        expected = self._default_store_credentials()
        # The helper's store holds nothing for this registry.
        expected['registry1.io'] = None
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_with_credhelpers_only(self):
        del self.authconfig['credsStore']
        assert self.authconfig.get_all_credentials() == {}

        self.authconfig['credHelpers'] = {
            'https://gensokyo.jp/v2': 'default',
            'https://default.com/v2': 'default',
        }

        expected = self._default_store_credentials()
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_with_auths_entries(self):
        self.authconfig.add_auth(
            'registry1.io', self._entry('registry1.io', 'reimu', 'hakurei')
        )

        expected = self._default_store_credentials()
        expected['registry1.io'] = self._entry(
            'registry1.io', 'reimu', 'hakurei'
        )
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_with_empty_auths_entry(self):
        self.authconfig.add_auth('default.com', {})

        expected = self._default_store_credentials()
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_credstore_overrides_auth_entry(self):
        self.authconfig.add_auth('default.com', self._entry(
            'https://default.com/v2', 'shouldnotsee', 'thisentry'
        ))

        # The store's values for default.com are expected, not the entry
        # added just above.
        expected = self._default_store_credentials()
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_helpers_override_default(self):
        self.authconfig['credHelpers'] = {
            'https://default.com/v2': 'truesecret',
        }
        truesecret = InMemoryStore('truesecret')
        truesecret.store('https://default.com/v2', 'reimu', 'hakurei')
        self.authconfig._stores['truesecret'] = truesecret

        expected = self._default_store_credentials()
        # Both spellings of default.com now carry the helper's credentials.
        for key in ('https://default.com/v2', 'default.com'):
            expected[key] = self._entry(
                'https://default.com/v2', 'reimu', 'hakurei'
            )
        assert self.authconfig.get_all_credentials() == expected

    def test_get_all_credentials_3_sources(self):
        self.authconfig['credHelpers'] = {
            'registry1.io': 'truesecret',
        }
        truesecret = InMemoryStore('truesecret')
        truesecret.store('registry1.io', 'reimu', 'hakurei')
        self.authconfig._stores['truesecret'] = truesecret
        self.authconfig.add_auth(
            'registry2.io', self._entry('registry2.io', 'reimu', 'hakurei')
        )

        expected = self._default_store_credentials()
        expected['registry1.io'] = self._entry(
            'registry1.io', 'reimu', 'hakurei'
        )
        expected['registry2.io'] = self._entry(
            'registry2.io', 'reimu', 'hakurei'
        )
        assert self.authconfig.get_all_credentials() == expected
775
776
class InMemoryStore(credentials.Store):
    """Dictionary-backed ``credentials.Store`` substitute used by the tests.

    Records are kept in a private dict keyed by server.
    """

    def __init__(self, *args, **kwargs):
        # Arguments are accepted but ignored, and the parent initialiser is
        # not called here.
        self.__store = {}

    def get(self, server):
        """Return the record stored for *server*.

        Raises:
            credentials.errors.CredentialsNotFound: if nothing is stored
                for *server*.
        """
        try:
            return self.__store[server]
        except KeyError as exc:
            # Chain explicitly so the missing key stays visible as the cause.
            raise credentials.errors.CredentialsNotFound() from exc

    def store(self, server, username, secret):
        """Save (or overwrite) the record for *server*."""
        self.__store[server] = {
            'ServerURL': server,
            'Username': username,
            'Secret': secret,
        }

    def list(self):
        """Return a mapping of every stored server to its username."""
        return {
            k: v['Username'] for k, v in self.__store.items()
        }

    def erase(self, server):
        """Delete the record for *server*; ``KeyError`` if it is absent."""
        del self.__store[server]
801