1"""Libknot server control interface wrapper."""
2
3import ctypes
4import enum
5import warnings
6import libknot
7
8
def load_lib(path: str = None) -> None:
    """Deprecated shim retained for older callers.

    Instantiates ``libknot.Knot`` with ``path`` and afterwards emits a warning
    that points the caller at ``libknot.Knot()``.  ``stacklevel=2`` makes the
    warning refer to the caller's line rather than to this function.
    """

    libknot.Knot(path)

    notice = "libknot.control.load_lib() is deprecated, use libknot.Knot() instead"
    warnings.warn(notice, category=Warning, stacklevel=2)
15
16
class KnotCtlType(enum.IntEnum):
    """Libknot server control data unit types.

    Members are passed as the ``data_type`` argument of ``KnotCtl.send()`` and
    are what ``KnotCtl.receive()`` returns for every received unit.
    """

    # NOTE(review): the numeric values presumably mirror the corresponding
    # libknot C enum -- confirm against the library header before changing.
    END = 0
    # Data unit; send_block() sends one of these before the BLOCK unit.
    DATA = 1
    # Handled the same way as DATA when replies are collected.
    EXTRA = 2
    # Sent by send_block() right after the data unit.
    BLOCK = 3
24
25
class KnotCtlDataIdx(enum.IntEnum):
    """Libknot server control data unit indices.

    Each member is the position of one item inside a ``KnotCtlData`` unit; the
    number of members also determines the length of that unit's item array.
    """

    # NOTE(review): the numeric values presumably mirror the corresponding
    # libknot C enum -- confirm against the library header before changing.
    COMMAND = 0   # Filled from the ``cmd`` argument of send_block().
    FLAGS = 1     # Filled from the ``flags`` argument of send_block().
    ERROR = 2     # A non-empty value in a reply marks that reply as an error.
    SECTION = 3   # Non-empty in a reply => handled as configuration data.
    ITEM = 4
    ID = 5        # Filled from the ``identifier`` argument of send_block().
    ZONE = 6
    OWNER = 7
    TTL = 8
    TYPE = 9      # Filled from the ``rtype`` argument of send_block().
    DATA = 10
    FILTER = 11   # Filled from the ``filters`` argument of send_block().
41
42
class KnotCtlData(object):
    """Libknot server control data unit.

    Wraps a fixed-size ctypes array of C string pointers with one slot per
    ``KnotCtlDataIdx`` member.  Items are written and read as Python strings
    through the subscript operators; the raw array is available as ``data``
    for passing to the library.
    """

    # ctypes array type: one ``char *`` slot for every KnotCtlDataIdx member.
    DataArray = ctypes.c_char_p * len(KnotCtlDataIdx)

    def __init__(self) -> None:
        """Creates a data unit with all items unset."""

        self.data = self.DataArray()

    def __str__(self) -> str:
        """Returns data unit in text form.

        Only non-empty items are listed, as ``NAME = value`` pairs joined by
        ``", "``.  Values go through the item getter so they are rendered as
        decoded text; formatting the raw array element directly would show the
        ``bytes`` representation (``b'...'``) instead.
        """

        return ", ".join("%s = %s" % (idx.name, self[idx])
                         for idx in KnotCtlDataIdx if self.data[idx])

    def __getitem__(self, index: KnotCtlDataIdx) -> str:
        """Data unit item getter.

        Returns the item at ``index`` as a string; an unset or empty item
        yields an empty string.
        """

        value = self.data[index]
        if not value:
            return str()
        # ctypes hands C strings back as bytes; decode them for the caller.
        return value if isinstance(value, str) else value.decode()

    def __setitem__(self, index: KnotCtlDataIdx, value: str) -> None:
        """Data unit item setter.

        Stores the encoded ``value`` at ``index``.  ``None`` and the empty
        string both clear the item (a null pointer is stored).
        """

        self.data[index] = ctypes.c_char_p(value.encode()) if value else ctypes.c_char_p()
