xref: /freebsd/tests/atf_python/ktest.py (revision 0eb0d233)
13e5d0784SAlexander V. Chernikovimport logging
23e5d0784SAlexander V. Chernikovimport time
33e5d0784SAlexander V. Chernikovfrom typing import NamedTuple
43e5d0784SAlexander V. Chernikov
53e5d0784SAlexander V. Chernikovimport pytest
63e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrNested
73e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.attrs import NlAttrStr
83e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import NetlinkMultipartIterator
93e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import NlHelper
103e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink import Nlsock
113e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestAttrType
123e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestInfoMessage
133e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestLogMsgType
143e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestMsgAttrType
153e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import KtestMsgType
163e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.netlink_generic import timespec
173e5d0784SAlexander V. Chernikovfrom atf_python.sys.netlink.utils import NlConst
183e5d0784SAlexander V. Chernikovfrom atf_python.utils import BaseTest
193e5d0784SAlexander V. Chernikovfrom atf_python.utils import libc
203e5d0784SAlexander V. Chernikovfrom atf_python.utils import nodeid_to_method_name
213e5d0784SAlexander V. Chernikov
223e5d0784SAlexander V. Chernikov
# Log line layout: HH:MM:SS plus milliseconds, then the source location
# (file:function:line) and the message text.
datefmt = "%H:%M:%S"
fmt = "%(asctime)s.%(msecs)03d %(filename)s:%(funcName)s:%(lineno)d %(message)s"
# Root logging is configured at import time, at DEBUG level.
logging.basicConfig(format=fmt, datefmt=datefmt, level=logging.DEBUG)
# Logger through which records received from the kernel are emitted.
logger = logging.getLogger("ktest")


# Name of the generic netlink family that is resolved to a family id before
# any list/run request is sent.
NETLINK_FAMILY = "ktest"
303e5d0784SAlexander V. Chernikov
313e5d0784SAlexander V. Chernikov
class KtestItem(pytest.Item):
    """pytest item standing for a single test reported by a kernel module."""

    def __init__(self, *, descr, kcls, **kwargs):
        """Store the test description and the Python class that runs it.

        Args:
            descr: description string reported for the kernel test.
            kcls: class that is instantiated (with no arguments) to run
                the test.
            **kwargs: forwarded unchanged to ``pytest.Item``.
        """
        super().__init__(**kwargs)
        self._kcls = kcls
        self.descr = descr

    def runtest(self):
        """Create a fresh ``kcls`` instance and invoke its ``runtest()``."""
        test_obj = self._kcls()
        test_obj.runtest()
403e5d0784SAlexander V. Chernikov
413e5d0784SAlexander V. Chernikov
class KtestCollector(pytest.Class):
    """Collector yielding a class's regular pytest tests and its kernel tests."""

    def collect(self):
        """Yield the Python tests of the class, then one item per kernel test.

        The kernel test list is obtained through ``KtestLoader`` using the
        class's ``KTEST_MODULE_NAME`` / ``KTEST_MODULE_AUTOLOAD`` settings.
        When that list is empty nothing at all is yielded, including the
        regular Python tests.
        """
        test_cls = self.obj
        # Public attribute names of the class. A kernel test carrying one of
        # these names is not turned into a KtestItem (presumably because the
        # class already defines a Python test of that name -- TODO confirm).
        taken_names = {attr for attr in dir(test_cls) if not attr.startswith("_")}

        may_autoload = test_cls.KTEST_MODULE_AUTOLOAD
        kmod_name = test_cls.KTEST_MODULE_NAME
        kernel_tests = KtestLoader(kmod_name, may_autoload).load_ktests()
        if not kernel_tests:
            return

        # Delegate ordinary method collection to a stock pytest.Class node.
        py_collector = pytest.Class.from_parent(
            self.parent, name=self.name, obj=test_cls
        )
        yield from py_collector.collect()

        for entry in kernel_tests:
            test_name, test_descr = entry["name"], entry["desc"]
            if test_name not in taken_names:
                yield KtestItem.from_parent(
                    self, name=test_name, descr=test_descr, kcls=test_cls
                )
643e5d0784SAlexander V. Chernikov
653e5d0784SAlexander V. Chernikov
class KtestLoader(object):
    """Query the kernel over generic netlink for the tests a module provides."""

    def __init__(self, module_name: str, autoload: bool):
        """Open a generic netlink socket and resolve the ktest family id.

        Args:
            module_name: kernel module whose tests are listed.
            autoload: when true, the module may be loaded with ``kldload``
                if the family or the test list is not available.
        """
        self.module_name = module_name
        self.autoload = autoload
        self.helper = NlHelper()
        self.nlsock = Nlsock(NlConst.NETLINK_GENERIC, self.helper)
        self.family_id = self._get_family_id()

    def _get_family_id(self):
        """Return the generic netlink family id for ``NETLINK_FAMILY``.

        If the lookup raises ``ValueError`` and autoload is enabled, the
        module is loaded and the lookup is retried once; otherwise the
        original ``ValueError`` propagates.
        """
        lookup = self.nlsock.get_genl_family_id
        try:
            return lookup(NETLINK_FAMILY)
        except ValueError:
            if not self.autoload:
                raise
            libc.kldload(self.module_name)
            return lookup(NETLINK_FAMILY)

    def _load_ktests(self):
        """Send a KTEST_CMD_LIST request and return the replies as dicts.

        Each dict has the keys ``mod_name``, ``name`` and ``desc``, taken
        from the corresponding attributes of one reply message.
        """
        request = KtestInfoMessage(
            self.helper, self.family_id, KtestMsgType.KTEST_CMD_LIST
        )
        request.set_request()
        request.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, self.module_name))
        self.nlsock.write_message(request, verbose=False)
        seq = request.nl_hdr.nlmsg_seq

        replies = NetlinkMultipartIterator(self.nlsock, seq, self.family_id)
        return [
            {
                "mod_name": reply.get_nla(KtestAttrType.KTEST_ATTR_MOD_NAME).text,
                "name": reply.get_nla(KtestAttrType.KTEST_ATTR_TEST_NAME).text,
                "desc": reply.get_nla(KtestAttrType.KTEST_ATTR_TEST_DESCR).text,
            }
            for reply in replies
        ]

    def load_ktests(self):
        """Return the module's test list, loading the module once if needed.

        When the first query returns nothing and autoload is enabled, the
        module is loaded with ``kldload`` and the query is repeated.
        """
        found = self._load_ktests()
        if found or not self.autoload:
            return found
        libc.kldload(self.module_name)
        return self._load_ktests()
