1from . import ExplorationTechnique
2from .common import condition_to_lambda
3from .. import sim_options
4
5import logging
6l = logging.getLogger(name=__name__)
7
class Explorer(ExplorationTechnique):
    """
    Search for up to "num_find" paths that satisfy condition "find", avoiding condition "avoid". Stashes found paths
    into "find_stash" and avoided paths into "avoid_stash".

    The "find" and "avoid" parameters may be any of:

    - An address to find
    - A set or list of addresses to find
    - A function that takes a path and returns whether or not it matches.

    If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then
    any paths which cannot possibly reach a success state without going through a failure state will be
    preemptively avoided.

    If either the "find" or "avoid" parameter is a function returning a boolean, and a path triggers both conditions,
    it will be added to the find stash, unless "avoid_priority" is set to True.

    :param find:            Condition describing the states to collect (see above).
    :param avoid:           Condition describing the states to discard (see above).
    :param find_stash:      Name of the stash that receives matching states.
    :param avoid_stash:     Name of the stash that receives avoided states.
    :param cfg:             Optional CFG used to prune states that cannot reach a "find" address.
    :param num_find:        Number of states in the find stash at which exploration is considered complete.
    :param avoid_priority:  If True, "avoid" wins over "find" when a state matches both.
    """
    def __init__(self, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1, avoid_priority=False):
        super(Explorer, self).__init__()
        # condition_to_lambda yields a callable plus the statically known addresses;
        # the code below treats a static value of None as "addresses not statically known".
        self.find, static_find = condition_to_lambda(find)
        self.avoid, static_avoid = condition_to_lambda(avoid)
        self.find_stash = find_stash
        self.avoid_stash = avoid_stash
        self.cfg = cfg
        self.ok_blocks = set()
        self.num_find = num_find
        self.avoid_priority = avoid_priority

        # even if avoid or find addresses are not statically known, stop on those that we do know
        self._extra_stop_points = (static_find or set()) | (static_avoid or set())
        self._unknown_stop_points = static_find is None or static_avoid is None
        self._warned_unicorn = False

        # TODO: This is a hack for while CFGFast doesn't handle procedure continuations
        from .. import analyses
        if isinstance(cfg, analyses.CFGFast):
            l.error("CFGFast is currently inappropriate for use with Explorer.")
            l.error("Usage of the CFG has been disabled for this explorer.")
            self.cfg = None

        if self.cfg is None:
            return

        avoid = static_avoid or set()

        # we need the find addresses to be determined statically
        if not static_find:
            l.error("You must provide at least one 'find' address as a number, set, list, or tuple if you provide a CFG.")
            l.error("Usage of the CFG has been disabled for this explorer.")
            self.cfg = None
            return

        for a in avoid:
            if cfg.model.get_any_node(a) is None:
                l.warning("'Avoid' address %#x not present in CFG...", a)

        # not a queue but a stack... it's just a worklist!
        queue = []
        for f in static_find:
            nodes = cfg.model.get_all_nodes(f)
            if nodes:
                queue.extend(nodes)
            else:
                l.warning("'Find' address %#x not present in CFG...", f)

        # Walk predecessors backwards from every "find" node, never crossing an
        # "avoid" address; every block reached this way is recorded in ok_blocks.
        # Nodes are tracked by identity, so distinct nodes sharing an address are
        # each visited.
        seen_nodes = set()
        while queue:
            n = queue.pop()
            if id(n) in seen_nodes:
                continue
            if n.addr in avoid:
                continue
            self.ok_blocks.add(n.addr)
            seen_nodes.add(id(n))
            queue.extend(n.predecessors)

        if not self.ok_blocks:
            l.error("No addresses could be validated by the provided CFG!")
            l.error("Usage of the CFG has been disabled for this explorer.")
            self.cfg = None
            return

        l.warning("Please be sure that the CFG you have passed in is complete.")
        l.warning("Providing an incomplete CFG can cause viable paths to be discarded!")

    def setup(self, simgr):
        """
        Make sure both the find stash and the avoid stash exist on the simulation manager.
        """
        for stash_name in (self.find_stash, self.avoid_stash):
            if stash_name not in simgr.stashes:
                simgr.stashes[stash_name] = []

    def step(self, simgr, stash='active', **kwargs):
        """
        Step the simulation manager, adding the statically known find/avoid addresses to the stop points.

        Any "extra_stop_points" supplied by the caller are merged with the explorer's own. An explicit
        ``extra_stop_points=None`` is treated as "no extra stop points" instead of raising a TypeError.
        """
        caller_stop_points = kwargs.pop("extra_stop_points", None)
        stop_points = set() if caller_stop_points is None else set(caller_stop_points)
        stop_points |= self._extra_stop_points
        return simgr.step(stash=stash, extra_stop_points=stop_points, **kwargs)

    def _classify(self, addr, findable, avoidable):
        """
        Pick the stash for a state at "addr" given the results of the find and avoid conditions.

        Each result counts as a match when it is exactly True, or when it is a truthy container that
        includes "addr". The conditions are examined in priority order ("avoid" first when
        avoid_priority is set, "find" first otherwise) and the first match wins.

        :return: The name of the matching stash, or None if neither condition matches at "addr".
        """
        ordered = [(findable, self.find_stash), (avoidable, self.avoid_stash)]
        if self.avoid_priority:
            ordered.reverse()

        # Evaluated lazily on purpose: the lower-priority result is only inspected
        # when the higher-priority one did not match.
        for result, stash_name in ordered:
            if result and (result is True or addr in result):
                return stash_name
        return None

    # make it more natural to deal with the intended dataflow
    def filter(self, simgr, state, **kwargs):
        """
        Return the stash chosen by the explorer for "state", deferring to the simulation manager's
        own filter when the explorer has no opinion.
        """
        stash = self._filter_inner(state)
        if stash is None:
            return simgr.filter(state, **kwargs)
        return stash

    def _filter_inner(self, state):
        """
        Decide which stash, if any, "state" should be moved to.

        :return: The find or avoid stash name, or None to leave the decision to the next filter.
        """
        # Warn (once) that unicorn may skip over states matching a non-static condition.
        if self._unknown_stop_points and sim_options.UNICORN in state.options and not self._warned_unicorn:
            l.warning("Using unicorn with find/avoid conditions that are a lambda (not a number, set, tuple or list)")
            l.warning("Unicorn may step over states that match the condition (find or avoid) without stopping.")
            self._warned_unicorn = True

        findable = self.find(state)
        avoidable = self.avoid(state)

        if not findable and not avoidable:
            # Neither condition fired: fall back to CFG-based pruning. A state whose
            # address is known to the CFG but was not reached by the backwards walk
            # from the "find" nodes is avoided.
            if self.cfg is not None and self.cfg.model.get_any_node(state.addr) is not None:
                if state.addr not in self.ok_blocks:
                    return self.avoid_stash
            return None

        return self._classify(state.addr, findable, avoidable)

    def complete(self, simgr):
        """
        Report whether the find stash already holds at least "num_find" states.
        """
        return len(simgr.stashes[self.find_stash]) >= self.num_find
141