1import cle
2import pickle
3import nose
4import gc
5import os
6
7import claripy
8import angr
9from angr import SimState
10
11
# Location of the test binaries, resolved relative to this file's real
# (symlink-free) path: two directories up, then into 'binaries'.
binaries_base = os.path.join(
    os.path.dirname(os.path.realpath(__file__)),
    '..',
    '..',
    'binaries',
)
13
14
def test_state():
    # Exercise stack_push/stack_pop on a fresh AMD64 state: each push must lower
    # sp by 8, and the pops must hand the values back in last-in, first-out order.
    state = SimState(arch='AMD64')

    def sp_value():
        # Concrete value currently held in the stack pointer register.
        return state.solver.eval(state.registers.load('sp'))

    state.registers.store('sp', 0x7ffffffffff0000)
    nose.tools.assert_equal(sp_value(), 0x7ffffffffff0000)

    # Two pushes of 8 bytes each.
    state.stack_push(state.solver.BVV(b"ABCDEFGH"))
    nose.tools.assert_equal(sp_value(), 0x7fffffffffefff8)
    state.stack_push(state.solver.BVV(b"IJKLMNOP"))
    nose.tools.assert_equal(sp_value(), 0x7fffffffffefff0)

    # The value pushed last comes back first.
    popped_first = state.stack_pop()
    nose.tools.assert_equal(sp_value(), 0x7fffffffffefff8)
    nose.tools.assert_equal(state.solver.eval(popped_first, cast_to=bytes), b"IJKLMNOP")

    # After the second pop, sp is back at its starting value.
    popped_second = state.stack_pop()
    nose.tools.assert_equal(sp_value(), 0x7ffffffffff0000)
    nose.tools.assert_equal(state.solver.eval(popped_second, cast_to=bytes), b"ABCDEFGH")
32
33#@nose.tools.timed(10)
def test_state_merge():
    # Three parts:
    #   1. merge three symbolic states that disagree on one memory byte,
    #   2. merge states that carry different sets of plugins,
    #   3. merging posix plugins with different open files must raise.

    # --- 1. memory merge ---------------------------------------------------
    first = SimState(arch='AMD64', mode='symbolic')
    first.memory.store(1, first.solver.BVV(42, 8))

    second = first.copy()
    third = second.copy()

    # Each branch derives the byte at address 2 from the shared byte at address 1.
    first.memory.store(2, first.memory.load(1, 1)+1)
    second.memory.store(2, second.memory.load(1, 1)*2)
    third.memory.store(2, third.memory.load(1, 1)/2)

    branches = (first, second, third)
    byte2_per_branch = (43, 84, 21)

    # The byte at 1 is still 42 in every branch.
    for branch in branches:
        nose.tools.assert_equal(branch.solver.eval(branch.memory.load(1, 1)), 42)

    # The byte at 2 holds the per-branch result.
    for branch, expected in zip(branches, byte2_per_branch):
        nose.tools.assert_equal(branch.solver.eval(branch.memory.load(2, 1)), expected)

    # Before merging, the byte at 2 has a single solution in each branch.
    for branch in branches:
        nose.tools.assert_true(branch.solver.unique(branch.memory.load(2, 1)))

    # To debug the merge, raise the log level of 'angr.state_plugins.symbolic_memory' around this call.
    merged, merge_conditions, merging_occurred = first.merge(second, third)

    nose.tools.assert_true(merging_occurred)
    assert len(merge_conditions) == 3

    # The merged byte at 2 is no longer unique, while the inputs are unaffected.
    nose.tools.assert_false(merged.solver.unique(merged.memory.load(2, 1)))
    for branch in branches:
        nose.tools.assert_true(branch.solver.unique(branch.memory.load(2, 1)))

    # The merged byte can take exactly the three branch values.
    nose.tools.assert_sequence_equal(sorted(merged.solver.eval_upto(merged.memory.load(2, 1), 10)), (21, 43, 84))

    # Adding the i-th merge condition selects the i-th branch's value again.
    for index, expected in enumerate(byte2_per_branch):
        selected = merged.copy()
        selected.add_constraints(merge_conditions[index])
        nose.tools.assert_true(selected.solver.unique(selected.memory.load(2, 1)))
        nose.tools.assert_equal(selected.solver.eval(selected.memory.load(2, 1)), expected)

    # --- 2. different sets of plugins --------------------------------------
    with_libc = SimState(arch='AMD64', mode='symbolic')
    for plugin_name in ('memory', 'registers'):
        nose.tools.assert_true(with_libc.has_plugin(plugin_name))
    nose.tools.assert_false(with_libc.has_plugin('libc'))

    # Copy first, then request 'libc' on the original only.
    without_libc = with_libc.copy()
    with_libc.get_plugin('libc')
    nose.tools.assert_true(with_libc.has_plugin('libc'))
    nose.tools.assert_false(without_libc.has_plugin('libc'))

    # Whichever side drives the merge, the result has the plugin.
    libc_into_plain = with_libc.copy().merge(without_libc.copy())[0]
    plain_into_libc = without_libc.copy().merge(with_libc.copy())[0]
    nose.tools.assert_true(libc_into_plain.has_plugin('libc'))
    nose.tools.assert_true(plain_into_libc.has_plugin('libc'))

    # --- 3. posix with different open files (illegal!) ---------------------
    opener = SimState(arch='AMD64', mode='symbolic')
    pristine = opener.copy()
    opener.posix.open(b'/tmp/idk', 1)
    with nose.tools.assert_raises(angr.errors.SimMergeError):
        opener.copy().merge(pristine.copy())
