1# Copyright (c) 2019, Stefan Grönke 2# All rights reserved. 3# 4# Redistribution and use in source and binary forms, with or without 5# modification, are permitted providing that the following conditions 6# are met: 7# 1. Redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer. 9# 2. Redistributions in binary form must reproduce the above copyright 10# notice, this list of conditions and the following disclaimer in the 11# documentation and/or other materials provided with the distribution. 12# 13# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR 14# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED 15# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE 16# ARE DISCLAIMED. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY 17# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL 18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS 19# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION) 20# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT, 21# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING 22# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE 23# POSSIBILITY OF SUCH DAMAGE. 24import typing 25import ctypes 26import struct 27import enum 28import freebsd_sysctl.libc 29import freebsd_sysctl.types 30import freebsd_sysctl.flags 31 32NULL_BYTES = b"\x00" 33CTL_MAXNAME = ctypes.c_uint(24) 34T_OID = (ctypes.c_int * 2) 35BUFSIZ = 1024 # see /include/stdio.h#L209 36 37 38class Sysctl: 39 40 _name: typing.Optional[str] 41 _oid: typing.Optional[typing.List[int]] 42 _kind: typing.Optional[int] 43 _fmt = typing.Optional[bytes] 44 _size: typing.Optional[int] 45 _value: typing.Optional[str] 46 _description: typing.Optional[str] 47 48 def __init__( 49 self, 50 name: typing.Optional[str]=None, 51 oid: typing.Optional[typing.List[int]]=None 52 ) -> None: 53 self._name = name 54 self._oid = oid 55 self._kind = None 56 self._fmt = None 57 self._size = None 58 self._value = None 59 self._description = None 60 61 @property 62 def oid(self) -> typing.List[int]: 63 if self._oid is None: 64 if self.name is None: 65 raise ValueError("Name or OID required") 66 self._oid = self.name2oid(self.name) 67 return self._oid 68 69 @property 70 def name(self) -> str: 71 if self._name is None: 72 if self.oid is None: 73 raise ValueError("Name or OID required") 74 self._name = self.oid2name(self.oid) 75 return self._name 76 77 @property 78 def kind(self) -> int: 79 if self._kind is None: 80 self.__query_kind_and_fmt() 81 return self._kind 82 83 @property 84 def fmt(self) -> int: 85 if self._fmt is None: 86 self.__query_kind_and_fmt() 87 return self._fmt 88 89 @property 90 def size(self) -> int: 91 if self._size is None: 92 self._size = self.query_size(self.oid, self.ctl_type) 93 return self._size 94 95 @property 96 def raw_value(self) -> int: 97 if self._value is None: 98 self._value = self.query_value(self.oid, self.size, self.ctl_type) 99 return self._value 100 101 @property 102 def value(self) -> int: 103 return self.raw_value.value 104 105 @property 106 def description(self) -> int: 107 if self._description is None: 108 self._description = self.query_description(self.oid) 109 return self._description 110 111 @property 112 def next(self): 113 return self.__class__(oid=self.query_next(self.oid)) 114 115 @property 116 def children(self) -> typing.Iterator['Sysctl']: 117 if self.ctl_type != freebsd_sysctl.types.NODE: 118 return 119 current = self.next 120 while self.oid == current.oid[:len(self.oid)]: 121 yield current 122 current = current.next 123 124 def __query_kind_and_fmt(self) -> None: 125 self._kind, self._fmt = self.query_fmt(self.oid) 126 127 @staticmethod 128 def name2oid(name: str) -> typing.List[int]: 129 p_name = ctypes.c_char_p(name.encode() + NULL_BYTES) 130 oid = T_OID(0, 3) 131 p_oid = ctypes.POINTER(T_OID)(oid) 132 133 length = ctypes.c_int(CTL_MAXNAME.value * ctypes.sizeof(ctypes.c_int)) 134 p_length = ctypes.POINTER(ctypes.c_int)(length) 135 136 Res = ctypes.c_int*length.value 137 res = (Res)() 138 139 freebsd_sysctl.libc.dll.sysctl( 140 p_oid, 141 2, 142 ctypes.POINTER(Res)(res), 143 p_length, 144 p_name, 145 len(p_name.value) 146 ) 147 148 oid_length = int(length.value / ctypes.sizeof(ctypes.c_int)) 149 return res[:oid_length] 150 151 @staticmethod 152 def oid2name(oid: typing.List[int]) -> str: 153 qoid_len = (2 + len(oid)) 154 qoid_type = ctypes.c_int * qoid_len 155 qoid = (qoid_type)(*([0, 1] + oid)) 156 p_qoid = ctypes.POINTER(qoid_type)(qoid) 157 158 buf = ctypes.create_string_buffer(BUFSIZ) 159 buf_void = ctypes.cast(buf, ctypes.c_void_p) 160 161 buf_length = ctypes.sizeof(buf) 162 p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length)) 163 164 freebsd_sysctl.libc.dll.sysctl( 165 p_qoid, 166 qoid_len, 167 buf_void, 168 p_buf_length, 169 0, 170 0 171 ) 172 return buf.value.decode() 173 174 @staticmethod 175 def query_fmt(oid: typing.List[int]) -> bytes: 176 177 qoid_len = (2 + len(oid)) 178 qoid_type = ctypes.c_int * qoid_len 179 qoid = (qoid_type)(*([0, 4] + oid)) 180 p_qoid = ctypes.POINTER(qoid_type)(qoid) 181 182 buf_type = ctypes.c_char * BUFSIZ 183 buf = buf_type() 184 p_buf = ctypes.POINTER(buf_type)(buf) 185 buf_void = ctypes.cast(p_buf, ctypes.c_void_p) 186 187 buf_length = ctypes.sizeof(buf) 188 p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length)) 189 190 freebsd_sysctl.libc.dll.sysctl( 191 p_qoid, 192 qoid_len, 193 buf_void, 194 p_buf_length, 195 0, 196 0 197 ) 198 199 if len(buf) < 4: 200 raise Exception("response buffer too small") 201 result = buf[:buf_length] 202 kind, = struct.unpack("<I", result[:4]) 203 fmt = result[4:] 204 return (kind, fmt) 205 206 @staticmethod 207 def query_size( 208 oid: typing.List[int], 209 ctl_type: freebsd_sysctl.types.CtlType 210 ) -> bytes: 211 212 oid_type = ctypes.c_int * len(oid) 213 _oid = (oid_type)(*oid) 214 p_oid = ctypes.POINTER(oid_type)(_oid) 215 216 length = ctypes.c_int() 217 p_length = ctypes.POINTER(ctypes.c_int)(length) 218 219 freebsd_sysctl.libc.dll.sysctl( 220 p_oid, 221 len(oid), 222 None, 223 p_length, 224 0 225 ) 226 227 return max(length.value, ctl_type.min_size) 228 229 @staticmethod 230 def query_value( 231 oid: typing.List[int], 232 size: int, 233 ctl_type: freebsd_sysctl.types.CtlType 234 ) -> bytes: 235 236 # ToDo: check if value is readable 237 238 oid_type = ctypes.c_int * len(oid) 239 _oid = (oid_type)(*oid) 240 p_oid = ctypes.POINTER(oid_type)(_oid) 241 242 buf_type = ctypes.c_char * size 243 buf = buf_type() 244 p_buf = ctypes.POINTER(buf_type)(buf) 245 p_buf_void = ctypes.cast(p_buf, ctypes.c_void_p) 246 247 buf_length = ctypes.sizeof(buf) 248 p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length)) 249 250 freebsd_sysctl.libc.dll.sysctl( 251 p_oid, 252 ctypes.c_uint32(len(oid)), 253 p_buf_void, 254 p_buf_length, 255 None, 256 0 257 ) 258 259 return ctl_type(buf, size) 260 261 @staticmethod 262 def query_description( 263 oid: typing.List[int] 264 ) -> str: 265 266 qoid_len = (2 + len(oid)) 267 qoid_type = ctypes.c_int * qoid_len 268 qoid = (qoid_type)(*([0, 5] + oid)) 269 p_qoid = ctypes.POINTER(qoid_type)(qoid) 270 271 buf_type = ctypes.c_char * BUFSIZ 272 buf = buf_type() 273 p_buf = ctypes.POINTER(buf_type)(buf) 274 buf_void = ctypes.cast(p_buf, ctypes.c_void_p) 275 276 buf_length = ctypes.sizeof(buf) 277 p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length)) 278 279 freebsd_sysctl.libc.dll.sysctl( 280 p_qoid, 281 qoid_len, 282 buf_void, 283 p_buf_length, 284 0, 285 0 286 ) 287 288 return buf.value.decode() 289 290 @staticmethod 291 def query_next(oid: typing.List[int]) -> bytes: 292 293 qoid_len = (2 + len(oid)) 294 qoid_type = ctypes.c_int * qoid_len 295 qoid = (qoid_type)(*([0, 2] + oid)) 296 p_qoid = ctypes.POINTER(qoid_type)(qoid) 297 298 buf_type = ctypes.c_int * CTL_MAXNAME.value 299 buf = buf_type() 300 p_buf = ctypes.POINTER(buf_type)(buf) 301 buf_void = ctypes.cast(p_buf, ctypes.c_void_p) 302 303 buf_length = ctypes.sizeof(buf) 304 p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length)) 305 306 freebsd_sysctl.libc.dll.sysctl( 307 p_qoid, 308 qoid_len, 309 buf_void, 310 p_buf_length, 311 0, 312 0 313 ) 314 315 oid_length = int( 316 p_buf_length.contents.value / ctypes.sizeof(ctypes.c_int) 317 ) 318 return buf[0:oid_length] 319 320 @property 321 def ctl_type(self) -> freebsd_sysctl.types.CtlType: 322 return self.get_ctl_type(self.kind, self.fmt) 323 324 @staticmethod 325 def get_ctl_type( 326 kind: int, 327 fmt: bytes 328 ) -> freebsd_sysctl.types.CtlType: 329 return freebsd_sysctl.types.identify_type(kind, fmt) 330 331 def has_flag(self, flag: int) -> bool: 332 """Return is the sysctl has a certain flag.""" 333 return (self.kind & flag == flag) is True 334