1#
2# This file is a part of DNSViz, a tool suite for DNS/DNSSEC monitoring,
3# analysis, and visualization.
4# Created by Casey Deccio (casey@deccio.net)
5#
6# Copyright 2012-2014 Sandia Corporation. Under the terms of Contract
7# DE-AC04-94AL85000 with Sandia Corporation, the U.S. Government retains
8# certain rights in this software.
9#
10# Copyright 2014-2016 VeriSign, Inc.
11#
12# Copyright 2016-2021 Casey Deccio
13#
14# DNSViz is free software; you can redistribute it and/or modify
15# it under the terms of the GNU General Public License as published by
16# the Free Software Foundation; either version 2 of the License, or
17# (at your option) any later version.
18#
19# DNSViz is distributed in the hope that it will be useful,
20# but WITHOUT ANY WARRANTY; without even the implied warranty of
21# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
22# GNU General Public License for more details.
23#
24# You should have received a copy of the GNU General Public License along
25# with DNSViz.  If not, see <http://www.gnu.org/licenses/>.
26#
27
28from __future__ import unicode_literals
29
30import atexit
31import base64
32import binascii
33import logging
34import struct
35import hashlib
36import os
37import re
38
39from . import format as fmt
# Short alias for the helper in the sibling "format" module.
# NOTE(review): presumably decodes latin1 bytes into a text string -- confirm
# against format.latin1_binary_to_string.
lb2s = fmt.latin1_binary_to_string

logger = logging.getLogger(__name__)

# Kinds of algorithm numbers handled by this module.  Each value is used as an
# index into ALG_TYPE_DNSSEC_TEXT and into the tuples stored in
# _crypto_sources (see _log_unsupported_alg).
ALG_TYPE_DNSSEC = 0
ALG_TYPE_DIGEST = 1
ALG_TYPE_NSEC3 = 2

# Human-readable name for each ALG_TYPE_* value, used in warning messages.
ALG_TYPE_DNSSEC_TEXT = [
        'algorithm',
        'digest algorithm',
        'NSEC3 algorithm',
]

# Maps a description of a software requirement to a 3-tuple of sets, indexed
# by ALG_TYPE_*: (DNSSEC algorithms, digest algorithms, NSEC3 algorithms)
# associated with that requirement.  Consulted by _log_unsupported_alg to tell
# the user what to install.
_crypto_sources = {
        'M2Crypto >= 0.21.1': (set([1,5,7,8,10]), set([1,2,4]), set([1])),
        'M2Crypto >= 0.24.0': (set([3,6,13,14]), set(), set()),
        'M2Crypto >= 0.24.0 and either openssl < 1.1.0 or openssl >= 1.1.0 plus the OpenSSL GOST Engine': (set([12]), set([3]), set()),
        'M2Crypto >= 0.37.0 and openssl >= 1.1.1': (set([15,16]), set(), set()),
}
# Requirement descriptions that have already been warned about, so that each
# warning is logged at most once.
_logged_modules = set()

# Algorithm numbers that can be handled in this process.  The first two sets
# start empty and are filled in by the import probes further down; NSEC3
# algorithm 1 is listed unconditionally.
_supported_algs = set()
_supported_digest_algs = set()
_supported_nsec3_algs = set([1])
# Probe for the core M2Crypto pieces.  On success, register the algorithms
# handled by _validate_rrsig_rsa (1, 5, 7, 8, 10) and the sha1/sha256/sha384
# DS digests (1, 2, 4).
try:
    from M2Crypto import EVP, RSA
    from M2Crypto.m2 import hex_to_bn, bn_to_mpi
except Exception:
    # Best effort: any import-time failure simply leaves these algorithms
    # unregistered.  Unlike a bare "except:", this still lets
    # KeyboardInterrupt and SystemExit propagate.
    pass
else:
    _supported_algs.update(set([1,5,7,8,10]))
    _supported_digest_algs.update(set([1,2,4]))
73
# Fixed byte string prepended to the raw DNSKEY key material in
# _dnskey_to_gost; the result is base64-encoded and wrapped as a PEM
# "PUBLIC KEY".
GOST_PREFIX = b'\x30\x63\x30\x1c\x06\x06\x2a\x85\x03\x02\x02\x13\x30\x12\x06\x07\x2a\x85\x03\x02\x02\x23\x01\x06\x07\x2a\x85\x03\x02\x02\x1e\x01\x03\x43\x00\x04\x40'
# Names passed to M2Crypto's Engine and MessageDigest for GOST support.
GOST_ENGINE_NAME = b'gost'
GOST_DIGEST_NAME = b'GOST R 34.11-94'

