1import time
2import threading
3from queue import Queue
4
5"""Instrumentation for measuring high-level time spent on various tasks inside the runner.
6
7This is lower fidelity than an actual profile, but allows custom data to be considered,
8so that we can see the time spent in specific tests and test directories.
9
10
11Instruments are intended to be used as context managers with the return value of __enter__
12containing the user-facing API e.g.
13
14with Instrument(*args) as recording:
15    recording.set(["init"])
16    do_init()
17    recording.pause()
    for thread in test_threads:
        thread.start(recording, *args)
    for thread in test_threads:
        thread.join()
22    recording.set(["teardown"])   # un-pauses the Instrument
23    do_teardown()
24"""
25
class NullInstrument(object):
    """Do-nothing instrument exposing the same methods as a real recording.

    Every method is a no-op, so code can call ``set``/``pause`` and use the
    object in a ``with`` statement without any data being collected.
    """

    def set(self, stack):
        """Set the current task to stack

        :param stack: A list of strings defining the current task.
                      These are interpreted like a stack trace so that ["foo"] and
                      ["foo", "bar"] both show up as descendants of "foo"
        """
        # Intentionally empty: nothing is recorded and ``stack`` is untouched.
        return None

    def pause(self):
        """Stop recording a task on the current thread. This is useful if the thread
        is purely waiting on the results of other threads"""
        # Intentionally empty: there is no recording to pause.
        return None

    def __enter__(self):
        """Return this object itself as the value bound by ``with ... as``."""
        return self

    def __exit__(self, *args, **kwargs):
        """Do nothing on exit; returns None so exceptions are not suppressed."""
        return None
46
47
class InstrumentWriter(object):
    """User-facing recording handle that forwards events onto a queue.

    Each event is a 4-tuple ``(command, thread_ident, timestamp, stack)``
    where ``command`` is ``"set"`` or ``"pause"``, ``thread_ident`` is the
    ident of the calling thread, ``timestamp`` comes from ``time.time()`` and
    ``stack`` is a list of strings (``None`` for ``"pause"``).
    """

    def __init__(self, queue):
        """
        :param queue: Object with a ``put`` method that receives the events.
        """
        self.queue = queue

    def set(self, stack):
        """Record that the calling thread is now working on ``stack``.

        The name of the calling thread is prepended to the recorded stack and
        spaces in every component are replaced by underscores. The ``stack``
        argument itself is never modified, so callers may reuse it, and both
        lists and tuples are accepted.

        :param stack: A list or tuple of strings defining the current task.
        """
        current = threading.current_thread()
        # Validate first; _check_stack returns a fresh list, so prepending the
        # thread name below cannot leak back into the caller's sequence.
        frames = self._check_stack(stack)
        frames.insert(0, current.name.replace(" ", "_"))
        self.queue.put(("set", current.ident, time.time(), frames))

    def pause(self):
        """Record that the calling thread has stopped working on its task."""
        self.queue.put(("pause", threading.current_thread().ident, time.time(), None))

    def _check_stack(self, stack):
        """Return a new list of the stack items with spaces replaced by "_".

        :param stack: A list or tuple of strings.
        """
        assert isinstance(stack, (tuple, list))
        return [item.replace(" ", "_") for item in stack]
63
64
class Instrument(object):
    def __init__(self, file_path):
        """Instrument that collects data from multiple threads and sums the time in each
        thread. The output is in the format required by flamegraph.pl to enable visualisation
        of the time spent in each task.

        :param file_path: - The path on which to write instrument output. Any existing file
                            at the path will be overwritten
        """
        self.path = file_path
        # queue and thread are only populated between __enter__ and __exit__.
        self.queue = None
        self.thread = None
        self.current = None
        self.start_time = None

    def __enter__(self):
        """Start the background writer thread and return the recording handle.

        Asserts that the instrument is not already active.
        """
        assert self.thread is None
        assert self.queue is None
        events = Queue()
        worker = threading.Thread(target=self.run)
        # The queue must be in place before the worker starts, as run() reads it.
        self.queue = events
        self.thread = worker
        worker.start()
        return InstrumentWriter(events)

    def __exit__(self, *args, **kwargs):
        """Send the stop event, wait for the writer thread, and reset state."""
        stop_event = ("stop", None, time.time(), None)
        self.queue.put(stop_event)
        self.thread.join()
        self.thread = self.queue = None

    def run(self):
        """Consume events from the queue and write one output line per finished task.

        Each line is ``<stack joined by ";"> <elapsed milliseconds>``. The loop
        ends when a "stop" event is received.
        """
        valid_commands = {"stop", "pause", "set"}
        with open(self.path, "w") as out:
            # Maps a thread identifier to the (stack, start timestamp) it is recording.
            active = {}
            while True:
                command, thread, time_stamp, stack = self.queue.get()
                assert command in valid_commands

                # A "stop" flushes every task still being recorded. A "set" or
                # "pause" only ends the task (if any) that the reporting thread
                # was recording, since that task is no longer active.
                if command == "stop":
                    finished = active.values()
                elif thread in active:
                    finished = [active.pop(thread)]
                else:
                    finished = []

                for frames, began in finished:
                    elapsed_ms = int(1000 * (time_stamp - began))
                    out.write("%s %d\n" % (";".join(frames), elapsed_ms))

                if command == "stop":
                    break
                if command == "set":
                    active[thread] = (stack, time_stamp)
118