1093e5d0784SAlexander V. Chernikov
1103e5d0784SAlexander V. Chernikov
def generate_ktests(collector, name, obj):
    """Return a ``KtestCollector`` for kernel-test classes, else ``None``.

    An object counts as a kernel-test class when it has a
    ``KTEST_MODULE_NAME`` attribute whose value is not ``None``.
    """
    kmod_name = getattr(obj, "KTEST_MODULE_NAME", None)
    if kmod_name is None:
        return None
    return KtestCollector.from_parent(collector, name=name, obj=obj)
1153e5d0784SAlexander V. Chernikov
1163e5d0784SAlexander V. Chernikov
class BaseKernelTest(BaseTest):
    """Base class for pytest classes whose tests run inside a kernel module.

    A test is started by sending a KTEST_CMD_RUN request over generic
    netlink; every reply message is converted into a Python log record
    and emitted through the module-level ``logger``.
    """

    # Passed to KtestLoader by KtestCollector: whether the module may be
    # kldload()-ed while the test list is being collected.
    KTEST_MODULE_AUTOLOAD = True
    # Name of the kernel module providing the tests. generate_ktests() only
    # treats classes with a non-None value as kernel-test classes.
    KTEST_MODULE_NAME = None

    def _get_record_time(self, msg) -> float:
        """Return the wall-clock time to stamp on the log record for ``msg``.

        The first message seen by this instance anchors the kernel
        timestamp to ``time.time()``; each later message is placed at that
        anchor plus the kernel-side time elapsed since the first message.
        Only timestamp differences are used, so the result does not depend
        on which clock the kernel timestamps come from.
        """
        ts = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TS).ts
        ktime = ts.tv_sec + ts.tv_nsec / 1e9
        # The guard must test an attribute that is actually assigned below;
        # otherwise the anchor would be reset on every record.
        if not hasattr(self, "_start_ktime"):
            self._start_ktime = ktime
            self._start_time = time.time()
            return self._start_time
        return self._start_time + (ktime - self._start_ktime)

    def _log_message(self, msg):
        """Turn one kernel log message into a ``LogRecord`` and emit it."""
        # Convert the syslog-type level to a logging level: values up to 6
        # (LOG_INFO in syslog numbering) become INFO, anything above DEBUG.
        syslog_level = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LEVEL).u8
        loglevel = logging.INFO if syslog_level <= 6 else logging.DEBUG

        text = msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_TEXT).text
        rec = logging.LogRecord(
            name=self.KTEST_MODULE_NAME,
            level=loglevel,
            pathname=msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FILE).text,
            lineno=msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_LINE).u32,
            msg="%s",
            # Must be a real 1-tuple: a bare empty string is falsy, which
            # makes LogRecord.getMessage() skip formatting and emit "%s".
            args=(text,),
            exc_info=None,
            func=msg.get_nla(KtestMsgAttrType.KTEST_MSG_ATTR_FUNC).text,
            sinfo=None,
        )
        rec.created = self._get_record_time(msg)
        logger.handle(rec)

    def _runtest_name(self, test_name: str, test_data):
        """Run kernel test ``test_name`` of this class's module and log its output.

        Args:
            test_name: name of the test inside ``KTEST_MODULE_NAME``.
            test_data: optional payload wrapped in a nested
                KTEST_ATTR_TEST_META attribute; omitted from the request
                when ``None``. (Presumably a list of netlink attributes,
                as ``NlAttrNested`` expects -- TODO confirm.)
        """
        module_name = self.KTEST_MODULE_NAME
        helper = NlHelper()
        nlsock = Nlsock(NlConst.NETLINK_GENERIC, helper)
        family_id = nlsock.get_genl_family_id(NETLINK_FAMILY)

        msg = KtestInfoMessage(helper, family_id, KtestMsgType.KTEST_CMD_RUN)
        msg.set_request()
        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_MOD_NAME, module_name))
        msg.add_nla(NlAttrStr(KtestAttrType.KTEST_ATTR_TEST_NAME, test_name))
        if test_data is not None:
            msg.add_nla(NlAttrNested(KtestAttrType.KTEST_ATTR_TEST_META, test_data))
        nlsock.write_message(msg, verbose=False)

        # Every message of the multipart reply is handed to the logger.
        for log_msg in NetlinkMultipartIterator(
            nlsock, msg.nl_hdr.nlmsg_seq, family_id
        ):
            self._log_message(log_msg)

    def runtest(self, test_data=None):
        """Run the kernel test whose name is derived from ``self.test_id``.

        ``test_id`` is not set in this class (presumably provided by
        ``BaseTest`` -- TODO confirm).
        """
        self._runtest_name(nodeid_to_method_name(self.test_id), test_data)
174