#
# This file is a part of DNSViz, a tool suite for DNS/DNSSEC monitoring,
# analysis, and visualization.
# Created by Casey Deccio (casey@deccio.net)
#
# Copyright 2012-2014 Sandia Corporation. Under the terms of Contract
# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
# certain rights in this software.
#
# Copyright 2014-2016 VeriSign, Inc.
#
# Copyright 2016-2021 Casey Deccio
#
# DNSViz is free software; you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation; either version 2 of the License, or
# (at your option) any later version.
#
# DNSViz is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License along
# with DNSViz. If not, see <http://www.gnu.org/licenses/>.
#

from __future__ import unicode_literals

import atexit
import base64
import binascii
import logging
import struct
import hashlib
import os
import re

from . import format as fmt
lb2s = fmt.latin1_binary_to_string

logger = logging.getLogger(__name__)

# Indexes into the per-module tuples stored in _crypto_sources, and into
# ALG_TYPE_DNSSEC_TEXT.
ALG_TYPE_DNSSEC = 0
ALG_TYPE_DIGEST = 1
ALG_TYPE_NSEC3 = 2

ALG_TYPE_DNSSEC_TEXT = [
    'algorithm',
    'digest algorithm',
    'NSEC3 algorithm',
]

# Maps a human-readable software requirement to the tuple
# (DNSSEC algorithms, digest algorithms, NSEC3 algorithms) that it enables.
# Only used to build the warning emitted by _log_unsupported_alg().
_crypto_sources = {
    'M2Crypto >= 0.21.1': (set([1, 5, 7, 8, 10]), set([1, 2, 4]), set([1])),
    'M2Crypto >= 0.24.0': (set([3, 6, 13, 14]), set(), set()),
    'M2Crypto >= 0.24.0 and either openssl < 1.1.0 or openssl >= 1.1.0 plus the OpenSSL GOST Engine': (set([12]), set([3]), set()),
    'M2Crypto >= 0.37.0 and openssl >= 1.1.1': (set([15, 16]), set(), set()),
}
# Requirements for which a warning has already been logged.
_logged_modules = set()

_supported_algs = set()
_supported_digest_algs = set()
# NSEC3 hashing uses hashlib only, so algorithm 1 is always available.
_supported_nsec3_algs = set([1])
try:
    from M2Crypto import EVP, RSA
    from M2Crypto.m2 import hex_to_bn, bn_to_mpi
except Exception:
    # M2Crypto is optional; without it no DNSSEC/digest algorithm is enabled.
    pass
else:
    _supported_algs.update(set([1, 5, 7, 8, 10]))
    _supported_digest_algs.update(set([1, 2, 4]))

GOST_PREFIX = b'\x30\x63\x30\x1c\x06\x06\x2a\x85\x03\x02\x02\x13\x30\x12\x06\x07\x2a\x85\x03\x02\x02\x23\x01\x06\x07\x2a\x85\x03\x02\x02\x1e\x01\x03\x43\x00\x04\x40'
GOST_ENGINE_NAME = b'gost'
GOST_DIGEST_NAME = b'GOST R 34.11-94'

ED25519_PREFIX = b'\x30\x2a\x30\x05\x06\x03\x2b\x65\x70\x03\x21\x00'
ED448_PREFIX = b'\x30\x43\x30\x05\x06\x03\x2b\x65\x71\x03\x3a\x00'

# python3/python2 dual compatibility
if not isinstance(GOST_ENGINE_NAME, str):
    GOST_ENGINE_NAME = lb2s(GOST_ENGINE_NAME)
    GOST_DIGEST_NAME = lb2s(GOST_DIGEST_NAME)

try:
    # available from python 3.1
    base64encodebytes = base64.encodebytes
except AttributeError:
    # available until python 3.8
    base64encodebytes = base64.encodestring

EC_NOCOMPRESSION = b'\x04'

# Minimum length of a DSA signature as parsed by _validate_rrsig_dsa():
# one T octet followed by 20 octets each for R and S.
_DSA_SIG_MIN_LEN = 1 + 20 + 20


def _init_dynamic():
    """Call Engine.load_dynamic() and, on success, register Engine.cleanup
    to run at interpreter exit.  An Engine.EngineError is ignored."""
    try:
        Engine.load_dynamic()
    except Engine.EngineError:
        pass
    else:
        atexit.register(Engine.cleanup)


def _check_dsa_support():
    """Enable DNSSEC algorithms 3 and 6 if the imported DSA module exposes
    pub_key_from_params."""
    if hasattr(DSA, 'pub_key_from_params'):
        _supported_algs.update((3, 6))


