# Copyright (C) Dnspython Contributors, see LICENSE for text of ISC license

# Copyright (C) 2001-2017 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""DNS rdata."""

from io import BytesIO
import base64
import binascii

import dns.exception
import dns.name
import dns.rdataclass
import dns.rdatatype
import dns.tokenizer
import dns.wiredata
from ._compat import xrange, string_types, text_type

try:
    import threading as _threading
except ImportError:
    import dummy_threading as _threading

_hex_chunksize = 32


def _hexify(data, chunksize=_hex_chunksize):
    """Convert a binary string into its hex encoding, broken up into chunks
    of chunksize characters separated by a space.

    *data*, a ``binary``, the bytes to encode.

    *chunksize*, an ``int``, the number of hex characters per chunk.

    Returns a ``text``.
    """

    line = binascii.hexlify(data)
    return b' '.join([line[i:i + chunksize]
                      for i
                      in range(0, len(line), chunksize)]).decode()


_base64_chunksize = 32


def _base64ify(data, chunksize=_base64_chunksize):
    """Convert a binary string into its base64 encoding, broken up into chunks
    of chunksize characters separated by a space.

    *data*, a ``binary``, the bytes to encode.

    *chunksize*, an ``int``, the number of base64 characters per chunk.

    Returns a ``text``.
    """

    line = base64.b64encode(data)
    return b' '.join([line[i:i + chunksize]
                      for i
                      in range(0, len(line), chunksize)]).decode()


# Byte values which must be preceded by a backslash inside a quoted string.
__escaped = bytearray(b'"\\')


def _escapify(qstring):
    """Escape the characters in a quoted string which need it.

    *qstring*, a ``text``, ``binary`` or ``bytearray``.  Text is encoded to
    bytes first, and the result is processed one byte at a time.

    Double quotes and backslashes are backslash-escaped, bytes in the
    range 0x20..0x7E are emitted as-is, and every other byte is emitted as
    a backslash followed by its three-digit decimal value.

    Returns a ``text``.
    """

    if isinstance(qstring, text_type):
        qstring = qstring.encode()
    if not isinstance(qstring, bytearray):
        # Iterating a bytearray yields ints on both Python 2 and Python 3.
        qstring = bytearray(qstring)

    # Collect the pieces and join once rather than growing a string with +=.
    pieces = []
    for c in qstring:
        if c in __escaped:
            pieces.append('\\' + chr(c))
        elif c >= 0x20 and c < 0x7F:
            pieces.append(chr(c))
        else:
            pieces.append('\\%03d' % c)
    return ''.join(pieces)


def _truncate_bitmap(what):
    """Determine the index of greatest byte that isn't all zeros, and
    return the bitmap that contains all the bytes less than that index.

    If every byte is zero (or *what* is empty), the slice ``what[0:1]`` is
    returned.
    """

    for i in xrange(len(what) - 1, -1, -1):
        if what[i] != 0:
            return what[0: i + 1]
    return what[0:1]


