#!/usr/local/bin/python3
"""Helpers for building, sending and parsing BSD routing socket messages.

The module mirrors the kernel ``rt_msghdr`` / ``sockaddr_*`` layouts with
ctypes structures and provides small classes to compose route messages,
serialize them, and parse messages read back from an ``AF_ROUTE`` socket.
"""
import os
import socket
import struct
import sys
from ctypes import c_byte
from ctypes import c_char
from ctypes import c_int
from ctypes import c_long
from ctypes import c_uint32
from ctypes import c_ulong
from ctypes import c_ushort
from ctypes import sizeof
from ctypes import Structure
from typing import Dict
from typing import List
from typing import Optional
from typing import Union


def roundup2(val: int, num: int) -> int:
    """Round ``val`` up to a multiple of ``num``.

    The bit trick used here is only correct when ``num`` is a power of two.
    """
    if val % num:
        return (val | (num - 1)) + 1
    return val


class RtSockException(OSError):
    """Raised when a sockaddr received from the routing socket is malformed."""


class RtConst:
    """Routing socket constants plus helpers to turn values into names."""

    RTM_VERSION = 5
    ALIGN = sizeof(c_long)

    AF_INET = socket.AF_INET
    AF_INET6 = socket.AF_INET6
    # socket.AF_LINK only exists on BSD-derived platforms; fall back to the
    # BSD value (18) so the module can still be imported elsewhere.
    AF_LINK = getattr(socket, "AF_LINK", 18)

    RTA_DST = 0x1
    RTA_GATEWAY = 0x2
    RTA_NETMASK = 0x4
    RTA_GENMASK = 0x8
    RTA_IFP = 0x10
    RTA_IFA = 0x20
    RTA_AUTHOR = 0x40
    RTA_BRD = 0x80

    RTM_ADD = 1
    RTM_DELETE = 2
    RTM_CHANGE = 3
    RTM_GET = 4

    RTF_UP = 0x1
    RTF_GATEWAY = 0x2
    RTF_HOST = 0x4
    RTF_REJECT = 0x8
    RTF_DYNAMIC = 0x10
    RTF_MODIFIED = 0x20
    RTF_DONE = 0x40
    RTF_XRESOLVE = 0x200
    RTF_LLINFO = 0x400
    RTF_LLDATA = 0x400
    RTF_STATIC = 0x800
    RTF_BLACKHOLE = 0x1000
    RTF_PROTO2 = 0x4000
    RTF_PROTO1 = 0x8000
    RTF_PROTO3 = 0x40000
    RTF_FIXEDMTU = 0x80000
    RTF_PINNED = 0x100000
    RTF_LOCAL = 0x200000
    RTF_BROADCAST = 0x400000
    RTF_MULTICAST = 0x800000
    RTF_STICKY = 0x10000000
    RTF_RNH_LOCKED = 0x40000000
    RTF_GWFLAG_COMPAT = 0x80000000

    RTV_MTU = 0x1
    RTV_HOPCOUNT = 0x2
    RTV_EXPIRE = 0x4
    RTV_RPIPE = 0x8
    RTV_SPIPE = 0x10
    RTV_SSTHRESH = 0x20
    RTV_RTT = 0x40
    RTV_RTTVAR = 0x80
    RTV_WEIGHT = 0x100

    @staticmethod
    def get_props(prefix: str) -> List[str]:
        """Return the names of all class attributes starting with ``prefix``."""
        return [n for n in dir(RtConst) if n.startswith(prefix)]

    @staticmethod
    def get_name(prefix: str, value: int) -> str:
        """Return the first ``prefix``-named constant equal to ``value``.

        Falls back to ``"U:<prefix>:<value>"`` when no constant matches.
        """
        for prop in RtConst.get_props(prefix):
            if getattr(RtConst, prop) == value:
                return prop
        return "U:{}:{}".format(prefix, value)

    @staticmethod
    def get_bitmask_map(prefix: str, value: int) -> Dict[int, str]:
        """Decompose ``value`` into ``{bit: name}`` for every set bit.

        Bits without a matching ``prefix`` constant map to their hex string.
        """
        if value < 0:
            # Flags come from signed 32-bit ctypes fields, so bit 31 shows up
            # as a negative number.  Reinterpret as unsigned; otherwise the
            # loop below never terminates.
            value &= 0xFFFFFFFF
        propmap = {getattr(RtConst, prop): prop for prop in RtConst.get_props(prefix)}
        ret = {}
        v = 1
        while value:
            if v & value:
                ret[v] = propmap.get(v, hex(v))
                value -= v
            v *= 2
        return ret

    @staticmethod
    def get_bitmask_str(prefix: str, value: int) -> str:
        """Return the comma-joined names of the bits set in ``value``."""
        return ",".join(RtConst.get_bitmask_map(prefix, value).values())


