# Copyright (C) 2001-2007, 2009-2011 Nominum, Inc.
#
# Permission to use, copy, modify, and distribute this software and its
# documentation for any purpose with or without fee is hereby granted,
# provided that the above copyright notice and this permission notice
# appear in all copies.
#
# THE SOFTWARE IS PROVIDED "AS IS" AND NOMINUM DISCLAIMS ALL WARRANTIES
# WITH REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF
# MERCHANTABILITY AND FITNESS. IN NO EVENT SHALL NOMINUM BE LIABLE FOR
# ANY SPECIAL, DIRECT, INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES
# WHATSOEVER RESULTING FROM LOSS OF USE, DATA OR PROFITS, WHETHER IN AN
# ACTION OF CONTRACT, NEGLIGENCE OR OTHER TORTIOUS ACTION, ARISING OUT
# OF OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE.

"""DNS Names.

@var root: The DNS root name.
@type root: dns.name.Name object
@var empty: The empty DNS name.
@type empty: dns.name.Name object
"""

from io import BytesIO
import struct
import sys
import copy
import encodings.idna
try:
    import idna
    have_idna_2008 = True
except ImportError:
    have_idna_2008 = False

import dns.exception
import dns.wiredata

from ._compat import long, binary_type, text_type, unichr, maybe_decode

# sys.maxint is absent on Python 3; derive the same value from the size of
# a native pointer instead.
try:
    maxint = sys.maxint
except AttributeError:
    maxint = (1 << (8 * struct.calcsize("P"))) // 2 - 1

# Relationship codes returned as the first element of Name.fullcompare().
NAMERELN_NONE = 0
NAMERELN_SUPERDOMAIN = 1
NAMERELN_SUBDOMAIN = 2
NAMERELN_EQUAL = 3
NAMERELN_COMMONANCESTOR = 4


class EmptyLabel(dns.exception.SyntaxError):

    """A DNS label is empty."""


class BadEscape(dns.exception.SyntaxError):

    """An escaped code in a text format of DNS name is invalid."""


class BadPointer(dns.exception.FormError):

    """A DNS compression pointer points forward instead of backward."""


class BadLabelType(dns.exception.FormError):

    """The label type in DNS name wire format is unknown."""


class NeedAbsoluteNameOrOrigin(dns.exception.DNSException):

    """An attempt was made to convert a non-absolute name to
    wire when there was also a non-absolute (or missing) origin."""


class NameTooLong(dns.exception.FormError):

    """A DNS name is > 255 octets long."""


class LabelTooLong(dns.exception.SyntaxError):

    """A DNS label is > 63 octets long."""


class AbsoluteConcatenation(dns.exception.DNSException):

    """An attempt was made to append anything other than the
    empty name to an absolute DNS name."""


class NoParent(dns.exception.DNSException):

    """An attempt was made to get the parent of the root name
    or the empty name."""


class NoIDNA2008(dns.exception.DNSException):

    """IDNA 2008 processing was requested but the idna module is not
    available."""


class IDNAException(dns.exception.DNSException):

    """IDNA processing raised an exception."""

    supp_kwargs = set(['idna_exception'])
    fmt = "IDNA processing exception: {idna_exception}"


class IDNACodec(object):

    """Abstract base class for IDNA encoder/decoders."""

    def __init__(self):
        pass

    def encode(self, label):
        """Encode a text label; subclasses must override this.
        @raises NotImplementedError: always
        """
        raise NotImplementedError

    def decode(self, label):
        """Decode a wire label to escaped text without policy checks.
        @param label: the label to decode
        @type label: binary string
        @raises IDNAException: the punycode payload could not be decoded
        @returns: the decoded label, escaped by _escapify() in unicode mode
        """
        # We do not apply any IDNA policy on decode; we just decode the
        # punycode of labels carrying the ACE prefix.
        downcased = label.lower()
        if downcased.startswith(b'xn--'):
            try:
                label = downcased[4:].decode('punycode')
            except Exception as e:
                raise IDNAException(idna_exception=e)
        else:
            label = maybe_decode(label)
        return _escapify(label, True)


