1# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license
2
3# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc.
4#
5# Permission to use, copy, modify, and distribute this software and its
6# documentation for any purpose with or without fee is hereby granted,
7# provided that the above copyright notice and this permission notice
8# appear in all copies.
9#
10# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
11# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
12# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
13# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
14# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
15# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
16# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.
17
18"""DNS TSIG support."""
19
import base64
import hashlib
import hmac
import struct

import dns.exception
import dns.name
import dns.rcode
import dns.rdataclass
import dns.rdatatype
29
# Raised by validate() when the TSIG's signing time and the caller-supplied
# current time differ by more than the record's fudge value.
# NOTE(review): DNSException may use the class docstring as the default
# exception message -- confirm before rewording the docstring.
class BadTime(dns.exception.DNSException):

    """The current time is not within the TSIG's validity time."""
33
34
# Raised by HMACTSig.verify() when the recomputed MAC differs from the
# received one, and by GSSTSig.verify() when the GSSAPI context's
# verify_signature() call raises.
class BadSignature(dns.exception.DNSException):

    """The TSIG signature fails to verify."""
38
39
# Raised by validate() when the TSIG record's owner name differs from the
# name of the key it is being validated against.
class BadKey(dns.exception.DNSException):

    """The TSIG record owner name does not match the key."""
43
44
# Raised by validate() when the algorithm named in the TSIG rdata differs
# from the algorithm of the key it is being validated against.
class BadAlgorithm(dns.exception.DNSException):

    """The TSIG algorithm does not match the key."""
48
49
# validate() raises one of the subclasses below for the TSIG error codes it
# recognizes, and PeerError itself (with an "unknown TSIG error code"
# message) for any other non-zero error code.
# NOTE(review): DNSException may use the class docstring as the default
# exception message -- confirm before rewording the docstring.
class PeerError(dns.exception.DNSException):

    """Base class for all TSIG errors generated by the remote peer"""
53
54
# Raised by validate() when the TSIG rdata's error field equals
# dns.rcode.BADKEY.
class PeerBadKey(PeerError):

    """The peer didn't know the key we used"""
58
59
# Raised by validate() when the TSIG rdata's error field equals
# dns.rcode.BADSIG.
class PeerBadSignature(PeerError):

    """The peer didn't like the signature we sent"""
63
64
# Raised by validate() when the TSIG rdata's error field equals
# dns.rcode.BADTIME.
class PeerBadTime(PeerError):

    """The peer didn't like the time we sent"""
68
69
# Raised by validate() when the TSIG rdata's error field equals
# dns.rcode.BADTRUNC.
class PeerBadTruncation(PeerError):

    """The peer didn't like amount of truncation in the TSIG we sent"""
73
74
# TSIG Algorithms
#
# Each algorithm is identified by a DNS name built with
# dns.name.from_text().  HMACTSig._hashes maps the HMAC names to hashlib
# constructors; the names ending in -128, -192 and -256 are mapped there to
# a MAC truncated to that many bits.  GSS_TSIG is not an HMAC algorithm and
# is handled by GSSTSig instead (see get_context()).

HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA1 = dns.name.from_text("hmac-sha1")
HMAC_SHA224 = dns.name.from_text("hmac-sha224")
HMAC_SHA256 = dns.name.from_text("hmac-sha256")
HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128")
HMAC_SHA384 = dns.name.from_text("hmac-sha384")
HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192")
HMAC_SHA512 = dns.name.from_text("hmac-sha512")
HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256")
GSS_TSIG = dns.name.from_text("gss-tsig")

