1# dnssec.py
2# DNS extension for automatic GPG key verification
3#
4# Copyright (C) 2012-2018 Red Hat, Inc.
5#
6# This copyrighted material is made available to anyone wishing to use,
7# modify, copy, or redistribute it subject to the terms and conditions of
8# the GNU General Public License v.2, or (at your option) any later version.
9# This program is distributed in the hope that it will be useful, but WITHOUT
10# ANY WARRANTY expressed or implied, including the implied warranties of
11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the GNU General
12# Public License for more details.  You should have received a copy of the
13# GNU General Public License along with this program; if not, write to the
14# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA
15# 02110-1301, USA.  Any Red Hat trademarks that are incorporated in the
16# source code or documentation are not subject to the GNU General Public
17# License and may only be used or replicated with the express permission of
18# Red Hat, Inc.
19#
20
21from __future__ import print_function
22from __future__ import absolute_import
23from __future__ import unicode_literals
24
25from enum import Enum
26import base64
27import hashlib
28import logging
29import re
30
31from dnf.i18n import _
32import dnf.rpm
33import dnf.exceptions
34
# Every message emitted by this module goes through the logger named "dnf".
logger = logging.getLogger("dnf")


# Numeric DNS resource record type of OPENPGPKEY records; passed as the record
# type when the DNS query for a key is issued.
RR_TYPE_OPENPGPKEY = 61
39
40
class DnssecError(dnf.exceptions.Error):
    """
    Error type raised by the code in the dnssec module.
    """
    def __repr__(self):
        # Fall back to a placeholder text when ``value`` is None.
        if self.value is None:
            shown = "Not specified"
        else:
            shown = self.value
        return "<DnssecError, value='{}'>".format(shown)
48
49
def email2location(email_address, tag="_openpgpkey"):
    # type: (str, str) -> str
    """
    Convert an email address into the DNS name under which its key is looked up.

    Implements RFC 7929, section 3
    https://tools.ietf.org/html/rfc7929#section-3

    :param email_address: address of the form ``local@domain``; it is split on the *last*
        '@' sign, so the local part itself may contain further '@' characters
    :param tag: label placed between the hashed local part and the domain
    :return: ``<digest>.<tag>.<domain>``, where ``<digest>`` is the lowercase hexadecimal form
        of the first 28 bytes of the SHA-256 hash of the UTF-8 encoded local part
    :raises DnssecError: when the address does not contain any '@' sign
    """
    local, separator, domain = email_address.rpartition("@")
    if not separator:
        msg = "Email address must contain exactly one '@' sign."
        raise DnssecError(msg)

    # 28 bytes of digest correspond to 56 hexadecimal characters; hexdigest() already
    # produces lowercase output, so no further case conversion is needed.
    digest = hashlib.sha256(local.encode('utf-8')).hexdigest()[:56]
    return digest + "." + tag + "." + domain
72
73
class Validity(Enum):
    """
    Output of the verification algorithm.
    TODO: this type might be simplified in order to less reflect the underlying DNS layer.
    TODO: more specifically the variants from 3 to 5 should have more understandable names
    """
    # The verified key is equal to the key obtained from DNS (or from the cache).
    VALID = 1
    # A key was obtained for the email address, but it differs from the verified one.
    REVOKED = 2
    # DNSSEC proved that no record exists for the email address.
    PROVEN_NONEXISTENCE = 3
    # The DNS answer is not secured with DNSSEC.
    RESULT_NOT_SECURE = 4
    # The DNSSEC signatures of the answer are wrong.
    BOGUS_RESULT = 5
    # Communication with DNS failed or gave an answer that cannot be validated.
    ERROR = 9
86
87
class NoKey:
    """
    Marker type standing for "there is no key". The cache uses it to record that a key does
    not exist, so that the absence is expressed through Python's type system.
    """
