# -*- coding: utf-8 -*-
"""
Periodically dump the stack traces of all running threads to a file.

Adapted from http://code.activestate.com/recipes/577334/

by László Nagy, released under the MIT license.
"""

import os
import sys
import threading
import time
import traceback


def _get_stack_traces():
    """Return the formatted stacks of every thread except the calling one.

    Each thread contributes a ``# Thread: <id> (<name>)`` header followed by
    one ``File: ...`` line per frame and, when the source text is available,
    the stripped source line.
    """
    # Resolve the caller's id once instead of on every loop iteration.
    own_ident = threading.current_thread().ident
    names_by_ident = dict((t.ident, t.name) for t in threading.enumerate())

    code = []
    for thread_id, stack in sys._current_frames().items():
        if thread_id == own_ident:
            # The dumping thread's own stack is never interesting.
            continue
        thread_name = names_by_ident.get(thread_id, 'Unknown')
        code.append('\n# Thread: %s (%s)' % (thread_id, thread_name))
        for filename, lineno, name, line in traceback.extract_stack(stack):
            code.append('File: %r, line %d, in %s' % (filename, lineno, name))
            if line:
                code.append('  %s' % (line.strip()))

    return '\n'.join(code)


class TraceDumper(threading.Thread):
    """Dump stack traces into a given file periodically."""

    def __init__(self, path, interval):
        """
        @param path: File path to output stack trace info.
        @param interval: in seconds - how often to update the trace file.
            Must be greater than 0.1.
        @raise ValueError: if interval is not greater than 0.1.
        """
        # Explicit check rather than ``assert``: asserts vanish under -O.
        if not interval > 0.1:
            raise ValueError(
                'interval must be greater than 0.1 seconds, got %r'
                % (interval,))
        # Initialise the Thread machinery before adding our own attributes.
        threading.Thread.__init__(self)
        self.interval = interval
        self.path = os.path.abspath(path)
        self.stop_requested = threading.Event()

    def run(self):
        """Rewrite the trace file every ``interval`` seconds until stopped."""
        while True:
            # Event.wait is an interruptible sleep: it returns as soon as
            # stop() sets the flag instead of always blocking a full interval.
            self.stop_requested.wait(self.interval)
            if self.stop_requested.is_set():
                break
            self.write_stack_traces()

    def stop(self):
        """Ask the dumper to finish and wait for its thread to exit."""
        self.stop_requested.set()
        # join() raises RuntimeError on a thread that was never started.
        if self.is_alive():
            self.join()

    def write_stack_traces(self):
        """Overwrite the trace file with the current stacks of all threads."""
        with open(self.path, 'w') as out:
            out.write(_get_stack_traces())


# The single active dumper, or None when tracing is off.
_tracer = None


def start_trace(path, interval=5):
    """Start tracing into the given file.

    @param path: File path to output stack trace info.
    @param interval: in seconds - how often to update the trace file.
    @raise RuntimeError: if tracing has already been started.
    """
    global _tracer
    if _tracer is not None:
        raise RuntimeError('Already tracing to %s' % _tracer.path)
    tracer = TraceDumper(path, interval)
    # Daemon thread: it must never keep the interpreter alive on exit.
    tracer.daemon = True
    tracer.start()
    # Publish only after a successful start so a failure leaves state clean.
    _tracer = tracer


def stop_trace():
    """Stop tracing. Does nothing when tracing is not active."""
    global _tracer
    if _tracer is not None:
        try:
            _tracer.stop()
        finally:
            # Always reset, so tracing can be started again afterwards.
            _tracer = None