1# dnssec.py 2# DNS extension for automatic GPG key verification 3# 4# Copyright (C) 2012-2018 Red Hat, Inc. 5# 6# This copyrighted material is made available to anyone wishing to use, 7# modify, copy, or redistribute it subject to the terms and conditions of 8# the GNU General Public License v.2, or (at your option) any later version. 9# This program is distributed in the hope that it will be useful, but WITHOUT 10# ANY WARRANTY expressed or implied, including the implied warranties of 11# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU General 12# Public License for more details. You should have received a copy of the 13# GNU General Public License along with this program; if not, write to the 14# Free Software Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston, MA 15# 02110-1301, USA. Any Red Hat trademarks that are incorporated in the 16# source code or documentation are not subject to the GNU General Public 17# License and may only be used or replicated with the express permission of 18# Red Hat, Inc. 19# 20 21from __future__ import print_function 22from __future__ import absolute_import 23from __future__ import unicode_literals 24 25from enum import Enum 26import base64 27import hashlib 28import logging 29import re 30 31from dnf.i18n import _ 32import dnf.rpm 33import dnf.exceptions 34 35logger = logging.getLogger("dnf") 36 37 38RR_TYPE_OPENPGPKEY = 61 39 40 41class DnssecError(dnf.exceptions.Error): 42 """ 43 Exception used in the dnssec module 44 """ 45 def __repr__(self): 46 return "<DnssecError, value='{}'>"\ 47 .format(self.value if self.value is not None else "Not specified") 48 49 50def email2location(email_address, tag="_openpgpkey"): 51 # type: (str, str) -> str 52 """ 53 Implements RFC 7929, section 3 54 https://tools.ietf.org/html/rfc7929#section-3 55 :param email_address: 56 :param tag: 57 :return: 58 """ 59 split = email_address.rsplit("@", 1) 60 if len(split) != 2: 61 msg = "Email address must contain exactly one '@' sign." 62 raise DnssecError(msg) 63 64 local = split[0] 65 domain = split[1] 66 hash = hashlib.sha256() 67 hash.update(local.encode('utf-8')) 68 digest = base64.b16encode(hash.digest()[0:28])\ 69 .decode("utf-8")\ 70 .lower() 71 return digest + "." + tag + "." + domain 72 73 74class Validity(Enum): 75 """ 76 Output of the verification algorithm. 77 TODO: this type might be simplified in order to less reflect the underlying DNS layer. 78 TODO: more specifically the variants from 3 to 5 should have more understandable names 79 """ 80 VALID = 1 81 REVOKED = 2 82 PROVEN_NONEXISTENCE = 3 83 RESULT_NOT_SECURE = 4 84 BOGUS_RESULT = 5 85 ERROR = 9 86 87 88class NoKey: 89 """ 90 This class represents an absence of a key in the cache. It is an expression of non-existence 91 using the Python's type system. 92 """ 93 pass 94 95 96class KeyInfo: 97 """ 98 Wrapper class for email and associated verification key, where both are represented in 99 form of a string. 100 """ 101 def __init__(self, email=None, key=None): 102 self.email = email 103 self.key = key 104 105 def __repr__(self): 106 return 'KeyInfo("{}", "{}...")'.format(self.email, self.key.decode('ascii')[:6]) 107 108 @staticmethod 109 def from_rpm_key_object(userid, raw_key): 110 # type: (str, bytes) -> KeyInfo 111 """ 112 Since dnf uses different format of the key than the one used in DNS RR, I need to convert 113 the former one into the new one. 114 """ 115 input_email = re.search('<(.*@.*)>', userid) 116 if input_email is None: 117 raise DnssecError 118 119 email = input_email.group(1) 120 key = raw_key.decode('ascii').split('\n') 121 122 start = 0 123 stop = 0 124 for i in range(0, len(key)): 125 if key[i] == '-----BEGIN PGP PUBLIC KEY BLOCK-----': 126 start = i 127 if key[i] == '-----END PGP PUBLIC KEY BLOCK-----': 128 stop = i 129 130 cat_key = ''.join(key[start + 2:stop - 1]).encode('ascii') 131 return KeyInfo(email, cat_key) 132 133 134class DNSSECKeyVerification: 135 """ 136 The main class when it comes to verification itself. It wraps Unbound context and a cache with 137 already obtained results. 138 """ 139 140 # Mapping from email address to b64 encoded public key or NoKey in case of proven nonexistence 141 _cache = {} 142 # type: Dict[str, Union[str, NoKey]] 143 144 @staticmethod 145 def _cache_hit(key_union, input_key_string): 146 # type: (Union[str, NoKey], str) -> Validity 147 """ 148 Compare the key in case it was found in the cache. 149 """ 150 if key_union == input_key_string: 151 logger.debug("Cache hit, valid key") 152 return Validity.VALID 153 elif key_union is NoKey: 154 logger.debug("Cache hit, proven non-existence") 155 return Validity.PROVEN_NONEXISTENCE 156 else: 157 logger.debug("Key in cache: {}".format(key_union)) 158 logger.debug("Input key : {}".format(input_key_string)) 159 return Validity.REVOKED 160 161 @staticmethod 162 def _cache_miss(input_key): 163 # type: (KeyInfo) -> Validity 164 """ 165 In case the key was not found in the cache, create an Unbound context and contact the DNS 166 system 167 """ 168 try: 169 import unbound 170 except ImportError as e: 171 msg = _("Configuration option 'gpgkey_dns_verification' requires " 172 "python3-unbound ({})".format(e)) 173 raise dnf.exceptions.Error(msg) 174 175 ctx = unbound.ub_ctx() 176 if ctx.set_option("verbosity:", "0") != 0: 177 logger.debug("Unbound context: Failed to set verbosity") 178 179 if ctx.set_option("qname-minimisation:", "yes") != 0: 180 logger.debug("Unbound context: Failed to set qname minimisation") 181 182 if ctx.resolvconf() != 0: 183 logger.debug("Unbound context: Failed to read resolv.conf") 184 185 if ctx.add_ta_file("/var/lib/unbound/root.key") != 0: 186 logger.debug("Unbound context: Failed to add trust anchor file") 187 188 status, result = ctx.resolve(email2location(input_key.email), 189 RR_TYPE_OPENPGPKEY, unbound.RR_CLASS_IN) 190 if status != 0: 191 logger.debug("Communication with DNS servers failed") 192 return Validity.ERROR 193 if result.bogus: 194 logger.debug("DNSSEC signatures are wrong") 195 return Validity.BOGUS_RESULT 196 if not result.secure: 197 logger.debug("Result is not secured with DNSSEC") 198 return Validity.RESULT_NOT_SECURE 199 if result.nxdomain or (result.rcode == unbound.RCODE_NOERROR and not result.havedata): 200 logger.debug("Non-existence of this record was proven by DNSSEC") 201 return Validity.PROVEN_NONEXISTENCE 202 if not result.havedata: 203 # TODO: This is weird result, but there is no way to perform validation, so just return 204 # an error 205 # Should handle only SERVFAIL, REFUSED and similar rcodes 206 logger.debug("Unknown error in DNS communication: {}".format(result.rcode_str)) 207 return Validity.ERROR 208 else: 209 data = result.data.as_raw_data()[0] 210 dns_data_b64 = base64.b64encode(data) 211 if dns_data_b64 == input_key.key: 212 return Validity.VALID 213 else: 214 # In case it is different, print the keys for further examination in debug mode 215 logger.debug("Key from DNS: {}".format(dns_data_b64)) 216 logger.debug("Input key : {}".format(input_key.key)) 217 return Validity.REVOKED 218 219 @staticmethod 220 def verify(input_key): 221 # type: (KeyInfo) -> Validity 222 """ 223 Public API. Use this method to verify a KeyInfo object. 224 """ 225 logger.debug("Running verification for key with id: {}".format(input_key.email)) 226 key_union = DNSSECKeyVerification._cache.get(input_key.email) 227 if key_union is not None: 228 return DNSSECKeyVerification._cache_hit(key_union, input_key.key) 229 else: 230 result = DNSSECKeyVerification._cache_miss(input_key) 231 if result == Validity.VALID: 232 DNSSECKeyVerification._cache[input_key.email] = input_key.key 233 elif result == Validity.PROVEN_NONEXISTENCE: 234 DNSSECKeyVerification._cache[input_key.email] = NoKey() 235 return result 236 237 238def nice_user_msg(ki, v): 239 # type: (KeyInfo, Validity) -> str 240 """ 241 Inform the user about key validity in a human readable way. 242 """ 243 prefix = _("DNSSEC extension: Key for user ") + ki.email + " " 244 if v == Validity.VALID: 245 return prefix + _("is valid.") 246 else: 247 return prefix + _("has unknown status.") 248 249 250def any_msg(m): 251 # type: (str) -> str 252 """ 253 Label any given message with DNSSEC extension tag 254 """ 255 return _("DNSSEC extension: ") + m 256 257 258class RpmImportedKeys: 259 """ 260 Wrapper around keys, that are imported in the RPM database. 261 262 The keys are stored in packages with name gpg-pubkey, where the version and 263 release is different for each of them. The key content itself is stored as 264 an ASCII armored string in the package description, so it needs to be parsed 265 before it can be used. 266 """ 267 @staticmethod 268 def _query_db_for_gpg_keys(): 269 # type: () -> List[KeyInfo] 270 # TODO: base.conf.installroot ?? -----------------------\ 271 transaction_set = dnf.rpm.transaction.TransactionWrapper() 272 packages = transaction_set.dbMatch("name", "gpg-pubkey") 273 return_list = [] 274 for pkg in packages: 275 packager = dnf.rpm.getheader(pkg, 'packager') 276 email = re.search('<(.*@.*)>', packager).group(1) 277 description = dnf.rpm.getheader(pkg, 'description') 278 key_lines = description.split('\n')[3:-3] 279 key_str = ''.join(key_lines) 280 return_list += [KeyInfo(email, key_str.encode('ascii'))] 281 282 return return_list 283 284 @staticmethod 285 def check_imported_keys_validity(): 286 keys = RpmImportedKeys._query_db_for_gpg_keys() 287 logger.info(any_msg(_("Testing already imported keys for their validity."))) 288 for key in keys: 289 try: 290 result = DNSSECKeyVerification.verify(key) 291 except DnssecError as e: 292 # Errors in this exception should not be fatal, print it and just continue 293 logger.warning("DNSSEC extension error (email={}): {}" 294 .format(key.email, e.value)) 295 continue 296 # TODO: remove revoked keys automatically and possibly ask user to confirm 297 if result == Validity.VALID: 298 logger.debug(any_msg("GPG Key {} is valid".format(key.email))) 299 pass 300 elif result == Validity.PROVEN_NONEXISTENCE: 301 logger.debug(any_msg("GPG Key {} does not support DNS" 302 " verification".format(key.email))) 303 elif result == Validity.BOGUS_RESULT: 304 logger.info(any_msg("GPG Key {} could not be verified, because DNSSEC signatures" 305 " are bogus. Possible causes: wrong configuration of the DNS" 306 " server, MITM attack".format(key.email))) 307 elif result == Validity.REVOKED: 308 logger.info(any_msg("GPG Key {} has been revoked and should" 309 " be removed immediately".format(key.email))) 310 else: 311 logger.debug(any_msg("GPG Key {} could not be tested".format(key.email))) 312