1import json 2import os 3import subprocess 4 5import six 6 7from . import constants 8from . import errors 9from .utils import create_environment_dict 10from .utils import find_executable 11 12 13class Store(object): 14 def __init__(self, program, environment=None): 15 """ Create a store object that acts as an interface to 16 perform the basic operations for storing, retrieving 17 and erasing credentials using `program`. 18 """ 19 self.program = constants.PROGRAM_PREFIX + program 20 self.exe = find_executable(self.program) 21 self.environment = environment 22 if self.exe is None: 23 raise errors.InitializationError( 24 '{0} not installed or not available in PATH'.format( 25 self.program 26 ) 27 ) 28 29 def get(self, server): 30 """ Retrieve credentials for `server`. If no credentials are found, 31 a `StoreError` will be raised. 32 """ 33 if not isinstance(server, six.binary_type): 34 server = server.encode('utf-8') 35 data = self._execute('get', server) 36 result = json.loads(data.decode('utf-8')) 37 38 # docker-credential-pass will return an object for inexistent servers 39 # whereas other helpers will exit with returncode != 0. For 40 # consistency, if no significant data is returned, 41 # raise CredentialsNotFound 42 if result['Username'] == '' and result['Secret'] == '': 43 raise errors.CredentialsNotFound( 44 'No matching credentials in {}'.format(self.program) 45 ) 46 47 return result 48 49 def store(self, server, username, secret): 50 """ Store credentials for `server`. Raises a `StoreError` if an error 51 occurs. 52 """ 53 data_input = json.dumps({ 54 'ServerURL': server, 55 'Username': username, 56 'Secret': secret 57 }).encode('utf-8') 58 return self._execute('store', data_input) 59 60 def erase(self, server): 61 """ Erase credentials for `server`. Raises a `StoreError` if an error 62 occurs. 63 """ 64 if not isinstance(server, six.binary_type): 65 server = server.encode('utf-8') 66 self._execute('erase', server) 67 68 def list(self): 69 """ List stored credentials. Requires v0.4.0+ of the helper. 70 """ 71 data = self._execute('list', None) 72 return json.loads(data.decode('utf-8')) 73 74 def _execute(self, subcmd, data_input): 75 output = None 76 env = create_environment_dict(self.environment) 77 try: 78 if six.PY3: 79 output = subprocess.check_output( 80 [self.exe, subcmd], input=data_input, env=env, 81 ) 82 else: 83 process = subprocess.Popen( 84 [self.exe, subcmd], stdin=subprocess.PIPE, 85 stdout=subprocess.PIPE, env=env, 86 ) 87 output, err = process.communicate(data_input) 88 if process.returncode != 0: 89 raise subprocess.CalledProcessError( 90 returncode=process.returncode, cmd='', output=output 91 ) 92 except subprocess.CalledProcessError as e: 93 raise errors.process_store_error(e, self.program) 94 except OSError as e: 95 if e.errno == os.errno.ENOENT: 96 raise errors.StoreError( 97 '{0} not installed or not available in PATH'.format( 98 self.program 99 ) 100 ) 101 else: 102 raise errors.StoreError( 103 'Unexpected OS error "{0}", errno={1}'.format( 104 e.strerror, e.errno 105 ) 106 ) 107 return output 108