1import nose
2import angr
3import pickle
4import re
5from angr import options as so
6from nose.plugins.attrib import attr
7import gc
8import os
9
10test_location = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..', '..')
11
12
13
14def _remove_addr_from_trace_item(trace_item_str):
15    m = re.match(r"(<\S+ \S+) from 0x[0-9a-f]+(:[\s\S]+)", trace_item_str)
16    if m is None:
17        return None
18    return m.group(1) + m.group(2)
19
20def _compare_trace(trace, expected):
21
22    nose.tools.assert_equal(len(trace), len(expected))
23
24    for trace_item, expected_str in zip(trace, expected):
25        trace_item_str = str(trace_item)
26        if trace_item_str.startswith('<SimProcedure'):
27            # we do not care if addresses of SimProcedures match, since they are not allocated in a deterministic way
28            trace_item_str = _remove_addr_from_trace_item(trace_item_str)
29            expected_str = _remove_addr_from_trace_item(expected_str)
30
31        nose.tools.assert_equal(trace_item_str, expected_str)
32
33def test_stops():
34    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'uc_stop'))
35
36    # test STOP_NORMAL, STOP_STOPPOINT
37    s_normal = p.factory.entry_state(args=['a'], add_options=so.unicorn)
38    s_normal.unicorn.max_steps = 100
39    pg_normal = p.factory.simulation_manager(s_normal).run()
40    p_normal = pg_normal.one_deadended
41    _compare_trace(p_normal.history.descriptions, ['<Unicorn (STOP_STOPPOINT after 4 steps) from 0x8048340: 1 sat>', '<SimProcedure __libc_start_main from 0x8119990: 1 sat>', '<Unicorn (STOP_STOPPOINT after 14 steps) from 0x8048650: 1 sat>', '<SimProcedure __libc_start_main from 0x8400044: 1 sat>', '<Unicorn (STOP_NORMAL after 100 steps) from 0x80485b5: 1 sat>', '<Unicorn (STOP_STOPPOINT after 12 steps) from 0x804846f: 1 sat>', '<SimProcedure __libc_start_main from 0x8400048: 1 sat>'])
42
43    s_normal_angr = p.factory.entry_state(args=['a'])
44    pg_normal_angr = p.factory.simulation_manager(s_normal_angr).run()
45    p_normal_angr = pg_normal_angr.one_deadended
46    nose.tools.assert_equal(p_normal_angr.history.bbl_addrs.hardcopy, p_normal.history.bbl_addrs.hardcopy)
47
48    # test STOP_STOPPOINT on an address that is not a basic block start
49    s_stoppoints = p.factory.call_state(p.loader.find_symbol("main").rebased_addr, 1, [], add_options=so.unicorn)
50
51    # this address is right before/after the bb for the stop_normal() function ends
52    # we should not stop there, since that code is never hit
53    stop_fake = [0x0804847c, 0x08048454]
54
55    # this is an address inside main that is not the beginning of a basic block. we should stop here
56    stop_in_bb = 0x08048638
57    stop_bb = 0x08048633 # basic block of the above address
58    pg_stoppoints = p.factory.simulation_manager(s_stoppoints).run(n=1, extra_stop_points=stop_fake + [stop_in_bb])
59    nose.tools.assert_equal(len(pg_stoppoints.active), 1) # path should not branch
60    p_stoppoints = pg_stoppoints.one_active
61    nose.tools.assert_equal(p_stoppoints.addr, stop_bb) # should stop at bb before stop_in_bb
62    _compare_trace(p_stoppoints.history.descriptions, ['<Unicorn (STOP_STOPPOINT after 111 steps) from 0x80485b5: 1 sat>'])
63
64    # test STOP_SYMBOLIC_READ_SYMBOLIC_TRACKING_DISABLED
65    s_symbolic_read_tracking_disabled = p.factory.entry_state(args=['a', 'a'], add_options=so.unicorn, remove_options={so.UNICORN_SYM_REGS_SUPPORT})
66    pg_symbolic_read_tracking_disabled = p.factory.simulation_manager(s_symbolic_read_tracking_disabled).run()
67    p_symbolic_read_tracking_disabled = pg_symbolic_read_tracking_disabled.one_deadended
68    _compare_trace(p_symbolic_read_tracking_disabled.history.descriptions, ['<Unicorn (STOP_STOPPOINT after 4 steps) from 0x8048340: 1 sat>', '<SimProcedure __libc_start_main from 0x8119990: 1 sat>', '<Unicorn (STOP_STOPPOINT after 14 steps) from 0x8048650: 1 sat>', '<SimProcedure __libc_start_main from 0x8400044: 1 sat>', '<Unicorn (STOP_SYMBOLIC_READ_SYMBOLIC_TRACKING_DISABLED after 7 steps) from 0x80485b5: 1 sat>', '<IRSB from 0x804848a: 1 sat 3 unsat>', '<Unicorn (STOP_STOPPOINT after 3 steps) from 0x80484bb: 1 sat>', '<SimProcedure __libc_start_main from 0x8400048: 1 sat>'])
69
70    s_symbolic_read_tracking_disabled_angr = p.factory.entry_state(args=['a', 'a'])
71    pg_symbolic_read_tracking_disabled_angr = p.factory.simulation_manager(s_symbolic_read_tracking_disabled_angr).run()
72    p_symbolic_read_tracking_disabled_angr = pg_symbolic_read_tracking_disabled_angr.one_deadended
73    nose.tools.assert_equal(p_symbolic_read_tracking_disabled_angr.history.bbl_addrs.hardcopy, p_symbolic_read_tracking_disabled.history.bbl_addrs.hardcopy)
74
75    # test STOP_SEGFAULT
76    s_segfault = p.factory.entry_state(args=['a', 'a', 'a', 'a', 'a', 'a', 'a'], add_options=so.unicorn | {so.STRICT_PAGE_ACCESS, so.ENABLE_NX})
77    pg_segfault = p.factory.simulation_manager(s_segfault).run()
78    p_segfault = pg_segfault.errored[0].state
79    # TODO: fix the permissions segfault to commit if it's a MEM_FETCH
80    # this will extend the last simunicorn one more block
81    _compare_trace(p_segfault.history.descriptions, ['<Unicorn (STOP_STOPPOINT after 4 steps) from 0x8048340: 1 sat>', '<SimProcedure __libc_start_main from 0x8119990: 1 sat>', '<Unicorn (STOP_STOPPOINT after 14 steps) from 0x8048650: 1 sat>', '<SimProcedure __libc_start_main from 0x8400044: 1 sat>', '<Unicorn (STOP_SEGFAULT after 7 steps) from 0x80485b5: 1 sat>', '<IRSB from 0x8048508: 1 sat>'])
82
83    s_segfault_angr = p.factory.entry_state(args=['a', 'a', 'a', 'a', 'a', 'a', 'a'], add_options={so.STRICT_PAGE_ACCESS, so.ENABLE_NX})
84    pg_segfault_angr = p.factory.simulation_manager(s_segfault_angr).run()
85    p_segfault_angr = pg_segfault_angr.errored[0].state
86    nose.tools.assert_equal(p_segfault_angr.history.bbl_addrs.hardcopy, p_segfault.history.bbl_addrs.hardcopy)
87    nose.tools.assert_equal(pg_segfault_angr.errored[0].error.addr, pg_segfault.errored[0].error.addr)
88
89    # test STOP_SYMBOLIC_BLOCK_EXIT
90    s_symbolic_exit = p.factory.entry_state(args=['a'] * 10, add_options=so.unicorn)
91    pg_symbolic_exit = p.factory.simulation_manager(s_symbolic_exit).run()
92    p_symbolic_exit = pg_symbolic_exit.one_deadended
93    _compare_trace(p_symbolic_exit.history.descriptions, ['<Unicorn (STOP_STOPPOINT after 4 steps) from 0x8048340: 1 sat>', '<SimProcedure __libc_start_main from 0x8119990: 1 sat>', '<Unicorn (STOP_STOPPOINT after 14 steps) from 0x8048650: 1 sat>', '<SimProcedure __libc_start_main from 0x8400044: 1 sat>', '<Unicorn (STOP_SYMBOLIC_BLOCK_EXIT_CONDITION after 7 steps) from 0x80485b5: 1 sat>', '<IRSB from 0x804855d: 2 sat 1 unsat>', '<Unicorn (STOP_STOPPOINT after 4 steps) from 0x8048587: 1 sat>', '<SimProcedure __libc_start_main from 0x8400048: 1 sat>'])
94
95    s_symbolic_exit_angr = p.factory.entry_state(args=['a'] * 10)
96    pg_symbolic_exit_angr = p.factory.simulation_manager(s_symbolic_exit_angr).run()
97    p_symbolic_exit_angr = pg_symbolic_exit_angr.one_deadended
98    nose.tools.assert_equal(p_symbolic_exit_angr.history.bbl_addrs.hardcopy, p_symbolic_exit.history.bbl_addrs.hardcopy)
99
100def run_longinit(arch):
101    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', arch, 'longinit'))
102    s_unicorn = p.factory.entry_state(add_options=so.unicorn, remove_options={so.SHORT_READS})
103    pg = p.factory.simulation_manager(s_unicorn, save_unconstrained=True, save_unsat=True)
104    pg.explore()
105    s = pg.deadended[0]
106    (first, _), (second, _) = s.posix.stdin.content
107    s.add_constraints(first == s.solver.BVV(b'A'*9))
108    s.add_constraints(second == s.solver.BVV(b'B'*9))
109    nose.tools.assert_equal(s.posix.dumps(1), b"You entered AAAAAAAAA and BBBBBBBBB!\n")
110
111def test_longinit_i386():
112    run_longinit('i386')
113def test_longinit_x86_64():
114    run_longinit('x86_64')
115
116def test_fauxware_arm():
117    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'armel', 'fauxware'))
118    s_unicorn = p.factory.entry_state(add_options=so.unicorn) # unicorn
119    pg = p.factory.simulation_manager(s_unicorn)
120    pg.explore()
121    assert all("Unicorn" in ''.join(p.history.descriptions.hardcopy) for p in pg.deadended)
122    nose.tools.assert_equal(sorted(pg.mp_deadended.posix.dumps(1).mp_items), sorted((
123        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n',
124        b'Username: \nPassword: \nGo away!',
125        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n'
126    )))
127
128
129def test_fauxware():
130    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'fauxware'))
131    s_unicorn = p.factory.entry_state(add_options=so.unicorn) # unicorn
132    pg = p.factory.simulation_manager(s_unicorn)
133    pg.explore()
134
135    assert all("Unicorn" in ''.join(p.history.descriptions.hardcopy) for p in pg.deadended)
136    nose.tools.assert_equal(sorted(pg.mp_deadended.posix.dumps(1).mp_items), sorted((
137        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n',
138        b'Username: \nPassword: \nGo away!',
139        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n'
140    )))
141
142def test_fauxware_aggressive():
143    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'fauxware'))
144    s_unicorn = p.factory.entry_state(
145        add_options=so.unicorn | { so.UNICORN_AGGRESSIVE_CONCRETIZATION },
146        remove_options={ so.LAZY_SOLVES }
147    ) # unicorn
148    s_unicorn.unicorn.cooldown_symbolic_stop = 2
149    s_unicorn.unicorn.cooldown_unsupported_stop = 2
150    s_unicorn.unicorn.cooldown_nonunicorn_blocks = 0
151
152    pg = p.factory.simulation_manager(s_unicorn)
153    pg.explore()
154
155    nose.tools.assert_equal(len(pg.deadended), 1)
156
157def run_similarity(binpath, depth, prehook=None):
158    b = angr.Project(os.path.join(test_location, binpath))
159    cc = b.analyses.CongruencyCheck(throw=True)
160    cc.set_state_options(
161        left_add_options=so.unicorn,
162        left_remove_options={so.LAZY_SOLVES, so.TRACK_MEMORY_MAPPING, so.COMPOSITE_SOLVER},
163        right_add_options={so.INITIALIZE_ZERO_REGISTERS},
164        right_remove_options={so.LAZY_SOLVES, so.TRACK_MEMORY_MAPPING, so.COMPOSITE_SOLVER}
165    )
166    if prehook:
167        cc.simgr = prehook(cc.simgr)
168    cc.run(depth=depth)
169
170@attr(speed='slow')
171def test_similarity_fauxware():
172    def cooldown(pg):
173        # gotta skip the initializers because of cpuid and RDTSC
174        pg.one_left.unicorn.countdown_nonunicorn_blocks = 39
175        return pg
176    run_similarity(os.path.join("binaries", "tests", "i386", "fauxware"), 1000, prehook=cooldown)
177
178def test_fp():
179    with open(os.path.join(test_location, 'binaries', 'tests_src', 'manyfloatsum.c')) as fp:
180        type_cache = angr.sim_type.parse_defns(fp.read())
181    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'manyfloatsum'))
182
183    for function in ('sum_floats', 'sum_combo', 'sum_segregated', 'sum_doubles', 'sum_combo_doubles', 'sum_segregated_doubles'):
184        cc = p.factory.cc(func_ty=type_cache[function])
185        args = list(range(len(cc.func_ty.args)))
186        answer = float(sum(args))
187        addr = p.loader.find_symbol(function).rebased_addr
188        my_callable = p.factory.callable(addr, cc=cc)
189        my_callable.set_base_state(p.factory.blank_state(add_options=so.unicorn))
190        result = my_callable(*args)
191        nose.tools.assert_false(result.symbolic)
192        result_concrete = result.args[0]
193        nose.tools.assert_equal(answer, result_concrete)
194
195def test_unicorn_pickle():
196    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'fauxware'))
197
198    def _uni_state():
199        # try pickling out paths that went through unicorn
200        s_unicorn = p.factory.entry_state(add_options=so.unicorn)
201        s_unicorn.unicorn.countdown_nonunicorn_blocks = 0
202        s_unicorn.unicorn.countdown_symbolic_stop = 0
203        s_unicorn.unicorn.cooldown_nonunicorn_blocks = 0
204        s_unicorn.unicorn.cooldown_symbolic_stop = 2
205        return s_unicorn
206
207    pg = p.factory.simulation_manager(_uni_state())
208    pg.one_active.options.update(so.unicorn)
209    pg.run(until=lambda lpg: "Unicorn" in lpg.one_active.history.recent_description)
210    assert len(pg.active) > 0
211
212    pgp = pickle.dumps(pg, -1)
213    del pg
214    gc.collect()
215    pg2 = pickle.loads(pgp)
216    pg2.explore()
217
218    nose.tools.assert_equal(sorted(pg2.mp_deadended.posix.dumps(1).mp_items), sorted((
219        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n',
220        b'Username: \nPassword: \nGo away!',
221        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n'
222    )))
223
224    # test the pickling of SimUnicorn itself
225    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'fauxware'))
226    pg = p.factory.simulation_manager(_uni_state())
227    pg.run(n=2)
228    assert p.factory.successors(pg.one_active).sort == 'Unicorn'
229
230    pgp = pickle.dumps(pg, -1)
231    del pg
232    gc.collect()
233    pg2 = pickle.loads(pgp)
234    pg2.explore()
235
236    nose.tools.assert_equal(sorted(pg2.mp_deadended.posix.dumps(1).mp_items), sorted((
237        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n',
238        b'Username: \nPassword: \nGo away!',
239        b'Username: \nPassword: \nWelcome to the admin console, trusted user!\n'
240    )))
241
242def test_concrete_transmits():
243    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'cgc', 'PIZZA_00001'))
244    inp = bytes.fromhex("320a310a0100000005000000330a330a340a")
245
246    s_unicorn = p.factory.entry_state(add_options=so.unicorn | {so.CGC_NO_SYMBOLIC_RECEIVE_LENGTH}, stdin=inp, flag_page=b'\0'*4096)
247    pg_unicorn = p.factory.simulation_manager(s_unicorn)
248    pg_unicorn.run(n=10)
249
250    nose.tools.assert_equal(pg_unicorn.one_active.posix.dumps(1), b'1) Add number to the array\n2) Add random number to the array\n3) Sum numbers\n4) Exit\nRandomness added\n1) Add number to the array\n2) Add random number to the array\n3) Sum numbers\n4) Exit\n  Index: \n1) Add number to the array\n2) Add random number to the array\n3) Sum numbers\n4) Exit\n')
251
252def test_inspect():
253    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'uc_stop'))
254
255    def main_state(argc, add_options=None):
256        add_options = add_options or so.unicorn
257        main_addr = p.loader.find_symbol("main").rebased_addr
258        return p.factory.call_state(main_addr, argc, [], add_options=add_options)
259
260    # test breaking on specific addresses
261    s_break_addr = main_state(1)
262    addr0 = 0x08048479 # at the beginning of a basic block, at end of stop_normal function
263    addr1 = 0x080485d0 # this is at the beginning of main, in the middle of a basic block
264    addr2 = 0x08048461 # another non-bb address, at the start of stop_normal
265    addr3 = 0x0804847c # address of a block that should not get hit (stop_symbolc function)
266    addr4 = 0x08048632 # another address that shouldn't get hit, near end of main
267    hits = { addr0 : 0, addr1: 0, addr2: 0, addr3: 0, addr4: 0 }
268
269    def create_addr_action(addr):
270        def action(_state):
271            hits[addr] += 1
272        return action
273
274    for addr in [addr0, addr1, addr2]:
275        s_break_addr.inspect.b("instruction", instruction=addr, action=create_addr_action(addr))
276
277    pg_instruction = p.factory.simulation_manager(s_break_addr)
278    pg_instruction.run()
279    nose.tools.assert_equal(hits[addr0], 1)
280    nose.tools.assert_equal(hits[addr1], 1)
281    nose.tools.assert_equal(hits[addr2], 1)
282    nose.tools.assert_equal(hits[addr3], 0)
283    nose.tools.assert_equal(hits[addr4], 0)
284
285    # test breaking on every instruction
286    def collect_trace(options):
287        s_break_every = main_state(1, add_options=options)
288        trace = []
289        def action_every(state):
290            trace.append(state.addr)
291        s_break_every.inspect.b("instruction", action=action_every)
292        pg_break_every = p.factory.simulation_manager(s_break_every)
293        pg_break_every.run()
294    nose.tools.assert_equal(collect_trace(so.unicorn), collect_trace(set()))
295
296def test_explore():
297    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'uc_stop'))
298
299    def main_state(argc, add_options=None):
300        add_options = add_options or so.unicorn
301        main_addr = p.loader.find_symbol("main").rebased_addr
302        return p.factory.call_state(main_addr, argc, [], add_options=add_options)
303
304    addr = 0x08048479
305    s_explore = main_state(1)
306    pg_explore_find = p.factory.simulation_manager(s_explore)
307    pg_explore_find.explore(find=addr)
308    nose.tools.assert_equal(len(pg_explore_find.found), 1)
309    nose.tools.assert_equal(pg_explore_find.found[0].addr, addr)
310
311    pg_explore_avoid = p.factory.simulation_manager(s_explore)
312    pg_explore_avoid.explore(avoid=addr)
313    nose.tools.assert_equal(len(pg_explore_avoid.avoid), 1)
314    nose.tools.assert_equal(pg_explore_avoid.avoid[0].addr, addr)
315
316
317def test_single_step():
318    p = angr.Project(os.path.join(test_location, 'binaries', 'tests', 'i386', 'uc_stop'))
319
320
321    def main_state(argc, add_options=None):
322        add_options = add_options or so.unicorn
323        main_addr = p.loader.find_symbol("main").rebased_addr
324        return p.factory.call_state(main_addr, argc, [], add_options=add_options)
325
326    s_main = main_state(1)
327
328    step1 = s_main.block().instruction_addrs[1]
329    successors1 = s_main.step(num_inst=1).successors
330    nose.tools.assert_equal(len(successors1), 1)
331    nose.tools.assert_equal(successors1[0].addr, step1)
332
333    step5 = s_main.block().instruction_addrs[5]
334    successors2 = successors1[0].step(num_inst=4).successors
335    nose.tools.assert_equal(len(successors2), 1)
336    nose.tools.assert_equal(successors2[0].addr, step5)
337
338if __name__ == '__main__':
339    import logging
340    logging.getLogger('angr.state_plugins.unicorn_engine').setLevel('DEBUG')
341    logging.getLogger('angr.engines.unicorn_engine').setLevel('INFO')
342    logging.getLogger('angr.factory').setLevel('DEBUG')
343    logging.getLogger('angr.project').setLevel('DEBUG')
344    #logging.getLogger('claripy.backends.backend_z3').setLevel('DEBUG')
345
346    import sys
347    if len(sys.argv) > 1:
348        for arg in sys.argv[1:]:
349            print('test_' + arg)
350            res = globals()['test_' + arg]()
351            if hasattr(res, '__iter__'):
352                for ft in res:
353                    fo = ft[0]
354                    fa = ft[1:]
355                    print('...', fa)
356                    fo(*fa)
357    else:
358        for fk, fv in list(globals().items()):
359            if fk.startswith('test_') and callable(fv):
360                print(fk)
361                res = fv()
362                if hasattr(res, '__iter__'):
363                    for ft in res:
364                        fo = ft[0]
365                        fa = ft[1:]
366                        print('...', fa)
367                        fo(*fa)