112
def test_state_merge_static():
    # Static mode (abstract memory), aligned memory merging: three states write
    # 50, 60 and 70 to the same value-set address, and the merged load must be
    # identical to the strided interval 50..70 with stride 10.
    first = SimState(arch='AMD64', mode='static')

    target = first.solver.ValueSet(32, 'global', 0, 8)
    first.memory.store(target, first.solver.BVV(42, 32))
    # Clear the recorded a-locs so the writes below do not try to merge with the 42 stored above.
    first.memory._regions['global']._alocs = {}

    second = first.copy()
    third = first.copy()
    # All three values are built through the first state's solver.
    for state, value in ((first, 50), (second, 60), (third, 70)):
        state.memory.store(target, first.solver.BVV(value, 32), endness='Iend_LE')

    merged, _merge_conditions, _merging_occurred = first.merge(second, third)

    observed = claripy.backends.vsa.convert(merged.memory.load(target, 4, endness='Iend_LE'))
    wanted = claripy.backends.vsa.convert(first.solver.SI(bits=32, stride=10, lower_bound=50, upper_bound=70))
    nose.tools.assert_true(observed.identical(wanted))
133
134
def test_state_merge_3way():
    # Merge three states pairwise (first with second, then the result with third)
    # and check that each state's value at 0x400000 is still satisfiable afterwards.
    first = SimState(arch='AMD64', mode='symbolic')
    second = first.copy()
    third = first.copy()

    # The three states are constrained by: c0, (not c0 and c1), (not c0 and not c1).
    cond_0, cond_1 = first.solver.BoolS('cond_0'), first.solver.BoolS('cond_1')
    first.add_constraints(cond_0)
    second.add_constraints(first.solver.Not(cond_0), cond_1)
    third.add_constraints(first.solver.Not(cond_0), first.solver.Not(cond_1))

    # Each state stores its own 32-bit value, built with its own solver.
    for state, value in ((first, 8), (second, 9), (third, 10)):
        state.memory.store(0x400000, state.solver.BVV(value, 32))

    two_way, _, _ = first.merge(second)
    three_way, _, _ = two_way.merge(third)

    for value in (8, 9, 10):
        assert three_way.satisfiable(extra_constraints=(three_way.memory.load(0x400000, 4) == value,))