def _check_gost_support():
    """Enable DNSSEC algorithm 12 and digest algorithm 3 if the GOST digest
    can be instantiated after initializing the GOST engine."""
    _gost_init()
    try:
        EVP.MessageDigest(GOST_DIGEST_NAME)
    except ValueError:
        pass
    else:
        _supported_algs.add(12)
        _supported_digest_algs.add(3)
    finally:
        _gost_cleanup()


def _check_ec_support():
    """Enable DNSSEC algorithms 13 and 14 if the imported EC module exposes
    pub_key_from_params."""
    if hasattr(EC, 'pub_key_from_params'):
        _supported_algs.update((13, 14))


def _check_ed_support():
    """Enable DNSSEC algorithms 15 and 16 if M2Crypto reports an OpenSSL
    version number of at least 0x10101000 (1.1.1)."""
    if m2.OPENSSL_VERSION_NUMBER >= 0x10101000:
        _supported_algs.update((15, 16))


def alg_is_supported(alg):
    """Return True if DNSSEC algorithm number `alg` can be validated."""
    return alg in _supported_algs


def digest_alg_is_supported(alg):
    """Return True if DS digest algorithm number `alg` can be validated."""
    return alg in _supported_digest_algs


def nsec3_alg_is_supported(alg):
    """Return True if NSEC3 hash algorithm number `alg` is available."""
    return alg in _supported_nsec3_algs


def _log_unsupported_alg(alg, alg_type):
    """Log a warning naming the software needed to support `alg`.

    `alg_type` is one of the ALG_TYPE_* constants.  Only the first matching
    requirement in _crypto_sources is considered, and each requirement is
    reported at most once per process.  Nothing is logged if no requirement
    lists `alg`.
    """
    for mod, algs_by_type in _crypto_sources.items():
        if alg not in algs_by_type[alg_type]:
            continue
        if mod not in _logged_modules:
            _logged_modules.add(mod)
            logger.warning('Warning: Without the installation of %s, cryptographic validation of DNSSEC %s %d (and possibly others) is not supported.',
                    mod, ALG_TYPE_DNSSEC_TEXT[alg_type], alg)
        return


def _gost_init():
    """Instantiate the GOST engine, initialize it and make it the default.
    A ValueError raised while doing so is ignored."""
    try:
        gost = Engine.Engine(GOST_ENGINE_NAME)
        gost.init()
        gost.set_default()
    except ValueError:
        pass


def _gost_cleanup():
    """Call finish() on the GOST engine if it can be instantiated."""
    from M2Crypto import Engine
    try:
        gost = Engine.Engine(GOST_ENGINE_NAME)
    except ValueError:
        pass
    else:
        gost.finish()


# Each optional M2Crypto feature is probed independently so that a missing
# feature only disables the algorithms that depend on it.  Exception (rather
# than a bare except) is caught so that KeyboardInterrupt and SystemExit are
# not swallowed.
try:
    from M2Crypto import DSA
except Exception:
    pass
else:
    _check_dsa_support()

try:
    from M2Crypto import Engine, m2
    _init_dynamic()
except Exception:
    pass
else:
    _check_gost_support()

try:
    from M2Crypto import EC
except Exception:
    pass
else:
    _check_ec_support()

try:
    from M2Crypto.m2 import digest_verify_init
except Exception:
    pass
else:
    _check_ed_support()


def _bytes_to_mpi(data):
    """Convert a big-endian byte string to the MPI form used by M2Crypto."""
    return bn_to_mpi(hex_to_bn(binascii.hexlify(data)))


def _md_digest(md_name, msg):
    """Return the digest of `msg` computed with EVP digest `md_name`."""
    md = EVP.MessageDigest(md_name)
    md.update(msg)
    return md.final()


def validate_ds_digest(digest_alg, digest, dnskey_msg):
    """Check a DS digest against the message it was computed from.

    Returns True or False according to whether hashing `dnskey_msg` with
    `digest_alg` (1: sha1, 2: sha256, 3: GOST, 4: sha384) yields `digest`.
    Returns None, after logging a warning, if `digest_alg` is not supported.
    """
    if not digest_alg_is_supported(digest_alg):
        _log_unsupported_alg(digest_alg, ALG_TYPE_DIGEST)
        return None

    if digest_alg == 1:
        return _md_digest('sha1', dnskey_msg) == digest
    elif digest_alg == 2:
        return _md_digest('sha256', dnskey_msg) == digest
    elif digest_alg == 3:
        # the GOST digest is only available while the engine is loaded
        _gost_init()
        try:
            return _md_digest(GOST_DIGEST_NAME, dnskey_msg) == digest
        finally:
            _gost_cleanup()
    elif digest_alg == 4:
        return _md_digest('sha384', dnskey_msg) == digest