class IDNA2003Codec(IDNACodec):

    """IDNA 2003 encoder/decoder."""

    def __init__(self, strict_decode=False):
        """Initialize the IDNA 2003 encoder/decoder.
        @param strict_decode: If True, then IDNA2003 checking is done when
        decoding.  This can cause failures if the name was encoded with
        IDNA2008.  The default is False.
        @type strict_decode: bool
        """
        super(IDNA2003Codec, self).__init__()
        self.strict_decode = strict_decode

    def encode(self, label):
        """Encode a text label with the stdlib IDNA 2003 ToASCII operation.
        @raises LabelTooLong: ToASCII raised UnicodeError
        @rtype: binary string
        """
        if label == '':
            return b''
        try:
            return encodings.idna.ToASCII(label)
        except UnicodeError:
            raise LabelTooLong

    def decode(self, label):
        """Decode a wire label, applying IDNA 2003 checks only when
        strict_decode is set.
        @raises IDNAException: decoding failed
        """
        if not self.strict_decode:
            return super(IDNA2003Codec, self).decode(label)
        if label == b'':
            return u''
        try:
            return _escapify(encodings.idna.ToUnicode(label), True)
        except Exception as e:
            raise IDNAException(idna_exception=e)


class IDNA2008Codec(IDNACodec):

    """IDNA 2008 encoder/decoder."""

    def __init__(self, uts_46=False, transitional=False,
                 allow_pure_ascii=False, strict_decode=False):
        """Initialize the IDNA 2008 encoder/decoder.
        @param uts_46: If True, apply Unicode IDNA compatibility processing
        as described in Unicode Technical Standard #46
        (U{http://unicode.org/reports/tr46/}).  This parameter is only
        meaningful if IDNA 2008 is in use.  If False, do not apply
        the mapping.  The default is False
        @type uts_46: bool
        @param transitional: If True, use the "transitional" mode described
        in Unicode Technical Standard #46.  This parameter is only
        meaningful if IDNA 2008 is in use.  The default is False.
        @type transitional: bool
        @param allow_pure_ascii: If True, then a label which
        consists of only ASCII characters is allowed.  This is less strict
        than regular IDNA 2008, but is also necessary for mixed names,
        e.g. a name starting with "_sip._tcp." and ending in an IDN
        suffix which would otherwise be disallowed.  The default is False
        @type allow_pure_ascii: bool
        @param strict_decode: If True, then IDNA2008 checking is done when
        decoding.  This can cause failures if the name was encoded with
        IDNA2003.  The default is False.
        @type strict_decode: bool
        """
        super(IDNA2008Codec, self).__init__()
        self.uts_46 = uts_46
        self.transitional = transitional
        self.allow_pure_ascii = allow_pure_ascii
        self.strict_decode = strict_decode

    def is_all_ascii(self, label):
        """Does every character of label have a code point <= 0x7f?
        @rtype: bool
        """
        return all(ord(c) <= 0x7f for c in label)

    def encode(self, label):
        """Encode a text label to its IDNA 2008 A-label.
        @raises NoIDNA2008: the idna module is not available
        @raises IDNAException: the idna module rejected the label
        @rtype: binary string
        """
        if label == '':
            return b''
        # Pure-ASCII labels bypass the idna module entirely when allowed.
        if self.allow_pure_ascii and self.is_all_ascii(label):
            return label.encode('ascii')
        if not have_idna_2008:
            raise NoIDNA2008
        try:
            if self.uts_46:
                label = idna.uts46_remap(label, False, self.transitional)
            return idna.alabel(label)
        except idna.IDNAError as e:
            raise IDNAException(idna_exception=e)

    def decode(self, label):
        """Decode a wire label, applying IDNA 2008 checks only when
        strict_decode is set.
        @raises NoIDNA2008: strict decoding was requested but the idna
        module is not available
        @raises IDNAException: the idna module rejected the label
        """
        if not self.strict_decode:
            return super(IDNA2008Codec, self).decode(label)
        if label == b'':
            return u''
        if not have_idna_2008:
            raise NoIDNA2008
        try:
            if self.uts_46:
                label = idna.uts46_remap(label, False, False)
            return _escapify(idna.ulabel(label), True)
        except idna.IDNAError as e:
            raise IDNAException(idna_exception=e)


# Characters which are backslash-escaped in text output: once as byte
# values (for wire labels) and once as text (for unicode labels).
_escaped = bytearray(b'"().;\\@$')
_escaped_text = u'"().;\\@$'

