# -*- coding: utf-8 -*-

# Copyright: (c) 2021, Felix Fontein <felix@fontein.de>
# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)

from __future__ import absolute_import, division, print_function
__metaclass__ = type

from ansible.errors import AnsibleError, AnsibleFilterError
from ansible.module_utils.common.text.converters import to_bytes, to_native
from ansible.utils.display import Display

from ansible_collections.community.sops.plugins.module_utils.sops import Sops, SopsError


# Values accepted for both the ``input_type`` and ``output_type`` arguments.
_VALID_TYPES = {'binary', 'json', 'yaml', 'dotenv'}


def decrypt_filter(data, input_type='yaml', output_type='yaml', sops_binary='sops', rstrip=True, decode_output=True,
                   aws_profile=None, aws_access_key_id=None, aws_secret_access_key=None, aws_session_token=None,
                   config_path=None, enable_local_keyservice=None, keyservice=None):
    '''Decrypt sops-encrypted content and return the result of ``Sops.decrypt``.

    ``data`` is converted to bytes and handed to ``Sops.decrypt`` as ``content``.
    ``input_type`` and ``output_type`` must each be a member of ``_VALID_TYPES``.
    ``rstrip`` and ``decode_output`` are forwarded to ``Sops.decrypt`` unchanged.
    The remaining keyword arguments (``sops_binary``, the ``aws_*`` settings,
    ``config_path``, ``enable_local_keyservice`` and ``keyservice``) are not
    passed directly; ``Sops.decrypt`` receives a callback through which it can
    look them up by name.

    Raises ``AnsibleFilterError`` when one of the types is not valid, or when
    ``Sops.decrypt`` raises ``SopsError``.
    '''

    # Validate both type arguments, input first, keeping one message per argument.
    type_checks = (
        (input_type, 'input_type must be one of {expected}; got "{value}"'),
        (output_type, 'output_type must be one of {expected}; got "{value}"'),
    )
    for given_type, message_template in type_checks:
        if given_type not in _VALID_TYPES:
            raise AnsibleFilterError(message_template.format(
                expected=', '.join(sorted(_VALID_TYPES)), value=given_type))

    # Option values that can be queried by name through the callback below.
    option_values = {
        'sops_binary': sops_binary,
        'aws_profile': aws_profile,
        'aws_access_key_id': aws_access_key_id,
        'aws_secret_access_key': aws_secret_access_key,
        'aws_session_token': aws_session_token,
        'config_path': config_path,
        'enable_local_keyservice': enable_local_keyservice,
        'keyservice': keyservice,
    }

    def get_option_value(argument_name):
        '''Return the value of the named option; unknown names are an internal error.'''
        try:
            return option_values[argument_name]
        except (KeyError, TypeError):
            # TypeError covers unhashable names, so anything that is not a
            # known option name ends up with the same AssertionError.
            raise AssertionError('internal error: should not be reached')

    # Run the decryption and translate sops failures into a filter error.
    content = to_bytes(data)
    try:
        return Sops.decrypt(
            None,
            content=content,
            display=Display(),
            rstrip=rstrip,
            decode_output=decode_output,
            input_type=input_type,
            output_type=output_type,
            get_option_value=get_option_value,
        )
    except SopsError as exc:
        raise AnsibleFilterError(to_native(exc))


class FilterModule(object):
    '''Ansible jinja2 filters'''

    def filters(self):
        '''Return the mapping of filter names to their implementations.'''
        return dict(decrypt=decrypt_filter)