1# Copyright (c) 2019, Stefan Grönke
2# All rights reserved.
3#
4# Redistribution and use in source and binary forms, with or without
5# modification, are permitted providing that the following conditions
6# are met:
7# 1. Redistributions of source code must retain the above copyright
8#    notice, this list of conditions and the following disclaimer.
9# 2. Redistributions in binary form must reproduce the above copyright
10#    notice, this list of conditions and the following disclaimer in the
11#    documentation and/or other materials provided with the distribution.
12#
13# THIS SOFTWARE IS PROVIDED BY THE AUTHOR ``AS IS'' AND ANY EXPRESS OR
14# IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE IMPLIED
15# WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR PURPOSE
16# ARE DISCLAIMED.  IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY
17# DIRECT, INDIRECT, INCIDENTAL, SPECIAL, EXEMPLARY, OR CONSEQUENTIAL
18# DAMAGES (INCLUDING, BUT NOT LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS
19# OR SERVICES; LOSS OF USE, DATA, OR PROFITS; OR BUSINESS INTERRUPTION)
20# HOWEVER CAUSED AND ON ANY THEORY OF LIABILITY, WHETHER IN CONTRACT,
21# STRICT LIABILITY, OR TORT (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING
22# IN ANY WAY OUT OF THE USE OF THIS SOFTWARE, EVEN IF ADVISED OF THE
23# POSSIBILITY OF SUCH DAMAGE.
24import typing
25import ctypes
26import struct
27import enum
28import freebsd_sysctl.libc
29import freebsd_sysctl.types
30import freebsd_sysctl.flags
31
32NULL_BYTES = b"\x00"
33CTL_MAXNAME = ctypes.c_uint(24)
34T_OID = (ctypes.c_int * 2)
35BUFSIZ = 1024 # see /include/stdio.h#L209
36
37
38class Sysctl:
39
40    _name: typing.Optional[str]
41    _oid: typing.Optional[typing.List[int]]
42    _kind: typing.Optional[int]
43    _fmt = typing.Optional[bytes]
44    _size: typing.Optional[int]
45    _value: typing.Optional[str]
46    _description: typing.Optional[str]
47
48    def __init__(
49        self,
50        name: typing.Optional[str]=None,
51        oid: typing.Optional[typing.List[int]]=None
52    ) -> None:
53        self._name = name
54        self._oid = oid
55        self._kind = None
56        self._fmt = None
57        self._size = None
58        self._value = None
59        self._description = None
60
61    @property
62    def oid(self) -> typing.List[int]:
63        if self._oid is None:
64            if self.name is None:
65                raise ValueError("Name or OID required")
66            self._oid = self.name2oid(self.name)
67        return self._oid
68
69    @property
70    def name(self) -> str:
71        if self._name is None:
72            if self.oid is None:
73                raise ValueError("Name or OID required")
74            self._name = self.oid2name(self.oid)
75        return self._name
76
77    @property
78    def kind(self) -> int:
79        if self._kind is None:
80            self.__query_kind_and_fmt()
81        return self._kind
82
83    @property
84    def fmt(self) -> int:
85        if self._fmt is None:
86            self.__query_kind_and_fmt()
87        return self._fmt
88
89    @property
90    def size(self) -> int:
91        if self._size is None:
92            self._size = self.query_size(self.oid, self.ctl_type)
93        return self._size
94
95    @property
96    def raw_value(self) -> int:
97        if self._value is None:
98            self._value = self.query_value(self.oid, self.size, self.ctl_type)
99        return self._value
100
101    @property
102    def value(self) -> int:
103        return self.raw_value.value
104
105    @property
106    def description(self) -> int:
107        if self._description is None:
108            self._description = self.query_description(self.oid)
109        return self._description
110
111    @property
112    def next(self):
113        return self.__class__(oid=self.query_next(self.oid))
114
115    @property
116    def children(self) -> typing.Iterator['Sysctl']:
117        if self.ctl_type != freebsd_sysctl.types.NODE:
118            return
119        current = self.next
120        while self.oid == current.oid[:len(self.oid)]:
121            yield current
122            current = current.next
123
124    def __query_kind_and_fmt(self) -> None:
125        self._kind, self._fmt = self.query_fmt(self.oid)
126
127    @staticmethod
128    def name2oid(name: str) -> typing.List[int]:
129        p_name = ctypes.c_char_p(name.encode() + NULL_BYTES)
130        oid = T_OID(0, 3)
131        p_oid = ctypes.POINTER(T_OID)(oid)
132
133        length = ctypes.c_int(CTL_MAXNAME.value * ctypes.sizeof(ctypes.c_int))
134        p_length = ctypes.POINTER(ctypes.c_int)(length)
135
136        Res = ctypes.c_int*length.value
137        res = (Res)()
138
139        freebsd_sysctl.libc.dll.sysctl(
140            p_oid,
141            2,
142            ctypes.POINTER(Res)(res),
143            p_length,
144            p_name,
145            len(p_name.value)
146        )
147
148        oid_length = int(length.value / ctypes.sizeof(ctypes.c_int))
149        return res[:oid_length]
150
151    @staticmethod
152    def oid2name(oid: typing.List[int]) -> str:
153        qoid_len = (2 + len(oid))
154        qoid_type = ctypes.c_int * qoid_len
155        qoid = (qoid_type)(*([0, 1] + oid))
156        p_qoid = ctypes.POINTER(qoid_type)(qoid)
157
158        buf = ctypes.create_string_buffer(BUFSIZ)
159        buf_void = ctypes.cast(buf, ctypes.c_void_p)
160
161        buf_length = ctypes.sizeof(buf)
162        p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length))
163
164        freebsd_sysctl.libc.dll.sysctl(
165            p_qoid,
166            qoid_len,
167            buf_void,
168            p_buf_length,
169            0,
170            0
171        )
172        return buf.value.decode()
173
174    @staticmethod
175    def query_fmt(oid: typing.List[int]) -> bytes:
176
177        qoid_len = (2 + len(oid))
178        qoid_type = ctypes.c_int * qoid_len
179        qoid = (qoid_type)(*([0, 4] + oid))
180        p_qoid = ctypes.POINTER(qoid_type)(qoid)
181
182        buf_type = ctypes.c_char * BUFSIZ
183        buf = buf_type()
184        p_buf = ctypes.POINTER(buf_type)(buf)
185        buf_void = ctypes.cast(p_buf, ctypes.c_void_p)
186
187        buf_length = ctypes.sizeof(buf)
188        p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length))
189
190        freebsd_sysctl.libc.dll.sysctl(
191            p_qoid,
192            qoid_len,
193            buf_void,
194            p_buf_length,
195            0,
196            0
197        )
198
199        if len(buf) < 4:
200            raise Exception("response buffer too small")
201        result = buf[:buf_length]
202        kind, = struct.unpack("<I", result[:4])
203        fmt = result[4:]
204        return (kind, fmt)
205
206    @staticmethod
207    def query_size(
208        oid: typing.List[int],
209        ctl_type: freebsd_sysctl.types.CtlType
210    ) -> bytes:
211
212        oid_type = ctypes.c_int * len(oid)
213        _oid = (oid_type)(*oid)
214        p_oid = ctypes.POINTER(oid_type)(_oid)
215
216        length = ctypes.c_int()
217        p_length = ctypes.POINTER(ctypes.c_int)(length)
218
219        freebsd_sysctl.libc.dll.sysctl(
220            p_oid,
221            len(oid),
222            None,
223            p_length,
224            0
225        )
226
227        return max(length.value, ctl_type.min_size)
228
229    @staticmethod
230    def query_value(
231        oid: typing.List[int],
232        size: int,
233        ctl_type: freebsd_sysctl.types.CtlType
234    ) -> bytes:
235
236        # ToDo: check if value is readable
237
238        oid_type = ctypes.c_int * len(oid)
239        _oid = (oid_type)(*oid)
240        p_oid = ctypes.POINTER(oid_type)(_oid)
241
242        buf_type = ctypes.c_char * size
243        buf = buf_type()
244        p_buf = ctypes.POINTER(buf_type)(buf)
245        p_buf_void = ctypes.cast(p_buf, ctypes.c_void_p)
246
247        buf_length = ctypes.sizeof(buf)
248        p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length))
249
250        freebsd_sysctl.libc.dll.sysctl(
251            p_oid,
252            ctypes.c_uint32(len(oid)),
253            p_buf_void,
254            p_buf_length,
255            None,
256            0
257        )
258
259        return ctl_type(buf, size)
260
261    @staticmethod
262    def query_description(
263        oid: typing.List[int]
264    ) -> str:
265
266        qoid_len = (2 + len(oid))
267        qoid_type = ctypes.c_int * qoid_len
268        qoid = (qoid_type)(*([0, 5] + oid))
269        p_qoid = ctypes.POINTER(qoid_type)(qoid)
270
271        buf_type = ctypes.c_char * BUFSIZ
272        buf = buf_type()
273        p_buf = ctypes.POINTER(buf_type)(buf)
274        buf_void = ctypes.cast(p_buf, ctypes.c_void_p)
275
276        buf_length = ctypes.sizeof(buf)
277        p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length))
278
279        freebsd_sysctl.libc.dll.sysctl(
280            p_qoid,
281            qoid_len,
282            buf_void,
283            p_buf_length,
284            0,
285            0
286        )
287
288        return buf.value.decode()
289
290    @staticmethod
291    def query_next(oid: typing.List[int]) -> bytes:
292
293        qoid_len = (2 + len(oid))
294        qoid_type = ctypes.c_int * qoid_len
295        qoid = (qoid_type)(*([0, 2] + oid))
296        p_qoid = ctypes.POINTER(qoid_type)(qoid)
297
298        buf_type = ctypes.c_int * CTL_MAXNAME.value
299        buf = buf_type()
300        p_buf = ctypes.POINTER(buf_type)(buf)
301        buf_void = ctypes.cast(p_buf, ctypes.c_void_p)
302
303        buf_length = ctypes.sizeof(buf)
304        p_buf_length = ctypes.POINTER(ctypes.c_int)(ctypes.c_int(buf_length))
305
306        freebsd_sysctl.libc.dll.sysctl(
307            p_qoid,
308            qoid_len,
309            buf_void,
310            p_buf_length,
311            0,
312            0
313        )
314
315        oid_length = int(
316            p_buf_length.contents.value / ctypes.sizeof(ctypes.c_int)
317        )
318        return buf[0:oid_length]
319
320    @property
321    def ctl_type(self) -> freebsd_sysctl.types.CtlType:
322        return self.get_ctl_type(self.kind, self.fmt)
323
324    @staticmethod
325    def get_ctl_type(
326        kind: int,
327        fmt: bytes
328    ) -> freebsd_sysctl.types.CtlType:
329        return freebsd_sysctl.types.identify_type(kind, fmt)
330
331    def has_flag(self, flag: int) -> bool:
332        """Return is the sysctl has a certain flag."""
333        return (self.kind & flag == flag) is True
334