# Algorithm a Key gets when its constructor is not given one explicitly.
default_algorithm = HMAC_SHA256
89
90
class GSSTSig:
    """
    TSIG signing/verification context backed by a GSS-API security context.

    The object accumulates the bytes to be covered by the signature and
    hands them to the GSS-API context (established during the TKEY
    handshake) to produce or check the message integrity code.

    To keep this module free of a direct GSSAPI dependency, the keyring
    stores the GSSAPI context object in place of a secret key, and that
    object is what gets passed in here.
    """
    def __init__(self, gssapi_context):
        self.gssapi_context = gssapi_context
        # Bytes fed in through update(); signed or verified as one buffer.
        self.data = b''
        self.name = 'gss-tsig'

    def update(self, data):
        """Append I{data} to the buffer that will be signed or verified."""
        self.data += data

    def sign(self):
        """Return the GSS-API signature over everything accumulated."""
        accumulated = self.data
        return self.gssapi_context.get_signature(accumulated)

    def verify(self, expected):
        """Check I{expected} against the accumulated data.

        Returns whatever the GSS-API context's verify_signature() returns.
        @raises BadSignature: the GSS-API call raised any exception
        """
        try:
            outcome = self.gssapi_context.verify_signature(self.data,
                                                           expected)
        except Exception:
            # Deliberately broad: every failure reported by the GSS-API
            # layer is surfaced to the caller as a bad signature.
            raise BadSignature
        return outcome
119
120
class GSSTSigAdapter:
    """Callable keyring wrapper that advances GSS-TSIG negotiation.

    An instance wraps a mapping of key names to keys.  Calling it with
    ``(message, keyname)`` looks the key up; for a GSS-TSIG ``Key`` it
    first feeds any TKEY token found in the message to the key's GSSAPI
    context, so the context is up to date before the key is used.
    """
    def __init__(self, keyring):
        self.keyring = keyring

    def __call__(self, message, keyname):
        """Return the key stored under I{keyname}, or None if absent.

        When the key is a GSS-TSIG Key and I{message} is truthy, the
        message's TKEY data (if any) is absorbed into the GSSAPI context
        before the key is returned.
        """
        if keyname not in self.keyring:
            return None
        key = self.keyring[keyname]
        if isinstance(key, Key) and key.algorithm == GSS_TSIG and message:
            GSSTSigAdapter.parse_tkey_and_step(key, message, keyname)
        return key

    @classmethod
    def parse_tkey_and_step(cls, key, message, keyname):
        """Feed the TKEY token in I{message}, if any, to the GSSAPI context.

        This lets the client complete the GSSAPI negotiation before it
        attempts to verify the signed response to a TKEY message exchange.

        Returns the result of the context's step() call, or None when the
        answer section has no TKEY rrset for I{keyname}.
        """
        try:
            rrset = message.find_rrset(message.answer, keyname,
                                       dns.rdataclass.ANY,
                                       dns.rdatatype.TKEY)
            if rrset:
                token = rrset[0].key
                # For GSS-TSIG keys the "secret" is the GSSAPI context.
                gssapi_context = key.secret
                return gssapi_context.step(token)
        except KeyError:
            # No matching TKEY rrset (or a KeyError from the steps above):
            # treated as "nothing to absorb", best effort by design.
            pass