def _sa_size(sa_len: int) -> int:
    """Return the space a sockaddr of ``sa_len`` bytes occupies in a message.

    A zero-length sockaddr still occupies one alignment unit, following the
    kernel's SA_SIZE() rule.
    """
    if sa_len == 0:
        return RtConst.ALIGN
    return roundup2(sa_len, RtConst.ALIGN)


class RtMetrics(Structure):
    """ctypes layout of the route metrics embedded in the message header."""

    _fields_ = [
        ("rmx_locks", c_ulong),
        ("rmx_mtu", c_ulong),
        ("rmx_hopcount", c_ulong),
        ("rmx_expire", c_ulong),
        ("rmx_recvpipe", c_ulong),
        ("rmx_sendpipe", c_ulong),
        ("rmx_ssthresh", c_ulong),
        ("rmx_rtt", c_ulong),
        ("rmx_rttvar", c_ulong),
        ("rmx_pksent", c_ulong),
        ("rmx_weight", c_ulong),
        ("rmx_nhidx", c_ulong),
        ("rmx_filler", c_ulong * 2),
    ]


class RtMsgHdr(Structure):
    """ctypes layout of the routing message header."""

    _fields_ = [
        ("rtm_msglen", c_ushort),
        ("rtm_version", c_byte),
        ("rtm_type", c_byte),
        ("rtm_index", c_ushort),
        ("_rtm_spare1", c_ushort),
        ("rtm_flags", c_int),
        ("rtm_addrs", c_int),
        ("rtm_pid", c_int),
        ("rtm_seq", c_int),
        ("rtm_errno", c_int),
        ("rtm_fmask", c_int),
        ("rtm_inits", c_ulong),
        ("rtm_rmx", RtMetrics),
    ]


class SockaddrIn(Structure):
    """ctypes layout of an IPv4 socket address."""

    _fields_ = [
        ("sin_len", c_byte),
        ("sin_family", c_byte),
        ("sin_port", c_ushort),
        ("sin_addr", c_uint32),
        ("sin_zero", c_char * 8),
    ]


class SockaddrIn6(Structure):
    """ctypes layout of an IPv6 socket address."""

    _fields_ = [
        ("sin6_len", c_byte),
        ("sin6_family", c_byte),
        ("sin6_port", c_ushort),
        ("sin6_flowinfo", c_uint32),
        ("sin6_addr", c_byte * 16),
        ("sin6_scope_id", c_uint32),
    ]


class SockaddrDl(Structure):
    """ctypes layout of the fixed part of a link-level socket address."""

    _fields_ = [
        ("sdl_len", c_byte),
        ("sdl_family", c_byte),
        ("sdl_index", c_ushort),
        ("sdl_type", c_byte),
        ("sdl_nlen", c_byte),
        ("sdl_alen", c_byte),
        ("sdl_slen", c_byte),
        ("sdl_data", c_byte * 8),
    ]


