1# Copyright (C) 2003-2007, 2009, 2011 Nominum, Inc.
2#
3# Permission to use, copy, modify, and distribute this software and its
4# documentation for any purpose with or without fee is hereby granted,
5# provided that the above copyright notice and this permission notice
6# appear in all copies.
7#
8# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
9# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
10# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
11# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
12# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
13# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
14# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
15
16"""Common DNSSEC-related functions and constants."""
17
18from io import BytesIO
19import struct
20import time
21
22import dns.exception
23import dns.hash
24import dns.name
25import dns.node
26import dns.rdataset
27import dns.rdata
28import dns.rdatatype
29import dns.rdataclass
30from ._compat import string_types
31
32
# Raised by make_ds() when the requested digest algorithm is neither SHA1
# nor SHA256.
# NOTE(review): DNSException subclasses may surface the class docstring as
# the default exception message -- confirm before rewording the docstring.
class UnsupportedAlgorithm(dns.exception.DNSException):

    """The DNSSEC algorithm is not supported."""
36
37
# Raised by the validation functions in this module whenever an RRset cannot
# be validated (unknown key, signature outside its validity period,
# unsupported algorithm, or no signature that verifies).
# NOTE(review): DNSException subclasses may surface the class docstring as
# the default exception message -- confirm before rewording the docstring.
class ValidationFailure(dns.exception.DNSException):

    """The DNSSEC signature is invalid."""
41
# DNSSEC algorithm numbers, exposed under their mnemonic names.
# algorithm_from_text() and algorithm_to_text() convert between these
# numbers and the mnemonics.
# NOTE(review): the values presumably mirror the IANA DNSSEC algorithm
# number registry -- confirm before adding or changing entries.
RSAMD5 = 1
DH = 2
DSA = 3
ECC = 4
RSASHA1 = 5
DSANSEC3SHA1 = 6
RSASHA1NSEC3SHA1 = 7
RSASHA256 = 8
RSASHA512 = 10
ECDSAP256SHA256 = 13
ECDSAP384SHA384 = 14
INDIRECT = 252
PRIVATEDNS = 253
PRIVATEOID = 254
56
# Mnemonic -> algorithm number.  The keys are upper case because
# algorithm_from_text() upper-cases its argument before looking it up here.
_algorithm_by_text = {
    'RSAMD5': RSAMD5,
    'DH': DH,
    'DSA': DSA,
    'ECC': ECC,
    'RSASHA1': RSASHA1,
    'DSANSEC3SHA1': DSANSEC3SHA1,
    'RSASHA1NSEC3SHA1': RSASHA1NSEC3SHA1,
    'RSASHA256': RSASHA256,
    'RSASHA512': RSASHA512,
    'INDIRECT': INDIRECT,
    'ECDSAP256SHA256': ECDSAP256SHA256,
    'ECDSAP384SHA384': ECDSAP384SHA384,
    'PRIVATEDNS': PRIVATEDNS,
    'PRIVATEOID': PRIVATEOID,
}
73
# The reverse table (algorithm number -> mnemonic) is derived from
# _algorithm_by_text instead of being written out by hand, so that an
# omission or a cut-and-paste error cannot stop it from being a true
# inverse of the forward table.

_algorithm_by_value = dict(
    (number, mnemonic) for mnemonic, number in _algorithm_by_text.items())
79
80
def algorithm_from_text(text):
    """Convert text into a DNSSEC algorithm value

    Known mnemonics are matched case-insensitively; any other text is
    handed to int() and must therefore be a number.

    @param text: the algorithm mnemonic or number
    @type text: string
    @raises ValueError: text is neither a known mnemonic nor an integer
    @rtype: int"""

    mnemonic = text.upper()
    if mnemonic in _algorithm_by_text:
        return _algorithm_by_text[mnemonic]
    return int(text)
89
90
def algorithm_to_text(value):
    """Convert a DNSSEC algorithm value to text

    Values without a known mnemonic are rendered with str().

    @param value: the algorithm number
    @type value: int
    @rtype: string"""

    if value in _algorithm_by_value:
        return _algorithm_by_value[value]
    return str(value)