# Fixed byte strings prepended to the raw key material in _dnskey_to_ed for
# algorithm 15 (ED25519_PREFIX) and algorithm 16 (ED448_PREFIX).
ED25519_PREFIX = b'\x30\x2a\x30\x05\x06\x03\x2b\x65\x70\x03\x21\x00'
ED448_PREFIX = b'\x30\x43\x30\x05\x06\x03\x2b\x65\x71\x03\x3a\x00'

# python3/python2 dual compatibility
# Where bytes is not the native str type (Python 3), convert the GOST names
# to str; where bytes is str (Python 2) they are left untouched.
if not isinstance(GOST_ENGINE_NAME, str):
    GOST_ENGINE_NAME = lb2s(GOST_ENGINE_NAME)
    GOST_DIGEST_NAME = lb2s(GOST_DIGEST_NAME)

# Pick whichever base64 "encode bytes" function this interpreter provides.
try:
    # available from python 3.1
    base64encodebytes = base64.encodebytes
except AttributeError:
    # available until python 3.8
    base64encodebytes = base64.encodestring

# Leading byte prepended to the raw key material in _dnskey_to_ec before it
# is handed to EC.pub_key_from_params.
EC_NOCOMPRESSION = b'\x04'
95
def _init_dynamic():
    """Load M2Crypto's dynamic engine and schedule engine cleanup at exit.

    If loading raises Engine.EngineError, nothing is registered.
    """
    try:
        Engine.load_dynamic()
    except Engine.EngineError:
        # nothing was loaded, so there is nothing to clean up later
        return
    atexit.register(Engine.cleanup)
103
def _check_dsa_support():
    """Register algorithms 3 and 6 if DSA.pub_key_from_params exists."""
    if hasattr(DSA, 'pub_key_from_params'):
        _supported_algs.update((3,6))
110
def _check_gost_support():
    """Register GOST (algorithm 12, digest 3) if the GOST digest is usable.

    The GOST engine is initialized for the duration of the check and
    always released afterwards.
    """
    _gost_init()
    try:
        try:
            gost_md = EVP.MessageDigest(GOST_DIGEST_NAME)
        except ValueError:
            # the digest could not be created, so GOST is not registered
            return
        _supported_algs.add(12)
        _supported_digest_algs.add(3)
    finally:
        _gost_cleanup()
122
def _check_ec_support():
    """Register algorithms 13 and 14 if EC.pub_key_from_params exists."""
    if hasattr(EC, 'pub_key_from_params'):
        _supported_algs.update((13,14))
129
def _check_ed_support():
    """Register algorithms 15 and 16 if OpenSSL is at least 0x10101000.

    That version number corresponds to the "openssl >= 1.1.1" requirement
    listed for these algorithms in _crypto_sources.
    """
    # Import m2 here instead of relying on the module-level name: that name
    # is bound only when the separate best-effort "Engine, m2" import block
    # succeeds, and a failure there would otherwise surface as a NameError.
    from M2Crypto import m2

    if m2.OPENSSL_VERSION_NUMBER >= 0x10101000:
        _supported_algs.update((15,16))
133
def alg_is_supported(alg):
    """Return whether `alg` is a registered DNSSEC algorithm number."""
    supported = alg in _supported_algs
    return supported
136
def digest_alg_is_supported(alg):
    """Return whether `alg` is a registered DS digest algorithm number."""
    supported = alg in _supported_digest_algs
    return supported
139
def nsec3_alg_is_supported(alg):
    """Return whether `alg` is a registered NSEC3 hash algorithm number."""
    supported = alg in _supported_nsec3_algs
    return supported
142
def _log_unsupported_alg(alg, alg_type):
    """Warn, once per requirement, that `alg` cannot be validated.

    `alg_type` is one of the ALG_TYPE_* constants.  The first entry of
    _crypto_sources that lists `alg` for that type decides which
    requirement is named; nothing is logged if no entry lists it or if
    that requirement was already reported.
    """
    for requirement, algs_by_type in _crypto_sources.items():
        if alg not in algs_by_type[alg_type]:
            continue
        if requirement not in _logged_modules:
            _logged_modules.add(requirement)
            logger.warning('Warning: Without the installation of %s, cryptographic validation of DNSSEC %s %d (and possibly others) is not supported.' % (requirement, ALG_TYPE_DNSSEC_TEXT[alg_type], alg))
        # only the first matching requirement is considered
        return