def _dnskey_to_dsa(key):
    """Build a DSA public key from DNSKEY key material.

    The layout parsed is: T (1 octet), Q (20 octets), then P, G and Y
    (64 + 8*T octets each).  Returns None if `key` is empty or too short to
    hold all of these fields.
    """
    # get T
    try:
        t = key[0]
    except IndexError:
        return None
    # python3/python2 dual compatibility
    if not isinstance(t, int):
        t = ord(t)

    q_len = 20
    param_len = 64 + (t << 3)
    if len(key) < 1 + q_len + 3 * param_len:
        # truncated key material cannot contain Q, P, G and Y
        return None

    offset = 1

    # get Q
    q = _bytes_to_mpi(key[offset:offset + q_len])
    offset += q_len

    # get P
    p = _bytes_to_mpi(key[offset:offset + param_len])
    offset += param_len

    # get G
    g = _bytes_to_mpi(key[offset:offset + param_len])
    offset += param_len

    # get Y
    y = _bytes_to_mpi(key[offset:offset + param_len])

    # create the DSA public key
    return DSA.pub_key_from_params(p, q, g, y)


def _dnskey_to_rsa(key):
    """Build an EVP public key wrapping the RSA key in DNSKEY key material.

    The exponent length is taken from the first octet or, when that octet
    is zero, from the following two octets (network byte order).  Returns
    None if `key` is too short to contain the exponent length.
    """
    try:
        # get the exponent length
        e_len = key[0]
    except IndexError:
        return None
    # python3/python2 dual compatibility
    if not isinstance(e_len, int):
        e_len = ord(e_len)

    offset = 1
    if e_len == 0:
        try:
            e_len, = struct.unpack(b'!H', key[1:3])
        except struct.error:
            # the two-octet length field is missing or truncated
            return None
        offset = 3

    # get the exponent
    e = _bytes_to_mpi(key[offset:offset + e_len])
    offset += e_len

    # get the modulus
    n = _bytes_to_mpi(key[offset:])

    # create the RSA public key
    rsa = RSA.new_pub_key((e, n))
    pubkey = EVP.PKey()
    pubkey.assign_rsa(rsa)

    return pubkey


def _der_to_pubkey(der):
    """Wrap DER bytes in PEM public key armor and load them with EVP."""
    pem = b'-----BEGIN PUBLIC KEY-----\n' + base64encodebytes(der) + b'-----END PUBLIC KEY-----'
    return EVP.load_key_string_pubkey(pem)


def _dnskey_to_gost(key):
    """Load DNSKEY key material as a GOST public key (GOST_PREFIX + key)."""
    return _der_to_pubkey(GOST_PREFIX + key)


def _dnskey_to_ed(alg, key):
    """Load DNSKEY key material as an Ed25519 (alg 15) or Ed448 (alg 16)
    public key.  Raises ValueError for any other `alg`."""
    if alg == 15:
        der = ED25519_PREFIX + key
    elif alg == 16:
        der = ED448_PREFIX + key
    else:
        raise ValueError('Algorithm not supported')

    return _der_to_pubkey(der)


def _dnskey_to_ec(alg, key):
    """Build an EC public key from DNSKEY key material.

    Uses curve prime256v1 for alg 13 and secp384r1 for alg 14; raises
    ValueError for any other `alg`.  Returns None if M2Crypto rejects the
    key with a ValueError.
    """
    if alg == 13:
        curve = EC.NID_X9_62_prime256v1
    elif alg == 14:
        curve = EC.NID_secp384r1
    else:
        raise ValueError('Algorithm not supported')

    try:
        return EC.pub_key_from_params(curve, EC_NOCOMPRESSION + key)
    except ValueError:
        return None


def _validate_rrsig_rsa(alg, sig, msg, key):
    """Verify RSA signature `sig` over `msg` with DNSKEY material `key`.

    The hash is md5 for alg 1, sha1 for 5 and 7, sha256 for 8 and sha512
    for 10; any other `alg` raises ValueError.  Returns False if the key
    cannot be parsed.
    """
    pubkey = _dnskey_to_rsa(key)

    # if the key is invalid, then the signature is also invalid
    if pubkey is None:
        return False

    if alg in (1,):
        md = 'md5'
    elif alg in (5, 7):
        md = 'sha1'
    elif alg in (8,):
        md = 'sha256'
    elif alg in (10,):
        md = 'sha512'
    else:
        raise ValueError('RSA Algorithm unknown.')

    # reset context for appropriate hash
    pubkey.reset_context(md=md)
    pubkey.verify_init()
    pubkey.verify_update(msg)

    return pubkey.verify_final(sig) == 1