99
100
101def _to_rdata(record, origin):
102    s = BytesIO()
103    record.to_wire(s, origin=origin)
104    return s.getvalue()
105
106
def key_id(key, origin=None):
    """Compute the 16-bit key tag of a key rdata.

    The tag is derived from the wire form of the key.  For RSAMD5 keys it
    is read straight from the third-to-last and second-to-last octets.  For
    every other algorithm the wire form is summed as big-endian 16-bit
    words (a trailing odd octet counts as a high byte), the carry above
    16 bits is added back once, and the result is truncated to 16 bits.

    @param key: the key rdata; it must have an algorithm attribute and a
    to_wire() method
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @rtype: int
    """
    wire = bytearray(_to_rdata(key, origin))
    if key.algorithm == RSAMD5:
        return (wire[-3] << 8) + wire[-2]

    # Octets at even offsets are the high bytes of the 16-bit words, octets
    # at odd offsets the low bytes.
    checksum = (sum(wire[0::2]) << 8) + sum(wire[1::2])
    checksum += (checksum >> 16) & 0xffff
    return checksum & 0xffff
121
122
def make_ds(name, key, algorithm, origin=None):
    """Create a DS rdata for a key.

    The digest covers the canonicalized owner name in wire form followed
    by the wire form of the key.

    @param name: the owner name of the key
    @type name: dns.name.Name or string
    @param key: the key rdata to digest
    @param algorithm: the digest algorithm, 'SHA1' or 'SHA256'
    (case-insensitive)
    @type algorithm: string
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name or None
    @raises UnsupportedAlgorithm: algorithm is not one of the supported
    digest algorithms
    @rtype: the DS rdata produced by dns.rdata.from_wire
    """
    digest_name = algorithm.upper()
    if digest_name == 'SHA1':
        dsalg = 1
    elif digest_name == 'SHA256':
        dsalg = 2
    else:
        raise UnsupportedAlgorithm('unsupported algorithm "%s"' % algorithm)
    hasher = dns.hash.hashes[digest_name]()

    if isinstance(name, string_types):
        name = dns.name.from_text(name, origin)
    hasher.update(name.canonicalize().to_wire())
    hasher.update(_to_rdata(key, origin))
    digest = hasher.digest()

    # DS rdata layout: key tag (16 bits), key algorithm (8 bits), digest
    # type (8 bits), then the digest itself.
    header = struct.pack("!HBB", key_id(key), key.algorithm, dsalg)
    dsrdata = header + digest
    return dns.rdata.from_wire(dns.rdataclass.IN, dns.rdatatype.DS, dsrdata,
                               0, len(dsrdata))
142
143
144def _find_candidate_keys(keys, rrsig):
145    candidate_keys = []
146    value = keys.get(rrsig.signer)
147    if value is None:
148        return None
149    if isinstance(value, dns.node.Node):
150        try:
151            rdataset = value.find_rdataset(dns.rdataclass.IN,
152                                           dns.rdatatype.DNSKEY)
153        except KeyError:
154            return None
155    else:
156        rdataset = value
157    for rdata in rdataset:
158        if rdata.algorithm == rrsig.algorithm and \
159                key_id(rdata) == rrsig.key_tag:
160            candidate_keys.append(rdata)
161    return candidate_keys
162
163
def _is_rsa(algorithm):
    """Return True if algorithm is one of the RSA signature algorithms."""
    rsa_algorithms = (RSAMD5, RSASHA1, RSASHA1NSEC3SHA1, RSASHA256,
                      RSASHA512)
    return algorithm in rsa_algorithms
168
169
def _is_dsa(algorithm):
    """Return True if algorithm is one of the DSA signature algorithms."""
    dsa_algorithms = (DSA, DSANSEC3SHA1)
    return algorithm in dsa_algorithms
172
173
def _is_ecdsa(algorithm):
    """Return True if algorithm is an ECDSA algorithm that can be verified.

    The answer is always false when the optional ecdsa package could not
    be imported (_have_ecdsa is False).
    """
    if not _have_ecdsa:
        return False
    return algorithm in (ECDSAP256SHA256, ECDSAP384SHA384)
176
177
def _is_md5(algorithm):
    """Return True if algorithm is RSAMD5, the one algorithm for which
    _make_hash() selects MD5."""
    return algorithm == RSAMD5
180
181
def _is_sha1(algorithm):
    """Return True if algorithm is one for which _make_hash() selects
    SHA1."""
    sha1_algorithms = (DSA, RSASHA1, DSANSEC3SHA1, RSASHA1NSEC3SHA1)
    return algorithm in sha1_algorithms
185
186
def _is_sha256(algorithm):
    """Return True if algorithm is one for which _make_hash() selects
    SHA256."""
    sha256_algorithms = (RSASHA256, ECDSAP256SHA256)
    return algorithm in sha256_algorithms
189
190
def _is_sha384(algorithm):
    """Return True if algorithm is ECDSAP384SHA384, the one algorithm for
    which _make_hash() selects SHA384."""
    return algorithm == ECDSAP384SHA384
