# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""DNS TSIG support."""

import base64
import hashlib
import hmac
import struct

import dns.exception
import dns.name
import dns.rcode
import dns.rdataclass
# dns.rdatatype is referenced by GSSTSigAdapter.parse_tkey_and_step(); import
# it explicitly rather than relying on another module having loaded it.
import dns.rdatatype


# NOTE(review): the docstrings of the exception classes below are kept
# verbatim; dns.exception.DNSException presumably uses the class docstring
# as the default message, so editing them could change runtime output --
# confirm before rewording.

class BadTime(dns.exception.DNSException):

    """The current time is not within the TSIG's validity time."""


class BadSignature(dns.exception.DNSException):

    """The TSIG signature fails to verify."""


class BadKey(dns.exception.DNSException):

    """The TSIG record owner name does not match the key."""


class BadAlgorithm(dns.exception.DNSException):

    """The TSIG algorithm does not match the key."""


class PeerError(dns.exception.DNSException):

    """Base class for all TSIG errors generated by the remote peer"""


class PeerBadKey(PeerError):

    """The peer didn't know the key we used"""


class PeerBadSignature(PeerError):

    """The peer didn't like the signature we sent"""


class PeerBadTime(PeerError):

    """The peer didn't like the time we sent"""


class PeerBadTruncation(PeerError):

    """The peer didn't like amount of truncation in the TSIG we sent"""


# TSIG Algorithms

HMAC_MD5 = dns.name.from_text("HMAC-MD5.SIG-ALG.REG.INT")
HMAC_SHA1 = dns.name.from_text("hmac-sha1")
HMAC_SHA224 = dns.name.from_text("hmac-sha224")
HMAC_SHA256 = dns.name.from_text("hmac-sha256")
HMAC_SHA256_128 = dns.name.from_text("hmac-sha256-128")
HMAC_SHA384 = dns.name.from_text("hmac-sha384")
HMAC_SHA384_192 = dns.name.from_text("hmac-sha384-192")
HMAC_SHA512 = dns.name.from_text("hmac-sha512")
HMAC_SHA512_256 = dns.name.from_text("hmac-sha512-256")
GSS_TSIG = dns.name.from_text("gss-tsig")

# Algorithm used by Key() when the caller does not specify one.
default_algorithm = HMAC_SHA256


class GSSTSig:
    """
    GSS-TSIG TSIG implementation.  This uses the GSS-API context established
    in the TKEY message handshake to sign messages using GSS-API message
    integrity codes, per the RFC.

    In order to avoid a direct GSSAPI dependency, the keyring holds a ref
    to the GSSAPI object required, rather than the key itself.
    """
    def __init__(self, gssapi_context):
        """Wrap *gssapi_context* and start with an empty data buffer."""
        self.gssapi_context = gssapi_context
        # Everything passed to update() is accumulated here and handed to
        # the GSSAPI context in one piece by sign() / verify().
        self.data = b''
        self.name = 'gss-tsig'

    def update(self, data):
        """Append *data* (bytes) to the buffer to be signed or verified."""
        self.data += data

    def sign(self):
        """Return the GSSAPI signature over the accumulated data."""
        # defer to the GSSAPI function to sign
        return self.gssapi_context.get_signature(self.data)

    def verify(self, expected):
        """Verify *expected* against the accumulated data.

        Returns whatever the GSSAPI context's ``verify_signature()``
        returns.

        @raises BadSignature: the GSSAPI call raised any exception
        """
        try:
            # defer to the GSSAPI function to verify
            return self.gssapi_context.verify_signature(self.data, expected)
        except Exception:
            # Deliberately broad: any failure reported by the GSSAPI layer
            # is surfaced to callers as a TSIG signature failure.
            raise BadSignature


