1""" Run an rbldnsd and send it DNS queries.
2
3
4"""
5from itertools import count
6import subprocess
7from tempfile import NamedTemporaryFile, TemporaryFile
8import time
9import unittest
10
11try:
12    import DNS
13except ImportError:
14    raise RuntimeError("The pydns library is not installed")
15
16DUMMY_ZONE_HEADER = """
17$SOA 0 example.org. hostmaster.example.com. 0 1h 1h 2d 1h
18$NS 1d ns0.example.org
19"""
20
21class ZoneFile(object):
22    def __init__(self, lines=None, no_header=False):
23        self._file = NamedTemporaryFile()
24        if not no_header:
25            self._file.write(DUMMY_ZONE_HEADER)
26        if lines is not None:
27            self.writelines(lines)
28        self._file.flush()
29
30    @property
31    def name(self):
32        return self._file.name
33
34    def write(self, str):
35        self._file.write(str)
36        self._file.flush()
37
38    def writelines(self, lines):
39        self._file.writelines("%s\n" % line for line in lines)
40        self._file.flush()
41
42class DaemonError(Exception):
43    """ Various errors having to do with the execution of the daemon.
44    """
45
46class QueryRefused(Exception):
47    """ Query to rbldnsd was REFUSED.
48    """
49
50
51class Rbldnsd(object):
52    def __init__(self, datasets=None,
53                 daemon_addr='localhost', daemon_port=5300,
54                 daemon_bin='./rbldnsd',
55                 stderr=None):
56        self._daemon = None
57        self.datasets = []
58        self.daemon_addr = daemon_addr
59        self.daemon_port = daemon_port
60        self.daemon_bin = daemon_bin
61        self.stderr = stderr
62
63    def add_dataset(self, ds_type, file, soa='example.com'):
64        self.datasets.append((soa, ds_type, file))
65
66    def __enter__(self):
67        self._start_daemon()
68        return self
69
70    def __exit__(self, exc_type, exc_value, exc_tb):
71        self._stop_daemon()
72
73    def __del__(self):
74        if self._daemon:
75            self._stop_daemon()
76
77    def query(self, name, qtype='TXT'):
78        if not self._daemon:
79            raise DaemonError("daemon not running")
80        elif self._daemon.poll() is not None:
81            raise DaemonError("daemon has died with code %d"
82                              % self._daemon.returncode)
83
84        req = DNS.Request(name=name, qtype=qtype, rd=0)
85        resp = req.req(server=self.daemon_addr, port=self.daemon_port)
86        status = resp.header['status']
87        if status == 'REFUSED':
88            raise QueryRefused("REFUSED")
89        elif status == 'NXDOMAIN':
90            return None
91        else:
92            assert status == 'NOERROR'
93            assert len(resp.answers) == 1
94            assert len(resp.answers[0]['data']) == 1
95            return resp.answers[0]['data'][0]
96
97    def _start_daemon(self):
98        if len(self.datasets) == 0:
99            raise ValueError("no datasets defined")
100
101        cmd = [ self.daemon_bin, '-n',
102                '-b', '%s/%u' % (self.daemon_addr, self.daemon_port),
103                ]
104        for zone, ds_type, file in self.datasets:
105            if isinstance(file, basestring):
106                filename = file
107            else:
108                filename = file.name
109            cmd.append("%s:%s:%s" % (zone, ds_type, filename))
110
111        self._stdout = TemporaryFile()
112        self._daemon = daemon = subprocess.Popen(cmd, stdout=self._stdout,
113                                                 stderr=self.stderr)
114
115        # wait for rbldnsd to start responding
116        time.sleep(0.1)
117        for retry in count():
118            if daemon.poll() is not None:
119                raise DaemonError(
120                    "rbldsnd exited unexpectedly with return code %d"
121                    % daemon.returncode)
122            try:
123                self.query('dummy.nonexisting.zone')
124                break
125            except QueryRefused:
126                break
127            except DNS.DNSError as ex:
128                if str(ex) != 'no working nameservers found':
129                    raise
130                elif retries > 10:
131                    raise DaemonError(
132                        "rbldnsd does not seem to be responding")
133
134    def _stop_daemon(self):
135        daemon = self._daemon
136
137        if daemon.poll() is None:
138            daemon.terminate()
139            retries = count()
140            while daemon.poll() is None:
141                retry = next(retries)
142                if retry == 30:
143                    daemon.kill()
144                elif retry == 50:
145                    raise DaemonError("can not kill stop rbldnsd")
146                time.sleep(0.1)
147
148        self._daemon = None
149        if daemon.returncode != 0:
150            raise DaemonError("rbldnsd exited with code %d"
151                              % daemon.returncode)
152
153class TestRbldnsd(unittest.TestCase):
154    def test(self):
155        rbldnsd = Rbldnsd()
156        test_zone = ZoneFile(lines=["1.2.3.4 :1: Success"])
157        rbldnsd.add_dataset('ip4set', test_zone)
158        with rbldnsd:
159            self.assertEqual(rbldnsd.query('4.3.2.1.example.com'), 'Success')
160            self.assertEqual(rbldnsd.query('5.3.2.1.example.com'), None)
161
162if __name__ == '__main__':
163    unittest.main()
164