class SaHelper(object):
    """Build serialized sockaddrs and render them as text."""

    @staticmethod
    def is_ipv6(ip: str) -> bool:
        """Return True when ``ip`` contains a colon."""
        return ":" in ip

    @staticmethod
    def ip_sa(ip: str, scopeid: int = 0) -> bytes:
        """Serialize ``ip`` as an IPv4 or IPv6 sockaddr, chosen by its syntax."""
        if SaHelper.is_ipv6(ip):
            return SaHelper.ip6_sa(ip, scopeid)
        return SaHelper.ip4_sa(ip)

    @staticmethod
    def ip4_sa(ip: str) -> bytes:
        """Serialize the dotted-quad ``ip`` as a ``SockaddrIn``."""
        # Interpreting the packed address in host order and storing it in a
        # host-order c_uint32 keeps the original network-order bytes.
        addr_int = int.from_bytes(socket.inet_pton(socket.AF_INET, ip), sys.byteorder)
        sin = SockaddrIn(sizeof(SockaddrIn), socket.AF_INET, 0, addr_int)
        return bytes(sin)

    @staticmethod
    def ip6_sa(ip6: str, scopeid: int) -> bytes:
        """Serialize ``ip6`` with ``scopeid`` as a ``SockaddrIn6``."""
        addr_bytes = (c_byte * 16).from_buffer_copy(
            socket.inet_pton(socket.AF_INET6, ip6)
        )
        sin6 = SockaddrIn6(
            sizeof(SockaddrIn6), socket.AF_INET6, 0, 0, addr_bytes, scopeid
        )
        return bytes(sin6)

    @staticmethod
    def link_sa(ifindex: int = 0, iftype: int = 0) -> bytes:
        """Serialize a ``SockaddrDl`` carrying only ``ifindex`` and ``iftype``."""
        sa = SockaddrDl(sizeof(SockaddrDl), RtConst.AF_LINK, c_ushort(ifindex), iftype)
        return bytes(sa)

    @staticmethod
    def pxlen4_sa(pxlen: int) -> bytes:
        """Serialize the IPv4 netmask for prefix length ``pxlen``."""
        return SaHelper.ip_sa(SaHelper.pxlen_to_ip4(pxlen))

    @staticmethod
    def pxlen_to_ip4(pxlen: int) -> str:
        """Return the dotted-quad netmask for prefix length ``pxlen``."""
        if pxlen == 32:
            return "255.255.255.255"
        addr = 0xFFFFFFFF - ((1 << (32 - pxlen)) - 1)
        return socket.inet_ntop(socket.AF_INET, struct.pack("!I", addr))

    @staticmethod
    def pxlen6_sa(pxlen: int) -> bytes:
        """Serialize the IPv6 netmask for prefix length ``pxlen``."""
        return SaHelper.ip_sa(SaHelper.pxlen_to_ip6(pxlen))

    @staticmethod
    def pxlen_to_ip6(pxlen: int) -> str:
        """Return the textual IPv6 netmask for prefix length ``pxlen``."""
        ip6_b = [0] * 16
        start = 0
        # Fill whole 0xFF bytes while more than 8 prefix bits remain ...
        while pxlen > 8:
            ip6_b[start] = 0xFF
            pxlen -= 8
            start += 1
        # ... then set the remaining 0..8 high bits of the next byte.
        ip6_b[start] = 0xFF - ((1 << (8 - pxlen)) - 1)
        return socket.inet_ntop(socket.AF_INET6, bytes(ip6_b))

    @staticmethod
    def print_sa_inet(sa: bytes):
        """Render the IPv4 address held in ``sa``.

        Raises RtSockException when ``sa`` is shorter than 8 bytes.
        """
        if len(sa) < 8:
            raise RtSockException("IPv4 sa size too small: {}".format(len(sa)))
        addr = socket.inet_ntop(socket.AF_INET, sa[4:8])
        return "{}".format(addr)

    @staticmethod
    def print_sa_inet6(sa: bytes):
        """Render the IPv6 address and scope id held in ``sa``.

        Raises RtSockException when ``sa`` is shorter than ``SockaddrIn6``.
        """
        if len(sa) < sizeof(SockaddrIn6):
            raise RtSockException("IPv6 sa size too small: {}".format(len(sa)))
        addr = socket.inet_ntop(socket.AF_INET6, sa[8:24])
        # sin6_scope_id is a host-order c_uint32 (see ip6_sa), so decode it
        # with native byte order rather than big-endian.
        scopeid = struct.unpack("=I", sa[24:28])[0]
        return "{} scopeid {}".format(addr, scopeid)

    @staticmethod
    def print_sa_link(sa: bytes, hd: Optional[bool] = True):
        """Render the interface index and name held in a link-level ``sa``.

        ``hd`` is accepted but not used by this method.
        Raises RtSockException when ``sa`` is too short or the name length
        exceeds the buffer.
        """
        if len(sa) < sizeof(SockaddrDl):
            raise RtSockException("LINK sa size too small: {}".format(len(sa)))
        sdl = SockaddrDl.from_buffer_copy(sa)
        if sdl.sdl_index:
            ifindex = "link#{} ".format(sdl.sdl_index)
        else:
            ifindex = ""
        if sdl.sdl_nlen:
            iface_offset = 8  # offset of sdl_data within SockaddrDl
            if sdl.sdl_nlen + iface_offset > len(sa):
                raise RtSockException(
                    "LINK sa sdl_nlen {} > total len {}".format(sdl.sdl_nlen, len(sa))
                )
            ifname = "ifname:{} ".format(
                bytes.decode(sa[iface_offset : iface_offset + sdl.sdl_nlen])
            )
        else:
            ifname = ""
        return "{}{}".format(ifindex, ifname)

    @staticmethod
    def print_sa_unknown(sa: bytes):
        """Render a sockaddr of an unhandled family by its family number."""
        return "unknown_type:{}".format(sa[1])

    @classmethod
    def print_sa(cls, sa: bytes, hd: Optional[bool] = False):
        """Render ``sa`` as text, dispatching on its address family.

        When ``hd`` is true the raw bytes are appended in ``repr`` form.
        Raises Exception when ``sa`` is shorter than 2 bytes or its length
        byte disagrees with the buffer size.
        """
        # Check the length first: both sa[0] and sa[1] are read below.
        if len(sa) < 2:
            raise Exception("sa too short: {}".format(len(sa)))
        if sa[0] != len(sa):
            raise Exception("sa size {} != buffer size {}".format(sa[0], len(sa)))

        if sa[1] == socket.AF_INET:
            text = cls.print_sa_inet(sa)
        elif sa[1] == socket.AF_INET6:
            text = cls.print_sa_inet6(sa)
        elif sa[1] == RtConst.AF_LINK:
            text = cls.print_sa_link(sa)
        else:
            text = cls.print_sa_unknown(sa)
        if hd:
            dump = " [{!r}]".format(sa)
        else:
            dump = ""
        return "{}{}".format(text, dump)