IDNA_2003_Practical = IDNA2003Codec(False)
IDNA_2003_Strict = IDNA2003Codec(True)
IDNA_2003 = IDNA_2003_Practical
IDNA_2008_Practical = IDNA2008Codec(True, False, True, False)
IDNA_2008_UTS_46 = IDNA2008Codec(True, False, False, False)
IDNA_2008_Strict = IDNA2008Codec(False, False, False, True)
IDNA_2008_Transitional = IDNA2008Codec(True, True, False, False)
IDNA_2008 = IDNA_2008_Practical


def _escapify(label, unicode_mode=False):
    """Escape the characters in label which need it.
    @param label: the label to escape
    @param unicode_mode: if True, escape only special and whitespace
    (<= 0x20) characters and return text; otherwise every octet outside the
    printable ASCII range is also escaped as \\DDD and the result is
    encoded to a binary string
    @returns: the escaped string
    @rtype: string"""
    if not unicode_mode:
        if isinstance(label, text_type):
            label = label.encode()
        parts = []
        for c in bytearray(label):
            if c in _escaped:
                parts.append('\\' + chr(c))
            elif 0x20 < c < 0x7F:
                parts.append(chr(c))
            else:
                parts.append('\\%03d' % c)
        return ''.join(parts).encode()

    if isinstance(label, binary_type):
        label = label.decode()
    parts = []
    for c in label:
        if c in _escaped_text:
            # Special characters must be escaped here too; otherwise a
            # literal '.' inside a label would read back as a separator.
            parts.append(u'\\' + c)
        elif c <= u'\x20':
            parts.append(u'\\%03d' % ord(c))
        else:
            parts.append(c)
    return u''.join(parts)


def _validate_labels(labels):
    """Check for empty labels in the middle of a label sequence,
    labels that are too long, and for too many labels.
    @raises LabelTooLong: a label is longer than 63 octets
    @raises NameTooLong: the name as a whole is too long
    @raises EmptyLabel: a label is empty (i.e. the root label) and appears
    in a position other than the end of the label sequence"""

    count = len(labels)
    total = 0
    first_empty = -1
    for position, label in enumerate(labels):
        length = len(label)
        # Each label costs its own octets plus one length octet.
        total += length + 1
        if length > 63:
            raise LabelTooLong
        if first_empty < 0 and label == b'':
            first_empty = position
    if total > 255:
        raise NameTooLong
    if first_empty >= 0 and first_empty != count - 1:
        raise EmptyLabel


def _ensure_bytes(label):
    """Return label as a binary string, encoding text labels.
    @raises ValueError: label is neither a binary nor a text string
    """
    if isinstance(label, binary_type):
        return label
    if isinstance(label, text_type):
        return label.encode()
    raise ValueError("label must be a binary or text string")