151
152
class HMACTSig:
    """
    TSIG signing/verification context built on the standard ``hmac`` module.

    ``_hashes`` maps each supported algorithm name either to a hashlib
    constructor (the full digest is used as the MAC) or to a
    ``(constructor, bits)`` pair, in which case the MAC is the digest
    truncated to ``bits`` bits.
    """

    _hashes = {
        HMAC_SHA1: hashlib.sha1,
        HMAC_SHA224: hashlib.sha224,
        HMAC_SHA256: hashlib.sha256,
        HMAC_SHA256_128: (hashlib.sha256, 128),
        HMAC_SHA384: hashlib.sha384,
        HMAC_SHA384_192: (hashlib.sha384, 192),
        HMAC_SHA512: hashlib.sha512,
        HMAC_SHA512_256: (hashlib.sha512, 256),
        HMAC_MD5: hashlib.md5,
    }

    def __init__(self, key, algorithm):
        """Create an HMAC context keyed with I{key} for I{algorithm}.

        @raises NotImplementedError: I{algorithm} is not in ``_hashes``
        """
        try:
            spec = self._hashes[algorithm]
        except KeyError:
            raise NotImplementedError(f"TSIG algorithm {algorithm} " +
                                      "is not supported")

        # A tuple entry carries the truncated MAC length in bits; a bare
        # constructor means no truncation.
        if isinstance(spec, tuple):
            digestmod, bits = spec[0], spec[1]
        else:
            digestmod, bits = spec, None
        self.hmac_context = hmac.new(key, digestmod=digestmod)
        self.size = bits

        # The name is the hmac object's own name, suffixed with the
        # truncation length when there is one.
        base_name = self.hmac_context.name
        self.name = f'{base_name}-{bits}' if bits else base_name

    def update(self, data):
        """Feed I{data} into the underlying HMAC context."""
        return self.hmac_context.update(data)

    def sign(self):
        """Return the MAC over the data fed so far, truncated if required."""
        mac = self.hmac_context.digest()
        if not self.size:
            return mac
        # self.size is in bits; the digest is sliced in whole bytes.
        return mac[: (self.size // 8)]

    def verify(self, expected):
        """Recompute the MAC and compare it with I{expected}.

        @raises BadSignature: the two MACs differ
        """
        computed = self.sign()
        if hmac.compare_digest(computed, expected):
            return
        raise BadSignature
204
205
206def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None,
207            multi=None):
208    """Return a context containing the TSIG rdata for the input parameters
209    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
210    @raises ValueError: I{other_data} is too long
211    @raises NotImplementedError: I{algorithm} is not supported
212    """
213
214    first = not (ctx and multi)
215    if first:
216        ctx = get_context(key)
217        if request_mac:
218            ctx.update(struct.pack('!H', len(request_mac)))
219            ctx.update(request_mac)
220    ctx.update(struct.pack('!H', rdata.original_id))
221    ctx.update(wire[2:])
222    if first:
223        ctx.update(key.name.to_digestable())
224        ctx.update(struct.pack('!H', dns.rdataclass.ANY))
225        ctx.update(struct.pack('!I', 0))
226    if time is None:
227        time = rdata.time_signed
228    upper_time = (time >> 32) & 0xffff
229    lower_time = time & 0xffffffff
230    time_encoded = struct.pack('!HIH', upper_time, lower_time, rdata.fudge)
231    other_len = len(rdata.other)
232    if other_len > 65535:
233        raise ValueError('TSIG Other Data is > 65535 bytes')
234    if first:
235        ctx.update(key.algorithm.to_digestable() + time_encoded)
236        ctx.update(struct.pack('!HH', rdata.error, other_len) + rdata.other)
237    else:
238        ctx.update(time_encoded)
239    return ctx
240
241
242def _maybe_start_digest(key, mac, multi):
243    """If this is the first message in a multi-message sequence,
244    start a new context.
245    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
246    """
247    if multi:
248        ctx = get_context(key)
249        ctx.update(struct.pack('!H', len(mac)))
250        ctx.update(mac)
251        return ctx
252    else:
253        return None
254
255
def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
    """Return a (tsig_rdata, ctx) tuple for the input parameters.

    I{tsig_rdata} is a copy of I{rdata} carrying the signing time and the
    MAC computed over I{wire} with I{key}.  I{ctx} is the context to pass
    in when signing the next message of a multi-message sequence, or None
    when I{multi} is false.

    If I{time} is None, the time already stored in I{rdata} is used, both
    for the MAC computation and in the returned rdata.
    @rtype: (TSIG rdata, dns.tsig.HMACTSig or dns.tsig.GSSTSig object or None)
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    if time is None:
        # _digest() falls back to rdata.time_signed when no time is given;
        # resolve it here as well so the returned rdata records the time
        # that was actually signed rather than having it replaced by None.
        time = rdata.time_signed
    ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
    mac = ctx.sign()
    tsig = rdata.replace(time_signed=time, mac=mac)

    return (tsig, _maybe_start_digest(key, mac, multi))
270
271
def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None,
             multi=False):
    """Check the TSIG I{rdata} of a received message against I{key}.

    On success, returns the context to use for the next message of a
    multi-message sequence (None when I{multi} is false).

    @raises FormError: The TSIG is badly formed.
    @raises PeerError: The peer reported a TSIG error (a specific subclass
    is raised for the error codes known to this module).
    @raises BadTime: There is too much time skew between the client and the
    server.
    @raises BadKey: I{owner} does not match the key name.
    @raises BadAlgorithm: The TSIG algorithm does not match the key.
    @raises BadSignature: The TSIG signature did not validate
    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object"""

    # Rebuild the message as it was before the TSIG was appended: decrement
    # the 16-bit count held in header bytes 10-11 and cut the wire at the
    # start of the TSIG record.
    (additional,) = struct.unpack("!H", wire[10:12])
    if additional == 0:
        raise dns.exception.FormError
    stripped = (wire[0:10] + struct.pack("!H", additional - 1) +
                wire[12:tsig_start])

    # A non-zero error field means the peer rejected what we sent.
    peer_error = rdata.error
    if peer_error != 0:
        if peer_error == dns.rcode.BADSIG:
            raise PeerBadSignature
        if peer_error == dns.rcode.BADKEY:
            raise PeerBadKey
        if peer_error == dns.rcode.BADTIME:
            raise PeerBadTime
        if peer_error == dns.rcode.BADTRUNC:
            raise PeerBadTruncation
        raise PeerError('unknown TSIG error code %d' % peer_error)

    # Local checks, performed before the MAC is computed.
    if abs(rdata.time_signed - now) > rdata.fudge:
        raise BadTime
    if key.name != owner:
        raise BadKey
    if key.algorithm != rdata.algorithm:
        raise BadAlgorithm

    verifier = _digest(stripped, key, rdata, None, request_mac, ctx, multi)
    verifier.verify(rdata.mac)
    return _maybe_start_digest(key, rdata.mac, multi)
307
308
def get_context(key):
    """Create the signing/verification context matching I{key}'s algorithm.

    GSS-TSIG keys get a GSSTSig wrapping the key's secret; every other
    algorithm gets an HMACTSig keyed with the secret.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    @raises NotImplementedError: I{algorithm} is not supported
    """

    uses_gss = key.algorithm == GSS_TSIG
    if uses_gss:
        return GSSTSig(key.secret)
    return HMACTSig(key.secret, key.algorithm)
320
321
class Key:
    """A TSIG key: a name, a secret and the algorithm it is used with.

    I{name} and I{algorithm} may be given as text, in which case they are
    converted with dns.name.from_text().  A I{secret} given as text is
    treated as base64 and decoded to bytes; any other value is stored
    unchanged.  For GSS-TSIG keys the secret is the GSSAPI context object
    rather than key bytes (see get_context()).
    """
    def __init__(self, name, secret, algorithm=default_algorithm):
        if isinstance(name, str):
            name = dns.name.from_text(name)
        self.name = name
        if isinstance(secret, str):
            secret = base64.decodebytes(secret.encode())
        self.secret = secret
        if isinstance(algorithm, str):
            algorithm = dns.name.from_text(algorithm)
        self.algorithm = algorithm

    # Defining __eq__ without __hash__ leaves instances unhashable.
    def __eq__(self, other):
        return (isinstance(other, Key) and
                self.name == other.name and
                self.secret == other.secret and
                self.algorithm == other.algorithm)

    def __repr__(self):
        text = f"<DNS key name='{self.name}', " + \
               f"algorithm='{self.algorithm}'"
        # A GSS-TSIG key's secret is a GSSAPI context, not bytes, so it
        # cannot be base64-encoded; leave it out instead of raising.
        if self.algorithm != GSS_TSIG:
            text += f", secret='{base64.b64encode(self.secret).decode()}'"
        return text + ">"
344