94
95
class KeyInfo:
    """
    Wrapper class for email and associated verification key, where both are represented in
    form of a string.

    Attributes:
        email: email address the key belongs to
        key: base64 text of the key as ASCII bytes, or None when no key was given
    """
    def __init__(self, email=None, key=None):
        self.email = email
        self.key = key

    def __repr__(self):
        # ``key`` defaults to None, so it cannot be decoded unconditionally.
        if self.key is None:
            return 'KeyInfo("{}", None)'.format(self.email)
        return 'KeyInfo("{}", "{}...")'.format(self.email, self.key.decode('ascii')[:6])

    @staticmethod
    def from_rpm_key_object(userid, raw_key):
        # type: (str, bytes) -> KeyInfo
        """
        Since dnf uses different format of the key than the one used in DNS RR, I need to convert
        the former one into the new one.

        The ASCII armor around the key is removed: the BEGIN/END marker lines, any
        "Name: value" armor header lines, blank lines and the trailing "=XXXX" checksum
        line are dropped and the remaining base64 lines are concatenated.

        :param userid: user id of the key; the email address is taken from between '<' and '>'
        :param raw_key: ASCII armored public key block
        :return: KeyInfo holding the email address and the concatenated base64 data
        :raises DnssecError: when the user id contains no email address or when the armor
            markers are missing
        """
        input_email = re.search('<(.*@.*)>', userid)
        if input_email is None:
            raise DnssecError("Key user id does not contain an email address.")
        email = input_email.group(1)

        begin_marker = '-----BEGIN PGP PUBLIC KEY BLOCK-----'
        end_marker = '-----END PGP PUBLIC KEY BLOCK-----'
        # Strip each line so that CRLF line endings do not hide the markers.
        lines = [line.strip() for line in raw_key.decode('ascii').split('\n')]

        # The last occurrence of each marker wins.
        start = None
        stop = None
        for index, line in enumerate(lines):
            if line == begin_marker:
                start = index
            elif line == end_marker:
                stop = index
        if start is None or stop is None or stop < start:
            raise DnssecError("Key is not an ASCII armored PGP public key block.")

        payload = []
        for line in lines[start + 1:stop]:
            if line.startswith('='):
                # The checksum line terminates the base64 data.
                break
            if not line or ':' in line:
                # Blank separator or armor header; ':' never occurs in base64 data.
                continue
            payload.append(line)

        return KeyInfo(email, ''.join(payload).encode('ascii'))
132
133
class DNSSECKeyVerification:
    """
    The main class when it comes to verification itself. It wraps Unbound context and a cache with
    already obtained results.
    """

    # Mapping from email address to b64 encoded public key or NoKey in case of proven nonexistence
    _cache = {}
    # type: Dict[str, Union[str, NoKey]]

    @staticmethod
    def _cache_hit(key_union, input_key_string):
        # type: (Union[str, NoKey], str) -> Validity
        """
        Compare the key in case it was found in the cache.

        :param key_union: value stored in the cache for the email address
        :param input_key_string: key that is being verified
        :return: PROVEN_NONEXISTENCE when the cache holds a NoKey marker, VALID when the cached
            key equals the input key, REVOKED otherwise
        """
        # verify() stores a NoKey *instance*, so an identity test against the class alone
        # would never match; accept both the instance and the bare class.
        if key_union is NoKey or isinstance(key_union, NoKey):
            logger.debug("Cache hit, proven non-existence")
            return Validity.PROVEN_NONEXISTENCE
        if key_union == input_key_string:
            logger.debug("Cache hit, valid key")
            return Validity.VALID
        logger.debug("Key in cache: {}".format(key_union))
        logger.debug("Input key   : {}".format(input_key_string))
        return Validity.REVOKED

    @staticmethod
    def _cache_miss(input_key):
        # type: (KeyInfo) -> Validity
        """
        In case the key was not found in the cache, create an Unbound context and contact the DNS
        system

        :param input_key: key to verify; its email determines the queried DNS name
        :return: verification result derived from the DNS answer
        :raises dnf.exceptions.Error: when the unbound module cannot be imported
        """
        try:
            import unbound
        except ImportError as e:
            # Translate the template first and fill in the details afterwards, otherwise
            # the already formatted text is never found in the translation catalog.
            msg = _("Configuration option 'gpgkey_dns_verification' requires "
                    "python3-unbound ({})").format(e)
            raise dnf.exceptions.Error(msg)

        # Failures while configuring the context are only logged; the query is still attempted.
        ctx = unbound.ub_ctx()
        if ctx.set_option("verbosity:", "0") != 0:
            logger.debug("Unbound context: Failed to set verbosity")

        if ctx.set_option("qname-minimisation:", "yes") != 0:
            logger.debug("Unbound context: Failed to set qname minimisation")

        if ctx.resolvconf() != 0:
            logger.debug("Unbound context: Failed to read resolv.conf")

        if ctx.add_ta_file("/var/lib/unbound/root.key") != 0:
            logger.debug("Unbound context: Failed to add trust anchor file")

        status, result = ctx.resolve(email2location(input_key.email),
                                     RR_TYPE_OPENPGPKEY, unbound.RR_CLASS_IN)
        if status != 0:
            logger.debug("Communication with DNS servers failed")
            return Validity.ERROR
        if result.bogus:
            logger.debug("DNSSEC signatures are wrong")
            return Validity.BOGUS_RESULT
        if not result.secure:
            logger.debug("Result is not secured with DNSSEC")
            return Validity.RESULT_NOT_SECURE
        if result.nxdomain or (result.rcode == unbound.RCODE_NOERROR and not result.havedata):
            logger.debug("Non-existence of this record was proven by DNSSEC")
            return Validity.PROVEN_NONEXISTENCE
        if not result.havedata:
            # TODO: This is weird result, but there is no way to perform validation, so just return
            # an error
            # Should handle only SERVFAIL, REFUSED and similar rcodes
            logger.debug("Unknown error in DNS communication: {}".format(result.rcode_str))
            return Validity.ERROR

        # NOTE(review): only the first returned record is compared with the input key.
        data = result.data.as_raw_data()[0]
        dns_data_b64 = base64.b64encode(data)
        if dns_data_b64 == input_key.key:
            return Validity.VALID

        # In case it is different, print the keys for further examination in debug mode
        logger.debug("Key from DNS: {}".format(dns_data_b64))
        logger.debug("Input key   : {}".format(input_key.key))
        return Validity.REVOKED

    @staticmethod
    def verify(input_key):
        # type: (KeyInfo) -> Validity
        """
        Public API. Use this method to verify a KeyInfo object.

        The cache is consulted first. On a miss DNS is queried, and only VALID and
        PROVEN_NONEXISTENCE results are remembered for later calls.

        :param input_key: key to verify
        :return: result of the verification
        """
        logger.debug("Running verification for key with id: {}".format(input_key.email))
        key_union = DNSSECKeyVerification._cache.get(input_key.email)
        if key_union is not None:
            return DNSSECKeyVerification._cache_hit(key_union, input_key.key)

        result = DNSSECKeyVerification._cache_miss(input_key)
        if result == Validity.VALID:
            DNSSECKeyVerification._cache[input_key.email] = input_key.key
        elif result == Validity.PROVEN_NONEXISTENCE:
            DNSSECKeyVerification._cache[input_key.email] = NoKey()
        return result
