1from collections import defaultdict
2
3import claripy
4import functools
5
6import logging
7l = logging.getLogger(name=__name__)
8
9from ..errors import AngrError, SimError, SimUnsupportedError, SimCCallError
10from .. import sim_options
11from ..engines.successors import SimSuccessors
12
13exc_list = (AngrError, SimError, claripy.ClaripyError, TypeError, ValueError, ArithmeticError, MemoryError)
14
15from . import ExplorationTechnique
16
17
class Oppologist(ExplorationTechnique):
    """
    The Oppologist is an exploration technique that forces uncooperative code through qemu.

    When stepping a state raises ``SimUnsupportedError`` or ``SimCCallError``, the offending
    state is stepped again with the ``UNICORN`` and ``UNICORN_AGGRESSIVE_CONCRETIZATION``
    options enabled and the unicorn plugin limited to a single step. The original options and
    unicorn plugin are put back on the resulting successors afterwards.
    """

    def __init__(self):
        ExplorationTechnique.__init__(self)

    @staticmethod
    def _restore_state(old, new):
        """
        Undo the temporary unicorn configuration on a successor state.

        :param old: The state whose unicorn plugin and options are reinstated.
        :param new: The successor state to repair in place.
        """
        new.release_plugin('unicorn')
        new.register_plugin('unicorn', old.unicorn.copy())
        new.options = old.options.copy()

    def _oppologize(self, simgr, state, pn, **kwargs):
        """
        Step ``pn`` once with unicorn forced on, then restore ``state``'s configuration on
        every successor.

        :param simgr:  The simulation manager used to compute successors.
        :param state:  The state whose options and unicorn plugin are copied back onto the
                       successors.
        :param pn:     The state to step; it is modified in place (options and unicorn settings).
        :param kwargs: Forwarded to ``simgr.successors``.
        :returns:      The successors object returned by ``simgr.successors``.
        """
        l.debug("... pn: %s", pn)

        pn.options.add(sim_options.UNICORN)
        pn.options.add(sim_options.UNICORN_AGGRESSIVE_CONCRETIZATION)
        pn.unicorn.max_steps = 1
        pn.unicorn.countdown_symbolic_stop = 0
        pn.unicorn.countdown_unsupported_stop = 0
        pn.unicorn.countdown_nonunicorn_blocks = 0
        pn.unicorn.countdown_stop_point = 0
        ss = simgr.successors(pn, throw=True, **kwargs)

        l.debug("... successors: %s", ss)
        # A state present in more than one list is restored more than once; that is harmless
        # because each restore overwrites the same plugin and options from `state`.
        for group in (ss.flat_successors, ss.unconstrained_successors, ss.unsat_successors, ss.successors):
            for s in group:
                self._restore_state(state, s)

        return ss

    @staticmethod
    def _combine_results(*results):
        """
        Merge several successors objects into a single ``SimSuccessors``.

        :param results: One or more successors objects; the first one supplies the address and
                        initial state of the merged result, so at least one is required.
        :returns:       A new ``SimSuccessors`` holding the concatenated successor lists.
        """
        final = SimSuccessors(results[0].addr, results[0].initial_state)
        final.description = 'Oppology'
        final.sort = 'Oppologist'
        final.processed = True

        for med in results:
            final.successors.extend(med.successors)
            final.all_successors.extend(med.all_successors)
            final.flat_successors.extend(med.flat_successors)
            final.unsat_successors.extend(med.unsat_successors)
            # Each list is merged from its own counterpart; unconstrained successors must not
            # be populated from the unsat list.
            final.unconstrained_successors.extend(med.unconstrained_successors)

        return final

    def _delayed_oppology(self, simgr, state, e, **kwargs):
        """
        Step up to the failing instruction normally, then push the states that stopped at it
        through ``_oppologize``.

        :param simgr:  The simulation manager used to compute successors.
        :param state:  The state whose stepping raised ``e``.
        :param e:      The error; its ``executed_instruction_count`` and ``ins_addr`` are read.
        :param kwargs: Forwarded to ``simgr.successors``.
        :returns:      The combined successors of the partial step and of every re-stepped state.
        """
        ss = simgr.successors(state, num_inst=e.executed_instruction_count, **kwargs)

        # Only flat_successors is split on the failing address; the other lists on `ss` are
        # left exactly as returned.
        need_oppologizing = [s for s in ss.flat_successors if s.addr == e.ins_addr]
        ss.flat_successors = [s for s in ss.flat_successors if s.addr != e.ins_addr]

        results = [ss]
        results.extend(self._oppologize(simgr, state, s, **kwargs) for s in need_oppologizing)
        return self._combine_results(*results)

    def successors(self, simgr, state, **kwargs):
        """
        Compute the successors of ``state``, falling back to unicorn when the normal step raises
        ``SimUnsupportedError`` or ``SimCCallError``.

        :param simgr:  The simulation manager used to compute successors.
        :param state:  The state to step.
        :param kwargs: Forwarded to ``simgr.successors``; any ``throw`` entry is discarded.
        :returns:      The successors of ``state``.
        :raises:       The original error if the fallback fails with one of ``exc_list``; any
                       other exception from the normal step is logged and re-raised unchanged.
        """
        # _oppologize passes throw=True itself, so a caller-supplied value would collide.
        kwargs.pop('throw', None)

        try:
            return simgr.successors(state, **kwargs)

        except (SimUnsupportedError, SimCCallError) as e:
            # %s rather than %d: the count is tested for truthiness below, so it is not
            # guaranteed to be an int here.
            l.debug("Errored on path %s after %s instructions", state, e.executed_instruction_count)
            try:
                if e.executed_instruction_count:
                    return self._delayed_oppology(simgr, state, e, **kwargs)
                else:
                    return self._oppologize(simgr, state, state.copy(), **kwargs)
            except exc_list: #pylint:disable=broad-except
                l.error("Oppologizer hit an error while trying to perform repairs", exc_info=True)
                # Surface the error from the original step, not the one from the repair attempt.
                raise e
        except Exception: #pylint:disable=broad-except
            l.error("Original block hit an unsupported error", exc_info=True)
            raise
97