150
def _gost_init():
    """Initialize the GOST engine and make it the default (best effort).

    A ValueError raised by any of the steps is ignored.
    """
    try:
        engine = Engine.Engine(GOST_ENGINE_NAME)
        engine.init()
        engine.set_default()
    except ValueError:
        # best effort: leave things as they are
        return
158
def _gost_cleanup():
    """Call finish() on the GOST engine, if the engine can be obtained."""
    from M2Crypto import Engine
    try:
        engine = Engine.Engine(GOST_ENGINE_NAME)
    except ValueError:
        # no engine object, so there is nothing to finish
        return
    engine.finish()
167
# Probe each optional M2Crypto feature independently, so that a missing
# feature only leaves its own algorithms unregistered.  Failures are
# deliberately ignored (best effort), but "except Exception" is used rather
# than a bare "except:" so that KeyboardInterrupt and SystemExit still
# propagate.

# DSA (algorithms 3 and 6)
try:
    from M2Crypto import DSA
except Exception:
    pass
else:
    _check_dsa_support()

# OpenSSL engines, needed for GOST (algorithm 12, digest 3)
try:
    from M2Crypto import Engine, m2
    _init_dynamic()
except Exception:
    pass
else:
    _check_gost_support()

# EC (algorithms 13 and 14)
try:
    from M2Crypto import EC
except Exception:
    pass
else:
    _check_ec_support()

# digest_verify_init must be importable before algorithms 15 and 16 are
# considered
try:
    from M2Crypto.m2 import digest_verify_init
except Exception:
    pass
else:
    _check_ed_support()
196
def validate_ds_digest(digest_alg, digest, dnskey_msg):
    """Check whether `digest` is the hash of `dnskey_msg` under `digest_alg`.

    Returns True or False for the comparison, or None when the digest
    algorithm is not supported (a warning is logged in that case).
    """
    if not digest_alg_is_supported(digest_alg):
        _log_unsupported_alg(digest_alg, ALG_TYPE_DIGEST)
        return None

    def digest_matches(md_name):
        # hash the DNSKEY message and compare with the supplied digest
        hasher = EVP.MessageDigest(md_name)
        hasher.update(dnskey_msg)
        return hasher.final() == digest

    if digest_alg == 3:
        # the GOST digest needs the engine loaded around its use
        _gost_init()
        try:
            return digest_matches(GOST_DIGEST_NAME)
        finally:
            _gost_cleanup()

    md_names = {1: 'sha1', 2: 'sha256', 4: 'sha384'}
    md_name = md_names.get(digest_alg)
    if md_name is None:
        return None
    return digest_matches(md_name)
222
def _dnskey_to_dsa(key):
    """Build an M2Crypto DSA public key from raw DNSKEY key bytes.

    The key is read as a one-byte T value, then Q (20 bytes), then P, G
    and Y (64 + 8*T bytes each).

    Returns the DSA public key object, or None if the key is empty or too
    short to hold all of those fields.
    """
    try:
        # get T
        t = key[0]
    except IndexError:
        return None
    # python3/python2 dual compatibility
    if not isinstance(t, int):
        t = ord(t)

    q_len = 20
    field_len = 64+(t<<3)

    # a truncated key cannot describe a complete DSA public key
    if len(key) < 1 + q_len + 3*field_len:
        return None

    def to_mpi(start, length):
        # convert key[start:start+length] into the MPI form M2Crypto expects
        return bn_to_mpi(hex_to_bn(binascii.hexlify(key[start:start+length])))

    offset = 1

    # get Q
    q = to_mpi(offset, q_len)
    offset += q_len

    # get P
    p = to_mpi(offset, field_len)
    offset += field_len

    # get G
    g = to_mpi(offset, field_len)
    offset += field_len

    # get Y
    y = to_mpi(offset, field_len)

    # create the DSA public key
    return DSA.pub_key_from_params(p,q,g,y)