class GSSTSigAdapter:
    """Callable wrapper around a keyring with GSS-TSIG awareness.

    Calling an instance with ``(message, keyname)`` looks up *keyname* in
    the wrapped keyring and, for GSS-TSIG keys, first feeds any TKEY
    answer found in *message* into the key's GSSAPI context.
    """

    def __init__(self, keyring):
        """Store *keyring*, a mapping from key name to key."""
        self.keyring = keyring

    def __call__(self, message, keyname):
        """Return the key named *keyname*, or ``None`` if it is unknown.

        If the key is a GSS-TSIG ``Key`` and *message* is truthy, the
        message is inspected for TKEY material before the key is returned.
        """
        if keyname in self.keyring:
            key = self.keyring[keyname]
            if isinstance(key, Key) and key.algorithm == GSS_TSIG:
                if message:
                    GSSTSigAdapter.parse_tkey_and_step(key, message, keyname)
            return key
        else:
            return None

    @classmethod
    def parse_tkey_and_step(cls, key, message, keyname):
        """Advance the GSSAPI negotiation using a TKEY answer, if present.

        Returns the result of ``key.secret.step(token)`` when a TKEY rrset
        owned by *keyname* is found in the answer section of *message*;
        otherwise returns ``None``.
        """
        # if the message is a TKEY type, absorb the key material
        # into the context using step(); this is used to allow the
        # client to complete the GSSAPI negotiation before attempting
        # to verify the signed response to a TKEY message exchange
        try:
            rrset = message.find_rrset(message.answer, keyname,
                                       dns.rdataclass.ANY,
                                       dns.rdatatype.TKEY)
            if rrset:
                token = rrset[0].key
                # For GSS-TSIG keys the "secret" is the GSSAPI context.
                gssapi_context = key.secret
                return gssapi_context.step(token)
        except KeyError:
            # A KeyError here is ignored, so a message without a matching
            # TKEY rrset is simply left alone.
            pass


