# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

# Copyright (C) 2003-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""Common DNSSEC-related functions and constants."""

from io import BytesIO
import hashlib
import struct
import time

import dns.exception
import dns.name
import dns.node
import dns.rdataset
import dns.rdata
import dns.rdatatype
import dns.rdataclass
from ._compat import string_types


class UnsupportedAlgorithm(dns.exception.DNSException):
    """The DNSSEC algorithm is not supported."""


class ValidationFailure(dns.exception.DNSException):
    """The DNSSEC signature is invalid."""


#: RSAMD5
RSAMD5 = 1
#: DH
DH = 2
#: DSA
DSA = 3
#: ECC
ECC = 4
#: RSASHA1
RSASHA1 = 5
#: DSANSEC3SHA1
DSANSEC3SHA1 = 6
#: RSASHA1NSEC3SHA1
RSASHA1NSEC3SHA1 = 7
#: RSASHA256
RSASHA256 = 8
#: RSASHA512
RSASHA512 = 10
#: ECDSAP256SHA256
ECDSAP256SHA256 = 13
#: ECDSAP384SHA384
ECDSAP384SHA384 = 14
#: INDIRECT
INDIRECT = 252
#: PRIVATEDNS
PRIVATEDNS = 253
#: PRIVATEOID
PRIVATEOID = 254

_algorithm_by_text = {
    'RSAMD5': RSAMD5,
    'DH': DH,
    'DSA': DSA,
    'ECC': ECC,
    'RSASHA1': RSASHA1,
    'DSANSEC3SHA1': DSANSEC3SHA1,
    'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1,
    'RSASHA256': RSASHA256,
    'RSASHA512': RSASHA512,
    'INDIRECT': INDIRECT,
    'ECDSAP256SHA256': ECDSAP256SHA256,
    'ECDSAP384SHA384': ECDSAP384SHA384,
    'PRIVATEDNS': PRIVATEDNS,
    'PRIVATEOID': PRIVATEOID,
}

# We construct the inverse mapping programmatically to ensure that we
# cannot make any mistakes (e.g. omissions, cut-and-paste errors) that
# would cause the mapping not to be true inverse.

_algorithm_by_value = {y: x for x, y in _algorithm_by_text.items()}


def algorithm_from_text(text):
    """Convert text into a DNSSEC algorithm value.

    *text* is either an algorithm mnemonic (case-insensitive) or the
    decimal representation of the algorithm number.

    Raises ``ValueError`` if *text* is neither a known mnemonic nor an
    integer.

    Returns an ``int``.
    """

    value = _algorithm_by_text.get(text.upper())
    if value is None:
        value = int(text)
    return value


def algorithm_to_text(value):
    """Convert a DNSSEC algorithm value to text

    Values without a known mnemonic are rendered as their decimal string.

    Returns a ``str``.
    """

    text = _algorithm_by_value.get(value)
    if text is None:
        text = str(value)
    return text


def _to_rdata(record, origin):
    """Return the wire-format bytes produced by ``record.to_wire()``."""
    s = BytesIO()
    record.to_wire(s, origin=origin)
    return s.getvalue()


def key_id(key, origin=None):
    """Return the key id (a 16-bit number) for the specified key.

    Note the *origin* parameter of this function is historical and
    is not needed.

    Returns an ``int`` between 0 and 65535.
    """

    rdata = _to_rdata(key, origin)
    rdata = bytearray(rdata)
    if key.algorithm == RSAMD5:
        # RSAMD5 keys take the tag directly from two octets near the end
        # of the rdata instead of using the checksum below.
        return (rdata[-3] << 8) + rdata[-2]
    else:
        # Sum the rdata as big-endian 16-bit words; a trailing odd octet
        # counts as the high byte of a final word.
        total = 0
        for i in range(len(rdata) // 2):
            total += (rdata[2 * i] << 8) + \
                rdata[2 * i + 1]
        if len(rdata) % 2 != 0:
            total += rdata[len(rdata) - 1] << 8
        # Fold the carry back in and truncate to 16 bits.
        total += ((total >> 16) & 0xffff)
        return total & 0xffff


def make_ds(name, key, algorithm, origin=None):
    """Create a DS record for a DNSSEC key.

    *name* is the owner name of the DS record.

    *key* is a ``dns.rdtypes.ANY.DNSKEY``.

    *algorithm* is a string describing which hash algorithm to use.  The
    currently supported hashes are "SHA1" and "SHA256".  Case does not
    matter for these strings.

    *origin* is a ``dns.name.Name`` and will be used as the origin
    if *key* is a relative name.

    Raises ``UnsupportedAlgorithm`` if *algorithm* is not one of the
    supported hashes.

    Returns a ``dns.rdtypes.ANY.DS``.
    """

    # The digests come from hashlib rather than the optional crypto
    # package imported at the bottom of this module, so that DS records
    # can be generated even when that package is not installed.
    algorithm_name = algorithm.upper()
    if algorithm_name == 'SHA1':
        dsalg = 1
        dshash = hashlib.sha1()
    elif algorithm_name == 'SHA256':
        dsalg = 2
        dshash = hashlib.sha256()
    else:
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)

    if isinstance(name, string_types):
        name = dns.name.from_text(name, origin)
    # The digest covers the canonical owner name followed by the DNSKEY
    # rdata.
    dshash.update(name.canonicalize().to_wire())
    dshash.update(_to_rdata(key, origin))
    digest = dshash.digest()

    # DS rdata: key tag (16 bits), key algorithm, digest type, digest.
    dsrdata = struct.pack("!HBB", key_id(key), key.algorithm, dsalg) + digest
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata, 0,
                               len(dsrdata))


