1#!/usr/bin/python 2# -*- coding: utf-8 -*- 3 4# Ansible module to import third party repo keys to your rpm db 5# Copyright: (c) 2013, Héctor Acosta <hector.acosta@gazzang.com> 6 7# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt) 8 9from __future__ import absolute_import, division, print_function 10__metaclass__ = type 11 12 13DOCUMENTATION = ''' 14--- 15module: rpm_key 16author: 17 - Hector Acosta (@hacosta) <hector.acosta@gazzang.com> 18short_description: Adds or removes a gpg key from the rpm db 19description: 20 - Adds or removes (rpm --import) a gpg key to your rpm database. 21version_added: "1.3" 22options: 23 key: 24 description: 25 - Key that will be modified. Can be a url, a file on the managed node, or a keyid if the key 26 already exists in the database. 27 required: true 28 state: 29 description: 30 - If the key will be imported or removed from the rpm db. 31 default: present 32 choices: [ absent, present ] 33 validate_certs: 34 description: 35 - If C(no) and the C(key) is a url starting with https, SSL certificates will not be validated. 36 - This should only be used on personally controlled sites using self-signed certificates. 37 type: bool 38 default: 'yes' 39 fingerprint: 40 description: 41 - The long-form fingerprint of the key being imported. 42 - This will be used to verify the specified key. 43 type: str 44 version_added: 2.9 45notes: 46 - Supports C(check_mode). 47''' 48 49EXAMPLES = ''' 50- name: Import a key from a url 51 ansible.builtin.rpm_key: 52 state: present 53 key: http://apt.sw.be/RPM-GPG-KEY.dag.txt 54 55- name: Import a key from a file 56 ansible.builtin.rpm_key: 57 state: present 58 key: /path/to/key.gpg 59 60- name: Ensure a key is not present in the db 61 ansible.builtin.rpm_key: 62 state: absent 63 key: DEADB33F 64 65- name: Verify the key, using a fingerprint, before import 66 ansible.builtin.rpm_key: 67 key: /path/to/RPM-GPG-KEY.dag.txt 68 fingerprint: EBC6 E12C 62B1 C734 026B 2122 A20E 5214 6B8D 79E6 69''' 70 71RETURN = r'''#''' 72 73import re 74import os.path 75import tempfile 76 77# import module snippets 78from ansible.module_utils.basic import AnsibleModule 79from ansible.module_utils.urls import fetch_url 80from ansible.module_utils._text import to_native 81 82 83def is_pubkey(string): 84 """Verifies if string is a pubkey""" 85 pgp_regex = ".*?(-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----).*" 86 return bool(re.match(pgp_regex, to_native(string, errors='surrogate_or_strict'), re.DOTALL)) 87 88 89class RpmKey(object): 90 91 def __init__(self, module): 92 # If the key is a url, we need to check if it's present to be idempotent, 93 # to do that, we need to check the keyid, which we can get from the armor. 94 keyfile = None 95 should_cleanup_keyfile = False 96 self.module = module 97 self.rpm = self.module.get_bin_path('rpm', True) 98 state = module.params['state'] 99 key = module.params['key'] 100 fingerprint = module.params['fingerprint'] 101 if fingerprint: 102 fingerprint = fingerprint.replace(' ', '').upper() 103 104 self.gpg = self.module.get_bin_path('gpg') 105 if not self.gpg: 106 self.gpg = self.module.get_bin_path('gpg2', required=True) 107 108 if '://' in key: 109 keyfile = self.fetch_key(key) 110 keyid = self.getkeyid(keyfile) 111 should_cleanup_keyfile = True 112 elif self.is_keyid(key): 113 keyid = key 114 elif os.path.isfile(key): 115 keyfile = key 116 keyid = self.getkeyid(keyfile) 117 else: 118 self.module.fail_json(msg="Not a valid key %s" % key) 119 keyid = self.normalize_keyid(keyid) 120 121 if state == 'present': 122 if self.is_key_imported(keyid): 123 module.exit_json(changed=False) 124 else: 125 if not keyfile: 126 self.module.fail_json(msg="When importing a key, a valid file must be given") 127 if fingerprint: 128 has_fingerprint = self.getfingerprint(keyfile) 129 if fingerprint != has_fingerprint: 130 self.module.fail_json( 131 msg="The specified fingerprint, '%s', does not match the key fingerprint '%s'" % (fingerprint, has_fingerprint) 132 ) 133 self.import_key(keyfile) 134 if should_cleanup_keyfile: 135 self.module.cleanup(keyfile) 136 module.exit_json(changed=True) 137 else: 138 if self.is_key_imported(keyid): 139 self.drop_key(keyid) 140 module.exit_json(changed=True) 141 else: 142 module.exit_json(changed=False) 143 144 def fetch_key(self, url): 145 """Downloads a key from url, returns a valid path to a gpg key""" 146 rsp, info = fetch_url(self.module, url) 147 if info['status'] != 200: 148 self.module.fail_json(msg="failed to fetch key at %s , error was: %s" % (url, info['msg'])) 149 150 key = rsp.read() 151 if not is_pubkey(key): 152 self.module.fail_json(msg="Not a public key: %s" % url) 153 tmpfd, tmpname = tempfile.mkstemp() 154 self.module.add_cleanup_file(tmpname) 155 tmpfile = os.fdopen(tmpfd, "w+b") 156 tmpfile.write(key) 157 tmpfile.close() 158 return tmpname 159 160 def normalize_keyid(self, keyid): 161 """Ensure a keyid doesn't have a leading 0x, has leading or trailing whitespace, and make sure is uppercase""" 162 ret = keyid.strip().upper() 163 if ret.startswith('0x'): 164 return ret[2:] 165 elif ret.startswith('0X'): 166 return ret[2:] 167 else: 168 return ret 169 170 def getkeyid(self, keyfile): 171 stdout, stderr = self.execute_command([self.gpg, '--no-tty', '--batch', '--with-colons', '--fixed-list-mode', keyfile]) 172 for line in stdout.splitlines(): 173 line = line.strip() 174 if line.startswith('pub:'): 175 return line.split(':')[4] 176 177 self.module.fail_json(msg="Unexpected gpg output") 178 179 def getfingerprint(self, keyfile): 180 stdout, stderr = self.execute_command([ 181 self.gpg, '--no-tty', '--batch', '--with-colons', 182 '--fixed-list-mode', '--with-fingerprint', keyfile 183 ]) 184 for line in stdout.splitlines(): 185 line = line.strip() 186 if line.startswith('fpr:'): 187 # As mentioned here, 188 # 189 # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob_plain;f=doc/DETAILS 190 # 191 # The description of the `fpr` field says 192 # 193 # "fpr :: Fingerprint (fingerprint is in field 10)" 194 # 195 return line.split(':')[9] 196 197 self.module.fail_json(msg="Unexpected gpg output") 198 199 def is_keyid(self, keystr): 200 """Verifies if a key, as provided by the user is a keyid""" 201 return re.match('(0x)?[0-9a-f]{8}', keystr, flags=re.IGNORECASE) 202 203 def execute_command(self, cmd): 204 rc, stdout, stderr = self.module.run_command(cmd, use_unsafe_shell=True) 205 if rc != 0: 206 self.module.fail_json(msg=stderr) 207 return stdout, stderr 208 209 def is_key_imported(self, keyid): 210 cmd = self.rpm + ' -q gpg-pubkey' 211 rc, stdout, stderr = self.module.run_command(cmd) 212 if rc != 0: # No key is installed on system 213 return False 214 cmd += ' --qf "%{description}" | ' + self.gpg + ' --no-tty --batch --with-colons --fixed-list-mode -' 215 stdout, stderr = self.execute_command(cmd) 216 for line in stdout.splitlines(): 217 if keyid in line.split(':')[4]: 218 return True 219 return False 220 221 def import_key(self, keyfile): 222 if not self.module.check_mode: 223 self.execute_command([self.rpm, '--import', keyfile]) 224 225 def drop_key(self, keyid): 226 if not self.module.check_mode: 227 self.execute_command([self.rpm, '--erase', '--allmatches', "gpg-pubkey-%s" % keyid[-8:].lower()]) 228 229 230def main(): 231 module = AnsibleModule( 232 argument_spec=dict( 233 state=dict(type='str', default='present', choices=['absent', 'present']), 234 key=dict(type='str', required=True), 235 fingerprint=dict(type='str'), 236 validate_certs=dict(type='bool', default=True), 237 ), 238 supports_check_mode=True, 239 ) 240 241 RpmKey(module) 242 243 244if __name__ == '__main__': 245 main() 246