1#!/usr/local/bin/python3
2from ctypes import c_int64
3from ctypes import c_long
4from ctypes import sizeof
5from ctypes import Structure
6from enum import Enum
7import struct
8
9from atf_python.sys.netlink.attrs import NlAttr
10from atf_python.sys.netlink.attrs import NlAttrStr
11from atf_python.sys.netlink.attrs import NlAttrU16
12from atf_python.sys.netlink.attrs import NlAttrU32
13from atf_python.sys.netlink.attrs import NlAttrU8
14from atf_python.sys.netlink.base_headers import GenlMsgHdr
15from atf_python.sys.netlink.message import NlMsgCategory
16from atf_python.sys.netlink.message import NlMsgProps
17from atf_python.sys.netlink.message import StdNetlinkMessage
18from atf_python.sys.netlink.utils import AttrDescr
19from atf_python.sys.netlink.utils import prepare_attrs_map
20from atf_python.sys.netlink.utils import enum_or_int
21
22
class NetlinkGenlMessage(StdNetlinkMessage):
    """Common base for generic netlink (genl) message classes.

    Every instance carries a ``GenlMsgHdr`` as its base header; the
    header's ``cmd`` field is what this class reports as the message
    type.  Concrete families override the three class attributes below.
    """

    # Per-family tables; the defaults are empty placeholders.
    messages = []
    nl_attrs_map = {}
    family_name = None

    def __init__(self, helper, family_id, cmd=0):
        """Build a message for ``family_id`` carrying command ``cmd``.

        ``helper`` and ``family_id`` are forwarded unchanged to the
        parent constructor.  ``cmd`` is passed through ``enum_or_int()``
        before being stored in the base header.
        """
        super().__init__(helper, family_id)
        cmd_value = enum_or_int(cmd)
        self.base_hdr = GenlMsgHdr(cmd=cmd_value)

    def parse_base_header(self, data):
        """Decode a ``GenlMsgHdr`` from the start of ``data``.

        Returns a ``(header, consumed_bytes)`` tuple.

        Raises:
            ValueError: ``data`` is shorter than ``GenlMsgHdr``.
        """
        hdr_size = sizeof(GenlMsgHdr)
        if hdr_size > len(data):
            raise ValueError("length less than GenlMsgHdr header")
        return (GenlMsgHdr.from_buffer_copy(data), hdr_size)

    def _get_msg_type(self):
        """Return the command stored in the genl base header."""
        return self.base_hdr.cmd

    def print_nl_header(self, prepend=""):
        """Print a one-line summary of the netlink header.

        The line has the form
        ``len=N, family=NAME, flags=NAMES(0xHEX), seq=N, pid=N``
        and is prefixed with ``prepend``.
        """
        nl_hdr = self.nl_hdr
        fields = (
            prepend,
            nl_hdr.nlmsg_len,
            self.family_name,
            self.get_nlm_flags_str(),
            nl_hdr.nlmsg_flags,
            nl_hdr.nlmsg_seq,
            nl_hdr.nlmsg_pid,
        )
        template = "{}len={}, family={}, flags={}(0x{:X}), seq={}, pid={}"
        print(template.format(*fields))

    def print_base_header(self, hdr, prepend=""):
        """Print the genl base header ``hdr``, prefixed with ``prepend``.

        The command is shown via ``self.msg_name`` rather than the raw
        ``hdr.cmd`` number.
        """
        template = "{}cmd={} version={} reserved={}"
        print(
            template.format(
                prepend, self.msg_name, hdr.version, hdr.reserved
            )
        )
62
63
# Family name used by NetlinkGenlCtrlMessage.family_name and as a key in
# handler_classes.
GenlCtrlFamilyName = "nlctrl"
65
class GenlCtrlMsgType(Enum):
    """Command codes (``CTRL_CMD_*``) for the "nlctrl" family.

    Only the ``*FAMILY`` commands are registered in
    ``NetlinkGenlCtrlMessage.messages``; the remaining members are
    declared but not wired to a message category in this file.
    """

    # NOTE(review): values presumably mirror the kernel's generic netlink
    # header -- confirm against it before changing any number.
    CTRL_CMD_UNSPEC = 0
    CTRL_CMD_NEWFAMILY = 1
    CTRL_CMD_DELFAMILY = 2
    CTRL_CMD_GETFAMILY = 3
    CTRL_CMD_NEWOPS = 4
    CTRL_CMD_DELOPS = 5
    CTRL_CMD_GETOPS = 6
    CTRL_CMD_NEWMCAST_GRP = 7
    CTRL_CMD_DELMCAST_GRP = 8
    CTRL_CMD_GETMCAST_GRP = 9
    CTRL_CMD_GETPOLICY = 10
