1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory): 63 64 # Default time out in seconds. The timeout is increased tenfold under Asan. 65 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 66 # Default sleep time in seconds. The sleep time is doubled under Asan. 67 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 68 69 _GDBREMOTE_KILL_PACKET = b"$k#6b" 70 71 # Start the inferior separately, attach to the inferior on the stub 72 # command line. 73 _STARTUP_ATTACH = "attach" 74 # Start the inferior separately, start the stub without attaching, allow 75 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 76 _STARTUP_ATTACH_MANUALLY = "attach_manually" 77 # Start the stub, and launch the inferior with an $A packet via the 78 # initial packet stream. 79 _STARTUP_LAUNCH = "launch" 80 81 # GDB Signal numbers that are not target-specific used for common 82 # exceptions 83 TARGET_EXC_BAD_ACCESS = 0x91 84 TARGET_EXC_BAD_INSTRUCTION = 0x92 85 TARGET_EXC_ARITHMETIC = 0x93 86 TARGET_EXC_EMULATION = 0x94 87 TARGET_EXC_SOFTWARE = 0x95 88 TARGET_EXC_BREAKPOINT = 0x96 89 90 _verbose_log_handler = None 91 _log_formatter = logging.Formatter( 92 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 93 94 def setUpBaseLogging(self): 95 self.logger = logging.getLogger(__name__) 96 97 if len(self.logger.handlers) > 0: 98 return # We have set up this handler already 99 100 self.logger.propagate = False 101 self.logger.setLevel(logging.DEBUG) 102 103 # log all warnings to stderr 104 handler = logging.StreamHandler() 105 handler.setLevel(logging.WARNING) 106 handler.setFormatter(self._log_formatter) 107 self.logger.addHandler(handler) 108 109 def isVerboseLoggingRequested(self): 110 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 111 # logged. 112 return any(("gdb-remote" in channel) 113 for channel in lldbtest_config.channels) 114 115 def getDebugServer(self): 116 method = getattr(self, self.testMethodName) 117 return getattr(method, "debug_server", None) 118 119 def setUp(self): 120 super(GdbRemoteTestCaseBase, self).setUp() 121 122 self.setUpBaseLogging() 123 self.debug_monitor_extra_args = [] 124 125 if self.isVerboseLoggingRequested(): 126 # If requested, full logs go to a log file 127 self._verbose_log_handler = logging.FileHandler( 128 self.getLogBasenameForCurrentTest() + "-host.log") 129 self._verbose_log_handler.setFormatter(self._log_formatter) 130 self._verbose_log_handler.setLevel(logging.DEBUG) 131 self.logger.addHandler(self._verbose_log_handler) 132 133 self.test_sequence = GdbRemoteTestSequence(self.logger) 134 self.set_inferior_startup_launch() 135 self.port = self.get_next_port() 136 self.stub_sends_two_stop_notifications_on_kill = False 137 if configuration.lldb_platform_url: 138 if configuration.lldb_platform_url.startswith('unix-'): 139 url_pattern = '(.+)://\[?(.+?)\]?/.*' 140 else: 141 url_pattern = '(.+)://(.+):\d+' 142 scheme, host = re.match( 143 url_pattern, configuration.lldb_platform_url).groups() 144 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 145 self.stub_device = host 146 self.stub_hostname = 'localhost' 147 else: 148 self.stub_device = None 149 self.stub_hostname = host 150 else: 151 self.stub_hostname = "localhost" 152 153 debug_server = self.getDebugServer() 154 if debug_server == "debugserver": 155 self._init_debugserver_test() 156 else: 157 self._init_llgs_test() 158 159 def tearDown(self): 160 self.logger.removeHandler(self._verbose_log_handler) 161 self._verbose_log_handler = None 162 TestBase.tearDown(self) 163 164 def getLocalServerLogFile(self): 165 return self.getLogBasenameForCurrentTest() + "-server.log" 166 167 def setUpServerLogging(self, is_llgs): 168 if len(lldbtest_config.channels) == 0: 169 return # No logging requested 170 171 if lldb.remote_platform: 172 log_file = lldbutil.join_remote_paths( 173 lldb.remote_platform.GetWorkingDirectory(), "server.log") 174 else: 175 log_file = self.getLocalServerLogFile() 176 177 if is_llgs: 178 self.debug_monitor_extra_args.append("--log-file=" + log_file) 179 self.debug_monitor_extra_args.append( 180 "--log-channels={}".format(":".join(lldbtest_config.channels))) 181 else: 182 self.debug_monitor_extra_args = [ 183 "--log-file=" + log_file, "--log-flags=0x800000"] 184 185 def get_next_port(self): 186 return 12000 + random.randint(0, 3999) 187 188 def reset_test_sequence(self): 189 self.test_sequence = GdbRemoteTestSequence(self.logger) 190 191 192 def _init_llgs_test(self): 193 reverse_connect = True 194 if lldb.remote_platform: 195 # Reverse connections may be tricky due to firewalls/NATs. 196 reverse_connect = False 197 198 # FIXME: This is extremely linux-oriented 199 200 # Grab the ppid from /proc/[shell pid]/stat 201 err, retcode, shell_stat = self.run_platform_command( 202 "cat /proc/$$/stat") 203 self.assertTrue( 204 err.Success() and retcode == 0, 205 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 206 (err.GetCString(), 207 retcode)) 208 209 # [pid] ([executable]) [state] [*ppid*] 210 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 211 err, retcode, ls_output = self.run_platform_command( 212 "ls -l /proc/%s/exe" % pid) 213 self.assertTrue( 214 err.Success() and retcode == 0, 215 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 216 (pid, 217 err.GetCString(), 218 retcode)) 219 exe = ls_output.split()[-1] 220 221 # If the binary has been deleted, the link name has " (deleted)" appended. 222 # Remove if it's there. 223 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 224 else: 225 self.debug_monitor_exe = get_lldb_server_exe() 226 227 self.debug_monitor_extra_args = ["gdbserver"] 228 self.setUpServerLogging(is_llgs=True) 229 230 self.reverse_connect = reverse_connect 231 232 def _init_debugserver_test(self): 233 self.debug_monitor_exe = get_debugserver_exe() 234 self.setUpServerLogging(is_llgs=False) 235 self.reverse_connect = True 236 237 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 238 # when the process truly dies. 239 self.stub_sends_two_stop_notifications_on_kill = True 240 241 def forward_adb_port(self, source, target, direction, device): 242 adb = ['adb'] + (['-s', device] if device else []) + [direction] 243 244 def remove_port_forward(): 245 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 246 247 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 248 self.addTearDownHook(remove_port_forward) 249 250 def _verify_socket(self, sock): 251 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 252 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 253 # the connect() will always be successful, but the connection will be immediately dropped 254 # if ADB could not connect on the remote side. This function tries to detect this 255 # situation, and report it as "connection refused" so that the upper layers attempt the 256 # connection again. 257 triple = self.dbg.GetSelectedPlatform().GetTriple() 258 if not re.match(".*-.*-.*-android", triple): 259 return # Not android. 260 can_read, _, _ = select.select([sock], [], [], 0.1) 261 if sock not in can_read: 262 return # Data is not available, but the connection is alive. 263 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 264 raise _ConnectionRefused() # Got EOF, connection dropped. 265 266 def create_socket(self): 267 try: 268 sock = socket.socket(family=socket.AF_INET) 269 except OSError as e: 270 if e.errno != errno.EAFNOSUPPORT: 271 raise 272 sock = socket.socket(family=socket.AF_INET6) 273 274 logger = self.logger 275 276 triple = self.dbg.GetSelectedPlatform().GetTriple() 277 if re.match(".*-.*-.*-android", triple): 278 self.forward_adb_port( 279 self.port, 280 self.port, 281 "forward", 282 self.stub_device) 283 284 logger.info( 285 "Connecting to debug monitor on %s:%d", 286 self.stub_hostname, 287 self.port) 288 connect_info = (self.stub_hostname, self.port) 289 try: 290 sock.connect(connect_info) 291 except socket.error as serr: 292 if serr.errno == errno.ECONNREFUSED: 293 raise _ConnectionRefused() 294 raise serr 295 296 def shutdown_socket(): 297 if sock: 298 try: 299 # send the kill packet so lldb-server shuts down gracefully 300 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 301 except: 302 logger.warning( 303 "failed to send kill packet to debug monitor: {}; ignoring".format( 304 sys.exc_info()[0])) 305 306 try: 307 sock.close() 308 except: 309 logger.warning( 310 "failed to close socket to debug monitor: {}; ignoring".format( 311 sys.exc_info()[0])) 312 313 self.addTearDownHook(shutdown_socket) 314 315 self._verify_socket(sock) 316 317 return sock 318 319 def set_inferior_startup_launch(self): 320 self._inferior_startup = self._STARTUP_LAUNCH 321 322 def set_inferior_startup_attach(self): 323 self._inferior_startup = self._STARTUP_ATTACH 324 325 def set_inferior_startup_attach_manually(self): 326 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 327 328 def get_debug_monitor_command_line_args(self, attach_pid=None): 329 commandline_args = self.debug_monitor_extra_args 330 if attach_pid: 331 commandline_args += ["--attach=%d" % attach_pid] 332 if self.reverse_connect: 333 commandline_args += ["--reverse-connect", self.connect_address] 334 else: 335 if lldb.remote_platform: 336 commandline_args += ["*:{}".format(self.port)] 337 else: 338 commandline_args += ["localhost:{}".format(self.port)] 339 340 return commandline_args 341 342 def get_target_byte_order(self): 343 inferior_exe_path = self.getBuildArtifact("a.out") 344 target = self.dbg.CreateTarget(inferior_exe_path) 345 return target.GetByteOrder() 346 347 def launch_debug_monitor(self, attach_pid=None, logfile=None): 348 if self.reverse_connect: 349 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 350 sock = socket.socket(family, type, proto) 351 sock.settimeout(self.DEFAULT_TIMEOUT) 352 353 sock.bind(addr) 354 sock.listen(1) 355 addr = sock.getsockname() 356 self.connect_address = "[{}]:{}".format(*addr) 357 358 359 # Create the command line. 360 commandline_args = self.get_debug_monitor_command_line_args( 361 attach_pid=attach_pid) 362 363 # Start the server. 364 server = self.spawnSubprocess( 365 self.debug_monitor_exe, 366 commandline_args, 367 install_remote=False) 368 self.assertIsNotNone(server) 369 370 if self.reverse_connect: 371 self.sock = sock.accept()[0] 372 self.sock.settimeout(self.DEFAULT_TIMEOUT) 373 374 return server 375 376 def connect_to_debug_monitor(self, attach_pid=None): 377 if self.reverse_connect: 378 # Create the stub. 379 server = self.launch_debug_monitor(attach_pid=attach_pid) 380 self.assertIsNotNone(server) 381 382 # Schedule debug monitor to be shut down during teardown. 383 logger = self.logger 384 385 self._server = Server(self.sock, server) 386 return server 387 388 # We're using a random port algorithm to try not to collide with other ports, 389 # and retry a max # times. 390 attempts = 0 391 MAX_ATTEMPTS = 20 392 393 while attempts < MAX_ATTEMPTS: 394 server = self.launch_debug_monitor(attach_pid=attach_pid) 395 396 # Schedule debug monitor to be shut down during teardown. 397 logger = self.logger 398 399 connect_attemps = 0 400 MAX_CONNECT_ATTEMPTS = 10 401 402 while connect_attemps < MAX_CONNECT_ATTEMPTS: 403 # Create a socket to talk to the server 404 try: 405 logger.info("Connect attempt %d", connect_attemps + 1) 406 self.sock = self.create_socket() 407 self._server = Server(self.sock, server) 408 return server 409 except _ConnectionRefused as serr: 410 # Ignore, and try again. 411 pass 412 time.sleep(0.5) 413 connect_attemps += 1 414 415 # We should close the server here to be safe. 416 server.terminate() 417 418 # Increment attempts. 419 print( 420 "connect to debug monitor on port %d failed, attempt #%d of %d" % 421 (self.port, attempts + 1, MAX_ATTEMPTS)) 422 attempts += 1 423 424 # And wait a random length of time before next attempt, to avoid 425 # collisions. 426 time.sleep(random.randint(1, 5)) 427 428 # Now grab a new port number. 429 self.port = self.get_next_port() 430 431 raise Exception( 432 "failed to create a socket to the launched debug monitor after %d tries" % 433 attempts) 434 435 def launch_process_for_attach( 436 self, 437 inferior_args=None, 438 sleep_seconds=3, 439 exe_path=None): 440 # We're going to start a child process that the debug monitor stub can later attach to. 441 # This process needs to be started so that it just hangs around for a while. We'll 442 # have it sleep. 443 if not exe_path: 444 exe_path = self.getBuildArtifact("a.out") 445 446 args = [] 447 if inferior_args: 448 args.extend(inferior_args) 449 if sleep_seconds: 450 args.append("sleep:%d" % sleep_seconds) 451 452 return self.spawnSubprocess(exe_path, args) 453 454 def prep_debug_monitor_and_inferior( 455 self, 456 inferior_args=None, 457 inferior_sleep_seconds=3, 458 inferior_exe_path=None, 459 inferior_env=None): 460 """Prep the debug monitor, the inferior, and the expected packet stream. 461 462 Handle the separate cases of using the debug monitor in attach-to-inferior mode 463 and in launch-inferior mode. 464 465 For attach-to-inferior mode, the inferior process is first started, then 466 the debug monitor is started in attach to pid mode (using --attach on the 467 stub command line), and the no-ack-mode setup is appended to the packet 468 stream. The packet stream is not yet executed, ready to have more expected 469 packet entries added to it. 470 471 For launch-inferior mode, the stub is first started, then no ack mode is 472 setup on the expected packet stream, then the verified launch packets are added 473 to the expected socket stream. The packet stream is not yet executed, ready 474 to have more expected packet entries added to it. 475 476 The return value is: 477 {inferior:<inferior>, server:<server>} 478 """ 479 inferior = None 480 attach_pid = None 481 482 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 483 # Launch the process that we'll use as the inferior. 484 inferior = self.launch_process_for_attach( 485 inferior_args=inferior_args, 486 sleep_seconds=inferior_sleep_seconds, 487 exe_path=inferior_exe_path) 488 self.assertIsNotNone(inferior) 489 self.assertTrue(inferior.pid > 0) 490 if self._inferior_startup == self._STARTUP_ATTACH: 491 # In this case, we want the stub to attach via the command 492 # line, so set the command line attach pid here. 493 attach_pid = inferior.pid 494 495 if self._inferior_startup == self._STARTUP_LAUNCH: 496 # Build launch args 497 if not inferior_exe_path: 498 inferior_exe_path = self.getBuildArtifact("a.out") 499 500 if lldb.remote_platform: 501 remote_path = lldbutil.append_to_process_working_directory(self, 502 os.path.basename(inferior_exe_path)) 503 remote_file_spec = lldb.SBFileSpec(remote_path, False) 504 err = lldb.remote_platform.Install(lldb.SBFileSpec( 505 inferior_exe_path, True), remote_file_spec) 506 if err.Fail(): 507 raise Exception( 508 "remote_platform.Install('%s', '%s') failed: %s" % 509 (inferior_exe_path, remote_path, err)) 510 inferior_exe_path = remote_path 511 512 launch_args = [inferior_exe_path] 513 if inferior_args: 514 launch_args.extend(inferior_args) 515 516 # Launch the debug monitor stub, attaching to the inferior. 517 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 518 self.assertIsNotNone(server) 519 520 self.do_handshake() 521 522 # Build the expected protocol stream 523 if inferior_env: 524 for name, value in inferior_env.items(): 525 self.add_set_environment_packets(name, value) 526 if self._inferior_startup == self._STARTUP_LAUNCH: 527 self.add_verified_launch_packets(launch_args) 528 529 return {"inferior": inferior, "server": server} 530 531 def do_handshake(self): 532 server = self._server 533 server.send_ack() 534 server.send_packet(b"QStartNoAckMode") 535 self.assertEqual(server.get_normal_packet(), b"+") 536 self.assertEqual(server.get_normal_packet(), b"OK") 537 server.send_ack() 538 539 def add_verified_launch_packets(self, launch_args): 540 self.test_sequence.add_log_lines( 541 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 542 "send packet: $OK#00", 543 "read packet: $qLaunchSuccess#a5", 544 "send packet: $OK#00"], 545 True) 546 547 def add_thread_suffix_request_packets(self): 548 self.test_sequence.add_log_lines( 549 ["read packet: $QThreadSuffixSupported#e4", 550 "send packet: $OK#00", 551 ], True) 552 553 def add_process_info_collection_packets(self): 554 self.test_sequence.add_log_lines( 555 ["read packet: $qProcessInfo#dc", 556 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 557 True) 558 559 def add_set_environment_packets(self, name, value): 560 self.test_sequence.add_log_lines( 561 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 562 "send packet: $OK#00", 563 ], True) 564 565 _KNOWN_PROCESS_INFO_KEYS = [ 566 "pid", 567 "parent-pid", 568 "real-uid", 569 "real-gid", 570 "effective-uid", 571 "effective-gid", 572 "cputype", 573 "cpusubtype", 574 "ostype", 575 "triple", 576 "vendor", 577 "endian", 578 "elf_abi", 579 "ptrsize" 580 ] 581 582 def parse_process_info_response(self, context): 583 # Ensure we have a process info response. 584 self.assertIsNotNone(context) 585 process_info_raw = context.get("process_info_raw") 586 self.assertIsNotNone(process_info_raw) 587 588 # Pull out key:value; pairs. 589 process_info_dict = { 590 match.group(1): match.group(2) for match in re.finditer( 591 r"([^:]+):([^;]+);", process_info_raw)} 592 593 # Validate keys are known. 594 for (key, val) in list(process_info_dict.items()): 595 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 596 self.assertIsNotNone(val) 597 598 return process_info_dict 599 600 def add_register_info_collection_packets(self): 601 self.test_sequence.add_log_lines( 602 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 603 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 604 "save_key": "reg_info_responses"}], 605 True) 606 607 def parse_register_info_packets(self, context): 608 """Return an array of register info dictionaries, one per register info.""" 609 reg_info_responses = context.get("reg_info_responses") 610 self.assertIsNotNone(reg_info_responses) 611 612 # Parse register infos. 613 return [parse_reg_info_response(reg_info_response) 614 for reg_info_response in reg_info_responses] 615 616 def expect_gdbremote_sequence(self): 617 return expect_lldb_gdbserver_replay( 618 self, 619 self._server, 620 self.test_sequence, 621 self.DEFAULT_TIMEOUT * len(self.test_sequence), 622 self.logger) 623 624 _KNOWN_REGINFO_KEYS = [ 625 "name", 626 "alt-name", 627 "bitsize", 628 "offset", 629 "encoding", 630 "format", 631 "set", 632 "gcc", 633 "ehframe", 634 "dwarf", 635 "generic", 636 "container-regs", 637 "invalidate-regs", 638 "dynamic_size_dwarf_expr_bytes", 639 "dynamic_size_dwarf_len" 640 ] 641 642 def assert_valid_reg_info(self, reg_info): 643 # Assert we know about all the reginfo keys parsed. 644 for key in reg_info: 645 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 646 647 # Check the bare-minimum expected set of register info keys. 648 self.assertTrue("name" in reg_info) 649 self.assertTrue("bitsize" in reg_info) 650 651 if not self.getArchitecture() == 'aarch64': 652 self.assertTrue("offset" in reg_info) 653 654 self.assertTrue("encoding" in reg_info) 655 self.assertTrue("format" in reg_info) 656 657 def find_pc_reg_info(self, reg_infos): 658 lldb_reg_index = 0 659 for reg_info in reg_infos: 660 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 661 return (lldb_reg_index, reg_info) 662 lldb_reg_index += 1 663 664 return (None, None) 665 666 def add_lldb_register_index(self, reg_infos): 667 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 668 669 We'll use this when we want to call packets like P/p with a register index but do so 670 on only a subset of the full register info set. 671 """ 672 self.assertIsNotNone(reg_infos) 673 674 reg_index = 0 675 for reg_info in reg_infos: 676 reg_info["lldb_register_index"] = reg_index 677 reg_index += 1 678 679 def add_query_memory_region_packets(self, address): 680 self.test_sequence.add_log_lines( 681 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 682 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 683 True) 684 685 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 686 self.assertIsNotNone(key_val_text) 687 kv_dict = {} 688 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 689 key = match.group(1) 690 val = match.group(2) 691 if key in kv_dict: 692 if allow_dupes: 693 if isinstance(kv_dict[key], list): 694 kv_dict[key].append(val) 695 else: 696 # Promote to list 697 kv_dict[key] = [kv_dict[key], val] 698 else: 699 self.fail( 700 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 701 key, val, key_val_text, kv_dict)) 702 else: 703 kv_dict[key] = val 704 return kv_dict 705 706 def parse_memory_region_packet(self, context): 707 # Ensure we have a context. 708 self.assertIsNotNone(context.get("memory_region_response")) 709 710 # Pull out key:value; pairs. 711 mem_region_dict = self.parse_key_val_dict( 712 context.get("memory_region_response")) 713 714 # Validate keys are known. 715 for (key, val) in list(mem_region_dict.items()): 716 self.assertIn(key, 717 ["start", 718 "size", 719 "permissions", 720 "flags", 721 "name", 722 "error", 723 "dirty-pages", 724 "type"]) 725 self.assertIsNotNone(val) 726 727 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 728 # Return the dictionary of key-value pairs for the memory region. 729 return mem_region_dict 730 731 def assert_address_within_memory_region( 732 self, test_address, mem_region_dict): 733 self.assertIsNotNone(mem_region_dict) 734 self.assertTrue("start" in mem_region_dict) 735 self.assertTrue("size" in mem_region_dict) 736 737 range_start = int(mem_region_dict["start"], 16) 738 range_size = int(mem_region_dict["size"], 16) 739 range_end = range_start + range_size 740 741 if test_address < range_start: 742 self.fail( 743 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 744 test_address, 745 range_start, 746 range_end, 747 range_size)) 748 elif test_address >= range_end: 749 self.fail( 750 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 751 test_address, 752 range_start, 753 range_end, 754 range_size)) 755 756 def add_threadinfo_collection_packets(self): 757 self.test_sequence.add_log_lines( 758 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 759 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 760 "save_key": "threadinfo_responses"}], 761 True) 762 763 def parse_threadinfo_packets(self, context): 764 """Return an array of thread ids (decimal ints), one per thread.""" 765 threadinfo_responses = context.get("threadinfo_responses") 766 self.assertIsNotNone(threadinfo_responses) 767 768 thread_ids = [] 769 for threadinfo_response in threadinfo_responses: 770 new_thread_infos = parse_threadinfo_response(threadinfo_response) 771 thread_ids.extend(new_thread_infos) 772 return thread_ids 773 774 def launch_with_threads(self, thread_count): 775 procs = self.prep_debug_monitor_and_inferior( 776 inferior_args=["thread:new"]*(thread_count-1) + ["trap"]) 777 778 self.test_sequence.add_log_lines([ 779 "read packet: $c#00", 780 {"direction": "send", 781 "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$", 782 "capture": {1: "stop_signo", 2: "stop_reply_kv"}}], True) 783 self.add_threadinfo_collection_packets() 784 context = self.expect_gdbremote_sequence() 785 threads = self.parse_threadinfo_packets(context) 786 self.assertGreaterEqual(len(threads), thread_count) 787 return context, threads 788 789 def add_set_breakpoint_packets( 790 self, 791 address, 792 z_packet_type=0, 793 do_continue=True, 794 breakpoint_kind=1): 795 self.test_sequence.add_log_lines( 796 [ # Set the breakpoint. 797 "read packet: $Z{2},{0:x},{1}#00".format( 798 address, breakpoint_kind, z_packet_type), 799 # Verify the stub could set it. 800 "send packet: $OK#00", 801 ], True) 802 803 if (do_continue): 804 self.test_sequence.add_log_lines( 805 [ # Continue the inferior. 806 "read packet: $c#63", 807 # Expect a breakpoint stop report. 808 {"direction": "send", 809 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 810 "capture": {1: "stop_signo", 811 2: "stop_thread_id"}}, 812 ], True) 813 814 def add_remove_breakpoint_packets( 815 self, 816 address, 817 z_packet_type=0, 818 breakpoint_kind=1): 819 self.test_sequence.add_log_lines( 820 [ # Remove the breakpoint. 821 "read packet: $z{2},{0:x},{1}#00".format( 822 address, breakpoint_kind, z_packet_type), 823 # Verify the stub could unset it. 824 "send packet: $OK#00", 825 ], True) 826 827 def add_qSupported_packets(self, client_features=[]): 828 features = ''.join(';' + x for x in client_features) 829 self.test_sequence.add_log_lines( 830 ["read packet: $qSupported{}#00".format(features), 831 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 832 ], True) 833 834 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 835 "augmented-libraries-svr4-read", 836 "PacketSize", 837 "QStartNoAckMode", 838 "QThreadSuffixSupported", 839 "QListThreadsInStopReply", 840 "qXfer:auxv:read", 841 "qXfer:libraries:read", 842 "qXfer:libraries-svr4:read", 843 "qXfer:features:read", 844 "qXfer:siginfo:read", 845 "qEcho", 846 "QPassSignals", 847 "multiprocess", 848 "fork-events", 849 "vfork-events", 850 "memory-tagging", 851 "qSaveCore", 852 "native-signals", 853 "QNonStop", 854 ] 855 856 def parse_qSupported_response(self, context): 857 self.assertIsNotNone(context) 858 859 raw_response = context.get("qSupported_response") 860 self.assertIsNotNone(raw_response) 861 862 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 863 # +,-,? is stripped from the key and set as the value. 864 supported_dict = {} 865 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 866 key = match.group(1) 867 val = match.group(3) 868 869 # key=val: store as is 870 if val and len(val) > 0: 871 supported_dict[key] = val 872 else: 873 if len(key) < 2: 874 raise Exception( 875 "singular stub feature is too short: must be stub_feature{+,-,?}") 876 supported_type = key[-1] 877 key = key[:-1] 878 if not supported_type in ["+", "-", "?"]: 879 raise Exception( 880 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 881 supported_dict[key] = supported_type 882 # Ensure we know the supported element 883 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 884 raise Exception( 885 "unknown qSupported stub feature reported: %s" % 886 key) 887 888 return supported_dict 889 890 def continue_process_and_wait_for_stop(self): 891 self.test_sequence.add_log_lines( 892 [ 893 "read packet: $vCont;c#a8", 894 { 895 "direction": "send", 896 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 897 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 898 }, 899 ], 900 True, 901 ) 902 context = self.expect_gdbremote_sequence() 903 self.assertIsNotNone(context) 904 return self.parse_interrupt_packets(context) 905 906 def select_modifiable_register(self, reg_infos): 907 """Find a register that can be read/written freely.""" 908 PREFERRED_REGISTER_NAMES = set(["rax", ]) 909 910 # First check for the first register from the preferred register name 911 # set. 912 alternative_register_index = None 913 914 self.assertIsNotNone(reg_infos) 915 for reg_info in reg_infos: 916 if ("name" in reg_info) and ( 917 reg_info["name"] in PREFERRED_REGISTER_NAMES): 918 # We found a preferred register. Use it. 919 return reg_info["lldb_register_index"] 920 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 921 reg_info["generic"] == "arg1"): 922 # A frame pointer or first arg register will do as a 923 # register to modify temporarily. 924 alternative_register_index = reg_info["lldb_register_index"] 925 926 # We didn't find a preferred register. Return whatever alternative register 927 # we found, if any. 928 return alternative_register_index 929 930 def extract_registers_from_stop_notification(self, stop_key_vals_text): 931 self.assertIsNotNone(stop_key_vals_text) 932 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 933 934 registers = {} 935 for (key, val) in list(kv_dict.items()): 936 if re.match(r"^[0-9a-fA-F]+$", key): 937 registers[int(key, 16)] = val 938 return registers 939 940 def gather_register_infos(self): 941 self.reset_test_sequence() 942 self.add_register_info_collection_packets() 943 944 context = self.expect_gdbremote_sequence() 945 self.assertIsNotNone(context) 946 947 reg_infos = self.parse_register_info_packets(context) 948 self.assertIsNotNone(reg_infos) 949 self.add_lldb_register_index(reg_infos) 950 951 return reg_infos 952 953 def find_generic_register_with_name(self, reg_infos, generic_name): 954 self.assertIsNotNone(reg_infos) 955 for reg_info in reg_infos: 956 if ("generic" in reg_info) and ( 957 reg_info["generic"] == generic_name): 958 return reg_info 959 return None 960 961 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 962 self.assertIsNotNone(reg_infos) 963 for reg_info in reg_infos: 964 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 965 return reg_info 966 return None 967 968 def decode_gdbremote_binary(self, encoded_bytes): 969 decoded_bytes = "" 970 i = 0 971 while i < len(encoded_bytes): 972 if encoded_bytes[i] == "}": 973 # Handle escaped char. 974 self.assertTrue(i + 1 < len(encoded_bytes)) 975 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 976 i += 2 977 elif encoded_bytes[i] == "*": 978 # Handle run length encoding. 979 self.assertTrue(len(decoded_bytes) > 0) 980 self.assertTrue(i + 1 < len(encoded_bytes)) 981 repeat_count = ord(encoded_bytes[i + 1]) - 29 982 decoded_bytes += decoded_bytes[-1] * repeat_count 983 i += 2 984 else: 985 decoded_bytes += encoded_bytes[i] 986 i += 1 987 return decoded_bytes 988 989 def build_auxv_dict(self, endian, word_size, auxv_data): 990 self.assertIsNotNone(endian) 991 self.assertIsNotNone(word_size) 992 self.assertIsNotNone(auxv_data) 993 994 auxv_dict = {} 995 996 # PowerPC64le's auxvec has a special key that must be ignored. 997 # This special key may be used multiple times, resulting in 998 # multiple key/value pairs with the same key, which would otherwise 999 # break this test check for repeated keys. 1000 # 1001 # AT_IGNOREPPC = 22 1002 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1003 arch = self.getArchitecture() 1004 ignore_keys = None 1005 if arch in ignored_keys_for_arch: 1006 ignore_keys = ignored_keys_for_arch[arch] 1007 1008 while len(auxv_data) > 0: 1009 # Chop off key. 1010 raw_key = auxv_data[:word_size] 1011 auxv_data = auxv_data[word_size:] 1012 1013 # Chop of value. 1014 raw_value = auxv_data[:word_size] 1015 auxv_data = auxv_data[word_size:] 1016 1017 # Convert raw text from target endian. 1018 key = unpack_endian_binary_string(endian, raw_key) 1019 value = unpack_endian_binary_string(endian, raw_value) 1020 1021 if ignore_keys and key in ignore_keys: 1022 continue 1023 1024 # Handle ending entry. 1025 if key == 0: 1026 self.assertEqual(value, 0) 1027 return auxv_dict 1028 1029 # The key should not already be present. 1030 self.assertFalse(key in auxv_dict) 1031 auxv_dict[key] = value 1032 1033 self.fail( 1034 "should not reach here - implies required double zero entry not found") 1035 return auxv_dict 1036 1037 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1038 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1039 offset = 0 1040 done = False 1041 decoded_data = "" 1042 1043 while not done: 1044 # Grab the next iteration of data. 1045 self.reset_test_sequence() 1046 self.test_sequence.add_log_lines( 1047 [ 1048 "read packet: ${}{:x},{:x}:#00".format( 1049 command_prefix, 1050 offset, 1051 chunk_length), 1052 { 1053 "direction": "send", 1054 "regex": re.compile( 1055 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1056 re.MULTILINE | re.DOTALL), 1057 "capture": { 1058 1: "response_type", 1059 2: "content_raw"}}], 1060 True) 1061 1062 context = self.expect_gdbremote_sequence() 1063 self.assertIsNotNone(context) 1064 1065 response_type = context.get("response_type") 1066 self.assertIsNotNone(response_type) 1067 self.assertTrue(response_type in ["l", "m"]) 1068 1069 # Move offset along. 1070 offset += chunk_length 1071 1072 # Figure out if we're done. We're done if the response type is l. 1073 done = response_type == "l" 1074 1075 # Decode binary data. 1076 content_raw = context.get("content_raw") 1077 if content_raw and len(content_raw) > 0: 1078 self.assertIsNotNone(content_raw) 1079 decoded_data += self.decode_gdbremote_binary(content_raw) 1080 return decoded_data 1081 1082 def add_interrupt_packets(self): 1083 self.test_sequence.add_log_lines([ 1084 # Send the intterupt. 1085 "read packet: {}".format(chr(3)), 1086 # And wait for the stop notification. 1087 {"direction": "send", 1088 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1089 "capture": {1: "stop_signo", 1090 2: "stop_key_val_text"}}, 1091 ], True) 1092 1093 def parse_interrupt_packets(self, context): 1094 self.assertIsNotNone(context.get("stop_signo")) 1095 self.assertIsNotNone(context.get("stop_key_val_text")) 1096 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1097 context["stop_key_val_text"])) 1098 1099 def add_QSaveRegisterState_packets(self, thread_id): 1100 if thread_id: 1101 # Use the thread suffix form. 1102 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1103 thread_id) 1104 else: 1105 request = "read packet: $QSaveRegisterState#00" 1106 1107 self.test_sequence.add_log_lines([request, 1108 {"direction": "send", 1109 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1110 "capture": {1: "save_response"}}, 1111 ], 1112 True) 1113 1114 def parse_QSaveRegisterState_response(self, context): 1115 self.assertIsNotNone(context) 1116 1117 save_response = context.get("save_response") 1118 self.assertIsNotNone(save_response) 1119 1120 if len(save_response) < 1 or save_response[0] == "E": 1121 # error received 1122 return (False, None) 1123 else: 1124 return (True, int(save_response)) 1125 1126 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1127 if thread_id: 1128 # Use the thread suffix form. 1129 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1130 save_id, thread_id) 1131 else: 1132 request = "read packet: $QRestoreRegisterState:{}#00".format( 1133 save_id) 1134 1135 self.test_sequence.add_log_lines([ 1136 request, 1137 "send packet: $OK#00" 1138 ], True) 1139 1140 def flip_all_bits_in_each_register_value( 1141 self, reg_infos, endian, thread_id=None): 1142 self.assertIsNotNone(reg_infos) 1143 1144 successful_writes = 0 1145 failed_writes = 0 1146 1147 for reg_info in reg_infos: 1148 # Use the lldb register index added to the reg info. We're not necessarily 1149 # working off a full set of register infos, so an inferred register 1150 # index could be wrong. 1151 reg_index = reg_info["lldb_register_index"] 1152 self.assertIsNotNone(reg_index) 1153 1154 reg_byte_size = int(reg_info["bitsize"]) // 8 1155 self.assertTrue(reg_byte_size > 0) 1156 1157 # Handle thread suffix. 1158 if thread_id: 1159 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1160 reg_index, thread_id) 1161 else: 1162 p_request = "read packet: $p{:x}#00".format(reg_index) 1163 1164 # Read the existing value. 1165 self.reset_test_sequence() 1166 self.test_sequence.add_log_lines([ 1167 p_request, 1168 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1169 ], True) 1170 context = self.expect_gdbremote_sequence() 1171 self.assertIsNotNone(context) 1172 1173 # Verify the response length. 1174 p_response = context.get("p_response") 1175 self.assertIsNotNone(p_response) 1176 initial_reg_value = unpack_register_hex_unsigned( 1177 endian, p_response) 1178 1179 # Flip the value by xoring with all 1s 1180 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1181 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1182 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1183 1184 # Handle thread suffix for P. 1185 if thread_id: 1186 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1187 reg_index, pack_register_hex( 1188 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1189 else: 1190 P_request = "read packet: $P{:x}={}#00".format( 1191 reg_index, pack_register_hex( 1192 endian, flipped_bits_int, byte_size=reg_byte_size)) 1193 1194 # Write the flipped value to the register. 1195 self.reset_test_sequence() 1196 self.test_sequence.add_log_lines([P_request, 1197 {"direction": "send", 1198 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1199 "capture": {1: "P_response"}}, 1200 ], 1201 True) 1202 context = self.expect_gdbremote_sequence() 1203 self.assertIsNotNone(context) 1204 1205 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1206 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1207 # all flipping perfectly. 1208 P_response = context.get("P_response") 1209 self.assertIsNotNone(P_response) 1210 if P_response == "OK": 1211 successful_writes += 1 1212 else: 1213 failed_writes += 1 1214 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1215 1216 # Read back the register value, ensure it matches the flipped 1217 # value. 1218 if P_response == "OK": 1219 self.reset_test_sequence() 1220 self.test_sequence.add_log_lines([ 1221 p_request, 1222 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1223 ], True) 1224 context = self.expect_gdbremote_sequence() 1225 self.assertIsNotNone(context) 1226 1227 verify_p_response_raw = context.get("p_response") 1228 self.assertIsNotNone(verify_p_response_raw) 1229 verify_bits = unpack_register_hex_unsigned( 1230 endian, verify_p_response_raw) 1231 1232 if verify_bits != flipped_bits_int: 1233 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1234 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1235 successful_writes -= 1 1236 failed_writes += 1 1237 1238 return (successful_writes, failed_writes) 1239 1240 def is_bit_flippable_register(self, reg_info): 1241 if not reg_info: 1242 return False 1243 if not "set" in reg_info: 1244 return False 1245 if reg_info["set"] != "General Purpose Registers": 1246 return False 1247 if ("container-regs" in reg_info) and ( 1248 len(reg_info["container-regs"]) > 0): 1249 # Don't try to bit flip registers contained in another register. 1250 return False 1251 if re.match("^.s$", reg_info["name"]): 1252 # This is a 2-letter register name that ends in "s", like a segment register. 1253 # Don't try to bit flip these. 1254 return False 1255 if re.match("^(c|)psr$", reg_info["name"]): 1256 # This is an ARM program status register; don't flip it. 1257 return False 1258 # Okay, this looks fine-enough. 1259 return True 1260 1261 def read_register_values(self, reg_infos, endian, thread_id=None): 1262 self.assertIsNotNone(reg_infos) 1263 values = {} 1264 1265 for reg_info in reg_infos: 1266 # We append a register index when load reg infos so we can work 1267 # with subsets. 1268 reg_index = reg_info.get("lldb_register_index") 1269 self.assertIsNotNone(reg_index) 1270 1271 # Handle thread suffix. 1272 if thread_id: 1273 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1274 reg_index, thread_id) 1275 else: 1276 p_request = "read packet: $p{:x}#00".format(reg_index) 1277 1278 # Read it with p. 1279 self.reset_test_sequence() 1280 self.test_sequence.add_log_lines([ 1281 p_request, 1282 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1283 ], True) 1284 context = self.expect_gdbremote_sequence() 1285 self.assertIsNotNone(context) 1286 1287 # Convert value from target endian to integral. 1288 p_response = context.get("p_response") 1289 self.assertIsNotNone(p_response) 1290 self.assertTrue(len(p_response) > 0) 1291 self.assertFalse(p_response[0] == "E") 1292 1293 values[reg_index] = unpack_register_hex_unsigned( 1294 endian, p_response) 1295 1296 return values 1297 1298 def add_vCont_query_packets(self): 1299 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1300 {"direction": "send", 1301 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1302 "capture": {2: "vCont_query_response"}}, 1303 ], 1304 True) 1305 1306 def parse_vCont_query_response(self, context): 1307 self.assertIsNotNone(context) 1308 vCont_query_response = context.get("vCont_query_response") 1309 1310 # Handle case of no vCont support at all - in which case the capture 1311 # group will be none or zero length. 1312 if not vCont_query_response or len(vCont_query_response) == 0: 1313 return {} 1314 1315 return {key: 1 for key in vCont_query_response.split( 1316 ";") if key and len(key) > 0} 1317 1318 def count_single_steps_until_true( 1319 self, 1320 thread_id, 1321 predicate, 1322 args, 1323 max_step_count=100, 1324 use_Hc_packet=True, 1325 step_instruction="s"): 1326 """Used by single step test that appears in a few different contexts.""" 1327 single_step_count = 0 1328 1329 while single_step_count < max_step_count: 1330 self.assertIsNotNone(thread_id) 1331 1332 # Build the packet for the single step instruction. We replace 1333 # {thread}, if present, with the thread_id. 1334 step_packet = "read packet: ${}#00".format( 1335 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1336 # print("\nstep_packet created: {}\n".format(step_packet)) 1337 1338 # Single step. 1339 self.reset_test_sequence() 1340 if use_Hc_packet: 1341 self.test_sequence.add_log_lines( 1342 [ # Set the continue thread. 1343 "read packet: $Hc{0:x}#00".format(thread_id), 1344 "send packet: $OK#00", 1345 ], True) 1346 self.test_sequence.add_log_lines([ 1347 # Single step. 1348 step_packet, 1349 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1350 # Expect a breakpoint stop report. 1351 {"direction": "send", 1352 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1353 "capture": {1: "stop_signo", 1354 2: "stop_thread_id"}}, 1355 ], True) 1356 context = self.expect_gdbremote_sequence() 1357 self.assertIsNotNone(context) 1358 self.assertIsNotNone(context.get("stop_signo")) 1359 self.assertEqual(int(context.get("stop_signo"), 16), 1360 lldbutil.get_signal_number('SIGTRAP')) 1361 1362 single_step_count += 1 1363 1364 # See if the predicate is true. If so, we're done. 1365 if predicate(args): 1366 return (True, single_step_count) 1367 1368 # The predicate didn't return true within the runaway step count. 1369 return (False, single_step_count) 1370 1371 def g_c1_c2_contents_are(self, args): 1372 """Used by single step test that appears in a few different contexts.""" 1373 g_c1_address = args["g_c1_address"] 1374 g_c2_address = args["g_c2_address"] 1375 expected_g_c1 = args["expected_g_c1"] 1376 expected_g_c2 = args["expected_g_c2"] 1377 1378 # Read g_c1 and g_c2 contents. 1379 self.reset_test_sequence() 1380 self.test_sequence.add_log_lines( 1381 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1382 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1383 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1384 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1385 True) 1386 1387 # Run the packet stream. 1388 context = self.expect_gdbremote_sequence() 1389 self.assertIsNotNone(context) 1390 1391 # Check if what we read from inferior memory is what we are expecting. 1392 self.assertIsNotNone(context.get("g_c1_contents")) 1393 self.assertIsNotNone(context.get("g_c2_contents")) 1394 1395 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1396 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1397 1398 def single_step_only_steps_one_instruction( 1399 self, use_Hc_packet=True, step_instruction="s"): 1400 """Used by single step test that appears in a few different contexts.""" 1401 # Start up the inferior. 1402 procs = self.prep_debug_monitor_and_inferior( 1403 inferior_args=[ 1404 "get-code-address-hex:swap_chars", 1405 "get-data-address-hex:g_c1", 1406 "get-data-address-hex:g_c2", 1407 "sleep:1", 1408 "call-function:swap_chars", 1409 "sleep:5"]) 1410 1411 # Run the process 1412 self.test_sequence.add_log_lines( 1413 [ # Start running after initial stop. 1414 "read packet: $c#63", 1415 # Match output line that prints the memory address of the function call entry point. 1416 # Note we require launch-only testing so we can get inferior otuput. 1417 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1418 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1419 # Now stop the inferior. 1420 "read packet: {}".format(chr(3)), 1421 # And wait for the stop notification. 1422 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1423 True) 1424 1425 # Run the packet stream. 1426 context = self.expect_gdbremote_sequence() 1427 self.assertIsNotNone(context) 1428 1429 # Grab the main thread id. 1430 self.assertIsNotNone(context.get("stop_thread_id")) 1431 main_thread_id = int(context.get("stop_thread_id"), 16) 1432 1433 # Grab the function address. 1434 self.assertIsNotNone(context.get("function_address")) 1435 function_address = int(context.get("function_address"), 16) 1436 1437 # Grab the data addresses. 1438 self.assertIsNotNone(context.get("g_c1_address")) 1439 g_c1_address = int(context.get("g_c1_address"), 16) 1440 1441 self.assertIsNotNone(context.get("g_c2_address")) 1442 g_c2_address = int(context.get("g_c2_address"), 16) 1443 1444 # Set a breakpoint at the given address. 1445 if self.getArchitecture().startswith("arm"): 1446 # TODO: Handle case when setting breakpoint in thumb code 1447 BREAKPOINT_KIND = 4 1448 else: 1449 BREAKPOINT_KIND = 1 1450 self.reset_test_sequence() 1451 self.add_set_breakpoint_packets( 1452 function_address, 1453 do_continue=True, 1454 breakpoint_kind=BREAKPOINT_KIND) 1455 context = self.expect_gdbremote_sequence() 1456 self.assertIsNotNone(context) 1457 1458 # Remove the breakpoint. 1459 self.reset_test_sequence() 1460 self.add_remove_breakpoint_packets( 1461 function_address, breakpoint_kind=BREAKPOINT_KIND) 1462 context = self.expect_gdbremote_sequence() 1463 self.assertIsNotNone(context) 1464 1465 # Verify g_c1 and g_c2 match expected initial state. 1466 args = {} 1467 args["g_c1_address"] = g_c1_address 1468 args["g_c2_address"] = g_c2_address 1469 args["expected_g_c1"] = "0" 1470 args["expected_g_c2"] = "1" 1471 1472 self.assertTrue(self.g_c1_c2_contents_are(args)) 1473 1474 # Verify we take only a small number of steps to hit the first state. 1475 # Might need to work through function entry prologue code. 1476 args["expected_g_c1"] = "1" 1477 args["expected_g_c2"] = "1" 1478 (state_reached, 1479 step_count) = self.count_single_steps_until_true(main_thread_id, 1480 self.g_c1_c2_contents_are, 1481 args, 1482 max_step_count=25, 1483 use_Hc_packet=use_Hc_packet, 1484 step_instruction=step_instruction) 1485 self.assertTrue(state_reached) 1486 1487 # Verify we hit the next state. 1488 args["expected_g_c1"] = "1" 1489 args["expected_g_c2"] = "0" 1490 (state_reached, 1491 step_count) = self.count_single_steps_until_true(main_thread_id, 1492 self.g_c1_c2_contents_are, 1493 args, 1494 max_step_count=5, 1495 use_Hc_packet=use_Hc_packet, 1496 step_instruction=step_instruction) 1497 self.assertTrue(state_reached) 1498 expected_step_count = 1 1499 arch = self.getArchitecture() 1500 1501 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1502 # of variable value 1503 if re.match("mips", arch): 1504 expected_step_count = 3 1505 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1506 # variable value 1507 if re.match("s390x", arch): 1508 expected_step_count = 2 1509 # ARM64 requires "4" instructions: 2 to compute the address (adrp, 1510 # add), one to materialize the constant (mov) and the store. Once 1511 # addresses and constants are materialized, only one instruction is 1512 # needed. 1513 if re.match("arm64", arch): 1514 before_materialization_step_count = 4 1515 after_matrialization_step_count = 1 1516 self.assertIn(step_count, [before_materialization_step_count, 1517 after_matrialization_step_count]) 1518 expected_step_count = after_matrialization_step_count 1519 else: 1520 self.assertEqual(step_count, expected_step_count) 1521 1522 # Verify we hit the next state. 1523 args["expected_g_c1"] = "0" 1524 args["expected_g_c2"] = "0" 1525 (state_reached, 1526 step_count) = self.count_single_steps_until_true(main_thread_id, 1527 self.g_c1_c2_contents_are, 1528 args, 1529 max_step_count=5, 1530 use_Hc_packet=use_Hc_packet, 1531 step_instruction=step_instruction) 1532 self.assertTrue(state_reached) 1533 self.assertEqual(step_count, expected_step_count) 1534 1535 # Verify we hit the next state. 1536 args["expected_g_c1"] = "0" 1537 args["expected_g_c2"] = "1" 1538 (state_reached, 1539 step_count) = self.count_single_steps_until_true(main_thread_id, 1540 self.g_c1_c2_contents_are, 1541 args, 1542 max_step_count=5, 1543 use_Hc_packet=use_Hc_packet, 1544 step_instruction=step_instruction) 1545 self.assertTrue(state_reached) 1546 self.assertEqual(step_count, expected_step_count) 1547 1548 def maybe_strict_output_regex(self, regex): 1549 return '.*' + regex + \ 1550 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1551 1552 def install_and_create_launch_args(self): 1553 exe_path = self.getBuildArtifact("a.out") 1554 if not lldb.remote_platform: 1555 return [exe_path] 1556 remote_path = lldbutil.append_to_process_working_directory(self, 1557 os.path.basename(exe_path)) 1558 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1559 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1560 remote_file_spec) 1561 if err.Fail(): 1562 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1563 (exe_path, remote_path, err)) 1564 return [remote_path] 1565