1# Copyright (c), Edoardo Tenani <e.tenani@arduino.cc>, 2018-2020 2# Simplified BSD License (see licenses/simplified_bsd.txt or https://opensource.org/licenses/BSD-2-Clause) 3 4from __future__ import absolute_import, division, print_function 5__metaclass__ = type 6 7 8import abc 9import os 10 11from ansible.module_utils import six 12from ansible.module_utils.common.text.converters import to_text, to_native 13 14# Since this is used both by plugins and modules, we need subprocess in case the `module` parameter is not used 15from subprocess import Popen, PIPE 16 17 18# From https://github.com/mozilla/sops/blob/master/cmd/sops/codes/codes.go 19# Should be manually updated 20SOPS_ERROR_CODES = { 21 1: "ErrorGeneric", 22 2: "CouldNotReadInputFile", 23 3: "CouldNotWriteOutputFile", 24 4: "ErrorDumpingTree", 25 5: "ErrorReadingConfig", 26 6: "ErrorInvalidKMSEncryptionContextFormat", 27 7: "ErrorInvalidSetFormat", 28 8: "ErrorConflictingParameters", 29 21: "ErrorEncryptingMac", 30 23: "ErrorEncryptingTree", 31 24: "ErrorDecryptingMac", 32 25: "ErrorDecryptingTree", 33 49: "CannotChangeKeysFromNonExistentFile", 34 51: "MacMismatch", 35 52: "MacNotFound", 36 61: "ConfigFileNotFound", 37 85: "KeyboardInterrupt", 38 91: "InvalidTreePathFormat", 39 100: "NoFileSpecified", 40 128: "CouldNotRetrieveKey", 41 111: "NoEncryptionKeyFound", 42 200: "FileHasNotBeenModified", 43 201: "NoEditorFound", 44 202: "FailedToCompareVersions", 45 203: "FileAlreadyEncrypted" 46} 47 48 49def _create_single_arg(argument_name): 50 def f(value, arguments, env): 51 arguments.extend([argument_name, to_native(value)]) 52 53 return f 54 55 56def _create_comma_separated(argument_name): 57 def f(value, arguments, env): 58 arguments.extend([argument_name, ','.join([to_native(v) for v in value])]) 59 60 return f 61 62 63def _create_repeated(argument_name): 64 def f(value, arguments, env): 65 for v in value: 66 arguments.extend([argument_name, to_native(v)]) 67 68 return f 69 70 71def _create_boolean(argument_name): 72 def f(value, arguments, env): 73 if value: 74 arguments.append(argument_name) 75 76 return f 77 78 79def _create_env_variable(argument_name): 80 def f(value, arguments, env): 81 env[argument_name] = value 82 83 return f 84 85 86GENERAL_OPTIONS = { 87 'aws_profile': _create_single_arg('--aws-profile'), 88 'aws_access_key_id': _create_env_variable('AWS_ACCESS_KEY_ID'), 89 'aws_secret_access_key': _create_env_variable('AWS_SECRET_ACCESS_KEY'), 90 'aws_session_token': _create_env_variable('AWS_SESSION_TOKEN'), 91 'config_path': _create_single_arg('--config'), 92 'enable_local_keyservice': _create_boolean('--enable-local-keyservice'), 93 'keyservice': _create_repeated('--keyservice'), 94} 95 96 97ENCRYPT_OPTIONS = { 98 'kms': _create_comma_separated('--kms'), 99 'gcp_kms': _create_comma_separated('--gcp-kms'), 100 'azure_kv': _create_comma_separated('--azure-kv'), 101 'hc_vault_transit': _create_comma_separated('--hc-vault-transit'), 102 'pgp': _create_comma_separated('--pgp'), 103 'unencrypted_suffix': _create_single_arg('--unencrypted-suffix'), 104 'encrypted_suffix': _create_single_arg('--encrypted-suffix'), 105 'unencrypted_regex': _create_single_arg('--unencrypted-regex'), 106 'encrypted_regex': _create_single_arg('--encrypted-regex'), 107 'encryption_context': _create_comma_separated('--encryption-context'), 108 'shamir_secret_sharing_threshold': _create_single_arg('--shamir-secret-sharing-threshold'), 109} 110 111 112class SopsError(Exception): 113 ''' Extend Exception class with sops specific informations ''' 114 115 def __init__(self, filename, exit_code, message, decryption=True): 116 if exit_code in SOPS_ERROR_CODES: 117 exception_name = SOPS_ERROR_CODES[exit_code] 118 message = "error with file %s: %s exited with code %d: %s" % ( 119 filename, exception_name, exit_code, to_native(message)) 120 else: 121 message = "could not %s file %s; Unknown sops error code: %s; message: %s" % ( 122 'decrypt' if decryption else 'encrypt', filename, exit_code, to_native(message)) 123 super(SopsError, self).__init__(message) 124 125 126class Sops(): 127 ''' Utility class to perform sops CLI actions ''' 128 129 @staticmethod 130 def _add_options(command, env, get_option_value, options): 131 if get_option_value is None: 132 return 133 for option, f in options.items(): 134 v = get_option_value(option) 135 if v is not None: 136 f(v, command, env) 137 138 @staticmethod 139 def get_sops_binary(get_option_value): 140 cmd = get_option_value('sops_binary') if get_option_value else None 141 if cmd is None: 142 cmd = 'sops' 143 return cmd 144 145 @staticmethod 146 def decrypt(encrypted_file, content=None, 147 display=None, decode_output=True, rstrip=True, input_type=None, output_type=None, get_option_value=None, module=None): 148 # Run sops directly, python module is deprecated 149 command = [Sops.get_sops_binary(get_option_value)] 150 env = os.environ.copy() 151 Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS) 152 if input_type is not None: 153 command.extend(["--input-type", input_type]) 154 if output_type is not None: 155 command.extend(["--output-type", output_type]) 156 if content is not None: 157 encrypted_file = '/dev/stdin' 158 command.extend(["--decrypt", encrypted_file]) 159 160 if module: 161 exit_code, output, err = module.run_command(command, environ_update=env, encoding=None, data=content, binary_data=True) 162 else: 163 process = Popen(command, stdin=None if content is None else PIPE, stdout=PIPE, stderr=PIPE, env=env) 164 (output, err) = process.communicate(input=content) 165 exit_code = process.returncode 166 167 if decode_output: 168 # output is binary, we want UTF-8 string 169 output = to_text(output, errors='surrogate_or_strict') 170 # the process output is the decrypted secret; be cautious 171 172 # sops logs always to stderr, as stdout is used for 173 # file content 174 if err and display: 175 display.vvvv(to_text(err, errors='surrogate_or_strict')) 176 177 if exit_code > 0: 178 raise SopsError(encrypted_file, exit_code, err, decryption=True) 179 180 if rstrip: 181 output = output.rstrip() 182 183 return output 184 185 @staticmethod 186 def encrypt(data, display=None, cwd=None, input_type=None, output_type=None, get_option_value=None, module=None): 187 # Run sops directly, python module is deprecated 188 command = [Sops.get_sops_binary(get_option_value)] 189 env = os.environ.copy() 190 Sops._add_options(command, env, get_option_value, GENERAL_OPTIONS) 191 Sops._add_options(command, env, get_option_value, ENCRYPT_OPTIONS) 192 if input_type is not None: 193 command.extend(["--input-type", input_type]) 194 if output_type is not None: 195 command.extend(["--output-type", output_type]) 196 command.extend(["--encrypt", "/dev/stdin"]) 197 198 if module: 199 exit_code, output, err = module.run_command(command, data=data, binary_data=True, cwd=cwd, environ_update=env, encoding=None) 200 else: 201 process = Popen(command, stdin=PIPE, stdout=PIPE, stderr=PIPE, cwd=cwd, env=env) 202 (output, err) = process.communicate(input=data) 203 exit_code = process.returncode 204 205 # sops logs always to stderr, as stdout is used for 206 # file content 207 if err and display: 208 display.vvvv(to_text(err, errors='surrogate_or_strict')) 209 210 if exit_code > 0: 211 raise SopsError('to stdout', exit_code, err, decryption=False) 212 213 return output 214 215 216def get_sops_argument_spec(add_encrypt_specific=False): 217 argument_spec = { 218 'sops_binary': { 219 'type': 'path', 220 }, 221 'aws_profile': { 222 'type': 'str', 223 }, 224 'aws_access_key_id': { 225 'type': 'str', 226 }, 227 'aws_secret_access_key': { 228 'type': 'str', 229 'no_log': True, 230 }, 231 'aws_session_token': { 232 'type': 'str', 233 'no_log': True, 234 }, 235 'config_path': { 236 'type': 'path', 237 }, 238 'enable_local_keyservice': { 239 'type': 'bool', 240 'default': False, 241 }, 242 'keyservice': { 243 'type': 'list', 244 'elements': 'str', 245 }, 246 } 247 if add_encrypt_specific: 248 argument_spec.update({ 249 'kms': { 250 'type': 'list', 251 'elements': 'str', 252 }, 253 'gcp_kms': { 254 'type': 'list', 255 'elements': 'str', 256 }, 257 'azure_kv': { 258 'type': 'list', 259 'elements': 'str', 260 }, 261 'hc_vault_transit': { 262 'type': 'list', 263 'elements': 'str', 264 }, 265 'pgp': { 266 'type': 'list', 267 'elements': 'str', 268 }, 269 'unencrypted_suffix': { 270 'type': 'str', 271 }, 272 'encrypted_suffix': { 273 'type': 'str', 274 }, 275 'unencrypted_regex': { 276 'type': 'str', 277 }, 278 'encrypted_regex': { 279 'type': 'str', 280 }, 281 'encryption_context': { 282 'type': 'list', 283 'elements': 'str', 284 }, 285 'shamir_secret_sharing_threshold': { 286 'type': 'int', 287 'no_log': False, 288 }, 289 }) 290 return argument_spec 291