1 2import logging 3import re 4import string 5import struct 6from collections import defaultdict 7from itertools import count 8 9import capstone 10import cffi 11import cle 12import networkx 13import pyvex 14from . import Analysis 15 16from ..knowledge_base import KnowledgeBase 17from ..sim_variable import SimMemoryVariable, SimTemporaryVariable 18 19l = logging.getLogger(name=__name__) 20 21# 22# Exceptions 23# 24 25class BinaryError(Exception): 26 pass 27 28 29class InstructionError(BinaryError): 30 pass 31 32 33class ReassemblerFailureNotice(BinaryError): 34 pass 35 36# 37# Constants 38# 39 40OP_TYPE_REG = 1 41OP_TYPE_IMM = 2 42OP_TYPE_MEM = 3 43 44OP_TYPE_MAP = { 45 OP_TYPE_REG: 'REG', 46 OP_TYPE_IMM: 'IMM', 47 OP_TYPE_MEM: 'MEM', 48} 49 50CAPSTONE_OP_TYPE_MAP = { 51 'X86': { 52 capstone.x86.X86_OP_REG: OP_TYPE_REG, 53 capstone.x86.X86_OP_IMM: OP_TYPE_IMM, 54 capstone.x86.X86_OP_MEM: OP_TYPE_MEM, 55 }, 56 'AMD64': { 57 capstone.x86.X86_OP_REG: OP_TYPE_REG, 58 capstone.x86.X86_OP_IMM: OP_TYPE_IMM, 59 capstone.x86.X86_OP_MEM: OP_TYPE_MEM, 60 }, 61} 62 63CAPSTONE_REG_MAP = { 64 # will be filled up by fill_reg_map() 65 'X86': { 66 }, 67 'AMD64': { 68 } 69} 70 71# Utils 72 73def string_escape(s): 74 75 if isinstance(s, bytes): 76 s = "".join(chr(i) for i in s) 77 78 s = s.encode('unicode_escape').decode("utf-8") 79 80 s = s.replace("\\'", "'") 81 s = s.replace("\"", "\\\"") 82 83 return s 84 85def fill_reg_map(): 86 # TODO: Support more architectures 87 for attr in dir(capstone.x86): 88 if attr.startswith('X86_REG_'): 89 reg_name = attr[8:] 90 reg_offset = getattr(capstone.x86, attr) 91 CAPSTONE_REG_MAP['X86'][reg_offset] = reg_name.lower() 92 93 for attr in dir(capstone.x86): 94 if attr.startswith('X86_REG_'): 95 reg_name = attr[8:] 96 reg_offset = getattr(capstone.x86, attr) 97 CAPSTONE_REG_MAP['AMD64'][reg_offset] = reg_name.lower() 98 99def split_operands(s): 100 101 operands = [ ] 102 operand = "" 103 in_paranthesis = False 104 for i, c in enumerate(s): 105 if in_paranthesis and c == ")": 106 in_paranthesis = False 107 if c == "(": 108 in_paranthesis = True 109 if not in_paranthesis and c == "," and (i == len(s) - 1 or s[i + 1] == ' '): 110 operands.append(operand) 111 operand = "" 112 continue 113 operand += c 114 115 if operand: 116 operands.append(operand) 117 118 return operands 119 120def is_hex(s): 121 try: 122 int(s, 16) 123 return True 124 except ValueError: 125 return False 126 127fill_reg_map() 128 129 130class Label(object): 131 g_label_ctr = count() 132 133 def __init__(self, binary, name, original_addr=None): 134 135 self.binary = binary 136 self.name = name 137 138 self.assigned = False 139 140 self.var_size = None 141 142 if self.name is None: 143 self.name = "label_%d" % next(Label.g_label_ctr) 144 145 self.original_addr = original_addr 146 self.base_addr = None 147 148 # 149 # Overridden predefined methods 150 # 151 152 def __str__(self): 153 """ 154 155 :return: 156 """ 157 158 #if self.var_size is not None: 159 # s = ".type {name},@object\n.comm {name},{size},{size}".format(name=self.name, size=self.var_size) 160 #else: 161 s = ".{name}:".format(name=self.name) 162 return s 163 164 def __hash__(self): 165 return hash(self.name) 166 167 def __eq__(self, other): 168 return self.name == other.name 169 170 # 171 # Properties 172 # 173 174 @property 175 def operand_str(self): 176 if self.base_addr is None: 177 return ".%s" % self.name 178 else: 179 offset = self.offset 180 sign = '+' if offset >= 0 else '-' 181 offset = abs(offset) 182 return ".%s%s%d" % (self.name, sign, offset) 183 184 @property 185 def offset(self): 186 if self.base_addr is None: 187 return 0 188 return self.original_addr - self.base_addr 189 190 # 191 # Static methods 192 # 193 194 @staticmethod 195 def new_label(binary, name=None, function_name=None, original_addr=None, data_label=False): 196 if function_name is not None: 197 return FunctionLabel(binary, function_name, original_addr) 198 elif data_label: 199 return DataLabel(binary, original_addr) 200 else: 201 return Label(binary, name, original_addr=original_addr) 202 203 204class DataLabel(Label): 205 def __init__(self, binary, original_addr, name=None): 206 Label.__init__(self, binary, name, original_addr=original_addr) 207 208 @property 209 def operand_str(self): 210 if self.base_addr is None: 211 return self.name 212 else: 213 offset = self.offset 214 sign = '+' if offset >= 0 else '-' 215 offset = abs(offset) 216 return '(%s%s%s)' % (self.name, sign, offset) 217 218 def __str__(self): 219 #if self.var_size is not None: 220 # s = ".comm {name},{size},{size}".format(name=self.name, size=self.var_size) 221 #else: 222 s = "%s:" % (self.name) 223 return s 224 225 226class FunctionLabel(Label): 227 def __init__(self, binary, function_name, original_addr, plt=False): 228 Label.__init__(self, binary, function_name, original_addr=original_addr) 229 230 self.plt = plt 231 232 @property 233 def function_name(self): 234 return self.name 235 236 @property 237 def operand_str(self): 238 return self.name 239 240 def __str__(self): 241 return ("\t.globl {func_name}\n" + 242 "\t.type {func_name}, @function\n" + 243 "{func_name}:").format( 244 func_name=self.function_name 245 ) 246 247 248class ObjectLabel(Label): 249 def __init__(self, binary, symbol_name, original_addr, plt=False): 250 Label.__init__(self, binary, symbol_name, original_addr=original_addr) 251 252 self.plt = plt 253 254 @property 255 def symbol_name(self): 256 return self.name 257 258 @property 259 def operand_str(self): 260 return self.name 261 262 def __str__(self): 263 return ("\t.globl {symbol_name}\n" + 264 "\t.type {symbol_name}, @object\n" + 265 "{symbol_name}:").format( 266 symbol_name=self.symbol_name 267 ) 268 269 270class NotypeLabel(Label): 271 def __init__(self, binary, symbol_name, original_addr, plt=False): 272 Label.__init__(self, binary, symbol_name, original_addr=original_addr) 273 274 self.plt = plt 275 276 @property 277 def symbol_name(self): 278 return self.name 279 280 @property 281 def operand_str(self): 282 return self.name 283 284 def __str__(self): 285 return ("\t.globl {symbol_name}\n" + 286 "\t.type {symbol_name}, @notype\n" + 287 "{symbol_name}:").format( 288 symbol_name=self.symbol_name 289 ) 290 291 292class SymbolManager(object): 293 """ 294 SymbolManager manages all symbols in the binary. 295 """ 296 def __init__(self, binary, cfg): 297 """ 298 Constructor. 299 300 :param Reassembler binary: The Binary analysis instance. 301 :param angr.analyses.CFG cfg: The CFG analysis instance. 302 :return: None 303 """ 304 305 self.binary = binary 306 self.project = binary.project 307 self.cfg = cfg 308 309 self.addr_to_label = defaultdict(list) 310 self.symbol_names = set() # deduplicate symbol names 311 312 def get_unique_symbol_name(self, symbol_name): 313 if symbol_name not in self.symbol_names: 314 self.symbol_names.add(symbol_name) 315 return symbol_name 316 317 i = 0 318 while True: 319 name = "%s_%d" % (symbol_name, i) 320 if name not in self.symbol_names: 321 self.symbol_names.add(name) 322 return name 323 i += 1 324 325 def new_label(self, addr, name=None, is_function=None, force=False): 326 327 if force: 328 if self.binary.main_nonexecutable_regions_contain(addr): 329 label = DataLabel(self.binary, addr, name=name) 330 else: 331 label = Label.new_label(self.binary, name=name, original_addr=addr) 332 self.addr_to_label[addr].append(label) 333 return label 334 335 if addr in self.addr_to_label: 336 return self.addr_to_label[addr][0] 337 338 # Check if the address points to a function by checking the plt of main binary 339 reverse_plt = self.project.loader.main_object.reverse_plt 340 341 if addr in reverse_plt: 342 # It's a PLT entry! 343 label = FunctionLabel(self.binary, reverse_plt[addr], addr, plt=True) 344 elif addr is not None and self.project.loader.find_symbol(addr) is not None: 345 # It's an extern symbol 346 symbol = self.project.loader.find_symbol(addr) 347 if symbol.owner is self.project.loader.main_object: 348 symbol_name = symbol.name 349 if '@' in symbol_name: 350 symbol_name = symbol_name[ : symbol_name.index('@') ] 351 352 # check the type... 353 if symbol.type == cle.SymbolType.TYPE_FUNCTION: 354 # it's a function! 355 unique_symbol_name = self.get_unique_symbol_name(symbol_name) 356 label = FunctionLabel(self.binary, unique_symbol_name, addr) 357 elif symbol.type == cle.SymbolType.TYPE_OBJECT: 358 # it's an object 359 unique_symbol_name = self.get_unique_symbol_name(symbol_name) 360 label = ObjectLabel(self.binary, unique_symbol_name, addr) 361 elif symbol.type == cle.SymbolType.TYPE_NONE: 362 # notype 363 unique_symbol_name = self.get_unique_symbol_name(symbol_name) 364 label = NotypeLabel(self.binary, unique_symbol_name, addr) 365 elif symbol.type == cle.SymbolType.TYPE_SECTION: 366 # section label 367 # use a normal label instead 368 if not name: 369 # handle empty names 370 name = None 371 label = Label.new_label(self.binary, name=name, original_addr=addr) 372 else: 373 raise Exception('Unsupported symbol type %s. Bug Fish about it!' % symbol.type) 374 375 else: 376 raise Exception("the symbol %s is not owned by the main object. Try reload the project with" 377 "\"auto_load_libs=False\". If that does not solve the issue, please report to GitHub." 378 % symbol.name 379 ) 380 381 elif (addr is not None and addr in self.cfg.functions) or is_function: 382 # It's a function identified by angr's CFG recovery 383 384 if is_function and name is not None: 385 function_name = name 386 else: 387 function_name = self.cfg.functions[addr].name 388 389 # special function name for entry point 390 if addr == self.project.entry: 391 function_name = "_start" 392 393 label = FunctionLabel(self.binary, function_name, addr) 394 elif addr is not None and self.binary.main_nonexecutable_regions_contain(addr): 395 label = DataLabel(self.binary, addr) 396 else: 397 label = Label.new_label(self.binary, name=name, original_addr=addr) 398 399 if addr is not None: 400 self.addr_to_label[addr].append(label) 401 402 return label 403 404 def label_got(self, addr, label): 405 """ 406 Mark a certain label as assigned (to an instruction or a block of data). 407 408 :param int addr: The address of the label. 409 :param angr.analyses.reassembler.Label label: 410 The label that is just assigned. 411 :return: None 412 """ 413 414 if label in self.addr_to_label[addr]: 415 label.assigned = True 416 417 418class Operand(object): 419 def __init__(self, binary, insn_addr, insn_size, capstone_operand, operand_str, mnemonic, operand_offset, syntax=None): 420 """ 421 Constructor. 422 423 :param Reassembler binary: The Binary analysis. 424 :param int insn_addr: Address of the instruction. 425 :param capstone_operand: 426 :param str operand_str: the string representation of this operand 427 :param str mnemonic: Mnemonic of the instruction that this operand belongs to. 428 :param int operand_offset: offset of the operand into the instruction. 429 :param str syntax: Provide a way to override the default syntax coming from `binary`. 430 :return: None 431 """ 432 433 self.binary = binary 434 self.project = binary.project 435 self.insn_addr = insn_addr 436 self.insn_size = insn_size 437 self.operand_str = operand_str 438 self.mnemonic = mnemonic 439 self.operand_offset = operand_offset 440 self.syntax = self.binary.syntax if syntax is None else syntax 441 self.type = None 442 self.size = capstone_operand.size 443 444 # IMM 445 self.is_coderef = None 446 self.is_dataref = None 447 self.label = None 448 self.label_offset = 0 449 450 # MEM 451 self.base = None 452 self.index = None 453 self.scale = None 454 self.disp = None 455 456 self.disp_is_coderef = None 457 self.disp_is_dataref = None 458 self.disp_label = None 459 self.disp_label_offset = 0 460 461 self._initialize(capstone_operand) 462 463 # 464 # Public methods 465 # 466 467 def assembly(self): 468 if self.type == OP_TYPE_IMM and self.label: 469 if self.label_offset > 0: 470 return "%s + %d" % (self.label.operand_str, self.label_offset) 471 elif self.label_offset < 0: 472 return "%s - %d" % (self.label.operand_str, abs(self.label_offset)) 473 else: 474 return self.label.operand_str 475 476 elif self.type == OP_TYPE_MEM: 477 478 disp = "" 479 if self.disp: 480 if self.disp_label: 481 if self.disp_label_offset > 0: 482 disp = "%s + %d" % (self.disp_label.operand_str, self.disp_label_offset) 483 elif self.disp_label_offset < 0: 484 disp = "%s - %d" % (self.disp_label.operand_str, abs(self.disp_label_offset)) 485 else: 486 disp = self.disp_label.operand_str 487 else: 488 disp = "%d" % self.disp 489 490 base = "" 491 if self.base: 492 base = CAPSTONE_REG_MAP[self.project.arch.name][self.base] 493 494 if self.syntax == 'at&t': 495 # displacement(base, index, scale) 496 base = "%%%s" % base if base else "" 497 498 if "*" in self.operand_str and disp: 499 # absolute memory address 500 disp = "*" + disp 501 502 if self.index: 503 s = "%s(%s, %%%s, %d)" % (disp, base, CAPSTONE_REG_MAP[self.project.arch.name][self.index], 504 self.scale 505 ) 506 elif self.base: # not self.index 507 s = "%s(%s)" % (disp, base) 508 else: 509 s = disp 510 511 return s 512 513 else: 514 s = [ ] 515 if base: 516 s.append(base) 517 518 if self.index and self.scale: 519 if s: 520 s.append('+') 521 s.append("(%s * %d)" % (CAPSTONE_REG_MAP[self.project.arch.name][self.index], self.scale)) 522 523 if disp: 524 if disp.startswith('-'): 525 s.append('-') 526 s.append(disp[1:]) 527 else: 528 if s: 529 s.append('+') 530 s.append(disp) 531 532 asm = " ".join(s) 533 534 # we need to specify the size here 535 if self.size == 16: 536 asm = 'xmmword ptr [%s]' % asm 537 elif self.size == 8: 538 asm = 'qword ptr [%s]' % asm 539 elif self.size == 4: 540 asm = 'dword ptr [%s]' % asm 541 elif self.size == 2: 542 asm = 'word ptr [%s]' % asm 543 elif self.size == 1: 544 asm = 'byte ptr [%s]' % asm 545 else: 546 raise BinaryError('Unsupported memory operand size for operand "%s"' % self.operand_str) 547 548 return asm 549 550 else: 551 # Nothing special 552 return None 553 554 # 555 # Overridden predefined methods 556 # 557 558 def __str__(self): 559 """ 560 561 :return: 562 """ 563 564 op_type = OP_TYPE_MAP[self.type] 565 566 ref_type = "" 567 if self.is_coderef: 568 ref_type = "CODEREF" 569 elif self.is_dataref: 570 ref_type = "DATAREF" 571 572 if ref_type: 573 return "%s <%s>" % (op_type, ref_type) 574 else: 575 return op_type 576 577 # 578 # Properties 579 # 580 581 @property 582 def is_immediate(self): 583 return self.type == OP_TYPE_IMM 584 585 @property 586 def symbolized(self): 587 return self.label is not None or self.disp_label is not None 588 589 # 590 # Private methods 591 # 592 593 def _initialize(self, capstone_operand): 594 595 arch_name = self.project.arch.name 596 self.type = CAPSTONE_OP_TYPE_MAP[arch_name][capstone_operand.type] 597 598 if self.type == OP_TYPE_IMM: 599 # Check if this is a reference to code 600 imm = capstone_operand.imm 601 602 self.is_coderef, self.is_dataref, baseaddr = \ 603 self._imm_to_ptr(imm, self.type, self.mnemonic) 604 605 if self.is_coderef or self.is_dataref: 606 self.label = self.binary.symbol_manager.new_label(addr=baseaddr) 607 self.label_offset = imm - baseaddr 608 609 if self.mnemonic.startswith('j') or self.mnemonic.startswith('loop'): 610 sort = 'jump' 611 elif self.mnemonic.startswith('call'): 612 sort = 'call' 613 else: 614 sort = 'absolute' 615 self.binary.register_instruction_reference(self.insn_addr, imm, sort, self.operand_offset) 616 617 elif self.type == OP_TYPE_MEM: 618 619 self.base = capstone_operand.mem.base 620 self.index = capstone_operand.mem.index 621 self.scale = capstone_operand.mem.scale 622 self.disp = capstone_operand.mem.disp 623 624 if self.binary.project.arch.name == 'AMD64' and CAPSTONE_REG_MAP['AMD64'][self.base] == 'rip': 625 # rip-relative addressing 626 self.disp += self.insn_addr + self.insn_size 627 628 self.disp_is_coderef, self.disp_is_dataref, baseaddr = \ 629 self._imm_to_ptr(self.disp, self.type, self.mnemonic) 630 631 if self.disp_is_coderef or self.disp_is_dataref: 632 self.disp_label = self.binary.symbol_manager.new_label(addr=baseaddr) 633 self.disp_label_offset = self.disp - baseaddr 634 635 self.binary.register_instruction_reference(self.insn_addr, self.disp, 'absolute', self.operand_offset) 636 637 def _imm_to_ptr(self, imm, operand_type, mnemonic): # pylint:disable=no-self-use,unused-argument 638 """ 639 Try to classify an immediate as a pointer. 640 641 :param int imm: The immediate to test. 642 :param int operand_type: Operand type of this operand, can either be IMM or MEM. 643 :param str mnemonic: Mnemonic of the instruction that this operand belongs to. 644 :return: A tuple of (is code reference, is data reference, base address, offset) 645 :rtype: tuple 646 """ 647 648 is_coderef, is_dataref = False, False 649 baseaddr = None 650 651 if not is_coderef and not is_dataref: 652 if self.binary.main_executable_regions_contain(imm): 653 # does it point to the beginning of an instruction? 654 if imm in self.binary.all_insn_addrs: 655 is_coderef = True 656 baseaddr = imm 657 658 if not is_coderef and not is_dataref: 659 if self.binary.main_nonexecutable_regions_contain(imm): 660 is_dataref = True 661 baseaddr = imm 662 663 if not is_coderef and not is_dataref: 664 tolerance_before = 1024 if operand_type == OP_TYPE_MEM else 64 665 contains_, baseaddr_ = self.binary.main_nonexecutable_region_limbos_contain(imm, 666 tolerance_before=tolerance_before, 667 tolerance_after=1024 668 ) 669 if contains_: 670 is_dataref = True 671 baseaddr = baseaddr_ 672 673 if not contains_: 674 contains_, baseaddr_ = self.binary.main_executable_region_limbos_contain(imm) 675 if contains_: 676 is_coderef = True 677 baseaddr = baseaddr_ 678 679 return (is_coderef, is_dataref, baseaddr) 680 681 682class Instruction(object): 683 """ 684 High-level representation of an instruction in the binary 685 """ 686 def __init__(self, binary, addr, size, insn_bytes, capstone_instr): 687 """ 688 689 :param Reassembler binary: The Binary analysis 690 :param int addr: Address of the instruction 691 :param int size: Size of the instruction 692 :param str insn_bytes: Instruction bytes 693 :param capstone_instr: Capstone Instr object. 694 :return: None 695 """ 696 697 self.binary = binary 698 self.project = binary.project 699 self.addr = addr 700 self.size = size 701 self.bytes = insn_bytes 702 703 self.mnemonic = capstone_instr.mnemonic 704 self.op_str = capstone_instr.op_str 705 self.capstone_operand_types = [ operand.type for operand in capstone_instr.operands ] 706 707 self.operands = [ ] 708 709 self.labels = [ ] 710 711 operand_offsets = [ ] 712 for operand in capstone_instr.operands: 713 if operand.type == capstone.CS_OP_IMM: 714 operand_offsets.append(capstone_instr.imm_offset) 715 elif operand.type == capstone.CS_OP_MEM: 716 operand_offsets.append(capstone_instr.disp_offset) 717 else: 718 operand_offsets.append(None) 719 720 if self.addr is not None: 721 self._initialize(capstone_instr.operands, operand_offsets) 722 723 # 724 # Overridden predefined instructions 725 # 726 def __str__(self): 727 """ 728 729 :return: 730 """ 731 732 assembly = self.assembly(comments=True, symbolized=False) 733 return assembly 734 735 # 736 # Public methods 737 # 738 739 def assign_labels(self): 740 741 if self.addr in self.binary.symbol_manager.addr_to_label: 742 labels = self.binary.symbol_manager.addr_to_label[self.addr] 743 for label in labels: 744 if label not in self.labels: 745 self.labels.append(label) 746 747 def dbg_comments(self): 748 operands = ", ".join([ str(operand) for operand in self.operands ]) 749 capstone_str = "%#08x:\t%s\t%s" % (self.addr, self.mnemonic, self.op_str) 750 comments = "\t# %s [%s]" % (capstone_str, operands) 751 752 return comments 753 754 def assembly(self, comments=False, symbolized=True): 755 """ 756 757 :return: 758 """ 759 760 if comments: 761 dbg_comments = self.dbg_comments() 762 else: 763 dbg_comments = "" 764 765 labels = "\n".join([ str(lbl) for lbl in self.labels ]) 766 767 inserted_asm_before_label = "" 768 if self.addr in self.binary.inserted_asm_before_label: 769 # put all assembly code there 770 if comments: 771 inserted_asm_before_label += "\t# Inserted assembly code (before label):\n" 772 inserted_asm_before_label = "\n".join(self.binary.inserted_asm_before_label[self.addr]) 773 inserted_asm_before_label += "\n" 774 775 inserted_asm_after_label = "" 776 if self.addr in self.binary.inserted_asm_after_label: 777 # put all assembly code there 778 if comments: 779 inserted_asm_after_label += "\t# Inserted assembly code (after label):\n" 780 inserted_asm_after_label = "\n".join(self.binary.inserted_asm_after_label[self.addr]) 781 inserted_asm_after_label += "\n" 782 783 not_symbolized = "\t%s\t%s" % (self.mnemonic, self.op_str) 784 if not symbolized: 785 asm = not_symbolized 786 787 elif not any([ operand.symbolized for operand in self.operands ]): 788 # No label is involved 789 asm = not_symbolized 790 791 elif not self.operands: 792 # There is no operand 793 asm = not_symbolized 794 795 else: 796 # Now it's the tricky part. capstone doesn't give us anyway to print individual operand. We gotta parse it 797 # by ourselves 798 # Remove the address 799 #capstone_str = capstone_str[capstone_str.find('\t') + 1 : ] 800 801 all_operands = [ operand.operand_str for operand in self.operands] 802 mnemonic = self.mnemonic 803 804 for i, op in enumerate(self.operands): 805 op_asm = op.assembly() 806 if op_asm is not None: 807 if op.type in (OP_TYPE_IMM, OP_TYPE_MEM): 808 all_operands[i] = op_asm 809 else: 810 raise BinaryError("Unsupported operand type %d." % op.type) 811 812 if self.capstone_operand_types[i] == capstone.CS_OP_IMM: 813 if mnemonic.startswith('j') or mnemonic.startswith('call') or mnemonic.startswith('loop'): 814 pass 815 else: 816 # mark the size of the variable 817 if op.is_dataref: 818 op.label.var_size = op.size 819 if self.binary.syntax == 'at&t': 820 all_operands[i] = "$" + all_operands[i] 821 else: 822 all_operands[i] = 'OFFSET FLAT:' + all_operands[i] 823 824 asm = "\t%s%s" % (mnemonic, "\t" + ", ".join(all_operands)) 825 826 if self.addr in self.binary._removed_instructions: 827 contents = [dbg_comments, inserted_asm_before_label, labels, inserted_asm_after_label] 828 else: 829 contents = [ dbg_comments, inserted_asm_before_label, labels, inserted_asm_after_label, asm ] 830 contents = [ a for a in contents if a ] 831 832 return "\n".join(contents) 833 834 # 835 # Private methods 836 # 837 838 def _initialize(self, capstone_operands, operand_offsets): 839 """ 840 Initialize this object 841 842 :return: None 843 """ 844 845 if self.addr is None: 846 raise InstructionError('self.addr must be specified') 847 848 self._initialize_operands(capstone_operands, operand_offsets) 849 850 def _initialize_operands(self, capstone_operands, operand_offsets): 851 """ 852 853 :return: 854 """ 855 856 all_operands = split_operands(self.op_str) 857 capstone_operands = capstone_operands[ - len(all_operands) : ] # sometimes there are more operands than expected... 858 operand_offsets = operand_offsets[ - len(all_operands) : ] 859 860 for operand, operand_str, offset in zip(capstone_operands, all_operands, operand_offsets): 861 self.operands.append(Operand(self.binary, self.addr, self.size, operand, operand_str, self.mnemonic, offset)) 862 863class BasicBlock(object): 864 """ 865 BasicBlock represents a basic block in the binary. 866 """ 867 def __init__(self, binary, addr, size): 868 """ 869 Constructor. 870 871 :param Reassembler binary: The Binary analysis. 872 :param int addr: Address of the block 873 :param int size: Size of the block 874 :return: None 875 """ 876 877 self.binary = binary 878 self.project = binary.project 879 880 self.addr = addr 881 self.size = size 882 883 self.instructions = [ ] 884 885 self._initialize() 886 887 # 888 # Overridden predefined methods 889 # 890 891 def __str__(self): 892 """ 893 Return a linear representation of all instructions in this block. 894 :return: 895 """ 896 897 return self.assembly(symbolized=False) 898 899 def __repr__(self): 900 901 return "<BasicBlock %#08x>" % self.addr 902 903 # 904 # Public methods 905 # 906 907 def assign_labels(self): 908 for ins in self.instructions: 909 ins.assign_labels() 910 911 def assembly(self, comments=False, symbolized=True): 912 s = "\n".join([ins.assembly(comments=comments, symbolized=symbolized) for ins in self.instructions]) 913 914 return s 915 916 def instruction_addresses(self): 917 return sorted([ (ins.addr, ins.size) for ins in self.instructions ], key=lambda x: x[0]) 918 919 # 920 # Private methods 921 # 922 923 def _initialize(self): 924 """ 925 926 :return: 927 """ 928 929 # re-lifting 930 block = self.project.factory.fresh_block(self.addr, self.size) 931 capstone_obj = block.capstone 932 933 # Fill in instructions 934 for instr in capstone_obj.insns: 935 instruction = Instruction(self.binary, instr.address, instr.size, None, instr) 936 937 self.instructions.append(instruction) 938 939 self.instructions = sorted(self.instructions, key=lambda x: x.addr) 940 941class Procedure(object): 942 """ 943 Procedure in the binary. 944 """ 945 def __init__(self, binary, function=None, addr=None, size=None, name=None, section=".text", asm_code=None): 946 """ 947 Constructor. 948 949 :param Reassembler binary: The Binary analysis. 950 :param angr.knowledge.Function function: The function it represents 951 :param int addr: Address of the function. Not required if `function` is provided. 952 :param int size: Size of the function. Not required if `function` is provided. 953 :param str section: Which section this function comes from. 954 :return: None 955 """ 956 957 self.binary = binary 958 self.project = binary.project 959 960 if function is None: 961 self.addr = addr 962 self.size = size 963 964 self.function = None 965 self._name = name 966 967 else: 968 self.addr = function.addr 969 self.size = None # FIXME: 970 971 self.function = function 972 self._name = function.name 973 974 self.asm_code = asm_code 975 self.section = section 976 977 self.blocks = [ ] 978 979 self._initialize() 980 981 # 982 # Attributes 983 # 984 985 @property 986 def name(self): 987 """ 988 Get function name from the labels of the very first block. 989 :return: Function name if there is any, None otherwise 990 :rtype: string 991 """ 992 993 if self._name is not None: 994 return self._name 995 996 if not self.blocks: 997 return None 998 999 if not self.blocks[0].instructions: 1000 return None 1001 1002 if not self.blocks[0].instructions[0].labels: 1003 return None 1004 1005 lbl = self.blocks[0].instructions[0].labels[0] 1006 1007 if isinstance(lbl, FunctionLabel): 1008 return lbl.function_name 1009 1010 return None 1011 1012 @property 1013 def is_plt(self): 1014 """ 1015 If this function is a PLT entry or not. 1016 :return: True if this function is a PLT entry, False otherwise 1017 :rtype: bool 1018 """ 1019 1020 if self.section == ".plt": 1021 return True 1022 1023 if not self.blocks: 1024 return False 1025 1026 initial_block = next((b for b in self.blocks if b.addr == self.addr), None) 1027 if initial_block is None: 1028 return False 1029 1030 if not initial_block.instructions: 1031 return False 1032 1033 if not initial_block.instructions[0].labels: 1034 return False 1035 1036 lbl = initial_block.instructions[0].labels[0] 1037 1038 if isinstance(lbl, FunctionLabel): 1039 return lbl.plt 1040 1041 return False 1042 1043 # 1044 # Overridden predefined methods 1045 # 1046 def __str__(self): 1047 """ 1048 Output all instructions of the current procedure 1049 :return: 1050 """ 1051 1052 return self.assembly(symbolized=False) 1053 1054 # 1055 # Public methods 1056 # 1057 1058 def assign_labels(self): 1059 for block in self.blocks: 1060 block.assign_labels() 1061 1062 def assembly(self, comments=False, symbolized=True): 1063 """ 1064 Get the assembly manifest of the procedure. 1065 1066 :param comments: 1067 :param symbolized: 1068 :return: A list of tuples (address, basic block assembly), ordered by basic block addresses 1069 :rtype: list 1070 """ 1071 1072 assembly = [ ] 1073 1074 header = "\t.section\t{section}\n\t.align\t{alignment}\n".format(section=self.section, 1075 alignment=self.binary.section_alignment(self.section) 1076 ) 1077 if self.addr is not None: 1078 procedure_name = "%#x" % self.addr 1079 else: 1080 procedure_name = self._name 1081 header += "\t#Procedure %s\n" % procedure_name 1082 1083 if self._output_function_label: 1084 if self.addr: 1085 function_label = self.binary.symbol_manager.new_label(self.addr) 1086 else: 1087 function_label = self.binary.symbol_manager.new_label(None, name=procedure_name, is_function=True) 1088 header += str(function_label) + "\n" 1089 1090 assembly.append((self.addr, header)) 1091 1092 if self.asm_code: 1093 s = self.asm_code 1094 assembly.append((self.addr, s)) 1095 elif self.blocks: 1096 for b in sorted(self.blocks, key=lambda x:x.addr): # type: BasicBlock 1097 s = b.assembly(comments=comments, symbolized=symbolized) 1098 assembly.append((b.addr, s)) 1099 1100 return assembly 1101 1102 def instruction_addresses(self): 1103 """ 1104 Get all instruction addresses in the binary. 1105 1106 :return: A list of sorted instruction addresses. 1107 :rtype: list 1108 """ 1109 1110 addrs = [ ] 1111 for b in sorted(self.blocks, key=lambda x: x.addr): # type: BasicBlock 1112 addrs.extend(b.instruction_addresses()) 1113 1114 return sorted(set(addrs), key=lambda x: x[0]) 1115 1116 # 1117 # Private methods 1118 # 1119 1120 def _initialize(self): 1121 1122 if self.function is None: 1123 if not self.asm_code: 1124 raise BinaryError('Unsupported procedure type. You must either specify a angr.knowledge.Function ' 1125 'object, or specify assembly code.') 1126 1127 1128 else: 1129 for block_addr in self.function.block_addrs: 1130 b = BasicBlock(self.binary, block_addr, self.function._block_sizes[block_addr]) 1131 self.blocks.append(b) 1132 1133 self.blocks = sorted(self.blocks, key=lambda x: x.addr) 1134 1135 @property 1136 def _output_function_label(self): 1137 """ 1138 Determines if we want to output the function label in assembly. We output the function label only when the 1139 original instruction does not output the function label. 1140 1141 :return: True if we should output the function label, False otherwise. 1142 :rtype: bool 1143 """ 1144 1145 if self.asm_code: 1146 return True 1147 if not self.blocks: 1148 return True 1149 1150 the_block = next((b for b in self.blocks if b.addr == self.addr), None) 1151 if the_block is None: 1152 return True 1153 if not the_block.instructions: 1154 return True 1155 if not the_block.instructions[0].labels: 1156 return True 1157 return False 1158 1159class ProcedureChunk(Procedure): 1160 """ 1161 Procedure chunk. 1162 """ 1163 def __init__(self, project, addr, size): 1164 """ 1165 Constructor. 1166 1167 :param project: 1168 :param addr: 1169 :param size: 1170 :return: 1171 """ 1172 1173 Procedure.__init__(self, project, addr=addr, size=size) 1174 1175 1176class Data(object): 1177 def __init__(self, binary, memory_data=None, section=None, section_name=None, name=None, size=None, sort=None, 1178 addr=None, initial_content=None): 1179 1180 self.binary = binary 1181 self.project = binary.project 1182 self.memory_data = memory_data 1183 self.section = section 1184 self.section_name = section.name if section else section_name 1185 1186 self.addr = addr 1187 self.name = name 1188 self.size = size 1189 self.sort = sort 1190 self._initial_content = initial_content # only used by patcherex 1191 1192 self._content = None 1193 1194 self.labels = [ ] # a list of tuples like (address, label) 1195 self.end_labels = [ ] # a list of labels only show up at the end of this memory data entry. mostly because the 1196 # data block after this one is removed for some reason. only assigned by other methods. 1197 1198 self.null_terminated = None 1199 1200 self.skip = False 1201 1202 self._initialize() 1203 1204 def __repr__(self): 1205 return "<DataItem %s@%#08x, %d bytes>" % (self.sort, self.addr, self.size) 1206 1207 @property 1208 def content(self): 1209 return self._content 1210 1211 @content.setter 1212 def content(self, v): 1213 self._content = v 1214 1215 def shrink(self, new_size): 1216 """ 1217 Reduce the size of this block 1218 1219 :param int new_size: The new size 1220 :return: None 1221 """ 1222 self.size = new_size 1223 1224 if self.sort == 'string': 1225 self.null_terminated = False # string without the null byte terminator 1226 self._content[0] = self._content[0][ : self.size] 1227 1228 elif self.sort == 'pointer-array': 1229 pointer_size = self.binary.project.arch.bytes 1230 1231 if self.size % pointer_size != 0: 1232 # it's not aligned? 1233 raise BinaryError('Fails at Data.shrink()') 1234 1235 pointers = self.size // pointer_size 1236 self._content = self._content[ : pointers] 1237 1238 else: 1239 # unknown 1240 self._content = [ self._content[0][ : self.size ] ] 1241 1242 def desymbolize(self): 1243 """ 1244 We believe this was a pointer and symbolized it before. Now we want to desymbolize it. 1245 1246 The following actions are performed: 1247 - Reload content from memory 1248 - Mark the sort as 'unknown' 1249 1250 :return: None 1251 """ 1252 1253 self.sort = 'unknown' 1254 content = self.binary.fast_memory_load(self.addr, self.size, bytes) 1255 self.content = [ content ] 1256 1257 def assign_labels(self): 1258 1259 # TODO: What if it's not aligned for some sort of data, like pointer array? 1260 1261 if self.addr is None: 1262 # this piece of data comes from a patch, not from the original binary 1263 return 1264 1265 # Put labels to self.labels 1266 for i in range(self.size): 1267 addr = self.addr + i 1268 if addr in self.binary.symbol_manager.addr_to_label: 1269 labels = self.binary.symbol_manager.addr_to_label[addr] 1270 1271 for label in labels: 1272 if self.sort == 'pointer-array' and addr % (self.project.arch.bytes) != 0: 1273 # we need to modify the base address of the label 1274 base_addr = addr - (addr % (self.project.arch.bytes)) 1275 label.base_addr = base_addr 1276 tpl = (base_addr, label) 1277 if tpl not in self.labels: 1278 self.labels.append(tpl) 1279 else: 1280 tpl = (addr, label) 1281 if tpl not in self.labels: 1282 self.labels.append(tpl) 1283 1284 def assembly(self, comments=False, symbolized=True): 1285 s = "" 1286 1287 if comments: 1288 if self.addr is not None: 1289 s += "\t# data @ %#08x\n" % self.addr 1290 else: 1291 s += "\t# data (%s)\n" % self.name 1292 1293 if self.skip: 1294 return s 1295 1296 if self.sort == 'string': 1297 1298 if symbolized: 1299 ss = [ ] 1300 last_pos = 0 1301 for i, tpl in enumerate(self.labels): 1302 addr, lbl = tpl 1303 1304 # split the string 1305 pos = addr - self.addr 1306 # endpos = self.labels[i + 1][0] - self.addr + 1 if i < len(self.labels) - 1 else self.size 1307 string_piece = self.content[0][last_pos : pos] 1308 1309 last_pos = pos 1310 1311 if i == len(self.labels) - 1 and pos == self.size: 1312 directive = '.asciz' # null at the end 1313 else: 1314 directive = '.ascii' 1315 1316 if string_piece: 1317 ss.append("\t{directive} \"{str}\"".format( 1318 str=string_escape(string_piece), 1319 directive=directive, 1320 ) 1321 ) 1322 ss.append("%s" % str(lbl)) 1323 1324 if last_pos <= self.size - 1: 1325 string_piece = self.content[0][last_pos: ] 1326 directive = ".ascii" if self.null_terminated is False else ".asciz" 1327 1328 ss.append("\t{directive} \"{str}\"".format( 1329 str=string_escape(string_piece), 1330 directive=directive, 1331 )) 1332 1333 s += "\n".join(ss) 1334 else: 1335 if self.null_terminated is False: 1336 directive = ".ascii" 1337 else: 1338 directive = ".asciz" 1339 s += "\t.{directive} \"{str}\"".format(directive=directive, str=string_escape(self.content[0])) 1340 s += '\n' 1341 1342 elif self.sort == 'pointer-array': 1343 1344 if self.binary.project.arch.bits == 32: 1345 directive = '.long' 1346 elif self.binary.project.arch.bits == 64: 1347 directive = '.quad' 1348 else: 1349 raise BinaryError('Unsupported pointer size %d', self.binary.project.arch.bits) 1350 1351 if symbolized: 1352 addr_to_labels = {} 1353 for k, v in self.labels: 1354 if k not in addr_to_labels: 1355 addr_to_labels[k] = [ ] 1356 addr_to_labels[k].append(v) 1357 1358 i = 0 1359 if self.name is not None: 1360 s += "%s:\n" % self.name 1361 for symbolized_label in self.content: 1362 1363 if self.addr is not None and (self.addr + i) in addr_to_labels: 1364 for label in addr_to_labels[self.addr + i]: 1365 s += "%s\n" % str(label) 1366 elif self.addr is not None and (self.addr + i) in self.binary.symbol_manager.addr_to_label: 1367 labels = self.binary.symbol_manager.addr_to_label[self.addr + i] 1368 for label in labels: 1369 s += "%s\n" % str(label) 1370 i += self.project.arch.bytes 1371 1372 if isinstance(symbolized_label, int): 1373 s += "\t%s %d\n" % (directive, symbolized_label) 1374 else: 1375 s += "\t%s %s\n" % (directive, symbolized_label.operand_str) 1376 1377 else: 1378 for label in self.content: 1379 s += "\t%s %s\n" % (directive, label.operand_str) 1380 1381 elif self.sort == 'segment-boundary': 1382 1383 if symbolized: 1384 for _, label in self.labels: 1385 s += "\t%s\n" % str(label) 1386 1387 elif self.sort == 'integer': 1388 # display it as bytes only when there are references pointing to the middle 1389 content = [ ] 1390 1391 if self.size == 1: 1392 directive = '.byte' 1393 fmt_str = 'B' 1394 elif self.size == 2: 1395 directive = '.short' 1396 fmt_str = '<H' 1397 elif self.size == 4: 1398 directive = '.long' 1399 fmt_str = '<I' 1400 elif self.size == 8: 1401 directive = '.quad' 1402 fmt_str = '<Q' 1403 else: 1404 # we'll have to display it as a bunch of bytes 1405 directive = None 1406 fmt_str = None 1407 1408 if symbolized: 1409 addr_to_labels = {} 1410 for k, v in self.labels: 1411 if k not in addr_to_labels: 1412 addr_to_labels[k] = [] 1413 addr_to_labels[k].append(v) 1414 1415 show_integer = False 1416 if len(addr_to_labels) == 0: 1417 show_integer = True 1418 elif len(addr_to_labels) == 1: 1419 if self.addr is not None and next(iter(addr_to_labels.keys())) == self.addr: 1420 show_integer = True 1421 elif self.addr is None and next(iter(addr_to_labels.keys())) == 0: 1422 show_integer = True 1423 1424 if directive is not None and show_integer: 1425 # nice, we should display it as an integer 1426 if addr_to_labels: 1427 for label in next(iter(addr_to_labels.values())): 1428 content += [ "%s" % str(label) ] 1429 1430 integer = struct.unpack(fmt_str, self.content[0])[0] 1431 content += ['\t{directive} {integer}'.format( 1432 directive=directive, 1433 integer='%#x' % integer, 1434 )] 1435 1436 else: 1437 # display it as bytes... 1438 addr = self.addr if self.addr is not None else 0 1439 for piece in self.content: 1440 for c in piece: 1441 if addr in addr_to_labels: 1442 for label in addr_to_labels[addr]: 1443 content += [ "%s" % str(label) ] 1444 addr += 1 1445 1446 content += ['\t.byte %d' % c] 1447 1448 else: 1449 integer = struct.unpack(fmt_str, self.content[0])[0] 1450 content += ['\t{directive} {integer}'.format( 1451 directive=directive, 1452 integer='%#x' % integer, 1453 )] 1454 1455 s += "\n".join(content) 1456 s += "\n" 1457 1458 elif self.sort == 'fp': 1459 # we have to display it as bytes... 1460 # TODO: switch to "ten byes" whenever time permits 1461 content = [] 1462 1463 if symbolized: 1464 addr_to_labels = {} 1465 for k, v in self.labels: 1466 if k not in addr_to_labels: 1467 addr_to_labels[k] = [] 1468 addr_to_labels[k].append(v) 1469 1470 addr = self.addr if self.addr is not None else 0 1471 for piece in self.content: 1472 for c in piece: 1473 if addr in addr_to_labels: 1474 for label in addr_to_labels[addr]: 1475 content += [ "%s" % str(label) ] 1476 addr += 1 1477 1478 content += ['\t.byte %d' % c] 1479 else: 1480 for piece in self.content: 1481 content += ['\t.byte %d' % c for c in piece] 1482 1483 s += "\n".join(content) 1484 s += "\n" 1485 1486 else: 1487 content = [] 1488 1489 if symbolized: 1490 addr_to_labels = { } 1491 for k, v in self.labels: 1492 if k not in addr_to_labels: 1493 addr_to_labels[k] = [] 1494 addr_to_labels[k].append(v) 1495 1496 addr = self.addr if self.addr is not None else 0 1497 for piece in self.content: 1498 for c in piece: 1499 if addr in addr_to_labels: 1500 for label in addr_to_labels[addr]: 1501 content += [ "%s" % str(label) ] 1502 addr += 1 1503 1504 content += ['\t.byte %d' % c] 1505 else: 1506 for piece in self.content: 1507 content += [ '\t.byte %d' % c for c in piece ] 1508 1509 s += "\n".join(content) 1510 s += "\n" 1511 1512 if self.end_labels: 1513 for label in self.end_labels: 1514 s += "%s\n" % label 1515 1516 return s.strip("\n") 1517 1518 # 1519 # Private methods 1520 # 1521 1522 def _initialize(self): 1523 1524 if self.memory_data is None: 1525 1526 if self.size is None or self._initial_content is None and self.sort is None: 1527 raise BinaryError('You must at least specify size, initial_content, and sort.') 1528 1529 1530 if self.sort == 'pointer-array': 1531 1532 lbl = DataLabel(self.binary, -1, name=self.name) 1533 self.labels.append((0, lbl)) 1534 1535 # symbolize the pointer array 1536 1537 self._content = [ ] 1538 1539 fmt_str = "" 1540 if self.project.arch.memory_endness == 'Iend_LE': 1541 fmt_str += "<" 1542 else: 1543 fmt_str += ">" 1544 if self.project.arch.bits == 32: 1545 fmt_str += "I" 1546 pointer_size = 4 1547 else: 1548 fmt_str += "Q" 1549 pointer_size = 8 1550 1551 for i in range(0, len(self._initial_content), pointer_size): 1552 addr_str = self._initial_content[i : i + pointer_size] 1553 addr = struct.unpack(fmt_str, addr_str)[0] 1554 if addr != 0 and ( 1555 self.binary.main_executable_regions_contain(addr) or 1556 self.binary.main_nonexecutable_regions_contain(addr) 1557 ): 1558 label = self.binary.symbol_manager.new_label(addr) 1559 else: 1560 # it might be a pointer pointing to the binary base address or something 1561 # just keep it as it is 1562 # TODO: some more delicate logic should be applied here. For example, if the pointer is very 1563 # TODO: close to the beginning of .text, but after reassembling, it might be pointing to 1564 # TODO: somewhere inside .text. In this case we'd like to fix up the reference and make it 1565 # TODO: point to the beginning of .text minus an offset, instead of keeping the original header. 1566 label = addr 1567 self._content.append(label) 1568 1569 elif self.sort in ('string', 'unknown', 'integer'): 1570 1571 lbl = DataLabel(self.binary, -1, name=self.name) 1572 self.labels.append((0, lbl)) 1573 1574 self._content = [ self._initial_content ] 1575 1576 elif self.sort == 'segment-boundary': 1577 label = self.binary.symbol_manager.new_label(self.addr) 1578 self.labels.append((self.addr, label)) 1579 self._content = [] 1580 1581 else: 1582 raise BinaryError('Unsupported data sort "%s"' % self.sort) 1583 1584 else: 1585 self.addr = self.memory_data.address 1586 self.size = self.memory_data.size 1587 self.sort = self.memory_data.sort 1588 1589 # Symbolize the content 1590 if self.sort == 'pointer-array': 1591 # read out the address 1592 pointer_size = self.project.arch.bytes 1593 pointers = self.size // pointer_size 1594 1595 self._content = [] 1596 for i in range(pointers): 1597 addr = self.binary.fast_memory_load(self.addr + i * pointer_size, pointer_size, int, 1598 endness=self.project.arch.memory_endness 1599 ) 1600 if addr is None: 1601 continue 1602 obj = self.project.loader.find_object_containing(addr) 1603 if obj is self.project.loader.main_object: 1604 # a dynamic pointer 1605 if self.binary.main_executable_regions_contain(addr) or \ 1606 self.binary.main_nonexecutable_regions_contain(addr): 1607 label = self.binary.symbol_manager.new_label(addr) 1608 self._content.append(label) 1609 1610 self.binary.register_data_reference(self.addr + i * pointer_size, addr) 1611 1612 else: 1613 # it's a pointer pointing to a segment, but not any section. keep it as it is 1614 self._content.append(addr) 1615 else: 1616 # it's a static pointer. we should use the original pointer value. 1617 self._content.append(addr) 1618 1619 elif self.sort == 'string': 1620 data = self.binary.fast_memory_load(self.addr, self.size, bytes) 1621 if data[-1] == 0: 1622 self.null_terminated = True 1623 data = data[:-1] # remove the null-byte. we'll use .asciz for it instead. 1624 else: 1625 self.null_terminated = False 1626 1627 self._content = [data] 1628 1629 elif self.sort == 'integer': 1630 data = self.binary.fast_memory_load(self.addr, self.size, bytes) 1631 self._content = [ data ] 1632 1633 elif self.sort == 'segment-boundary': 1634 label = self.binary.symbol_manager.new_label(self.addr) 1635 self.labels.append((self.addr, label)) 1636 1637 self._content = [ ] 1638 1639 elif self.sort == 'fp': 1640 # floating-point integers 1641 # Python has some trouble in dealing with floating point numbers 1642 # just store them as bytes 1643 data = self.binary.fast_memory_load(self.addr, self.size, bytes) 1644 self._content = [ data ] 1645 1646 else: 1647 # other sorts 1648 content = self.binary.fast_memory_load(self.addr, self.size, bytes) 1649 if content is not None: 1650 self._content = [content] 1651 else: 1652 self._content = [] 1653 1654 1655class Relocation(object): 1656 def __init__(self, addr, ref_addr, sort): 1657 self.addr = addr 1658 self.ref_addr = ref_addr 1659 self.sort = sort 1660 1661 def __repr__(self): 1662 s = "<Reloc %s %#x (%#x)>" % (self.sort, self.addr, self.ref_addr) 1663 return s 1664 1665 1666class Reassembler(Analysis): 1667 """ 1668 High-level representation of a binary with a linear representation of all instructions and data regions. After 1669 calling "symbolize", it essentially acts as a binary reassembler. 1670 1671 Tested on CGC, x86 and x86-64 binaries. 1672 1673 Discliamer: The reassembler is an empirical solution. Don't be surprised if it does not work on some binaries. 1674 """ 1675 def __init__(self, syntax="intel", remove_cgc_attachments=True, log_relocations=True): 1676 1677 self.syntax = syntax 1678 self._remove_cgc_attachments = remove_cgc_attachments 1679 1680 self.symbol_manager = None 1681 self.cfg = None 1682 self._cgc_attachments_removed = False 1683 self.log_relocations = log_relocations 1684 1685 self.procedures = [ ] 1686 self.data = [ ] 1687 1688 self.extra_rodata = [ ] 1689 self.extra_data = [ ] 1690 1691 self._main_executable_regions = None 1692 self._main_nonexecutable_regions = None 1693 1694 self._symbolization_needed = True 1695 1696 # section names to alignments 1697 self._section_alignments = {} 1698 1699 # all instruction addresses 1700 self.all_insn_addrs = set() 1701 1702 self._relocations = [ ] 1703 1704 self._inserted_asm_before_label = defaultdict(list) 1705 self._inserted_asm_after_label = defaultdict(list) 1706 self._removed_instructions = set() 1707 1708 self._initialize() 1709 1710 # 1711 # Overridden predefined methods 1712 # 1713 1714 def __str__(self): 1715 """ 1716 Return a linear representation of all instructions in the binary 1717 :return: 1718 """ 1719 1720 s = "\n".join([str(proc) for proc in self.procedures]) 1721 1722 return s 1723 1724 # 1725 # Properties 1726 # 1727 @property 1728 def instructions(self): 1729 """ 1730 Get a list of all instructions in the binary 1731 1732 :return: A list of (address, instruction) 1733 :rtype: tuple 1734 """ 1735 1736 raise NotImplementedError() 1737 1738 @property 1739 def relocations(self): 1740 1741 return self._relocations 1742 1743 @property 1744 def inserted_asm_before_label(self): 1745 return self._inserted_asm_before_label 1746 1747 @property 1748 def inserted_asm_after_label(self): 1749 return self._inserted_asm_after_label 1750 1751 @property 1752 def main_executable_regions(self): 1753 """ 1754 1755 :return: 1756 """ 1757 1758 if self._main_executable_regions is None: 1759 self._main_executable_regions = [] 1760 1761 obj = self.project.loader.main_object 1762 1763 if obj.sections: 1764 for sec in obj.sections: 1765 if sec.is_executable: 1766 min_addr = sec.min_addr 1767 max_addr = sec.max_addr + 1 1768 if max_addr <= min_addr or min_addr == 0: 1769 continue 1770 self._main_executable_regions.append((min_addr, max_addr)) 1771 1772 else: 1773 for seg in obj.segments: 1774 if seg.is_executable: 1775 min_addr = seg.min_addr 1776 max_addr = seg.max_addr + 1 1777 self._main_executable_regions.append((min_addr, max_addr)) 1778 1779 return self._main_executable_regions 1780 1781 @property 1782 def main_nonexecutable_regions(self): 1783 """ 1784 1785 :return: 1786 """ 1787 1788 if self._main_nonexecutable_regions is None: 1789 self._main_nonexecutable_regions = [] 1790 1791 obj = self.project.loader.main_object 1792 1793 if obj.sections: 1794 for sec in obj.sections: 1795 if sec.name in {'.eh_frame', '.eh_frame_hdr'}: 1796 # hack for ELF binaries... 1797 continue 1798 if not sec.is_executable: 1799 min_addr = sec.min_addr 1800 max_addr = sec.max_addr + 1 1801 if max_addr <= min_addr or min_addr == 0: 1802 continue 1803 self._main_nonexecutable_regions.append((min_addr, max_addr)) 1804 1805 else: 1806 for seg in obj.segments: 1807 if not seg.is_executable: 1808 min_addr = seg.min_addr 1809 max_addr = seg.max_addr + 1 1810 self._main_nonexecutable_regions.append((min_addr, max_addr)) 1811 1812 return self._main_nonexecutable_regions 1813 1814 # 1815 # Public methods 1816 # 1817 1818 def section_alignment(self, section_name): 1819 """ 1820 Get the alignment for the specific section. If the section is not found, 16 is used as default. 1821 1822 :param str section_name: The section. 1823 :return: The alignment in bytes. 1824 :rtype: int 1825 """ 1826 1827 return self._section_alignments.get(section_name, 16) 1828 1829 def main_executable_regions_contain(self, addr): 1830 """ 1831 1832 :param addr: 1833 :return: 1834 """ 1835 for start, end in self.main_executable_regions: 1836 if start <= addr < end: 1837 return True 1838 return False 1839 1840 def main_executable_region_limbos_contain(self, addr): 1841 """ 1842 Sometimes there exists a pointer that points to a few bytes before the beginning of a section, or a few bytes 1843 after the beginning of the section. We take care of that here. 1844 1845 :param int addr: The address to check. 1846 :return: A 2-tuple of (bool, the closest base address) 1847 :rtype: tuple 1848 """ 1849 1850 TOLERANCE = 64 1851 1852 closest_region = None 1853 least_limbo = None 1854 1855 for start, end in self.main_executable_regions: 1856 if start - TOLERANCE <= addr < start: 1857 if least_limbo is None or start - addr < least_limbo: 1858 closest_region = (True, start) 1859 least_limbo = start - addr 1860 if end <= addr < end + TOLERANCE: 1861 if least_limbo is None or addr - end < least_limbo: 1862 closest_region = (True, end) 1863 least_limbo = addr - end 1864 1865 if closest_region is not None: 1866 return closest_region 1867 return (False, None) 1868 1869 def main_nonexecutable_regions_contain(self, addr): 1870 """ 1871 1872 :param int addr: The address to check. 1873 :return: True if the address is inside a non-executable region, False otherwise. 1874 :rtype: bool 1875 """ 1876 for start, end in self.main_nonexecutable_regions: 1877 if start <= addr < end: 1878 return True 1879 return False 1880 1881 def main_nonexecutable_region_limbos_contain(self, addr, tolerance_before=64, tolerance_after=64): 1882 """ 1883 Sometimes there exists a pointer that points to a few bytes before the beginning of a section, or a few bytes 1884 after the beginning of the section. We take care of that here. 1885 1886 :param int addr: The address to check. 1887 :return: A 2-tuple of (bool, the closest base address) 1888 :rtype: tuple 1889 """ 1890 1891 closest_region = None 1892 least_limbo = None 1893 1894 for start, end in self.main_nonexecutable_regions: 1895 if start - tolerance_before <= addr < start: 1896 if least_limbo is None or start - addr < least_limbo: 1897 closest_region = (True, start) 1898 least_limbo = start - addr 1899 if end <= addr < end + tolerance_after: 1900 if least_limbo is None or addr - end < least_limbo: 1901 closest_region = (True, end) 1902 least_limbo = addr - end 1903 1904 if closest_region is not None: 1905 return closest_region 1906 return False, None 1907 1908 def register_instruction_reference(self, insn_addr, ref_addr, sort, operand_offset): 1909 1910 if not self.log_relocations: 1911 return 1912 1913 addr = insn_addr + operand_offset 1914 r = Relocation(addr, ref_addr, sort) 1915 1916 self._relocations.append(r) 1917 1918 def register_data_reference(self, data_addr, ref_addr): 1919 1920 if not self.log_relocations: 1921 return 1922 1923 r = Relocation(data_addr, ref_addr, 'absolute') 1924 1925 self._relocations.append(r) 1926 1927 def add_label(self, name, addr): 1928 """ 1929 Add a new label to the symbol manager. 1930 1931 :param str name: Name of the label. 1932 :param int addr: Address of the label. 1933 :return: None 1934 """ 1935 1936 # set the label 1937 self._symbolization_needed = True 1938 1939 self.symbol_manager.new_label(addr, name=name, force=True) 1940 1941 def insert_asm(self, addr, asm_code, before_label=False): 1942 """ 1943 Insert some assembly code at the specific address. There must be an instruction starting at that address. 1944 1945 :param int addr: Address of insertion 1946 :param str asm_code: The assembly code to insert 1947 :return: None 1948 """ 1949 1950 if before_label: 1951 self._inserted_asm_before_label[addr].append(asm_code) 1952 else: 1953 self._inserted_asm_after_label[addr].append(asm_code) 1954 1955 def append_procedure(self, name, asm_code): 1956 """ 1957 Add a new procedure with specific name and assembly code. 1958 1959 :param str name: The name of the new procedure. 1960 :param str asm_code: The assembly code of the procedure 1961 :return: None 1962 """ 1963 1964 proc = Procedure(self, name=name, asm_code=asm_code) 1965 self.procedures.append(proc) 1966 1967 def append_data(self, name, initial_content, size, readonly=False, sort="unknown"): # pylint:disable=unused-argument 1968 """ 1969 Append a new data entry into the binary with specific name, content, and size. 1970 1971 :param str name: Name of the data entry. Will be used as the label. 1972 :param bytes initial_content: The initial content of the data entry. 1973 :param int size: Size of the data entry. 1974 :param bool readonly: If the data entry belongs to the readonly region. 1975 :param str sort: Type of the data. 1976 :return: None 1977 """ 1978 1979 if readonly: 1980 section_name = ".rodata" 1981 else: 1982 section_name = '.data' 1983 1984 if initial_content is None: 1985 initial_content = b"" 1986 initial_content = initial_content.ljust(size, b"\x00") 1987 data = Data(self, memory_data=None, section_name=section_name, name=name, initial_content=initial_content, 1988 size=size, sort=sort 1989 ) 1990 1991 if section_name == '.rodata': 1992 self.extra_rodata.append(data) 1993 else: 1994 self.extra_data.append(data) 1995 1996 def remove_instruction(self, ins_addr): 1997 """ 1998 1999 :param ins_addr: 2000 :return: 2001 """ 2002 2003 self._removed_instructions.add(ins_addr) 2004 2005 def randomize_procedures(self): 2006 """ 2007 2008 :return: 2009 """ 2010 2011 raise NotImplementedError() 2012 2013 def symbolize(self): 2014 2015 # clear the flag 2016 self._symbolization_needed = False 2017 2018 # sanity checks 2019 #if self._has_integer_used_as_pointers(): 2020 # raise ReassemblerFailureNotice('Integer-used-as-pointer detected. Reassembler will not work safely on ' 2021 # 'this binary. Ping Fish if you believe the detection is wrong.' 2022 # ) 2023 2024 for proc in self.procedures: 2025 proc.assign_labels() 2026 2027 for data in self.data: 2028 data.assign_labels() 2029 2030 # Get all instruction addresses, and modify those labels pointing to the middle of an instruction 2031 insn_addrs = [ ] 2032 for proc in self.procedures: # type: Procedure 2033 insn_addrs.extend(proc.instruction_addresses()) 2034 # just to be safe 2035 insn_addrs = sorted(set(insn_addrs), key=lambda x: x[0]) 2036 2037 pos = 0 2038 2039 changed_labels = [ ] 2040 2041 for label_addr in sorted(self.symbol_manager.addr_to_label.keys()): 2042 while pos < len(insn_addrs) and label_addr > insn_addrs[pos][0]: 2043 pos += 1 2044 2045 if pos >= len(insn_addrs): 2046 break 2047 2048 if pos == 0: 2049 continue 2050 2051 insn_addr, insn_size = insn_addrs[pos - 1] 2052 2053 if insn_addr < label_addr < insn_addr + insn_size: 2054 # this label should be converted to something like 0x8000040+1 2055 labels = self.symbol_manager.addr_to_label[label_addr] 2056 for label in labels: 2057 label.base_addr = insn_addrs[pos][0] 2058 changed_labels.append(label) 2059 2060 for label in changed_labels: 2061 self.symbol_manager.addr_to_label[label.original_addr].remove(label) 2062 if not self.symbol_manager.addr_to_label[label.original_addr]: 2063 del self.symbol_manager.addr_to_label[label.original_addr] 2064 self.symbol_manager.addr_to_label[label.base_addr].append(label) 2065 2066 if changed_labels: 2067 for proc in self.procedures: 2068 proc.assign_labels() 2069 2070 def assembly(self, comments=False, symbolized=True): 2071 2072 if symbolized and self._symbolization_needed: 2073 self.symbolize() 2074 2075 if self._remove_cgc_attachments: 2076 self._cgc_attachments_removed = self.remove_cgc_attachments() 2077 2078 s = "" 2079 2080 if self.syntax == 'intel': 2081 s += "\t.intel_syntax noprefix\n" 2082 2083 all_assembly_lines = [ ] 2084 2085 addr_and_assembly = [ ] 2086 for proc in self.procedures: 2087 addr_and_assembly.extend(proc.assembly(comments=comments, symbolized=symbolized)) 2088 # sort it by the address - must be a stable sort! 2089 addr_and_assembly = sorted(addr_and_assembly, key=lambda x: x[0] if x[0] is not None else -1) 2090 all_assembly_lines.extend(line for _, line in addr_and_assembly) 2091 2092 last_section = None 2093 2094 if self._cgc_attachments_removed: 2095 all_data = self.data + self.extra_rodata + self.extra_data 2096 else: 2097 # to reduce memory usage, we put extra data in front of the original data in binary 2098 all_data = self.extra_data + self.data + self.extra_rodata 2099 2100 for data in all_data: 2101 if last_section is None or data.section_name != last_section: 2102 last_section = data.section_name 2103 all_assembly_lines.append("\t.section {section}\n\t.align {alignment}".format( 2104 section=(last_section if last_section != '.init_array' else '.data'), 2105 alignment=self.section_alignment(last_section) 2106 )) 2107 all_assembly_lines.append(data.assembly(comments=comments, symbolized=symbolized)) 2108 2109 s = "\n".join(all_assembly_lines) 2110 2111 return s 2112 2113 def remove_cgc_attachments(self): 2114 """ 2115 Remove CGC attachments. 2116 2117 :return: True if CGC attachments are found and removed, False otherwise 2118 :rtype: bool 2119 """ 2120 2121 cgc_package_list = None 2122 cgc_extended_application = None 2123 2124 for data in self.data: 2125 if data.sort == 'cgc-package-list': 2126 cgc_package_list = data 2127 elif data.sort == 'cgc-extended-application': 2128 cgc_extended_application = data 2129 2130 if not cgc_package_list or not cgc_extended_application: 2131 return False 2132 2133 if cgc_package_list.skip or cgc_extended_application.skip: 2134 # they have already been removed 2135 # so we still return True to indicate that CGC attachments have been removed 2136 return True 2137 2138 # there is a single function referencing them 2139 cgcpl_memory_data = self.cfg.memory_data.get(cgc_package_list.addr, None) 2140 cgcea_memory_data = self.cfg.memory_data.get(cgc_extended_application.addr, None) 2141 refs = self.cfg.kb.xrefs 2142 2143 if cgcpl_memory_data is None or cgcea_memory_data is None: 2144 return False 2145 2146 if len(refs.get_xrefs_by_dst(cgcpl_memory_data.addr)) != 1: 2147 return False 2148 if len(refs.get_xrefs_by_dst(cgcea_memory_data.addr)) != 1: 2149 return False 2150 2151 # check if the irsb addresses are the same 2152 if next(iter(refs.get_xrefs_by_dst(cgcpl_memory_data.addr))).block_addr != \ 2153 next(iter(refs.get_xrefs_by_dst(cgcea_memory_data.addr))).block_addr: 2154 return False 2155 2156 insn_addr = next(iter(refs.get_xrefs_by_dst(cgcpl_memory_data.addr))).ins_addr 2157 # get the basic block 2158 cfg_node = self.cfg.model.get_any_node(insn_addr, anyaddr=True) 2159 if not cfg_node: 2160 return False 2161 2162 func_addr = cfg_node.function_address 2163 2164 # this function should be calling another function 2165 sub_func_addr = None 2166 if func_addr not in self.cfg.functions: 2167 return False 2168 function = self.cfg.functions[func_addr] 2169 # traverse the graph and make sure there is only one call edge 2170 calling_targets = [ ] 2171 for _, dst, data in function.transition_graph.edges(data=True): 2172 if 'type' in data and data['type'] == 'call': 2173 calling_targets.append(dst.addr) 2174 2175 if len(calling_targets) != 1: 2176 return False 2177 2178 sub_func_addr = calling_targets[0] 2179 2180 # alright. We want to nop this function, as well as the subfunction 2181 proc = next((p for p in self.procedures if p.addr == func_addr), None) 2182 if proc is None: 2183 return False 2184 2185 subproc = next((p for p in self.procedures if p.addr == sub_func_addr), None) 2186 if subproc is None: 2187 return False 2188 2189 # if those two data entries have any label, we should properly modify them 2190 # at this point, we are fairly confident that none of those labels are direct data references to either package 2191 # list or extended application 2192 has_label = True 2193 lowest_address = min(cgc_package_list.addr, cgc_extended_application.addr) 2194 for obj in (cgc_package_list, cgc_extended_application): 2195 labels = obj.labels 2196 for addr, label in labels: 2197 if addr != lowest_address: 2198 label.base_addr = lowest_address 2199 2200 if has_label: 2201 # is there any memory data entry that ends right at the lowest address? 2202 data = next((d for d in self.data if d.addr is not None and d.addr + d.size == lowest_address), None) 2203 if data is None: 2204 # since there is no gap between memory data entries (we guarantee that), this can only be that no other 2205 # data resides in the same memory region that CGC attachments are in 2206 pass 2207 else: 2208 lbl = self.symbol_manager.addr_to_label[lowest_address][0] 2209 if lbl not in data.end_labels: 2210 data.end_labels.append(lbl) 2211 2212 # practically nop the function 2213 proc.asm_code = "\tret\n" 2214 subproc.asm_code = "\tret\n" 2215 2216 # remove those two data entries 2217 cgc_package_list.skip = True 2218 cgc_extended_application.skip = True 2219 2220 l.info('CGC attachments are removed.') 2221 2222 return True 2223 2224 def remove_unnecessary_stuff(self): 2225 """ 2226 Remove unnecessary functions and data 2227 2228 :return: None 2229 """ 2230 2231 # determine if the binary is compiled against glibc 2232 is_glibc = False 2233 for dep in self.project.loader.main_object.deps: 2234 if dep.lower() in {'libc.so.6', 'libc.so'}: 2235 is_glibc = True 2236 break 2237 if is_glibc: 2238 self.remove_unnecessary_stuff_glibc() 2239 2240 def remove_unnecessary_stuff_glibc(self): 2241 glibc_functions_blacklist = { 2242 '_start', 2243 '_init', 2244 '_fini', 2245 '__gmon_start__', 2246 '__do_global_dtors_aux', 2247 'frame_dummy', 2248 'atexit', 2249 'deregister_tm_clones', 2250 'register_tm_clones', 2251 '__x86.get_pc_thunk.bx', 2252 '__libc_csu_init', 2253 '__libc_csu_fini', 2254 } 2255 2256 glibc_data_blacklist = { 2257 '__TMC_END__', 2258 '_GLOBAL_OFFSET_TABLE_', 2259 '__JCR_END__', 2260 '__dso_handle', 2261 '__init_array_start', 2262 '__init_array_end', 2263 2264 # 2265 'stdout', 2266 'stderr', 2267 'stdin', 2268 'program_invocation_short_', 2269 'program_invocation_short_name', 2270 'program_invocation_name', 2271 '__progname_full', 2272 '_IO_stdin_used', 2273 'obstack_alloc_failed_hand', 2274 'optind', 2275 'optarg', 2276 '__progname', 2277 '_environ', 2278 'environ', 2279 '__environ', 2280 } 2281 2282 glibc_references_blacklist = { 2283 'frame_dummy', 2284 '__do_global_dtors_aux', 2285 } 2286 2287 self.procedures = [p for p in self.procedures if p.name not in glibc_functions_blacklist and not p.is_plt] 2288 2289 self.data = [d for d in self.data if not any(lbl.name in glibc_data_blacklist for _, lbl in d.labels)] 2290 2291 for d in self.data: 2292 if d.sort == 'pointer-array': 2293 for i in range(len(d.content)): 2294 ptr = d.content[i] 2295 if isinstance(ptr, Label) and ptr.name in glibc_references_blacklist: 2296 d.content[i] = 0 2297 2298 # 2299 # Private methods 2300 # 2301 2302 def _initialize(self): 2303 """ 2304 Initialize the binary. 2305 2306 :return: None 2307 """ 2308 2309 # figure out section alignments 2310 for section in self.project.loader.main_object.sections: 2311 in_segment = False 2312 for segment in self.project.loader.main_object.segments: 2313 segment_addr = segment.vaddr 2314 if segment_addr <= section.vaddr < segment_addr + segment.memsize: 2315 in_segment = True 2316 break 2317 if not in_segment: 2318 continue 2319 2320 # calculate alignments 2321 if section.vaddr % 0x20 == 0: 2322 alignment = 0x20 2323 elif section.vaddr % 0x10 == 0: 2324 alignment = 0x10 2325 elif section.vaddr % 0x8 == 0: 2326 alignment = 0x8 2327 elif section.vaddr % 0x4 == 0: 2328 alignment = 0x4 2329 else: 2330 alignment = 2 2331 2332 self._section_alignments[section.name] = alignment 2333 2334 l.debug('Generating CFG...') 2335 cfg = self.project.analyses.CFG(normalize=True, resolve_indirect_jumps=True, data_references=True, 2336 extra_memory_regions=[(0x4347c000, 0x4347c000 + 0x1000)], 2337 data_type_guessing_handlers=[ 2338 self._sequence_handler, 2339 self._cgc_extended_application_handler, 2340 self._unknown_data_size_handler, 2341 ], 2342 ) 2343 2344 self.cfg = cfg 2345 2346 old_capstone_syntax = self.project.arch.capstone_x86_syntax 2347 if old_capstone_syntax is None: 2348 old_capstone_syntax = 'intel' 2349 2350 if self.syntax == 'at&t': 2351 # switch capstone to AT&T style 2352 self.project.arch.capstone_x86_syntax = "at&t" 2353 # clear the block cache in lifter! 2354 self.project.factory.default_engine.clear_cache() 2355 2356 # initialize symbol manager 2357 self.symbol_manager = SymbolManager(self, cfg) 2358 2359 # collect address of all instructions 2360 l.debug('Collecting instruction addresses...') 2361 for cfg_node in self.cfg.nodes(): 2362 self.all_insn_addrs |= set(cfg_node.instruction_addrs) 2363 2364 # Functions 2365 2366 l.debug('Creating functions...') 2367 for f in cfg.kb.functions.values(): 2368 # Skip all SimProcedures 2369 if self.project.is_hooked(f.addr): 2370 continue 2371 elif self.project.simos.is_syscall_addr(f.addr): 2372 continue 2373 2374 # Check which section the start address belongs to 2375 section = next(iter(sec.name for sec in self.project.loader.main_object.sections 2376 if f.addr >= sec.vaddr and f.addr < sec.vaddr + sec.memsize 2377 ), 2378 ".text" 2379 ) 2380 2381 if section in ('.got', '.plt', 'init', 'fini'): 2382 continue 2383 2384 procedure = Procedure(self, f, section=section) 2385 self.procedures.append(procedure) 2386 2387 self.procedures = sorted(self.procedures, key=lambda x: x.addr) 2388 2389 # Data 2390 2391 has_sections = len(self.project.loader.main_object.sections) > 0 2392 2393 l.debug('Creating data entries...') 2394 for addr, memory_data in cfg._memory_data.items(): 2395 2396 if memory_data.sort in ('code reference', ): 2397 continue 2398 2399 if memory_data.sort == 'string': 2400 # it might be the CGC package list 2401 new_sort, new_size = self._cgc_package_list_identifier(memory_data.address, memory_data.size) 2402 if new_sort is not None: 2403 # oh we got it! 2404 memory_data = memory_data.copy() 2405 memory_data.sort = new_sort 2406 2407 if has_sections: 2408 # Check which section the start address belongs to 2409 section = next(iter(sec for sec in self.project.loader.main_object.sections 2410 if sec.vaddr <= addr < sec.vaddr + sec.memsize 2411 ), 2412 None 2413 ) 2414 2415 if section is not None and section.name not in ('.note.gnu.build-id', ): # ignore certain section names 2416 data = Data(self, memory_data, section=section) 2417 self.data.append(data) 2418 elif memory_data.sort == 'segment-boundary': 2419 # it just points to the end of the segment or a section 2420 section = next(iter(sec for sec in self.project.loader.main_object.sections 2421 if addr == sec.vaddr + sec.memsize), 2422 None 2423 ) 2424 if section is not None: 2425 data = Data(self, memory_data, section=section) 2426 self.data.append(data) 2427 2428 else: 2429 # data = Data(self, memory_data, section_name='.data') 2430 # the data is not really within any existing section. weird. ignored it. 2431 pass 2432 else: 2433 # the binary does not have any section 2434 # we use segment information instead 2435 # TODO: this logic needs reviewing 2436 segment = next(iter(seg for seg in self.project.loader.main_object.segments 2437 if seg.vaddr <= addr <= seg.vaddr + seg.memsize 2438 ), 2439 None 2440 ) 2441 2442 if segment is not None: 2443 data = Data(self, memory_data, section_name='.data') 2444 self.data.append(data) 2445 2446 # remove all data that belong to GCC-specific sections 2447 section_names_to_ignore = {'.init', '.fini', '.fini_array', '.jcr', '.dynamic', '.got', '.got.plt', 2448 '.eh_frame_hdr', '.eh_frame', '.rel.dyn', '.rel.plt', '.rela.dyn', '.rela.plt', 2449 '.dynstr', '.dynsym', '.interp', '.note.ABI-tag', '.note.gnu.build-id', '.gnu.hash', 2450 '.gnu.version', '.gnu.version_r' 2451 } 2452 2453 # make sure there are always memory data entries pointing at the end of sections 2454 all_data_addrs = set(d.addr for d in self.data) 2455 all_procedure_addrs = set(f.addr for f in self.procedures) 2456 all_addrs = all_data_addrs | all_procedure_addrs 2457 2458 if has_sections: 2459 for section in self.project.loader.main_object.sections: 2460 2461 if section.name in section_names_to_ignore: 2462 # skip all sections that are CGC specific 2463 continue 2464 2465 # make sure this section is inside a segment 2466 for segment in self.project.loader.main_object.segments: 2467 segment_start = segment.vaddr 2468 segment_end = segment_start + segment.memsize 2469 if segment_start <= section.vaddr < segment_end: 2470 break 2471 else: 2472 # this section is not mapped into memory 2473 continue 2474 2475 section_boundary_addr = section.vaddr + section.memsize 2476 if section_boundary_addr not in all_addrs: 2477 data = Data(self, addr=section_boundary_addr, size=0, sort='segment-boundary', 2478 section_name=section.name 2479 ) 2480 self.data.append(data) 2481 # add the address to all_data_addrs so we don't end up adding another boundary in 2482 all_data_addrs.add(section_boundary_addr) 2483 2484 self.data = sorted(self.data, key=lambda x: x.addr) 2485 2486 data_indices_to_remove = set() 2487 2488 # Go through data entry list and refine them 2489 for i, data in enumerate(self.data): 2490 2491 if i in data_indices_to_remove: 2492 continue 2493 2494 # process the overlapping ones 2495 if i < len(self.data) - 1: 2496 if data.addr + data.size > self.data[i + 1].addr: 2497 # they are overlapping :-( 2498 2499 # TODO: make sure new_size makes sense 2500 new_size = self.data[i + 1].addr - data.addr 2501 2502 # there are cases that legit data is misclassified as pointers 2503 # we are able to detect some of them here 2504 if data.sort == 'pointer-array': 2505 pointer_size = self.project.arch.bytes 2506 if new_size % pointer_size != 0: 2507 # the self.data[i+1] cannot be pointed to by a pointer 2508 # remove that guy later 2509 data_indices_to_remove.add(i + 1) 2510 # mark the source as a non-pointer 2511 # apparently the original Reassembleable Disassembler paper cannot get this case 2512 source_addr = self.data[i + 1].memory_data.pointer_addr 2513 if source_addr is not None: 2514 # find the original data 2515 original_data = next((d for d in self.data if d.addr <= source_addr < d.addr + d.size), 2516 None 2517 ) 2518 if original_data is not None: 2519 original_data.desymbolize() 2520 2521 continue 2522 2523 data.shrink(new_size) 2524 2525 # process those ones whose type is unknown 2526 if data.sort == 'unknown' and data.size == 0: 2527 # increase its size until reaching the next item 2528 2529 if i + 1 == len(self.data): 2530 if data.section is None: 2531 continue 2532 data.size = data.section.vaddr + data.section.memsize - data.addr 2533 else: 2534 data.size = self.data[i + 1].addr - data.addr 2535 2536 for i in sorted(data_indices_to_remove, reverse=True): 2537 self.data = self.data[ : i] + self.data[i + 1 : ] 2538 2539 # CGC-specific data filtering 2540 self.data = [ d for d in self.data if d.section_name not in section_names_to_ignore ] 2541 2542 # restore capstone X86 syntax at the end 2543 if self.project.arch.capstone_x86_syntax != old_capstone_syntax: 2544 self.project.arch.capstone_x86_syntax = old_capstone_syntax 2545 self.project.factory.default_engine.clear_cache() 2546 2547 l.debug('Initialized.') 2548 2549 def _is_sequence(self, cfg, addr, size): 2550 data = self.fast_memory_load(addr, size, bytes) 2551 if data is None: 2552 return False 2553 ints = [i for i in data] 2554 if len(set([(i - j) for i, j in zip(ints, ints[1:])])) == 1: 2555 # arithmetic progression 2556 # backoff: it should not be ending with a pointer 2557 closest_aligned_addr = (addr + size - 1) & 0xfffffffc 2558 ptr = self.fast_memory_load(closest_aligned_addr, 4, int, endness=self.project.arch.memory_endness) 2559 if ptr is None: 2560 return False 2561 if self._is_pointer(cfg, ptr): 2562 return False 2563 return True 2564 return False 2565 2566 @staticmethod 2567 def _is_pointer(cfg, ptr): 2568 if cfg.project.loader.find_section_containing(ptr) is not None or \ 2569 cfg.project.loader.find_segment_containing(ptr) is not None or \ 2570 (cfg._extra_memory_regions and 2571 next(((a < ptr < b) for (a, b) in cfg._extra_memory_regions), None) 2572 ): 2573 return True 2574 return False 2575 2576 def _sequence_handler(self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size): # pylint:disable=unused-argument 2577 """ 2578 Find sequences in binary data. 2579 2580 :param angr.analyses.CFG cfg: The control flow graph. 2581 :param pyvex.IRSB irsb: The IRSB object. 2582 :param int irsb_addr: Address of the block. 2583 :param int stmt_idx: Statement ID. 2584 :param int data_addr: Address of the data in memory. 2585 :param int max_size: Maximum size possible. 2586 :return: A 2-tuple of data type and size. 2587 :rtype: tuple 2588 """ 2589 2590 if not self._is_sequence(cfg, data_addr, 5): 2591 # fail-fast 2592 return None, None 2593 2594 sequence_max_size = min(256, max_size) 2595 2596 for i in range(5, min(256, max_size)): 2597 if not self._is_sequence(cfg, data_addr, i): 2598 return 'sequence', i - 1 2599 2600 return 'sequence', sequence_max_size 2601 2602 def _cgc_package_list_identifier(self, data_addr, data_size): 2603 """ 2604 Identifies the CGC package list associated with the CGC binary. 2605 2606 :param int data_addr: Address of the data in memory. 2607 :param int data_size: Maximum size possible. 2608 :return: A 2-tuple of data type and size. 2609 :rtype: tuple 2610 """ 2611 2612 if data_size < 100: 2613 return None, None 2614 2615 data = self.fast_memory_load(data_addr, data_size, str) 2616 2617 if data[:10] != 'The DECREE': 2618 return None, None 2619 2620 if not all(i in string.printable for i in data): 2621 return None, None 2622 2623 if not re.match(r"The DECREE packages used in the creation of this challenge binary were:", data): 2624 return None, None 2625 2626 return 'cgc-package-list', data_size 2627 2628 def _cgc_extended_application_handler(self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size): # pylint:disable=unused-argument 2629 """ 2630 Identifies the extended application (a PDF file) associated with the CGC binary. 2631 2632 :param angr.analyses.CFG cfg: The control flow graph. 2633 :param pyvex.IRSB irsb: The IRSB object. 2634 :param int irsb_addr: Address of the block. 2635 :param int stmt_idx: Statement ID. 2636 :param int data_addr: Address of the data in memory. 2637 :param int max_size: Maximum size possible. 2638 :return: A 2-tuple of data type and size. 2639 :rtype: tuple 2640 """ 2641 2642 if max_size < 100: 2643 return None, None 2644 2645 data = self.fast_memory_load(data_addr, 20, bytes) 2646 2647 if data is not None and data[:4] != b'The ': 2648 return None, None 2649 2650 # read everything in 2651 data = self.fast_memory_load(data_addr, max_size, str) 2652 2653 m = re.match(r"The ([\d]+) byte CGC Extended Application follows.", data) 2654 if not m: 2655 return None, None 2656 pdf_size = int(m.group(1)) 2657 2658 if '%PDF' not in data: 2659 return None, None 2660 if '%%EOF' not in data: 2661 return None, None 2662 2663 pdf_data = data[data.index('%PDF') : data.index('%%EOF') + 6] 2664 2665 if len(pdf_data) != pdf_size: 2666 return None, None 2667 2668 return 'cgc-extended-application', max_size 2669 2670 def _unknown_data_size_handler(self, cfg, irsb, irsb_addr, stmt_idx, data_addr, max_size): # pylint:disable=unused-argument 2671 """ 2672 Return the maximum number of bytes until a potential pointer or a potential sequence is found. 2673 2674 :param angr.analyses.CFG cfg: The control flow graph. 2675 :param pyvex.IRSB irsb: The IRSB object. 2676 :param int irsb_addr: Address of the block. 2677 :param int stmt_idx: Statement ID. 2678 :param int data_addr: Address of the data in memory. 2679 :param int max_size: Maximum size possible. 2680 :return: A 2-tuple of data type and size. 2681 :rtype: tuple 2682 """ 2683 2684 sequence_offset = None 2685 2686 for offset in range(1, max_size): 2687 if self._is_sequence(cfg, data_addr + offset, 5): 2688 # a potential sequence is found 2689 sequence_offset = offset 2690 break 2691 2692 if sequence_offset is not None: 2693 if self.project.arch.bits == 32: 2694 max_size = min(max_size, sequence_offset) 2695 elif self.project.arch.bits == 64: 2696 max_size = min(max_size, sequence_offset + 5) # high 5 bytes might be all zeros... 2697 2698 ptr_size = cfg.project.arch.bytes 2699 2700 size = None 2701 2702 for offset in range(1, max_size - ptr_size + 1): 2703 ptr = self.fast_memory_load(data_addr + offset, ptr_size, int, endness=cfg.project.arch.memory_endness) 2704 if self._is_pointer(cfg, ptr): 2705 size = offset 2706 break 2707 2708 if size is not None: 2709 return "unknown", size 2710 elif sequence_offset is not None: 2711 return "unknown", sequence_offset 2712 else: 2713 return None, None 2714 2715 def _has_integer_used_as_pointers(self): 2716 """ 2717 Test if there is any (suspicious) pointer decryption in the code. 2718 2719 :return: True if there is any pointer decryption, False otherwise. 2720 :rtype: bool 2721 """ 2722 2723 # check all integer accesses and see if there is any integer being used as a pointer later, but it wasn't 2724 # classified as a pointer reference 2725 2726 # we only care about unknown memory data that are 4 bytes long, and is directly referenced from an IRSB 2727 candidates = [ i for i in self.cfg.memory_data.values() if 2728 i.sort in ('unknown', 'integer') and 2729 i.size == self.project.arch.bytes and 2730 i.irsb_addr is not None 2731 ] 2732 2733 if not candidates: 2734 return False 2735 2736 for candidate in candidates: 2737 2738 # if the candidate is in .bss, we don't care about it 2739 sec = self.cfg.project.loader.find_section_containing(candidate.address) 2740 if sec.name in ('.bss', '.got.plt'): 2741 continue 2742 2743 # execute the single basic block and see how the value is used 2744 base_graph = networkx.DiGraph() 2745 candidate_node = self.cfg.model.get_any_node(candidate.irsb_addr) # type: angr.analyses.cfg_node.CFGNode 2746 if candidate_node is None: 2747 continue 2748 base_graph.add_node(candidate_node) 2749 tmp_kb = KnowledgeBase(self.project) 2750 cfg = self.project.analyses.CFGEmulated(kb=tmp_kb, 2751 starts=(candidate.irsb_addr,), 2752 keep_state=True, 2753 base_graph=base_graph 2754 ) 2755 candidate_irsb = cfg.get_any_irsb(candidate.irsb_addr) # type: SimIRSB 2756 ddg = self.project.analyses.DDG(kb=tmp_kb, cfg=cfg) 2757 2758 mem_var_node = None 2759 for node in ddg.simplified_data_graph.nodes(): 2760 if isinstance(node.variable, SimMemoryVariable) and node.location.ins_addr == candidate.insn_addr: 2761 # found it! 2762 mem_var_node = node 2763 break 2764 else: 2765 # mem_var_node is not found 2766 continue 2767 2768 # get a sub graph 2769 subgraph = ddg.data_sub_graph(mem_var_node, 2770 simplified=False, 2771 killing_edges=False, 2772 excluding_types={'mem_addr'}, 2773 ) 2774 2775 # is it used as a memory address anywhere? 2776 # TODO: 2777 2778 # is it used as a jump target? 2779 next_tmp = None 2780 if isinstance(candidate_irsb.irsb.next, pyvex.IRExpr.RdTmp): 2781 next_tmp = candidate_irsb.irsb.next.tmp 2782 2783 if next_tmp is not None: 2784 next_tmp_node = next((node for node in subgraph.nodes() 2785 if isinstance(node.variable, SimTemporaryVariable) and 2786 node.variable.tmp_id == next_tmp), 2787 None 2788 ) 2789 if next_tmp_node is not None: 2790 # ouch it's used as a jump target 2791 return True 2792 2793 return False 2794 2795 def fast_memory_load(self, addr, size, data_type, endness='Iend_LE'): 2796 """ 2797 Load memory bytes from loader's memory backend. 2798 2799 :param int addr: The address to begin memory loading. 2800 :param int size: Size in bytes. 2801 :param data_type: Type of the data. 2802 :param str endness: Endianness of this memory load. 2803 :return: Data read out of the memory. 2804 :rtype: int or bytes or str or None 2805 """ 2806 2807 if data_type is int: 2808 try: 2809 return self.project.loader.memory.unpack_word(addr, size=size, endness=endness) 2810 except KeyError: 2811 return None 2812 2813 try: 2814 data = self.project.loader.memory.load(addr, size) 2815 if data_type is str: 2816 return "".join(chr(i) for i in data) 2817 return data 2818 except KeyError: 2819 return None 2820 2821 2822from angr.analyses import AnalysesHub 2823AnalysesHub.register_default('Reassembler', Reassembler) 2824