1""" Run an rbldnsd and send it DNS queries. 2 3 4""" 5from itertools import count 6import subprocess 7from tempfile import NamedTemporaryFile, TemporaryFile 8import time 9import unittest 10 11try: 12 import DNS 13except ImportError: 14 raise RuntimeError("The pydns library is not installed") 15 16DUMMY_ZONE_HEADER = """ 17$SOA 0 example.org. hostmaster.example.com. 0 1h 1h 2d 1h 18$NS 1d ns0.example.org 19""" 20 21class ZoneFile(object): 22 def __init__(self, lines=None, no_header=False): 23 self._file = NamedTemporaryFile() 24 if not no_header: 25 self._file.write(DUMMY_ZONE_HEADER) 26 if lines is not None: 27 self.writelines(lines) 28 self._file.flush() 29 30 @property 31 def name(self): 32 return self._file.name 33 34 def write(self, str): 35 self._file.write(str) 36 self._file.flush() 37 38 def writelines(self, lines): 39 self._file.writelines("%s\n" % line for line in lines) 40 self._file.flush() 41 42class DaemonError(Exception): 43 """ Various errors having to do with the execution of the daemon. 44 """ 45 46class QueryRefused(Exception): 47 """ Query to rbldnsd was REFUSED. 48 """ 49 50 51class Rbldnsd(object): 52 def __init__(self, datasets=None, 53 daemon_addr='localhost', daemon_port=5300, 54 daemon_bin='./rbldnsd', 55 stderr=None): 56 self._daemon = None 57 self.datasets = [] 58 self.daemon_addr = daemon_addr 59 self.daemon_port = daemon_port 60 self.daemon_bin = daemon_bin 61 self.stderr = stderr 62 63 def add_dataset(self, ds_type, file, soa='example.com'): 64 self.datasets.append((soa, ds_type, file)) 65 66 def __enter__(self): 67 self._start_daemon() 68 return self 69 70 def __exit__(self, exc_type, exc_value, exc_tb): 71 self._stop_daemon() 72 73 def __del__(self): 74 if self._daemon: 75 self._stop_daemon() 76 77 def query(self, name, qtype='TXT'): 78 if not self._daemon: 79 raise DaemonError("daemon not running") 80 elif self._daemon.poll() is not None: 81 raise DaemonError("daemon has died with code %d" 82 % self._daemon.returncode) 83 84 req = DNS.Request(name=name, qtype=qtype, rd=0) 85 resp = req.req(server=self.daemon_addr, port=self.daemon_port) 86 status = resp.header['status'] 87 if status == 'REFUSED': 88 raise QueryRefused("REFUSED") 89 elif status == 'NXDOMAIN': 90 return None 91 else: 92 assert status == 'NOERROR' 93 assert len(resp.answers) == 1 94 assert len(resp.answers[0]['data']) == 1 95 return resp.answers[0]['data'][0] 96 97 def _start_daemon(self): 98 if len(self.datasets) == 0: 99 raise ValueError("no datasets defined") 100 101 cmd = [ self.daemon_bin, '-n', 102 '-b', '%s/%u' % (self.daemon_addr, self.daemon_port), 103 ] 104 for zone, ds_type, file in self.datasets: 105 if isinstance(file, basestring): 106 filename = file 107 else: 108 filename = file.name 109 cmd.append("%s:%s:%s" % (zone, ds_type, filename)) 110 111 self._stdout = TemporaryFile() 112 self._daemon = daemon = subprocess.Popen(cmd, stdout=self._stdout, 113 stderr=self.stderr) 114 115 # wait for rbldnsd to start responding 116 time.sleep(0.1) 117 for retry in count(): 118 if daemon.poll() is not None: 119 raise DaemonError( 120 "rbldsnd exited unexpectedly with return code %d" 121 % daemon.returncode) 122 try: 123 self.query('dummy.nonexisting.zone') 124 break 125 except QueryRefused: 126 break 127 except DNS.DNSError as ex: 128 if str(ex) != 'no working nameservers found': 129 raise 130 elif retries > 10: 131 raise DaemonError( 132 "rbldnsd does not seem to be responding") 133 134 def _stop_daemon(self): 135 daemon = self._daemon 136 137 if daemon.poll() is None: 138 daemon.terminate() 139 retries = count() 140 while daemon.poll() is None: 141 retry = next(retries) 142 if retry == 30: 143 daemon.kill() 144 elif retry == 50: 145 raise DaemonError("can not kill stop rbldnsd") 146 time.sleep(0.1) 147 148 self._daemon = None 149 if daemon.returncode != 0: 150 raise DaemonError("rbldnsd exited with code %d" 151 % daemon.returncode) 152 153class TestRbldnsd(unittest.TestCase): 154 def test(self): 155 rbldnsd = Rbldnsd() 156 test_zone = ZoneFile(lines=["1.2.3.4 :1: Success"]) 157 rbldnsd.add_dataset('ip4set', test_zone) 158 with rbldnsd: 159 self.assertEqual(rbldnsd.query('4.3.2.1.example.com'), 'Success') 160 self.assertEqual(rbldnsd.query('5.3.2.1.example.com'), None) 161 162if __name__ == '__main__': 163 unittest.main() 164