class Name(object):

    """A DNS name.

    The dns.name.Name class represents a DNS name as a tuple of labels.
    Instances of the class are immutable.

    @ivar labels: The tuple of labels in the name. Each label is a string of
    up to 63 octets."""

    __slots__ = ['labels']

    def __init__(self, labels):
        """Initialize a domain name from a list of labels.
        @param labels: the labels
        @type labels: any iterable whose values are strings
        """
        labels = [_ensure_bytes(x) for x in labels]
        # __setattr__ is disabled below, so go through object's version.
        super(Name, self).__setattr__('labels', tuple(labels))
        _validate_labels(self.labels)

    def __setattr__(self, name, value):
        raise TypeError("object doesn't support attribute assignment")

    def __copy__(self):
        return Name(self.labels)

    def __deepcopy__(self, memo):
        return Name(copy.deepcopy(self.labels, memo))

    def __getstate__(self):
        return {'labels': self.labels}

    def __setstate__(self, state):
        super(Name, self).__setattr__('labels', state['labels'])
        _validate_labels(self.labels)

    def is_absolute(self):
        """Is the most significant label of this name the root label?
        @rtype: bool
        """

        return len(self.labels) > 0 and self.labels[-1] == b''

    def is_wild(self):
        """Is this name wild?  (I.e. Is the least significant label '*'?)
        @rtype: bool
        """

        return len(self.labels) > 0 and self.labels[0] == b'*'

    def __hash__(self):
        """Return a case-insensitive hash of the name.
        @rtype: int
        """

        h = long(0)
        for label in self.labels:
            for c in bytearray(label.lower()):
                h += (h << 3) + c
        return int(h % maxint)

    def fullcompare(self, other):
        """Compare two names, returning a 3-tuple (relation, order, nlabels).

        I{relation} describes the relationship between the names,
        and is one of: dns.name.NAMERELN_NONE,
        dns.name.NAMERELN_SUPERDOMAIN, dns.name.NAMERELN_SUBDOMAIN,
        dns.name.NAMERELN_EQUAL, or dns.name.NAMERELN_COMMONANCESTOR

        I{order} is < 0 if self < other, > 0 if self > other, and ==
        0 if self == other.  A relative name is always less than an
        absolute name.  If both names have the same relativity, then
        the DNSSEC order relation is used to order them.

        I{nlabels} is the number of significant labels that the two names
        have in common.
        """

        sabs = self.is_absolute()
        oabs = other.is_absolute()
        if sabs != oabs:
            if sabs:
                return (NAMERELN_NONE, 1, 0)
            else:
                return (NAMERELN_NONE, -1, 0)
        l1 = len(self.labels)
        l2 = len(other.labels)
        ldiff = l1 - l2
        if ldiff < 0:
            l = l1
        else:
            l = l2

        order = 0
        nlabels = 0
        namereln = NAMERELN_NONE
        # Walk both names from the most significant label downwards,
        # comparing lowercased labels until they differ or one runs out.
        while l > 0:
            l -= 1
            l1 -= 1
            l2 -= 1
            label1 = self.labels[l1].lower()
            label2 = other.labels[l2].lower()
            if label1 < label2:
                order = -1
                if nlabels > 0:
                    namereln = NAMERELN_COMMONANCESTOR
                return (namereln, order, nlabels)
            elif label1 > label2:
                order = 1
                if nlabels > 0:
                    namereln = NAMERELN_COMMONANCESTOR
                return (namereln, order, nlabels)
            nlabels += 1
        # Every compared label matched, so the length difference decides.
        order = ldiff
        if ldiff < 0:
            namereln = NAMERELN_SUPERDOMAIN
        elif ldiff > 0:
            namereln = NAMERELN_SUBDOMAIN
        else:
            namereln = NAMERELN_EQUAL
        return (namereln, order, nlabels)

    def is_subdomain(self, other):
        """Is self a subdomain of other?

        The notion of subdomain includes equality.
        @rtype: bool
        """

        (nr, o, nl) = self.fullcompare(other)
        if nr == NAMERELN_SUBDOMAIN or nr == NAMERELN_EQUAL:
            return True
        return False

    def is_superdomain(self, other):
        """Is self a superdomain of other?

        The notion of subdomain includes equality.
        @rtype: bool
        """

        (nr, o, nl) = self.fullcompare(other)
        if nr == NAMERELN_SUPERDOMAIN or nr == NAMERELN_EQUAL:
            return True
        return False

    def canonicalize(self):
        """Return a name which is equal to the current name, but is in
        DNSSEC canonical form.
        @rtype: dns.name.Name object
        """

        return Name([x.lower() for x in self.labels])

    def __eq__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] == 0
        else:
            return False

    def __ne__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] != 0
        else:
            return True

    def __lt__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] < 0
        else:
            return NotImplemented

    def __le__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] <= 0
        else:
            return NotImplemented

    def __ge__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] >= 0
        else:
            return NotImplemented

    def __gt__(self, other):
        if isinstance(other, Name):
            return self.fullcompare(other)[1] > 0
        else:
            return NotImplemented

    def __repr__(self):
        return '<DNS name ' + self.__str__() + '>'

    def __str__(self):
        return self.to_text(False)

    def to_text(self, omit_final_dot=False):
        """Convert name to text format.
        @param omit_final_dot: If True, don't emit the final dot (denoting the
        root label) for absolute names.  The default is False.
        @rtype: string
        """

        if len(self.labels) == 0:
            return maybe_decode(b'@')
        if len(self.labels) == 1 and self.labels[0] == b'':
            return maybe_decode(b'.')
        if omit_final_dot and self.is_absolute():
            l = self.labels[:-1]
        else:
            l = self.labels
        s = b'.'.join(map(_escapify, l))
        return maybe_decode(s)

    def to_unicode(self, omit_final_dot=False, idna_codec=None):
        """Convert name to Unicode text format.

        IDN ACE labels are converted to Unicode.

        @param omit_final_dot: If True, don't emit the final dot (denoting the
        root label) for absolute names.  The default is False.
        @type omit_final_dot: bool
        @param idna_codec: IDNA encoder/decoder.  If None, the
        IDNA_2003_Practical encoder/decoder is used.  The IDNA_2003_Practical
        decoder does not impose any policy, it just decodes punycode, so if
        you don't want checking for compliance, you can use this decoder for
        IDNA2008 as well.
        @type idna_codec: dns.name.IDNA
        @rtype: string
        """

        if len(self.labels) == 0:
            return u'@'
        if len(self.labels) == 1 and self.labels[0] == b'':
            return u'.'
        if omit_final_dot and self.is_absolute():
            l = self.labels[:-1]
        else:
            l = self.labels
        if idna_codec is None:
            idna_codec = IDNA_2003_Practical
        return u'.'.join([idna_codec.decode(x) for x in l])

    def to_digestable(self, origin=None):
        """Convert name to a format suitable for digesting in hashes.

        The name is canonicalized and converted to uncompressed wire format.

        @param origin: If the name is relative and origin is not None, then
        origin will be appended to it.
        @type origin: dns.name.Name object
        @raises NeedAbsoluteNameOrOrigin: All names in wire format are
        absolute.  If self is a relative name, then an origin must be supplied;
        if it is missing, then this exception is raised
        @rtype: string
        """

        if not self.is_absolute():
            if origin is None or not origin.is_absolute():
                raise NeedAbsoluteNameOrOrigin
            labels = list(self.labels)
            labels.extend(list(origin.labels))
        else:
            labels = self.labels
        # Each label becomes one length octet followed by its lowercased
        # bytes.
        dlabels = [struct.pack('!B%ds' % len(x), len(x), x.lower())
                   for x in labels]
        return b''.join(dlabels)

    def to_wire(self, file=None, compress=None, origin=None):
        """Convert name to wire format, possibly compressing it.

        @param file: the file where the name is emitted (typically
        a BytesIO file).  If None, a string containing the wire name
        will be returned.
        @type file: file or None
        @param compress: The compression table.  If None (the default) names
        will not be compressed.
        @type compress: dict
        @param origin: If the name is relative and origin is not None, then
        origin will be appended to it.
        @type origin: dns.name.Name object
        @raises NeedAbsoluteNameOrOrigin: All names in wire format are
        absolute.  If self is a relative name, then an origin must be supplied;
        if it is missing, then this exception is raised
        @returns: the wire format if file was None, otherwise None
        """

        if file is None:
            file = BytesIO()
            want_return = True
        else:
            want_return = False

        if not self.is_absolute():
            if origin is None or not origin.is_absolute():
                raise NeedAbsoluteNameOrOrigin
            labels = list(self.labels)
            labels.extend(list(origin.labels))
        else:
            labels = self.labels
        for i, label in enumerate(labels):
            # n is the suffix of the name starting at the current label.
            n = Name(labels[i:])
            if compress is not None:
                pos = compress.get(n)
            else:
                pos = None
            if pos is not None:
                # The suffix was emitted before: write a two-octet pointer
                # (top two bits set) to it and stop.
                value = 0xc000 + pos
                s = struct.pack('!H', value)
                file.write(s)
                break
            else:
                if compress is not None and len(n) > 1:
                    pos = file.tell()
                    # Only offsets that fit in 14 bits can be pointed to.
                    if pos <= 0x3fff:
                        compress[n] = pos
                l = len(label)
                file.write(struct.pack('!B', l))
                if l > 0:
                    file.write(label)
        if want_return:
            return file.getvalue()

    def __len__(self):
        """The length of the name (in labels).
        @rtype: int
        """

        return len(self.labels)

    def __getitem__(self, index):
        return self.labels[index]

    def __add__(self, other):
        return self.concatenate(other)

    def __sub__(self, other):
        return self.relativize(other)

    def split(self, depth):
        """Split a name into a prefix and suffix at depth.

        @param depth: the number of labels in the suffix
        @type depth: int
        @raises ValueError: the depth was not >= 0 and <= the length of the
        name.
        @returns: the tuple (prefix, suffix)
        @rtype: tuple
        """

        l = len(self.labels)
        # depth == 0 is handled explicitly because self[:-0] is self[:0].
        if depth == 0:
            return (self, empty)
        elif depth == l:
            return (empty, self)
        elif depth < 0 or depth > l:
            raise ValueError(
                'depth must be >= 0 and <= the length of the name')
        return (Name(self[: -depth]), Name(self[-depth:]))

    def concatenate(self, other):
        """Return a new name which is the concatenation of self and other.
        @rtype: dns.name.Name object
        @raises AbsoluteConcatenation: self is absolute and other is
        not the empty name
        """

        if self.is_absolute() and len(other) > 0:
            raise AbsoluteConcatenation
        labels = list(self.labels)
        labels.extend(list(other.labels))
        return Name(labels)

    def relativize(self, origin):
        """If self is a subdomain of origin, return a new name which is self
        relative to origin.  Otherwise return self.
        @rtype: dns.name.Name object
        """

        if origin is not None and self.is_subdomain(origin):
            # Slice by explicit length: self[:-len(origin)] would collapse
            # to self[:0] (the empty name) when origin has no labels.
            keep = len(self.labels) - len(origin)
            return Name(self.labels[:keep])
        else:
            return self

    def derelativize(self, origin):
        """If self is a relative name, return a new name which is the
        concatenation of self and origin.  Otherwise return self.
        @rtype: dns.name.Name object
        """

        if not self.is_absolute():
            return self.concatenate(origin)
        else:
            return self

    def choose_relativity(self, origin=None, relativize=True):
        """Return a name with the relativity desired by the caller.  If
        origin is None (or has no labels), then self is returned.
        Otherwise, if relativize is true the name is relativized, and if
        relativize is false the name is derelativized.
        @rtype: dns.name.Name object
        """

        # Truthiness goes through __len__, so an empty origin is treated
        # like None here.
        if origin:
            if relativize:
                return self.relativize(origin)
            else:
                return self.derelativize(origin)
        else:
            return self

    def parent(self):
        """Return the parent of the name.
        @rtype: dns.name.Name object
        @raises NoParent: the name is either the root name or the empty name,
        and thus has no parent.
        """
        if self == root or self == empty:
            raise NoParent
        return Name(self.labels[1:])