class HMACTSig:
    """
    HMAC TSIG implementation.  This uses the HMAC python module to handle the
    sign/verify operations.
    """

    # Maps an algorithm name to either a hashlib constructor, or a
    # (constructor, size) tuple where size is the truncated MAC length in
    # bits.
    _hashes = {
        HMAC_SHA1: hashlib.sha1,
        HMAC_SHA224: hashlib.sha224,
        HMAC_SHA256: hashlib.sha256,
        HMAC_SHA256_128: (hashlib.sha256, 128),
        HMAC_SHA384: hashlib.sha384,
        HMAC_SHA384_192: (hashlib.sha384, 192),
        HMAC_SHA512: hashlib.sha512,
        HMAC_SHA512_256: (hashlib.sha512, 256),
        HMAC_MD5: hashlib.md5,
    }

    def __init__(self, key, algorithm):
        """Create an HMAC context for the secret *key* and *algorithm*.

        @raises NotImplementedError: I{algorithm} is not in ``_hashes``
        """
        try:
            hashinfo = self._hashes[algorithm]
        except KeyError:
            raise NotImplementedError(f"TSIG algorithm {algorithm} " +
                                      "is not supported")

        # create the HMAC context
        if isinstance(hashinfo, tuple):
            self.hmac_context = hmac.new(key, digestmod=hashinfo[0])
            self.size = hashinfo[1]
        else:
            self.hmac_context = hmac.new(key, digestmod=hashinfo)
            self.size = None
        # e.g. "hmac-sha256", or "hmac-sha256-128" for a truncated variant
        self.name = self.hmac_context.name
        if self.size:
            self.name += f'-{self.size}'

    def update(self, data):
        """Feed *data* (bytes) into the HMAC computation."""
        return self.hmac_context.update(data)

    def sign(self):
        """Return the MAC over the data seen so far, truncated if needed."""
        # defer to the HMAC digest() function for that digestmod
        digest = self.hmac_context.digest()
        if self.size:
            # size is in bits; keep only the leading size/8 bytes
            digest = digest[: (self.size // 8)]
        return digest

    def verify(self, expected):
        """Check that *expected* equals the computed MAC.

        @raises BadSignature: the MACs differ
        """
        # re-digest and compare the results
        mac = self.sign()
        # compare_digest is used instead of == to avoid leaking timing
        # information about how much of the MAC matched
        if not hmac.compare_digest(mac, expected):
            raise BadSignature


def _digest(wire, key, rdata, time=None, request_mac=None, ctx=None,
            multi=None):
    """Return a context containing the TSIG rdata for the input parameters

    *wire* is the message in wire format, *key* the signing key, and
    *rdata* the TSIG rdata supplying original_id, fudge, error and other
    data.  *time* overrides ``rdata.time_signed`` when not ``None``.
    *request_mac*, when truthy, is digested first with a 16-bit length
    prefix.  *ctx* and *multi* select continuation of an existing
    multi-message digest: a new context is created unless both are truthy.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    # "first" is true unless we are continuing a multi-message digest with
    # a context carried over from the previous message.
    first = not (ctx and multi)
    if first:
        ctx = get_context(key)
        if request_mac:
            ctx.update(struct.pack('!H', len(request_mac)))
            ctx.update(request_mac)
    # Digest the message with the id from the TSIG rdata substituted for
    # the first two bytes of the wire data.
    ctx.update(struct.pack('!H', rdata.original_id))
    ctx.update(wire[2:])
    if first:
        # TSIG variables: key name, class (ANY) and a 32-bit zero (the TTL
        # field of the TSIG record).
        ctx.update(key.name.to_digestable())
        ctx.update(struct.pack('!H', dns.rdataclass.ANY))
        ctx.update(struct.pack('!I', 0))
    if time is None:
        time = rdata.time_signed
    # The time is encoded as 48 bits: upper 16 bits, then lower 32 bits,
    # followed by the 16-bit fudge.
    upper_time = (time >> 32) & 0xffff
    lower_time = time & 0xffffffff
    time_encoded = struct.pack('!HIH', upper_time, lower_time, rdata.fudge)
    other_len = len(rdata.other)
    if other_len > 65535:
        raise ValueError('TSIG Other Data is > 65535 bytes')
    if first:
        ctx.update(key.algorithm.to_digestable() + time_encoded)
        ctx.update(struct.pack('!HH', rdata.error, other_len) + rdata.other)
    else:
        # Subsequent messages of a sequence only digest the timers.
        ctx.update(time_encoded)
    return ctx


def _maybe_start_digest(key, mac, multi):
    """If *multi* is true, start the context for the next message.

    The new context is primed with the 16-bit length of *mac* followed by
    *mac* itself.  Returns ``None`` when *multi* is false.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    """
    if multi:
        ctx = get_context(key)
        ctx.update(struct.pack('!H', len(mac)))
        ctx.update(mac)
        return ctx
    else:
        return None


def sign(wire, key, rdata, time=None, request_mac=None, ctx=None, multi=False):
    """Return a (tsig_rdata, ctx) tuple containing the TSIG rdata for the
    input parameters with the computed MAC filled in, and the TSIG digest
    context to use for the next message (``None`` unless *multi* is true).

    If *time* is ``None`` the rdata's existing ``time_signed`` is used,
    both for the MAC computation and in the returned rdata.

    @rtype: (TSIG rdata, dns.tsig.HMACTSig or dns.tsig.GSSTSig object)
    @raises ValueError: I{other_data} is too long
    @raises NotImplementedError: I{algorithm} is not supported
    """

    ctx = _digest(wire, key, rdata, time, request_mac, ctx, multi)
    mac = ctx.sign()
    if time is None:
        # _digest() signed with rdata.time_signed in this case; keep the
        # returned rdata consistent with what was actually signed instead
        # of overwriting time_signed with None.
        time = rdata.time_signed
    tsig = rdata.replace(time_signed=time, mac=mac)

    return (tsig, _maybe_start_digest(key, mac, multi))


def validate(wire, key, owner, rdata, now, request_mac, tsig_start, ctx=None,
             multi=False):
    """Validate the specified TSIG rdata against the other input parameters.

    *wire* is the received message, *tsig_start* the offset in *wire* at
    which the TSIG record begins, *owner* the owner name of the TSIG
    record, and *now* the time to compare against ``rdata.time_signed``.

    @raises FormError: The TSIG is badly formed.
    @raises PeerError: The peer reported a TSIG error (a more specific
    subclass is raised for BADSIG, BADKEY, BADTIME and BADTRUNC).
    @raises BadTime: There is too much time skew between the client and the
    server.
    @raises BadKey: I{owner} does not match the key name
    @raises BadAlgorithm: The rdata algorithm does not match the key
    @raises BadSignature: The TSIG signature did not validate
    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object"""

    # Bytes 10-11 of the DNS header hold the additional-record count.  The
    # MAC is computed over the message without the TSIG record, so rebuild
    # the wire with the count decremented and the TSIG record cut off.
    (adcount,) = struct.unpack("!H", wire[10:12])
    if adcount == 0:
        raise dns.exception.FormError
    adcount -= 1
    new_wire = wire[0:10] + struct.pack("!H", adcount) + wire[12:tsig_start]
    if rdata.error != 0:
        if rdata.error == dns.rcode.BADSIG:
            raise PeerBadSignature
        elif rdata.error == dns.rcode.BADKEY:
            raise PeerBadKey
        elif rdata.error == dns.rcode.BADTIME:
            raise PeerBadTime
        elif rdata.error == dns.rcode.BADTRUNC:
            raise PeerBadTruncation
        else:
            raise PeerError('unknown TSIG error code %d' % rdata.error)
    if abs(rdata.time_signed - now) > rdata.fudge:
        raise BadTime
    if key.name != owner:
        raise BadKey
    if key.algorithm != rdata.algorithm:
        raise BadAlgorithm
    # time=None makes _digest() use the time carried in the rdata.
    ctx = _digest(new_wire, key, rdata, None, request_mac, ctx, multi)
    ctx.verify(rdata.mac)
    return _maybe_start_digest(key, rdata.mac, multi)


def get_context(key):
    """Returns a signing context for the specified key.

    A GSSTSig is returned for GSS-TSIG keys (the key's secret is passed as
    the GSSAPI context); an HMACTSig is returned for every other
    algorithm.

    @rtype: dns.tsig.HMACTSig or dns.tsig.GSSTSig object
    @raises NotImplementedError: I{algorithm} is not supported
    """

    if key.algorithm == GSS_TSIG:
        return GSSTSig(key.secret)
    else:
        return HMACTSig(key.secret, key.algorithm)


class Key:
    """A TSIG key: a name, a secret and an algorithm."""

    def __init__(self, name, secret, algorithm=default_algorithm):
        """Create a key.

        *name* and *algorithm* may be given as text, in which case they
        are converted with ``dns.name.from_text()``.  A ``str`` *secret*
        is treated as base64 and decoded to bytes; any other value is
        stored unchanged.
        """
        if isinstance(name, str):
            name = dns.name.from_text(name)
        self.name = name
        if isinstance(secret, str):
            secret = base64.decodebytes(secret.encode())
        self.secret = secret
        if isinstance(algorithm, str):
            algorithm = dns.name.from_text(algorithm)
        self.algorithm = algorithm

    def __eq__(self, other):
        """Keys are equal when name, secret and algorithm all match."""
        return (isinstance(other, Key) and
                self.name == other.name and
                self.secret == other.secret and
                self.algorithm == other.algorithm)

    def __repr__(self):
        """Return a debugging representation of the key.

        The base64 secret is included except for GSS-TSIG keys, whose
        secret is a GSSAPI context object rather than bytes and therefore
        cannot be base64-encoded.
        """
        r = f"<DNS key name='{self.name}', " + \
            f"algorithm='{self.algorithm}'"
        if self.algorithm != GSS_TSIG:
            r += f", secret='{base64.b64encode(self.secret).decode()}'"
        r += '>'
        return r