155
156
def test_state_merge_optimal_nostrongrefstate():

    # We do not specify the state option EFFICIENT_STATE_MERGING, and as a result, state histories do not store strong
    # references to states. This will result in less efficient state merging since SimStateHistory will be the only
    # state plugin that knows the common ancestor of all instances to merge. But it should still succeed.

    binary_path = os.path.join(binaries_base, "tests", "x86_64", "state_merge_0")
    p = angr.Project(binary_path, auto_load_libs=False)
    sm = p.factory.simulation_manager()

    # Collect three states that reach 0x400616, then merge the 'found' stash into one state.
    sm.explore(find=0x400616, num_find=3)

    var_addr = 0x601044

    sm.merge(stash='found')
    s = sm.one_found
    # The dword stored at var_addr in the merged state.
    culprit = s.mem[var_addr].dword.resolved

    # Each of 8, 9 and 10 must remain a possible value after the merge.
    # The solver result itself has to be the asserted expression: in
    # `assert x, y` the second operand is only the failure message, so
    # `assert i, <solver call>` would pass for every non-zero i.
    for i in range(8, 11):
        assert s.solver.satisfiable(extra_constraints=(culprit == i,)), \
            "merged state should allow the value %d at %#x" % (i, var_addr)

    # A value outside that set must not be possible.
    assert not s.solver.satisfiable(extra_constraints=(culprit == 12, ))
179
180
def test_state_merge_optimal():

    # Same scenario as the test without strong state references, but here the
    # initial state is created with EFFICIENT_STATE_MERGING enabled.

    binary_path = os.path.join(binaries_base, "tests", "x86_64", "state_merge_0")
    p = angr.Project(binary_path, auto_load_libs=False)
    state = p.factory.blank_state(add_options={angr.sim_options.EFFICIENT_STATE_MERGING})
    sm = p.factory.simulation_manager(state)

    # Collect three states that reach 0x400616, then merge the 'found' stash into one state.
    sm.explore(find=0x400616, num_find=3)

    var_addr = 0x601044

    sm.merge(stash='found')
    s = sm.one_found
    # The dword stored at var_addr in the merged state.
    culprit = s.mem[var_addr].dword.resolved

    # Each of 8, 9 and 10 must remain a possible value after the merge.
    # The solver result itself has to be the asserted expression: in
    # `assert x, y` the second operand is only the failure message, so
    # `assert i, <solver call>` would pass for every non-zero i.
    for i in range(8, 11):
        assert s.solver.satisfiable(extra_constraints=(culprit == i,)), \
            "merged state should allow the value %d at %#x" % (i, var_addr)

    # A value outside that set must not be possible.
    assert not s.solver.satisfiable(extra_constraints=(culprit == 12, ))
202
203
204
def test_state_pickle():
    # A state must come back from a pickle round trip with its memory contents intact.
    original = SimState(arch="AMD64")
    original.memory.store(100, original.solver.BVV(0x4141414241414241424300, 88), endness='Iend_BE')
    original.regs.rax = 100

    blob = pickle.dumps(original)

    # Drop the live state and force a garbage collection before loading it back.
    del original
    gc.collect()

    restored = pickle.loads(blob)
    first_ten_bytes = restored.solver.eval(restored.memory.load(100, 10), cast_to=bytes)
    nose.tools.assert_equal(first_ten_bytes, b"AAABAABABC")