def _validate_rrsig_dsa(alg, sig, msg, key):
    """Verify DSA signature `sig` over the sha1 digest of `msg`.

    `sig` is parsed as T (1 octet, not otherwise used), R (20 octets) and
    S (20 octets).  Returns False if the key cannot be parsed or the
    signature is too short to contain these fields.
    """
    pubkey = _dnskey_to_dsa(key)

    # if the key is invalid, then the signature is also invalid
    if pubkey is None:
        return False

    # a signature that cannot hold T, R and S is invalid
    if len(sig) < _DSA_SIG_MIN_LEN:
        return False

    # skip T
    offset = 1

    # get R
    r = _bytes_to_mpi(sig[offset:offset + 20])
    offset += 20

    # get S
    s = _bytes_to_mpi(sig[offset:offset + 20])

    digest = _md_digest('sha1', msg)

    return pubkey.verify(digest, r, s) == 1


def _validate_rrsig_gost(alg, sig, msg, key):
    """Verify GOST signature `sig` over `msg` with DNSKEY material `key`.

    The GOST engine is initialized for the duration of the call and
    cleaned up afterwards, whether or not verification succeeds.
    """
    _gost_init()

    try:
        pubkey = _dnskey_to_gost(key)

        # if the key is invalid, then the signature is also invalid
        if pubkey is None:
            return False

        pubkey.md = m2.get_digestbyname(GOST_DIGEST_NAME)
        pubkey.verify_init()
        pubkey.verify_update(msg)

        return pubkey.verify_final(sig) == 1

    finally:
        _gost_cleanup()


def _validate_rrsig_ec(alg, sig, msg, key):
    """Verify ECDSA signature `sig` over `msg` with DNSKEY material `key`.

    alg 13 uses sha256 with a 64-octet signature; alg 14 uses sha384 with
    a 96-octet signature.  The signature is R followed by S, each half of
    the total size.  Returns False if the key is rejected or the signature
    length is wrong.
    """
    pubkey = _dnskey_to_ec(alg, key)

    # if the key is invalid, then the signature is also invalid
    if pubkey is None:
        return False

    if alg in (13,):
        md_name = 'sha256'
        sigsize = 64
    elif alg in (14,):
        md_name = 'sha384'
        sigsize = 96
    else:
        raise ValueError('EC hash algorithm unknown!')

    if sigsize != len(sig):
        return False

    half = sigsize // 2

    # get R
    r = _bytes_to_mpi(sig[:half])

    # get S
    s = _bytes_to_mpi(sig[half:sigsize])

    digest = _md_digest(md_name, msg)

    return pubkey.verify_dsa(digest, r, s) == 1


def _validate_rrsig_ed(alg, sig, msg, key):
    """Verify Ed25519 (alg 15) or Ed448 (alg 16) signature `sig` over
    `msg` with DNSKEY material `key`."""
    pubkey = _dnskey_to_ed(alg, key)

    # if the key is invalid, then the signature is also invalid
    if pubkey is None:
        return False

    pubkey.reset_context(None)
    pubkey.digest_verify_init()
    return pubkey.digest_verify(sig, msg) == 1


def validate_rrsig(alg, sig, msg, key):
    """Verify signature `sig` over `msg` using DNSKEY material `key`.

    Dispatches on the DNSSEC algorithm number `alg`.  Returns True or
    False for a supported algorithm, or None (after logging a warning) if
    `alg` is not supported.
    """
    if not alg_is_supported(alg):
        _log_unsupported_alg(alg, ALG_TYPE_DNSSEC)
        return None

    if alg in (1, 5, 7, 8, 10):
        return _validate_rrsig_rsa(alg, sig, msg, key)
    elif alg in (3, 6):
        return _validate_rrsig_dsa(alg, sig, msg, key)
    elif alg in (12,):
        return _validate_rrsig_gost(alg, sig, msg, key)
    elif alg in (13, 14):
        return _validate_rrsig_ec(alg, sig, msg, key)
    elif alg in (15, 16):
        return _validate_rrsig_ed(alg, sig, msg, key)


def get_digest_for_nsec3(val, salt, alg, iterations):
    """Return the iterated NSEC3 hash of `val`.

    `val` concatenated with `salt` is hashed once, and the result is then
    re-hashed with the salt `iterations` more times.  Only alg 1 (sha1) is
    supported; for any other `alg` a warning is logged and None is
    returned.
    """
    if not nsec3_alg_is_supported(alg):
        _log_unsupported_alg(alg, ALG_TYPE_NSEC3)
        return None

    if alg == 1:
        hash_func = hashlib.sha1

    for i in range(iterations + 1):
        val = hash_func(val + salt).digest()
    return val