1 2import logging 3import math 4import types 5from collections import deque, defaultdict 6 7import networkx 8from . import Analysis 9 10from ..errors import SimEngineError, SimMemoryError 11 12# todo include an explanation of the algorithm 13# todo include a method that detects any change other than constants 14# todo use function names / string references where available 15 16l = logging.getLogger(name=__name__) 17 18# basic block changes 19DIFF_TYPE = "type" 20DIFF_VALUE = "value" 21 22 23# exception for trying find basic block changes 24class UnmatchedStatementsException(Exception): 25 pass 26 27 28# statement difference classes 29class Difference(object): 30 def __init__(self, diff_type, value_a, value_b): 31 self.type = diff_type 32 self.value_a = value_a 33 self.value_b = value_b 34 35 36class ConstantChange(object): 37 def __init__(self, offset, value_a, value_b): 38 self.offset = offset 39 self.value_a = value_a 40 self.value_b = value_b 41 42 43# helper methods 44def _euclidean_dist(vector_a, vector_b): 45 """ 46 :param vector_a: A list of numbers. 47 :param vector_b: A list of numbers. 48 :returns: The euclidean distance between the two vectors. 49 """ 50 dist = 0 51 for (x, y) in zip(vector_a, vector_b): 52 dist += (x-y)*(x-y) 53 return math.sqrt(dist) 54 55 56def _get_closest_matches(input_attributes, target_attributes): 57 """ 58 :param input_attributes: First dictionary of objects to attribute tuples. 59 :param target_attributes: Second dictionary of blocks to attribute tuples. 60 :returns: A dictionary of objects in the input_attributes to the closest objects in the 61 target_attributes. 62 """ 63 closest_matches = {} 64 65 # for each object in the first set find the objects with the closest target attributes 66 for a in input_attributes: 67 best_dist = float('inf') 68 best_matches = [] 69 for b in target_attributes: 70 dist = _euclidean_dist(input_attributes[a], target_attributes[b]) 71 if dist < best_dist: 72 best_matches = [b] 73 best_dist = dist 74 elif dist == best_dist: 75 best_matches.append(b) 76 closest_matches[a] = best_matches 77 78 return closest_matches 79 80 81# from http://rosettacode.org/wiki/Levenshtein_distance 82def _levenshtein_distance(s1, s2): 83 """ 84 :param s1: A list or string 85 :param s2: Another list or string 86 :returns: The levenshtein distance between the two 87 """ 88 if len(s1) > len(s2): 89 s1, s2 = s2, s1 90 distances = range(len(s1) + 1) 91 for index2, num2 in enumerate(s2): 92 new_distances = [index2 + 1] 93 for index1, num1 in enumerate(s1): 94 if num1 == num2: 95 new_distances.append(distances[index1]) 96 else: 97 new_distances.append(1 + min((distances[index1], 98 distances[index1+1], 99 new_distances[-1]))) 100 distances = new_distances 101 return distances[-1] 102 103 104def _normalized_levenshtein_distance(s1, s2, acceptable_differences): 105 """ 106 This function calculates the levenshtein distance but allows for elements in the lists to be different by any number 107 in the set acceptable_differences. 108 109 :param s1: A list. 110 :param s2: Another list. 111 :param acceptable_differences: A set of numbers. If (s2[i]-s1[i]) is in the set then they are considered equal. 112 :returns: 113 """ 114 if len(s1) > len(s2): 115 s1, s2 = s2, s1 116 acceptable_differences = set(-i for i in acceptable_differences) 117 distances = range(len(s1) + 1) 118 for index2, num2 in enumerate(s2): 119 new_distances = [index2 + 1] 120 for index1, num1 in enumerate(s1): 121 if num2 - num1 in acceptable_differences: 122 new_distances.append(distances[index1]) 123 else: 124 new_distances.append(1 + min((distances[index1], 125 distances[index1+1], 126 new_distances[-1]))) 127 distances = new_distances 128 return distances[-1] 129 130 131def _is_better_match(x, y, matched_a, matched_b, attributes_dict_a, attributes_dict_b): 132 """ 133 :param x: The first element of a possible match. 134 :param y: The second element of a possible match. 135 :param matched_a: The current matches for the first set. 136 :param matched_b: The current matches for the second set. 137 :param attributes_dict_a: The attributes for each element in the first set. 138 :param attributes_dict_b: The attributes for each element in the second set. 139 :returns: True/False 140 """ 141 attributes_x = attributes_dict_a[x] 142 attributes_y = attributes_dict_b[y] 143 if x in matched_a: 144 attributes_match = attributes_dict_b[matched_a[x]] 145 if _euclidean_dist(attributes_x, attributes_y) >= _euclidean_dist(attributes_x, attributes_match): 146 return False 147 if y in matched_b: 148 attributes_match = attributes_dict_a[matched_b[y]] 149 if _euclidean_dist(attributes_x, attributes_y) >= _euclidean_dist(attributes_y, attributes_match): 150 return False 151 return True 152 153 154def differing_constants(block_a, block_b): 155 """ 156 Compares two basic blocks and finds all the constants that differ from the first block to the second. 157 158 :param block_a: The first block to compare. 159 :param block_b: The second block to compare. 160 :returns: Returns a list of differing constants in the form of ConstantChange, which has the offset in the 161 block and the respective constants. 162 """ 163 statements_a = [s for s in block_a.vex.statements if s.tag != "Ist_IMark"] + [block_a.vex.next] 164 statements_b = [s for s in block_b.vex.statements if s.tag != "Ist_IMark"] + [block_b.vex.next] 165 if len(statements_a) != len(statements_b): 166 raise UnmatchedStatementsException("Blocks have different numbers of statements") 167 168 start_1 = min(block_a.instruction_addrs) 169 start_2 = min(block_b.instruction_addrs) 170 171 changes = [] 172 173 # check statements 174 current_offset = None 175 for statement, statement_2 in zip(statements_a, statements_b): 176 # sanity check 177 if statement.tag != statement_2.tag: 178 raise UnmatchedStatementsException("Statement tag has changed") 179 180 if statement.tag == "Ist_IMark": 181 if statement.addr - start_1 != statement_2.addr - start_2: 182 raise UnmatchedStatementsException("Instruction length has changed") 183 current_offset = statement.addr - start_1 184 continue 185 186 differences = compare_statement_dict(statement, statement_2) 187 for d in differences: 188 if d.type != DIFF_VALUE: 189 raise UnmatchedStatementsException("Instruction has changed") 190 else: 191 changes.append(ConstantChange(current_offset, d.value_a, d.value_b)) 192 193 return changes 194 195 196def compare_statement_dict(statement_1, statement_2): 197 # should return whether or not the statement's type/effects changed 198 # need to return the specific number that changed too 199 200 if type(statement_1) != type(statement_2): 201 return [Difference(DIFF_TYPE, None, None)] 202 203 # None 204 if statement_1 is None and statement_2 is None: 205 return [] 206 207 # constants 208 if isinstance(statement_1, (int, float, str, bytes)): 209 if isinstance(statement_1, float) and math.isnan(statement_1) and math.isnan(statement_2): 210 return [] 211 elif statement_1 == statement_2: 212 return [] 213 else: 214 return [Difference(None, statement_1, statement_2)] 215 216 # tuples/lists 217 if isinstance(statement_1, (tuple, list)): 218 if len(statement_1) != len(statement_2): 219 return Difference(DIFF_TYPE, None, None) 220 221 differences = [] 222 for s1, s2 in zip(statement_1, statement_2): 223 differences += compare_statement_dict(s1, s2) 224 return differences 225 226 # Yan's weird types 227 differences = [] 228 for attr in statement_1.__slots__: 229 # don't check arch, property, or methods 230 if attr == "arch": 231 continue 232 if hasattr(statement_1.__class__, attr) and isinstance(getattr(statement_1.__class__, attr), property): 233 continue 234 if isinstance(getattr(statement_1, attr), types.MethodType): 235 continue 236 237 new_diffs = compare_statement_dict(getattr(statement_1, attr), getattr(statement_2, attr)) 238 # set the difference types 239 for diff in new_diffs: 240 if diff.type is None: 241 diff.type = attr 242 differences += new_diffs 243 244 return differences 245 246 247class NormalizedBlock(object): 248 # block may span multiple calls 249 def __init__(self, block, function): 250 addresses = [block.addr] 251 if block.addr in function.merged_blocks: 252 for a in function.merged_blocks[block.addr]: 253 addresses.append(a.addr) 254 255 self.addr = block.addr 256 self.addresses = addresses 257 self.statements = [] 258 self.all_constants = [] 259 self.operations = [] 260 self.call_targets = [] 261 self.blocks = [] 262 self.instruction_addrs = [] 263 264 if block.addr in function.call_sites: 265 self.call_targets = function.call_sites[block.addr] 266 267 self.jumpkind = None 268 269 for a in addresses: 270 block = function.project.factory.block(a) 271 self.instruction_addrs += block.instruction_addrs 272 irsb = block.vex 273 self.blocks.append(block) 274 self.statements += irsb.statements 275 self.all_constants += irsb.all_constants 276 self.operations += irsb.operations 277 self.jumpkind = irsb.jumpkind 278 279 self.size = sum([b.size for b in self.blocks]) 280 281 def __repr__(self): 282 size = sum([b.size for b in self.blocks]) 283 return '<Normalized Block for %#x, %d bytes>' % (self.addr, size) 284 285 286class NormalizedFunction(object): 287 # a more normalized function 288 def __init__(self, function): 289 # start by copying the graph 290 self.graph = function.graph.copy() 291 self.project = function._function_manager._kb._project 292 self.call_sites = dict() 293 self.startpoint = function.startpoint 294 self.merged_blocks = dict() 295 self.orig_function = function 296 297 # find nodes which end in call and combine them 298 done = False 299 while not done: 300 done = True 301 for node in self.graph.nodes(): 302 try: 303 bl = self.project.factory.block(node.addr) 304 except (SimMemoryError, SimEngineError): 305 continue 306 307 # merge if it ends with a single call, and the successor has only one predecessor and succ is after 308 successors = list(self.graph.successors(node)) 309 if bl.vex.jumpkind == "Ijk_Call" and len(successors) == 1 and \ 310 len(list(self.graph.predecessors(successors[0]))) == 1 and successors[0].addr > node.addr: 311 # add edges to the successors of its successor, and delete the original successors 312 succ = list(self.graph.successors(node))[0] 313 for s in self.graph.successors(succ): 314 self.graph.add_edge(node, s) 315 self.graph.remove_node(succ) 316 done = False 317 318 # add to merged blocks 319 if node not in self.merged_blocks: 320 self.merged_blocks[node] = [] 321 self.merged_blocks[node].append(succ) 322 if succ in self.merged_blocks: 323 self.merged_blocks[node] += self.merged_blocks[succ] 324 del self.merged_blocks[succ] 325 326 # stop iterating and start over 327 break 328 329 # set up call sites 330 for n in self.graph.nodes(): 331 call_targets = [] 332 if n.addr in self.orig_function.get_call_sites(): 333 call_targets.append(self.orig_function.get_call_target(n.addr)) 334 if n.addr in self.merged_blocks: 335 for block in self.merged_blocks[n]: 336 if block.addr in self.orig_function.get_call_sites(): 337 call_targets.append(self.orig_function.get_call_target(block.addr)) 338 if len(call_targets) > 0: 339 self.call_sites[n] = call_targets 340 341 342class FunctionDiff(object): 343 """ 344 This class computes the a diff between two functions. 345 """ 346 def __init__(self, function_a, function_b, bindiff=None): 347 """ 348 :param function_a: The first angr Function object to diff. 349 :param function_b: The second angr Function object. 350 :param bindiff: An optional Bindiff object. Used for some extra normalization during basic block comparison. 351 """ 352 self._function_a = NormalizedFunction(function_a) 353 self._function_b = NormalizedFunction(function_b) 354 self._project_a = self._function_a.project 355 self._project_b = self._function_b.project 356 self._bindiff = bindiff 357 358 self._attributes_a = dict() 359 self._attributes_a = dict() 360 361 self._block_matches = set() 362 self._unmatched_blocks_from_a = set() 363 self._unmatched_blocks_from_b = set() 364 365 self._compute_diff() 366 367 @property 368 def probably_identical(self): 369 """ 370 :returns: Whether or not these two functions are identical. 371 """ 372 if len(self._unmatched_blocks_from_a | self._unmatched_blocks_from_b) > 0: 373 return False 374 for (a, b) in self._block_matches: 375 if not self.blocks_probably_identical(a, b): 376 return False 377 return True 378 379 @property 380 def identical_blocks(self): 381 """ 382 :returns: A list of block matches which appear to be identical 383 """ 384 identical_blocks = [] 385 for (block_a, block_b) in self._block_matches: 386 if self.blocks_probably_identical(block_a, block_b): 387 identical_blocks.append((block_a, block_b)) 388 return identical_blocks 389 390 @property 391 def differing_blocks(self): 392 """ 393 :returns: A list of block matches which appear to differ 394 """ 395 differing_blocks = [] 396 for (block_a, block_b) in self._block_matches: 397 if not self.blocks_probably_identical(block_a, block_b): 398 differing_blocks.append((block_a, block_b)) 399 return differing_blocks 400 401 @property 402 def blocks_with_differing_constants(self): 403 """ 404 :return: A list of block matches which appear to differ 405 """ 406 differing_blocks = [] 407 diffs = dict() 408 for (block_a, block_b) in self._block_matches: 409 if self.blocks_probably_identical(block_a, block_b) and \ 410 not self.blocks_probably_identical(block_a, block_b, check_constants=True): 411 differing_blocks.append((block_a, block_b)) 412 for block_a, block_b in differing_blocks: 413 ba = NormalizedBlock(block_a, self._function_a) 414 bb = NormalizedBlock(block_b, self._function_b) 415 diffs[(block_a, block_b)] = FunctionDiff._block_diff_constants(ba, bb) 416 return diffs 417 418 @property 419 def block_matches(self): 420 return self._block_matches 421 422 @property 423 def unmatched_blocks(self): 424 return self._unmatched_blocks_from_a, self._unmatched_blocks_from_b 425 426 @staticmethod 427 def get_normalized_block(addr, function): 428 """ 429 :param addr: Where to start the normalized block. 430 :param function: A function containing the block address. 431 :returns: A normalized basic block. 432 """ 433 return NormalizedBlock(addr, function) 434 435 def block_similarity(self, block_a, block_b): 436 """ 437 :param block_a: The first block address. 438 :param block_b: The second block address. 439 :returns: The similarity of the basic blocks, normalized for the base address of the block and function 440 call addresses. 441 """ 442 443 # handle sim procedure blocks 444 if self._project_a.is_hooked(block_a) and self._project_b.is_hooked(block_b): 445 if self._project_a._sim_procedures[block_a] == self._project_b._sim_procedures[block_b]: 446 return 1.0 447 else: 448 return 0.0 449 450 try: 451 block_a = NormalizedBlock(block_a, self._function_a) 452 except (SimMemoryError, SimEngineError): 453 block_a = None 454 455 try: 456 block_b = NormalizedBlock(block_b, self._function_b) 457 except (SimMemoryError, SimEngineError): 458 block_b = None 459 460 # if both were None then they are assumed to be the same, if only one was the same they are assumed to differ 461 if block_a is None and block_b is None: 462 return 1.0 463 elif block_a is None or block_b is None: 464 return 0.0 465 466 # get all elements for computing similarity 467 tags_a = [s.tag for s in block_a.statements] 468 tags_b = [s.tag for s in block_b.statements] 469 consts_a = [c.value for c in block_a.all_constants] 470 consts_b = [c.value for c in block_b.all_constants] 471 all_registers_a = [s.offset for s in block_a.statements if hasattr(s, "offset")] 472 all_registers_b = [s.offset for s in block_b.statements if hasattr(s, "offset")] 473 jumpkind_a = block_a.jumpkind 474 jumpkind_b = block_b.jumpkind 475 476 # compute total distance 477 total_dist = 0 478 total_dist += _levenshtein_distance(tags_a, tags_b) 479 total_dist += _levenshtein_distance(block_a.operations, block_b.operations) 480 total_dist += _levenshtein_distance(all_registers_a, all_registers_b) 481 acceptable_differences = self._get_acceptable_constant_differences(block_a, block_b) 482 total_dist += _normalized_levenshtein_distance(consts_a, consts_b, acceptable_differences) 483 total_dist += 0 if jumpkind_a == jumpkind_b else 1 484 485 # compute similarity 486 num_values = max(len(tags_a), len(tags_b)) 487 num_values += max(len(consts_a), len(consts_b)) 488 num_values += max(len(block_a.operations), len(block_b.operations)) 489 num_values += 1 # jumpkind 490 similarity = 1 - (float(total_dist) / num_values) 491 492 return similarity 493 494 def blocks_probably_identical(self, block_a, block_b, check_constants=False): 495 """ 496 :param block_a: The first block address. 497 :param block_b: The second block address. 498 :param check_constants: Whether or not to require matching constants in blocks. 499 :returns: Whether or not the blocks appear to be identical. 500 """ 501 # handle sim procedure blocks 502 if self._project_a.is_hooked(block_a) and self._project_b.is_hooked(block_b): 503 return self._project_a._sim_procedures[block_a] == self._project_b._sim_procedures[block_b] 504 505 try: 506 block_a = NormalizedBlock(block_a, self._function_a) 507 except (SimMemoryError, SimEngineError): 508 block_a = None 509 510 try: 511 block_b = NormalizedBlock(block_b, self._function_b) 512 except (SimMemoryError, SimEngineError): 513 block_b = None 514 515 # if both were None then they are assumed to be the same, if only one was None they are assumed to differ 516 if block_a is None and block_b is None: 517 return True 518 elif block_a is None or block_b is None: 519 return False 520 521 # if they represent a different number of blocks they are not the same 522 if len(block_a.blocks) != len(block_b.blocks): 523 return False 524 525 # check differing constants 526 try: 527 diff_constants = FunctionDiff._block_diff_constants(block_a, block_b) 528 except UnmatchedStatementsException: 529 return False 530 531 if not check_constants: 532 return True 533 534 # get values of differences that probably indicate no change 535 acceptable_differences = self._get_acceptable_constant_differences(block_a, block_b) 536 537 # todo match globals 538 for c in diff_constants: 539 if (c.value_a, c.value_b) in self._block_matches: 540 # constants point to matched basic blocks 541 continue 542 if self._bindiff is not None and (c.value_a and c.value_b) in self._bindiff.function_matches: 543 # constants point to matched functions 544 continue 545 # if both are in the binary we'll assume it's okay, although we should really match globals 546 # TODO use global matches 547 if self._project_a.loader.main_object.contains_addr(c.value_a) and \ 548 self._project_b.loader.main_object.contains_addr(c.value_b): 549 continue 550 # if the difference is equal to the difference in block addr's or successor addr's we'll say it's also okay 551 if c.value_b - c.value_a in acceptable_differences: 552 continue 553 # otherwise they probably are different 554 return False 555 556 # the blocks appear to be identical 557 return True 558 559 @staticmethod 560 def _block_diff_constants(block_a, block_b): 561 diff_constants = [] 562 for irsb_a, irsb_b in zip(block_a.blocks, block_b.blocks): 563 diff_constants += differing_constants(irsb_a, irsb_b) 564 return diff_constants 565 566 @staticmethod 567 def _compute_block_attributes(function): 568 """ 569 :param function: A normalized function object. 570 :returns: A dictionary of basic block addresses to tuples of attributes. 571 """ 572 # The attributes we use are the distance form function start, distance from function exit and whether 573 # or not it has a subfunction call 574 distances_from_start = FunctionDiff._distances_from_function_start(function) 575 distances_from_exit = FunctionDiff._distances_from_function_exit(function) 576 call_sites = function.call_sites 577 578 attributes = {} 579 for block in function.graph.nodes(): 580 if block in call_sites: 581 number_of_subfunction_calls = len(call_sites[block]) 582 else: 583 number_of_subfunction_calls = 0 584 # there really shouldn't be blocks that can't be reached from the start, but there are for now 585 dist_start = distances_from_start[block] if block in distances_from_start else 10000 586 dist_exit = distances_from_exit[block] if block in distances_from_exit else 10000 587 588 attributes[block] = (dist_start, dist_exit, number_of_subfunction_calls) 589 590 return attributes 591 592 @staticmethod 593 def _distances_from_function_start(function): 594 """ 595 :param function: A normalized Function object. 596 :returns: A dictionary of basic block addresses and their distance to the start of the function. 597 """ 598 return networkx.single_source_shortest_path_length(function.graph, 599 function.startpoint) 600 601 @staticmethod 602 def _distances_from_function_exit(function): 603 """ 604 :param function: A normalized Function object. 605 :returns: A dictionary of basic block addresses and their distance to the exit of the function. 606 """ 607 reverse_graph = function.graph.reverse() 608 # we aren't guaranteed to have an exit from the function so explicitly add the node 609 reverse_graph.add_node("start") 610 found_exits = False 611 for n in function.graph.nodes(): 612 if len(list(function.graph.successors(n))) == 0: 613 reverse_graph.add_edge("start", n) 614 found_exits = True 615 616 # if there were no exits (a function with a while 1) let's consider the block with the highest address to 617 # be the exit. This isn't the most scientific way, but since this case is pretty rare it should be okay 618 if not found_exits: 619 last = max(function.graph.nodes(), key=lambda x:x.addr) 620 reverse_graph.add_edge("start", last) 621 622 dists = networkx.single_source_shortest_path_length(reverse_graph, "start") 623 624 # remove temp node 625 del dists["start"] 626 627 # correct for the added node 628 for n in dists: 629 dists[n] -= 1 630 631 return dists 632 633 def _compute_diff(self): 634 """ 635 Computes the diff of the functions and saves the result. 636 """ 637 # get the attributes for all blocks 638 l.debug("Computing diff of functions: %s, %s", 639 ("%#x" % self._function_a.startpoint.addr) if self._function_a.startpoint is not None else "None", 640 ("%#x" % self._function_b.startpoint.addr) if self._function_b.startpoint is not None else "None" 641 ) 642 self.attributes_a = self._compute_block_attributes(self._function_a) 643 self.attributes_b = self._compute_block_attributes(self._function_b) 644 645 # get the initial matches 646 initial_matches = self._get_block_matches(self.attributes_a, self.attributes_b, 647 tiebreak_with_block_similarity=False) 648 649 # Use a queue so we process matches in the order that they are found 650 to_process = deque(initial_matches) 651 652 # Keep track of which matches we've already added to the queue 653 processed_matches = set((x, y) for (x, y) in initial_matches) 654 655 # Keep a dict of current matches, which will be updated if better matches are found 656 matched_a = dict() 657 matched_b = dict() 658 for (x, y) in processed_matches: 659 matched_a[x] = y 660 matched_b[y] = x 661 662 # while queue is not empty 663 while to_process: 664 (block_a, block_b) = to_process.pop() 665 l.debug("FunctionDiff: Processing (%#x, %#x)", block_a.addr, block_b.addr) 666 667 # we could find new matches in the successors or predecessors of functions 668 block_a_succ = list(self._function_a.graph.successors(block_a)) 669 block_b_succ = list(self._function_b.graph.successors(block_b)) 670 block_a_pred = list(self._function_a.graph.predecessors(block_a)) 671 block_b_pred = list(self._function_b.graph.predecessors(block_b)) 672 673 # propagate the difference in blocks as delta 674 delta = tuple((i-j) for i, j in zip(self.attributes_b[block_b], self.attributes_a[block_a])) 675 676 # get possible new matches 677 new_matches = [] 678 679 # if the blocks are identical then the successors should most likely be matched in the same order 680 if self.blocks_probably_identical(block_a, block_b) and len(block_a_succ) == len(block_b_succ): 681 ordered_succ_a = self._get_ordered_successors(self._project_a, block_a, block_a_succ) 682 ordered_succ_b = self._get_ordered_successors(self._project_b, block_b, block_b_succ) 683 new_matches.extend(zip(ordered_succ_a, ordered_succ_b)) 684 685 new_matches += self._get_block_matches(self.attributes_a, self.attributes_b, block_a_succ, block_b_succ, 686 delta, tiebreak_with_block_similarity=True) 687 new_matches += self._get_block_matches(self.attributes_a, self.attributes_b, block_a_pred, block_b_pred, 688 delta, tiebreak_with_block_similarity=True) 689 690 # for each of the possible new matches add it if it improves the matching 691 for (x, y) in new_matches: 692 if (x, y) not in processed_matches: 693 processed_matches.add((x, y)) 694 l.debug("FunctionDiff: checking if (%#x, %#x) is better", x.addr, y.addr) 695 # if it's a better match than what we already have use it 696 if _is_better_match(x, y, matched_a, matched_b, self.attributes_a, self.attributes_b): 697 l.debug("FunctionDiff: adding possible match (%#x, %#x)", x.addr, y.addr) 698 if x in matched_a: 699 old_match = matched_a[x] 700 del matched_b[old_match] 701 if y in matched_b: 702 old_match = matched_b[y] 703 del matched_a[old_match] 704 matched_a[x] = y 705 matched_b[y] = x 706 to_process.appendleft((x, y)) 707 708 # reformat matches into a set of pairs 709 self._block_matches = set((x, y) for (x, y) in matched_a.items()) 710 711 # get the unmatched blocks 712 self._unmatched_blocks_from_a = set(x for x in self._function_a.graph.nodes() if x not in matched_a) 713 self._unmatched_blocks_from_b = set(x for x in self._function_b.graph.nodes() if x not in matched_b) 714 715 @staticmethod 716 def _get_ordered_successors(project, block, succ): 717 try: 718 # add them in order of the vex 719 addr = block.addr 720 succ = set(succ) 721 ordered_succ = [] 722 bl = project.factory.block(addr) 723 for x in bl.vex.all_constants: 724 if x in succ: 725 ordered_succ.append(x) 726 727 # add the rest (sorting might be better than no order) 728 for s in sorted(succ - set(ordered_succ), key=lambda x:x.addr): 729 ordered_succ.append(s) 730 return ordered_succ 731 except (SimMemoryError, SimEngineError): 732 return sorted(succ, key=lambda x:x.addr) 733 734 def _get_block_matches(self, attributes_a, attributes_b, filter_set_a=None, filter_set_b=None, delta=(0, 0, 0), 735 tiebreak_with_block_similarity=False): 736 """ 737 :param attributes_a: A dict of blocks to their attributes 738 :param attributes_b: A dict of blocks to their attributes 739 740 The following parameters are optional. 741 742 :param filter_set_a: A set to limit attributes_a to the blocks in this set. 743 :param filter_set_b: A set to limit attributes_b to the blocks in this set. 744 :param delta: An offset to add to each vector in attributes_a. 745 :returns: A list of tuples of matching objects. 746 """ 747 # get the attributes that are in the sets 748 if filter_set_a is None: 749 filtered_attributes_a = {k: v for k, v in attributes_a.items()} 750 else: 751 filtered_attributes_a = {k: v for k, v in attributes_a.items() if k in filter_set_a} 752 753 if filter_set_b is None: 754 filtered_attributes_b = {k: v for k, v in attributes_b.items()} 755 else: 756 filtered_attributes_b = {k: v for k, v in attributes_b.items() if k in filter_set_b} 757 758 # add delta 759 for k in filtered_attributes_a: 760 filtered_attributes_a[k] = tuple((i+j) for i, j in zip(filtered_attributes_a[k], delta)) 761 for k in filtered_attributes_b: 762 filtered_attributes_b[k] = tuple((i+j) for i, j in zip(filtered_attributes_b[k], delta)) 763 764 # get closest 765 closest_a = _get_closest_matches(filtered_attributes_a, filtered_attributes_b) 766 closest_b = _get_closest_matches(filtered_attributes_b, filtered_attributes_a) 767 768 if tiebreak_with_block_similarity: 769 # use block similarity to break ties in the first set 770 for a in closest_a: 771 if len(closest_a[a]) > 1: 772 best_similarity = 0 773 best = [] 774 for x in closest_a[a]: 775 similarity = self.block_similarity(a, x) 776 if similarity > best_similarity: 777 best_similarity = similarity 778 best = [x] 779 elif similarity == best_similarity: 780 best.append(x) 781 closest_a[a] = best 782 783 # use block similarity to break ties in the second set 784 for b in closest_b: 785 if len(closest_b[b]) > 1: 786 best_similarity = 0 787 best = [] 788 for x in closest_b[b]: 789 similarity = self.block_similarity(x, b) 790 if similarity > best_similarity: 791 best_similarity = similarity 792 best = [x] 793 elif similarity == best_similarity: 794 best.append(x) 795 closest_b[b] = best 796 797 # a match (x,y) is good if x is the closest to y and y is the closest to x 798 matches = [] 799 for a in closest_a: 800 if len(closest_a[a]) == 1: 801 match = closest_a[a][0] 802 if len(closest_b[match]) == 1 and closest_b[match][0] == a: 803 matches.append((a, match)) 804 805 return matches 806 807 def _get_acceptable_constant_differences(self, block_a, block_b): 808 # keep a set of the acceptable differences in constants between the two blocks 809 acceptable_differences = set() 810 acceptable_differences.add(0) 811 812 block_a_base = block_a.instruction_addrs[0] 813 block_b_base = block_b.instruction_addrs[0] 814 acceptable_differences.add(block_b_base - block_a_base) 815 816 # get matching successors 817 for target_a, target_b in zip(block_a.call_targets, block_b.call_targets): 818 # these can be none if we couldn't resolve the call target 819 if target_a is None or target_b is None: 820 continue 821 acceptable_differences.add(target_b - target_a) 822 acceptable_differences.add((target_b - block_b_base) - (target_a - block_a_base)) 823 824 # get the difference between the data segments 825 # this is hackish 826 if ".bss" in self._project_a.loader.main_object.sections_map and \ 827 ".bss" in self._project_b.loader.main_object.sections_map: 828 bss_a = self._project_a.loader.main_object.sections_map[".bss"].min_addr 829 bss_b = self._project_b.loader.main_object.sections_map[".bss"].min_addr 830 acceptable_differences.add(bss_b - bss_a) 831 acceptable_differences.add((bss_b - block_b_base) - (bss_a - block_a_base)) 832 833 return acceptable_differences 834 835 836class BinDiff(Analysis): 837 """ 838 This class computes the a diff between two binaries represented by angr Projects 839 """ 840 def __init__(self, other_project, enable_advanced_backward_slicing=False, cfg_a=None, cfg_b=None): 841 """ 842 :param other_project: The second project to diff 843 """ 844 l.debug("Computing cfg's") 845 846 back_traversal = not enable_advanced_backward_slicing 847 848 if cfg_a is None: 849 #self.cfg_a = self.project.analyses.CFG(resolve_indirect_jumps=True) 850 #self.cfg_b = other_project.analyses.CFG(resolve_indirect_jumps=True) 851 self.cfg_a = self.project.analyses.CFGEmulated(context_sensitivity_level=1, 852 keep_state = True, 853 enable_symbolic_back_traversal = back_traversal, 854 enable_advanced_backward_slicing = enable_advanced_backward_slicing) 855 856 self.cfg_b = other_project.analyses.CFGEmulated(context_sensitivity_level=1, 857 keep_state = True, 858 enable_symbolic_back_traversal = back_traversal, 859 enable_advanced_backward_slicing = enable_advanced_backward_slicing) 860 861 else: 862 self.cfg_a = cfg_a 863 self.cfg_b = cfg_b 864 865 l.debug("Done computing cfg's") 866 867 self._p2 = other_project 868 self._attributes_a = dict() 869 self._attributes_a = dict() 870 871 self._function_diffs = dict() 872 self.function_matches = set() 873 self._unmatched_functions_from_a = set() 874 self._unmatched_functions_from_b = set() 875 876 self._compute_diff() 877 878 def functions_probably_identical(self, func_a_addr, func_b_addr, check_consts=False): 879 """ 880 Compare two functions and return True if they appear identical. 881 882 :param func_a_addr: The address of the first function (in the first binary). 883 :param func_b_addr: The address of the second function (in the second binary). 884 :returns: Whether or not the functions appear to be identical. 885 """ 886 if self.cfg_a.project.is_hooked(func_a_addr) and self.cfg_b.project.is_hooked(func_b_addr): 887 return self.cfg_a.project._sim_procedures[func_a_addr] == self.cfg_b.project._sim_procedures[func_b_addr] 888 889 func_diff = self.get_function_diff(func_a_addr, func_b_addr) 890 if check_consts: 891 return func_diff.probably_identical_with_consts 892 893 return func_diff.probably_identical 894 895 @property 896 def identical_functions(self): 897 """ 898 :returns: A list of function matches that appear to be identical 899 """ 900 identical_funcs = [] 901 for (func_a, func_b) in self.function_matches: 902 if self.functions_probably_identical(func_a, func_b): 903 identical_funcs.append((func_a, func_b)) 904 return identical_funcs 905 906 @property 907 def differing_functions(self): 908 """ 909 :returns: A list of function matches that appear to differ 910 """ 911 different_funcs = [] 912 for (func_a, func_b) in self.function_matches: 913 if not self.functions_probably_identical(func_a, func_b): 914 different_funcs.append((func_a, func_b)) 915 return different_funcs 916 917 def differing_functions_with_consts(self): 918 """ 919 :return: A list of function matches that appear to differ including just by constants 920 """ 921 different_funcs = [] 922 for (func_a, func_b) in self.function_matches: 923 if not self.functions_probably_identical(func_a, func_b, check_consts=True): 924 different_funcs.append((func_a, func_b)) 925 return different_funcs 926 927 @property 928 def differing_blocks(self): 929 """ 930 :returns: A list of block matches that appear to differ 931 """ 932 differing_blocks = [] 933 for (func_a, func_b) in self.function_matches: 934 differing_blocks.extend(self.get_function_diff(func_a, func_b).differing_blocks) 935 return differing_blocks 936 937 @property 938 def identical_blocks(self): 939 """ 940 :return A list of all block matches that appear to be identical 941 """ 942 identical_blocks = [] 943 for (func_a, func_b) in self.function_matches: 944 identical_blocks.extend(self.get_function_diff(func_a, func_b).identical_blocks) 945 return identical_blocks 946 947 @property 948 def blocks_with_differing_constants(self): 949 """ 950 :return: A dict of block matches with differing constants to the tuple of constants 951 """ 952 diffs = dict() 953 for (func_a, func_b) in self.function_matches: 954 diffs.update(self.get_function_diff(func_a, func_b).blocks_with_differing_constants) 955 return diffs 956 957 @property 958 def unmatched_functions(self): 959 return self._unmatched_functions_from_a, self._unmatched_functions_from_b 960 961 # gets the diff of two functions in the binaries 962 def get_function_diff(self, function_addr_a, function_addr_b): 963 """ 964 :param function_addr_a: The address of the first function (in the first binary) 965 :param function_addr_b: The address of the second function (in the second binary) 966 :returns: the FunctionDiff of the two functions 967 """ 968 pair = (function_addr_a, function_addr_b) 969 if pair not in self._function_diffs: 970 function_a = self.cfg_a.kb.functions.function(function_addr_a) 971 function_b = self.cfg_b.kb.functions.function(function_addr_b) 972 self._function_diffs[pair] = FunctionDiff(function_a, function_b, self) 973 return self._function_diffs[pair] 974 975 @staticmethod 976 def _compute_function_attributes(cfg): 977 """ 978 :param cfg: An angr CFG object 979 :returns: a dictionary of function addresses to tuples of attributes 980 """ 981 # the attributes we use are the number of basic blocks, number of edges, and number of subfunction calls 982 attributes = dict() 983 all_funcs = set(cfg.kb.callgraph.nodes()) 984 for function_addr in cfg.kb.functions: 985 # skip syscalls and functions which are None in the cfg 986 if cfg.kb.functions.function(function_addr) is None or cfg.kb.functions.function(function_addr).is_syscall: 987 continue 988 if cfg.kb.functions.function(function_addr) is not None: 989 normalized_funtion = NormalizedFunction(cfg.kb.functions.function(function_addr)) 990 number_of_basic_blocks = len(normalized_funtion.graph.nodes()) 991 number_of_edges = len(normalized_funtion.graph.edges()) 992 else: 993 number_of_basic_blocks = 0 994 number_of_edges = 0 995 if function_addr in all_funcs: 996 number_of_subfunction_calls = len(list(cfg.kb.callgraph.successors(function_addr))) 997 else: 998 number_of_subfunction_calls = 0 999 attributes[function_addr] = (number_of_basic_blocks, number_of_edges, number_of_subfunction_calls) 1000 1001 return attributes 1002 1003 def _get_call_site_matches(self, func_a, func_b): 1004 possible_matches = set() 1005 1006 # Make sure those functions are not SimProcedures 1007 f_a = self.cfg_a.kb.functions.function(func_a) 1008 f_b = self.cfg_b.kb.functions.function(func_b) 1009 if f_a.startpoint is None or f_b.startpoint is None: 1010 return possible_matches 1011 1012 fd = self.get_function_diff(func_a, func_b) 1013 basic_block_matches = fd.block_matches 1014 function_a = fd._function_a 1015 function_b = fd._function_b 1016 for (a, b) in basic_block_matches: 1017 if a in function_a.call_sites and b in function_b.call_sites: 1018 # add them in order 1019 for target_a, target_b in zip(function_a.call_sites[a], function_b.call_sites[b]): 1020 possible_matches.add((target_a, target_b)) 1021 # add them in reverse, since if a new call was added the ordering from each side 1022 # will remain constant until the change 1023 for target_a, target_b in zip(reversed(function_a.call_sites[a]), 1024 reversed(function_b.call_sites[b])): 1025 possible_matches.add((target_a, target_b)) 1026 1027 return possible_matches 1028 1029 def _get_plt_matches(self): 1030 plt_matches = [] 1031 for name, addr in self.project.loader.main_object.plt.items(): 1032 if name in self._p2.loader.main_object.plt: 1033 plt_matches.append((addr, self._p2.loader.main_object.plt[name])) 1034 1035 # in the case of sim procedures the actual sim procedure might be in the interfunction graph, not the plt entry 1036 func_to_addr_a = dict() 1037 func_to_addr_b = dict() 1038 for (k, hook) in self.project._sim_procedures.items(): 1039 if "resolves" in hook.kwargs: 1040 func_to_addr_a[hook.kwargs['resolves']] = k 1041 1042 for (k, hook) in self._p2._sim_procedures.items(): 1043 if "resolves" in hook.kwargs: 1044 func_to_addr_b[hook.kwargs['resolves']] = k 1045 1046 for name, addr in func_to_addr_a.items(): 1047 if name in func_to_addr_b: 1048 plt_matches.append((addr, func_to_addr_b[name])) 1049 1050 # remove ones that aren't in the interfunction graph, because these seem to not be consistent 1051 all_funcs_a = set(self.cfg_a.kb.callgraph.nodes()) 1052 all_funcs_b = set(self.cfg_b.kb.callgraph.nodes()) 1053 plt_matches = [x for x in plt_matches if x[0] in all_funcs_a and x[1] in all_funcs_b] 1054 1055 return plt_matches 1056 1057 def _get_name_matches(self): 1058 names_to_addrs_a = defaultdict(list) 1059 for f in self.cfg_a.functions.values(): 1060 if not f.name.startswith("sub_"): 1061 names_to_addrs_a[f.name].append(f.addr) 1062 1063 names_to_addrs_b = defaultdict(list) 1064 for f in self.cfg_b.functions.values(): 1065 if not f.name.startswith("sub_"): 1066 names_to_addrs_b[f.name].append(f.addr) 1067 1068 name_matches = [] 1069 for name, addrs in names_to_addrs_a.items(): 1070 if name in names_to_addrs_b: 1071 for addr_a, addr_b in zip(addrs, names_to_addrs_b[name]): 1072 # if binary a and binary b have different numbers of functions with the same name, we will see them 1073 # in unmatched functions in the end. 1074 name_matches.append((addr_a, addr_b)) 1075 1076 return name_matches 1077 1078 def _compute_diff(self): 1079 # get the attributes for all functions 1080 self.attributes_a = self._compute_function_attributes(self.cfg_a) 1081 self.attributes_b = self._compute_function_attributes(self.cfg_b) 1082 1083 # get the initial matches 1084 initial_matches = self._get_plt_matches() 1085 initial_matches += self._get_name_matches() 1086 initial_matches += self._get_function_matches(self.attributes_a, self.attributes_b) 1087 for (a, b) in initial_matches: 1088 l.debug("Initially matched (%#x, %#x)", a, b) 1089 1090 # Use a queue so we process matches in the order that they are found 1091 to_process = deque(initial_matches) 1092 1093 # Keep track of which matches we've already added to the queue 1094 processed_matches = set((x, y) for (x, y) in initial_matches) 1095 1096 # Keep a dict of current matches, which will be updated if better matches are found 1097 matched_a = dict() 1098 matched_b = dict() 1099 for (x, y) in processed_matches: 1100 matched_a[x] = y 1101 matched_b[y] = x 1102 1103 callgraph_a_nodes = set(self.cfg_a.kb.callgraph.nodes()) 1104 callgraph_b_nodes = set(self.cfg_b.kb.callgraph.nodes()) 1105 1106 # while queue is not empty 1107 while to_process: 1108 (func_a, func_b) = to_process.pop() 1109 l.debug("Processing (%#x, %#x)", func_a, func_b) 1110 1111 # we could find new matches in the successors or predecessors of functions 1112 if not self.project.loader.main_object.contains_addr(func_a): 1113 continue 1114 if not self._p2.loader.main_object.contains_addr(func_b): 1115 continue 1116 1117 func_a_succ = self.cfg_a.kb.callgraph.successors(func_a) if func_a in callgraph_a_nodes else [] 1118 func_b_succ = self.cfg_b.kb.callgraph.successors(func_b) if func_b in callgraph_b_nodes else [] 1119 func_a_pred = self.cfg_a.kb.callgraph.predecessors(func_a) if func_a in callgraph_a_nodes else [] 1120 func_b_pred = self.cfg_b.kb.callgraph.predecessors(func_b) if func_b in callgraph_b_nodes else [] 1121 1122 # get possible new matches 1123 new_matches = set(self._get_function_matches(self.attributes_a, self.attributes_b, 1124 func_a_succ, func_b_succ)) 1125 new_matches |= set(self._get_function_matches(self.attributes_a, self.attributes_b, 1126 func_a_pred, func_b_pred)) 1127 1128 # could also find matches as function calls of matched basic blocks 1129 new_matches.update(self._get_call_site_matches(func_a, func_b)) 1130 1131 # for each of the possible new matches add it if it improves the matching 1132 for (x, y) in new_matches: 1133 # skip none functions and syscalls 1134 func_a = self.cfg_a.kb.functions.function(x) 1135 if func_a is None or func_a.is_simprocedure or func_a.is_syscall: 1136 continue 1137 func_b = self.cfg_b.kb.functions.function(y) 1138 if func_b is None or func_b.is_simprocedure or func_b.is_syscall: 1139 continue 1140 1141 if (x, y) not in processed_matches: 1142 processed_matches.add((x, y)) 1143 # if it's a better match than what we already have use it 1144 l.debug("Checking function match %s, %s", hex(x), hex(y)) 1145 if _is_better_match(x, y, matched_a, matched_b, self.attributes_a, self.attributes_b): 1146 l.debug("Adding potential match %s, %s", hex(x), hex(y)) 1147 if x in matched_a: 1148 old_match = matched_a[x] 1149 del matched_b[old_match] 1150 l.debug("Removing previous match (%#x, %#x)", x, old_match) 1151 if y in matched_b: 1152 old_match = matched_b[y] 1153 del matched_a[old_match] 1154 l.debug("Removing previous match (%#x, %#x)", old_match, y) 1155 matched_a[x] = y 1156 matched_b[y] = x 1157 to_process.appendleft((x, y)) 1158 1159 # reformat matches into a set of pairs 1160 self.function_matches = set() 1161 for x,y in matched_a.items(): 1162 # only keep if the pair is in the binary ranges 1163 if self.project.loader.main_object.contains_addr(x) and self._p2.loader.main_object.contains_addr(y): 1164 self.function_matches.add((x, y)) 1165 1166 # get the unmatched functions 1167 self._unmatched_functions_from_a = set(x for x in self.attributes_a.keys() if x not in matched_a) 1168 self._unmatched_functions_from_b = set(x for x in self.attributes_b.keys() if x not in matched_b) 1169 1170 # remove unneeded function diffs 1171 for (x, y) in dict(self._function_diffs): 1172 if (x, y) not in self.function_matches: 1173 del self._function_diffs[(x, y)] 1174 1175 @staticmethod 1176 def _get_function_matches(attributes_a, attributes_b, filter_set_a=None, filter_set_b=None): 1177 """ 1178 :param attributes_a: A dict of functions to their attributes 1179 :param attributes_b: A dict of functions to their attributes 1180 1181 The following parameters are optional. 1182 1183 :param filter_set_a: A set to limit attributes_a to the functions in this set. 1184 :param filter_set_b: A set to limit attributes_b to the functions in this set. 1185 :returns: A list of tuples of matching objects. 1186 """ 1187 # get the attributes that are in the sets 1188 if filter_set_a is None: 1189 filtered_attributes_a = {k: v for k, v in attributes_a.items()} 1190 else: 1191 filtered_attributes_a = {k: v for k, v in attributes_a.items() if k in filter_set_a} 1192 1193 if filter_set_b is None: 1194 filtered_attributes_b = {k: v for k, v in attributes_b.items()} 1195 else: 1196 filtered_attributes_b = {k: v for k, v in attributes_b.items() if k in filter_set_b} 1197 1198 # get closest 1199 closest_a = _get_closest_matches(filtered_attributes_a, filtered_attributes_b) 1200 closest_b = _get_closest_matches(filtered_attributes_b, filtered_attributes_a) 1201 1202 # a match (x,y) is good if x is the closest to y and y is the closest to x 1203 matches = [] 1204 for a in closest_a: 1205 if len(closest_a[a]) == 1: 1206 match = closest_a[a][0] 1207 if len(closest_b[match]) == 1 and closest_b[match][0] == a: 1208 matches.append((a, match)) 1209 1210 return matches 1211 1212from angr.analyses import AnalysesHub 1213AnalysesHub.register_default('BinDiff', BinDiff) 1214