class Rdata(object):
    """Base class for all DNS rdata types."""

    __slots__ = ['rdclass', 'rdtype']

    def __init__(self, rdclass, rdtype):
        """Initialize an rdata.

        *rdclass*, an ``int`` is the rdataclass of the Rdata.
        *rdtype*, an ``int`` is the rdatatype of the Rdata.
        """

        self.rdclass = rdclass
        self.rdtype = rdtype

    def covers(self):
        """Return the type a Rdata covers.

        DNS SIG/RRSIG rdatas apply to a specific type; this type is
        returned by the covers() function.  If the rdata type is not
        SIG or RRSIG, dns.rdatatype.NONE is returned.  This is useful when
        creating rdatasets, allowing the rdataset to contain only RRSIGs
        of a particular type, e.g. RRSIG(NS).

        Returns an ``int``.
        """

        return dns.rdatatype.NONE

    def extended_rdatatype(self):
        """Return a 32-bit type value, the least significant 16 bits of
        which are the ordinary DNS type, and the upper 16 bits of which are
        the "covered" type, if any.

        Returns an ``int``.
        """

        return self.covers() << 16 | self.rdtype

    def to_text(self, origin=None, relativize=True, **kw):
        """Convert an rdata to text format.

        The base implementation raises ``NotImplementedError``; subclasses
        are expected to override it.

        Returns a ``text``.
        """

        raise NotImplementedError

    def to_wire(self, file, compress=None, origin=None):
        """Convert an rdata to wire format, writing it to *file*.

        *file* is an object with a ``write()`` method accepting binary data
        (``to_digestable()`` passes a ``BytesIO``).

        The base implementation raises ``NotImplementedError``; subclasses
        are expected to override it.  The callers and implementations in
        this module do not use any return value.
        """

        raise NotImplementedError

    def to_digestable(self, origin=None):
        """Convert rdata to a format suitable for digesting in hashes.  This
        is also the DNSSEC canonical form.

        Returns a ``binary``.
        """

        f = BytesIO()
        self.to_wire(f, None, origin)
        return f.getvalue()

    def validate(self):
        """Check that the current contents of the rdata's fields are
        valid.

        If you change an rdata by assigning to its fields,
        it is a good idea to call validate() when you are done making
        changes.

        Raises various exceptions if there are problems.

        Returns ``None``.
        """

        # Round-trip through the text form; parsing raises on invalid data.
        dns.rdata.from_text(self.rdclass, self.rdtype, self.to_text())

    def __repr__(self):
        covers = self.covers()
        if covers == dns.rdatatype.NONE:
            ctext = ''
        else:
            ctext = '(' + dns.rdatatype.to_text(covers) + ')'
        return '<DNS ' + dns.rdataclass.to_text(self.rdclass) + ' ' + \
               dns.rdatatype.to_text(self.rdtype) + ctext + ' rdata: ' + \
               str(self) + '>'

    def __str__(self):
        return self.to_text()

    def _cmp(self, other):
        """Compare an rdata with another rdata of the same rdtype and
        rdclass.

        Return < 0 if self < other in the DNSSEC ordering, 0 if self
        == other, and > 0 if self > other.

        """
        our = self.to_digestable(dns.name.root)
        their = other.to_digestable(dns.name.root)
        if our == their:
            return 0
        elif our > their:
            return 1
        else:
            return -1

    def __eq__(self, other):
        if not isinstance(other, Rdata):
            return False
        if self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return False
        return self._cmp(other) == 0

    def __ne__(self, other):
        # Exact negation of __eq__, which always returns a bool.
        return not self.__eq__(other)

    # The ordering operators are only defined between rdatas of the same
    # class and type; anything else is left to the other operand.

    def __lt__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) < 0

    def __le__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) <= 0

    def __ge__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) >= 0

    def __gt__(self, other):
        if not isinstance(other, Rdata) or \
                self.rdclass != other.rdclass or self.rdtype != other.rdtype:
            return NotImplemented
        return self._cmp(other) > 0

    def __hash__(self):
        # Hash the same canonical bytes that _cmp() compares.
        return hash(self.to_digestable(dns.name.root))

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        """Build an rdata of this class from tokenized text.

        The base implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build an rdata of this class from wire format.

        The base implementation raises ``NotImplementedError``.
        """
        raise NotImplementedError

    def choose_relativity(self, origin=None, relativize=True):
        """Convert any domain names in the rdata to the specified
        relativization.

        The base implementation does nothing.
        """


class GenericRdata(Rdata):

    """Generic Rdata Class

    This class is used for rdata types for which we have no better
    implementation.  It implements the DNS "unknown RRs" scheme.
    """

    __slots__ = ['data']

    def __init__(self, rdclass, rdtype, data):
        """Initialize a generic rdata.

        *data* is the raw rdata; it is stored unchanged and written
        verbatim by ``to_wire()``.
        """
        super(GenericRdata, self).__init__(rdclass, rdtype)
        self.data = data

    def to_text(self, origin=None, relativize=True, **kw):
        r"""Return the generic text form: ``\#``, the data length, and the
        hex-encoded data.
        """
        return r'\# %d ' % len(self.data) + _hexify(self.data)

    @classmethod
    def from_text(cls, rdclass, rdtype, tok, origin=None, relativize=True):
        r"""Parse the generic text form ``\# <length> <hex data...>``.

        All tokens after the length, up to end of line or end of file, are
        concatenated and decoded as hexadecimal.

        Raises ``dns.exception.SyntaxError`` if the first token is not
        ``\#`` or if the decoded data does not have the declared length.
        Errors from ``binascii.unhexlify()`` on malformed hex are
        propagated unchanged.
        """
        token = tok.get()
        if not token.is_identifier() or token.value != r'\#':
            raise dns.exception.SyntaxError(
                r'generic rdata does not start with \#')
        length = tok.get_int()
        chunks = []
        while True:
            token = tok.get()
            if token.is_eol_or_eof():
                break
            chunks.append(token.value.encode())
        hex_text = b''.join(chunks)
        data = binascii.unhexlify(hex_text)
        if len(data) != length:
            raise dns.exception.SyntaxError(
                'generic rdata hex data has wrong length')
        return cls(rdclass, rdtype, data)

    def to_wire(self, file, compress=None, origin=None):
        """Write the stored data to *file* unchanged."""
        file.write(self.data)

    @classmethod
    def from_wire(cls, rdclass, rdtype, wire, current, rdlen, origin=None):
        """Build a generic rdata from the *rdlen* bytes of *wire* starting
        at offset *current*.
        """
        return cls(rdclass, rdtype, wire[current: current + rdlen])


# Cache of implementation modules, keyed by (rdclass, rdtype).  Modules which
# apply to every class are stored under (dns.rdataclass.ANY, rdtype).
_rdata_modules = {}
_module_prefix = 'dns.rdtypes'
_import_lock = _threading.Lock()