76
77
class KnotCtlError(Exception):
    """Libknot server control error.

    Attributes:
        message: Error description.
        data: Optional object with additional context (the receive helpers
            pass the reply data unit that carried the error); ``None`` when
            not supplied.
    """

    def __init__(self, message: str, data: "KnotCtlData" = None) -> None:
        """Stores the error message and the optional related data unit."""

        # Hand the arguments over to Exception as well: with an empty ``args``
        # tuple repr() is uninformative and copying/unpickling the exception
        # fails because ``message`` is a required constructor argument.
        super().__init__(message, data)
        self.message = message
        self.data = data

    def __str__(self) -> str:
        """Returns the message followed by the text form of the data."""

        return "%s (data: %s)" % (self.message, self.data)
88
89
class KnotCtl(object):
    """Libknot server control interface.

    Thin ctypes wrapper around the ``knot_ctl_*`` functions of the libknot
    shared library.  The library functions are looked up once, when the first
    instance is created, and cached in the class attributes below.  Every
    instance allocates its own control context, kept in ``self.obj`` and
    released when the instance is garbage collected.
    """

    # ctypes function objects; bound lazily by the first __init__() call.
    ALLOC = None
    FREE = None
    SET_TIMEOUT = None
    CONNECT = None
    CLOSE = None
    SEND = None
    RECEIVE = None

    def __init__(self) -> None:
        """Initializes a control interface instance.

        Binds the libknot control functions on first use and allocates a
        control context for this instance.
        """

        # Define the attribute before anything can raise, so that __del__
        # never sees a half-initialized instance without ``obj``.
        self.obj = None

        if not KnotCtl.ALLOC:
            KnotCtl._bind_library()

        self.obj = KnotCtl.ALLOC()

    @staticmethod
    def _bind_library() -> None:
        """Looks up the libknot control functions and declares their C signatures."""

        libknot.Knot()
        lib = libknot.Knot.LIBKNOT

        alloc = lib.knot_ctl_alloc
        alloc.restype = ctypes.c_void_p

        free = lib.knot_ctl_free
        free.argtypes = [ctypes.c_void_p]

        set_timeout = lib.knot_ctl_set_timeout
        set_timeout.argtypes = [ctypes.c_void_p, ctypes.c_int]

        connect = lib.knot_ctl_connect
        connect.restype = ctypes.c_int
        connect.argtypes = [ctypes.c_void_p, ctypes.c_char_p]

        close = lib.knot_ctl_close
        close.argtypes = [ctypes.c_void_p]

        send = lib.knot_ctl_send
        send.restype = ctypes.c_int
        send.argtypes = [ctypes.c_void_p, ctypes.c_uint, ctypes.c_void_p]

        receive = lib.knot_ctl_receive
        receive.restype = ctypes.c_int
        receive.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]

        KnotCtl.FREE = free
        KnotCtl.SET_TIMEOUT = set_timeout
        KnotCtl.CONNECT = connect
        KnotCtl.CLOSE = close
        KnotCtl.SEND = send
        KnotCtl.RECEIVE = receive
        # Published last: ALLOC doubles as the "already bound" marker, so a
        # failed symbol lookup above must not leave it set.
        KnotCtl.ALLOC = alloc

    @staticmethod
    def _check(ret: int) -> None:
        """Raises KnotCtlError with the library error text if ``ret`` is non-zero."""

        if ret != 0:
            err = libknot.Knot.STRERROR(ret)
            raise KnotCtlError(err if isinstance(err, str) else err.decode())

    def __del__(self) -> None:
        """Deallocates control interface instance."""

        # ``obj`` may be missing or None if construction failed part-way, and
        # FREE may be unbound if the library never loaded; skip the call then.
        obj = getattr(self, "obj", None)
        if obj is not None and KnotCtl.FREE is not None:
            # Clear the handle first so the context is never freed twice.
            self.obj = None
            KnotCtl.FREE(obj)

    def set_timeout(self, timeout: int) -> None:
        """Sets control socket operations timeout in seconds.

        The value handed to the library is ``timeout * 1000`` (i.e. scaled to
        milliseconds) and truncated to an integer, so a fractional number of
        seconds is accepted as well.
        """

        KnotCtl.SET_TIMEOUT(self.obj, int(timeout * 1000))

    def connect(self, path: str) -> None:
        """Connect to a specified control UNIX socket.

        Raises:
            KnotCtlError: if the library reports a non-zero result.
        """

        self._check(KnotCtl.CONNECT(self.obj, path.encode()))

    def close(self) -> None:
        """Disconnects from the current control socket."""

        KnotCtl.CLOSE(self.obj)

    def send(self, data_type: KnotCtlType, data: KnotCtlData = None) -> None:
        """Sends a data unit to the connected control socket.

        Args:
            data_type: Type of the unit being sent.
            data: Unit content; when omitted a null pointer is passed.

        Raises:
            KnotCtlError: if the library reports a non-zero result.
        """

        # None is converted by ctypes to a NULL pointer argument.
        payload = data.data if data else None
        self._check(KnotCtl.SEND(self.obj, data_type, payload))

    def receive(self, data: KnotCtlData = None) -> KnotCtlType:
        """Receives a data unit from the connected control socket.

        Args:
            data: Unit to be filled with the received items; when omitted a
                null pointer is passed.

        Returns:
            The type of the received unit.

        Raises:
            KnotCtlError: if the library reports a non-zero result.
        """

        data_type = ctypes.c_uint()
        payload = data.data if data else None
        self._check(KnotCtl.RECEIVE(self.obj, ctypes.byref(data_type), payload))
        return KnotCtlType(data_type.value)

    def send_block(self, cmd: str, section: str = None, item: str = None,
                   identifier: str = None, zone: str = None, owner: str = None,
                   ttl: str = None, rtype: str = None, data: str = None,
                   flags: str = None, filters: str = None) -> None:
        """Sends a control query block.

        Builds one data unit from the arguments, sends it as a DATA unit and
        follows it with a BLOCK unit.  Arguments left as ``None`` (or empty)
        are sent as unset items.

        Raises:
            KnotCtlError: if sending either unit fails.
        """

        items = (
            (KnotCtlDataIdx.COMMAND, cmd),
            (KnotCtlDataIdx.SECTION, section),
            (KnotCtlDataIdx.ITEM, item),
            (KnotCtlDataIdx.ID, identifier),
            (KnotCtlDataIdx.ZONE, zone),
            (KnotCtlDataIdx.OWNER, owner),
            (KnotCtlDataIdx.TTL, ttl),
            (KnotCtlDataIdx.TYPE, rtype),
            (KnotCtlDataIdx.DATA, data),
            (KnotCtlDataIdx.FLAGS, flags),
            (KnotCtlDataIdx.FILTER, filters),
        )

        query = KnotCtlData()
        for index, value in items:
            query[index] = value

        self.send(KnotCtlType.DATA, query)
        self.send(KnotCtlType.BLOCK)

    def _receive_conf(self, out, reply):
        """Merges one configuration reply into ``out``.

        Resulting layout: ``out[section][item]`` or, when the reply carries an
        identifier, ``out[section][id][item]``; each holds a list of values.
        """

        section = reply[KnotCtlDataIdx.SECTION]
        ident = reply[KnotCtlDataIdx.ID]
        item = reply[KnotCtlDataIdx.ITEM]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the section if not exists.
        section_level = out.setdefault(section, dict())

        # Add the identifier if not exists.
        if ident:
            section_level.setdefault(ident, dict())

        # Return if no item/value.
        if not item:
            return

        item_level = section_level[ident] if ident else section_level

        # Treat alone identifier item differently: its value becomes a key
        # directly under the section.
        if item in ["id", "domain", "target"]:
            section_level.setdefault(data, dict())
        else:
            values = item_level.setdefault(item, list())
            if data:
                values.append(data)

    def _receive_zone_status(self, out, reply):
        """Merges one zone status reply into ``out`` as ``out[zone][type] = data``."""

        zone = reply[KnotCtlDataIdx.ZONE]
        rtype = reply[KnotCtlDataIdx.TYPE]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the zone if not exists.
        out.setdefault(zone, dict())[rtype] = data

    def _receive_zone(self, out, reply):
        """Merges one zone record reply into ``out``.

        Resulting layout: ``out[zone][owner][type]`` is a dict with the last
        seen ``"ttl"`` and a ``"data"`` list collecting every received value.
        """

        zone = reply[KnotCtlDataIdx.ZONE]
        owner = reply[KnotCtlDataIdx.OWNER]
        ttl = reply[KnotCtlDataIdx.TTL]
        rtype = reply[KnotCtlDataIdx.TYPE]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the zone, owner and type levels if they do not exist.
        zone_level = out.setdefault(zone, dict())
        owner_level = zone_level.setdefault(owner, dict())
        record = owner_level.setdefault(rtype, dict())

        # Add the key/value.
        record["ttl"] = ttl
        record.setdefault("data", list()).append(data)

    def _receive_stats(self, out, reply):
        """Merges one statistics reply into ``out``.

        Counters are stored as integers under ``out[section][item]`` or, for
        replies carrying a zone, under ``out["zone"][zone][section][item]``.
        When the reply has an identifier, the item is a dict keyed by it.
        """

        zone = reply[KnotCtlDataIdx.ZONE]
        section = reply[KnotCtlDataIdx.SECTION]
        item = reply[KnotCtlDataIdx.ITEM]
        idx = reply[KnotCtlDataIdx.ID]
        # Converted before touching ``out`` so a malformed value leaves the
        # result unchanged.
        data = int(reply[KnotCtlDataIdx.DATA])

        # Add the zone if not exists.
        if zone:
            section_level = out.setdefault("zone", dict()).setdefault(zone, dict())
        else:
            section_level = out

        counters = section_level.setdefault(section, dict())

        if idx:
            counters.setdefault(item, dict())[idx] = data
        else:
            counters[item] = data

    def _receive_block_unit(self, out, reply):
        """Routes one reply of a control answer to the matching merge helper."""

        # Check for config data.
        if reply[KnotCtlDataIdx.SECTION]:
            self._receive_conf(out, reply)
        # Check for zone data; replies with neither are ignored.
        elif reply[KnotCtlDataIdx.ZONE]:
            if reply[KnotCtlDataIdx.OWNER]:
                self._receive_zone(out, reply)
            else:
                self._receive_zone_status(out, reply)

    def _receive_all(self, handler) -> dict:
        """Receives units until a non-data unit arrives and merges them.

        ``handler(out, reply)`` is called for every DATA/EXTRA unit that does
        not carry an error.  Error units are remembered (the last one wins)
        and reading continues, so the whole answer is consumed before the
        error is raised.
        """

        out = dict()
        err_reply = None

        while True:
            reply = KnotCtlData()
            reply_type = self.receive(reply)

            # Stop if not data type.
            if reply_type not in (KnotCtlType.DATA, KnotCtlType.EXTRA):
                break

            # Check for an error.
            if reply[KnotCtlDataIdx.ERROR]:
                err_reply = reply
                continue

            handler(out, reply)

        if err_reply is not None:
            raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply)

        return out

    def receive_stats(self) -> dict:
        """Receives statistics answer and returns it as a structured dictionary.

        Raises:
            KnotCtlError: if receiving fails or the answer contains an error.
            ValueError: if a counter value is not an integer.
        """

        return self._receive_all(self._receive_stats)

    def receive_block(self) -> dict:
        """Receives a control answer and returns it as a structured dictionary.

        Replies with a section are merged as configuration data, replies with
        a zone as zone records (when an owner is present) or zone status.

        Raises:
            KnotCtlError: if receiving fails or the answer contains an error.
        """

        return self._receive_all(self._receive_block_unit)
358