78
79
class GenlCtrlAttrType(Enum):
    """Attribute type codes (``CTRL_ATTR_*``) for the "nlctrl" family.

    ``genl_ctrl_attrs`` provides descriptors for types 1-5 only
    (FAMILY_ID through MAXATTR); types 6-10 have no descriptor in this
    file.
    """

    # NOTE(review): numbering starts at 1; presumably 0 is the kernel's
    # "unspec" slot -- confirm.
    CTRL_ATTR_FAMILY_ID = 1
    CTRL_ATTR_FAMILY_NAME = 2
    CTRL_ATTR_VERSION = 3
    CTRL_ATTR_HDRSIZE = 4
    CTRL_ATTR_MAXATTR = 5
    CTRL_ATTR_OPS = 6
    CTRL_ATTR_MCAST_GROUPS = 7
    CTRL_ATTR_POLICY = 8
    CTRL_ATTR_OP_POLICY = 9
    CTRL_ATTR_OP = 10
91
92
# Attribute descriptors for the "nlctrl" family, run through
# prepare_attrs_map() and installed as NetlinkGenlCtrlMessage.nl_attrs_map.
# Each entry pairs an attribute type with the NlAttr class used for it:
# the family id is a 16-bit value, the name a string, the rest 32-bit.
genl_ctrl_attrs = prepare_attrs_map(
    [
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_ID, NlAttrU16),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_FAMILY_NAME, NlAttrStr),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_VERSION, NlAttrU32),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_HDRSIZE, NlAttrU32),
        AttrDescr(GenlCtrlAttrType.CTRL_ATTR_MAXATTR, NlAttrU32),
    ]
)
102
103
class NetlinkGenlCtrlMessage(NetlinkGenlMessage):
    """Message class for the "nlctrl" generic netlink family.

    Registers the NEWFAMILY / GETFAMILY / DELFAMILY commands with their
    message categories and uses ``genl_ctrl_attrs`` as the attribute map.
    """

    # Command -> category table for this family.
    messages = [
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_NEWFAMILY, NlMsgCategory.NEW),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_GETFAMILY, NlMsgCategory.GET),
        NlMsgProps(GenlCtrlMsgType.CTRL_CMD_DELFAMILY, NlMsgCategory.DELETE),
    ]
    nl_attrs_map = genl_ctrl_attrs
    family_name = GenlCtrlFamilyName
112
113
# Family name shared by KtestInfoMessage and KtestMsgMessage; also a key in
# handler_classes.
KtestFamilyName = "ktest"
115
116
class KtestMsgType(Enum):
    """Command codes (``KTEST_CMD_*``) for the "ktest" family.

    LIST, RUN and NEWTEST are registered by ``KtestInfoMessage``;
    NEWMESSAGE is registered by ``KtestMsgMessage``.
    """

    KTEST_CMD_UNSPEC = 0
    KTEST_CMD_LIST = 1
    KTEST_CMD_RUN = 2
    KTEST_CMD_NEWTEST = 3
    KTEST_CMD_NEWMESSAGE = 4
123
124
class KtestAttrType(Enum):
    """Attribute type codes (``KTEST_ATTR_*``) used by ``ktest_info_attrs``.

    ``KTEST_ATTR_TEST_META`` is declared here but has no descriptor in
    ``ktest_info_attrs``.
    """

    KTEST_ATTR_MOD_NAME = 1
    KTEST_ATTR_TEST_NAME = 2
    KTEST_ATTR_TEST_DESCR = 3
    KTEST_ATTR_TEST_META = 4
130
131
class KtestLogMsgType(Enum):
    """``KTEST_MSG_*`` codes: start, end, log and fail.

    Not referenced anywhere else in this file.
    """

    # NOTE(review): presumably classifies messages reported while a test
    # runs -- confirm against the consumer of this enum.
    KTEST_MSG_START = 1
    KTEST_MSG_END = 2
    KTEST_MSG_LOG = 3
    KTEST_MSG_FAIL = 4
137
138
class KtestMsgAttrType(Enum):
    """Attribute type codes (``KTEST_MSG_ATTR_*``) used by ``ktest_msg_attrs``.

    ``KTEST_MSG_ATTR_META`` is declared here but has no descriptor in
    ``ktest_msg_attrs``.
    """

    KTEST_MSG_ATTR_TS = 1
    KTEST_MSG_ATTR_FUNC = 2
    KTEST_MSG_ATTR_FILE = 3
    KTEST_MSG_ATTR_LINE = 4
    KTEST_MSG_ATTR_TEXT = 5
    KTEST_MSG_ATTR_LEVEL = 6
    KTEST_MSG_ATTR_META = 7
147
148
class timespec(Structure):
    """ctypes layout of a seconds / nanoseconds timestamp.

    ``NlAttrTS`` uses ``sizeof(timespec)`` as its payload size and decodes
    payloads with ``timespec.from_buffer_copy()``.
    """

    # NOTE(review): tv_sec is fixed at 64 bits while tv_nsec follows the
    # platform's C long; presumably this matches the kernel's struct
    # timespec on the supported targets -- confirm for any platform where
    # time_t is not 64-bit.
    _fields_ = [
        ("tv_sec", c_int64),
        ("tv_nsec", c_long),
    ]