def _find_candidate_keys(keys, rrsig):
    """Return the keys for the signer of *rrsig* that could have made it.

    *keys* maps names to either a ``dns.node.Node`` or an iterable of key
    rdatas.  A key is a candidate when both its algorithm and its key id
    match the values carried in *rrsig*.

    Returns ``None`` if there is no entry (or no DNSKEY rdataset) for the
    signer, otherwise a possibly empty ``list`` of key rdatas.
    """
    candidate_keys = []
    value = keys.get(rrsig.signer)
    if value is None:
        return None
    if isinstance(value, dns.node.Node):
        try:
            rdataset = value.find_rdataset(dns.rdataclass.IN,
                                           dns.rdatatype.DNSKEY)
        except KeyError:
            return None
    else:
        rdataset = value
    for rdata in rdataset:
        if rdata.algorithm == rrsig.algorithm and \
                key_id(rdata) == rrsig.key_tag:
            candidate_keys.append(rdata)
    return candidate_keys


def _is_rsa(algorithm):
    """Return whether *algorithm* is one of the RSA signature algorithms."""
    return algorithm in (RSAMD5, RSASHA1,
                         RSASHA1NSEC3SHA1, RSASHA256,
                         RSASHA512)


def _is_dsa(algorithm):
    """Return whether *algorithm* is one of the DSA signature algorithms."""
    return algorithm in (DSA, DSANSEC3SHA1)


def _is_ecdsa(algorithm):
    """Return whether *algorithm* is ECDSA and the ecdsa module is usable."""
    return _have_ecdsa and (algorithm in (ECDSAP256SHA256, ECDSAP384SHA384))


def _is_md5(algorithm):
    """Return whether *algorithm* uses MD5 as its hash."""
    return algorithm == RSAMD5


def _is_sha1(algorithm):
    """Return whether *algorithm* uses SHA-1 as its hash."""
    return algorithm in (DSA, RSASHA1,
                         DSANSEC3SHA1, RSASHA1NSEC3SHA1)


def _is_sha256(algorithm):
    """Return whether *algorithm* uses SHA-256 as its hash."""
    return algorithm in (RSASHA256, ECDSAP256SHA256)


def _is_sha384(algorithm):
    """Return whether *algorithm* uses SHA-384 as its hash."""
    return algorithm == ECDSAP384SHA384


def _is_sha512(algorithm):
    """Return whether *algorithm* uses SHA-512 as its hash."""
    return algorithm == RSASHA512


def _make_hash(algorithm):
    """Return a new hash object for the given DNSSEC algorithm.

    Raises ``ValidationFailure`` if no hash is associated with *algorithm*.
    """
    if _is_md5(algorithm):
        return MD5.new()
    if _is_sha1(algorithm):
        return SHA1.new()
    if _is_sha256(algorithm):
        return SHA256.new()
    if _is_sha384(algorithm):
        return SHA384.new()
    if _is_sha512(algorithm):
        return SHA512.new()
    raise ValidationFailure('unknown hash for algorithm %u' % algorithm)


def _make_algorithm_id(algorithm):
    """Return the packed identifier bytes for the hash used by *algorithm*.

    The result is built from the hash's object identifier and digest size.

    Raises ``ValidationFailure`` if *algorithm* has no identifier defined
    here.
    """
    if _is_md5(algorithm):
        oid = [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]
    elif _is_sha1(algorithm):
        oid = [0x2b, 0x0e, 0x03, 0x02, 0x1a]
    elif _is_sha256(algorithm):
        oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]
    elif _is_sha512(algorithm):
        oid = [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]
    else:
        raise ValidationFailure('unknown algorithm %u' % algorithm)
    olen = len(oid)
    dlen = _make_hash(algorithm).digest_size
    idbytes = [0x30] + [8 + olen + dlen] + \
              [0x30, olen + 4] + [0x06, olen] + oid + \
              [0x05, 0x00] + [0x04, dlen]
    return struct.pack('!%dB' % len(idbytes), *idbytes)