193
194
def _is_sha512(algorithm):
    """Return True if algorithm is RSASHA512, the one algorithm for which
    _make_hash() selects SHA512."""
    return algorithm == RSASHA512
197
198
def _make_hash(algorithm):
    """Create a fresh hash object for a DNSSEC algorithm.

    @param algorithm: the DNSSEC algorithm number
    @type algorithm: int
    @raises ValidationFailure: no hash is known for the algorithm
    @returns: a new, empty hash object from dns.hash.hashes
    """
    # Checked in order; the first predicate that accepts the algorithm
    # decides which hash is used.
    hash_choices = (
        (_is_md5, 'MD5'),
        (_is_sha1, 'SHA1'),
        (_is_sha256, 'SHA256'),
        (_is_sha384, 'SHA384'),
        (_is_sha512, 'SHA512'),
    )
    for uses_hash, hash_name in hash_choices:
        if uses_hash(algorithm):
            return dns.hash.hashes[hash_name]()
    raise ValidationFailure('unknown hash for algorithm %u' % algorithm)
211
212
def _make_algorithm_id(algorithm):
    """Build the PKCS#1 algorithm-identifier prefix for a digest.

    The returned octets are what _validate_rrsig() puts in front of the
    raw digest for RSA signatures: a DER header naming the hash function
    and announcing a digest of the hash's size.

    @param algorithm: the DNSSEC algorithm number
    @type algorithm: int
    @raises ValidationFailure: the algorithm uses a hash for which no
    object identifier is listed here
    @rtype: byte string
    """
    # DER-encoded object identifier contents of each supported hash.
    digest_oids = (
        (_is_md5, [0x2a, 0x86, 0x48, 0x86, 0xf7, 0x0d, 0x02, 0x05]),
        (_is_sha1, [0x2b, 0x0e, 0x03, 0x02, 0x1a]),
        (_is_sha256, [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x01]),
        (_is_sha512, [0x60, 0x86, 0x48, 0x01, 0x65, 0x03, 0x04, 0x02, 0x03]),
    )
    for uses_digest, oid in digest_oids:
        if uses_digest(algorithm):
            break
    else:
        raise ValidationFailure('unknown algorithm %u' % algorithm)

    olen = len(oid)
    dlen = _make_hash(algorithm).digest_size
    idbytes = []
    idbytes += [0x30, 8 + olen + dlen]   # outer header; covers all below
    idbytes += [0x30, olen + 4]          # inner header: OID plus 05 00
    idbytes += [0x06, olen]              # object identifier header ...
    idbytes += oid                       # ... and its contents
    idbytes += [0x05, 0x00]              # the two-octet 05 00 element
    idbytes += [0x04, dlen]              # header of the dlen-octet digest
    return struct.pack('!%dB' % len(idbytes), *idbytes)