root = Name([b''])
empty = Name([])


def from_unicode(text, origin=root, idna_codec=None):
    """Convert unicode text into a Name object.

    Labels are encoded in IDN ACE form.

    @param text: The text to convert into a name.
    @type text: Unicode string
    @param origin: The origin to append to non-absolute names.
    @type origin: dns.name.Name
    @param idna_codec: IDNA encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    @type idna_codec: dns.name.IDNA
    @raises ValueError: text is not a unicode string, or origin is neither
    a Name nor None
    @raises BadEscape: a backslash escape is malformed or unterminated
    @raises EmptyLabel: an empty label appears before a separator
    @rtype: dns.name.Name object
    """

    if not isinstance(text, text_type):
        raise ValueError("input to from_unicode() must be a unicode string")
    if not (origin is None or isinstance(origin, Name)):
        raise ValueError("origin must be a Name or None")
    labels = []
    label = u''
    escaping = False
    edigits = 0
    total = 0
    if idna_codec is None:
        idna_codec = IDNA_2003
    if text == u'@':
        text = u''
    if text:
        if text == u'.':
            return Name([b''])        # no Unicode "u" on this constant!
        for c in text:
            if escaping:
                if edigits == 0:
                    if c.isdigit():
                        total = int(c)
                        edigits += 1
                    else:
                        # Backslash followed by a non-digit: take the
                        # character literally.
                        label += c
                        escaping = False
                else:
                    if not c.isdigit():
                        raise BadEscape
                    total *= 10
                    total += int(c)
                    edigits += 1
                    if edigits == 3:
                        escaping = False
                        label += unichr(total)
            elif c in [u'.', u'\u3002', u'\uff0e', u'\uff61']:
                # ASCII full stop and the three other Unicode dots all
                # separate labels.
                if len(label) == 0:
                    raise EmptyLabel
                labels.append(idna_codec.encode(label))
                label = u''
            elif c == u'\\':
                escaping = True
                edigits = 0
                total = 0
            else:
                label += c
        if escaping:
            raise BadEscape
        if len(label) > 0:
            labels.append(idna_codec.encode(label))
        else:
            # The text ended with a separator: the name is absolute.
            labels.append(b'')

    if (len(labels) == 0 or labels[-1] != b'') and origin is not None:
        labels.extend(list(origin.labels))
    return Name(labels)