def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
    """Validate an RRset against a single signature rdata

    The owner name of *rrsig* is assumed to be the same as the owner name
    of *rrset*.

    *rrset* is the RRset to validate.  It can be a ``dns.rrset.RRset`` or
    a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *rrsig* is a ``dns.rdata.Rdata``, the signature to validate.

    *keys* is the key dictionary, used to find the DNSKEY associated with
    a given name.  The dictionary is keyed by a ``dns.name.Name``, and has
    ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values.

    *origin* is a ``dns.name.Name``, the origin to use for relative names.

    *now* is an ``int``, the time to use when validating the signatures,
    in seconds since the UNIX epoch.  The default is the current time.

    Raises ``ValidationFailure`` if the signature cannot be validated.
    Returns ``None`` on success.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    candidate_keys = _find_candidate_keys(keys, rrsig)
    if candidate_keys is None:
        raise ValidationFailure('unknown key')

    for candidate_key in candidate_keys:
        # For convenience, allow the rrset to be specified as a (name,
        # rdataset) tuple as well as a proper rrset
        if isinstance(rrset, tuple):
            rrname = rrset[0]
            rdataset = rrset[1]
        else:
            rrname = rrset.name
            rdataset = rrset

        if now is None:
            now = time.time()
        if rrsig.expiration < now:
            raise ValidationFailure('expired')
        if rrsig.inception > now:
            raise ValidationFailure('not yet valid')

        hasher = _make_hash(rrsig.algorithm)

        if _is_rsa(rrsig.algorithm):
            # Key layout: exponent length (one octet, or a zero octet
            # followed by a two-octet length), exponent, modulus.
            keyptr = candidate_key.key
            (bytes_,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            if bytes_ == 0:
                (bytes_,) = struct.unpack('!H', keyptr[0:2])
                keyptr = keyptr[2:]
            rsa_e = keyptr[0:bytes_]
            rsa_n = keyptr[bytes_:]
            try:
                pubkey = CryptoRSA.construct(
                    (number.bytes_to_long(rsa_n),
                     number.bytes_to_long(rsa_e)))
            except ValueError:
                raise ValidationFailure('invalid public key')
            sig = rrsig.signature
        elif _is_dsa(rrsig.algorithm):
            # Key layout: T (one octet), Q (20 octets), then P, G and Y,
            # each 64 + T * 8 octets long.
            keyptr = candidate_key.key
            (t,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            octets = 64 + t * 8
            dsa_q = keyptr[0:20]
            keyptr = keyptr[20:]
            dsa_p = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_g = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_y = keyptr[0:octets]
            try:
                pubkey = CryptoDSA.construct(
                    (number.bytes_to_long(dsa_y),
                     number.bytes_to_long(dsa_g),
                     number.bytes_to_long(dsa_p),
                     number.bytes_to_long(dsa_q)))
            except ValueError:
                # Report malformed key material the same way as the RSA
                # branch, so callers that catch ValidationFailure (such
                # as _validate) are not aborted by a stray ValueError.
                raise ValidationFailure('invalid public key')
            # The first signature octet is skipped; the remainder is what
            # gets verified.
            sig = rrsig.signature[1:]
        elif _is_ecdsa(rrsig.algorithm):
            # use ecdsa for NIST-384p -- not currently supported by pycryptodome

            keyptr = candidate_key.key

            if rrsig.algorithm == ECDSAP256SHA256:
                curve = ecdsa.curves.NIST256p
                key_len = 32
            elif rrsig.algorithm == ECDSAP384SHA384:
                curve = ecdsa.curves.NIST384p
                key_len = 48

            # The key is the X and Y coordinates, key_len octets each.
            x = number.bytes_to_long(keyptr[0:key_len])
            y = number.bytes_to_long(keyptr[key_len:key_len * 2])
            if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y):
                raise ValidationFailure('invalid ECDSA key')
            point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
            verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
                                                                      curve)
            pubkey = ECKeyWrapper(verifying_key, key_len)
            # The signature is r followed by s; r is key_len octets.
            r = rrsig.signature[:key_len]
            s = rrsig.signature[key_len:]
            sig = ecdsa.ecdsa.Signature(number.bytes_to_long(r),
                                        number.bytes_to_long(s))

        else:
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        # Signed data starts with the first 18 octets of the RRSIG rdata
        # followed by the signer name in digestable form.
        hasher.update(_to_rdata(rrsig, origin)[:18])
        hasher.update(rrsig.signer.to_digestable(origin))

        # If the owner name has more labels than the signature covers,
        # the signed owner name is the wildcard form.
        if rrsig.labels < len(rrname) - 1:
            suffix = rrname.split(rrsig.labels + 1)[1]
            rrname = dns.name.from_text('*', suffix)
        rrnamebuf = rrname.to_digestable(origin)
        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                              rrsig.original_ttl)
        # Each RR contributes owner name, type/class/original TTL, rdata
        # length and rdata, with the rdatas taken in sorted order.
        rrlist = sorted(rdataset)
        for rr in rrlist:
            hasher.update(rrnamebuf)
            hasher.update(rrfixed)
            rrdata = rr.to_digestable(origin)
            rrlen = struct.pack('!H', len(rrdata))
            hasher.update(rrlen)
            hasher.update(rrdata)

        try:
            if _is_rsa(rrsig.algorithm):
                verifier = pkcs1_15.new(pubkey)
                # will raise ValueError if verify fails:
                verifier.verify(hasher, sig)
            elif _is_dsa(rrsig.algorithm):
                verifier = DSS.new(pubkey, 'fips-186-3')
                verifier.verify(hasher, sig)
            elif _is_ecdsa(rrsig.algorithm):
                digest = hasher.digest()
                if not pubkey.verify(digest, sig):
                    raise ValueError
            else:
                # Raise here for code clarity; this won't actually ever happen
                # since if the algorithm is really unknown we'd already have
                # raised an exception above
                raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)
            # If we got here, we successfully verified so we can return without error
            return
        except ValueError:
            # this happens on an individual validation failure
            continue
    # nothing verified -- raise failure:
    raise ValidationFailure('verify failure')


def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset.

    *rrset* is the RRset to validate.  It can be a ``dns.rrset.RRset`` or
    a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *rrsigset* is the signature RRset to be validated.  It can be a
    ``dns.rrset.RRset`` or a ``(dns.name.Name, dns.rdataset.Rdataset)`` tuple.

    *keys* is the key dictionary, used to find the DNSKEY associated with
    a given name.  The dictionary is keyed by a ``dns.name.Name``, and has
    ``dns.node.Node`` or ``dns.rdataset.Rdataset`` values.

    *origin* is a ``dns.name.Name``, the origin to use for relative names.

    *now* is an ``int``, the time to use when validating the signatures,
    in seconds since the UNIX epoch.  The default is the current time.

    Raises ``ValidationFailure`` if the owner names differ or if none of
    the signatures validates.  Returns ``None`` on success.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    # One valid signature is sufficient; failures of individual
    # signatures are ignored as long as another one validates.
    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            pass
    raise ValidationFailure("no RRSIGs validated")


def _need_pycrypto(*args, **kwargs):
    """Stand-in for the validation functions when no crypto package exists."""
    raise NotImplementedError("DNSSEC validation requires pycryptodome/pycryptodomex")


try:
    try:
        # test we're using pycryptodome, not pycrypto (which misses SHA1 for example)
        from Crypto.Hash import MD5, SHA1, SHA256, SHA384, SHA512
        from Crypto.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
        from Crypto.Signature import pkcs1_15, DSS
        from Crypto.Util import number
    except ImportError:
        from Cryptodome.Hash import MD5, SHA1, SHA256, SHA384, SHA512
        from Cryptodome.PublicKey import RSA as CryptoRSA, DSA as CryptoDSA
        from Cryptodome.Signature import pkcs1_15, DSS
        from Cryptodome.Util import number
except ImportError:
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
    _have_pycrypto = False
    _have_ecdsa = False
else:
    validate = _validate
    validate_rrsig = _validate_rrsig
    _have_pycrypto = True

    try:
        import ecdsa
        import ecdsa.ecdsa
        import ecdsa.ellipticcurve
        import ecdsa.keys
    except ImportError:
        _have_ecdsa = False
    else:
        _have_ecdsa = True

        class ECKeyWrapper(object):
            """Wrap an ecdsa verifying key behind a ``verify()`` method."""

            def __init__(self, key, key_len):
                self.key = key
                self.key_len = key_len

            def verify(self, digest, sig):
                """Return whether *sig* is valid for the bytes *digest*."""
                diglong = number.bytes_to_long(digest)
                return self.key.pubkey.verifies(diglong, sig)