"""Libknot server control interface wrapper."""

import ctypes
import enum
import warnings
import libknot


def load_lib(path: str = None) -> None:
    """Compatibility wrapper.

    Calls ``libknot.Knot(path)`` and then emits a deprecation warning
    pointing callers at ``libknot.Knot()``.
    """

    libknot.Knot(path)
    warnings.warn(
        "libknot.control.load_lib() is deprecated, use libknot.Knot() instead",
        category=Warning, stacklevel=2)


class KnotCtlType(enum.IntEnum):
    """Libknot server control data unit types."""

    END = 0
    DATA = 1
    EXTRA = 2
    BLOCK = 3


class KnotCtlDataIdx(enum.IntEnum):
    """Libknot server control data unit indices."""

    COMMAND = 0
    FLAGS = 1
    ERROR = 2
    SECTION = 3
    ITEM = 4
    ID = 5
    ZONE = 6
    OWNER = 7
    TTL = 8
    TYPE = 9
    DATA = 10
    FILTER = 11


class KnotCtlData(object):
    """Libknot server control data unit.

    Wraps a ctypes array of C string pointers with one slot per
    KnotCtlDataIdx member; items are read and written as Python strings.
    """

    # One char pointer per KnotCtlDataIdx member.
    DataArray = ctypes.c_char_p * len(KnotCtlDataIdx)

    def __init__(self) -> None:
        """Creates a data unit backed by a fresh DataArray instance."""

        self.data = self.DataArray()

    def __str__(self) -> str:
        """Returns data unit in text form.

        Only non-empty items are listed, as ``NAME = value`` pairs joined
        by ``", "``.
        """

        parts = []

        for idx in KnotCtlDataIdx:
            raw = self.data[idx]
            if not raw:
                continue
            # The ctypes array yields bytes; decode so the text does not
            # contain a b'...' literal. Undecodable bytes are replaced
            # rather than making str() raise.
            text = raw if isinstance(raw, str) else raw.decode(errors="replace")
            parts.append("%s = %s" % (idx.name, text))

        return ", ".join(parts)

    def __getitem__(self, index: KnotCtlDataIdx) -> str:
        """Data unit item getter.

        Returns the decoded item, or an empty string if the item is unset
        or empty.
        """

        value = self.data[index]
        if not value:
            value = str()
        return value if isinstance(value, str) else value.decode()

    def __setitem__(self, index: KnotCtlDataIdx, value: str) -> None:
        """Data unit item setter.

        A falsy value (None or an empty string) stores an empty
        ``c_char_p()``; anything else is stored encoded.
        """

        self.data[index] = ctypes.c_char_p(value.encode()) if value else ctypes.c_char_p()


class KnotCtlError(Exception):
    """Libknot server control error.

    Attributes:
        message: The error text.
        data: The KnotCtlData unit the error relates to, or None.
    """

    def __init__(self, message: str, data: KnotCtlData = None) -> None:
        """Stores the error message and the optional related data unit."""

        # Pass the message on so that args, repr() and pickling work.
        super().__init__(message)
        self.message = message
        self.data = data

    def __str__(self) -> str:
        """Returns the message, followed by the data unit if there is one."""

        if self.data is None:
            return str(self.message)
        return "%s (data: %s)" % (self.message, self.data)


