1#!/usr/local/bin/python3.8
2# -*- coding: utf-8 -*-
3
4# (c) 2020, Felix Fontein <felix@fontein.de>
5# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
6
7from __future__ import absolute_import, division, print_function
8__metaclass__ = type
9
10
11DOCUMENTATION = r'''
12---
13author: Felix Fontein (@felixfontein)
14module: sops_encrypt
15short_description: Encrypt data with sops
16version_added: '0.1.0'
17description:
18  - Allows to encrypt binary data (Base64 encoded), text data, JSON or YAML data with sops.
19options:
20  path:
21    description:
22      - The sops encrypt file.
23    type: path
24    required: true
25  force:
26    description:
27      - Force rewriting the encrypted file.
28    type: bool
29    default: false
30  content_text:
31    description:
32      - The data to encrypt. Must be a Unicode text.
33      - Please note that the module might not be idempotent if the text can be parsed as JSON or YAML.
34      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
35    type: str
36  content_binary:
37    description:
38      - The data to encrypt. Must be L(Base64 encoded,https://en.wikipedia.org/wiki/Base64) binary data.
39      - Please note that the module might not be idempotent if the data can be parsed as JSON or YAML.
40      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
41    type: str
42  content_json:
43    description:
44      - The data to encrypt. Must be a JSON dictionary.
45      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
46    type: dict
47  content_yaml:
48    description:
49      - The data to encrypt. Must be a YAML dictionary.
50      - Please note that Ansible only allows to pass data that can be represented as a JSON dictionary.
51      - Exactly one of I(content_text), I(content_binary), I(content_json) and I(content_yaml) must be specified.
52    type: dict
53extends_documentation_fragment:
54  - ansible.builtin.files
55  - community.sops.sops
56  - community.sops.sops.encrypt_specific
57seealso:
58  - ref: community.sops.sops lookup <ansible_collections.community.sops.sops_lookup>
59    description: The sops lookup can be used decrypt sops-encrypted files.
60notes:
61  - Supports C(check_mode).
62'''
63
64EXAMPLES = r'''
65- name: Encrypt a secret text
66  community.sops.sops_encrypt:
67    path: text-data.sops
68    content_text: This is a secret text.
69
70- name: Encrypt the contents of a file
71  community.sops.sops_encrypt:
72    path: binary-data.sops
73    content_binary: "{{ lookup('ansible.builtin.file', '/path/to/file', rstrip=false) | b64encode }}"
74
75- name: Encrypt some datastructure as YAML
76  community.sops.sops_encrypt:
77    path: stuff.sops.yaml
78    content_yaml: "{{ result }}"
79'''
80
81RETURN = r''' # '''
82
83
84import base64
85import json
86import os
87import traceback
88
89from ansible.module_utils.basic import AnsibleModule, missing_required_lib
90from ansible.module_utils.common.text.converters import to_text
91
92from ansible_collections.community.sops.plugins.module_utils.io import write_file
93from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError, get_sops_argument_spec
94
# PyYAML is an optional dependency: remember whether the import worked so that
# main() can emit a proper "missing library" error when content_yaml is used.
try:
    import yaml
    HAS_YAML = True
except ImportError:
    # Keep the traceback text; main() hands it to fail_json() as 'exception'.
    YAML_IMP_ERR = traceback.format_exc()
    HAS_YAML = False
    yaml = None
102
103
def get_data_type(module):
    """Return the data type name for whichever content option is set.

    The options are inspected in a fixed order (text, binary, JSON, YAML) and
    the first one whose value is not None decides the result. Both
    content_text and content_binary map to 'binary'.

    :param module: object exposing the module parameters as ``module.params``
        and an error reporter ``module.fail_json``.
    :returns: 'binary', 'json' or 'yaml'.

    If none of the four options is set, an internal error is reported through
    ``module.fail_json``.
    """
    # Order matters: it mirrors the precedence used by the other helpers.
    type_by_option = (
        ('content_text', 'binary'),
        ('content_binary', 'binary'),
        ('content_json', 'json'),
        ('content_yaml', 'yaml'),
    )
    for option_name, data_type in type_by_option:
        if module.params[option_name] is not None:
            return data_type
    module.fail_json(msg='Internal error: unknown content type')
114
115
def compare_encoded_content(module, binary_data, content):
    """Tell whether already-decrypted ``content`` equals the requested content.

    :param module: object exposing the module parameters as ``module.params``
        and an error reporter ``module.fail_json``.
    :param binary_data: the decoded value of content_binary; only used when
        that option is set.
    :param content: the decrypted data to compare against (main() passes the
        raw, undecoded output of the decryption step).
    :returns: True if the content matches the option that is set, else False.

    Text is compared against its UTF-8 encoding, binary data byte for byte,
    and JSON/YAML by parsing ``content`` and comparing the resulting objects.
    If none of the four content options is set, an internal error is reported
    through ``module.fail_json``.
    """
    options = module.params

    def equals_after_parsing(parse, expected):
        # Parsing and comparing both live inside the try: any failure simply
        # means "treat the content as different".
        try:
            return parse(content) == expected
        except Exception:
            return False

    text = options['content_text']
    if text is not None:
        return content == text.encode('utf-8')

    if options['content_binary'] is not None:
        return content == binary_data

    json_value = options['content_json']
    if json_value is not None:
        # Compare JSON
        return equals_after_parsing(lambda raw: json.loads(raw), json_value)

    yaml_value = options['content_yaml']
    if yaml_value is not None:
        # Compare YAML; the lambda defers the yaml attribute lookup into the try.
        return equals_after_parsing(lambda raw: yaml.safe_load(raw), yaml_value)

    module.fail_json(msg='Internal error: unknown content type')
