1#!/usr/bin/python
2# -*- coding: utf-8 -*-
3
4# Ansible module to import third party repo keys to your rpm db
5# Copyright: (c) 2013, Héctor Acosta <hector.acosta@gazzang.com>
6
7# GNU General Public License v3.0+ (see COPYING or https://www.gnu.org/licenses/gpl-3.0.txt)
8
9from __future__ import absolute_import, division, print_function
10__metaclass__ = type
11
12
# YAML-formatted module documentation: short description, the supported
# options (key, state, validate_certs, fingerprint) and usage notes.
DOCUMENTATION = '''
---
module: rpm_key
author:
  - Hector Acosta (@hacosta) <hector.acosta@gazzang.com>
short_description: Adds or removes a gpg key from the rpm db
description:
  - Adds or removes (rpm --import) a gpg key to your rpm database.
version_added: "1.3"
options:
    key:
      description:
        - Key that will be modified. Can be a url, a file on the managed node, or a keyid if the key
          already exists in the database.
      required: true
    state:
      description:
        - If the key will be imported or removed from the rpm db.
      default: present
      choices: [ absent, present ]
    validate_certs:
      description:
        - If C(no) and the C(key) is a url starting with https, SSL certificates will not be validated.
        - This should only be used on personally controlled sites using self-signed certificates.
      type: bool
      default: 'yes'
    fingerprint:
      description:
        - The long-form fingerprint of the key being imported.
        - This will be used to verify the specified key.
      type: str
      version_added: 2.9
notes:
  - Supports C(check_mode).
'''
48
# YAML task snippets showing typical usage: importing a key from a url or a
# file, removing a key by keyid, and verifying a fingerprint before import.
EXAMPLES = '''
- name: Import a key from a url
  ansible.builtin.rpm_key:
    state: present
    key: http://apt.sw.be/RPM-GPG-KEY.dag.txt

- name: Import a key from a file
  ansible.builtin.rpm_key:
    state: present
    key: /path/to/key.gpg

- name: Ensure a key is not present in the db
  ansible.builtin.rpm_key:
    state: absent
    key: DEADB33F

- name: Verify the key, using a fingerprint, before import
  ansible.builtin.rpm_key:
    key: /path/to/RPM-GPG-KEY.dag.txt
    fingerprint: EBC6 E12C 62B1 C734 026B  2122 A20E 5214 6B8D 79E6
'''
70
# Return-value documentation placeholder: the string holds only a comment
# marker, i.e. no return values are documented for this module.
RETURN = r'''#'''
72
73import re
74import os.path
75import tempfile
76
77# import module snippets
78from ansible.module_utils.basic import AnsibleModule
79from ansible.module_utils.urls import fetch_url
80from ansible.module_utils._text import to_native
81
82
def is_pubkey(string):
    """Tell whether ``string`` contains an ASCII-armored PGP public key block.

    The input is converted to native text first, then searched (across
    newlines) for a BEGIN/END PGP PUBLIC KEY BLOCK pair.

    :param string: text or bytes to inspect.
    :returns: True when an armored public key block is present, else False.
    """
    text = to_native(string, errors='surrogate_or_strict')
    pgp_regex = ".*?(-----BEGIN PGP PUBLIC KEY BLOCK-----.*?-----END PGP PUBLIC KEY BLOCK-----).*"
    found = re.match(pgp_regex, text, re.DOTALL)
    return found is not None
