1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def build(self, *args, **kwargs): 166 self.buildDefault(*args, **kwargs) 167 168 def getLocalServerLogFile(self): 169 return self.getLogBasenameForCurrentTest() + "-server.log" 170 171 def setUpServerLogging(self, is_llgs): 172 if len(lldbtest_config.channels) == 0: 173 return # No logging requested 174 175 if lldb.remote_platform: 176 log_file = lldbutil.join_remote_paths( 177 lldb.remote_platform.GetWorkingDirectory(), "server.log") 178 else: 179 log_file = self.getLocalServerLogFile() 180 181 if is_llgs: 182 self.debug_monitor_extra_args.append("--log-file=" + log_file) 183 self.debug_monitor_extra_args.append( 184 "--log-channels={}".format(":".join(lldbtest_config.channels))) 185 else: 186 self.debug_monitor_extra_args = [ 187 "--log-file=" + log_file, "--log-flags=0x800000"] 188 189 def get_next_port(self): 190 return 12000 + random.randint(0, 3999) 191 192 def reset_test_sequence(self): 193 self.test_sequence = GdbRemoteTestSequence(self.logger) 194 195 196 def _init_llgs_test(self): 197 reverse_connect = True 198 if lldb.remote_platform: 199 # Reverse connections may be tricky due to firewalls/NATs. 200 reverse_connect = False 201 202 # FIXME: This is extremely linux-oriented 203 204 # Grab the ppid from /proc/[shell pid]/stat 205 err, retcode, shell_stat = self.run_platform_command( 206 "cat /proc/$$/stat") 207 self.assertTrue( 208 err.Success() and retcode == 0, 209 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 210 (err.GetCString(), 211 retcode)) 212 213 # [pid] ([executable]) [state] [*ppid*] 214 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 215 err, retcode, ls_output = self.run_platform_command( 216 "ls -l /proc/%s/exe" % pid) 217 self.assertTrue( 218 err.Success() and retcode == 0, 219 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 220 (pid, 221 err.GetCString(), 222 retcode)) 223 exe = ls_output.split()[-1] 224 225 # If the binary has been deleted, the link name has " (deleted)" appended. 226 # Remove if it's there. 227 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 228 else: 229 self.debug_monitor_exe = get_lldb_server_exe() 230 231 self.debug_monitor_extra_args = ["gdbserver"] 232 self.setUpServerLogging(is_llgs=True) 233 234 self.reverse_connect = reverse_connect 235 236 def _init_debugserver_test(self): 237 self.debug_monitor_exe = get_debugserver_exe() 238 self.setUpServerLogging(is_llgs=False) 239 self.reverse_connect = True 240 241 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 242 # when the process truly dies. 243 self.stub_sends_two_stop_notifications_on_kill = True 244 245 def forward_adb_port(self, source, target, direction, device): 246 adb = ['adb'] + (['-s', device] if device else []) + [direction] 247 248 def remove_port_forward(): 249 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 250 251 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 252 self.addTearDownHook(remove_port_forward) 253 254 def _verify_socket(self, sock): 255 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 256 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 257 # the connect() will always be successful, but the connection will be immediately dropped 258 # if ADB could not connect on the remote side. This function tries to detect this 259 # situation, and report it as "connection refused" so that the upper layers attempt the 260 # connection again. 261 triple = self.dbg.GetSelectedPlatform().GetTriple() 262 if not re.match(".*-.*-.*-android", triple): 263 return # Not android. 264 can_read, _, _ = select.select([sock], [], [], 0.1) 265 if sock not in can_read: 266 return # Data is not available, but the connection is alive. 267 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 268 raise _ConnectionRefused() # Got EOF, connection dropped. 269 270 def create_socket(self): 271 try: 272 sock = socket.socket(family=socket.AF_INET) 273 except OSError as e: 274 if e.errno != errno.EAFNOSUPPORT: 275 raise 276 sock = socket.socket(family=socket.AF_INET6) 277 278 logger = self.logger 279 280 triple = self.dbg.GetSelectedPlatform().GetTriple() 281 if re.match(".*-.*-.*-android", triple): 282 self.forward_adb_port( 283 self.port, 284 self.port, 285 "forward", 286 self.stub_device) 287 288 logger.info( 289 "Connecting to debug monitor on %s:%d", 290 self.stub_hostname, 291 self.port) 292 connect_info = (self.stub_hostname, self.port) 293 try: 294 sock.connect(connect_info) 295 except socket.error as serr: 296 if serr.errno == errno.ECONNREFUSED: 297 raise _ConnectionRefused() 298 raise serr 299 300 def shutdown_socket(): 301 if sock: 302 try: 303 # send the kill packet so lldb-server shuts down gracefully 304 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 305 except: 306 logger.warning( 307 "failed to send kill packet to debug monitor: {}; ignoring".format( 308 sys.exc_info()[0])) 309 310 try: 311 sock.close() 312 except: 313 logger.warning( 314 "failed to close socket to debug monitor: {}; ignoring".format( 315 sys.exc_info()[0])) 316 317 self.addTearDownHook(shutdown_socket) 318 319 self._verify_socket(sock) 320 321 return sock 322 323 def set_inferior_startup_launch(self): 324 self._inferior_startup = self._STARTUP_LAUNCH 325 326 def set_inferior_startup_attach(self): 327 self._inferior_startup = self._STARTUP_ATTACH 328 329 def set_inferior_startup_attach_manually(self): 330 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 331 332 def get_debug_monitor_command_line_args(self, attach_pid=None): 333 commandline_args = self.debug_monitor_extra_args 334 if attach_pid: 335 commandline_args += ["--attach=%d" % attach_pid] 336 if self.reverse_connect: 337 commandline_args += ["--reverse-connect", self.connect_address] 338 else: 339 if lldb.remote_platform: 340 commandline_args += ["*:{}".format(self.port)] 341 else: 342 commandline_args += ["localhost:{}".format(self.port)] 343 344 return commandline_args 345 346 def get_target_byte_order(self): 347 inferior_exe_path = self.getBuildArtifact("a.out") 348 target = self.dbg.CreateTarget(inferior_exe_path) 349 return target.GetByteOrder() 350 351 def launch_debug_monitor(self, attach_pid=None, logfile=None): 352 if self.reverse_connect: 353 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 354 sock = socket.socket(family, type, proto) 355 sock.settimeout(self.DEFAULT_TIMEOUT) 356 357 sock.bind(addr) 358 sock.listen(1) 359 addr = sock.getsockname() 360 self.connect_address = "[{}]:{}".format(*addr) 361 362 363 # Create the command line. 364 commandline_args = self.get_debug_monitor_command_line_args( 365 attach_pid=attach_pid) 366 367 # Start the server. 368 server = self.spawnSubprocess( 369 self.debug_monitor_exe, 370 commandline_args, 371 install_remote=False) 372 self.assertIsNotNone(server) 373 374 if self.reverse_connect: 375 self.sock = sock.accept()[0] 376 self.sock.settimeout(self.DEFAULT_TIMEOUT) 377 378 return server 379 380 def connect_to_debug_monitor(self, attach_pid=None): 381 if self.reverse_connect: 382 # Create the stub. 383 server = self.launch_debug_monitor(attach_pid=attach_pid) 384 self.assertIsNotNone(server) 385 386 # Schedule debug monitor to be shut down during teardown. 387 logger = self.logger 388 389 self._server = Server(self.sock, server) 390 return server 391 392 # We're using a random port algorithm to try not to collide with other ports, 393 # and retry a max # times. 394 attempts = 0 395 MAX_ATTEMPTS = 20 396 397 while attempts < MAX_ATTEMPTS: 398 server = self.launch_debug_monitor(attach_pid=attach_pid) 399 400 # Schedule debug monitor to be shut down during teardown. 401 logger = self.logger 402 403 connect_attemps = 0 404 MAX_CONNECT_ATTEMPTS = 10 405 406 while connect_attemps < MAX_CONNECT_ATTEMPTS: 407 # Create a socket to talk to the server 408 try: 409 logger.info("Connect attempt %d", connect_attemps + 1) 410 self.sock = self.create_socket() 411 self._server = Server(self.sock, server) 412 return server 413 except _ConnectionRefused as serr: 414 # Ignore, and try again. 415 pass 416 time.sleep(0.5) 417 connect_attemps += 1 418 419 # We should close the server here to be safe. 420 server.terminate() 421 422 # Increment attempts. 423 print( 424 "connect to debug monitor on port %d failed, attempt #%d of %d" % 425 (self.port, attempts + 1, MAX_ATTEMPTS)) 426 attempts += 1 427 428 # And wait a random length of time before next attempt, to avoid 429 # collisions. 430 time.sleep(random.randint(1, 5)) 431 432 # Now grab a new port number. 433 self.port = self.get_next_port() 434 435 raise Exception( 436 "failed to create a socket to the launched debug monitor after %d tries" % 437 attempts) 438 439 def launch_process_for_attach( 440 self, 441 inferior_args=None, 442 sleep_seconds=3, 443 exe_path=None): 444 # We're going to start a child process that the debug monitor stub can later attach to. 445 # This process needs to be started so that it just hangs around for a while. We'll 446 # have it sleep. 447 if not exe_path: 448 exe_path = self.getBuildArtifact("a.out") 449 450 args = [] 451 if inferior_args: 452 args.extend(inferior_args) 453 if sleep_seconds: 454 args.append("sleep:%d" % sleep_seconds) 455 456 return self.spawnSubprocess(exe_path, args) 457 458 def prep_debug_monitor_and_inferior( 459 self, 460 inferior_args=None, 461 inferior_sleep_seconds=3, 462 inferior_exe_path=None, 463 inferior_env=None): 464 """Prep the debug monitor, the inferior, and the expected packet stream. 465 466 Handle the separate cases of using the debug monitor in attach-to-inferior mode 467 and in launch-inferior mode. 468 469 For attach-to-inferior mode, the inferior process is first started, then 470 the debug monitor is started in attach to pid mode (using --attach on the 471 stub command line), and the no-ack-mode setup is appended to the packet 472 stream. The packet stream is not yet executed, ready to have more expected 473 packet entries added to it. 474 475 For launch-inferior mode, the stub is first started, then no ack mode is 476 setup on the expected packet stream, then the verified launch packets are added 477 to the expected socket stream. The packet stream is not yet executed, ready 478 to have more expected packet entries added to it. 479 480 The return value is: 481 {inferior:<inferior>, server:<server>} 482 """ 483 inferior = None 484 attach_pid = None 485 486 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 487 # Launch the process that we'll use as the inferior. 488 inferior = self.launch_process_for_attach( 489 inferior_args=inferior_args, 490 sleep_seconds=inferior_sleep_seconds, 491 exe_path=inferior_exe_path) 492 self.assertIsNotNone(inferior) 493 self.assertTrue(inferior.pid > 0) 494 if self._inferior_startup == self._STARTUP_ATTACH: 495 # In this case, we want the stub to attach via the command 496 # line, so set the command line attach pid here. 497 attach_pid = inferior.pid 498 499 if self._inferior_startup == self._STARTUP_LAUNCH: 500 # Build launch args 501 if not inferior_exe_path: 502 inferior_exe_path = self.getBuildArtifact("a.out") 503 504 if lldb.remote_platform: 505 remote_path = lldbutil.append_to_process_working_directory(self, 506 os.path.basename(inferior_exe_path)) 507 remote_file_spec = lldb.SBFileSpec(remote_path, False) 508 err = lldb.remote_platform.Install(lldb.SBFileSpec( 509 inferior_exe_path, True), remote_file_spec) 510 if err.Fail(): 511 raise Exception( 512 "remote_platform.Install('%s', '%s') failed: %s" % 513 (inferior_exe_path, remote_path, err)) 514 inferior_exe_path = remote_path 515 516 launch_args = [inferior_exe_path] 517 if inferior_args: 518 launch_args.extend(inferior_args) 519 520 # Launch the debug monitor stub, attaching to the inferior. 521 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 522 self.assertIsNotNone(server) 523 524 self.do_handshake() 525 526 # Build the expected protocol stream 527 if inferior_env: 528 for name, value in inferior_env.items(): 529 self.add_set_environment_packets(name, value) 530 if self._inferior_startup == self._STARTUP_LAUNCH: 531 self.add_verified_launch_packets(launch_args) 532 533 return {"inferior": inferior, "server": server} 534 535 def do_handshake(self): 536 server = self._server 537 server.send_ack() 538 server.send_packet(b"QStartNoAckMode") 539 self.assertEqual(server.get_normal_packet(), b"+") 540 self.assertEqual(server.get_normal_packet(), b"OK") 541 server.send_ack() 542 543 def add_verified_launch_packets(self, launch_args): 544 self.test_sequence.add_log_lines( 545 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 546 "send packet: $OK#00", 547 "read packet: $qLaunchSuccess#a5", 548 "send packet: $OK#00"], 549 True) 550 551 def add_thread_suffix_request_packets(self): 552 self.test_sequence.add_log_lines( 553 ["read packet: $QThreadSuffixSupported#e4", 554 "send packet: $OK#00", 555 ], True) 556 557 def add_process_info_collection_packets(self): 558 self.test_sequence.add_log_lines( 559 ["read packet: $qProcessInfo#dc", 560 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 561 True) 562 563 def add_set_environment_packets(self, name, value): 564 self.test_sequence.add_log_lines( 565 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 566 "send packet: $OK#00", 567 ], True) 568 569 _KNOWN_PROCESS_INFO_KEYS = [ 570 "pid", 571 "parent-pid", 572 "real-uid", 573 "real-gid", 574 "effective-uid", 575 "effective-gid", 576 "cputype", 577 "cpusubtype", 578 "ostype", 579 "triple", 580 "vendor", 581 "endian", 582 "elf_abi", 583 "ptrsize" 584 ] 585 586 def parse_process_info_response(self, context): 587 # Ensure we have a process info response. 588 self.assertIsNotNone(context) 589 process_info_raw = context.get("process_info_raw") 590 self.assertIsNotNone(process_info_raw) 591 592 # Pull out key:value; pairs. 593 process_info_dict = { 594 match.group(1): match.group(2) for match in re.finditer( 595 r"([^:]+):([^;]+);", process_info_raw)} 596 597 # Validate keys are known. 598 for (key, val) in list(process_info_dict.items()): 599 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 600 self.assertIsNotNone(val) 601 602 return process_info_dict 603 604 def add_register_info_collection_packets(self): 605 self.test_sequence.add_log_lines( 606 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 607 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 608 "save_key": "reg_info_responses"}], 609 True) 610 611 def parse_register_info_packets(self, context): 612 """Return an array of register info dictionaries, one per register info.""" 613 reg_info_responses = context.get("reg_info_responses") 614 self.assertIsNotNone(reg_info_responses) 615 616 # Parse register infos. 617 return [parse_reg_info_response(reg_info_response) 618 for reg_info_response in reg_info_responses] 619 620 def expect_gdbremote_sequence(self): 621 return expect_lldb_gdbserver_replay( 622 self, 623 self._server, 624 self.test_sequence, 625 self.DEFAULT_TIMEOUT * len(self.test_sequence), 626 self.logger) 627 628 _KNOWN_REGINFO_KEYS = [ 629 "name", 630 "alt-name", 631 "bitsize", 632 "offset", 633 "encoding", 634 "format", 635 "set", 636 "gcc", 637 "ehframe", 638 "dwarf", 639 "generic", 640 "container-regs", 641 "invalidate-regs", 642 "dynamic_size_dwarf_expr_bytes", 643 "dynamic_size_dwarf_len" 644 ] 645 646 def assert_valid_reg_info(self, reg_info): 647 # Assert we know about all the reginfo keys parsed. 648 for key in reg_info: 649 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 650 651 # Check the bare-minimum expected set of register info keys. 652 self.assertTrue("name" in reg_info) 653 self.assertTrue("bitsize" in reg_info) 654 655 if not self.getArchitecture() == 'aarch64': 656 self.assertTrue("offset" in reg_info) 657 658 self.assertTrue("encoding" in reg_info) 659 self.assertTrue("format" in reg_info) 660 661 def find_pc_reg_info(self, reg_infos): 662 lldb_reg_index = 0 663 for reg_info in reg_infos: 664 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 665 return (lldb_reg_index, reg_info) 666 lldb_reg_index += 1 667 668 return (None, None) 669 670 def add_lldb_register_index(self, reg_infos): 671 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 672 673 We'll use this when we want to call packets like P/p with a register index but do so 674 on only a subset of the full register info set. 675 """ 676 self.assertIsNotNone(reg_infos) 677 678 reg_index = 0 679 for reg_info in reg_infos: 680 reg_info["lldb_register_index"] = reg_index 681 reg_index += 1 682 683 def add_query_memory_region_packets(self, address): 684 self.test_sequence.add_log_lines( 685 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 686 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 687 True) 688 689 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 690 self.assertIsNotNone(key_val_text) 691 kv_dict = {} 692 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 693 key = match.group(1) 694 val = match.group(2) 695 if key in kv_dict: 696 if allow_dupes: 697 if isinstance(kv_dict[key], list): 698 kv_dict[key].append(val) 699 else: 700 # Promote to list 701 kv_dict[key] = [kv_dict[key], val] 702 else: 703 self.fail( 704 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 705 key, val, key_val_text, kv_dict)) 706 else: 707 kv_dict[key] = val 708 return kv_dict 709 710 def parse_memory_region_packet(self, context): 711 # Ensure we have a context. 712 self.assertIsNotNone(context.get("memory_region_response")) 713 714 # Pull out key:value; pairs. 715 mem_region_dict = self.parse_key_val_dict( 716 context.get("memory_region_response")) 717 718 # Validate keys are known. 719 for (key, val) in list(mem_region_dict.items()): 720 self.assertIn(key, 721 ["start", 722 "size", 723 "permissions", 724 "flags", 725 "name", 726 "error", 727 "dirty-pages"]) 728 self.assertIsNotNone(val) 729 730 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 731 # Return the dictionary of key-value pairs for the memory region. 732 return mem_region_dict 733 734 def assert_address_within_memory_region( 735 self, test_address, mem_region_dict): 736 self.assertIsNotNone(mem_region_dict) 737 self.assertTrue("start" in mem_region_dict) 738 self.assertTrue("size" in mem_region_dict) 739 740 range_start = int(mem_region_dict["start"], 16) 741 range_size = int(mem_region_dict["size"], 16) 742 range_end = range_start + range_size 743 744 if test_address < range_start: 745 self.fail( 746 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 747 test_address, 748 range_start, 749 range_end, 750 range_size)) 751 elif test_address >= range_end: 752 self.fail( 753 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 754 test_address, 755 range_start, 756 range_end, 757 range_size)) 758 759 def add_threadinfo_collection_packets(self): 760 self.test_sequence.add_log_lines( 761 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 762 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 763 "save_key": "threadinfo_responses"}], 764 True) 765 766 def parse_threadinfo_packets(self, context): 767 """Return an array of thread ids (decimal ints), one per thread.""" 768 threadinfo_responses = context.get("threadinfo_responses") 769 self.assertIsNotNone(threadinfo_responses) 770 771 thread_ids = [] 772 for threadinfo_response in threadinfo_responses: 773 new_thread_infos = parse_threadinfo_response(threadinfo_response) 774 thread_ids.extend(new_thread_infos) 775 return thread_ids 776 777 def wait_for_thread_count(self, thread_count): 778 start_time = time.time() 779 timeout_time = start_time + self.DEFAULT_TIMEOUT 780 781 actual_thread_count = 0 782 while actual_thread_count < thread_count: 783 self.reset_test_sequence() 784 self.add_threadinfo_collection_packets() 785 786 context = self.expect_gdbremote_sequence() 787 self.assertIsNotNone(context) 788 789 threads = self.parse_threadinfo_packets(context) 790 self.assertIsNotNone(threads) 791 792 actual_thread_count = len(threads) 793 794 if time.time() > timeout_time: 795 raise Exception( 796 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 797 self.DEFAULT_TIMEOUT, thread_count, actual_thread_count)) 798 799 return threads 800 801 def add_set_breakpoint_packets( 802 self, 803 address, 804 z_packet_type=0, 805 do_continue=True, 806 breakpoint_kind=1): 807 self.test_sequence.add_log_lines( 808 [ # Set the breakpoint. 809 "read packet: $Z{2},{0:x},{1}#00".format( 810 address, breakpoint_kind, z_packet_type), 811 # Verify the stub could set it. 812 "send packet: $OK#00", 813 ], True) 814 815 if (do_continue): 816 self.test_sequence.add_log_lines( 817 [ # Continue the inferior. 818 "read packet: $c#63", 819 # Expect a breakpoint stop report. 820 {"direction": "send", 821 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 822 "capture": {1: "stop_signo", 823 2: "stop_thread_id"}}, 824 ], True) 825 826 def add_remove_breakpoint_packets( 827 self, 828 address, 829 z_packet_type=0, 830 breakpoint_kind=1): 831 self.test_sequence.add_log_lines( 832 [ # Remove the breakpoint. 833 "read packet: $z{2},{0:x},{1}#00".format( 834 address, breakpoint_kind, z_packet_type), 835 # Verify the stub could unset it. 836 "send packet: $OK#00", 837 ], True) 838 839 def add_qSupported_packets(self, client_features=[]): 840 features = ''.join(';' + x for x in client_features) 841 self.test_sequence.add_log_lines( 842 ["read packet: $qSupported{}#00".format(features), 843 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 844 ], True) 845 846 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 847 "augmented-libraries-svr4-read", 848 "PacketSize", 849 "QStartNoAckMode", 850 "QThreadSuffixSupported", 851 "QListThreadsInStopReply", 852 "qXfer:auxv:read", 853 "qXfer:libraries:read", 854 "qXfer:libraries-svr4:read", 855 "qXfer:features:read", 856 "qEcho", 857 "QPassSignals", 858 "multiprocess", 859 "fork-events", 860 "vfork-events", 861 "memory-tagging", 862 ] 863 864 def parse_qSupported_response(self, context): 865 self.assertIsNotNone(context) 866 867 raw_response = context.get("qSupported_response") 868 self.assertIsNotNone(raw_response) 869 870 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 871 # +,-,? is stripped from the key and set as the value. 872 supported_dict = {} 873 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 874 key = match.group(1) 875 val = match.group(3) 876 877 # key=val: store as is 878 if val and len(val) > 0: 879 supported_dict[key] = val 880 else: 881 if len(key) < 2: 882 raise Exception( 883 "singular stub feature is too short: must be stub_feature{+,-,?}") 884 supported_type = key[-1] 885 key = key[:-1] 886 if not supported_type in ["+", "-", "?"]: 887 raise Exception( 888 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 889 supported_dict[key] = supported_type 890 # Ensure we know the supported element 891 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 892 raise Exception( 893 "unknown qSupported stub feature reported: %s" % 894 key) 895 896 return supported_dict 897 898 def run_process_then_stop(self, run_seconds=1): 899 # Tell the stub to continue. 900 self.test_sequence.add_log_lines( 901 ["read packet: $vCont;c#a8"], 902 True) 903 context = self.expect_gdbremote_sequence() 904 905 # Wait for run_seconds. 906 time.sleep(run_seconds) 907 908 # Send an interrupt, capture a T response. 909 self.reset_test_sequence() 910 self.test_sequence.add_log_lines( 911 ["read packet: {}".format(chr(3)), 912 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 913 True) 914 context = self.expect_gdbremote_sequence() 915 self.assertIsNotNone(context) 916 self.assertIsNotNone(context.get("stop_result")) 917 918 return context 919 920 def continue_process_and_wait_for_stop(self): 921 self.test_sequence.add_log_lines( 922 [ 923 "read packet: $vCont;c#a8", 924 { 925 "direction": "send", 926 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 927 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 928 }, 929 ], 930 True, 931 ) 932 context = self.expect_gdbremote_sequence() 933 self.assertIsNotNone(context) 934 return self.parse_interrupt_packets(context) 935 936 def select_modifiable_register(self, reg_infos): 937 """Find a register that can be read/written freely.""" 938 PREFERRED_REGISTER_NAMES = set(["rax", ]) 939 940 # First check for the first register from the preferred register name 941 # set. 942 alternative_register_index = None 943 944 self.assertIsNotNone(reg_infos) 945 for reg_info in reg_infos: 946 if ("name" in reg_info) and ( 947 reg_info["name"] in PREFERRED_REGISTER_NAMES): 948 # We found a preferred register. Use it. 949 return reg_info["lldb_register_index"] 950 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 951 reg_info["generic"] == "arg1"): 952 # A frame pointer or first arg register will do as a 953 # register to modify temporarily. 954 alternative_register_index = reg_info["lldb_register_index"] 955 956 # We didn't find a preferred register. Return whatever alternative register 957 # we found, if any. 958 return alternative_register_index 959 960 def extract_registers_from_stop_notification(self, stop_key_vals_text): 961 self.assertIsNotNone(stop_key_vals_text) 962 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 963 964 registers = {} 965 for (key, val) in list(kv_dict.items()): 966 if re.match(r"^[0-9a-fA-F]+$", key): 967 registers[int(key, 16)] = val 968 return registers 969 970 def gather_register_infos(self): 971 self.reset_test_sequence() 972 self.add_register_info_collection_packets() 973 974 context = self.expect_gdbremote_sequence() 975 self.assertIsNotNone(context) 976 977 reg_infos = self.parse_register_info_packets(context) 978 self.assertIsNotNone(reg_infos) 979 self.add_lldb_register_index(reg_infos) 980 981 return reg_infos 982 983 def find_generic_register_with_name(self, reg_infos, generic_name): 984 self.assertIsNotNone(reg_infos) 985 for reg_info in reg_infos: 986 if ("generic" in reg_info) and ( 987 reg_info["generic"] == generic_name): 988 return reg_info 989 return None 990 991 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 992 self.assertIsNotNone(reg_infos) 993 for reg_info in reg_infos: 994 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 995 return reg_info 996 return None 997 998 def decode_gdbremote_binary(self, encoded_bytes): 999 decoded_bytes = "" 1000 i = 0 1001 while i < len(encoded_bytes): 1002 if encoded_bytes[i] == "}": 1003 # Handle escaped char. 1004 self.assertTrue(i + 1 < len(encoded_bytes)) 1005 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1006 i += 2 1007 elif encoded_bytes[i] == "*": 1008 # Handle run length encoding. 1009 self.assertTrue(len(decoded_bytes) > 0) 1010 self.assertTrue(i + 1 < len(encoded_bytes)) 1011 repeat_count = ord(encoded_bytes[i + 1]) - 29 1012 decoded_bytes += decoded_bytes[-1] * repeat_count 1013 i += 2 1014 else: 1015 decoded_bytes += encoded_bytes[i] 1016 i += 1 1017 return decoded_bytes 1018 1019 def build_auxv_dict(self, endian, word_size, auxv_data): 1020 self.assertIsNotNone(endian) 1021 self.assertIsNotNone(word_size) 1022 self.assertIsNotNone(auxv_data) 1023 1024 auxv_dict = {} 1025 1026 # PowerPC64le's auxvec has a special key that must be ignored. 1027 # This special key may be used multiple times, resulting in 1028 # multiple key/value pairs with the same key, which would otherwise 1029 # break this test check for repeated keys. 1030 # 1031 # AT_IGNOREPPC = 22 1032 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1033 arch = self.getArchitecture() 1034 ignore_keys = None 1035 if arch in ignored_keys_for_arch: 1036 ignore_keys = ignored_keys_for_arch[arch] 1037 1038 while len(auxv_data) > 0: 1039 # Chop off key. 1040 raw_key = auxv_data[:word_size] 1041 auxv_data = auxv_data[word_size:] 1042 1043 # Chop of value. 1044 raw_value = auxv_data[:word_size] 1045 auxv_data = auxv_data[word_size:] 1046 1047 # Convert raw text from target endian. 1048 key = unpack_endian_binary_string(endian, raw_key) 1049 value = unpack_endian_binary_string(endian, raw_value) 1050 1051 if ignore_keys and key in ignore_keys: 1052 continue 1053 1054 # Handle ending entry. 1055 if key == 0: 1056 self.assertEqual(value, 0) 1057 return auxv_dict 1058 1059 # The key should not already be present. 1060 self.assertFalse(key in auxv_dict) 1061 auxv_dict[key] = value 1062 1063 self.fail( 1064 "should not reach here - implies required double zero entry not found") 1065 return auxv_dict 1066 1067 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1068 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1069 offset = 0 1070 done = False 1071 decoded_data = "" 1072 1073 while not done: 1074 # Grab the next iteration of data. 1075 self.reset_test_sequence() 1076 self.test_sequence.add_log_lines( 1077 [ 1078 "read packet: ${}{:x},{:x}:#00".format( 1079 command_prefix, 1080 offset, 1081 chunk_length), 1082 { 1083 "direction": "send", 1084 "regex": re.compile( 1085 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1086 re.MULTILINE | re.DOTALL), 1087 "capture": { 1088 1: "response_type", 1089 2: "content_raw"}}], 1090 True) 1091 1092 context = self.expect_gdbremote_sequence() 1093 self.assertIsNotNone(context) 1094 1095 response_type = context.get("response_type") 1096 self.assertIsNotNone(response_type) 1097 self.assertTrue(response_type in ["l", "m"]) 1098 1099 # Move offset along. 1100 offset += chunk_length 1101 1102 # Figure out if we're done. We're done if the response type is l. 1103 done = response_type == "l" 1104 1105 # Decode binary data. 1106 content_raw = context.get("content_raw") 1107 if content_raw and len(content_raw) > 0: 1108 self.assertIsNotNone(content_raw) 1109 decoded_data += self.decode_gdbremote_binary(content_raw) 1110 return decoded_data 1111 1112 def add_interrupt_packets(self): 1113 self.test_sequence.add_log_lines([ 1114 # Send the intterupt. 1115 "read packet: {}".format(chr(3)), 1116 # And wait for the stop notification. 1117 {"direction": "send", 1118 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1119 "capture": {1: "stop_signo", 1120 2: "stop_key_val_text"}}, 1121 ], True) 1122 1123 def parse_interrupt_packets(self, context): 1124 self.assertIsNotNone(context.get("stop_signo")) 1125 self.assertIsNotNone(context.get("stop_key_val_text")) 1126 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1127 context["stop_key_val_text"])) 1128 1129 def add_QSaveRegisterState_packets(self, thread_id): 1130 if thread_id: 1131 # Use the thread suffix form. 1132 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1133 thread_id) 1134 else: 1135 request = "read packet: $QSaveRegisterState#00" 1136 1137 self.test_sequence.add_log_lines([request, 1138 {"direction": "send", 1139 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1140 "capture": {1: "save_response"}}, 1141 ], 1142 True) 1143 1144 def parse_QSaveRegisterState_response(self, context): 1145 self.assertIsNotNone(context) 1146 1147 save_response = context.get("save_response") 1148 self.assertIsNotNone(save_response) 1149 1150 if len(save_response) < 1 or save_response[0] == "E": 1151 # error received 1152 return (False, None) 1153 else: 1154 return (True, int(save_response)) 1155 1156 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1157 if thread_id: 1158 # Use the thread suffix form. 1159 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1160 save_id, thread_id) 1161 else: 1162 request = "read packet: $QRestoreRegisterState:{}#00".format( 1163 save_id) 1164 1165 self.test_sequence.add_log_lines([ 1166 request, 1167 "send packet: $OK#00" 1168 ], True) 1169 1170 def flip_all_bits_in_each_register_value( 1171 self, reg_infos, endian, thread_id=None): 1172 self.assertIsNotNone(reg_infos) 1173 1174 successful_writes = 0 1175 failed_writes = 0 1176 1177 for reg_info in reg_infos: 1178 # Use the lldb register index added to the reg info. We're not necessarily 1179 # working off a full set of register infos, so an inferred register 1180 # index could be wrong. 1181 reg_index = reg_info["lldb_register_index"] 1182 self.assertIsNotNone(reg_index) 1183 1184 reg_byte_size = int(reg_info["bitsize"]) // 8 1185 self.assertTrue(reg_byte_size > 0) 1186 1187 # Handle thread suffix. 1188 if thread_id: 1189 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1190 reg_index, thread_id) 1191 else: 1192 p_request = "read packet: $p{:x}#00".format(reg_index) 1193 1194 # Read the existing value. 1195 self.reset_test_sequence() 1196 self.test_sequence.add_log_lines([ 1197 p_request, 1198 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1199 ], True) 1200 context = self.expect_gdbremote_sequence() 1201 self.assertIsNotNone(context) 1202 1203 # Verify the response length. 1204 p_response = context.get("p_response") 1205 self.assertIsNotNone(p_response) 1206 initial_reg_value = unpack_register_hex_unsigned( 1207 endian, p_response) 1208 1209 # Flip the value by xoring with all 1s 1210 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1211 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1212 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1213 1214 # Handle thread suffix for P. 1215 if thread_id: 1216 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1217 reg_index, pack_register_hex( 1218 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1219 else: 1220 P_request = "read packet: $P{:x}={}#00".format( 1221 reg_index, pack_register_hex( 1222 endian, flipped_bits_int, byte_size=reg_byte_size)) 1223 1224 # Write the flipped value to the register. 1225 self.reset_test_sequence() 1226 self.test_sequence.add_log_lines([P_request, 1227 {"direction": "send", 1228 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1229 "capture": {1: "P_response"}}, 1230 ], 1231 True) 1232 context = self.expect_gdbremote_sequence() 1233 self.assertIsNotNone(context) 1234 1235 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1236 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1237 # all flipping perfectly. 1238 P_response = context.get("P_response") 1239 self.assertIsNotNone(P_response) 1240 if P_response == "OK": 1241 successful_writes += 1 1242 else: 1243 failed_writes += 1 1244 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1245 1246 # Read back the register value, ensure it matches the flipped 1247 # value. 1248 if P_response == "OK": 1249 self.reset_test_sequence() 1250 self.test_sequence.add_log_lines([ 1251 p_request, 1252 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1253 ], True) 1254 context = self.expect_gdbremote_sequence() 1255 self.assertIsNotNone(context) 1256 1257 verify_p_response_raw = context.get("p_response") 1258 self.assertIsNotNone(verify_p_response_raw) 1259 verify_bits = unpack_register_hex_unsigned( 1260 endian, verify_p_response_raw) 1261 1262 if verify_bits != flipped_bits_int: 1263 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1264 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1265 successful_writes -= 1 1266 failed_writes += 1 1267 1268 return (successful_writes, failed_writes) 1269 1270 def is_bit_flippable_register(self, reg_info): 1271 if not reg_info: 1272 return False 1273 if not "set" in reg_info: 1274 return False 1275 if reg_info["set"] != "General Purpose Registers": 1276 return False 1277 if ("container-regs" in reg_info) and ( 1278 len(reg_info["container-regs"]) > 0): 1279 # Don't try to bit flip registers contained in another register. 1280 return False 1281 if re.match("^.s$", reg_info["name"]): 1282 # This is a 2-letter register name that ends in "s", like a segment register. 1283 # Don't try to bit flip these. 1284 return False 1285 if re.match("^(c|)psr$", reg_info["name"]): 1286 # This is an ARM program status register; don't flip it. 1287 return False 1288 # Okay, this looks fine-enough. 1289 return True 1290 1291 def read_register_values(self, reg_infos, endian, thread_id=None): 1292 self.assertIsNotNone(reg_infos) 1293 values = {} 1294 1295 for reg_info in reg_infos: 1296 # We append a register index when load reg infos so we can work 1297 # with subsets. 1298 reg_index = reg_info.get("lldb_register_index") 1299 self.assertIsNotNone(reg_index) 1300 1301 # Handle thread suffix. 1302 if thread_id: 1303 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1304 reg_index, thread_id) 1305 else: 1306 p_request = "read packet: $p{:x}#00".format(reg_index) 1307 1308 # Read it with p. 1309 self.reset_test_sequence() 1310 self.test_sequence.add_log_lines([ 1311 p_request, 1312 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1313 ], True) 1314 context = self.expect_gdbremote_sequence() 1315 self.assertIsNotNone(context) 1316 1317 # Convert value from target endian to integral. 1318 p_response = context.get("p_response") 1319 self.assertIsNotNone(p_response) 1320 self.assertTrue(len(p_response) > 0) 1321 self.assertFalse(p_response[0] == "E") 1322 1323 values[reg_index] = unpack_register_hex_unsigned( 1324 endian, p_response) 1325 1326 return values 1327 1328 def add_vCont_query_packets(self): 1329 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1330 {"direction": "send", 1331 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1332 "capture": {2: "vCont_query_response"}}, 1333 ], 1334 True) 1335 1336 def parse_vCont_query_response(self, context): 1337 self.assertIsNotNone(context) 1338 vCont_query_response = context.get("vCont_query_response") 1339 1340 # Handle case of no vCont support at all - in which case the capture 1341 # group will be none or zero length. 1342 if not vCont_query_response or len(vCont_query_response) == 0: 1343 return {} 1344 1345 return {key: 1 for key in vCont_query_response.split( 1346 ";") if key and len(key) > 0} 1347 1348 def count_single_steps_until_true( 1349 self, 1350 thread_id, 1351 predicate, 1352 args, 1353 max_step_count=100, 1354 use_Hc_packet=True, 1355 step_instruction="s"): 1356 """Used by single step test that appears in a few different contexts.""" 1357 single_step_count = 0 1358 1359 while single_step_count < max_step_count: 1360 self.assertIsNotNone(thread_id) 1361 1362 # Build the packet for the single step instruction. We replace 1363 # {thread}, if present, with the thread_id. 1364 step_packet = "read packet: ${}#00".format( 1365 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1366 # print("\nstep_packet created: {}\n".format(step_packet)) 1367 1368 # Single step. 1369 self.reset_test_sequence() 1370 if use_Hc_packet: 1371 self.test_sequence.add_log_lines( 1372 [ # Set the continue thread. 1373 "read packet: $Hc{0:x}#00".format(thread_id), 1374 "send packet: $OK#00", 1375 ], True) 1376 self.test_sequence.add_log_lines([ 1377 # Single step. 1378 step_packet, 1379 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1380 # Expect a breakpoint stop report. 1381 {"direction": "send", 1382 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1383 "capture": {1: "stop_signo", 1384 2: "stop_thread_id"}}, 1385 ], True) 1386 context = self.expect_gdbremote_sequence() 1387 self.assertIsNotNone(context) 1388 self.assertIsNotNone(context.get("stop_signo")) 1389 self.assertEqual(int(context.get("stop_signo"), 16), 1390 lldbutil.get_signal_number('SIGTRAP')) 1391 1392 single_step_count += 1 1393 1394 # See if the predicate is true. If so, we're done. 1395 if predicate(args): 1396 return (True, single_step_count) 1397 1398 # The predicate didn't return true within the runaway step count. 1399 return (False, single_step_count) 1400 1401 def g_c1_c2_contents_are(self, args): 1402 """Used by single step test that appears in a few different contexts.""" 1403 g_c1_address = args["g_c1_address"] 1404 g_c2_address = args["g_c2_address"] 1405 expected_g_c1 = args["expected_g_c1"] 1406 expected_g_c2 = args["expected_g_c2"] 1407 1408 # Read g_c1 and g_c2 contents. 1409 self.reset_test_sequence() 1410 self.test_sequence.add_log_lines( 1411 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1412 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1413 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1414 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1415 True) 1416 1417 # Run the packet stream. 1418 context = self.expect_gdbremote_sequence() 1419 self.assertIsNotNone(context) 1420 1421 # Check if what we read from inferior memory is what we are expecting. 1422 self.assertIsNotNone(context.get("g_c1_contents")) 1423 self.assertIsNotNone(context.get("g_c2_contents")) 1424 1425 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1426 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1427 1428 def single_step_only_steps_one_instruction( 1429 self, use_Hc_packet=True, step_instruction="s"): 1430 """Used by single step test that appears in a few different contexts.""" 1431 # Start up the inferior. 1432 procs = self.prep_debug_monitor_and_inferior( 1433 inferior_args=[ 1434 "get-code-address-hex:swap_chars", 1435 "get-data-address-hex:g_c1", 1436 "get-data-address-hex:g_c2", 1437 "sleep:1", 1438 "call-function:swap_chars", 1439 "sleep:5"]) 1440 1441 # Run the process 1442 self.test_sequence.add_log_lines( 1443 [ # Start running after initial stop. 1444 "read packet: $c#63", 1445 # Match output line that prints the memory address of the function call entry point. 1446 # Note we require launch-only testing so we can get inferior otuput. 1447 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1448 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1449 # Now stop the inferior. 1450 "read packet: {}".format(chr(3)), 1451 # And wait for the stop notification. 1452 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1453 True) 1454 1455 # Run the packet stream. 1456 context = self.expect_gdbremote_sequence() 1457 self.assertIsNotNone(context) 1458 1459 # Grab the main thread id. 1460 self.assertIsNotNone(context.get("stop_thread_id")) 1461 main_thread_id = int(context.get("stop_thread_id"), 16) 1462 1463 # Grab the function address. 1464 self.assertIsNotNone(context.get("function_address")) 1465 function_address = int(context.get("function_address"), 16) 1466 1467 # Grab the data addresses. 1468 self.assertIsNotNone(context.get("g_c1_address")) 1469 g_c1_address = int(context.get("g_c1_address"), 16) 1470 1471 self.assertIsNotNone(context.get("g_c2_address")) 1472 g_c2_address = int(context.get("g_c2_address"), 16) 1473 1474 # Set a breakpoint at the given address. 1475 if self.getArchitecture().startswith("arm"): 1476 # TODO: Handle case when setting breakpoint in thumb code 1477 BREAKPOINT_KIND = 4 1478 else: 1479 BREAKPOINT_KIND = 1 1480 self.reset_test_sequence() 1481 self.add_set_breakpoint_packets( 1482 function_address, 1483 do_continue=True, 1484 breakpoint_kind=BREAKPOINT_KIND) 1485 context = self.expect_gdbremote_sequence() 1486 self.assertIsNotNone(context) 1487 1488 # Remove the breakpoint. 1489 self.reset_test_sequence() 1490 self.add_remove_breakpoint_packets( 1491 function_address, breakpoint_kind=BREAKPOINT_KIND) 1492 context = self.expect_gdbremote_sequence() 1493 self.assertIsNotNone(context) 1494 1495 # Verify g_c1 and g_c2 match expected initial state. 1496 args = {} 1497 args["g_c1_address"] = g_c1_address 1498 args["g_c2_address"] = g_c2_address 1499 args["expected_g_c1"] = "0" 1500 args["expected_g_c2"] = "1" 1501 1502 self.assertTrue(self.g_c1_c2_contents_are(args)) 1503 1504 # Verify we take only a small number of steps to hit the first state. 1505 # Might need to work through function entry prologue code. 1506 args["expected_g_c1"] = "1" 1507 args["expected_g_c2"] = "1" 1508 (state_reached, 1509 step_count) = self.count_single_steps_until_true(main_thread_id, 1510 self.g_c1_c2_contents_are, 1511 args, 1512 max_step_count=25, 1513 use_Hc_packet=use_Hc_packet, 1514 step_instruction=step_instruction) 1515 self.assertTrue(state_reached) 1516 1517 # Verify we hit the next state. 1518 args["expected_g_c1"] = "1" 1519 args["expected_g_c2"] = "0" 1520 (state_reached, 1521 step_count) = self.count_single_steps_until_true(main_thread_id, 1522 self.g_c1_c2_contents_are, 1523 args, 1524 max_step_count=5, 1525 use_Hc_packet=use_Hc_packet, 1526 step_instruction=step_instruction) 1527 self.assertTrue(state_reached) 1528 expected_step_count = 1 1529 arch = self.getArchitecture() 1530 1531 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1532 # of variable value 1533 if re.match("mips", arch): 1534 expected_step_count = 3 1535 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1536 # variable value 1537 if re.match("s390x", arch): 1538 expected_step_count = 2 1539 # ARM64 requires "4" instructions: 2 to compute the address (adrp, add), 1540 # one to materialize the constant (mov) and the store 1541 if re.match("arm64", arch): 1542 expected_step_count = 4 1543 1544 self.assertEqual(step_count, expected_step_count) 1545 1546 # ARM64: Once addresses and constants are materialized, only one 1547 # instruction is needed. 1548 if re.match("arm64", arch): 1549 expected_step_count = 1 1550 1551 # Verify we hit the next state. 1552 args["expected_g_c1"] = "0" 1553 args["expected_g_c2"] = "0" 1554 (state_reached, 1555 step_count) = self.count_single_steps_until_true(main_thread_id, 1556 self.g_c1_c2_contents_are, 1557 args, 1558 max_step_count=5, 1559 use_Hc_packet=use_Hc_packet, 1560 step_instruction=step_instruction) 1561 self.assertTrue(state_reached) 1562 self.assertEqual(step_count, expected_step_count) 1563 1564 # Verify we hit the next state. 1565 args["expected_g_c1"] = "0" 1566 args["expected_g_c2"] = "1" 1567 (state_reached, 1568 step_count) = self.count_single_steps_until_true(main_thread_id, 1569 self.g_c1_c2_contents_are, 1570 args, 1571 max_step_count=5, 1572 use_Hc_packet=use_Hc_packet, 1573 step_instruction=step_instruction) 1574 self.assertTrue(state_reached) 1575 self.assertEqual(step_count, expected_step_count) 1576 1577 def maybe_strict_output_regex(self, regex): 1578 return '.*' + regex + \ 1579 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1580 1581 def install_and_create_launch_args(self): 1582 exe_path = self.getBuildArtifact("a.out") 1583 if not lldb.remote_platform: 1584 return [exe_path] 1585 remote_path = lldbutil.append_to_process_working_directory(self, 1586 os.path.basename(exe_path)) 1587 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1588 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1589 remote_file_spec) 1590 if err.Fail(): 1591 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1592 (exe_path, remote_path, err)) 1593 return [remote_path] 1594