class BaseRtsockMessage(object):
    """Common base for routing socket messages: holds the message type."""

    def __init__(self, rtm_type):
        self.rtm_type = rtm_type
        self.sa = SaHelper()

    @staticmethod
    def print_rtm_type(rtm_type):
        """Return the ``RTM_*`` constant name for ``rtm_type``."""
        return RtConst.get_name("RTM_", rtm_type)

    @property
    def rtm_type_str(self):
        """Name of this message's type."""
        return self.print_rtm_type(self.rtm_type)


class RtsockRtMessage(BaseRtsockMessage):
    """A route message (RTM_ADD/DELETE/CHANGE/GET) with sockaddr attributes."""

    # Message types this class handles.
    messages = [
        RtConst.RTM_ADD,
        RtConst.RTM_DELETE,
        RtConst.RTM_CHANGE,
        RtConst.RTM_GET,
    ]

    def __init__(self, rtm_type, rtm_seq=1, dst_sa=None, mask_sa=None):
        """Create a message, optionally with destination and netmask sockaddrs."""
        super().__init__(rtm_type)
        self.rtm_flags = 0
        self.rtm_seq = rtm_seq
        self._attrs = {}  # RTA_* bit -> serialized sockaddr
        self.rtm_errno = 0
        self.rtm_pid = 0
        self.rtm_inits = 0
        self.rtm_rmx = RtMetrics()
        self._orig_data = None  # raw bytes when built by from_bytes()
        if dst_sa:
            self.add_sa_attr(RtConst.RTA_DST, dst_sa)
        if mask_sa:
            self.add_sa_attr(RtConst.RTA_NETMASK, mask_sa)

    def add_sa_attr(self, attr_type, attr_bytes: bytes):
        """Store the serialized sockaddr ``attr_bytes`` under ``attr_type``."""
        self._attrs[attr_type] = attr_bytes

    def add_ip_attr(self, attr_type, ip_addr: str, scopeid: int = 0):
        """Store ``ip_addr`` (IPv4 or IPv6 text) under ``attr_type``."""
        if ":" in ip_addr:
            self.add_ip6_attr(attr_type, ip_addr, scopeid)
        else:
            self.add_ip4_attr(attr_type, ip_addr)

    def add_ip4_attr(self, attr_type, ip: str):
        """Store the IPv4 address ``ip`` under ``attr_type``."""
        self.add_sa_attr(attr_type, self.sa.ip_sa(ip))

    def add_ip6_attr(self, attr_type, ip6: str, scopeid: int):
        """Store the IPv6 address ``ip6`` with ``scopeid`` under ``attr_type``."""
        self.add_sa_attr(attr_type, self.sa.ip6_sa(ip6, scopeid))

    def add_link_attr(self, attr_type, ifindex: Optional[int] = 0):
        """Store a link-level sockaddr for ``ifindex`` under ``attr_type``."""
        self.add_sa_attr(attr_type, self.sa.link_sa(ifindex))

    def get_sa(self, attr_type) -> bytes:
        """Return the sockaddr stored under ``attr_type`` or None if absent."""
        return self._attrs.get(attr_type)

    def print_message(self):
        """Print the header summary and every sockaddr attribute to stdout."""
        # RTM_GET: Report Metrics: len 272, pid: 87839, seq 1, errno 0, flags:<UP,GATEWAY,DONE,STATIC>
        if self._orig_data:
            rtm_len = len(self._orig_data)
        else:
            rtm_len = len(bytes(self))
        print(
            "{}: len {}, pid: {}, seq {}, errno {}, flags: <{}>".format(
                self.rtm_type_str,
                rtm_len,
                self.rtm_pid,
                self.rtm_seq,
                self.rtm_errno,
                RtConst.get_bitmask_str("RTF_", self.rtm_flags),
            )
        )
        # Attribute keys are distinct RTA_* bits, so their sum is the mask.
        rtm_addrs = sum(self._attrs)
        print("Addrs: <{}>".format(RtConst.get_bitmask_str("RTA_", rtm_addrs)))
        for attr in sorted(self._attrs):
            sa_data = SaHelper.print_sa(self._attrs[attr])
            print(" {}: {}".format(RtConst.get_name("RTA_", attr), sa_data))

    def print_in_message(self):
        """Print this message under an "IN" banner."""
        print("vvvvvvvv IN vvvvvvvv")
        self.print_message()
        print()

    def verify_sa_inet(self, sa_data):
        """Check that an IPv4 sockaddr has a zero port and zero padding.

        Raises Exception when the buffer is shorter than 8 bytes or its
        length byte exceeds the buffer; AssertionError on non-zero fields.
        """
        if len(sa_data) < 8:
            raise Exception("IPv4 sa size too small: {}".format(sa_data))
        if sa_data[0] > len(sa_data):
            raise Exception(
                "IPv4 sin_len too big: {} vs sa size {}: {}".format(
                    sa_data[0], len(sa_data), sa_data
                )
            )
        # The size check above admits sockaddrs truncated after the address;
        # zero-pad so from_buffer_copy() always gets a full structure.
        padded = bytes(sa_data).ljust(sizeof(SockaddrIn), b"\0")
        sin = SockaddrIn.from_buffer_copy(padded)
        assert sin.sin_port == 0
        # Compare raw bytes: the c_char array field is returned by ctypes as a
        # NUL-truncated bytes object, which never equals a list of ints.
        zero_off = SockaddrIn.sin_zero.offset
        assert padded[zero_off : sizeof(SockaddrIn)] == bytes(
            sizeof(SockaddrIn) - zero_off
        )

    def compare_sa(self, sa_type, sa_data):
        """Assert that ``sa_data`` equals the sockaddr stored for ``sa_type``.

        Raises Exception when ``sa_data`` is shorter than 4 bytes.
        """
        if len(sa_data) < 4:
            sa_type_name = RtConst.get_name("RTA_", sa_type)
            raise Exception(
                "sa_len for type {} too short: {}".format(sa_type_name, len(sa_data))
            )
        our_sa = self._attrs[sa_type]
        # Compare the rendered text first: it gives the most readable failure.
        assert SaHelper.print_sa(sa_data) == SaHelper.print_sa(our_sa)
        assert len(sa_data) == len(our_sa)
        assert sa_data == our_sa

    def verify(self, rtm_type: int, rtm_sa):
        """Assert this parsed message has ``rtm_type`` and the given sockaddrs.

        ``rtm_sa`` maps RTA_* bits to the expected serialized sockaddrs.
        Raises Exception when an expected sockaddr type is absent.
        """
        assert self.rtm_type_str == self.print_rtm_type(rtm_type)
        assert self.rtm_errno == 0
        hdr = RtMsgHdr.from_buffer_copy(self._orig_data)
        assert hdr._rtm_spare1 == 0
        for sa_type, sa_data in rtm_sa.items():
            if sa_type not in self._attrs:
                sa_type_name = RtConst.get_name("RTA_", sa_type)
                raise Exception("SA type {} not present".format(sa_type_name))
            self.compare_sa(sa_type, sa_data)

    @classmethod
    def from_bytes(cls, data: bytes):
        """Parse a raw routing message into a new instance.

        Raises Exception when ``data`` is shorter than the header or a
        sockaddr announced in ``rtm_addrs`` does not fit in ``data``.
        """
        if len(data) < sizeof(RtMsgHdr):
            raise Exception(
                "messages size {} is less than expected {}".format(
                    len(data), sizeof(RtMsgHdr)
                )
            )
        hdr = RtMsgHdr.from_buffer_copy(data)

        self = cls(hdr.rtm_type)
        self.rtm_flags = hdr.rtm_flags
        self.rtm_seq = hdr.rtm_seq
        self.rtm_errno = hdr.rtm_errno
        self.rtm_pid = hdr.rtm_pid
        self.rtm_inits = hdr.rtm_inits
        self.rtm_rmx = hdr.rtm_rmx
        self._orig_data = data

        off = sizeof(RtMsgHdr)
        v = 1
        # rtm_addrs is a signed field; mask it so a set bit 31 cannot make
        # the loop below run forever.
        addrs_mask = hdr.rtm_addrs & 0xFFFFFFFF
        # Sockaddrs follow the header in ascending RTA_* bit order.
        while addrs_mask:
            if addrs_mask & v:
                addrs_mask -= v

                if off >= len(data):
                    raise Exception(
                        "SA for {} is missing: offset {} >= total message length {}".format(
                            RtConst.get_name("RTA_", v), off, len(data)
                        )
                    )
                sa_len = data[off]
                if off + sa_len > len(data):
                    raise Exception(
                        "SA sizeof for {} > total message length: {}+{} > {}".format(
                            RtConst.get_name("RTA_", v), off, sa_len, len(data)
                        )
                    )
                self._attrs[v] = data[off : off + sa_len]
                # A zero-length sockaddr still occupies one alignment unit.
                off += _sa_size(sa_len)
            v *= 2
        return self

    def __bytes__(self):
        """Serialize the header followed by the sockaddrs in RTA_* bit order."""
        sz = sizeof(RtMsgHdr)
        addrs_mask = 0
        for k, v in self._attrs.items():
            sz += _sa_size(len(v))
            addrs_mask += k
        hdr = RtMsgHdr(
            rtm_msglen=sz,
            rtm_version=RtConst.RTM_VERSION,
            rtm_type=self.rtm_type,
            rtm_flags=self.rtm_flags,
            rtm_seq=self.rtm_seq,
            rtm_addrs=addrs_mask,
            rtm_inits=self.rtm_inits,
            rtm_rmx=self.rtm_rmx,
        )
        buf = bytearray(sz)
        buf[0 : sizeof(RtMsgHdr)] = bytes(hdr)
        off = sizeof(RtMsgHdr)
        for attr in sorted(self._attrs):
            v = self._attrs[attr]
            buf[off : off + len(v)] = v
            off += _sa_size(len(v))
        return bytes(buf)


