1import os,json,struct,signal,uuid 2 3from typing import Any, Dict 4 5import lldb 6from lldb.plugins.scripted_process import ScriptedProcess 7from lldb.plugins.scripted_process import ScriptedThread 8 9from lldb.macosx.crashlog import CrashLog,CrashLogParser 10 11class CrashLogScriptedProcess(ScriptedProcess): 12 def parse_crashlog(self): 13 crashlog_parser = CrashLogParser.create(self.dbg, self.crashlog_path, False) 14 crash_log = crashlog_parser.parse() 15 16 self.pid = crash_log.process_id 17 self.addr_mask = crash_log.addr_mask 18 self.crashed_thread_idx = crash_log.crashed_thread_idx 19 self.loaded_images = [] 20 self.exception = crash_log.exception 21 self.app_specific_thread = None 22 if hasattr(crash_log, 'asi'): 23 self.metadata['asi'] = crash_log.asi 24 if hasattr(crash_log, 'asb'): 25 self.extended_thread_info = crash_log.asb 26 27 def load_images(self, images): 28 #TODO: Add to self.loaded_images and load images in lldb 29 if images: 30 for image in images: 31 if image not in self.loaded_images: 32 if image.uuid == uuid.UUID(int=0): 33 continue 34 err = image.add_module(self.target) 35 if err: 36 # Append to SBCommandReturnObject 37 print(err) 38 else: 39 self.loaded_images.append(image) 40 41 for thread in crash_log.threads: 42 if self.load_all_images: 43 load_images(self, crash_log.images) 44 elif thread.did_crash(): 45 for ident in thread.idents: 46 load_images(self, crash_log.find_images_with_identifier(ident)) 47 48 if hasattr(thread, 'app_specific_backtrace') and thread.app_specific_backtrace: 49 # We don't want to include the Application Specific Backtrace 50 # Thread into the Scripted Process' Thread list. 51 # Instead, we will try to extract the stackframe pcs from the 52 # backtrace and inject that as the extended thread info. 53 self.app_specific_thread = thread 54 continue 55 56 self.threads[thread.index] = CrashLogScriptedThread(self, None, thread) 57 58 59 if self.app_specific_thread: 60 self.extended_thread_info = \ 61 CrashLogScriptedThread.resolve_stackframes(self.app_specific_thread, 62 self.addr_mask, 63 self.target) 64 65 def __init__(self, exe_ctx: lldb.SBExecutionContext, args : lldb.SBStructuredData): 66 super().__init__(exe_ctx, args) 67 68 if not self.target or not self.target.IsValid(): 69 # Return error 70 return 71 72 self.crashlog_path = None 73 74 crashlog_path = args.GetValueForKey("file_path") 75 if crashlog_path and crashlog_path.IsValid(): 76 if crashlog_path.GetType() == lldb.eStructuredDataTypeString: 77 self.crashlog_path = crashlog_path.GetStringValue(4096) 78 79 if not self.crashlog_path: 80 # Return error 81 return 82 83 load_all_images = args.GetValueForKey("load_all_images") 84 if load_all_images and load_all_images.IsValid(): 85 if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean: 86 self.load_all_images = load_all_images.GetBooleanValue() 87 88 if not self.load_all_images: 89 self.load_all_images = False 90 91 self.pid = super().get_process_id() 92 self.crashed_thread_idx = 0 93 self.exception = None 94 self.extended_thread_info = None 95 self.parse_crashlog() 96 97 def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo: 98 return None 99 100 def get_thread_with_id(self, tid: int): 101 return {} 102 103 def get_registers_for_thread(self, tid: int): 104 return {} 105 106 def read_memory_at_address(self, addr: int, size: int, error: lldb.SBError) -> lldb.SBData: 107 # NOTE: CrashLogs don't contain any memory. 108 return lldb.SBData() 109 110 def get_loaded_images(self): 111 # TODO: Iterate over corefile_target modules and build a data structure 112 # from it. 113 return self.loaded_images 114 115 def get_process_id(self) -> int: 116 return self.pid 117 118 def should_stop(self) -> bool: 119 return True 120 121 def is_alive(self) -> bool: 122 return True 123 124 def get_scripted_thread_plugin(self): 125 return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__ 126 127 def get_process_metadata(self): 128 return self.metadata 129 130class CrashLogScriptedThread(ScriptedThread): 131 def create_register_ctx(self): 132 if not self.has_crashed: 133 return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0) 134 135 if not self.backing_thread or not len(self.backing_thread.registers): 136 return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0) 137 138 for reg in self.register_info['registers']: 139 reg_name = reg['name'] 140 if reg_name in self.backing_thread.registers: 141 self.register_ctx[reg_name] = self.backing_thread.registers[reg_name] 142 else: 143 self.register_ctx[reg_name] = 0 144 145 return self.register_ctx 146 147 def resolve_stackframes(thread, addr_mask, target): 148 frames = [] 149 for frame in thread.frames: 150 frame_pc = frame.pc & addr_mask 151 pc = frame_pc if frame.index == 0 or frame_pc == 0 else frame_pc - 1 152 sym_addr = lldb.SBAddress() 153 sym_addr.SetLoadAddress(pc, target) 154 if not sym_addr.IsValid(): 155 continue 156 frames.append({"idx": frame.index, "pc": pc}) 157 return frames 158 159 160 def create_stackframes(self): 161 if not (self.scripted_process.load_all_images or self.has_crashed): 162 return None 163 164 if not self.backing_thread or not len(self.backing_thread.frames): 165 return None 166 167 self.frames = CrashLogScriptedThread.resolve_stackframes(self.backing_thread, 168 self.scripted_process.addr_mask, 169 self.target) 170 171 return self.frames 172 173 def __init__(self, process, args, crashlog_thread): 174 super().__init__(process, args) 175 176 self.backing_thread = crashlog_thread 177 self.idx = self.backing_thread.index 178 self.tid = self.backing_thread.id 179 if self.backing_thread.app_specific_backtrace: 180 self.name = "Application Specific Backtrace - " + str(self.idx) 181 else: 182 self.name = self.backing_thread.name 183 self.queue = self.backing_thread.queue 184 self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx) 185 self.create_stackframes() 186 187 def get_state(self): 188 if not self.has_crashed: 189 return lldb.eStateStopped 190 return lldb.eStateCrashed 191 192 def get_stop_reason(self) -> Dict[str, Any]: 193 if not self.has_crashed: 194 return { "type": lldb.eStopReasonNone } 195 # TODO: Investigate what stop reason should be reported when crashed 196 stop_reason = { "type": lldb.eStopReasonException, "data": { }} 197 if self.scripted_process.exception: 198 stop_reason['data']['mach_exception'] = self.scripted_process.exception 199 return stop_reason 200 201 def get_register_context(self) -> str: 202 if not self.register_ctx: 203 self.register_ctx = self.create_register_ctx() 204 205 return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values()) 206 207 def get_extended_info(self): 208 if (self.has_crashed): 209 self.extended_info = self.scripted_process.extended_thread_info 210 return self.extended_info 211 212