def from_text(text, origin=root, idna_codec=None):
    """Convert text into a Name object.

    @param text: The text to convert into a name.
    @type text: string
    @param origin: The origin to append to non-absolute names.
    @type origin: dns.name.Name
    @param idna_codec: IDNA encoder/decoder.  If None, the default IDNA 2003
    encoder/decoder is used.
    @type idna_codec: dns.name.IDNA
    @raises ValueError: text is not a string, or origin is neither a Name
    nor None
    @raises BadEscape: a backslash escape is malformed, unterminated, or a
    \\DDD escape exceeds 255
    @raises EmptyLabel: an empty label appears before a separator
    @rtype: dns.name.Name object
    """

    if isinstance(text, text_type):
        return from_unicode(text, origin, idna_codec)
    if not isinstance(text, binary_type):
        raise ValueError("input to from_text() must be a string")
    if not (origin is None or isinstance(origin, Name)):
        raise ValueError("origin must be a Name or None")
    labels = []
    label = b''
    escaping = False
    edigits = 0
    total = 0
    if text == b'@':
        text = b''
    if text:
        if text == b'.':
            return Name([b''])
        for c in bytearray(text):
            byte_ = struct.pack('!B', c)
            if escaping:
                if edigits == 0:
                    if byte_.isdigit():
                        total = int(byte_)
                        edigits += 1
                    else:
                        # Backslash followed by a non-digit: take the
                        # byte literally.
                        label += byte_
                        escaping = False
                else:
                    if not byte_.isdigit():
                        raise BadEscape
                    total *= 10
                    total += int(byte_)
                    edigits += 1
                    if edigits == 3:
                        escaping = False
                        # A \DDD escape denotes a single octet; reject
                        # out-of-range values as a syntax error rather than
                        # letting struct.pack() raise struct.error.
                        if total > 255:
                            raise BadEscape
                        label += struct.pack('!B', total)
            elif byte_ == b'.':
                if len(label) == 0:
                    raise EmptyLabel
                labels.append(label)
                label = b''
            elif byte_ == b'\\':
                escaping = True
                edigits = 0
                total = 0
            else:
                label += byte_
        if escaping:
            raise BadEscape
        if len(label) > 0:
            labels.append(label)
        else:
            # The text ended with a separator: the name is absolute.
            labels.append(b'')
    if (len(labels) == 0 or labels[-1] != b'') and origin is not None:
        labels.extend(list(origin.labels))
    return Name(labels)


