1#!/usr/bin/env python3 2 3import base64 4import logging 5import os 6import sys 7import tempfile 8 9logger = logging.getLogger(__name__) 10 11 12class LogTracker(logging.Handler): 13 LOG_LEVELS = { 14 "DEBUG": logging.DEBUG, 15 "INFO": logging.INFO, 16 "WARNING": logging.WARNING, 17 "ERROR": logging.ERROR, 18 } 19 20 def __init__(self): 21 super().__init__() 22 self._formatter = logging.Formatter( 23 '%(levelname)-6s %(asctime)-15s %(thread)-8X' 24 ' %(name)-10s %(message)s' 25 ) 26 self.outfile = None 27 28 def emit(self, record): 29 line = self._formatter.format(record) 30 sys.stderr.write(line + "\n") 31 32 def on_uncatched_exception_cb(self, exc_type, exc_value, exc_tb): 33 logger.error( 34 "=== UNCATCHED EXCEPTION ===", 35 exc_info=(exc_type, exc_value, exc_tb) 36 ) 37 logger.error( 38 "===========================" 39 ) 40 41 def init(self): 42 self.outfile = tempfile.NamedTemporaryFile( 43 mode="w+", newline='\n', prefix='ironscanner_', suffix='.txt', 44 delete=False 45 ) 46 sys.stderr.write( 47 "Logs will be stored temporarily in: {}\n".format( 48 self.outfile.name 49 ) 50 ) 51 52 os.dup2(self.outfile.fileno(), sys.stdout.fileno()) 53 os.dup2(self.outfile.fileno(), sys.stderr.fileno()) 54 55 logger = logging.getLogger() 56 logger.addHandler(self) 57 sys.excepthook = self.on_uncatched_exception_cb 58 logger.setLevel(logging.DEBUG) 59 60 def get_logs(self): 61 self.outfile.flush() 62 logs = None 63 with open(self.outfile.name, 'r') as fd: 64 logs = fd.read() 65 self.cleanup() 66 return logs 67 68 def complete_report(self, report): 69 traces = "" 70 try: 71 logs = self.get_logs() 72 traces = base64.encodebytes(logs.encode("utf-8")).decode("utf-8") 73 except Exception as exc: 74 traces = "(Exception: {})".format(str(exc)) 75 logger.error("Exception while encoding traces", exc_info=exc) 76 report['traces'] = traces 77 78 def cleanup(self): 79 if os.path.exists(self.outfile.name): 80 # os.unlink(self.outfile.name) 81 pass 82 83 def __str__(self): 84 return "Traces" 85