class KnotCtl(object):
    """Libknot server control interface."""

    # Bound libknot functions shared by all instances; they stay None until
    # the first instance is initialized.
    ALLOC = None
    FREE = None
    SET_TIMEOUT = None
    CONNECT = None
    CLOSE = None
    SEND = None
    RECEIVE = None

    def __init__(self) -> None:
        """Initializes a control interface instance.

        The first instantiation calls ``libknot.Knot()`` and binds the
        ``knot_ctl_*`` functions with their ctypes signatures.
        """

        if not KnotCtl.ALLOC:
            libknot.Knot()

            KnotCtl.ALLOC = libknot.Knot.LIBKNOT.knot_ctl_alloc
            KnotCtl.ALLOC.restype = ctypes.c_void_p

            KnotCtl.FREE = libknot.Knot.LIBKNOT.knot_ctl_free
            KnotCtl.FREE.argtypes = [ctypes.c_void_p]

            KnotCtl.SET_TIMEOUT = libknot.Knot.LIBKNOT.knot_ctl_set_timeout
            KnotCtl.SET_TIMEOUT.argtypes = [ctypes.c_void_p, ctypes.c_int]

            KnotCtl.CONNECT = libknot.Knot.LIBKNOT.knot_ctl_connect
            KnotCtl.CONNECT.restype = ctypes.c_int
            KnotCtl.CONNECT.argtypes = [ctypes.c_void_p, ctypes.c_char_p]

            KnotCtl.CLOSE = libknot.Knot.LIBKNOT.knot_ctl_close
            KnotCtl.CLOSE.argtypes = [ctypes.c_void_p]

            KnotCtl.SEND = libknot.Knot.LIBKNOT.knot_ctl_send
            KnotCtl.SEND.restype = ctypes.c_int
            KnotCtl.SEND.argtypes = [ctypes.c_void_p, ctypes.c_uint, ctypes.c_void_p]

            KnotCtl.RECEIVE = libknot.Knot.LIBKNOT.knot_ctl_receive
            KnotCtl.RECEIVE.restype = ctypes.c_int
            KnotCtl.RECEIVE.argtypes = [ctypes.c_void_p, ctypes.c_void_p, ctypes.c_void_p]

        self.obj = KnotCtl.ALLOC()

    def __del__(self) -> None:
        """Deallocates control interface instance."""

        # __init__ may have raised before self.obj was assigned or before
        # FREE was bound (it defaults to None); there is nothing to free then.
        obj = getattr(self, "obj", None)
        free = type(self).FREE
        if obj is not None and free is not None:
            free(obj)
            # Drop the handle so a repeated __del__ cannot free it twice.
            self.obj = None

    @staticmethod
    def _check(ret: int) -> None:
        """Raises KnotCtlError with the libknot error text if ret is non-zero."""

        if ret != 0:
            err = libknot.Knot.STRERROR(ret)
            raise KnotCtlError(err if isinstance(err, str) else err.decode())

    def set_timeout(self, timeout: int) -> None:
        """Sets control socket operations timeout in seconds."""

        # The value is scaled by 1000 for the C call (presumably the
        # library expects milliseconds).
        KnotCtl.SET_TIMEOUT(self.obj, timeout * 1000)

    def connect(self, path: str) -> None:
        """Connect to a specified control UNIX socket.

        Raises:
            KnotCtlError: If knot_ctl_connect returns a non-zero code.
        """

        self._check(KnotCtl.CONNECT(self.obj, path.encode()))

    def close(self) -> None:
        """Disconnects from the current control socket."""

        KnotCtl.CLOSE(self.obj)

    def send(self, data_type: KnotCtlType, data: KnotCtlData = None) -> None:
        """Sends a data unit to the connected control socket.

        Args:
            data_type: Type of the unit being sent.
            data: Unit payload; an empty pointer is passed when omitted.

        Raises:
            KnotCtlError: If knot_ctl_send returns a non-zero code.
        """

        payload = data.data if data else ctypes.c_char_p()
        self._check(KnotCtl.SEND(self.obj, data_type, payload))

    def receive(self, data: KnotCtlData = None) -> KnotCtlType:
        """Receives a data unit from the connected control socket.

        Args:
            data: Unit to be filled with the received items; an empty
                pointer is passed when omitted.

        Returns:
            The type of the received unit.

        Raises:
            KnotCtlError: If knot_ctl_receive returns a non-zero code.
        """

        data_type = ctypes.c_uint()
        payload = data.data if data else ctypes.c_char_p()
        self._check(KnotCtl.RECEIVE(self.obj, ctypes.byref(data_type), payload))
        return KnotCtlType(data_type.value)

    def send_block(self, cmd: str, section: str = None, item: str = None,
                   identifier: str = None, zone: str = None, owner: str = None,
                   ttl: str = None, rtype: str = None, data: str = None,
                   flags: str = None, filters: str = None) -> None:
        """Sends a control query block.

        Each argument fills the matching KnotCtlDataIdx slot of a single
        DATA unit, which is followed by a BLOCK unit.

        Raises:
            KnotCtlError: If sending either unit fails.
        """

        query = KnotCtlData()
        query[KnotCtlDataIdx.COMMAND] = cmd
        query[KnotCtlDataIdx.SECTION] = section
        query[KnotCtlDataIdx.ITEM] = item
        query[KnotCtlDataIdx.ID] = identifier
        query[KnotCtlDataIdx.ZONE] = zone
        query[KnotCtlDataIdx.OWNER] = owner
        query[KnotCtlDataIdx.TTL] = ttl
        query[KnotCtlDataIdx.TYPE] = rtype
        query[KnotCtlDataIdx.DATA] = data
        query[KnotCtlDataIdx.FLAGS] = flags
        query[KnotCtlDataIdx.FILTER] = filters

        self.send(KnotCtlType.DATA, query)
        self.send(KnotCtlType.BLOCK)

    def _receive_conf(self, out, reply):
        """Merges one configuration reply into out.

        The resulting layout is out[section][item] -> list of values, or
        out[section][id][item] -> list of values when the reply has an
        identifier.
        """

        section = reply[KnotCtlDataIdx.SECTION]
        ident = reply[KnotCtlDataIdx.ID]
        item = reply[KnotCtlDataIdx.ITEM]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the section if not exists.
        section_level = out.setdefault(section, dict())

        # Add the identifier if not exists.
        if ident:
            section_level.setdefault(ident, dict())

        # Return if no item/value.
        if not item:
            return

        item_level = section_level[ident] if ident else section_level

        # Treat alone identifier item differently: its value becomes a key
        # of the section rather than an entry in a value list.
        if item in ["id", "domain", "target"]:
            section_level.setdefault(data, dict())
        else:
            values = item_level.setdefault(item, list())
            if data:
                values.append(data)

    def _receive_zone_status(self, out, reply):
        """Merges one zone status reply into out as out[zone][type] = data."""

        zone = reply[KnotCtlDataIdx.ZONE]
        rtype = reply[KnotCtlDataIdx.TYPE]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the zone if not exists.
        out.setdefault(zone, dict())[rtype] = data

    def _receive_zone(self, out, reply):
        """Merges one zone record reply into out.

        The resulting layout is
        out[zone][owner][type] = {"ttl": ttl, "data": [data, ...]}.
        """

        zone = reply[KnotCtlDataIdx.ZONE]
        owner = reply[KnotCtlDataIdx.OWNER]
        ttl = reply[KnotCtlDataIdx.TTL]
        rtype = reply[KnotCtlDataIdx.TYPE]
        data = reply[KnotCtlDataIdx.DATA]

        # Add the zone, owner and type levels if they do not exist.
        rrset = out.setdefault(zone, dict()) \
                   .setdefault(owner, dict()) \
                   .setdefault(rtype, dict())

        # Add the key/value; the TTL of the latest reply wins.
        rrset["ttl"] = ttl
        rrset.setdefault("data", []).append(data)

    def _receive_stats(self, out, reply):
        """Merges one statistics reply into out.

        Counters are stored under out[section][item], or under
        out["zone"][zone][section][item] when the reply names a zone. If
        the reply has an identifier, the item is a dictionary keyed by it.

        Raises:
            ValueError: If the reply data cannot be converted by int().
        """

        zone = reply[KnotCtlDataIdx.ZONE]
        section = reply[KnotCtlDataIdx.SECTION]
        item = reply[KnotCtlDataIdx.ITEM]
        idx = reply[KnotCtlDataIdx.ID]
        data = int(reply[KnotCtlDataIdx.DATA])

        # Add the zone if not exists.
        if zone:
            section_level = out.setdefault("zone", dict()).setdefault(zone, dict())
        else:
            section_level = out

        counters = section_level.setdefault(section, dict())

        if idx:
            counters.setdefault(item, dict())[idx] = data
        else:
            counters[item] = data

    def receive_stats(self) -> dict:
        """Receives statistics answer and returns it as a structured dictionary.

        Raises:
            KnotCtlError: If receiving fails, or once the answer has been
                read, if it contained an error reply (the last one is
                reported).
        """

        out = dict()
        err_reply = None

        while True:
            reply = KnotCtlData()
            reply_type = self.receive(reply)

            # Stop if not data type.
            if reply_type not in (KnotCtlType.DATA, KnotCtlType.EXTRA):
                break

            # Check for an error; keep reading so the answer is consumed.
            if reply[KnotCtlDataIdx.ERROR]:
                err_reply = reply
                continue

            self._receive_stats(out, reply)

        if err_reply:
            raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply)

        return out

    def receive_block(self) -> dict:
        """Receives a control answer and returns it as a structured dictionary.

        Replies with a section are treated as configuration data, replies
        with a zone as zone records (when an owner is present) or zone
        status; other replies are ignored.

        Raises:
            KnotCtlError: If receiving fails, or once the answer has been
                read, if it contained an error reply (the last one is
                reported).
        """

        out = dict()
        err_reply = None

        while True:
            reply = KnotCtlData()
            reply_type = self.receive(reply)

            # Stop if not data type.
            if reply_type not in (KnotCtlType.DATA, KnotCtlType.EXTRA):
                break

            # Check for an error; keep reading so the answer is consumed.
            if reply[KnotCtlDataIdx.ERROR]:
                err_reply = reply
                continue

            # Check for config data.
            if reply[KnotCtlDataIdx.SECTION]:
                self._receive_conf(out, reply)
            # Check for zone data.
            elif reply[KnotCtlDataIdx.ZONE]:
                if reply[KnotCtlDataIdx.OWNER]:
                    self._receive_zone(out, reply)
                else:
                    self._receive_zone_status(out, reply)

        if err_reply:
            raise KnotCtlError(err_reply[KnotCtlDataIdx.ERROR], err_reply)

        return out