1import dis
2import os.path
3import re
4import subprocess
5import sys
6import types
7import unittest
8
9from test.support import findfile
10
11
def abspath(filename):
    """Return the absolute path of a data file from the "dtracedata" subdir.

    The file is looked up with test.support.findfile() and whatever path
    that returns is made absolute.
    """
    located = findfile(filename, subdir="dtracedata")
    return os.path.abspath(located)
14
15
def normalize_trace_output(output):
    """Normalize DTrace output for comparison.

    DTrace keeps a per-CPU buffer, and when showing the fired probes, buffers
    are concatenated. So if the operating system moves our thread around, the
    straight result can be "non-causal". So we add timestamps to the probe
    firing, sort by that field, then strip it from the output.

    Every line that is neither empty nor starting with '#' must consist of
    an integer timestamp, a tab, and the payload. The payloads are returned
    joined by newlines, ordered by timestamp.

    Raises AssertionError if a line has no tab or a non-integer timestamp.
    """

    # When compiling with '--with-pydebug', strip '[# refs]' debug output.
    output = re.sub(r"\[[0-9]+ refs\]", "", output)
    try:
        rows = [
            # Split on the first tab only: the timestamp is the sole field
            # to strip, so tabs inside the payload must be preserved.
            row.split("\t", 1)
            for row in output.splitlines()
            if row and not row.startswith('#')
        ]
        # int() raises ValueError on a malformed timestamp.
        rows.sort(key=lambda row: int(row[0]))
        # row[1] raises IndexError when a line contained no tab at all.
        return "\n".join([row[1] for row in rows])
    except (IndexError, ValueError) as exc:
        raise AssertionError(
            "tracer produced unparsable output:\n{}".format(output)
        ) from exc
39
40
class TraceBackend:
    """Common driver for an external tracing tool.

    Subclasses set EXTENSION (suffix of the tracer script files) and
    COMMAND (argv prefix placed in front of the script path).
    """
    EXTENSION = None
    COMMAND = None
    COMMAND_ARGS = []

    def run_case(self, name, optimize_python=None):
        """Trace the Python file of test case *name*.

        Returns a tuple (expected_output, actual_output): the stripped
        contents of the ".expected" data file and the normalized tracer
        output, in that order.
        """
        script_path = abspath(name + self.EXTENSION)
        program_path = abspath(name + ".py")
        raw_trace = self.trace_python(
            script_file=script_path,
            python_file=program_path,
            optimize_python=optimize_python,
        )
        actual = normalize_trace_output(raw_trace)

        expected_path = abspath(name + self.EXTENSION + ".expected")
        with open(expected_path) as expected_file:
            expected = expected_file.read().rstrip()

        return (expected, actual)

    def generate_trace_command(self, script_file, subcommand=None):
        """Build the tracer argv: COMMAND, the script, then optional "-c"."""
        trailing = ["-c", subcommand] if subcommand else []
        return self.COMMAND + [script_file] + trailing

    def trace(self, script_file, subcommand=None):
        """Run the tracer and return its output as text.

        stderr is redirected into stdout, so both streams are returned
        together.
        """
        argv = self.generate_trace_command(script_file, subcommand)
        completed = subprocess.run(
            argv,
            stdout=subprocess.PIPE,
            stderr=subprocess.STDOUT,
            universal_newlines=True,
        )
        return completed.stdout

    def trace_python(self, script_file, python_file, optimize_python=None):
        """Trace sys.executable running *python_file*.

        A truthy *optimize_python* is the number of "-O" flags to pass.
        """
        optimize_flags = ["-O"] * optimize_python if optimize_python else []
        interpreter_argv = [sys.executable, *optimize_flags, python_file]
        return self.trace(script_file, " ".join(interpreter_argv))

    def assert_usable(self):
        """Raise unittest.SkipTest unless the tracer's self-check succeeds."""
        try:
            probe_script = abspath("assert_usable" + self.EXTENSION)
            output = self.trace(probe_script).strip()
        except (FileNotFoundError, NotADirectoryError, PermissionError) as exc:
            # The tracer could not be launched; report why in the skip text.
            output = str(exc)
        if output == "probe: success":
            return
        raise unittest.SkipTest(
            "{}(1) failed: {}".format(self.COMMAND[0], output)
        )
88
89
class DTraceBackend(TraceBackend):
    """Backend that runs ".d" scripts with the "dtrace -q -s" command."""
    COMMAND = ["dtrace", "-q", "-s"]
    EXTENSION = ".d"
93
94
class SystemTapBackend(TraceBackend):
    """Backend that runs ".stp" scripts with the "stap -g" command."""
    COMMAND = ["stap", "-g"]
    EXTENSION = ".stp"
98
99
class TraceTests:
    """Mixin with the tracing test cases shared by all backends.

    Concrete test classes combine this mixin with unittest.TestCase and set
    the ``backend`` and ``optimize_python`` class attributes.
    """
    # unittest.TestCase options
    maxDiff = None

    # TraceTests options
    backend = None          # backend object providing assert_usable()/run_case()
    optimize_python = 0     # optimization level passed to the backend and compile()

    @classmethod
    def setUpClass(cls):
        # Let the backend skip the whole class when the tracer is unusable.
        cls.backend.assert_usable()

    def run_case(self, name):
        """Run test case *name* on the backend and compare the outputs."""
        # The backend returns (expected, actual) -- in that order.
        expected_output, actual_output = self.backend.run_case(
            name, optimize_python=self.optimize_python)
        self.assertEqual(actual_output, expected_output)

    def test_function_entry_return(self):
        self.run_case("call_stack")

    def test_verify_call_opcodes(self):
        """Ensure our call stack test hits all function call opcodes"""

        # Opcodes still to be seen; each one found in start() is removed.
        opcodes = {"CALL_FUNCTION", "CALL_FUNCTION_EX", "CALL_FUNCTION_KW"}

        with open(abspath("call_stack.py")) as f:
            code_string = f.read()

        def get_function_instructions(funcname):
            # Recompile with appropriate optimization setting
            code = compile(source=code_string,
                           filename="<string>",
                           mode="exec",
                           optimize=self.optimize_python)

            # Top-level functions appear as code objects among the module's
            # constants.
            for c in code.co_consts:
                if isinstance(c, types.CodeType) and c.co_name == funcname:
                    return dis.get_instructions(c)
            return []

        for instruction in get_function_instructions('start'):
            opcodes.discard(instruction.opname)

        self.assertEqual(set(), opcodes)

    def test_gc(self):
        self.run_case("gc")

    def test_line(self):
        self.run_case("line")
150
151
class DTraceNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTraceBackend at optimization level 0."""
    optimize_python = 0
    backend = DTraceBackend()
155
156
class DTraceOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through DTraceBackend at optimization level 2."""
    optimize_python = 2
    backend = DTraceBackend()
160
161
class SystemTapNormalTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTapBackend at optimization level 0."""
    optimize_python = 0
    backend = SystemTapBackend()
165
166
class SystemTapOptimizedTests(TraceTests, unittest.TestCase):
    """TraceTests run through SystemTapBackend at optimization level 2."""
    optimize_python = 2
    backend = SystemTapBackend()
170
171
if __name__ == '__main__':
    # Discover and run every TestCase in this module via unittest's
    # command-line runner.
    unittest.main()
174