1# -*- coding: utf-8 -*-
2
3# Copyright: (c) 2021, Felix Fontein <felix@fontein.de>
4# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
5
6from __future__ import absolute_import, division, print_function
7__metaclass__ = type
8
9from ansible.errors import AnsibleError, AnsibleFilterError
10from ansible.module_utils.common.text.converters import to_bytes, to_native
11from ansible.utils.display import Display
12
13from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError
14
15
16_VALID_TYPES = set(['binary', 'json', 'yaml', 'dotenv'])
17
18
def decrypt_filter(data, input_type='yaml', output_type='yaml', sops_binary='sops', rstrip=True, decode_output=True,
                   aws_profile=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None,
                   config_path=None, enable_local_keyservice=None, keyservice=None):
    '''Decrypt sops-encrypted content and return the result of ``Sops.decrypt``.

    ``input_type`` and ``output_type`` must each be a member of ``_VALID_TYPES``;
    otherwise an ``AnsibleFilterError`` is raised before anything else happens.

    ``data`` is converted with ``to_bytes`` and handed to ``Sops.decrypt`` as
    ``content``. ``rstrip``, ``decode_output``, ``input_type`` and ``output_type``
    are forwarded to ``Sops.decrypt`` unchanged. The remaining arguments
    (``sops_binary``, the ``aws_*`` values, ``config_path``,
    ``enable_local_keyservice`` and ``keyservice``) are made available to
    ``Sops.decrypt`` through a ``get_option_value`` callback.

    A ``SopsError`` raised by ``Sops.decrypt`` is re-raised as an
    ``AnsibleFilterError`` carrying the native-string form of the original error.
    '''

    # Reject unsupported formats up front (input_type first, then output_type).
    if input_type not in _VALID_TYPES:
        message = 'input_type must be one of {expected}; got "{value}"'
        raise AnsibleFilterError(message.format(expected=', '.join(sorted(_VALID_TYPES)), value=input_type))
    if output_type not in _VALID_TYPES:
        message = 'output_type must be one of {expected}; got "{value}"'
        raise AnsibleFilterError(message.format(expected=', '.join(sorted(_VALID_TYPES)), value=output_type))

    # Table of every option name the callback below knows how to answer.
    option_values = {
        'sops_binary': sops_binary,
        'aws_profile': aws_profile,
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'aws_session_token': aws_session_token,
        'config_path': config_path,
        'enable_local_keyservice': enable_local_keyservice,
        'keyservice': keyservice,
    }

    def get_option_value(argument_name):
        '''Return the filter argument stored under ``argument_name``.'''
        if argument_name in option_values:
            return option_values[argument_name]
        # Being asked for an unknown option name is treated as a programming error.
        raise AssertionError('internal error: should not be reached')

    encrypted = to_bytes(data)
    try:
        return Sops.decrypt(
            None,
            content=encrypted,
            display=Display(),
            rstrip=rstrip,
            decode_output=decode_output,
            input_type=input_type,
            output_type=output_type,
            get_option_value=get_option_value,
        )
    except SopsError as exc:
        # Surface sops failures as a filter error.
        raise AnsibleFilterError(to_native(exc))
62
63
class FilterModule(object):
    '''Filter plugin class exposing the ``decrypt`` Jinja2 filter.'''

    def filters(self):
        '''Return a dict mapping filter names to the functions implementing them.'''
        available = {}
        available['decrypt'] = decrypt_filter
        return available
71