154
155
class NlAttrTS(NlAttr):
    """Netlink attribute whose payload is exactly one ``timespec``.

    On the wire the attribute is ``NlAttr.HDR_LEN`` header bytes followed
    by ``DATA_LEN`` payload bytes; no other size is accepted.
    """

    # Payload size in bytes: one ``timespec`` structure.
    DATA_LEN = sizeof(timespec)

    def __init__(self, nla_type, val):
        """Store ``val`` (a ``timespec``) as the value of ``nla_type``.

        The parent is initialised with an empty payload; serialisation
        is done from ``self.ts`` in ``__bytes__``.
        """
        self.ts = val
        super().__init__(nla_type, b"")

    @property
    def nla_len(self):
        """Total attribute length in bytes (header plus payload)."""
        return NlAttr.HDR_LEN + self.DATA_LEN

    def _print_attr_value(self):
        """Return the value as ``" tv_sec=N tv_nsec=N"``."""
        return " tv_sec={} tv_nsec={}".format(self.ts.tv_sec, self.ts.tv_nsec)

    @staticmethod
    def _validate(data):
        """Check that ``data`` is one complete timespec attribute.

        Raises:
            ValueError: the buffer size or the length recorded in the
                attribute header differs from the expected size.
        """
        # Explicit raises instead of ``assert``: assertions are removed
        # under ``python -O``, which would silently disable validation.
        # ValueError matches parse_base_header() in this module.
        expected_len = NlAttr.HDR_LEN + NlAttrTS.DATA_LEN
        if len(data) != expected_len:
            raise ValueError(
                "timespec attribute: expected {} bytes, got {}".format(
                    expected_len, len(data)
                )
            )
        nla_len, _ = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
        if nla_len != expected_len:
            raise ValueError(
                "timespec attribute: nla_len is {}, expected {}".format(
                    nla_len, expected_len
                )
            )

    @classmethod
    def _parse(cls, data):
        """Build an instance from already validated attribute bytes."""
        # The header length field was checked in _validate(); only the
        # type is needed here.
        _, nla_type = struct.unpack("@HH", data[:NlAttr.HDR_LEN])
        val = timespec.from_buffer_copy(data[NlAttr.HDR_LEN:])
        return cls(nla_type, val)

    def __bytes__(self):
        """Serialise by passing the raw timespec bytes to ``_to_bytes()``."""
        return self._to_bytes(bytes(self.ts))
184
185
# Attribute descriptors used by KtestInfoMessage: module name, test name
# and test description, all handled as string attributes.
ktest_info_attrs = prepare_attrs_map(
    [
        AttrDescr(KtestAttrType.KTEST_ATTR_MOD_NAME, NlAttrStr),
        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_NAME, NlAttrStr),
        AttrDescr(KtestAttrType.KTEST_ATTR_TEST_DESCR, NlAttrStr),
    ]
)
193
194
# Attribute descriptors used by KtestMsgMessage.  Function, file and text
# are strings, the line number is 32-bit, the level is 8-bit and the
# timestamp uses the NlAttrTS class defined in this file.
ktest_msg_attrs = prepare_attrs_map(
    [
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_FILE, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LINE, NlAttrU32),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT, NlAttrStr),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL, NlAttrU8),
        AttrDescr(KtestMsgAttrType.KTEST_MSG_ATTR_TS, NlAttrTS),
    ]
)
205
206
class KtestInfoMessage(NetlinkGenlMessage):
    """Message class for the "ktest" LIST, RUN and NEWTEST commands.

    Uses ``ktest_info_attrs`` as the attribute map.
    """

    # LIST is categorised as GET; RUN and NEWTEST are both categorised
    # as NEW.
    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_LIST, NlMsgCategory.GET),
        NlMsgProps(KtestMsgType.KTEST_CMD_RUN, NlMsgCategory.NEW),
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWTEST, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_info_attrs
    family_name = KtestFamilyName
215
216
class KtestMsgMessage(NetlinkGenlMessage):
    """Message class for the "ktest" NEWMESSAGE command.

    Uses ``ktest_msg_attrs`` as the attribute map.
    """

    messages = [
        NlMsgProps(KtestMsgType.KTEST_CMD_NEWMESSAGE, NlMsgCategory.NEW),
    ]
    nl_attrs_map = ktest_msg_attrs
    family_name = KtestFamilyName
223
224
# Maps each generic netlink family name to the list of message classes
# defined for it in this file.
handler_classes = {
    GenlCtrlFamilyName: [NetlinkGenlCtrlMessage],
    KtestFamilyName: [KtestInfoMessage, KtestMsgMessage],
}
229