253
def _dnskey_to_rsa(key):
    """Build an M2Crypto EVP public key from raw RSA DNSKEY key bytes.

    The key is read as an exponent length (one byte, or a zero byte
    followed by a two-byte big-endian length), then the exponent, then the
    modulus.

    Returns an EVP.PKey holding the RSA public key, or None if the key is
    malformed: empty, cut off inside the length header, or missing the
    exponent or the modulus.
    """
    try:
        # get the exponent length
        e_len = key[0]
    except IndexError:
        return None
    # python3/python2 dual compatibility
    if not isinstance(e_len, int):
        e_len = ord(e_len)

    offset = 1
    if e_len == 0:
        # a zero first byte means the length is in the next two bytes
        try:
            e_len, = struct.unpack(b'!H',key[1:3])
        except struct.error:
            # the key ends inside the length header
            return None
        offset = 3

    exponent = key[offset:offset+e_len]
    modulus = key[offset+e_len:]

    # a key without an exponent or a modulus is invalid; note that a
    # truncated exponent always leaves the modulus empty
    if not exponent or not modulus:
        return None

    # get the exponent
    e = bn_to_mpi(hex_to_bn(binascii.hexlify(exponent)))

    # get the modulus
    n = bn_to_mpi(hex_to_bn(binascii.hexlify(modulus)))

    # create the RSA public key
    rsa = RSA.new_pub_key((e,n))
    pubkey = EVP.PKey()
    pubkey.assign_rsa(rsa)

    return pubkey
282
def _dnskey_to_gost(key):
    """Load raw GOST DNSKEY key bytes as an M2Crypto public key.

    The key bytes are prefixed with GOST_PREFIX, base64-encoded and
    wrapped as a PEM public key before being loaded.
    """
    encoded = base64encodebytes(GOST_PREFIX + key)
    pem = b''.join((
        b'-----BEGIN PUBLIC KEY-----\n',
        encoded,
        b'-----END PUBLIC KEY-----',
    ))
    return EVP.load_key_string_pubkey(pem)
288
def _dnskey_to_ed(alg, key):
    """Load raw DNSKEY key bytes for algorithm 15 or 16 as a public key.

    The key bytes are prefixed with ED25519_PREFIX (algorithm 15) or
    ED448_PREFIX (algorithm 16), base64-encoded and wrapped as a PEM
    public key before being loaded.

    Raises ValueError for any other algorithm number.
    """
    if alg not in (15, 16):
        raise ValueError('Algorithm not supported')

    prefix = ED25519_PREFIX if alg == 15 else ED448_PREFIX
    encoded = base64encodebytes(prefix + key)
    pem = b''.join((
        b'-----BEGIN PUBLIC KEY-----\n',
        encoded,
        b'-----END PUBLIC KEY-----',
    ))
    return EVP.load_key_string_pubkey(pem)
299
def _dnskey_to_ec(alg, key):
    """Build an M2Crypto EC public key for algorithm 13 or 14.

    Algorithm 13 uses the prime256v1 curve and algorithm 14 the secp384r1
    curve.  Returns None if M2Crypto rejects the key with a ValueError.

    Raises ValueError for any other algorithm number.
    """
    if alg not in (13, 14):
        raise ValueError('Algorithm not supported')

    curve = EC.NID_X9_62_prime256v1 if alg == 13 else EC.NID_secp384r1
    point = EC_NOCOMPRESSION + key
    try:
        return EC.pub_key_from_params(curve, point)
    except ValueError:
        return None
312
def _validate_rrsig_rsa(alg, sig, msg, key):
    """Verify an RSA signature `sig` over `msg` with DNSKEY bytes `key`.

    Returns True if verification reports success and False otherwise,
    including when the key cannot be parsed.  Raises ValueError if `alg`
    is not one of the RSA algorithm numbers handled here.
    """
    rsa_key = _dnskey_to_rsa(key)

    # an unusable key can never produce a valid signature
    if rsa_key is None:
        return False

    hash_by_alg = {
        1: 'md5',
        5: 'sha1',
        7: 'sha1',
        8: 'sha256',
        10: 'sha512',
    }
    hash_name = hash_by_alg.get(alg)
    if hash_name is None:
        raise ValueError('RSA Algorithm unknown.')

    # switch the key's context to the hash this algorithm uses
    rsa_key.reset_context(md=hash_name)
    rsa_key.verify_init()
    rsa_key.verify_update(msg)
    result = rsa_key.verify_final(sig)

    return result == 1
337
def _validate_rrsig_dsa(alg, sig, msg, key):
    """Verify a DSA signature `sig` over `msg` with DNSKEY bytes `key`.

    The signature is read as a one-byte T value followed by R and S
    (20 bytes each); `msg` is hashed with SHA-1 before verification.

    Returns True if verification reports success and False otherwise,
    including when the signature is too short or the key is invalid.
    """
    # T + R + S take 41 bytes; a shorter signature cannot be valid, so
    # reject it up front instead of failing while parsing it
    if len(sig) < 41:
        return False

    pubkey = _dnskey_to_dsa(key)

    # if the key is invalid, then the signature is also invalid
    if pubkey is None:
        return False

    # skip T, which is not needed for verification
    offset = 1

    # get R
    new_offset = offset+20
    r = bn_to_mpi(hex_to_bn(binascii.hexlify(sig[offset:new_offset])))
    offset = new_offset

    # get S
    new_offset = offset+20
    s = bn_to_mpi(hex_to_bn(binascii.hexlify(sig[offset:new_offset])))

    md = EVP.MessageDigest('sha1')
    md.update(msg)
    digest = md.final()

    return pubkey.verify(digest, r, s) == 1
