1#!/usr/bin/env python3
2import os
3import socket
4import struct
5import subprocess
6import sys
7from ctypes import c_byte
8from ctypes import c_char
9from ctypes import c_int
10from ctypes import c_long
11from ctypes import c_uint32
12from ctypes import c_uint8
13from ctypes import c_ulong
14from ctypes import c_ushort
15from ctypes import sizeof
16from ctypes import Structure
17from enum import Enum
18from typing import Any
19from typing import Dict
20from typing import List
21from typing import NamedTuple
22from typing import Optional
23from typing import Union
24
25from atf_python.sys.netpfil.ipfw.ioctl import get3_classes
26from atf_python.sys.netpfil.ipfw.ioctl import legacy_classes
27from atf_python.sys.netpfil.ipfw.ioctl import set3_classes
28from atf_python.sys.netpfil.ipfw.utils import AttrDescr
29from atf_python.sys.netpfil.ipfw.utils import enum_from_int
30from atf_python.sys.netpfil.ipfw.utils import enum_or_int
31from atf_python.sys.netpfil.ipfw.utils import prepare_attrs_map
32
33
34class DebugHeader(Structure):
35    _fields_ = [
36        ("cmd_type", c_ushort),
37        ("spare1", c_ushort),
38        ("opt_name", c_uint32),
39        ("total_len", c_uint32),
40        ("spare2", c_uint32),
41    ]
42
43
44class DebugType(Enum):
45    DO_CMD = 1
46    DO_SET3 = 2
47    DO_GET3 = 3
48
49
50class DebugIoReader(object):
51    HANDLER_CLASSES = {
52        DebugType.DO_CMD: legacy_classes,
53        DebugType.DO_SET3: set3_classes,
54        DebugType.DO_GET3: get3_classes,
55    }
56
57    def __init__(self, ipfw_path):
58        self._msgmap = self.build_msgmap()
59        self.ipfw_path = ipfw_path
60
61    def build_msgmap(self):
62        xmap = {}
63        for debug_type, handler_classes in self.HANDLER_CLASSES.items():
64            debug_type = enum_or_int(debug_type)
65            if debug_type not in xmap:
66                xmap[debug_type] = {}
67            for handler_class in handler_classes:
68                for msg in handler_class.messages:
69                    xmap[debug_type][enum_or_int(msg)] = handler_class
70        return xmap
71
72    def print_obj_header(self, hdr):
73        debug_type = "#{}".format(hdr.cmd_type)
74        for _type in self.HANDLER_CLASSES.keys():
75            if _type.value == hdr.cmd_type:
76                debug_type = _type.name.lower()
77                break
78        print(
79            "@@ record for {} len={} optname={}".format(
80                debug_type, hdr.total_len, hdr.opt_name
81            )
82        )
83
84    def parse_record(self, data):
85        hdr = DebugHeader.from_buffer_copy(data[: sizeof(DebugHeader)])
86        data = data[sizeof(DebugHeader) :]
87        cls = self._msgmap[hdr.cmd_type].get(hdr.opt_name)
88        if cls is not None:
89            return cls.from_bytes(data)
90        raise ValueError(
91            "unsupported cmd_type={} opt_name={}".format(hdr.cmd_type, hdr.opt_name)
92        )
93
94    def get_record_from_stdin(self):
95        data = sys.stdin.buffer.peek(sizeof(DebugHeader))
96        if len(data) == 0:
97            return None
98
99        hdr = DebugHeader.from_buffer_copy(data)
100        data = sys.stdin.buffer.read(hdr.total_len)
101        return self.parse_record(data)
102
103    def get_records_from_buffer(self, data):
104        off = 0
105        ret = []
106        while off + sizeof(DebugHeader) <= len(data):
107            hdr = DebugHeader.from_buffer_copy(data[off : off + sizeof(DebugHeader)])
108            ret.append(self.parse_record(data[off : off + hdr.total_len]))
109            off += hdr.total_len
110        return ret
111
112    def run_ipfw(self, cmd: str) -> bytes:
113        args = [self.ipfw_path, "-xqn"] + cmd.split()
114        r = subprocess.run(args, capture_output=True)
115        return r.stdout
116
117    def get_records(self, cmd: str):
118        return self.get_records_from_buffer(self.run_ipfw(cmd))
119