236
237
def nice_user_msg(ki, v):
    # type: (KeyInfo, Validity) -> str
    """
    Build a human readable sentence about the validity of a key.

    :param ki: key whose email address is named in the message
    :param v: verification result of that key
    :return: sentence ending with "is valid." for Validity.VALID and with
        "has unknown status." for every other result
    """
    intro = _("DNSSEC extension: Key for user ") + ki.email + " "
    verdict = _("is valid.") if v == Validity.VALID else _("has unknown status.")
    return intro + verdict
248
249
def any_msg(m):
    # type: (str) -> str
    """
    Put the DNSSEC extension tag in front of a message.

    :param m: message text
    :return: the message preceded by the translated "DNSSEC extension: " tag
    """
    tag = _("DNSSEC extension: ")
    return tag + m
256
257
class RpmImportedKeys:
    """
    Wrapper around keys, that are imported in the RPM database.

    The keys are stored in packages with name gpg-pubkey, where the version and
    release is different for each of them. The key content itself is stored as
    an ASCII armored string in the package description, so it needs to be parsed
    before it can be used.
    """
    @staticmethod
    def _query_db_for_gpg_keys():
        # type: () -> List[KeyInfo]
        """
        Collect the keys of all gpg-pubkey packages found in the RPM database.

        Packages whose packager field is missing or contains no ``<email>`` part are
        skipped, because the DNS lookup is derived from the email address.

        :return: list of KeyInfo objects, one per usable gpg-pubkey package
        """
        # TODO: base.conf.installroot ??
        transaction_set = dnf.rpm.transaction.TransactionWrapper()
        packages = transaction_set.dbMatch("name", "gpg-pubkey")
        return_list = []
        for pkg in packages:
            packager = dnf.rpm.getheader(pkg, 'packager')
            if packager is None:
                continue
            match = re.search('<(.*@.*)>', packager)
            if match is None:
                continue
            email = match.group(1)
            description = dnf.rpm.getheader(pkg, 'description')
            # NOTE(review): assumes three non-key lines at both ends of the description
            key_lines = description.split('\n')[3:-3]
            key_str = ''.join(key_lines)
            return_list.append(KeyInfo(email, key_str.encode('ascii')))

        return return_list

    @staticmethod
    def check_imported_keys_validity():
        """
        Verify every key found in the RPM database and log the outcome for each of them.
        """
        keys = RpmImportedKeys._query_db_for_gpg_keys()
        logger.info(any_msg(_("Testing already imported keys for their validity.")))
        for key in keys:
            try:
                result = DNSSECKeyVerification.verify(key)
            except DnssecError as e:
                # Errors in this exception should not be fatal, print it and just continue
                logger.warning("DNSSEC extension error (email={}): {}"
                               .format(key.email, e.value))
                continue
            # TODO: remove revoked keys automatically and possibly ask user to confirm
            if result == Validity.VALID:
                logger.debug(any_msg("GPG Key {} is valid".format(key.email)))
            elif result == Validity.PROVEN_NONEXISTENCE:
                logger.debug(any_msg("GPG Key {} does not support DNS"
                                     " verification".format(key.email)))
            elif result == Validity.BOGUS_RESULT:
                logger.info(any_msg("GPG Key {} could not be verified, because DNSSEC signatures"
                                    " are bogus. Possible causes: wrong configuration of the DNS"
                                    " server, MITM attack".format(key.email)))
            elif result == Validity.REVOKED:
                logger.info(any_msg("GPG Key {} has been revoked and should"
                                    " be removed immediately".format(key.email)))
            else:
                logger.debug(any_msg("GPG Key {} could not be tested".format(key.email)))
312