136
137
def get_encoded_type_content(module, binary_data):
    """Return the input type and the encoded bytes for the content option in use.

    :param module: object exposing the module parameters as ``module.params``
        and an error reporter ``module.fail_json``.
    :param binary_data: the decoded value of content_binary; returned as-is
        when that option is set.
    :returns: a ``(type_name, data)`` tuple where ``type_name`` is 'binary',
        'json' or 'yaml'. Text, JSON and YAML data are UTF-8 encoded.

    If none of the four content options is set, an internal error is reported
    through ``module.fail_json``.
    """
    options = module.params

    text = options['content_text']
    if text is not None:
        return 'binary', text.encode('utf-8')

    if options['content_binary'] is not None:
        return 'binary', binary_data

    json_value = options['content_json']
    if json_value is not None:
        serialized = json.dumps(json_value)
        return 'json', serialized.encode('utf-8')

    yaml_value = options['content_yaml']
    if yaml_value is not None:
        serialized = yaml.safe_dump(yaml_value)
        return 'yaml', serialized.encode('utf-8')

    module.fail_json(msg='Internal error: unknown content type')
148
149
def main():
    """Entry point of the sops_encrypt module.

    Builds the argument spec (path, force, the four content options plus the
    options contributed by get_sops_argument_spec), validates the input and
    decides whether the target file has to be (re-)encrypted:

    * when ``force`` is set or the file does not exist, it is always encrypted;
    * otherwise the existing file is decrypted and compared with the
      requested content.

    Encryption and writing are skipped in check mode. Afterwards the common
    file arguments are applied and the module exits reporting ``changed``.
    Any SopsError is reported through ``module.fail_json``.
    """
    content_options = ('content_text', 'content_binary', 'content_json', 'content_yaml')

    argument_spec = {
        'path': {'type': 'path', 'required': True},
        'force': {'type': 'bool', 'default': False},
        'content_text': {'type': 'str', 'no_log': True},
        'content_binary': {'type': 'str', 'no_log': True},
        'content_json': {'type': 'dict', 'no_log': True},
        'content_yaml': {'type': 'dict', 'no_log': True},
    }
    argument_spec.update(get_sops_argument_spec(add_encrypt_specific=True))

    # Exactly one of the content options has to be provided.
    module = AnsibleModule(
        argument_spec=argument_spec,
        mutually_exclusive=[content_options],
        required_one_of=[content_options],
        supports_check_mode=True,
        add_file_common_args=True,
    )
    params = module.params

    # YAML content can only be handled when PyYAML could be imported.
    if params['content_yaml'] is not None and not HAS_YAML:
        module.fail_json(msg=missing_required_lib('pyyaml'), exception=YAML_IMP_ERR)

    # Decode the Base64 payload once; the helpers work on the raw bytes.
    binary_data = None
    encoded_binary = params['content_binary']
    if encoded_binary is not None:
        try:
            binary_data = base64.b64decode(encoded_binary)
        except Exception as exc:
            module.fail_json(msg='Cannot decode Base64 encoded data: {0}'.format(exc))

    path = params['path']
    # Directory part of the target, handed to the encryption step as cwd
    # (None when the path has no directory component).
    directory = os.path.dirname(path) or None

    def get_option_value(argument_name):
        # Lets the Sops helper look up module options by name.
        return module.params.get(argument_name)

    # Output type is derived from the file name suffix; None otherwise.
    suffix_types = (('.json', 'json'), ('.yaml', 'yaml'))

    try:
        # Forced rewrite or missing file: simply encrypt.
        changed = bool(params['force'] or not os.path.exists(path))
        if not changed:
            # Change detection: check if encrypted data equals new data
            existing_content = Sops.decrypt(
                path,
                decode_output=False,
                output_type=get_data_type(module),
                rstrip=False,
                get_option_value=get_option_value,
                module=module,
            )
            changed = not compare_encoded_content(module, binary_data, existing_content)

        if changed and not module.check_mode:
            input_type, input_data = get_encoded_type_content(module, binary_data)
            output_type = next(
                (type_name for suffix, type_name in suffix_types if path.endswith(suffix)),
                None,
            )
            encrypted = Sops.encrypt(
                data=input_data,
                cwd=directory,
                input_type=input_type,
                output_type=output_type,
                get_option_value=get_option_value,
                module=module,
            )
            write_file(module, encrypted)
    except SopsError as exc:
        module.fail_json(msg=to_text(exc))

    # Apply the common file arguments; this may turn 'changed' to True as well.
    common_file_args = module.load_file_common_arguments(module.params)
    changed = module.set_fs_attributes_if_different(common_file_args, changed)

    module.exit_json(changed=changed)
223
224
# Run the module only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
227