230
231
def _validate_rrsig(rrset, rrsig, keys, origin=None, now=None):
    """Validate an RRset against a single signature rdata

    The owner name of the rrsig is assumed to be the same as the owner name
    of the rrset.  Every key of the signer whose algorithm and key tag match
    the signature is tried in turn; the function returns normally as soon
    as one of them verifies the signature.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsig: The signature rdata
    @type rrsig: dns.rdata.Rdata
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset
    values
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name, string, or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: the signer has no usable entry in keys, the
    signature is outside its validity period, the algorithm is not
    supported, an ECDSA key is not a valid curve point, or no candidate key
    verifies the signature.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    # _find_candidate_keys() returns None (not an empty list) when the
    # signer has no entry in keys or its node has no DNSKEY rdataset.
    # Report that as a validation failure instead of letting the loop below
    # fail with a TypeError.
    candidate_keys = _find_candidate_keys(keys, rrsig)
    if candidate_keys is None:
        raise ValidationFailure('unknown key')

    # For convenience, allow the rrset to be specified as a (name,
    # rdataset) tuple as well as a proper rrset
    if isinstance(rrset, tuple):
        rrname = rrset[0]
        rdataset = rrset[1]
    else:
        rrname = rrset.name
        rdataset = rrset

    if now is None:
        now = time.time()
    if rrsig.expiration < now:
        raise ValidationFailure('expired')
    if rrsig.inception > now:
        raise ValidationFailure('not yet valid')

    # When the signature covers fewer labels than the owner name has, the
    # data was signed under a wildcard name: replace the extra leading
    # labels with a single '*'.
    if rrsig.labels < len(rrname) - 1:
        suffix = rrname.split(rrsig.labels + 1)[1]
        rrname = dns.name.from_text('*', suffix)

    for candidate_key in candidate_keys:
        # A fresh hash object is needed for every key that is tried.
        hasher = _make_hash(rrsig.algorithm)

        if _is_rsa(rrsig.algorithm):
            # Key layout: exponent length (one octet, or a zero octet
            # followed by a two-octet length), exponent, modulus.
            keyptr = candidate_key.key
            (bytes_,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            if bytes_ == 0:
                (bytes_,) = struct.unpack('!H', keyptr[0:2])
                keyptr = keyptr[2:]
            rsa_e = keyptr[0:bytes_]
            rsa_n = keyptr[bytes_:]
            keylen = len(rsa_n) * 8
            pubkey = Crypto.PublicKey.RSA.construct(
                (Crypto.Util.number.bytes_to_long(rsa_n),
                 Crypto.Util.number.bytes_to_long(rsa_e)))
            sig = (Crypto.Util.number.bytes_to_long(rrsig.signature),)
        elif _is_dsa(rrsig.algorithm):
            # Key layout: T (one octet), Q (20 octets), then P, G and Y,
            # each 64 + T * 8 octets long.
            keyptr = candidate_key.key
            (t,) = struct.unpack('!B', keyptr[0:1])
            keyptr = keyptr[1:]
            octets = 64 + t * 8
            dsa_q = keyptr[0:20]
            keyptr = keyptr[20:]
            dsa_p = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_g = keyptr[0:octets]
            keyptr = keyptr[octets:]
            dsa_y = keyptr[0:octets]
            pubkey = Crypto.PublicKey.DSA.construct(
                (Crypto.Util.number.bytes_to_long(dsa_y),
                 Crypto.Util.number.bytes_to_long(dsa_g),
                 Crypto.Util.number.bytes_to_long(dsa_p),
                 Crypto.Util.number.bytes_to_long(dsa_q)))
            # The first signature octet is skipped; R and S follow as two
            # 20-octet values.
            (dsa_r, dsa_s) = struct.unpack('!20s20s', rrsig.signature[1:])
            sig = (Crypto.Util.number.bytes_to_long(dsa_r),
                   Crypto.Util.number.bytes_to_long(dsa_s))
        elif _is_ecdsa(rrsig.algorithm):
            if rrsig.algorithm == ECDSAP256SHA256:
                curve = ecdsa.curves.NIST256p
                key_len = 32
            elif rrsig.algorithm == ECDSAP384SHA384:
                curve = ecdsa.curves.NIST384p
                key_len = 48
            else:
                # shouldn't happen
                raise ValidationFailure('unknown ECDSA curve')
            # The key is the X coordinate followed by the Y coordinate,
            # key_len octets each.
            keyptr = candidate_key.key
            x = Crypto.Util.number.bytes_to_long(keyptr[0:key_len])
            y = Crypto.Util.number.bytes_to_long(keyptr[key_len:key_len * 2])
            # The key comes from DNS data, so it is checked with a real
            # test rather than an assert (asserts vanish under -O).
            if not ecdsa.ecdsa.point_is_valid(curve.generator, x, y):
                raise ValidationFailure('invalid ECDSA key')
            point = ecdsa.ellipticcurve.Point(curve.curve, x, y, curve.order)
            verifying_key = ecdsa.keys.VerifyingKey.from_public_point(point,
                                                                      curve)
            pubkey = ECKeyWrapper(verifying_key, key_len)
            r = rrsig.signature[:key_len]
            s = rrsig.signature[key_len:]
            sig = ecdsa.ecdsa.Signature(Crypto.Util.number.bytes_to_long(r),
                                        Crypto.Util.number.bytes_to_long(s))
        else:
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        # The signed data starts with the first 18 octets of the RRSIG
        # wire form followed by the signer name in digestable form.
        hasher.update(_to_rdata(rrsig, origin)[:18])
        hasher.update(rrsig.signer.to_digestable(origin))

        # Then comes every rdata, in sorted order, each preceded by the
        # owner name, type, class, original TTL and its own length.
        rrnamebuf = rrname.to_digestable(origin)
        rrfixed = struct.pack('!HHI', rdataset.rdtype, rdataset.rdclass,
                              rrsig.original_ttl)
        for rr in sorted(rdataset):
            hasher.update(rrnamebuf)
            hasher.update(rrfixed)
            rrdata = rr.to_digestable(origin)
            rrlen = struct.pack('!H', len(rrdata))
            hasher.update(rrlen)
            hasher.update(rrdata)

        digest = hasher.digest()

        if _is_rsa(rrsig.algorithm):
            # PKCS1 algorithm identifier goop
            digest = _make_algorithm_id(rrsig.algorithm) + digest
            # Pad to the modulus length: 00 01 FF ... FF 00 <digest info>
            padlen = keylen // 8 - len(digest) - 3
            digest = struct.pack('!%dB' % (2 + padlen + 1),
                                 *([0, 1] + [0xFF] * padlen + [0])) + digest
        elif _is_dsa(rrsig.algorithm) or _is_ecdsa(rrsig.algorithm):
            pass
        else:
            # Raise here for code clarity; this won't actually ever happen
            # since if the algorithm is really unknown we'd already have
            # raised an exception above
            raise ValidationFailure('unknown algorithm %u' % rrsig.algorithm)

        if pubkey.verify(digest, sig):
            return
    raise ValidationFailure('verify failure')
374
375
def _validate(rrset, rrsigset, keys, origin=None, now=None):
    """Validate an RRset

    The RRset is accepted as soon as any one signature in rrsigset
    validates it.

    @param rrset: The RRset to validate
    @type rrset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param rrsigset: The signature RRset
    @type rrsigset: dns.rrset.RRset or (dns.name.Name, dns.rdataset.Rdataset)
    tuple
    @param keys: The key dictionary.
    @type keys: a dictionary keyed by dns.name.Name with node or rdataset
    values
    @param origin: The origin to use for relative names
    @type origin: dns.name.Name, string, or None
    @param now: The time to use when validating the signatures.  The default
    is the current time.
    @type now: int
    @raises ValidationFailure: the owner names of rrset and rrsigset differ,
    or none of the signatures validates the RRset.
    """

    if isinstance(origin, string_types):
        origin = dns.name.from_text(origin, dns.name.root)

    if isinstance(rrset, tuple):
        rrname = rrset[0]
    else:
        rrname = rrset.name

    if isinstance(rrsigset, tuple):
        rrsigname = rrsigset[0]
        rrsigrdataset = rrsigset[1]
    else:
        rrsigname = rrsigset.name
        rrsigrdataset = rrsigset

    # Bring both owner names to the same relativity before comparing them.
    # Each name must be derived from itself: deriving rrsigname from rrname
    # would make the comparison below always succeed.
    rrname = rrname.choose_relativity(origin)
    rrsigname = rrsigname.choose_relativity(origin)
    if rrname != rrsigname:
        raise ValidationFailure("owner names do not match")

    for rrsig in rrsigrdataset:
        try:
            _validate_rrsig(rrset, rrsig, keys, origin, now)
            return
        except ValidationFailure:
            # One bad signature is not fatal; try the next one.
            pass
    raise ValidationFailure("no RRSIGs validated")
422
423
424def _need_pycrypto(*args, **kwargs):
425    raise NotImplementedError("DNSSEC validation requires pycrypto")
426
# pycrypto is an optional dependency.  The public names validate and
# validate_rrsig are bound to the real implementations only when it can be
# imported; otherwise both are bound to _need_pycrypto, which raises
# NotImplementedError on every call.  _have_pycrypto records the outcome.
try:
    import Crypto.PublicKey.RSA
    import Crypto.PublicKey.DSA
    import Crypto.Util.number
    validate = _validate
    validate_rrsig = _validate_rrsig
    _have_pycrypto = True
except ImportError:
    validate = _need_pycrypto
    validate_rrsig = _need_pycrypto
    _have_pycrypto = False
438
# The ECDSA algorithms additionally need the optional "ecdsa" package.
# _have_ecdsa records whether it could be imported; _is_ecdsa() consults
# it, so without the package the ECDSA algorithms are treated as unknown.
try:
    import ecdsa
    import ecdsa.ecdsa
    import ecdsa.ellipticcurve
    import ecdsa.keys
    _have_ecdsa = True

    class ECKeyWrapper(object):

        """Adapter exposing verify(digest, sig) on an ecdsa verifying key.

        _validate_rrsig() calls pubkey.verify(digest, sig) on whichever
        public-key object it built; this wrapper provides that method for
        ECDSA keys.
        """

        def __init__(self, key, key_len):
            # key: the verifying key built by _validate_rrsig()
            # key_len: the coordinate length passed by _validate_rrsig()
            # (32 or 48); it is stored but not read within this class
            self.key = key
            self.key_len = key_len

        def verify(self, digest, sig):
            """Check sig against digest using the wrapped key.

            The digest octets are converted to an integer before being
            handed to the wrapped key's pubkey.verifies(), whose result is
            returned unchanged.
            """
            # NOTE: this uses Crypto.Util.number, so pycrypto must be
            # importable as well for ECDSA verification to work.
            diglong = Crypto.Util.number.bytes_to_long(digest)
            return self.key.pubkey.verifies(diglong, sig)

except ImportError:
    _have_ecdsa = False
458