"""Instrumentation for measuring high-level time spent on various tasks inside the runner.

This is lower fidelity than an actual profile, but allows custom data to be considered,
so that we can see the time spent in specific tests and test directories.


Instruments are intended to be used as context managers with the return value of __enter__
containing the user-facing API e.g.

with Instrument(*args) as recording:
    recording.set(["init"])
    do_init()
    recording.pause()
    for thread in test_threads:
        thread.start(recording, *args)
    for thread in test_threads:
        thread.join()
    recording.set(["teardown"])   # un-pauses the Instrument
    do_teardown()
"""

import time
import threading
from queue import Queue


class NullInstrument(object):
    """Instrument with the same API as the real one that records nothing."""

    def set(self, stack):
        """Set the current task to stack

        :param stack: A list of strings defining the current task.
                      These are interpreted like a stack trace so that ["foo"] and
                      ["foo", "bar"] both show up as descendants of "foo"
        """
        pass

    def pause(self):
        """Stop recording a task on the current thread. This is useful if the thread
        is purely waiting on the results of other threads"""
        pass

    def __enter__(self):
        return self

    def __exit__(self, *args, **kwargs):
        return


class InstrumentWriter(object):
    """User-facing recording API returned by ``Instrument.__enter__``.

    Each call enqueues a ``(command, thread_ident, timestamp, stack)`` tuple
    on the queue shared with the ``Instrument`` worker thread.
    """

    def __init__(self, queue):
        """
        :param queue: Queue on which commands are posted.
        """
        self.queue = queue

    def set(self, stack):
        """Set the current task of the calling thread to stack.

        The name of the calling thread is prepended to the recorded stack.
        The caller's sequence is never modified, so the same list (or a
        tuple) can safely be passed repeatedly.

        :param stack: A list or tuple of strings defining the current task.
        """
        current = threading.current_thread()
        # _check_stack returns a fresh list; inserting into it (rather than
        # into ``stack``) leaves the caller's object untouched.
        recorded = self._check_stack(stack)
        # Thread names can contain spaces, and a space separates the stack
        # from the sample count in the output, so sanitise the name as well.
        recorded.insert(0, current.name.replace(" ", "_"))
        self.queue.put(("set", current.ident, time.time(), recorded))

    def pause(self):
        """Stop recording a task on the calling thread."""
        self.queue.put(("pause", threading.current_thread().ident, time.time(), None))

    def _check_stack(self, stack):
        """Validate stack and return a new list with spaces replaced by "_"."""
        assert isinstance(stack, (tuple, list))
        return [item.replace(" ", "_") for item in stack]


class Instrument(object):
    """Context manager that records task timings to a file via a worker thread."""

    def __init__(self, file_path):
        """Instrument that collects data from multiple threads and sums the time in each
        thread. The output is in the format required by flamegraph.pl to enable visualisation
        of the time spent in each task.

        :param file_path: - The path on which to write instrument output. Any existing file
                            at the path will be overwritten
        """
        self.path = file_path
        self.queue = None
        self.current = None
        self.start_time = None
        self.thread = None

    def __enter__(self):
        """Start the worker thread and return the writer used to record tasks."""
        assert self.thread is None
        assert self.queue is None
        self.queue = Queue()
        self.thread = threading.Thread(target=self.run)
        self.thread.start()
        return InstrumentWriter(self.queue)

    def __exit__(self, *args, **kwargs):
        """Ask the worker to flush and stop, wait for it, then reset state."""
        self.queue.put(("stop", None, time.time(), None))
        self.thread.join()
        self.thread = None
        self.queue = None

    def run(self):
        """Worker loop: consume commands from the queue and write timings.

        Each output line is ``<frames joined by ";"> <elapsed milliseconds>``.
        The loop ends once a "stop" command has been processed.
        """
        known_commands = {"stop", "pause", "set"}
        with open(self.path, "w") as f:
            # Maps thread ident -> (stack, start timestamp) of the task
            # currently being recorded on that thread.
            thread_data = {}
            while True:
                command, thread, time_stamp, stack = self.queue.get()
                assert command in known_commands

                # If we are done recording, dump the information from all threads to the file
                # before exiting. Otherwise for either 'set' or 'pause' we only need to dump
                # information from the current stack (if any) that was recording on the reporting
                # thread (as that stack is no longer active).
                if command == "stop":
                    items = list(thread_data.values())
                elif thread in thread_data:
                    items = [thread_data.pop(thread)]
                else:
                    items = []
                for output_stack, start_time in items:
                    f.write("%s %d\n" % (";".join(output_stack), int(1000 * (time_stamp - start_time))))

                if command == "set":
                    thread_data[thread] = (stack, time_stamp)
                elif command == "stop":
                    break