1import os 2import logging 3import networkx 4import string 5import itertools 6from collections import defaultdict 7from typing import Union, Optional, Iterable, Set 8from typing import Type # For some reasons the linter doesn't recognize the use in apply_definition but PyCharm needs it imported to correctly recognize it # pylint: disable=unused-import 9 10from itanium_demangler import parse 11 12from cle.backends.symbol import Symbol 13from archinfo.arch_arm import get_real_address_if_arm 14import claripy 15 16from ...codenode import CodeNode, BlockNode, HookNode, SyscallNode 17from ...serializable import Serializable 18from ...errors import AngrValueError, SimEngineError, SimMemoryError 19from ...procedures import SIM_LIBRARIES 20from ...protos import function_pb2 21from ...calling_conventions import DEFAULT_CC 22from .function_parser import FunctionParser 23 24l = logging.getLogger(name=__name__) 25 26from ...sim_type import SimTypeFunction, parse_defns 27from ...calling_conventions import SimCC 28from ...project import Project 29 30 31class Function(Serializable): 32 """ 33 A representation of a function and various information about it. 34 """ 35 36 __slots__ = ('transition_graph', '_local_transition_graph', 'normalized', '_ret_sites', '_jumpout_sites', 37 '_callout_sites', '_endpoints', '_call_sites', '_retout_sites', 'addr', '_function_manager', 38 'is_syscall', '_project', 'is_plt', 'addr', 'is_simprocedure', '_name', 'binary_name', 39 '_argument_registers', '_argument_stack_variables', 40 'bp_on_stack', 'retaddr_on_stack', 'sp_delta', '_cc', '_prototype', '_returning', 41 'prepared_registers', 'prepared_stack_variables', 'registers_read_afterwards', 42 'startpoint', '_addr_to_block_node', '_block_sizes', '_block_cache', '_local_blocks', 43 '_local_block_addrs', 'info', 'tags', 'alignment', 44 ) 45 46 def __init__(self, function_manager, addr, name=None, syscall=None, is_simprocedure=None, binary_name=None, 47 is_plt=None, returning=None, alignment=False): 48 """ 49 Function constructor. If the optional parameters are not provided, they will be automatically determined upon 50 the creation of a Function object. 51 52 :param addr: The address of the function. 53 54 The following parameters are optional. 55 56 :param str name: The name of the function. 57 :param bool syscall: Whether this function is a syscall or not. 58 :param bool is_simprocedure: Whether this function is a SimProcedure or not. 59 :param str binary_name: Name of the binary where this function is. 60 :param bool is_plt: If this function is a PLT entry. 61 :param bool returning: If this function returns. 62 :param bool alignment: If this function acts as an alignment filler. Such functions usually only contain nops. 63 """ 64 self.transition_graph = networkx.DiGraph() 65 self._local_transition_graph = None 66 self.normalized = False 67 68 # block nodes at whose ends the function returns 69 self._ret_sites = set() 70 # block nodes at whose ends the function jumps out to another function (jumps outside) 71 self._jumpout_sites = set() 72 # block nodes at whose ends the function calls out to another non-returning function 73 self._callout_sites = set() 74 # block nodes that ends the function by returning out to another function (returns outside). This is rare. 75 self._retout_sites = set() 76 # block nodes (basic block nodes) at whose ends the function terminates 77 # in theory, if everything works fine, endpoints == ret_sites | jumpout_sites | callout_sites 78 self._endpoints = defaultdict(set) 79 80 self._call_sites = {} 81 self.addr = addr 82 # startpoint can be None if the corresponding CFGNode is a syscall node 83 self.startpoint = None 84 self._function_manager = function_manager 85 self.is_syscall = None 86 self.is_plt = None 87 self.is_simprocedure = False 88 self.alignment = alignment 89 90 # These properties are set by VariableManager 91 self.bp_on_stack = False 92 self.retaddr_on_stack = False 93 self.sp_delta = 0 94 # Calling convention 95 self._cc = None # type: Optional[SimCC] 96 # Function prototype 97 self._prototype = None # type: Optional[SimTypeFunction] 98 # Whether this function returns or not. `None` means it's not determined yet 99 self._returning = None 100 self.prepared_registers = set() 101 self.prepared_stack_variables = set() 102 self.registers_read_afterwards = set() 103 104 self._addr_to_block_node = {} # map addresses to nodes. it's a cache of blocks. if a block is removed from the 105 # function, it may not be removed from _addr_to_block_node. if you want to list 106 # all blocks of a function, access .blocks. 107 self._block_sizes = {} # map addresses to block sizes 108 self._block_cache = {} # a cache of real, hard data Block objects 109 self._local_blocks = {} # a dict of all blocks inside the function 110 self._local_block_addrs = set() # a set of addresses of all blocks inside the function 111 112 self.info = {} # storing special information, like $gp values for MIPS32 113 self.tags = tuple() # store function tags. can be set manually by performing CodeTagging analysis. 114 115 # TODO: Can we remove the following two members? 116 # Register offsets of those arguments passed in registers 117 self._argument_registers = [] 118 # Stack offsets of those arguments passed in stack variables 119 self._argument_stack_variables = [] 120 121 self._project = None # type: Optional[Project] # will be initialized upon the first access to self.project 122 123 # 124 # Initialize unspecified properties 125 # 126 127 if syscall is not None: 128 self.is_syscall = syscall 129 else: 130 if self.project is None: 131 raise ValueError("'syscall' must be specified if you do not specify a function manager for this new" 132 " function." ) 133 134 # Determine whether this function is a syscall or not 135 self.is_syscall = self.project.simos.is_syscall_addr(addr) 136 137 # Determine whether this function is a SimProcedure 138 if is_simprocedure is not None: 139 self.is_simprocedure = is_simprocedure 140 else: 141 if self.project is None: 142 raise ValueError("'is_simprocedure' must be specified if you do not specify a function manager for this" 143 " new function.") 144 145 if self.is_syscall or self.project.is_hooked(addr): 146 self.is_simprocedure = True 147 148 # Determine if this function is a PLT entry 149 if is_plt is not None: 150 self.is_plt = is_plt 151 else: 152 # Whether this function is a PLT entry or not is fully relying on the PLT detection in CLE 153 if self.project is None: 154 raise ValueError("'is_plt' must be specified if you do not specify a function manager for this new" 155 " function.") 156 157 self.is_plt = self.project.loader.find_plt_stub_name(addr) is not None 158 159 # Determine the name of this function 160 if name is None: 161 self._name = self._get_initial_name() 162 else: 163 self._name = name 164 165 # Determine the name the binary where this function is. 166 if binary_name is not None: 167 self.binary_name = binary_name 168 else: 169 self.binary_name = self._get_initial_binary_name() 170 171 # Determine returning status for SimProcedures and Syscalls 172 if returning is not None: 173 self._returning = returning 174 else: 175 if self.project is None: 176 raise ValueError("'returning' must be specified if you do not specify a functio nmnager for this new" 177 " function.") 178 179 self._returning = self._get_initial_returning() 180 181 # Determine a calling convention 182 # If it is a SimProcedure it might have a CC already defined which can be used 183 if self.is_simprocedure and self.project is not None and self.addr in self.project._sim_procedures: 184 simproc = self.project._sim_procedures[self.addr] 185 cc = simproc.cc 186 if cc is None: 187 arch = self.project.arch 188 if self.project.arch.name in DEFAULT_CC: 189 cc = DEFAULT_CC[arch.name](arch) 190 191 # update cc.args according to num_args 192 # TODO: Handle non-traditional arguments like fp 193 if cc is not None and not cc.args and simproc.num_args: 194 args = cc.arg_locs(is_fp=[False] * simproc.num_args) # arg_locs() uses cc.args 195 cc.args = args 196 197 self.calling_convention = cc 198 else: 199 self.calling_convention = None 200 201 @property 202 def name(self): 203 return self._name 204 205 @name.setter 206 def name(self, v): 207 self._name = v 208 self._function_manager._kb.labels[self.addr] = v 209 210 @property 211 def project(self): 212 if self._project is None: 213 # try to set it from function manager 214 if self._function_manager is not None: 215 self._project = self._function_manager._kb._project #type: Optional[Project] 216 return self._project 217 218 @property 219 def returning(self): 220 return self._returning 221 222 @returning.setter 223 def returning(self, v): 224 self._returning = v 225 226 @property 227 def blocks(self): 228 """ 229 An iterator of all local blocks in the current function. 230 231 :return: angr.lifter.Block instances. 232 """ 233 234 for block_addr, block in self._local_blocks.items(): 235 try: 236 yield self.get_block(block_addr, size=block.size, 237 byte_string=block.bytestr if isinstance(block, BlockNode) else None) 238 except (SimEngineError, SimMemoryError): 239 pass 240 241 @property 242 def block_addrs(self): 243 """ 244 An iterator of all local block addresses in the current function. 245 246 :return: block addresses. 247 """ 248 249 return self._local_blocks.keys() 250 251 @property 252 def block_addrs_set(self): 253 """ 254 Return a set of block addresses for a better performance of inclusion tests. 255 256 :return: A set of block addresses. 257 :rtype: set 258 """ 259 260 return self._local_block_addrs 261 262 def get_block(self, addr, size=None, byte_string=None): 263 if addr in self._block_cache: 264 b = self._block_cache[addr] 265 if size is None or b.size == size: 266 return b 267 else: 268 # size seems to be updated. remove this cached entry from the block cache 269 del self._block_cache[addr] 270 271 if size is None and addr in self.block_addrs: 272 # we know the size 273 size = self._block_sizes[addr] 274 275 block = self._project.factory.block(addr, size=size, byte_string=byte_string) 276 if size is None: 277 # update block_size dict 278 self._block_sizes[addr] = block.size 279 self._block_cache[addr] = block 280 return block 281 282 # compatibility 283 _get_block = get_block 284 285 @property 286 def nodes(self): 287 return self.transition_graph.nodes() 288 289 def get_node(self, addr): 290 return self._addr_to_block_node.get(addr, None) 291 292 @property 293 def has_unresolved_jumps(self): 294 for addr in self.block_addrs: 295 if addr in self._function_manager._kb.unresolved_indirect_jumps: 296 b = self._function_manager._kb._project.factory.block(addr) 297 if b.vex.jumpkind == 'Ijk_Boring': 298 return True 299 return False 300 301 @property 302 def has_unresolved_calls(self): 303 for addr in self.block_addrs: 304 if addr in self._function_manager._kb.unresolved_indirect_jumps: 305 b = self._function_manager._kb._project.factory.block(addr) 306 if b.vex.jumpkind == 'Ijk_Call': 307 return True 308 return False 309 310 @property 311 def operations(self): 312 """ 313 All of the operations that are done by this functions. 314 """ 315 return [op for block in self.blocks for op in block.vex.operations] 316 317 @property 318 def code_constants(self): 319 """ 320 All of the constants that are used by this functions's code. 321 """ 322 # TODO: remove link register values 323 return [const.value for block in self.blocks for const in block.vex.constants] 324 325 @property 326 def calling_convention(self): 327 """ 328 Get the calling convention of this function. 329 330 :return: The calling convention of this function. 331 :rtype: Optional[SimCC] 332 """ 333 return self._cc 334 335 @calling_convention.setter 336 def calling_convention(self, v): 337 """ 338 Set the calling convention of this function. If the new cc has a function prototype, we will clear 339 self._prototype. Otherwise, if self.prototype is set, we will use it to update the function prototype of the new 340 cc, and then clear self._prototype. A warning message will be generated in either case. 341 342 :param Optional[SimCC] v: The new calling convention. 343 :return: None 344 """ 345 self._cc = v 346 347 if self._cc is not None: 348 if self._cc.func_ty is None and self._prototype is not None: 349 l.warning("The new calling convention for %r does not have a prototype associated. Using the existing " 350 "function prototype to update the new calling convention. The existing function prototype " 351 "will be removed.", self) 352 self._cc.set_func_type_with_arch(self._prototype) 353 self._prototype = None 354 elif self._cc.func_ty is not None and self._prototype is not None: 355 l.warning("The new calling convention for %r already has a prototype associated. The existing function " 356 "prototype will be removed.", self) 357 self._prototype = None 358 359 @property 360 def prototype(self): 361 """ 362 Get the prototype of this function. We prioritize the function prototype that is set in self.calling_convention. 363 364 :return: The function prototype. 365 :rtype: Optional[SimTypeFunction] 366 """ 367 if self._cc: 368 return self._cc.func_ty 369 else: 370 return self._prototype 371 372 @prototype.setter 373 def prototype(self, proto): 374 """ 375 Set a new prototype to this function. If a calling convention is already set to this function, the new prototype 376 will be set to this calling convention instead. 377 378 :param Optional[SimTypeFunction] proto: The new prototype. 379 :return: None 380 """ 381 if self._cc: 382 self._cc.set_func_type_with_arch(proto) 383 else: 384 self._prototype = proto 385 386 @classmethod 387 def _get_cmsg(cls): 388 return function_pb2.Function() 389 390 def serialize_to_cmessage(self): 391 return FunctionParser.serialize(self) 392 393 @classmethod 394 def parse_from_cmessage(cls, cmsg, **kwargs): 395 """ 396 :param cmsg: 397 398 :return Function: The function instantiated out of the cmsg data. 399 """ 400 return FunctionParser.parse_from_cmsg(cmsg, **kwargs) 401 402 403 def string_references(self, minimum_length=2, vex_only=False): 404 """ 405 All of the constant string references used by this function. 406 407 :param minimum_length: The minimum length of strings to find (default is 1) 408 :param vex_only: Only analyze VEX IR, don't interpret the entry state to detect additional constants. 409 :return: A list of tuples of (address, string) where is address is the location of the string in 410 memory. 411 """ 412 strings = [] 413 memory = self._project.loader.memory 414 415 # get known instruction addresses and call targets 416 # these addresses cannot be string references, but show up frequently in the runtime values 417 known_executable_addresses = set() 418 for block in self.blocks: 419 known_executable_addresses.update(block.instruction_addrs) 420 for function in self._function_manager.values(): 421 known_executable_addresses.update(set(x.addr for x in function.graph.nodes())) 422 423 # loop over all local runtime values and check if the value points to a printable string 424 for addr in self.local_runtime_values if not vex_only else self.code_constants: 425 if not isinstance(addr, claripy.fp.FPV) and addr in memory: 426 # check that the address isn't an pointing to known executable code 427 # and that it isn't an indirect pointer to known executable code 428 try: 429 possible_pointer = memory.unpack_word(addr) 430 if addr not in known_executable_addresses and possible_pointer not in known_executable_addresses: 431 # build string 432 stn = "" 433 offset = 0 434 current_char = chr(memory[addr + offset]) 435 while current_char in string.printable: 436 stn += current_char 437 offset += 1 438 current_char = chr(memory[addr + offset]) 439 440 # check that the string was a null terminated string with minimum length 441 if current_char == "\x00" and len(stn) >= minimum_length: 442 strings.append((addr, stn)) 443 except KeyError: 444 pass 445 return strings 446 447 @property 448 def local_runtime_values(self): 449 """ 450 Tries to find all runtime values of this function which do not come from inputs. 451 These values are generated by starting from a blank state and reanalyzing the basic blocks once each. 452 Function calls are skipped, and back edges are never taken so these values are often unreliable, 453 This function is good at finding simple constant addresses which the function will use or calculate. 454 455 :return: a set of constants 456 """ 457 constants = set() 458 459 if not self._project.loader.main_object.contains_addr(self.addr): 460 return constants 461 462 # FIXME the old way was better for architectures like mips, but we need the initial irsb 463 # reanalyze function with a new initial state (use persistent registers) 464 # initial_state = self._function_manager._cfg.get_any_irsb(self.addr).initial_state 465 # fresh_state = self._project.factory.blank_state(mode="fastpath") 466 # for reg in initial_state.arch.persistent_regs + ['ip']: 467 # fresh_state.registers.store(reg, initial_state.registers.load(reg)) 468 469 # reanalyze function with a new initial state 470 fresh_state = self._project.factory.blank_state(mode="fastpath") 471 fresh_state.regs.ip = self.addr 472 473 graph_addrs = set(x.addr for x in self.graph.nodes() if isinstance(x, BlockNode)) 474 475 # process the nodes in a breadth-first order keeping track of which nodes have already been analyzed 476 analyzed = set() 477 q = [fresh_state] 478 analyzed.add(fresh_state.solver.eval(fresh_state.ip)) 479 while len(q) > 0: 480 state = q.pop() 481 # make sure its in this function 482 if state.solver.eval(state.ip) not in graph_addrs: 483 continue 484 # don't trace into simprocedures 485 if self._project.is_hooked(state.solver.eval(state.ip)): 486 continue 487 # don't trace outside of the binary 488 if not self._project.loader.main_object.contains_addr(state.solver.eval(state.ip)): 489 continue 490 # don't trace unreachable blocks 491 if state.history.jumpkind in {'Ijk_EmWarn', 'Ijk_NoDecode', 492 'Ijk_MapFail', 'Ijk_NoRedir', 493 'Ijk_SigTRAP', 'Ijk_SigSEGV', 494 'Ijk_ClientReq'}: 495 continue 496 497 curr_ip = state.solver.eval(state.ip) 498 499 # get runtime values from logs of successors 500 successors = self._project.factory.successors(state) 501 for succ in successors.flat_successors + successors.unsat_successors: 502 for a in succ.history.recent_actions: 503 for ao in a.all_objects: 504 if not isinstance(ao.ast, claripy.ast.Base): 505 constants.add(ao.ast) 506 elif not ao.ast.symbolic: 507 constants.add(succ.solver.eval(ao.ast)) 508 509 # add successors to the queue to analyze 510 if not succ.solver.symbolic(succ.ip): 511 succ_ip = succ.solver.eval(succ.ip) 512 if succ_ip in self and succ_ip not in analyzed: 513 analyzed.add(succ_ip) 514 q.insert(0, succ) 515 516 # force jumps to missing successors 517 # (this is a slightly hacky way to force it to explore all the nodes in the function) 518 node = self.get_node(curr_ip) 519 if node is None: 520 # the node does not exist. maybe it's not a block node. 521 continue 522 missing = set(x.addr for x in list(self.graph.successors(node))) - analyzed 523 for succ_addr in missing: 524 l.info("Forcing jump to missing successor: %#x", succ_addr) 525 if succ_addr not in analyzed: 526 all_successors = successors.unconstrained_successors + \ 527 successors.flat_successors + \ 528 successors.unsat_successors 529 if len(all_successors) > 0: 530 # set the ip of a copied successor to the successor address 531 succ = all_successors[0].copy() 532 succ.ip = succ_addr 533 analyzed.add(succ_addr) 534 q.insert(0, succ) 535 else: 536 l.warning("Could not reach successor: %#x", succ_addr) 537 538 return constants 539 540 @property 541 def num_arguments(self): 542 return len(self._argument_registers) + len(self._argument_stack_variables) 543 544 def __contains__(self, val): 545 if isinstance(val, int): 546 return val in self._block_sizes 547 else: 548 return False 549 550 def __str__(self): 551 s = 'Function %s [%s]\n' % (self.name, self.addr) 552 s += ' Syscall: %s\n' % self.is_syscall 553 s += ' SP difference: %d\n' % self.sp_delta 554 s += ' Has return: %s\n' % self.has_return 555 s += ' Returning: %s\n' % ('Unknown' if self.returning is None else self.returning) 556 s += ' Alignment: %s\n' % (self.alignment) 557 s += ' Arguments: reg: %s, stack: %s\n' % \ 558 (self._argument_registers, 559 self._argument_stack_variables) 560 s += ' Blocks: [%s]\n' % ", ".join(['%#x' % i for i in self.block_addrs]) 561 s += " Calling convention: %s" % self.calling_convention 562 return s 563 564 def __repr__(self): 565 if self.is_syscall: 566 return '<Syscall function %s (%s)>' % (self.name, 567 hex(self.addr) if isinstance(self.addr, int) else self.addr) 568 return '<Function %s (%s)>' % (self.name, hex(self.addr) if isinstance(self.addr, int) else self.addr) 569 570 @property 571 def endpoints(self): 572 return list(itertools.chain(*self._endpoints.values())) 573 574 @property 575 def endpoints_with_type(self): 576 return self._endpoints 577 578 @property 579 def ret_sites(self): 580 return list(self._ret_sites) 581 582 @property 583 def jumpout_sites(self): 584 return list(self._jumpout_sites) 585 586 @property 587 def retout_sites(self): 588 return list(self._retout_sites) 589 590 @property 591 def callout_sites(self): 592 return list(self._callout_sites) 593 594 @property 595 def size(self): 596 return sum([ b.size for b in self.blocks ]) 597 598 @property 599 def binary(self): 600 """ 601 Get the object this function belongs to. 602 :return: The object this function belongs to. 603 """ 604 605 return self._project.loader.find_object_containing(self.addr, membership_check=False) 606 607 @property 608 def offset(self) -> int: 609 """ 610 :return: the function's binary offset (i.e., non-rebased address) 611 """ 612 return self.addr - self.binary.mapped_base 613 614 @property 615 def symbol(self) -> Union[None, Symbol]: 616 """ 617 :return: the function's Symbol, if any 618 """ 619 return self.binary.loader.find_symbol(self.addr) 620 621 def add_jumpout_site(self, node): 622 """ 623 Add a custom jumpout site. 624 625 :param node: The address of the basic block that control flow leaves during this transition. 626 :return: None 627 """ 628 629 self._register_nodes(True, node) 630 self._jumpout_sites.add(node) 631 self._add_endpoint(node, 'transition') 632 633 def add_retout_site(self, node): 634 """ 635 Add a custom retout site. 636 637 Retout (returning to outside of the function) sites are very rare. It mostly occurs during CFG recovery when we 638 incorrectly identify the beginning of a function in the first iteration, and then correctly identify that 639 function later in the same iteration (function alignments can lead to this bizarre case). We will mark all edges 640 going out of the header of that function as a outside edge, because all successors now belong to the 641 incorrectly-identified function. This identification error will be fixed in the second iteration of CFG 642 recovery. However, we still want to keep track of jumpouts/retouts during the first iteration so other logic in 643 CFG recovery still work. 644 645 :param node: The address of the basic block that control flow leaves the current function after a call. 646 :return: None 647 """ 648 649 self._register_nodes(True, node) 650 self._retout_sites.add(node) 651 self._add_endpoint(node, 'return') 652 653 def _get_initial_name(self): 654 """ 655 Determine the most suitable name of the function. 656 657 :return: The initial function name. 658 :rtype: string 659 """ 660 661 name = None 662 addr = self.addr 663 664 # Try to get a name from existing labels 665 if self._function_manager is not None: 666 if addr in self._function_manager._kb.labels: 667 name = self._function_manager._kb.labels[addr] 668 669 # try to get the name from a hook 670 if name is None and self.project is not None: 671 project = self.project 672 if project.is_hooked(addr): 673 hooker = project.hooked_by(addr) 674 name = hooker.display_name 675 elif project.simos.is_syscall_addr(addr): 676 syscall_inst = project.simos.syscall_from_addr(addr) 677 name = syscall_inst.display_name 678 679 # generate an IDA-style sub_X name 680 if name is None: 681 name = 'sub_%x' % addr 682 683 return name 684 685 def _get_initial_binary_name(self): 686 """ 687 Determine the name of the binary where this function is. 688 689 :return: None 690 """ 691 692 binary_name = None 693 694 # if this function is a simprocedure but not a syscall, use its library name as 695 # its binary name 696 # if it is a syscall, fall back to use self.binary.binary which explicitly says cle##kernel 697 if self.project and self.is_simprocedure and not self.is_syscall: 698 hooker = self.project.hooked_by(self.addr) 699 if hooker is not None: 700 binary_name = hooker.library_name 701 702 if binary_name is None and self.binary is not None and self.binary.binary: 703 binary_name = os.path.basename(self.binary.binary) 704 705 return binary_name 706 707 def _get_initial_returning(self): 708 """ 709 Determine if this function returns or not *if it is hooked by a SimProcedure or a user hook*. 710 711 :return: True if the hooker returns, False otherwise. 712 :rtype: bool 713 """ 714 715 hooker = None 716 if self.is_syscall: 717 hooker = self.project.simos.syscall_from_addr(self.addr) 718 elif self.is_simprocedure: 719 hooker = self.project.hooked_by(self.addr) 720 if hooker and hasattr(hooker, 'NO_RET'): 721 return not hooker.NO_RET 722 723 # Cannot determine 724 return None 725 726 def _clear_transition_graph(self): 727 self._block_cache = {} 728 self._block_sizes = {} 729 self.startpoint = None 730 self.transition_graph = networkx.DiGraph() 731 self._local_transition_graph = None 732 733 def _confirm_fakeret(self, src, dst): 734 735 if src not in self.transition_graph or dst not in self.transition_graph[src]: 736 raise AngrValueError('FakeRet edge (%s, %s) is not in transition graph.' % (src, dst)) 737 738 data = self.transition_graph[src][dst] 739 740 if 'type' not in data or data['type'] != 'fake_return': 741 raise AngrValueError('Edge (%s, %s) is not a FakeRet edge' % (src, dst)) 742 743 # it's confirmed. register the node if needed 744 if 'outside' not in data or data['outside'] is False: 745 self._register_nodes(True, dst) 746 747 self.transition_graph[src][dst]['confirmed'] = True 748 749 def _transit_to(self, from_node, to_node, outside=False, ins_addr=None, stmt_idx=None, is_exception=False): 750 """ 751 Registers an edge between basic blocks in this function's transition graph. 752 Arguments are CodeNode objects. 753 754 :param from_node The address of the basic block that control 755 flow leaves during this transition. 756 :param to_node The address of the basic block that control 757 flow enters during this transition. 758 :param bool outside: If this is a transition to another function, e.g. tail call optimization 759 :return: None 760 """ 761 762 if outside: 763 self._register_nodes(True, from_node) 764 if to_node is not None: 765 self._register_nodes(False, to_node) 766 767 self._jumpout_sites.add(from_node) 768 else: 769 if to_node is not None: 770 self._register_nodes(True, from_node, to_node) 771 else: 772 self._register_nodes(True, from_node) 773 774 type_ = 'transition' if not is_exception else 'exception' 775 if to_node is not None: 776 self.transition_graph.add_edge(from_node, to_node, type=type_, outside=outside, ins_addr=ins_addr, 777 stmt_idx=stmt_idx 778 ) 779 780 if outside: 781 # this node is an endpoint of the current function 782 self._add_endpoint(from_node, type_) 783 784 # clear the cache 785 self._local_transition_graph = None 786 787 def _call_to(self, from_node, to_func, ret_node, stmt_idx=None, ins_addr=None, return_to_outside=False): 788 """ 789 Registers an edge between the caller basic block and callee function. 790 791 :param from_addr: The basic block that control flow leaves during the transition. 792 :type from_addr: angr.knowledge.CodeNode 793 :param to_func: The function that we are calling 794 :type to_func: Function 795 :param ret_node The basic block that control flow should return to after the 796 function call. 797 :type to_func: angr.knowledge.CodeNode or None 798 :param stmt_idx: Statement ID of this call. 799 :type stmt_idx: int, str or None 800 :param ins_addr: Instruction address of this call. 801 :type ins_addr: int or None 802 """ 803 804 self._register_nodes(True, from_node) 805 806 if to_func.is_syscall: 807 self.transition_graph.add_edge(from_node, to_func, type='syscall', stmt_idx=stmt_idx, ins_addr=ins_addr) 808 else: 809 self.transition_graph.add_edge(from_node, to_func, type='call', stmt_idx=stmt_idx, ins_addr=ins_addr) 810 if ret_node is not None: 811 self._fakeret_to(from_node, ret_node, to_outside=return_to_outside) 812 813 self._local_transition_graph = None 814 815 def _fakeret_to(self, from_node, to_node, confirmed=None, to_outside=False): 816 self._register_nodes(True, from_node) 817 818 if confirmed is None: 819 self.transition_graph.add_edge(from_node, to_node, type='fake_return', outside=to_outside) 820 else: 821 self.transition_graph.add_edge(from_node, to_node, type='fake_return', confirmed=confirmed, 822 outside=to_outside 823 ) 824 if confirmed: 825 self._register_nodes(not to_outside, to_node) 826 827 self._local_transition_graph = None 828 829 def _remove_fakeret(self, from_node, to_node): 830 self.transition_graph.remove_edge(from_node, to_node) 831 832 self._local_transition_graph = None 833 834 def _return_from_call(self, from_func, to_node, to_outside=False): 835 self.transition_graph.add_edge(from_func, to_node, type='return', to_outside=to_outside) 836 for _, _, data in self.transition_graph.in_edges(to_node, data=True): 837 if 'type' in data and data['type'] == 'fake_return': 838 data['confirmed'] = True 839 840 self._local_transition_graph = None 841 842 def _register_nodes(self, is_local, *nodes): 843 if not isinstance(is_local, bool): 844 raise AngrValueError('_register_nodes(): the "is_local" parameter must be a bool') 845 846 for node in nodes: 847 self.transition_graph.add_node(node) 848 if not isinstance(node, CodeNode): 849 continue 850 node._graph = self.transition_graph 851 if node.addr not in self or self._block_sizes[node.addr] == 0: 852 self._block_sizes[node.addr] = node.size 853 if node.addr == self.addr: 854 if self.startpoint is None or not self.startpoint.is_hook: 855 self.startpoint = node 856 if is_local: 857 self._local_blocks[node.addr] = node 858 self._local_block_addrs.add(node.addr) 859 # add BlockNodes to the addr_to_block_node cache if not already there 860 if isinstance(node, BlockNode): 861 if node.addr not in self._addr_to_block_node: 862 self._addr_to_block_node[node.addr] = node 863 #else: 864 # # checks that we don't have multiple block nodes at a single address 865 # assert node == self._addr_to_block_node[node.addr] 866 867 def _add_return_site(self, return_site): 868 """ 869 Registers a basic block as a site for control flow to return from this function. 870 871 :param CodeNode return_site: The block node that ends with a return. 872 """ 873 self._register_nodes(True, return_site) 874 875 self._ret_sites.add(return_site) 876 # A return site must be an endpoint of the function - you cannot continue execution of the current function 877 # after returning 878 self._add_endpoint(return_site, 'return') 879 880 def _add_call_site(self, call_site_addr, call_target_addr, retn_addr): 881 """ 882 Registers a basic block as calling a function and returning somewhere. 883 884 :param call_site_addr: The address of a basic block that ends in a call. 885 :param call_target_addr: The address of the target of said call. 886 :param retn_addr: The address that said call will return to. 887 """ 888 self._call_sites[call_site_addr] = (call_target_addr, retn_addr) 889 890 def _add_endpoint(self, endpoint_node, sort): 891 """ 892 Registers an endpoint with a type of `sort`. The type can be one of the following: 893 - call: calling a function that does not return 894 - return: returning from the current function 895 - transition: a jump/branch targeting a different function 896 897 It is possible for a block to act as two different sorts of endpoints. For example, consider the following 898 block: 899 900 .text:0000000000024350 mov eax, 1 901 .text:0000000000024355 lock xadd [rdi+4], eax 902 .text:000000000002435A retn 903 904 VEX code: 905 00 | ------ IMark(0x424350, 5, 0) ------ 906 01 | PUT(rax) = 0x0000000000000001 907 02 | PUT(rip) = 0x0000000000424355 908 03 | ------ IMark(0x424355, 5, 0) ------ 909 04 | t11 = GET:I64(rdi) 910 05 | t10 = Add64(t11,0x0000000000000004) 911 06 | t0 = LDle:I32(t10) 912 07 | t2 = Add32(t0,0x00000001) 913 08 | t(4,4294967295) = CASle(t10 :: (t0,None)->(t2,None)) 914 09 | t14 = CasCmpNE32(t4,t0) 915 10 | if (t14) { PUT(rip) = 0x424355; Ijk_Boring } 916 11 | PUT(cc_op) = 0x0000000000000003 917 12 | t15 = 32Uto64(t0) 918 13 | PUT(cc_dep1) = t15 919 14 | PUT(cc_dep2) = 0x0000000000000001 920 15 | t17 = 32Uto64(t0) 921 16 | PUT(rax) = t17 922 17 | PUT(rip) = 0x000000000042435a 923 18 | ------ IMark(0x42435a, 1, 0) ------ 924 19 | t6 = GET:I64(rsp) 925 20 | t7 = LDle:I64(t6) 926 21 | t8 = Add64(t6,0x0000000000000008) 927 22 | PUT(rsp) = t8 928 23 | t18 = Sub64(t8,0x0000000000000080) 929 24 | ====== AbiHint(0xt18, 128, t7) ====== 930 NEXT: PUT(rip) = t7; Ijk_Ret 931 932 This block acts as both a return endpoint and a transition endpoint (transitioning to 0x424355). 933 934 :param endpoint_node: The endpoint node. 935 :param sort: Type of the endpoint. 936 :return: None 937 """ 938 939 self._endpoints[sort].add(endpoint_node) 940 941 def mark_nonreturning_calls_endpoints(self): 942 """ 943 Iterate through all call edges in transition graph. For each call a non-returning function, mark the source 944 basic block as an endpoint. 945 946 This method should only be executed once all functions are recovered and analyzed by CFG recovery, so we know 947 whether each function returns or not. 948 949 :return: None 950 """ 951 952 for src, dst, data in self.transition_graph.edges(data=True): 953 if 'type' in data and data['type'] == 'call': 954 func_addr = dst.addr 955 if func_addr in self._function_manager: 956 function = self._function_manager[func_addr] 957 if function.returning is False: 958 # the target function does not return 959 the_node = self.get_node(src.addr) 960 self._callout_sites.add(the_node) 961 self._add_endpoint(the_node, 'call') 962 963 def get_call_sites(self) -> Iterable[int]: 964 """ 965 Gets a list of all the basic blocks that end in calls. 966 967 :return: A view of the addresses of the blocks that end in calls. 968 """ 969 return self._call_sites.keys() 970 971 def get_call_target(self, callsite_addr): 972 """ 973 Get the target of a call. 974 975 :param callsite_addr: The address of a basic block that ends in a call. 976 :return: The target of said call, or None if callsite_addr is not a 977 callsite. 978 """ 979 if callsite_addr in self._call_sites: 980 return self._call_sites[callsite_addr][0] 981 return None 982 983 def get_call_return(self, callsite_addr): 984 """ 985 Get the hypothetical return address of a call. 986 987 :param callsite_addr: The address of the basic block that ends in a call. 988 :return: The likely return target of said call, or None if callsite_addr 989 is not a callsite. 990 """ 991 if callsite_addr in self._call_sites: 992 return self._call_sites[callsite_addr][1] 993 return None 994 995 @property 996 def graph(self): 997 """ 998 Get a local transition graph. A local transition graph is a transition graph that only contains nodes that 999 belong to the current function. All edges, except for the edges going out from the current function or coming 1000 from outside the current function, are included. 1001 1002 The generated graph is cached in self._local_transition_graph. 1003 1004 :return: A local transition graph. 1005 :rtype: networkx.DiGraph 1006 """ 1007 1008 if self._local_transition_graph is not None: 1009 return self._local_transition_graph 1010 1011 g = networkx.DiGraph() 1012 if self.startpoint is not None: 1013 g.add_node(self.startpoint) 1014 for block in self._local_blocks.values(): 1015 g.add_node(block) 1016 for src, dst, data in self.transition_graph.edges(data=True): 1017 if 'type' in data: 1018 if data['type'] in ('transition', 'exception') and ('outside' not in data or data['outside'] is False): 1019 g.add_edge(src, dst, **data) 1020 elif data['type'] == 'fake_return' and 'confirmed' in data and \ 1021 ('outside' not in data or data['outside'] is False): 1022 g.add_edge(src, dst, **data) 1023 1024 self._local_transition_graph = g 1025 1026 return g 1027 1028 def graph_ex(self, exception_edges=True): 1029 """ 1030 Get a local transition graph with a custom configuration. A local transition graph is a transition graph that 1031 only contains nodes that belong to the current function. This method allows user to exclude certain types of 1032 edges together with the nodes that are only reachable through such edges, such as exception edges. 1033 1034 The generated graph is not cached. 1035 1036 :param bool exception_edges: Should exception edges and the nodes that are only reachable through exception 1037 edges be kept. 1038 :return: A local transition graph with a special configuration. 1039 :rtype: networkx.DiGraph 1040 """ 1041 1042 # graph_ex() should not impact any already cached graph 1043 old_cached_graph = self._local_transition_graph 1044 graph = self.graph 1045 self._local_transition_graph = old_cached_graph # restore the cached graph 1046 1047 # fast path 1048 if exception_edges: 1049 return graph 1050 1051 # BFS on local graph but ignoring certain types of graphs 1052 g = networkx.DiGraph() 1053 queue = [ n for n in graph if n is self.startpoint or graph.in_degree[n] == 0 ] 1054 traversed = set(queue) 1055 1056 while queue: 1057 node = queue.pop(0) 1058 1059 g.add_node(node) 1060 for _, dst, edge_data in graph.out_edges(node, data=True): 1061 edge_type = edge_data.get('type', None) 1062 if not exception_edges and edge_type == 'exception': 1063 # ignore this edge 1064 continue 1065 g.add_edge(node, dst, **edge_data) 1066 1067 if dst not in traversed: 1068 traversed.add(dst) 1069 queue.append(dst) 1070 1071 return g 1072 1073 def transition_graph_ex(self, exception_edges=True): 1074 """ 1075 Get a transition graph with a custom configuration. This method allows user to exclude certain types of edges 1076 together with the nodes that are only reachable through such edges, such as exception edges. 1077 1078 The generated graph is not cached. 1079 1080 :param bool exception_edges: Should exception edges and the nodes that are only reachable through exception 1081 edges be kept. 1082 :return: A local transition graph with a special configuration. 1083 :rtype: networkx.DiGraph 1084 """ 1085 1086 graph = self.transition_graph 1087 1088 # fast path 1089 if exception_edges: 1090 return graph 1091 1092 # BFS on local graph but ignoring certain types of graphs 1093 g = networkx.DiGraph() 1094 queue = [ n for n in graph if n is self.startpoint or graph.in_degree[n] == 0 ] 1095 traversed = set(queue) 1096 1097 while queue: 1098 node = queue.pop(0) 1099 traversed.add(node) 1100 1101 g.add_node(node) 1102 for _, dst, edge_data in graph.out_edges(node, data=True): 1103 edge_type = edge_data.get('type', None) 1104 if not exception_edges and edge_type == 'exception': 1105 # ignore this edge 1106 continue 1107 g.add_edge(node, dst, **edge_data) 1108 1109 if dst not in traversed: 1110 traversed.add(dst) 1111 queue.append(dst) 1112 1113 return g 1114 1115 def subgraph(self, ins_addrs): 1116 """ 1117 Generate a sub control flow graph of instruction addresses based on self.graph 1118 1119 :param iterable ins_addrs: A collection of instruction addresses that should be included in the subgraph. 1120 :return networkx.DiGraph: A subgraph. 1121 """ 1122 1123 # find all basic blocks that include those instructions 1124 blocks = [] 1125 block_addr_to_insns = {} 1126 1127 for b in self._local_blocks.values(): 1128 # TODO: should I call get_blocks? 1129 block = self.get_block(b.addr, size=b.size, byte_string=b.bytestr) 1130 common_insns = set(block.instruction_addrs).intersection(ins_addrs) 1131 if common_insns: 1132 blocks.append(b) 1133 block_addr_to_insns[b.addr] = sorted(common_insns) 1134 1135 #subgraph = networkx.subgraph(self.graph, blocks) 1136 subgraph = self.graph.subgraph(blocks).copy() 1137 g = networkx.DiGraph() 1138 1139 for n in subgraph.nodes(): 1140 insns = block_addr_to_insns[n.addr] 1141 1142 in_edges = subgraph.in_edges(n) 1143 # out_edges = subgraph.out_edges(n) 1144 if len(in_edges) > 1: 1145 # the first instruction address should be included 1146 if n.addr not in insns: 1147 insns = [n.addr] + insns 1148 1149 for src, _ in in_edges: 1150 last_instr = block_addr_to_insns[src.addr][-1] 1151 g.add_edge(last_instr, insns[0]) 1152 1153 for i in range(0, len(insns) - 1): 1154 g.add_edge(insns[i], insns[i + 1]) 1155 1156 return g 1157 1158 def instruction_size(self, insn_addr): 1159 """ 1160 Get the size of the instruction specified by `insn_addr`. 1161 1162 :param int insn_addr: Address of the instruction 1163 :return int: Size of the instruction in bytes, or None if the instruction is not found. 1164 """ 1165 1166 for b in self.blocks: 1167 block = self.get_block(b.addr, size=b.size, byte_string=b.bytestr) 1168 if insn_addr in block.instruction_addrs: 1169 index = block.instruction_addrs.index(insn_addr) 1170 if index == len(block.instruction_addrs) - 1: 1171 # the very last instruction 1172 size = block.addr + block.size - insn_addr 1173 else: 1174 size = block.instruction_addrs[index + 1] - insn_addr 1175 return size 1176 1177 return None 1178 1179 def addr_to_instruction_addr(self, addr): 1180 """ 1181 Obtain the address of the instruction that covers @addr. 1182 1183 :param int addr: An address. 1184 :return: Address of the instruction that covers @addr, or None if this addr is not covered by any 1185 instruction of this function. 1186 :rtype: int or None 1187 """ 1188 1189 # TODO: Replace the linear search with binary search 1190 for b in self.blocks: 1191 if b.addr <= addr < b.addr + b.size: 1192 # found it 1193 for i, instr_addr in enumerate(b.instruction_addrs): 1194 if i < len(b.instruction_addrs) - 1 and instr_addr <= addr < b.instruction_addrs[i+1]: 1195 return instr_addr 1196 elif i == len(b.instruction_addrs) - 1 and instr_addr <= addr: 1197 return instr_addr 1198 # Not covered by any instruction... why? 1199 return None 1200 return None 1201 1202 def dbg_print(self): 1203 """ 1204 Returns a representation of the list of basic blocks in this function. 1205 """ 1206 return "[%s]" % (', '.join(('%#08x' % n.addr) for n in self.transition_graph.nodes())) 1207 1208 def dbg_draw(self, filename): 1209 """ 1210 Draw the graph and save it to a PNG file. 1211 """ 1212 import matplotlib.pyplot as pyplot # pylint: disable=import-error 1213 from networkx.drawing.nx_agraph import graphviz_layout # pylint: disable=import-error 1214 1215 tmp_graph = networkx.DiGraph() 1216 for from_block, to_block in self.transition_graph.edges(): 1217 node_a = "%#08x" % from_block.addr 1218 node_b = "%#08x" % to_block.addr 1219 if node_b in self._ret_sites: 1220 node_b += "[Ret]" 1221 if node_a in self._call_sites: 1222 node_a += "[Call]" 1223 tmp_graph.add_edge(node_a, node_b) 1224 pos = graphviz_layout(tmp_graph, prog='fdp') # pylint: disable=no-member 1225 networkx.draw(tmp_graph, pos, node_size=1200) 1226 pyplot.savefig(filename) 1227 1228 def _add_argument_register(self, reg_offset): 1229 """ 1230 Registers a register offset as being used as an argument to the function. 1231 1232 :param reg_offset: The offset of the register to register. 1233 """ 1234 if reg_offset in self._function_manager._arg_registers and \ 1235 reg_offset not in self._argument_registers: 1236 self._argument_registers.append(reg_offset) 1237 1238 def _add_argument_stack_variable(self, stack_var_offset): 1239 if stack_var_offset not in self._argument_stack_variables: 1240 self._argument_stack_variables.append(stack_var_offset) 1241 1242 @property 1243 def arguments(self): 1244 if self.calling_convention is None: 1245 return self._argument_registers + self._argument_stack_variables 1246 else: 1247 return self.calling_convention.args 1248 1249 @property 1250 def has_return(self): 1251 return len(self._ret_sites) > 0 1252 1253 @property 1254 def callable(self): 1255 return self._project.factory.callable(self.addr) 1256 1257 def normalize(self): 1258 """ 1259 Make sure all basic blocks in the transition graph of this function do not overlap. You will end up with a CFG 1260 that IDA Pro generates. 1261 1262 This method does not touch the CFG result. You may call CFG{Emulated, Fast}.normalize() for that matter. 1263 1264 :return: None 1265 """ 1266 1267 # let's put a check here 1268 if self.startpoint is None: 1269 # this function is empty 1270 l.debug('Unexpected error: %s does not have any blocks. normalize() fails.', repr(self)) 1271 return 1272 1273 graph = self.transition_graph 1274 end_addresses = defaultdict(list) 1275 1276 for block in self.nodes: 1277 if isinstance(block, BlockNode): 1278 end_addr = block.addr + block.size 1279 end_addresses[end_addr].append(block) 1280 1281 while any(len(x) > 1 for x in end_addresses.values()): 1282 end_addr, all_nodes = \ 1283 next((end_addr, x) for (end_addr, x) in end_addresses.items() if len(x) > 1) 1284 1285 all_nodes = sorted(all_nodes, key=lambda node: node.size) 1286 smallest_node = all_nodes[0] 1287 other_nodes = all_nodes[1:] 1288 1289 is_outside_node = False 1290 if smallest_node not in graph: 1291 is_outside_node = True 1292 1293 # Break other nodes 1294 for n in other_nodes: 1295 new_size = get_real_address_if_arm(self._project.arch, smallest_node.addr) - get_real_address_if_arm(self._project.arch, n.addr) 1296 if new_size == 0: 1297 # This is the node that has the same size as the smallest one 1298 continue 1299 1300 new_end_addr = n.addr + new_size 1301 1302 # Does it already exist? 1303 new_node = None 1304 if new_end_addr in end_addresses: 1305 nodes = [i for i in end_addresses[new_end_addr] if i.addr == n.addr] 1306 if len(nodes) > 0: 1307 new_node = nodes[0] 1308 1309 if new_node is None: 1310 # TODO: Do this correctly for hook nodes 1311 # Create a new one 1312 new_node = BlockNode(n.addr, new_size, graph=graph, thumb=n.thumb) 1313 self._block_sizes[n.addr] = new_size 1314 self._addr_to_block_node[n.addr] = new_node 1315 # Put the newnode into end_addresses 1316 end_addresses[new_end_addr].append(new_node) 1317 1318 # Modify the CFG 1319 original_predecessors = list(graph.in_edges([n], data=True)) 1320 original_successors = list(graph.out_edges([n], data=True)) 1321 1322 for _, d, data in original_successors: 1323 ins_addr = data.get('ins_addr', data.get('pseudo_ins_addr', None)) 1324 if ins_addr is not None and ins_addr < d.addr: 1325 continue 1326 if d not in graph[smallest_node]: 1327 if d is n: 1328 graph.add_edge(smallest_node, new_node, **data) 1329 else: 1330 graph.add_edge(smallest_node, d, **data) 1331 1332 for p, _, _ in original_predecessors: 1333 graph.remove_edge(p, n) 1334 graph.remove_node(n) 1335 1336 # update local_blocks 1337 if n.addr in self._local_blocks and self._local_blocks[n.addr].size != new_node.size: 1338 del self._local_blocks[n.addr] 1339 self._local_blocks[n.addr] = new_node 1340 1341 # update block_cache and block_sizes 1342 if (n.addr in self._block_cache and self._block_cache[n.addr].size != new_node.size) or \ 1343 (n.addr in self._block_sizes and self._block_sizes[n.addr] != new_node.size): 1344 # the cache needs updating 1345 self._block_cache.pop(n.addr, None) 1346 self._block_sizes[n.addr] = new_node.size 1347 1348 for p, _, data in original_predecessors: 1349 if p not in other_nodes: 1350 graph.add_edge(p, new_node, **data) 1351 1352 # We should find the correct successor 1353 new_successors = [i for i in all_nodes 1354 if i.addr == smallest_node.addr] 1355 if new_successors: 1356 new_successor = new_successors[0] 1357 graph.add_edge(new_node, new_successor, 1358 type="transition", 1359 outside=is_outside_node, 1360 # it's named "pseudo_ins_addr" because we have no way to know what the actual last 1361 # instruction is at this moment (without re-lifting the block, which would be a 1362 # waste of time). 1363 pseudo_ins_addr=new_node.addr + new_node.size - 1, 1364 ) 1365 else: 1366 # We gotta create a new one 1367 l.error('normalize(): Please report it to Fish/maybe john.') 1368 1369 end_addresses[end_addr] = [smallest_node] 1370 1371 # Rebuild startpoint 1372 if self.startpoint.size != self._block_sizes[self.startpoint.addr]: 1373 self.startpoint = self.get_node(self.startpoint.addr) 1374 1375 # Clear the cache 1376 self._local_transition_graph = None 1377 1378 self.normalized = True 1379 1380 def find_declaration(self): 1381 """ 1382 Find the most likely function declaration from the embedded collection of prototypes, set it to self.prototype, 1383 and update self.calling_convention with the declaration. 1384 1385 :return: None 1386 """ 1387 1388 # determine the library name 1389 1390 if not self.is_plt: 1391 binary_name = self.binary_name 1392 if binary_name not in SIM_LIBRARIES: 1393 return 1394 else: 1395 binary_name = None 1396 # PLT entries must have the same declaration as their jump targets 1397 # Try to determine which library this PLT entry will jump to 1398 edges = self.transition_graph.edges() 1399 if len(edges) == 0: return 1400 node = next(iter(edges))[1] 1401 if len(edges) == 1 and (type(node) is HookNode or type(node) is SyscallNode): 1402 target = node.addr 1403 if target in self._function_manager: 1404 target_func = self._function_manager[target] 1405 binary_name = target_func.binary_name 1406 1407 if binary_name is None: 1408 return 1409 1410 library = SIM_LIBRARIES.get(binary_name, None) 1411 1412 if library is None: 1413 return 1414 1415 if not library.has_prototype(self.name): 1416 return 1417 1418 proto = library.get_prototype(self.name) 1419 self.prototype = proto 1420 if self.calling_convention is not None: 1421 self.calling_convention.args = None 1422 self.calling_convention.set_func_type_with_arch(proto) 1423 1424 @staticmethod 1425 def _addr_to_funcloc(addr): 1426 1427 # FIXME 1428 if isinstance(addr, tuple): 1429 return addr[0] 1430 else: # int, long 1431 return addr 1432 1433 @property 1434 def demangled_name(self): 1435 if self.name[0:2] == "_Z": 1436 try: 1437 ast = parse(self.name) 1438 except NotImplementedError: 1439 return self.name 1440 if ast: 1441 return ast.__str__() 1442 return self.name 1443 1444 def apply_definition(self, definition, calling_convention=None): 1445 """ 1446 1447 :param str definition: 1448 :param Optional[Union[SimCC, Type[SimCC]]] calling_convention: 1449 :return None: 1450 """ 1451 if not definition.endswith(";"): 1452 definition += ";" 1453 func_def = parse_defns(definition) 1454 if len(func_def.keys()) > 1: 1455 raise Exception("Too many definitions: %s " % list(func_def.keys())) 1456 1457 name, ty = func_def.popitem() # type: str, SimTypeFunction 1458 self.name = name 1459 # setup the calling convention 1460 # If a SimCC object is passed assume that this is sane and just use it 1461 if isinstance(calling_convention, SimCC): 1462 self.calling_convention = calling_convention 1463 1464 # If it is a subclass of SimCC we can instantiate it 1465 elif isinstance(calling_convention, type) and issubclass(calling_convention, SimCC): 1466 self.calling_convention = calling_convention(self.project.arch, func_ty=ty) 1467 1468 # If none is specified default to something 1469 elif calling_convention is None: 1470 self.calling_convention = self.project.factory.cc(func_ty=ty) 1471 1472 else: 1473 raise TypeError("calling_convention has to be one of: [SimCC, type(SimCC), None]") 1474 1475 def functions_called(self) -> Set['Function']: 1476 """ 1477 :return: The set of all functions that can be reached from the function represented by self. 1478 """ 1479 called = set() 1480 def _find_called(function_address): 1481 successors = set(self._function_manager.callgraph.successors(function_address)) - called 1482 for s in successors: 1483 called.add(s) 1484 _find_called(s) 1485 _find_called(self.addr) 1486 return { self._function_manager.function(a) for a in called } 1487 1488 def copy(self): 1489 func = Function(self._function_manager, self.addr, name=self.name, syscall=self.is_syscall) 1490 func.transition_graph = networkx.DiGraph(self.transition_graph) 1491 func.normalized = self.normalized 1492 func._ret_sites = self._ret_sites.copy() 1493 func._jumpout_sites = self._jumpout_sites.copy() 1494 func._retout_sites = self._retout_sites.copy() 1495 func._endpoints = self._endpoints.copy() 1496 func._call_sites = self._call_sites.copy() 1497 func._project = self._project 1498 func.is_plt = self.is_plt 1499 func.is_simprocedure = self.is_simprocedure 1500 func.binary_name = self.binary_name 1501 func.bp_on_stack = self.bp_on_stack 1502 func.retaddr_on_stack = self.retaddr_on_stack 1503 func.sp_delta = self.sp_delta 1504 func.calling_convention = self.calling_convention 1505 func.prototype = self.prototype 1506 func._returning = self._returning 1507 func.alignment = self.alignment 1508 func.startpoint = self.startpoint 1509 func._addr_to_block_node = self._addr_to_block_node.copy() 1510 func._block_sizes = self._block_sizes.copy() 1511 func._block_cache = self._block_cache.copy() 1512 func._local_blocks = self._local_blocks.copy() 1513 func._local_block_addrs = self._local_block_addrs.copy() 1514 func.info = self.info.copy() 1515 func.tags = self.tags 1516 1517 return func 1518