def from_wire(message, current):
    """Convert possibly compressed wire format into a Name.
    @param message: the entire DNS message
    @type message: string
    @param current: the offset of the beginning of the name from the start
    of the message
    @type current: int
    @raises ValueError: message is not a byte string
    @raises dns.name.BadPointer: a compression pointer did not point backwards
    in the message
    @raises dns.name.BadLabelType: an invalid label type was encountered.
    @returns: a tuple consisting of the name that was read and the number
    of bytes of the wire format message which were consumed reading it
    @rtype: (dns.name.Name object, int) tuple
    """

    if not isinstance(message, binary_type):
        raise ValueError("input to from_wire() must be a byte string")
    message = dns.wiredata.maybe_wrap(message)
    labels = []
    biggest_pointer = current
    hops = 0
    count = message[current]
    current += 1
    # cused counts only the octets read before the first pointer is
    # followed.
    cused = 1
    while count != 0:
        if count < 64:
            labels.append(message[current: current + count].unwrap())
            current += count
            if hops == 0:
                cused += count
        elif count >= 192:
            # Compression pointer: 14-bit offset from the low six bits of
            # this octet plus the next octet.
            current = (count & 0x3f) * 256 + message[current]
            if hops == 0:
                cused += 1
            # Each pointer must land strictly before the previous one, which
            # rules out forward pointers and loops.
            if current >= biggest_pointer:
                raise BadPointer
            biggest_pointer = current
            hops += 1
        else:
            raise BadLabelType
        count = message[current]
        current += 1
        if hops == 0:
            cused += 1
    labels.append(b'')
    return (Name(labels), cused)