1import os,json,struct,signal,uuid
2
3from typing import Any, Dict
4
5import lldb
6from lldb.plugins.scripted_process import ScriptedProcess
7from lldb.plugins.scripted_process import ScriptedThread
8
9from lldb.macosx.crashlog import CrashLog,CrashLogParser
10
11class CrashLogScriptedProcess(ScriptedProcess):
12    def parse_crashlog(self):
13        crashlog_parser = CrashLogParser.create(self.dbg, self.crashlog_path, False)
14        crash_log = crashlog_parser.parse()
15
16        self.pid = crash_log.process_id
17        self.addr_mask = crash_log.addr_mask
18        self.crashed_thread_idx = crash_log.crashed_thread_idx
19        self.loaded_images = []
20        self.exception = crash_log.exception
21        self.app_specific_thread = None
22        if hasattr(crash_log, 'asi'):
23            self.metadata['asi'] = crash_log.asi
24        if hasattr(crash_log, 'asb'):
25            self.extended_thread_info = crash_log.asb
26
27        def load_images(self, images):
28            #TODO: Add to self.loaded_images and load images in lldb
29            if images:
30                for image in images:
31                    if image not in self.loaded_images:
32                        if image.uuid == uuid.UUID(int=0):
33                            continue
34                        err = image.add_module(self.target)
35                        if err:
36                            # Append to SBCommandReturnObject
37                            print(err)
38                        else:
39                            self.loaded_images.append(image)
40
41        for thread in crash_log.threads:
42            if self.load_all_images:
43                load_images(self, crash_log.images)
44            elif thread.did_crash():
45                for ident in thread.idents:
46                    load_images(self, crash_log.find_images_with_identifier(ident))
47
48            if hasattr(thread, 'app_specific_backtrace') and thread.app_specific_backtrace:
49                # We don't want to include the Application Specific Backtrace
50                # Thread into the Scripted Process' Thread list.
51                # Instead, we will try to extract the stackframe pcs from the
52                # backtrace and inject that as the extended thread info.
53                self.app_specific_thread = thread
54                continue
55
56            self.threads[thread.index] = CrashLogScriptedThread(self, None, thread)
57
58
59        if self.app_specific_thread:
60            self.extended_thread_info = \
61                    CrashLogScriptedThread.resolve_stackframes(self.app_specific_thread,
62                                                               self.addr_mask,
63                                                               self.target)
64
65    def __init__(self, exe_ctx: lldb.SBExecutionContext, args : lldb.SBStructuredData):
66        super().__init__(exe_ctx, args)
67
68        if not self.target or not self.target.IsValid():
69            # Return error
70            return
71
72        self.crashlog_path = None
73
74        crashlog_path = args.GetValueForKey("file_path")
75        if crashlog_path and crashlog_path.IsValid():
76            if crashlog_path.GetType() == lldb.eStructuredDataTypeString:
77                self.crashlog_path = crashlog_path.GetStringValue(4096)
78
79        if not self.crashlog_path:
80            # Return error
81            return
82
83        load_all_images = args.GetValueForKey("load_all_images")
84        if load_all_images and load_all_images.IsValid():
85            if load_all_images.GetType() == lldb.eStructuredDataTypeBoolean:
86                self.load_all_images = load_all_images.GetBooleanValue()
87
88        if not self.load_all_images:
89            self.load_all_images = False
90
91        self.pid = super().get_process_id()
92        self.crashed_thread_idx = 0
93        self.exception = None
94        self.extended_thread_info = None
95        self.parse_crashlog()
96
97    def get_memory_region_containing_address(self, addr: int) -> lldb.SBMemoryRegionInfo:
98        return None
99
100    def get_thread_with_id(self, tid: int):
101        return {}
102
103    def get_registers_for_thread(self, tid: int):
104        return {}
105
106    def read_memory_at_address(self, addr: int, size: int, error: lldb.SBError) -> lldb.SBData:
107        # NOTE: CrashLogs don't contain any memory.
108        return lldb.SBData()
109
110    def get_loaded_images(self):
111        # TODO: Iterate over corefile_target modules and build a data structure
112        # from it.
113        return self.loaded_images
114
115    def get_process_id(self) -> int:
116        return self.pid
117
118    def should_stop(self) -> bool:
119        return True
120
121    def is_alive(self) -> bool:
122        return True
123
124    def get_scripted_thread_plugin(self):
125        return CrashLogScriptedThread.__module__ + "." + CrashLogScriptedThread.__name__
126
127    def get_process_metadata(self):
128        return self.metadata
129
130class CrashLogScriptedThread(ScriptedThread):
131    def create_register_ctx(self):
132        if not self.has_crashed:
133            return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0)
134
135        if not self.backing_thread or not len(self.backing_thread.registers):
136            return dict.fromkeys([*map(lambda reg: reg['name'], self.register_info['registers'])] , 0)
137
138        for reg in self.register_info['registers']:
139            reg_name = reg['name']
140            if reg_name in self.backing_thread.registers:
141                self.register_ctx[reg_name] = self.backing_thread.registers[reg_name]
142            else:
143                self.register_ctx[reg_name] = 0
144
145        return self.register_ctx
146
147    def resolve_stackframes(thread, addr_mask, target):
148        frames = []
149        for frame in thread.frames:
150            frame_pc = frame.pc & addr_mask
151            pc = frame_pc if frame.index == 0  or frame_pc == 0 else frame_pc - 1
152            sym_addr = lldb.SBAddress()
153            sym_addr.SetLoadAddress(pc, target)
154            if not sym_addr.IsValid():
155                continue
156            frames.append({"idx": frame.index, "pc": pc})
157        return frames
158
159
160    def create_stackframes(self):
161        if not (self.scripted_process.load_all_images or self.has_crashed):
162            return None
163
164        if not self.backing_thread or not len(self.backing_thread.frames):
165            return None
166
167        self.frames = CrashLogScriptedThread.resolve_stackframes(self.backing_thread,
168                                                                 self.scripted_process.addr_mask,
169                                                                 self.target)
170
171        return self.frames
172
173    def __init__(self, process, args, crashlog_thread):
174        super().__init__(process, args)
175
176        self.backing_thread = crashlog_thread
177        self.idx = self.backing_thread.index
178        self.tid = self.backing_thread.id
179        if self.backing_thread.app_specific_backtrace:
180            self.name = "Application Specific Backtrace - " + str(self.idx)
181        else:
182            self.name = self.backing_thread.name
183        self.queue = self.backing_thread.queue
184        self.has_crashed = (self.scripted_process.crashed_thread_idx == self.idx)
185        self.create_stackframes()
186
187    def get_state(self):
188        if not self.has_crashed:
189            return lldb.eStateStopped
190        return lldb.eStateCrashed
191
192    def get_stop_reason(self) -> Dict[str, Any]:
193        if not self.has_crashed:
194            return { "type": lldb.eStopReasonNone }
195        # TODO: Investigate what stop reason should be reported when crashed
196        stop_reason = { "type": lldb.eStopReasonException, "data": {  }}
197        if self.scripted_process.exception:
198            stop_reason['data']['mach_exception'] = self.scripted_process.exception
199        return stop_reason
200
201    def get_register_context(self) -> str:
202        if not self.register_ctx:
203            self.register_ctx = self.create_register_ctx()
204
205        return struct.pack("{}Q".format(len(self.register_ctx)), *self.register_ctx.values())
206
207    def get_extended_info(self):
208        if (self.has_crashed):
209            self.extended_info = self.scripted_process.extended_thread_info
210        return self.extended_info
211
212