215
def test_global_condition():
    # Check that register writes made inside SimState.with_condition() blocks are
    # guarded by that condition, and that _global_condition is None again on exit.
    state = SimState(arch="AMD64")

    state.regs.rax = 10
    rax_before = state.regs.rax

    # Under a False condition the solver is unsatisfiable and rax stays the very same object.
    with state.with_condition(False):
        nose.tools.assert_false(state.solver.satisfiable())
        state.regs.rax = 20
    nose.tools.assert_is(state._global_condition, None)
    nose.tools.assert_is(rax_before, state.regs.rax)

    # Under a True condition the write lands as the plain constant.
    with state.with_condition(True):
        state.regs.rax = 20
    nose.tools.assert_is(state._global_condition, None)
    nose.tools.assert_is_not(rax_before, state.regs.rax)
    nose.tools.assert_is(state.solver.BVV(20, state.arch.bits), state.regs.rax)

    # Under a symbolic condition rax must not end up as the plain constant.
    for excluded_rbx, guarded_rax in ((0, 25), (1, 30)):
        with state.with_condition(state.regs.rbx != excluded_rbx):
            state.regs.rax = guarded_rax
        nose.tools.assert_is(state._global_condition, None)
        nose.tools.assert_is_not(state.solver.BVV(guarded_rax, state.arch.bits), state.regs.rax)

    # Pinning rbx decides which of the guarded writes is visible in rax.
    for pinned_rbx, visible_rax in ((0, 30), (1, 25)):
        with state.with_condition(state.regs.rbx == pinned_rbx):
            nose.tools.assert_equal(state.solver.eval_upto(state.regs.rbx, 10), [ pinned_rbx ])
            nose.tools.assert_sequence_equal(state.solver.eval_upto(state.regs.rax, 10), [ visible_rax ])
249
250
def test_successors_catch_arbitrary_interrupts():
    # `int 0xd2` is an unsupported interrupt on x86/amd64. Stepping over it is
    # expected to yield one unsat state and nothing in the errored stash.
    shellcode = b"\xcd\xd2"

    project = angr.load_shellcode(shellcode, "amd64")
    # Attach an ELF TLS manager and a SimLinux simos to the shellcode project by hand.
    project.loader.tls = cle.backends.tls.ELFThreadManager(project.loader, project.arch)
    project.simos = angr.simos.SimLinux(project)
    project.simos.configure_project()

    entry_state = project.factory.blank_state(addr=0)
    manager = project.factory.simgr(entry_state)
    manager.step()

    errored_hint = ("The state should not go to the errored stash. Is "
                    "AngrSyscallError handled in SimSuccessors?")
    nose.tools.assert_equal(len(manager.errored), 0, msg=errored_hint)
    nose.tools.assert_equal(len(manager.unsat), 1)
268
269
def test_bypass_errored_irstmt():
    # `fild [esp+4]` fails when ftop is unspecified; the BYPASS_ERRORED_IRSTMT
    # option suppresses that failure.
    shellcode = b"\xdb\x44\x24\x04"

    project = angr.load_shellcode(shellcode, "x86")
    state = project.factory.blank_state(
        addr=0,
        mode="fastpath",
        cle_memory_backer=project.loader.memory,
        add_options={angr.sim_options.FAST_REGISTERS},
        remove_options={angr.sim_options.BYPASS_ERRORED_IRSTMT},
    )

    # Make both esp and ftop symbolic.
    state.regs._esp = state.solver.BVS("unknown_rsp", 32)
    state.regs._ftop = state.solver.BVS("unknown_ftop", 32)

    def step_once():
        # Build a fresh simulation manager around `state` and advance it one step.
        manager = project.factory.simgr(state)
        manager.step()
        return manager

    # Without BYPASS_ERRORED_IRSTMT a single step must leave exactly one errored state.
    without_bypass = step_once()
    nose.tools.assert_equal(len(without_bypass.errored), 1)
    nose.tools.assert_equal(str(without_bypass.errored[0].error), "address not supported",
                            msg="Does SimFastMemory support "
                                "reading from a symbolic address?")

    # With the option added, the same step must succeed.
    state.options.add(angr.sim_options.BYPASS_ERRORED_IRSTMT)
    with_bypass = step_once()

    nose.tools.assert_equal(len(with_bypass.errored), 0)
    nose.tools.assert_equal(len(with_bypass.active), 1)
300
301
# When executed as a script, run every test in this file once, in a fixed order.
if __name__ == '__main__':
    for run_test in (
        test_state,
        test_state_merge,
        test_state_merge_3way,
        test_state_merge_optimal,
        test_state_merge_optimal_nostrongrefstate,
        test_state_merge_static,
        test_state_pickle,
        test_global_condition,
        test_successors_catch_arbitrary_interrupts,
        test_bypass_errored_irstmt,
    ):
        run_test()
313