1from sys import stderr 2from ipaddress import ip_address, ip_interface 3from dns.resolver import Resolver, NXDOMAIN, NoAnswer, Timeout 4from dns.name import root, from_text 5 6from .provider import DNSProvider 7 8class DNSPythonProvider(DNSProvider): 9 def configure(self, dns_servers, *, bind_addresses=None, search_domains=()): 10 super().configure(dns_servers, bind_addresses=bind_addresses, search_domains=search_domains) 11 12 self.resolver = Resolver(configure=False) 13 self.resolver.domain = root 14 self.resolver.search = [from_text(d) for d in search_domains] 15 16 self.rectypes = [] 17 if self.bind_addresses is None or any(a.version == 4 for a in self.bind_addresses): 18 self.rectypes.append('A') 19 if self.bind_addresses is None or any(a.version == 6 for a in self.bind_addresses): 20 self.rectypes.append('AAAA') 21 22 def lookup_host(self, hostname, keep_going=True): 23 result = set() 24 25 for source in self.bind_addresses or [None]: 26 if source is None: 27 self.resolver.nameservers = self.nameservers 28 else: 29 self.resolver.nameservers = [str(dns) for dns in self.dns_servers if dns.version == source.version] 30 if not self.resolver.nameservers: 31 continue 32 33 for rectype in self.rectypes: 34 try: 35 # print("Issuing query for hostname %r, rectype %r, source %r, search_domains %r, nameservers %r" % ( 36 # hostname, rectype, source, self.resolver.search_domains, self.resolver.nameservers), file=stderr) 37 a = self.resolver.query(hostname, rectype, source=str(source)) 38 print("Got results: %r" % list(a), file=stderr) 39 except (NXDOMAIN, NoAnswer): 40 pass 41 except Timeout: 42 # No point in retrying with a different rectype if these DNS server(s) are not responding 43 break 44 else: 45 result.update(ip_address(r.address) for r in a) 46 if result and not keep_going: 47 return result 48 49 return result or None 50