class Rtsock:
    """Wrapper around an ``AF_ROUTE`` socket for sending and reading messages."""

    def __init__(self):
        self.socket = self._setup_rtsock()
        self.rtm_seq = 1
        self.msgmap = self.build_msgmap()

    def build_msgmap(self):
        """Return a map from message type to the class that parses it."""
        classes = [RtsockRtMessage]
        xmap = {}
        for cls in classes:
            for message in cls.messages:
                xmap[message] = cls
        return xmap

    def get_seq(self):
        """Return the current sequence number and advance the counter."""
        ret = self.rtm_seq
        self.rtm_seq += 1
        return ret

    def get_weight(self, weight) -> int:
        """Return ``weight`` if truthy, otherwise the default weight of 1."""
        if weight:
            return weight
        return 1  # RT_DEFAULT_WEIGHT

    def new_rtm_any(self, msg_type, prefix: str, gw: Union[str, bytes]):
        """Build a route message of ``msg_type`` for ``prefix`` via ``gw``.

        ``prefix`` is ``"addr"`` or ``"addr/pxlen"``; ``gw`` is either an IP
        address string or an already serialized sockaddr.
        """
        px = prefix.split("/")
        addr_sa = SaHelper.ip_sa(px[0])
        if len(px) > 1:
            pxlen = int(px[1])
            if SaHelper.is_ipv6(px[0]):
                mask_sa = SaHelper.pxlen6_sa(pxlen)
            else:
                mask_sa = SaHelper.pxlen4_sa(pxlen)
        else:
            mask_sa = None
        msg = RtsockRtMessage(msg_type, self.get_seq(), addr_sa, mask_sa)
        if isinstance(gw, bytes):
            msg.add_sa_attr(RtConst.RTA_GATEWAY, gw)
        else:
            # String
            msg.add_ip_attr(RtConst.RTA_GATEWAY, gw)
        return msg

    def new_rtm_add(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_ADD message for ``prefix`` via ``gw``."""
        return self.new_rtm_any(RtConst.RTM_ADD, prefix, gw)

    def new_rtm_del(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_DELETE message for ``prefix`` via ``gw``."""
        return self.new_rtm_any(RtConst.RTM_DELETE, prefix, gw)

    def new_rtm_change(self, prefix: str, gw: Union[str, bytes]):
        """Build an RTM_CHANGE message for ``prefix`` via ``gw``."""
        return self.new_rtm_any(RtConst.RTM_CHANGE, prefix, gw)

    def _setup_rtsock(self) -> socket.socket:
        """Open the routing socket with SO_USELOOPBACK enabled."""
        s = socket.socket(socket.AF_ROUTE, socket.SOCK_RAW, socket.AF_UNSPEC)
        s.setsockopt(socket.SOL_SOCKET, socket.SO_USELOOPBACK, 1)
        return s

    def print_hd(self, data: bytes):
        """Print ``data`` as a hex dump, 16 bytes per row."""
        width = 16
        print("==========================================")
        for start in range(0, len(data), width):
            for b in data[start : start + width]:
                print("0x{:02X} ".format(b), end="")
            print()
        print()

    def write_message(self, msg):
        """Print ``msg`` under an "OUT" banner and write it to the socket."""
        print("vvvvvvvv OUT vvvvvvvv")
        msg.print_message()
        print()
        msg_bytes = bytes(msg)
        # os.write() raises OSError on failure, so only a short write needs
        # to be checked here.
        ret = os.write(self.socket.fileno(), msg_bytes)
        assert ret == len(msg_bytes)

    def parse_message(self, data: bytes):
        """Parse raw message bytes into a message object.

        Returns None when the message type has no registered class.
        Raises OSError when ``data`` is shorter than 4 bytes.
        """
        if len(data) < 4:
            raise OSError("Short read from rtsock: {} bytes".format(len(data)))
        # rtm_type follows the 2-byte rtm_msglen and 1-byte rtm_version.
        rtm_type = data[3]
        msg_cls = self.msgmap.get(rtm_type)
        if msg_cls is None:
            return None
        return msg_cls.from_bytes(data)

    def write_data(self, data: bytes):
        """Send raw ``data`` on the routing socket."""
        self.socket.send(data)

    def read_data(self, seq: Optional[int] = None) -> bytes:
        """Read one message from the socket.

        With ``seq`` given, keep reading until a message with that sequence
        number arrives; messages too short to hold a header are skipped.
        """
        while True:
            data = self.socket.recv(4096)
            if seq is None:
                break
            # ">=": a message consisting of just the header is still valid.
            if len(data) >= sizeof(RtMsgHdr):
                hdr = RtMsgHdr.from_buffer_copy(data)
                if hdr.rtm_seq == seq:
                    break
        return data

    def read_message(self) -> Optional[RtsockRtMessage]:
        """Read one message and return it parsed (None for unhandled types)."""
        data = self.read_data()
        return self.parse_message(data)