def get_rdata_class(rdclass, rdtype):
    """Return the class implementing the specified rdata class and type.

    The module cache is consulted first, using the class-specific entry and
    then the class-independent one.  On a miss, the module
    ``dns.rdtypes.<CLASS>.<TYPE>`` is imported, falling back to
    ``dns.rdtypes.ANY.<TYPE>``.  Hyphens in the type's text form are
    replaced with underscores to form the module and class name.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    Returns the attribute of the module named after the type, or
    ``GenericRdata`` if no implementation module could be found.
    """

    def import_module(name):
        with _import_lock:
            # __import__('a.b.c') returns the top-level package 'a', so
            # walk down the remaining components to reach the leaf module.
            mod = __import__(name)
            components = name.split('.')
            for comp in components[1:]:
                mod = getattr(mod, comp)
            return mod

    mod = _rdata_modules.get((rdclass, rdtype))
    rdclass_text = dns.rdataclass.to_text(rdclass)
    rdtype_text = dns.rdatatype.to_text(rdtype)
    rdtype_text = rdtype_text.replace('-', '_')
    if not mod:
        # Use the same key that the store below uses for class-independent
        # modules (the rdata *class* ANY, not the rdata *type* ANY).
        mod = _rdata_modules.get((dns.rdataclass.ANY, rdtype))
        if not mod:
            try:
                mod = import_module('.'.join([_module_prefix,
                                              rdclass_text, rdtype_text]))
                _rdata_modules[(rdclass, rdtype)] = mod
            except ImportError:
                try:
                    mod = import_module('.'.join([_module_prefix,
                                                  'ANY', rdtype_text]))
                    _rdata_modules[(dns.rdataclass.ANY, rdtype)] = mod
                except ImportError:
                    mod = None
    if mod:
        cls = getattr(mod, rdtype_text)
    else:
        cls = GenericRdata
    return cls


def from_text(rdclass, rdtype, tok, origin=None, relativize=True):
    """Build an rdata object from text format.

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_text() class method is called
    with the parameters to this function.

    If *tok* is a ``text``, then a tokenizer is created and the string
    is used as its input.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    *tok*, a ``dns.tokenizer.Tokenizer`` or a ``text``.

    *origin*, a ``dns.name.Name`` (or ``None``), the
    origin to use for relative names.

    *relativize*, a ``bool``.  If true, name will be relativized to
    the specified origin.

    Returns an instance of the chosen Rdata subclass.
    """

    if isinstance(tok, string_types):
        tok = dns.tokenizer.Tokenizer(tok)
    cls = get_rdata_class(rdclass, rdtype)
    if cls != GenericRdata:
        # peek at first token
        token = tok.get()
        tok.unget(token)
        if token.is_identifier() and \
           token.value == r'\#':
            #
            # Known type using the generic syntax.  Extract the
            # wire form from the generic syntax, and then run
            # from_wire on it.
            #
            rdata = GenericRdata.from_text(rdclass, rdtype, tok, origin,
                                           relativize)
            return from_wire(rdclass, rdtype, rdata.data, 0, len(rdata.data),
                             origin)
    return cls.from_text(rdclass, rdtype, tok, origin, relativize)


def from_wire(rdclass, rdtype, wire, current, rdlen, origin=None):
    """Build an rdata object from wire format

    This function attempts to dynamically load a class which
    implements the specified rdata class and type.  If there is no
    class-and-type-specific implementation, the GenericRdata class
    is used.

    Once a class is chosen, its from_wire() class method is called
    with the parameters to this function.

    *rdclass*, an ``int``, the rdataclass.

    *rdtype*, an ``int``, the rdatatype.

    *wire*, a ``binary``, the wire-format message.

    *current*, an ``int``, the offset in wire of the beginning of
    the rdata.

    *rdlen*, an ``int``, the length of the wire-format rdata

    *origin*, a ``dns.name.Name`` (or ``None``).  If not ``None``,
    then names will be relativized to this origin.

    Returns an instance of the chosen Rdata subclass.
    """

    wire = dns.wiredata.maybe_wrap(wire)
    cls = get_rdata_class(rdclass, rdtype)
    return cls.from_wire(rdclass, rdtype, wire, current, rdlen, origin)


class RdatatypeExists(dns.exception.DNSException):
    """DNS rdatatype already exists."""
    supp_kwargs = {'rdclass', 'rdtype'}
    fmt = "The rdata type with class {rdclass} and rdtype {rdtype} " + \
        "already exists."


def register_type(implementation, rdtype, rdtype_text, is_singleton=False,
                  rdclass=dns.rdataclass.IN):
    """Dynamically register a module to handle an rdatatype.

    *implementation*, a module implementing the type in the usual dnspython
    way.

    *rdtype*, an ``int``, the rdatatype to register.

    *rdtype_text*, a ``text``, the textual form of the rdatatype.

    *is_singleton*, a ``bool``, indicating if the type is a singleton (i.e.
    RRsets of the type can have only one member.)

    *rdclass*, the rdataclass of the type, or ``dns.rdataclass.ANY`` if
    it applies to all classes.

    Raises ``RdatatypeExists`` if a non-generic implementation is already
    available for the class and type.
    """

    existing_cls = get_rdata_class(rdclass, rdtype)
    if existing_cls != GenericRdata:
        raise RdatatypeExists(rdclass=rdclass, rdtype=rdtype)
    _rdata_modules[(rdclass, rdtype)] = implementation
    dns.rdatatype.register_type(rdtype, rdtype_text, is_singleton)