87
88
class RpmKey(object):
    """Import a GPG key into the rpm database, or remove it from there.

    All of the work is done by the constructor, which reads the module
    parameters, resolves the key and finishes by reporting the outcome
    through ``module.exit_json`` or ``module.fail_json``.
    """

    def __init__(self, module):
        """Resolve the requested key and bring the rpm db to the requested state.

        :param module: the AnsibleModule instance; its ``state``, ``key`` and
            ``fingerprint`` parameters are read here.
        """
        # If the key is a url, we need to check if it's present to be idempotent,
        # to do that, we need to check the keyid, which we can get from the armor.
        keyfile = None
        should_cleanup_keyfile = False
        self.module = module
        self.rpm = self.module.get_bin_path('rpm', True)
        state = module.params['state']
        key = module.params['key']
        fingerprint = module.params['fingerprint']
        if fingerprint:
            # Accept the space-grouped form gpg prints, compare in canonical form.
            fingerprint = fingerprint.replace(' ', '').upper()

        # Prefer 'gpg'; fall back to 'gpg2', which is then mandatory.
        self.gpg = self.module.get_bin_path('gpg')
        if not self.gpg:
            self.gpg = self.module.get_bin_path('gpg2', required=True)

        # The key may be given as a url, a bare key id, or a local file.
        if '://' in key:
            keyfile = self.fetch_key(key)
            keyid = self.getkeyid(keyfile)
            should_cleanup_keyfile = True
        elif self.is_keyid(key):
            keyid = key
        elif os.path.isfile(key):
            keyfile = key
            keyid = self.getkeyid(keyfile)
        else:
            # fail_json is expected not to return, so keyid is always bound below.
            self.module.fail_json(msg="Not a valid key %s" % key)
        keyid = self.normalize_keyid(keyid)

        if state == 'present':
            if self.is_key_imported(keyid):
                module.exit_json(changed=False)
            else:
                # A bare key id cannot be imported: there is no key material.
                if not keyfile:
                    self.module.fail_json(msg="When importing a key, a valid file must be given")
                if fingerprint:
                    has_fingerprint = self.getfingerprint(keyfile)
                    if fingerprint != has_fingerprint:
                        self.module.fail_json(
                            msg="The specified fingerprint, '%s', does not match the key fingerprint '%s'" % (fingerprint, has_fingerprint)
                        )
                self.import_key(keyfile)
                if should_cleanup_keyfile:
                    self.module.cleanup(keyfile)
                module.exit_json(changed=True)
        else:
            if self.is_key_imported(keyid):
                self.drop_key(keyid)
                module.exit_json(changed=True)
            else:
                module.exit_json(changed=False)

    def fetch_key(self, url):
        """Download a key from ``url`` and store it in a temporary file.

        Fails the module when the HTTP status is not 200 or when the payload
        does not contain an armored public key.

        :param url: location of the key.
        :returns: path of the temporary file holding the downloaded key; the
            file is registered with the module for cleanup.
        """
        rsp, info = fetch_url(self.module, url)
        if info['status'] != 200:
            self.module.fail_json(msg="failed to fetch key at %s , error was: %s" % (url, info['msg']))

        key = rsp.read()
        if not is_pubkey(key):
            self.module.fail_json(msg="Not a public key: %s" % url)
        tmpfd, tmpname = tempfile.mkstemp()
        self.module.add_cleanup_file(tmpname)
        # The context manager closes the descriptor even if the write fails.
        with os.fdopen(tmpfd, "w+b") as tmpfile:
            tmpfile.write(key)
        return tmpname

    def normalize_keyid(self, keyid):
        """Return ``keyid`` stripped of surrounding whitespace, uppercased and
        without a leading ``0x``/``0X`` prefix."""
        ret = keyid.strip().upper()
        # After upper() the prefix can only appear as '0X'.
        if ret.startswith('0X'):
            return ret[2:]
        return ret

    def getkeyid(self, keyfile):
        """Return the key id that gpg reports for the first ``pub`` record of ``keyfile``.

        Fails the module with "Unexpected gpg output" when no usable ``pub``
        record is found.
        """
        stdout, stderr = self.execute_command([self.gpg, '--no-tty', '--batch', '--with-colons', '--fixed-list-mode', keyfile])
        for line in stdout.splitlines():
            line = line.strip()
            if line.startswith('pub:'):
                fields = line.split(':')
                # The key id is the fifth colon-separated field; skip
                # truncated records instead of raising IndexError.
                if len(fields) > 4:
                    return fields[4]

        self.module.fail_json(msg="Unexpected gpg output")

    def getfingerprint(self, keyfile):
        """Return the fingerprint that gpg reports for the first ``fpr`` record of ``keyfile``.

        Fails the module with "Unexpected gpg output" when no usable ``fpr``
        record is found.
        """
        stdout, stderr = self.execute_command([
            self.gpg, '--no-tty', '--batch', '--with-colons',
            '--fixed-list-mode', '--with-fingerprint', keyfile
        ])
        for line in stdout.splitlines():
            line = line.strip()
            if line.startswith('fpr:'):
                # As mentioned here,
                #
                # https://git.gnupg.org/cgi-bin/gitweb.cgi?p=gnupg.git;a=blob_plain;f=doc/DETAILS
                #
                # The description of the `fpr` field says
                #
                # "fpr :: Fingerprint (fingerprint is in field 10)"
                #
                fields = line.split(':')
                # Skip truncated records instead of raising IndexError.
                if len(fields) > 9:
                    return fields[9]

        self.module.fail_json(msg="Unexpected gpg output")

    def is_keyid(self, keystr):
        """Tell whether ``keystr``, as provided by the user, is a key id.

        A key id is an optional ``0x`` prefix followed by at least eight
        hexadecimal digits and nothing else (trailing whitespace is tolerated).
        The pattern is anchored at the end so that a file name which merely
        starts with eight hex characters, e.g. ``deadb33f.gpg``, is not
        mistaken for a key id.

        :returns: True or False.
        """
        return bool(re.match(r'(0x)?[0-9a-f]{8,}\s*$', keystr, flags=re.IGNORECASE))

    def execute_command(self, cmd):
        """Run ``cmd`` through the module and return ``(stdout, stderr)``.

        The command is run with ``use_unsafe_shell=True`` because
        is_key_imported passes a shell pipeline as a single string. A non-zero
        return code fails the module with the command's stderr as message.
        """
        rc, stdout, stderr = self.module.run_command(cmd, use_unsafe_shell=True)
        if rc != 0:
            self.module.fail_json(msg=stderr)
        return stdout, stderr

    def is_key_imported(self, keyid):
        """Tell whether a key whose id contains ``keyid`` is in the rpm db.

        The descriptions of all installed ``gpg-pubkey`` packages are piped
        through gpg and the fifth field of every output record is searched for
        ``keyid`` (substring match, so a short id matches a long one).
        """
        cmd = self.rpm + ' -q  gpg-pubkey'
        rc, stdout, stderr = self.module.run_command(cmd)
        if rc != 0:  # No key is installed on system
            return False
        cmd += ' --qf "%{description}" | ' + self.gpg + ' --no-tty --batch --with-colons --fixed-list-mode -'
        stdout, stderr = self.execute_command(cmd)
        for line in stdout.splitlines():
            fields = line.split(':')
            # Records with fewer than five fields cannot carry a key id.
            if len(fields) > 4 and keyid in fields[4]:
                return True
        return False

    def import_key(self, keyfile):
        """Import ``keyfile`` with ``rpm --import`` (no-op in check mode)."""
        if not self.module.check_mode:
            self.execute_command([self.rpm, '--import', keyfile])

    def drop_key(self, keyid):
        """Erase the ``gpg-pubkey`` package(s) for ``keyid`` (no-op in check mode).

        The package name is built from the last eight characters of the key
        id, lowercased.
        """
        if not self.module.check_mode:
            self.execute_command([self.rpm, '--erase', '--allmatches', "gpg-pubkey-%s" % keyid[-8:].lower()])
229
def main():
    """Module entry point: declare the accepted arguments and run RpmKey.

    RpmKey does all of its work in the constructor, so the instance is not
    kept.
    """
    argument_spec = {
        'state': {'type': 'str', 'default': 'present', 'choices': ['absent', 'present']},
        'key': {'type': 'str', 'required': True},
        'fingerprint': {'type': 'str'},
        'validate_certs': {'type': 'bool', 'default': True},
    }
    module = AnsibleModule(argument_spec=argument_spec, supports_check_mode=True)

    RpmKey(module)
242
243
# Run the module only when this file is executed as a script, not on import.
if __name__ == '__main__':
    main()
246