1#!/usr/local/bin/python3.8 2# -*- coding: utf-8 -*- 3 4# (c) 2020, Felix Fontein <felix@fontein.de> 5# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 6 7from __future__ import absolute_import, division, print_function 8__metaclass__ = type 9 10 11DOCUMENTATION = r''' 12--- 13author: Felix Fontein (@felixfontein) 14module: sops_encrypt 15short_description: Encrypt data with sops 16version_added: '0.1.0' 17description: 18 - Allows to encrypt binary data (Base64 encoded), text data, JSON or YAML data with sops. 19options: 20 path: 21 description: 22 - The sops encrypt file. 23 type: path 24 required: true 25 force: 26 description: 27 - Force rewriting the encrypted file. 28 type: bool 29 default: false 30 content_text: 31 description: 32 - The data to encrypt. Must be a Unicode text. 33 - Please note that the module might not be idempotent if the text can be parsed as JSON or YAML. 34 - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. 35 type: str 36 content_binary: 37 description: 38 - The data to encrypt. Must be L(Base64 encoded,https://en.wikipedia.org/wiki/Base64) binary data. 39 - Please note that the module might not be idempotent if the data can be parsed as JSON or YAML. 40 - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. 41 type: str 42 content_json: 43 description: 44 - The data to encrypt. Must be a JSON dictionary. 45 - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. 46 type: dict 47 content_yaml: 48 description: 49 - The data to encrypt. Must be a YAML dictionary. 50 - Please note that Ansible only allows to pass data that can be represented as a JSON dictionary. 51 - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified. 52 type: dict 53extends_documentation_fragment: 54 - ansible.builtin.files 55 - community.sops.sops 56 - community.sops.sops.encrypt_specific 57seealso: 58 - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup> 59 description: The sops lookup can be used decrypt sops-encrypted files. 60notes: 61 - Supports C(check_mode). 62''' 63 64EXAMPLES = r''' 65- name: Encrypt a secret text 66 community.sops.sops_encrypt: 67 path: text-data.sops 68 content_text: This is a secret text. 69 70- name: Encrypt the contents of a file 71 community.sops.sops_encrypt: 72 path: binary-data.sops 73 content_binary: "{{ lookup('ansible.builtin.file', '/path/to/file', rstrip=false) | b64encode }}" 74 75- name: Encrypt some datastructure as YAML 76 community.sops.sops_encrypt: 77 path: stuff.sops.yaml 78 content_yaml: "{{ result }}" 79''' 80 81RETURN = r''' # ''' 82 83 84import base64 85import json 86import os 87import traceback 88 89from ansible.module_utils.basic import AnsibleModule, missing_required_lib 90from ansible.module_utils.common.text.converters import to_text 91 92from ansible_collections.community.sops.plugins.module_utils.io import write_file 93from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError, get_sops_argument_spec 94 95try: 96 import yaml 97 HAS_YAML = True 98except ImportError: 99 YAML_IMP_ERR = traceback.format_exc() 100 HAS_YAML = False 101 yaml = None 102 103 104def get_data_type(module): 105 if module.params['content_text'] is not None: 106 return 'binary' 107 if module.params['content_binary'] is not None: 108 return 'binary' 109 if module.params['content_json'] is not None: 110 return 'json' 111 if module.params['content_yaml'] is not None: 112 return 'yaml' 113 module.fail_json(msg='Internal error: unknown content type') 114 115 116def compare_encoded_content(module, binary_data, content): 117 if module.params['content_text'] is not None: 118 return content == module.params['content_text'].encode('utf-8') 119 if module.params['content_binary'] is not None: 120 return content == binary_data 121 if module.params['content_json'] is not None: 122 # Compare JSON 123 try: 124 return json.loads(content) == module.params['content_json'] 125 except Exception: 126 # Treat parsing errors as content not equal 127 return False 128 if module.params['content_yaml'] is not None: 129 # Compare YAML 130 try: 131 return yaml.safe_load(content) == module.params['content_yaml'] 132 except Exception: 133 # Treat parsing errors as content not equal 134 return False 135 module.fail_json(msg='Internal error: unknown content type') 136 137 138def get_encoded_type_content(module, binary_data): 139 if module.params['content_text'] is not None: 140 return 'binary', module.params['content_text'].encode('utf-8') 141 if module.params['content_binary'] is not None: 142 return 'binary', binary_data 143 if module.params['content_json'] is not None: 144 return 'json', json.dumps(module.params['content_json']).encode('utf-8') 145 if module.params['content_yaml'] is not None: 146 return 'yaml', yaml.safe_dump(module.params['content_yaml']).encode('utf-8') 147 module.fail_json(msg='Internal error: unknown content type') 148 149 150def main(): 151 argument_spec = dict( 152 path=dict(type='path', required=True), 153 force=dict(type='bool', default=False), 154 content_text=dict(type='str', no_log=True), 155 content_binary=dict(type='str', no_log=True), 156 content_json=dict(type='dict', no_log=True), 157 content_yaml=dict(type='dict', no_log=True), 158 ) 159 argument_spec.update(get_sops_argument_spec(add_encrypt_specific=True)) 160 module = AnsibleModule( 161 argument_spec=argument_spec, 162 mutually_exclusive=[ 163 ('content_text', 'content_binary', 'content_json', 'content_yaml'), 164 ], 165 required_one_of=[ 166 ('content_text', 'content_binary', 'content_json', 'content_yaml'), 167 ], 168 supports_check_mode=True, 169 add_file_common_args=True, 170 ) 171 172 # Check YAML 173 if module.params['content_yaml'] is not None and not HAS_YAML: 174 module.fail_json(msg=missing_required_lib('pyyaml'), exception=YAML_IMP_ERR) 175 176 # Decode binary data 177 binary_data = None 178 if module.params['content_binary'] is not None: 179 try: 180 binary_data = base64.b64decode(module.params['content_binary']) 181 except Exception as e: 182 module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(e)) 183 184 path = module.params['path'] 185 directory = os.path.dirname(path) or None 186 changed = False 187 188 def get_option_value(argument_name): 189 return module.params.get(argument_name) 190 191 try: 192 if module.params['force'] or not os.path.exists(path): 193 # Simply encrypt 194 changed = True 195 else: 196 # Change detection: check if encrypted data equals new data 197 decrypted_content = Sops.decrypt( 198 path, decode_output=False, output_type=get_data_type(module), rstrip=False, 199 get_option_value=get_option_value, module=module, 200 ) 201 if not compare_encoded_content(module, binary_data, decrypted_content): 202 changed = True 203 204 if changed and not module.check_mode: 205 input_type, input_data = get_encoded_type_content(module, binary_data) 206 output_type = None 207 if path.endswith('.json'): 208 output_type = 'json' 209 elif path.endswith('.yaml'): 210 output_type = 'yaml' 211 data = Sops.encrypt( 212 data=input_data, cwd=directory, input_type=input_type, output_type=output_type, 213 get_option_value=get_option_value, module=module, 214 ) 215 write_file(module, data) 216 except SopsError as e: 217 module.fail_json(msg=to_text(e)) 218 219 file_args = module.load_file_common_arguments(module.params) 220 changed = module.set_fs_attributes_if_different(file_args, changed) 221 222 module.exit_json(changed=changed) 223 224 225if __name__ == '__main__': 226 main() 227