xref: /freebsd/tests/atf_python/sys/net/rtsock.py (revision 8eb2bee6)
18eb2bee6SAlexander V. Chernikov#!/usr/local/bin/python3
28eb2bee6SAlexander V. Chernikovimport os
38eb2bee6SAlexander V. Chernikovimport socket
48eb2bee6SAlexander V. Chernikovimport struct
58eb2bee6SAlexander V. Chernikovimport sys
68eb2bee6SAlexander V. Chernikovfrom ctypes import c_byte
78eb2bee6SAlexander V. Chernikovfrom ctypes import c_char
88eb2bee6SAlexander V. Chernikovfrom ctypes import c_int
98eb2bee6SAlexander V. Chernikovfrom ctypes import c_long
108eb2bee6SAlexander V. Chernikovfrom ctypes import c_uint32
118eb2bee6SAlexander V. Chernikovfrom ctypes import c_ulong
128eb2bee6SAlexander V. Chernikovfrom ctypes import c_ushort
138eb2bee6SAlexander V. Chernikovfrom ctypes import sizeof
148eb2bee6SAlexander V. Chernikovfrom ctypes import Structure
158eb2bee6SAlexander V. Chernikovfrom typing import Dict
168eb2bee6SAlexander V. Chernikovfrom typing import List
178eb2bee6SAlexander V. Chernikovfrom typing import Optional
188eb2bee6SAlexander V. Chernikovfrom typing import Union
198eb2bee6SAlexander V. Chernikov
208eb2bee6SAlexander V. Chernikov
def roundup2(val: int, num: int) -> int:
    """Round *val* up to the nearest multiple of *num*.

    Values that are already a multiple of *num* are returned unchanged.
    The previous bit-twiddling form was only correct when *num* was a power
    of two; ceil-division gives the same answers for those inputs and is
    also correct for every other positive *num*.

    Raises:
        ZeroDivisionError: if *num* is 0.
    """
    # -(-a // b) is ceil(a / b) using integer arithmetic only.
    return -(-val // num) * num
268eb2bee6SAlexander V. Chernikov
278eb2bee6SAlexander V. Chernikov
class RtSockException(OSError):
    """Error raised by the routing-socket helpers in this module.

    Derives from OSError, so callers catching OSError (or Exception) also
    catch it.
    """
308eb2bee6SAlexander V. Chernikov
318eb2bee6SAlexander V. Chernikov
class RtConst:
    """Routing-socket constants and helpers that map values back to names.

    Constants are grouped by prefix (``RTA_``, ``RTM_``, ``RTF_``, ``RTV_``,
    ``AF_``); the lookup helpers select a group by that prefix and search the
    class attributes.
    """

    RTM_VERSION = 5
    ALIGN = sizeof(c_long)

    AF_INET = socket.AF_INET
    AF_INET6 = socket.AF_INET6
    # The socket module only exports AF_LINK on BSD-derived systems.  Fall
    # back to the BSD value so merely importing this module (e.g. to decode
    # captured messages) does not fail elsewhere.
    AF_LINK = getattr(socket, "AF_LINK", 18)

    RTA_DST = 0x1
    RTA_GATEWAY = 0x2
    RTA_NETMASK = 0x4
    RTA_GENMASK = 0x8
    RTA_IFP = 0x10
    RTA_IFA = 0x20
    RTA_AUTHOR = 0x40
    RTA_BRD = 0x80

    RTM_ADD = 1
    RTM_DELETE = 2
    RTM_CHANGE = 3
    RTM_GET = 4

    RTF_UP = 0x1
    RTF_GATEWAY = 0x2
    RTF_HOST = 0x4
    RTF_REJECT = 0x8
    RTF_DYNAMIC = 0x10
    RTF_MODIFIED = 0x20
    RTF_DONE = 0x40
    RTF_XRESOLVE = 0x200
    RTF_LLINFO = 0x400
    RTF_LLDATA = 0x400
    RTF_STATIC = 0x800
    RTF_BLACKHOLE = 0x1000
    RTF_PROTO2 = 0x4000
    RTF_PROTO1 = 0x8000
    RTF_PROTO3 = 0x40000
    RTF_FIXEDMTU = 0x80000
    RTF_PINNED = 0x100000
    RTF_LOCAL = 0x200000
    RTF_BROADCAST = 0x400000
    RTF_MULTICAST = 0x800000
    RTF_STICKY = 0x10000000
    RTF_RNH_LOCKED = 0x40000000
    RTF_GWFLAG_COMPAT = 0x80000000

    RTV_MTU = 0x1
    RTV_HOPCOUNT = 0x2
    RTV_EXPIRE = 0x4
    RTV_RPIPE = 0x8
    RTV_SPIPE = 0x10
    RTV_SSTHRESH = 0x20
    RTV_RTT = 0x40
    RTV_RTTVAR = 0x80
    RTV_WEIGHT = 0x100

    @staticmethod
    def get_props(prefix: str) -> List[str]:
        """Return the names of all class attributes starting with *prefix*.

        Names come back in ``dir()`` order, i.e. sorted alphabetically.
        """
        return [n for n in dir(RtConst) if n.startswith(prefix)]

    @staticmethod
    def get_name(prefix: str, value: int) -> str:
        """Return the first ``prefix``-constant equal to *value*.

        Falls back to ``"U:<prefix>:<value>"`` when nothing matches.
        NOTE: with the ``"RTM_"`` prefix the value 5 resolves to
        ``RTM_VERSION``, because that constant shares the prefix.
        """
        for prop in RtConst.get_props(prefix):
            if getattr(RtConst, prop) == value:
                return prop
        return "U:{}:{}".format(prefix, value)

    @staticmethod
    def get_bitmask_map(prefix: str, value: int) -> Dict[int, str]:
        """Split *value* into its set bits and name each one.

        Returns a dict ``{bit: name}`` ordered from the lowest bit up.  Bits
        without a matching ``prefix`` constant are named by their hex value.
        When two constants share a value (``RTF_LLINFO``/``RTF_LLDATA``) the
        alphabetically last name wins.
        """
        # Masks read from signed ctypes fields (c_int) come back negative
        # when bit 31 is set; a negative Python int never reaches zero in the
        # loop below, so reinterpret it as an unsigned 32-bit quantity.
        if value < 0:
            value &= 0xFFFFFFFF
        propmap = {getattr(RtConst, prop): prop for prop in RtConst.get_props(prefix)}
        ret = {}
        bit = 1
        while value:
            if value & bit:
                ret[bit] = propmap.get(bit, hex(bit))
                value -= bit
            bit <<= 1
        return ret

    @staticmethod
    def get_bitmask_str(prefix: str, value: int) -> str:
        """Return the comma-separated names of the bits set in *value*."""
        return ",".join(RtConst.get_bitmask_map(prefix, value).values())
1208eb2bee6SAlexander V. Chernikov
1218eb2bee6SAlexander V. Chernikov
class RtMetrics(Structure):
    """ctypes layout of the route metrics block embedded in ``RtMsgHdr``.

    Fourteen ``unsigned long`` slots in total: twelve named metrics followed
    by a two-element filler array.
    NOTE(review): presumably mirrors the kernel's ``struct rt_metrics`` --
    confirm against the headers of the target system.
    """

    _fields_ = [
        ("rmx_locks", c_ulong),
        ("rmx_mtu", c_ulong),
        ("rmx_hopcount", c_ulong),
        ("rmx_expire", c_ulong),
        ("rmx_recvpipe", c_ulong),
        ("rmx_sendpipe", c_ulong),
        ("rmx_ssthresh", c_ulong),
        ("rmx_rtt", c_ulong),
        ("rmx_rttvar", c_ulong),
        ("rmx_pksent", c_ulong),
        ("rmx_weight", c_ulong),
        ("rmx_nhidx", c_ulong),
        ("rmx_filler", c_ulong * 2),
    ]
1388eb2bee6SAlexander V. Chernikov
1398eb2bee6SAlexander V. Chernikov
class RtMsgHdr(Structure):
    """ctypes layout of the fixed header that starts every route message.

    ``RtsockRtMessage`` packs this header in front of the socket addresses
    and unpacks it again when parsing.  Flag and mask fields are declared as
    signed ``c_int``, so a value with bit 31 set reads back as a negative
    Python int.
    NOTE(review): presumably mirrors the kernel's ``struct rt_msghdr`` --
    confirm against the headers of the target system.
    """

    _fields_ = [
        ("rtm_msglen", c_ushort),
        ("rtm_version", c_byte),
        ("rtm_type", c_byte),
        ("rtm_index", c_ushort),
        ("_rtm_spare1", c_ushort),
        ("rtm_flags", c_int),
        ("rtm_addrs", c_int),
        ("rtm_pid", c_int),
        ("rtm_seq", c_int),
        ("rtm_errno", c_int),
        ("rtm_fmask", c_int),
        ("rtm_inits", c_ulong),
        ("rtm_rmx", RtMetrics),
    ]
1568eb2bee6SAlexander V. Chernikov
1578eb2bee6SAlexander V. Chernikov
class SockaddrIn(Structure):
    """ctypes layout of an IPv4 socket address (16 bytes).

    ``sin_addr`` is a plain 32-bit integer field; ``SaHelper.ip4_sa`` fills
    it so that the in-memory bytes are in network order.  ``sin_zero`` is a
    ``c_char`` array, so reading the field yields ``bytes`` truncated at the
    first NUL (``b""`` when it is all zeroes).
    """

    _fields_ = [
        ("sin_len", c_byte),
        ("sin_family", c_byte),
        ("sin_port", c_ushort),
        ("sin_addr", c_uint32),
        ("sin_zero", c_char * 8),
    ]
1668eb2bee6SAlexander V. Chernikov
1678eb2bee6SAlexander V. Chernikov
class SockaddrIn6(Structure):
    """ctypes layout of an IPv6 socket address (28 bytes).

    The 16 address bytes sit at offset 8 and ``sin6_scope_id`` at offset 24.
    The scope id is a native-endian 32-bit integer as far as ctypes is
    concerned.
    """

    _fields_ = [
        ("sin6_len", c_byte),
        ("sin6_family", c_byte),
        ("sin6_port", c_ushort),
        ("sin6_flowinfo", c_uint32),
        ("sin6_addr", c_byte * 16),
        ("sin6_scope_id", c_uint32),
    ]
1778eb2bee6SAlexander V. Chernikov
1788eb2bee6SAlexander V. Chernikov
class SockaddrDl(Structure):
    """ctypes layout of a link-level socket address (16 bytes as declared).

    The eight header bytes are followed by ``sdl_data``; when an interface
    name is present, ``SaHelper.print_sa_link`` reads ``sdl_nlen`` bytes of
    it starting at offset 8.
    NOTE(review): only 8 bytes of ``sdl_data`` are declared here; the
    kernel's ``struct sockaddr_dl`` is believed to carry a larger data area
    -- confirm that the short form is intentional.
    """

    _fields_ = [
        ("sdl_len", c_byte),
        ("sdl_family", c_byte),
        ("sdl_index", c_ushort),
        ("sdl_type", c_byte),
        ("sdl_nlen", c_byte),
        ("sdl_alen", c_byte),
        ("sdl_slen", c_byte),
        ("sdl_data", c_byte * 8),
    ]
1908eb2bee6SAlexander V. Chernikov
1918eb2bee6SAlexander V. Chernikov
class SaHelper(object):
    """Build and pretty-print raw socket addresses (``bytes``) for rtsock."""

    @staticmethod
    def is_ipv6(ip: str) -> bool:
        """Return True if *ip* looks like an IPv6 literal (contains ``:``)."""
        return ":" in ip

    @staticmethod
    def ip_sa(ip: str, scopeid: int = 0) -> bytes:
        """Return the packed sockaddr for an IPv4 or IPv6 address string.

        *scopeid* is only used for IPv6 addresses.
        """
        if SaHelper.is_ipv6(ip):
            return SaHelper.ip6_sa(ip, scopeid)
        return SaHelper.ip4_sa(ip)

    @staticmethod
    def ip4_sa(ip: str) -> bytes:
        """Return the packed ``SockaddrIn`` for the dotted-quad string *ip*."""
        # inet_pton() yields network-order bytes; decoding them with the
        # host byte order makes the native c_uint32 store reproduce exactly
        # those bytes in memory.
        addr_int = int.from_bytes(socket.inet_pton(socket.AF_INET, ip), sys.byteorder)
        sin = SockaddrIn(sizeof(SockaddrIn), socket.AF_INET, 0, addr_int)
        return bytes(sin)

    @staticmethod
    def ip6_sa(ip6: str, scopeid: int) -> bytes:
        """Return the packed ``SockaddrIn6`` for *ip6* with the given scope id."""
        packed = socket.inet_pton(socket.AF_INET6, ip6)
        addr_bytes = (c_byte * 16).from_buffer_copy(packed)
        sin6 = SockaddrIn6(
            sizeof(SockaddrIn6), socket.AF_INET6, 0, 0, addr_bytes, scopeid
        )
        return bytes(sin6)

    @staticmethod
    def link_sa(ifindex: int = 0, iftype: int = 0) -> bytes:
        """Return a packed ``SockaddrDl`` carrying only an index and a type."""
        sa = SockaddrDl(sizeof(SockaddrDl), socket.AF_LINK, c_ushort(ifindex), iftype)
        return bytes(sa)

    @staticmethod
    def pxlen4_sa(pxlen: int) -> bytes:
        """Return the IPv4 netmask sockaddr for prefix length *pxlen*."""
        return SaHelper.ip_sa(SaHelper.pxlen_to_ip4(pxlen))

    @staticmethod
    def pxlen_to_ip4(pxlen: int) -> str:
        """Return the dotted-quad netmask for an IPv4 prefix length.

        Raises:
            ValueError: if *pxlen* is outside 0..32.
        """
        if not 0 <= pxlen <= 32:
            raise ValueError("IPv4 prefix length out of range: {}".format(pxlen))
        addr = (0xFFFFFFFF << (32 - pxlen)) & 0xFFFFFFFF
        return socket.inet_ntop(socket.AF_INET, struct.pack("!I", addr))

    @staticmethod
    def pxlen6_sa(pxlen: int) -> bytes:
        """Return the IPv6 netmask sockaddr for prefix length *pxlen*."""
        return SaHelper.ip_sa(SaHelper.pxlen_to_ip6(pxlen))

    @staticmethod
    def pxlen_to_ip6(pxlen: int) -> str:
        """Return the textual netmask for an IPv6 prefix length.

        Raises:
            ValueError: if *pxlen* is outside 0..128.
        """
        if not 0 <= pxlen <= 128:
            raise ValueError("IPv6 prefix length out of range: {}".format(pxlen))
        all_ones = (1 << 128) - 1
        mask = all_ones ^ ((1 << (128 - pxlen)) - 1)
        return socket.inet_ntop(socket.AF_INET6, mask.to_bytes(16, "big"))

    @staticmethod
    def print_sa_inet(sa: bytes):
        """Return the textual IPv4 address stored in sockaddr *sa*.

        Raises:
            RtSockException: if *sa* is shorter than 8 bytes.
        """
        if len(sa) < 8:
            raise RtSockException("IPv4 sa size too small: {}".format(len(sa)))
        addr = socket.inet_ntop(socket.AF_INET, sa[4:8])
        return "{}".format(addr)

    @staticmethod
    def print_sa_inet6(sa: bytes):
        """Return ``"<addr> scopeid <n>"`` for the IPv6 sockaddr *sa*.

        Raises:
            RtSockException: if *sa* is shorter than ``SockaddrIn6``.
        """
        if len(sa) < sizeof(SockaddrIn6):
            raise RtSockException("IPv6 sa size too small: {}".format(len(sa)))
        addr = socket.inet_ntop(socket.AF_INET6, sa[8:24])
        # ip6_sa() stores the scope id through a native-endian c_uint32, so
        # decode it with native byte order as well (a big-endian decode
        # mangles the value on little-endian hosts).
        scopeid = struct.unpack("=I", sa[24:28])[0]
        return "{} scopeid {}".format(addr, scopeid)

    @staticmethod
    def print_sa_link(sa: bytes, hd: Optional[bool] = True):
        """Return ``"link#<index> ifname:<name> "`` parts found in *sa*.

        Either part is omitted when the index is 0 or no name is present.
        *hd* is accepted for signature compatibility and is not used.

        Raises:
            RtSockException: if *sa* is shorter than ``SockaddrDl`` or the
                name length runs past the end of the buffer.
        """
        if len(sa) < sizeof(SockaddrDl):
            raise RtSockException("LINK sa size too small: {}".format(len(sa)))
        sdl = SockaddrDl.from_buffer_copy(sa)
        if sdl.sdl_index:
            ifindex = "link#{} ".format(sdl.sdl_index)
        else:
            ifindex = ""
        if sdl.sdl_nlen:
            # The interface name starts right after the 8 header bytes.
            iface_offset = 8
            if sdl.sdl_nlen + iface_offset > len(sa):
                raise RtSockException(
                    "LINK sa sdl_nlen {} > total len {}".format(sdl.sdl_nlen, len(sa))
                )
            ifname = "ifname:{} ".format(
                bytes.decode(sa[iface_offset : iface_offset + sdl.sdl_nlen])
            )
        else:
            ifname = ""
        return "{}{}".format(ifindex, ifname)

    @staticmethod
    def print_sa_unknown(sa: bytes):
        """Return a placeholder naming the numeric address family of *sa*."""
        return "unknown_type:{}".format(sa[1])

    @classmethod
    def print_sa(cls, sa: bytes, hd: Optional[bool] = False):
        """Return a human-readable rendering of the sockaddr *sa*.

        Dispatches on the family byte (``sa[1]``).  With *hd* true the raw
        bytes are appended as `` [<repr>]``.

        Raises:
            Exception: if *sa* has fewer than 2 bytes or its length byte
                (``sa[0]``) does not match the buffer length.
        """
        # Check the length before touching sa[0] / sa[1]; otherwise an empty
        # or one-byte buffer surfaces as a bare IndexError.
        if len(sa) < 2:
            raise Exception("sa too short: {}".format(len(sa)))

        if sa[0] != len(sa):
            raise Exception("sa size {} != buffer size {}".format(sa[0], len(sa)))

        if sa[1] == socket.AF_INET:
            text = cls.print_sa_inet(sa)
        elif sa[1] == socket.AF_INET6:
            text = cls.print_sa_inet6(sa)
        elif sa[1] == socket.AF_LINK:
            text = cls.print_sa_link(sa)
        else:
            text = cls.print_sa_unknown(sa)
        if hd:
            dump = " [{!r}]".format(sa)
        else:
            dump = ""
        return "{}{}".format(text, dump)
3198eb2bee6SAlexander V. Chernikov
3208eb2bee6SAlexander V. Chernikov
class BaseRtsockMessage(object):
    """Common base for routing-socket message wrappers.

    Stores the numeric message type and exposes a ``SaHelper`` instance as
    ``self.sa`` for building socket addresses.
    """

    def __init__(self, rtm_type):
        self.rtm_type = rtm_type
        self.sa = SaHelper()

    @staticmethod
    def print_rtm_type(rtm_type):
        """Return the ``RTM_*`` constant name for *rtm_type*.

        Unknown values come back in ``RtConst.get_name``'s fallback form.
        """
        return RtConst.get_name("RTM_", rtm_type)

    @property
    def rtm_type_str(self):
        """Symbolic name of this message's type."""
        return self.print_rtm_type(self.rtm_type)
3338eb2bee6SAlexander V. Chernikov
3348eb2bee6SAlexander V. Chernikov
class RtsockRtMessage(BaseRtsockMessage):
    """A route message (RTM_ADD / RTM_DELETE / RTM_CHANGE / RTM_GET).

    Holds the header fields plus a mapping ``{RTA_* bit: packed sockaddr}``.
    ``bytes(msg)`` serialises the message; ``from_bytes()`` parses one.
    """

    messages = [
        RtConst.RTM_ADD,
        RtConst.RTM_DELETE,
        RtConst.RTM_CHANGE,
        RtConst.RTM_GET,
    ]

    def __init__(self, rtm_type, rtm_seq=1, dst_sa=None, mask_sa=None):
        """Create a message of *rtm_type*.

        *dst_sa* / *mask_sa*, when given (and non-empty), are stored as the
        RTA_DST / RTA_NETMASK attributes.
        """
        super().__init__(rtm_type)
        self.rtm_flags = 0
        self.rtm_seq = rtm_seq
        self._attrs = {}
        self.rtm_errno = 0
        self.rtm_pid = 0
        self.rtm_inits = 0
        self.rtm_rmx = RtMetrics()
        # Raw bytes this message was parsed from; None for locally built ones.
        self._orig_data = None
        if dst_sa:
            self.add_sa_attr(RtConst.RTA_DST, dst_sa)
        if mask_sa:
            self.add_sa_attr(RtConst.RTA_NETMASK, mask_sa)

    @staticmethod
    def _sa_space(sa_len: int) -> int:
        """Return how many bytes a sockaddr of *sa_len* occupies in a message.

        Sockaddrs are padded to a multiple of ``RtConst.ALIGN``; a
        zero-length sockaddr still takes one full alignment unit (this
        follows the kernel's SA_SIZE() rule).
        """
        if sa_len == 0:
            return RtConst.ALIGN
        return roundup2(sa_len, RtConst.ALIGN)

    def add_sa_attr(self, attr_type, attr_bytes: bytes):
        """Store the packed sockaddr *attr_bytes* under the RTA_* bit *attr_type*."""
        self._attrs[attr_type] = attr_bytes

    def add_ip_attr(self, attr_type, ip_addr: str, scopeid: int = 0):
        """Store an IPv4 or IPv6 address; *scopeid* applies to IPv6 only."""
        if self.sa.is_ipv6(ip_addr):
            self.add_ip6_attr(attr_type, ip_addr, scopeid)
        else:
            self.add_ip4_attr(attr_type, ip_addr)

    def add_ip4_attr(self, attr_type, ip: str):
        """Store the sockaddr built from the address string *ip*."""
        self.add_sa_attr(attr_type, self.sa.ip_sa(ip))

    def add_ip6_attr(self, attr_type, ip6: str, scopeid: int):
        """Store the IPv6 sockaddr for *ip6* with the given scope id."""
        self.add_sa_attr(attr_type, self.sa.ip6_sa(ip6, scopeid))

    def add_link_attr(self, attr_type, ifindex: Optional[int] = 0):
        """Store a link-level sockaddr that carries only *ifindex*."""
        self.add_sa_attr(attr_type, self.sa.link_sa(ifindex))

    def get_sa(self, attr_type) -> Optional[bytes]:
        """Return the packed sockaddr stored for *attr_type*, or None."""
        return self._attrs.get(attr_type)

    def print_message(self):
        """Print the header summary, the address mask and every sockaddr."""
        # RTM_GET: Report Metrics: len 272, pid: 87839, seq 1, errno 0, flags:<UP,GATEWAY,DONE,STATIC>
        if self._orig_data:
            rtm_len = len(self._orig_data)
        else:
            rtm_len = len(bytes(self))
        print(
            "{}: len {}, pid: {}, seq {}, errno {}, flags: <{}>".format(
                self.rtm_type_str,
                rtm_len,
                self.rtm_pid,
                self.rtm_seq,
                self.rtm_errno,
                RtConst.get_bitmask_str("RTF_", self.rtm_flags),
            )
        )
        # Keys are distinct single bits, so their sum is the address mask.
        rtm_addrs = sum(self._attrs)
        print("Addrs: <{}>".format(RtConst.get_bitmask_str("RTA_", rtm_addrs)))
        for attr in sorted(self._attrs):
            sa_data = SaHelper.print_sa(self._attrs[attr])
            print(" {}: {}".format(RtConst.get_name("RTA_", attr), sa_data))

    def print_in_message(self):
        """Print the message framed as an incoming one."""
        print("vvvvvvvv  IN vvvvvvvv")
        self.print_message()
        print()

    def verify_sa_inet(self, sa_data):
        """Sanity-check an IPv4 sockaddr: sane length, zero port and padding.

        Raises:
            Exception: if *sa_data* is shorter than 8 bytes or its length
                byte exceeds the buffer size.
            AssertionError: if the port or the padding bytes are non-zero.
        """
        if len(sa_data) < 8:
            raise Exception("IPv4 sa size too small: {}".format(sa_data))
        if sa_data[0] > len(sa_data):
            raise Exception(
                "IPv4 sin_len too big: {} vs sa size {}: {}".format(
                    sa_data[0], len(sa_data), sa_data
                )
            )
        # Sockaddrs of 8..15 bytes pass the check above, but
        # from_buffer_copy() needs the full structure size; pad with zeroes.
        padded = bytes(sa_data).ljust(sizeof(SockaddrIn), b"\0")
        sin = SockaddrIn.from_buffer_copy(padded)
        assert sin.sin_port == 0
        # Compare the raw padding bytes: reading the c_char array field
        # yields bytes truncated at the first NUL, never a list, so the old
        # comparison against [0] * 8 could not succeed.
        assert padded[8:16] == bytes(8)

    def compare_sa(self, sa_type, sa_data):
        """Assert that *sa_data* equals the sockaddr stored for *sa_type*.

        Raises:
            Exception: if *sa_data* is shorter than 4 bytes.
            KeyError: if no sockaddr is stored for *sa_type*.
            AssertionError: on any mismatch.
        """
        if len(sa_data) < 4:
            sa_type_name = RtConst.get_name("RTA_", sa_type)
            raise Exception(
                "sa_len for type {} too short: {}".format(sa_type_name, len(sa_data))
            )
        our_sa = self._attrs[sa_type]
        # Compare the printable form first so a mismatch is readable.
        assert SaHelper.print_sa(sa_data) == SaHelper.print_sa(our_sa)
        assert len(sa_data) == len(our_sa)
        assert sa_data == our_sa

    def verify(self, rtm_type: int, rtm_sa):
        """Check a parsed message against the expected type and sockaddrs.

        *rtm_sa* maps RTA_* bits to the expected packed sockaddrs.  The
        message must have been created by ``from_bytes()``, since the raw
        header is re-read from the original data.

        Raises:
            Exception: if an expected sockaddr type is missing.
            AssertionError: on type, errno, spare-field or sockaddr mismatch.
        """
        assert self.rtm_type_str == self.print_rtm_type(rtm_type)
        assert self.rtm_errno == 0
        hdr = RtMsgHdr.from_buffer_copy(self._orig_data)
        assert hdr._rtm_spare1 == 0
        for sa_type, sa_data in rtm_sa.items():
            if sa_type not in self._attrs:
                sa_type_name = RtConst.get_name("RTA_", sa_type)
                raise Exception("SA type {} not present".format(sa_type_name))
            self.compare_sa(sa_type, sa_data)

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a raw route message into a new instance.

        Raises:
            Exception: if *data* is shorter than the header or a sockaddr
                extends (or starts) past the end of *data*.
        """
        if len(data) < sizeof(RtMsgHdr):
            raise Exception(
                "messages size {} is less than expected {}".format(
                    len(data), sizeof(RtMsgHdr)
                )
            )
        hdr = RtMsgHdr.from_buffer_copy(data)

        self = cls(hdr.rtm_type)
        self.rtm_flags = hdr.rtm_flags
        self.rtm_seq = hdr.rtm_seq
        self.rtm_errno = hdr.rtm_errno
        self.rtm_pid = hdr.rtm_pid
        self.rtm_inits = hdr.rtm_inits
        self.rtm_rmx = hdr.rtm_rmx
        self._orig_data = data

        off = sizeof(RtMsgHdr)
        v = 1
        # rtm_addrs is a signed field; reinterpret it as unsigned so a set
        # top bit cannot keep the loop below from terminating.
        addrs_mask = hdr.rtm_addrs & 0xFFFFFFFF
        while addrs_mask:
            if addrs_mask & v:
                addrs_mask -= v

                if off >= len(data):
                    raise Exception(
                        "SA for {} starts past the message end: {} >= {}".format(
                            RtConst.get_name("RTA_", v), off, len(data)
                        )
                    )
                sa_len = data[off]
                if off + sa_len > len(data):
                    raise Exception(
                        "SA sizeof for {} > total message length: {}+{} > {}".format(
                            RtConst.get_name("RTA_", v), off, sa_len, len(data)
                        )
                    )
                self._attrs[v] = data[off : off + sa_len]
                # A zero-length sockaddr still occupies one alignment unit;
                # without this the next sockaddr would be read from the same
                # offset.
                off += cls._sa_space(sa_len)
            v *= 2
        return self

    def __bytes__(self):
        """Serialise the header followed by the sockaddrs in RTA_* bit order.

        ``rtm_pid``, ``rtm_errno`` and ``rtm_index`` are left at zero in the
        emitted header.
        """
        sz = sizeof(RtMsgHdr)
        addrs_mask = 0
        for k, v in self._attrs.items():
            sz += self._sa_space(len(v))
            addrs_mask += k
        hdr = RtMsgHdr(
            rtm_msglen=sz,
            rtm_version=RtConst.RTM_VERSION,
            rtm_type=self.rtm_type,
            rtm_flags=self.rtm_flags,
            rtm_seq=self.rtm_seq,
            rtm_addrs=addrs_mask,
            rtm_inits=self.rtm_inits,
            rtm_rmx=self.rtm_rmx,
        )
        # The buffer is zero-filled, so alignment padding needs no writes.
        buf = bytearray(sz)
        buf[0 : sizeof(RtMsgHdr)] = bytes(hdr)
        off = sizeof(RtMsgHdr)
        for attr in sorted(self._attrs):
            v = self._attrs[attr]
            buf[off : off + len(v)] = v
            off += self._sa_space(len(v))
        return bytes(buf)
5038eb2bee6SAlexander V. Chernikov
5048eb2bee6SAlexander V. Chernikov
class Rtsock:
    """Helper around a raw routing (AF_ROUTE) socket.

    Owns the socket, hands out message sequence numbers, builds
    RTM_ADD / RTM_DELETE / RTM_CHANGE messages and moves raw data to and
    from the socket.
    """

    def __init__(self):
        self.socket = self._setup_rtsock()
        self.rtm_seq = 1  # next sequence number returned by get_seq()
        self.msgmap = self.build_msgmap()

    def build_msgmap(self):
        """Return a dict mapping each message type to its handler class.

        Every class listed here advertises the types it handles through its
        ``messages`` attribute.
        """
        classes = [RtsockRtMessage]
        return {message: cls for cls in classes for message in cls.messages}

    def get_seq(self):
        """Return the current sequence number and advance the counter."""
        ret = self.rtm_seq
        self.rtm_seq += 1
        return ret

    def get_weight(self, weight) -> int:
        """Return ``weight``, or 1 (RT_DEFAULT_WEIGHT) when it is falsy."""
        return weight or 1  # RT_DEFAULT_WEIGHT

    def new_rtm_any(self, msg_type, prefix: str, gw: Union[str, bytes]):
        """Build a route message of ``msg_type`` for ``prefix`` via ``gw``.

        ``prefix`` is "address" or "address/prefixlen"; without a prefix
        length no netmask is passed to the message. ``gw`` given as bytes is
        attached through add_sa_attr(), anything else (a string) through
        add_ip_attr(). A fresh sequence number is consumed for the message.
        """
        px = prefix.split("/")
        addr_sa = SaHelper.ip_sa(px[0])
        if len(px) > 1:
            pxlen = int(px[1])
            if SaHelper.is_ipv6(px[0]):
                mask_sa = SaHelper.pxlen6_sa(pxlen)
            else:
                mask_sa = SaHelper.pxlen4_sa(pxlen)
        else:
            mask_sa = None
        msg = RtsockRtMessage(msg_type, self.get_seq(), addr_sa, mask_sa)
        if isinstance(gw, bytes):
            msg.add_sa_attr(RtConst.RTA_GATEWAY, gw)
        else:
            # String
            msg.add_ip_attr(RtConst.RTA_GATEWAY, gw)
        return msg

    def new_rtm_add(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_ADD message; see new_rtm_any()."""
        return self.new_rtm_any(RtConst.RTM_ADD, prefix, gw)

    def new_rtm_del(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_DELETE message; see new_rtm_any()."""
        return self.new_rtm_any(RtConst.RTM_DELETE, prefix, gw)

    def new_rtm_change(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_CHANGE message; see new_rtm_any()."""
        return self.new_rtm_any(RtConst.RTM_CHANGE, prefix, gw)

    def _setup_rtsock(self) -> socket.socket:
        """Open the raw AF_ROUTE socket with SO_USELOOPBACK set to 1."""
        s = socket.socket(socket.AF_ROUTE, socket.SOCK_RAW, socket.AF_UNSPEC)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_USELOOPBACK, 1)
        return s

    def print_hd(self, data: bytes):
        """Print a hex dump of ``data`` to stdout, 16 bytes per row."""
        width = 16
        print("==========================================")
        for start in range(0, len(data), width):
            chunk = data[start : start + width]
            # Every byte keeps its trailing space, including the last one.
            print("".join("0x{:02X} ".format(b) for b in chunk))
        print()

    def write_message(self, msg):
        """Print ``msg``, serialize it with bytes() and write it out."""
        print("vvvvvvvv OUT vvvvvvvv")
        msg.print_message()
        print()
        msg_bytes = bytes(msg)
        # os.write() raises OSError on failure and never returns -1, so only
        # a partial write is left to check for.
        ret = os.write(self.socket.fileno(), msg_bytes)
        assert ret == len(msg_bytes)

    def parse_message(self, data: bytes):
        """Inspect the message type of a raw routing message.

        Raises OSError when ``data`` is shorter than 4 bytes. Currently
        always returns None.
        """
        if len(data) < 4:
            raise OSError("Short read from rtsock: {} bytes".format(len(data)))
        # struct rt_msghdr starts with u_short rtm_msglen, u_char
        # rtm_version, u_char rtm_type: the type is the fourth byte.
        rtm_type = data[3]
        if rtm_type not in self.msgmap:
            return None
        # NOTE(review): messages of a known type are not decoded yet; the
        # class registered in msgmap is never invoked here.
        return None

    def write_data(self, data: bytes):
        """Send raw ``data`` on the routing socket."""
        self.socket.send(data)

    def read_data(self, seq: Optional[int] = None) -> bytes:
        """Receive one datagram (up to 4096 bytes) from the routing socket.

        With ``seq`` set, keep receiving until a datagram that holds a
        complete RtMsgHdr with a matching rtm_seq arrives; everything else
        is discarded.
        """
        while True:
            data = self.socket.recv(4096)
            if seq is None:
                break
            # A datagram of exactly header size is still a full header.
            if len(data) >= sizeof(RtMsgHdr):
                hdr = RtMsgHdr.from_buffer_copy(data)
                if hdr.rtm_seq == seq:
                    break
        return data

    def read_message(self):
        """Receive the next datagram and return parse_message() of it."""
        data = self.read_data()
        return self.parse_message(data)
605