from . import ExplorationTechnique
from .common import condition_to_lambda
from .. import sim_options

import logging
l = logging.getLogger(name=__name__)


class Explorer(ExplorationTechnique):
    """
    Search for up to "num_find" states that satisfy condition "find", avoiding condition "avoid". Stashes found
    states into "find_stash" and avoided states into "avoid_stash".

    The "find" and "avoid" parameters may be any of:

    - An address to find
    - A set or list of addresses to find
    - A function that takes a state and returns whether or not it matches.

    If an angr CFG is passed in as the "cfg" parameter and "find" is either a number or a list or a set, then
    any states which cannot possibly reach a success state without going through a failure state will be
    preemptively avoided.

    If a state triggers both the "find" and the "avoid" condition, it is added to the find stash, unless
    "avoid_priority" is set to True, in which case it is added to the avoid stash.

    :param find:            Condition describing the states to look for.
    :param avoid:           Condition describing the states to discard.
    :param find_stash:      Name of the stash that receives matching states.
    :param avoid_stash:     Name of the stash that receives avoided states.
    :param cfg:             Optional CFG used to prune states that cannot reach a "find" address.
    :param num_find:        Number of found states after which exploration is reported as complete.
    :param avoid_priority:  Whether "avoid" wins over "find" when a state matches both.
    """

    def __init__(self, find=None, avoid=None, find_stash='found', avoid_stash='avoid', cfg=None, num_find=1,
                 avoid_priority=False):
        super(Explorer, self).__init__()
        self.find, static_find = condition_to_lambda(find)
        self.avoid, static_avoid = condition_to_lambda(avoid)
        self.find_stash = find_stash
        self.avoid_stash = avoid_stash
        self.cfg = cfg
        # addresses of CFG nodes from which a "find" address is reachable without crossing an "avoid" address
        self.ok_blocks = set()
        self.num_find = num_find
        self.avoid_priority = avoid_priority

        # even if avoid or find addresses are not statically known, stop on those that we do know
        self._extra_stop_points = (static_find or set()) | (static_avoid or set())
        self._unknown_stop_points = static_find is None or static_avoid is None
        self._warned_unicorn = False

        # TODO: This is a hack for while CFGFast doesn't handle procedure continuations
        from .. import analyses
        if isinstance(cfg, analyses.CFGFast):
            l.error("CFGFast is currently inappropriate for use with Explorer.")
            l.error("Usage of the CFG has been disabled for this explorer.")
            self.cfg = None

        if self.cfg is not None:
            avoid = static_avoid or set()

            # we need the find addresses to be determined statically
            if not static_find:
                l.error("You must provide at least one 'find' address as a number, set, list, or tuple if you provide a CFG.")
                l.error("Usage of the CFG has been disabled for this explorer.")
                self.cfg = None
                return

            for a in avoid:
                if cfg.model.get_any_node(a) is None:
                    l.warning("'Avoid' address %#x not present in CFG...", a)

            # not a queue but a stack... it's just a worklist!
            queue = []
            for f in static_find:
                nodes = cfg.model.get_all_nodes(f)
                if len(nodes) == 0:
                    l.warning("'Find' address %#x not present in CFG...", f)
                else:
                    queue.extend(nodes)

            # walk backwards from the "find" nodes, never crossing an "avoid" address
            seen_nodes = set()
            while queue:
                n = queue.pop()
                if id(n) in seen_nodes:
                    continue
                if n.addr in avoid:
                    continue
                self.ok_blocks.add(n.addr)
                seen_nodes.add(id(n))
                queue.extend(n.predecessors)

            if not self.ok_blocks:
                l.error("No addresses could be validated by the provided CFG!")
                l.error("Usage of the CFG has been disabled for this explorer.")
                self.cfg = None
                return

            l.warning("Please be sure that the CFG you have passed in is complete.")
            l.warning("Providing an incomplete CFG can cause viable paths to be discarded!")

    def setup(self, simgr):
        """
        Make sure the find and avoid stashes exist in the simulation manager.
        """
        if self.find_stash not in simgr.stashes:
            simgr.stashes[self.find_stash] = []
        if self.avoid_stash not in simgr.stashes:
            simgr.stashes[self.avoid_stash] = []

    def step(self, simgr, stash='active', **kwargs):
        """
        Step the simulation manager, adding the statically known find/avoid addresses to any
        caller-provided "extra_stop_points".
        """
        # tolerate an explicit extra_stop_points=None from the caller
        base_extra_stop_points = set(kwargs.pop("extra_stop_points", None) or ())
        return simgr.step(stash=stash, extra_stop_points=base_extra_stop_points | self._extra_stop_points, **kwargs)

    def _classify(self, addr, findable, avoidable):
        """
        Decide which stash a state at "addr" belongs to, given the results of the find and avoid conditions.

        A condition result matches when it is exactly True, or when it is a truthy container holding "addr".
        The conditions are checked in priority order (avoid first if "avoid_priority" is set, find first
        otherwise) and the second one is only examined if the first one does not match.

        :return: The name of the target stash, or None if neither condition matches at "addr".
        """
        if self.avoid_priority:
            ordered = ((avoidable, self.avoid_stash), (findable, self.find_stash))
        else:
            ordered = ((findable, self.find_stash), (avoidable, self.avoid_stash))

        for result, target in ordered:
            if result and (result is True or addr in result):
                return target
        return None

    # make it more natural to deal with the intended dataflow
    def filter(self, simgr, state, **kwargs):
        """
        Return the stash the state should be moved to, deferring to the simulation manager's own filter
        when this technique has no opinion.
        """
        stash = self._filter_inner(state)
        if stash is None:
            return simgr.filter(state, **kwargs)
        return stash

    def _filter_inner(self, state):
        """
        Classify a single state.

        :return: The find stash or avoid stash name, or None if the state should be left alone.
        """
        if self._unknown_stop_points and sim_options.UNICORN in state.options and not self._warned_unicorn:
            l.warning("Using unicorn with find/avoid conditions that are a lambda (not a number, set, tuple or list)")
            l.warning("Unicorn may step over states that match the condition (find or avoid) without stopping.")
            self._warned_unicorn = True

        findable = self.find(state)
        avoidable = self.avoid(state)

        stash = self._classify(state.addr, findable, avoidable)
        if stash is not None:
            return stash

        # Any state that was not classified above is subject to CFG pruning. This includes states for
        # which a condition returned a truthy address set that does not contain state.addr.
        if self.cfg is not None and self.cfg.model.get_any_node(state.addr) is not None:
            if state.addr not in self.ok_blocks:
                return self.avoid_stash

        return None

    def complete(self, simgr):
        """
        Report whether at least "num_find" states have been collected in the find stash.
        """
        return len(simgr.stashes[self.find_stash]) >= self.num_find