1
2import logging
3from typing import Optional, DefaultDict, Dict, List, Tuple, Set, Any, Union, TYPE_CHECKING
4from collections import defaultdict
5
6import ailment
7import pyvex
8
9from ...block import Block
10from ...knowledge_plugins.cfg.cfg_node import CFGNode
11from ...codenode import CodeNode
12from ...engines.light import SimEngineLight
13from ...knowledge_plugins.functions import Function
14from ...knowledge_plugins.key_definitions import ReachingDefinitionsModel, LiveDefinitions
15from ...knowledge_plugins.key_definitions.constants import OP_BEFORE, OP_AFTER
16from ...misc.ux import deprecated
17from ..analysis import Analysis
18from ..forward_analysis import ForwardAnalysis
19from .engine_ail import SimEngineRDAIL
20from .engine_vex import SimEngineRDVEX
21from .rd_state import ReachingDefinitionsState
22from .subject import Subject, SubjectType
23if TYPE_CHECKING:
24    from .dep_graph import DepGraph
25
26
27l = logging.getLogger(name=__name__)
28
29
30class ReachingDefinitionsAnalysis(ForwardAnalysis, Analysis):  # pylint:disable=abstract-method
31    """
32    ReachingDefinitionsAnalysis is a text-book implementation of a static data-flow analysis that works on either a
33    function or a block. It supports both VEX and AIL. By registering observers to observation points, users may use
34    this analysis to generate use-def chains, def-use chains, and reaching definitions, and perform other traditional
35    data-flow analyses such as liveness analysis.
36
37    * I've always wanted to find a better name for this analysis. Now I gave up and decided to live with this name for
38      the foreseeable future (until a better name is proposed by someone else).
39    * Aliasing is definitely a problem, and I forgot how aliasing is resolved in this implementation. I'll leave this
40      as a post-graduation TODO.
41    * Some more documentation and examples would be nice.
42    """
43
44    def __init__(self, subject: Union[Subject,ailment.Block,Block,Function]=None, func_graph=None, max_iterations=3,
45                 track_tmps=False, observation_points=None, init_state: ReachingDefinitionsState=None, cc=None,
46                 function_handler=None, call_stack: Optional[List[int]]=None, maximum_local_call_depth=5,
47                 observe_all=False, visited_blocks=None, dep_graph: Optional['DepGraph']=None, observe_callback=None,
48                 canonical_size=8):
49        """
50        :param subject:                         The subject of the analysis: a function, or a single basic block
51        :param func_graph:                      Alternative graph for function.graph.
52        :param int max_iterations:              The maximum number of iterations before the analysis is terminated.
53        :param Boolean track_tmps:              Whether or not temporary variables should be taken into consideration
54                                                during the analysis.
55        :param iterable observation_points:     A collection of tuples of ("node"|"insn", ins_addr, OP_TYPE) defining
56                                                where reaching definitions should be copied and stored. OP_TYPE can be
57                                                OP_BEFORE or OP_AFTER.
58        :param init_state:                      An optional initialization state. The analysis creates and works on a
59                                                copy.
60                                                Default to None: the analysis then initialize its own abstract state,
61                                                based on the given <Subject>.
62        :param SimCC cc:                        Calling convention of the function.
63        :param FunctionHandler function_handler:
64                                                The function handler to update the analysis state and results on
65                                                function calls.
66        :param call_stack:                      An ordered list of Function addresses representing the call stack
67                                                leading to the analysed subject, from older to newer calls. Setting it
68                                                to None to limit the analysis to a single function and disable call
69                                                stack tracking; In that case, all contexts in CodeLocation will be
70                                                None, which makes CodeLocation objects contextless.
71        :param int maximum_local_call_depth:    Maximum local function recursion depth.
72        :param Boolean observe_all:             Observe every statement, both before and after.
73        :param visited_blocks:                  A set of previously visited blocks.
74        :param dep_graph:                       An initial dependency graph to add the result of the analysis to. Set it
75                                                to None to skip dependency graph generation.
76        :param canonical_size:                  The sizes (in bytes) that objects with an UNKNOWN_SIZE are treated as
77                                                for operations where sizes are necessary.
78        """
79
80        if not isinstance(subject, Subject):
81            self._subject = Subject(subject, func_graph, cc)
82        else:
83            self._subject = subject
84        self._graph_visitor = self._subject.visitor
85
86        ForwardAnalysis.__init__(self, order_jobs=True, allow_merging=True, allow_widening=False,
87                                 graph_visitor=self._graph_visitor)
88
89        self._track_tmps = track_tmps
90        self._max_iterations = max_iterations
91        self._observation_points = observation_points
92        self._init_state = init_state
93        self._maximum_local_call_depth = maximum_local_call_depth
94        self._canonical_size = canonical_size
95
96        self._dep_graph = dep_graph
97
98        if function_handler is None:
99            self._function_handler = function_handler
100        else:
101            self._function_handler = function_handler.hook(self)
102
103        self._call_stack: Optional[List[int]] = None
104        if call_stack is not None:
105            self._call_stack = self._init_call_stack(call_stack, subject)
106
107        if self._init_state is not None:
108            self._init_state = self._init_state.copy()
109            self._init_state.analysis = self
110
111        self._observe_all = observe_all
112        self._observe_callback = observe_callback
113
114        # sanity check
115        if self._observation_points and any(not type(op) is tuple for op in self._observation_points):
116            raise ValueError('"observation_points" must be tuples.')
117
118        if type(self) is ReachingDefinitionsAnalysis and \
119                not self._observe_all and \
120                not self._observation_points and \
121                not self._observe_callback:
122            l.warning('No observation point is specified. '
123                      'You cannot get any analysis result from performing the analysis.'
124                      )
125
126        self._node_iterations: DefaultDict[int, int] = defaultdict(int)
127
128        self._engine_vex = SimEngineRDVEX(self.project, self._call_stack, self._maximum_local_call_depth,
129                                          functions=self.kb.functions,
130                                          function_handler=self._function_handler)
131        self._engine_ail = SimEngineRDAIL(self.project, self._call_stack, self._maximum_local_call_depth,
132                                          function_handler=self._function_handler)
133
134        self._visited_blocks: Set[Any] = visited_blocks or set()
135        self.model: ReachingDefinitionsModel = ReachingDefinitionsModel(
136            func_addr=self.subject.content.addr if isinstance(self.subject.content, Function) else None)
137
138        self._analyze()
139
140    def _init_call_stack(self, call_stack: List[int], subject) -> List[int]:
141        if self._subject.type == SubjectType.Function:
142            return call_stack + [subject.addr]
143        elif self._subject.type == SubjectType.Block:
144            cfg = self.kb.cfgs.get_most_accurate()
145            if cfg is None:
146                # no CFG exists
147                return call_stack
148            cfg_node = cfg.get_any_node(subject.addr)
149            if cfg_node is None:
150                # we don't know which function this node belongs to
151                return call_stack
152            function_address = cfg_node.function_address
153            function = self.kb.functions.function(function_address)
154            if len(call_stack) > 0 and call_stack[-1] == function.addr:
155                return call_stack
156            else:
157                return call_stack + [function.addr]
158        elif self._subject.type == SubjectType.CallTrace:
159            return call_stack + [self._subject.content.current_function_address()]
160        else:
161            raise ValueError('Unexpected subject type %s.' % self._subject.type)
162
163    @property
164    def observed_results(self) -> Dict[Tuple[str,int,int],LiveDefinitions]:
165        return self.model.observed_results
166
167    @property
168    def all_definitions(self):
169        return self.model.all_definitions
170
171    @all_definitions.setter
172    def all_definitions(self, v):
173        self.model.all_definitions = v
174
175    @property
176    def all_uses(self):
177        return self.model.all_uses
178
179    @property
180    def one_result(self):
181
182        if not self.observed_results:
183            raise ValueError('No result is available.')
184        if len(self.observed_results) != 1:
185            raise ValueError("More than one results are available.")
186
187        return next(iter(self.observed_results.values()))
188
189    @property
190    def dep_graph(self):
191        return self._dep_graph
192
193    @property
194    def visited_blocks(self):
195        return self._visited_blocks
196
197    def _current_local_call_depth(self):
198        return len(self._call_stack)
199
200    @deprecated(replacement="get_reaching_definitions_by_insn")
201    def get_reaching_definitions(self, ins_addr, op_type):
202        return self.get_reaching_definitions_by_insn(ins_addr, op_type)
203
204    def get_reaching_definitions_by_insn(self, ins_addr, op_type):
205        key = 'insn', ins_addr, op_type
206        if key not in self.observed_results:
207            raise KeyError(("Reaching definitions are not available at observation point %s. "
208                            "Did you specify that observation point?") % key)
209
210        return self.observed_results[key]
211
212    def get_reaching_definitions_by_node(self, node_addr, op_type):
213        key = 'node', node_addr, op_type
214        if key not in self.observed_results:
215            raise KeyError("Reaching definitions are not available at observation point %s. "
216                            "Did you specify that observation point?" % str(key))
217
218        return self.observed_results[key]
219
220    def node_observe(self, node_addr: int, state: ReachingDefinitionsState, op_type: int) -> None:
221        """
222        :param node_addr:   Address of the node.
223        :param state:       The analysis state.
224        :param op_type:     Type of the bbservation point. Must be one of the following: OP_BEFORE, OP_AFTER.
225        """
226
227        key = 'node', node_addr, op_type
228
229        observe = False
230
231        if self._observe_all:
232            observe = True
233        elif self._observation_points is not None and key in self._observation_points:
234            observe = True
235        elif self._observe_callback is not None:
236            observe = self._observe_callback('node', addr=node_addr, state=state, op_type=op_type)
237
238        if observe:
239            self.observed_results[key] = state.live_definitions
240
241    def insn_observe(self, insn_addr: int, stmt: Union[ailment.Stmt.Statement,pyvex.stmt.IRStmt],
242                     block: Union[Block,ailment.Block], state: ReachingDefinitionsState, op_type: int) -> None:
243        """
244        :param insn_addr:   Address of the instruction.
245        :param stmt:        The statement.
246        :param block:       The current block.
247        :param state:       The abstract analysis state.
248        :param op_type:     Type of the observation point. Must be one of the following: OP_BEORE, OP_AFTER.
249        """
250
251        key = 'insn', insn_addr, op_type
252        observe = False
253
254        if self._observe_all:
255            observe = True
256        elif self._observation_points is not None and key in self._observation_points:
257            observe = True
258        elif self._observe_callback is not None:
259            observe = self._observe_callback('insn', addr=insn_addr, stmt=stmt, block=block, state=state,
260                                             op_type=op_type)
261
262        if not observe:
263            return
264
265        if isinstance(stmt, pyvex.stmt.IRStmt):
266            # it's an angr block
267            vex_block = block.vex
268            # OP_BEFORE: stmt has to be IMark
269            if op_type == OP_BEFORE and type(stmt) is pyvex.stmt.IMark:
270                self.observed_results[key] = state.live_definitions.copy()
271            # OP_AFTER: stmt has to be last stmt of block or next stmt has to be IMark
272            elif op_type == OP_AFTER:
273                idx = vex_block.statements.index(stmt)
274                if idx == len(vex_block.statements) - 1 or type(
275                        vex_block.statements[idx + 1]) is pyvex.IRStmt.IMark:
276                    self.observed_results[key] = state.live_definitions.copy()
277        elif isinstance(stmt, ailment.Stmt.Statement):
278            # it's an AIL block
279            self.observed_results[key] = state.live_definitions.copy()
280
281    @property
282    def subject(self):
283        return self._subject
284
285    #
286    # Main analysis routines
287    #
288
289    def _pre_analysis(self):
290        pass
291
292    def _initial_abstract_state(self, node) -> ReachingDefinitionsState:
293        if self._init_state is not None:
294            return self._init_state
295        else:
296            return ReachingDefinitionsState(
297                self.project.arch, self.subject, track_tmps=self._track_tmps, analysis=self,
298                canonical_size=self._canonical_size,
299            )
300
301    def _merge_states(self, node, *states):
302        return states[0].merge(*states[1:])
303
304    def _run_on_node(self, node, state: ReachingDefinitionsState):
305        """
306
307        :param node:    The current node.
308        :param state:   The analysis state.
309        :return:        A tuple: (reached fix-point, successor state)
310        """
311
312        self._visited_blocks.add(node)
313
314        engine: SimEngineLight
315
316        if isinstance(node, ailment.Block):
317            block = node
318            engine = self._engine_ail
319        elif isinstance(node, (Block, CodeNode)):
320            block = self.project.factory.block(node.addr, node.size, opt_level=1, cross_insn_opt=False)
321            engine = self._engine_vex
322        elif isinstance(node, CFGNode):
323            if node.is_simprocedure or node.is_syscall:
324                return False, state.copy()
325            block = node.block
326            engine = self._engine_vex
327        else:
328            l.warning("Unsupported node type %s.", node.__class__)
329            return False, state.copy()
330
331        self.node_observe(node.addr, state, OP_BEFORE)
332
333        state = state.copy()
334        state, self._visited_blocks, self._dep_graph = engine.process(
335            state,
336            block=block,
337            fail_fast=self._fail_fast,
338            visited_blocks=self._visited_blocks,
339            dep_graph=self._dep_graph,
340        )
341
342        block_key = node.addr
343        self._node_iterations[block_key] += 1
344
345        self.node_observe(node.addr, state, OP_AFTER)
346
347        # update all definitions and all uses
348        self.all_definitions |= state.all_definitions
349        for use in [state.stack_uses, state.heap_uses, state.register_uses]:
350            self.all_uses.merge(use)
351
352        if self._node_iterations[block_key] < self._max_iterations:
353            return True, state
354        else:
355            return False, state
356
357    def _intra_analysis(self):
358        pass
359
360    def _post_analysis(self):
361        pass
362