import logging
from typing import Optional, DefaultDict, Dict, List, Tuple, Set, Any, Union, TYPE_CHECKING
from collections import defaultdict

import ailment
import pyvex

from ...block import Block
from ...knowledge_plugins.cfg.cfg_node import CFGNode
from ...codenode import CodeNode
from ...engines.light import SimEngineLight
from ...knowledge_plugins.functions import Function
from ...knowledge_plugins.key_definitions import ReachingDefinitionsModel, LiveDefinitions
from ...knowledge_plugins.key_definitions.constants import OP_BEFORE, OP_AFTER
from ...misc.ux import deprecated
from ..analysis import Analysis
from ..forward_analysis import ForwardAnalysis
from .engine_ail import SimEngineRDAIL
from .engine_vex import SimEngineRDVEX
from .rd_state import ReachingDefinitionsState
from .subject import Subject, SubjectType
if TYPE_CHECKING:
    from .dep_graph import DepGraph


l = logging.getLogger(name=__name__)


class ReachingDefinitionsAnalysis(ForwardAnalysis, Analysis):  # pylint:disable=abstract-method
    """
    ReachingDefinitionsAnalysis is a text-book implementation of a static data-flow analysis that works on either a
    function or a block. It supports both VEX and AIL. By registering observers to observation points, users may use
    this analysis to generate use-def chains, def-use chains, and reaching definitions, and perform other traditional
    data-flow analyses such as liveness analysis.

    * I've always wanted to find a better name for this analysis. Now I gave up and decided to live with this name for
      the foreseeable future (until a better name is proposed by someone else).
    * Aliasing is definitely a problem, and I forgot how aliasing is resolved in this implementation. I'll leave this
      as a post-graduation TODO.
    * Some more documentation and examples would be nice.
    """

    def __init__(self, subject: Union[Subject,ailment.Block,Block,Function]=None, func_graph=None, max_iterations=3,
                 track_tmps=False, observation_points=None, init_state: ReachingDefinitionsState=None, cc=None,
                 function_handler=None, call_stack: Optional[List[int]]=None, maximum_local_call_depth=5,
                 observe_all=False, visited_blocks=None, dep_graph: Optional['DepGraph']=None, observe_callback=None,
                 canonical_size=8):
        """
        :param subject:                         The subject of the analysis: a function, or a single basic block
        :param func_graph:                      Alternative graph for function.graph.
        :param int max_iterations:              The maximum number of iterations before the analysis is terminated.
        :param Boolean track_tmps:              Whether or not temporary variables should be taken into consideration
                                                during the analysis.
        :param iterable observation_points:     A collection of tuples of ("node"|"insn", ins_addr, OP_TYPE) defining
                                                where reaching definitions should be copied and stored. OP_TYPE can be
                                                OP_BEFORE or OP_AFTER.
        :param init_state:                      An optional initialization state. The analysis creates and works on a
                                                copy.
                                                Default to None: the analysis then initialize its own abstract state,
                                                based on the given <Subject>.
        :param SimCC cc:                        Calling convention of the function.
        :param FunctionHandler function_handler:
                                                The function handler to update the analysis state and results on
                                                function calls.
        :param call_stack:                      An ordered list of Function addresses representing the call stack
                                                leading to the analysed subject, from older to newer calls. Setting it
                                                to None to limit the analysis to a single function and disable call
                                                stack tracking; In that case, all contexts in CodeLocation will be
                                                None, which makes CodeLocation objects contextless.
        :param int maximum_local_call_depth:    Maximum local function recursion depth.
        :param Boolean observe_all:             Observe every statement, both before and after.
        :param visited_blocks:                  A set of previously visited blocks.
        :param dep_graph:                       An initial dependency graph to add the result of the analysis to. Set it
                                                to None to skip dependency graph generation.
        :param observe_callback:                An optional callable consulted when neither `observe_all` nor
                                                `observation_points` selects a point. It is called as
                                                `callback('node', addr=, state=, op_type=)` for nodes and as
                                                `callback('insn', addr=, stmt=, block=, state=, op_type=)` for
                                                instructions; a truthy return value makes the point observed.
        :param canonical_size:                  The sizes (in bytes) that objects with an UNKNOWN_SIZE are treated as
                                                for operations where sizes are necessary.
        """

        # Wrap anything that is not already a Subject so the rest of the analysis deals with one type only.
        if not isinstance(subject, Subject):
            self._subject = Subject(subject, func_graph, cc)
        else:
            self._subject = subject
        self._graph_visitor = self._subject.visitor

        ForwardAnalysis.__init__(self, order_jobs=True, allow_merging=True, allow_widening=False,
                                 graph_visitor=self._graph_visitor)

        self._track_tmps = track_tmps
        self._max_iterations = max_iterations
        self._observation_points = observation_points
        self._init_state = init_state
        self._maximum_local_call_depth = maximum_local_call_depth
        self._canonical_size = canonical_size

        self._dep_graph = dep_graph

        if function_handler is None:
            self._function_handler = function_handler
        else:
            # hook() is given this analysis and its return value is what gets stored as the handler.
            self._function_handler = function_handler.hook(self)

        self._call_stack: Optional[List[int]] = None
        if call_stack is not None:
            self._call_stack = self._init_call_stack(call_stack, subject)

        if self._init_state is not None:
            # Work on a private copy so the caller's state object is not re-bound to this analysis.
            self._init_state = self._init_state.copy()
            self._init_state.analysis = self

        self._observe_all = observe_all
        self._observe_callback = observe_callback

        # sanity check
        # The exact-type test (rather than isinstance) is kept on purpose: tuple subclasses are rejected as before.
        if self._observation_points and any(type(op) is not tuple for op in self._observation_points):
            raise ValueError('"observation_points" must be tuples.')

        # Only warn for direct instances; subclasses may collect their results through other means.
        if type(self) is ReachingDefinitionsAnalysis and \
                not self._observe_all and \
                not self._observation_points and \
                not self._observe_callback:
            l.warning('No observation point is specified. '
                      'You cannot get any analysis result from performing the analysis.'
                      )

        # Number of times each node address has been processed by _run_on_node().
        self._node_iterations: DefaultDict[int, int] = defaultdict(int)

        self._engine_vex = SimEngineRDVEX(self.project, self._call_stack, self._maximum_local_call_depth,
                                          functions=self.kb.functions,
                                          function_handler=self._function_handler)
        self._engine_ail = SimEngineRDAIL(self.project, self._call_stack, self._maximum_local_call_depth,
                                          function_handler=self._function_handler)

        self._visited_blocks: Set[Any] = visited_blocks or set()
        self.model: ReachingDefinitionsModel = ReachingDefinitionsModel(
            func_addr=self.subject.content.addr if isinstance(self.subject.content, Function) else None)

        self._analyze()

    def _init_call_stack(self, call_stack: List[int], subject) -> List[int]:
        """
        Build the call stack used by the engines from the user-provided one.

        A new list is returned whenever an address is appended; otherwise `call_stack` itself is returned.

        :param call_stack:  Function addresses leading to the subject, from older to newer calls.
        :param subject:     The raw `subject` argument given to the constructor.
        :return:            The call stack, extended with the address of the function under analysis when needed.
        :raises ValueError: If the subject type is not Function, Block, or CallTrace.
        """
        # NOTE(review): `subject` is the raw constructor argument, and the Function/Block branches read
        # `subject.addr`. A caller passing an already-built Subject must make sure it exposes `.addr` - confirm.
        if self._subject.type == SubjectType.Function:
            return call_stack + [subject.addr]
        elif self._subject.type == SubjectType.Block:
            cfg = self.kb.cfgs.get_most_accurate()
            if cfg is None:
                # no CFG exists
                return call_stack
            cfg_node = cfg.get_any_node(subject.addr)
            if cfg_node is None:
                # we don't know which function this node belongs to
                return call_stack
            function_address = cfg_node.function_address
            function = self.kb.functions.function(function_address)
            # Do not push the enclosing function twice if it already is the newest entry.
            if len(call_stack) > 0 and call_stack[-1] == function.addr:
                return call_stack
            else:
                return call_stack + [function.addr]
        elif self._subject.type == SubjectType.CallTrace:
            return call_stack + [self._subject.content.current_function_address()]
        else:
            raise ValueError('Unexpected subject type %s.' % self._subject.type)

    @property
    def observed_results(self) -> Dict[Tuple[str,int,int],LiveDefinitions]:
        """The observed results stored on the model, keyed by ("node"|"insn", address, op_type)."""
        return self.model.observed_results

    @property
    def all_definitions(self):
        """All definitions stored on the model."""
        return self.model.all_definitions

    @all_definitions.setter
    def all_definitions(self, v):
        self.model.all_definitions = v

    @property
    def all_uses(self):
        """All uses stored on the model."""
        return self.model.all_uses

    @property
    def one_result(self):
        """
        The single observed result.

        :raises ValueError: If there is no observed result, or more than one.
        """

        if not self.observed_results:
            raise ValueError('No result is available.')
        if len(self.observed_results) != 1:
            raise ValueError("More than one results are available.")

        return next(iter(self.observed_results.values()))

    @property
    def dep_graph(self):
        """The dependency graph, as last returned by the engine (or the one given at construction)."""
        return self._dep_graph

    @property
    def visited_blocks(self):
        """The set of nodes visited so far."""
        return self._visited_blocks

    def _current_local_call_depth(self):
        """
        :return: The length of the current call stack.
        """
        # NOTE(review): self._call_stack stays None when no call_stack was given to the constructor, in which case
        # len() raises TypeError - callers presumably only use this with call-stack tracking enabled; confirm.
        return len(self._call_stack)

    @deprecated(replacement="get_reaching_definitions_by_insn")
    def get_reaching_definitions(self, ins_addr, op_type):
        """Deprecated alias of get_reaching_definitions_by_insn()."""
        return self.get_reaching_definitions_by_insn(ins_addr, op_type)

    def get_reaching_definitions_by_insn(self, ins_addr, op_type):
        """
        Look up the reaching definitions observed at an instruction.

        :param ins_addr:    Address of the instruction.
        :param op_type:     Type of the observation point (OP_BEFORE or OP_AFTER).
        :return:            The result stored for that observation point.
        :raises KeyError:   If nothing was observed at that point.
        """
        key = 'insn', ins_addr, op_type
        if key not in self.observed_results:
            # `key` is a tuple: it must be stringified first, otherwise the %-operator unpacks it as three
            # arguments for a single %s and raises TypeError instead of the intended KeyError.
            raise KeyError("Reaching definitions are not available at observation point %s. "
                           "Did you specify that observation point?" % str(key))

        return self.observed_results[key]

    def get_reaching_definitions_by_node(self, node_addr, op_type):
        """
        Look up the reaching definitions observed at a node.

        :param node_addr:   Address of the node.
        :param op_type:     Type of the observation point (OP_BEFORE or OP_AFTER).
        :return:            The result stored for that observation point.
        :raises KeyError:   If nothing was observed at that point.
        """
        key = 'node', node_addr, op_type
        if key not in self.observed_results:
            raise KeyError("Reaching definitions are not available at observation point %s. "
                           "Did you specify that observation point?" % str(key))

        return self.observed_results[key]

    def node_observe(self, node_addr: int, state: ReachingDefinitionsState, op_type: int) -> None:
        """
        Record the live definitions of `state` for a node-level observation point, if that point is selected.

        :param node_addr:   Address of the node.
        :param state:       The analysis state.
        :param op_type:     Type of the observation point. Must be one of the following: OP_BEFORE, OP_AFTER.
        """

        key = 'node', node_addr, op_type

        observe = False

        # Selection order: observe_all, then explicit observation points, then the user callback.
        if self._observe_all:
            observe = True
        elif self._observation_points is not None and key in self._observation_points:
            observe = True
        elif self._observe_callback is not None:
            observe = self._observe_callback('node', addr=node_addr, state=state, op_type=op_type)

        if observe:
            # Unlike insn_observe(), the live definitions are stored without copying.
            self.observed_results[key] = state.live_definitions

    def insn_observe(self, insn_addr: int, stmt: Union[ailment.Stmt.Statement,pyvex.stmt.IRStmt],
                     block: Union[Block,ailment.Block], state: ReachingDefinitionsState, op_type: int) -> None:
        """
        Record a copy of the live definitions of `state` for an instruction-level observation point, if that point
        is selected.

        :param insn_addr:   Address of the instruction.
        :param stmt:        The statement.
        :param block:       The current block.
        :param state:       The abstract analysis state.
        :param op_type:     Type of the observation point. Must be one of the following: OP_BEFORE, OP_AFTER.
        """

        key = 'insn', insn_addr, op_type
        observe = False

        # Selection order: observe_all, then explicit observation points, then the user callback.
        if self._observe_all:
            observe = True
        elif self._observation_points is not None and key in self._observation_points:
            observe = True
        elif self._observe_callback is not None:
            observe = self._observe_callback('insn', addr=insn_addr, stmt=stmt, block=block, state=state,
                                             op_type=op_type)

        if not observe:
            return

        if isinstance(stmt, pyvex.stmt.IRStmt):
            # it's an angr block
            vex_block = block.vex
            # OP_BEFORE: stmt has to be IMark
            if op_type == OP_BEFORE and type(stmt) is pyvex.stmt.IMark:
                self.observed_results[key] = state.live_definitions.copy()
            # OP_AFTER: stmt has to be last stmt of block or next stmt has to be IMark
            elif op_type == OP_AFTER:
                idx = vex_block.statements.index(stmt)
                if idx == len(vex_block.statements) - 1 or type(
                        vex_block.statements[idx + 1]) is pyvex.IRStmt.IMark:
                    self.observed_results[key] = state.live_definitions.copy()
        elif isinstance(stmt, ailment.Stmt.Statement):
            # it's an AIL block
            self.observed_results[key] = state.live_definitions.copy()

    @property
    def subject(self):
        """The Subject wrapping what is being analysed."""
        return self._subject

    #
    # Main analysis routines
    #

    def _pre_analysis(self):
        pass

    def _initial_abstract_state(self, node) -> ReachingDefinitionsState:
        """
        :param node:    The node the state is requested for (unused here).
        :return:        The (copied) user-provided initial state if there is one, otherwise a fresh state built from
                        the project architecture and the subject.
        """
        if self._init_state is not None:
            return self._init_state
        else:
            return ReachingDefinitionsState(
                self.project.arch, self.subject, track_tmps=self._track_tmps, analysis=self,
                canonical_size=self._canonical_size,
            )

    def _merge_states(self, node, *states):
        """Merge all given states into one by calling merge() on the first with the remaining ones."""
        return states[0].merge(*states[1:])

    def _run_on_node(self, node, state: ReachingDefinitionsState):
        """
        Process one node with the matching engine and record observations and global definitions/uses.

        :param node:    The current node.
        :param state:   The analysis state.
        :return:        A tuple: (flag, successor state). The flag is True while the node has been processed fewer
                        than `max_iterations` times, and False once that limit is reached or when the node is
                        skipped (SimProcedure/syscall CFG nodes, unsupported node types).
                        NOTE(review): presumably ForwardAnalysis reads the flag as "state changed, keep
                        iterating" - confirm against ForwardAnalysis.
        """

        self._visited_blocks.add(node)

        engine: SimEngineLight

        if isinstance(node, ailment.Block):
            block = node
            engine = self._engine_ail
        elif isinstance(node, (Block, CodeNode)):
            block = self.project.factory.block(node.addr, node.size, opt_level=1, cross_insn_opt=False)
            engine = self._engine_vex
        elif isinstance(node, CFGNode):
            if node.is_simprocedure or node.is_syscall:
                return False, state.copy()
            block = node.block
            engine = self._engine_vex
        else:
            l.warning("Unsupported node type %s.", node.__class__)
            return False, state.copy()

        self.node_observe(node.addr, state, OP_BEFORE)

        # The engine works on a copy, so the state observed above (stored uncopied) is not the one it is handed.
        state = state.copy()
        state, self._visited_blocks, self._dep_graph = engine.process(
            state,
            block=block,
            fail_fast=self._fail_fast,
            visited_blocks=self._visited_blocks,
            dep_graph=self._dep_graph,
        )

        block_key = node.addr
        self._node_iterations[block_key] += 1

        self.node_observe(node.addr, state, OP_AFTER)

        # update all definitions and all uses
        self.all_definitions |= state.all_definitions
        for use in [state.stack_uses, state.heap_uses, state.register_uses]:
            self.all_uses.merge(use)

        if self._node_iterations[block_key] < self._max_iterations:
            return True, state
        else:
            return False, state

    def _intra_analysis(self):
        pass

    def _post_analysis(self):
        pass