367
def _validate_rrsig_gost(alg, sig, msg, key):
    """Verify a GOST signature `sig` over `msg` with DNSKEY bytes `key`.

    The GOST engine is initialized before the check and always released
    afterwards.  Returns True if verification reports success and False
    otherwise.
    """
    _gost_init()

    try:
        gost_key = _dnskey_to_gost(key)

        # an unusable key can never produce a valid signature
        if gost_key is None:
            return False

        gost_key.md = m2.get_digestbyname(GOST_DIGEST_NAME)
        gost_key.verify_init()
        gost_key.verify_update(msg)
        result = gost_key.verify_final(sig)

        return result == 1

    finally:
        _gost_cleanup()
386
def _validate_rrsig_ec(alg, sig, msg, key):
    """Verify an ECDSA signature `sig` over `msg` with DNSKEY bytes `key`.

    Algorithm 13 uses SHA-256 with a 64-byte signature and algorithm 14
    uses SHA-384 with a 96-byte signature; the signature is the
    concatenation of R and S, each half of that length.

    Returns True if verification reports success and False otherwise,
    including when the key is invalid or the signature has the wrong
    length.
    """
    ec_key = _dnskey_to_ec(alg, key)

    # an unusable key can never produce a valid signature
    if ec_key is None:
        return False

    params_by_alg = {13: ('sha256', 64), 14: ('sha384', 96)}
    params = params_by_alg.get(alg)
    if params is None:
        raise ValueError('EC hash algorithm unknown!')
    hash_name, expected_len = params

    if len(sig) != expected_len:
        return False

    # R is the first half of the signature and S the second half
    half = expected_len//2
    r = bn_to_mpi(hex_to_bn(binascii.hexlify(sig[:half])))
    s = bn_to_mpi(hex_to_bn(binascii.hexlify(sig[half:])))

    hasher = EVP.MessageDigest(hash_name)
    hasher.update(msg)
    hashed = hasher.final()

    return ec_key.verify_dsa(hashed, r, s) == 1
423
def _validate_rrsig_ed(alg, sig, msg, key):
    """Verify a signature `sig` over `msg` for algorithm 15 or 16.

    Returns True if verification reports success and False otherwise.
    """
    ed_key = _dnskey_to_ed(alg, key)

    # an unusable key can never produce a valid signature
    if ed_key is None:
        return False

    # no separate message digest is selected for these algorithms
    ed_key.reset_context(None)
    ed_key.digest_verify_init()
    result = ed_key.digest_verify(sig, msg)
    return result == 1
434
def validate_rrsig(alg, sig, msg, key):
    """Verify signature `sig` over `msg` using DNSKEY bytes `key`.

    Dispatches on the DNSSEC algorithm number `alg` to the matching
    validator.  Returns that validator's result, or None when the
    algorithm is not supported (a warning is logged in that case).
    """
    if not alg_is_supported(alg):
        _log_unsupported_alg(alg, ALG_TYPE_DNSSEC)
        return None

    # algorithm numbers handled by each validator
    validators = (
        ((1,5,7,8,10), _validate_rrsig_rsa),
        ((3,6), _validate_rrsig_dsa),
        ((12,), _validate_rrsig_gost),
        ((13,14), _validate_rrsig_ec),
        ((15,16), _validate_rrsig_ed),
    )
    for alg_numbers, validator in validators:
        if alg in alg_numbers:
            return validator(alg, sig, msg, key)
    return None
451
def get_digest_for_nsec3(val, salt, alg, iterations):
    """Return the iterated, salted hash of `val` for NSEC3 algorithm `alg`.

    The hash is applied iterations + 1 times, each time over the previous
    result with `salt` appended.  Returns None when the algorithm is not
    supported (a warning is logged in that case).
    """
    if not nsec3_alg_is_supported(alg):
        _log_unsupported_alg(alg, ALG_TYPE_NSEC3)
        return None

    if alg == 1:
        hash_func = hashlib.sha1

    hashed = val
    rounds = iterations + 1
    for _ in range(rounds):
        hashed = hash_func(hashed + salt).digest()
    return hashed
463