1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def build(self, *args, **kwargs): 166 self.buildDefault(*args, **kwargs) 167 168 def getLocalServerLogFile(self): 169 return self.getLogBasenameForCurrentTest() + "-server.log" 170 171 def setUpServerLogging(self, is_llgs): 172 if len(lldbtest_config.channels) == 0: 173 return # No logging requested 174 175 if lldb.remote_platform: 176 log_file = lldbutil.join_remote_paths( 177 lldb.remote_platform.GetWorkingDirectory(), "server.log") 178 else: 179 log_file = self.getLocalServerLogFile() 180 181 if is_llgs: 182 self.debug_monitor_extra_args.append("--log-file=" + log_file) 183 self.debug_monitor_extra_args.append( 184 "--log-channels={}".format(":".join(lldbtest_config.channels))) 185 else: 186 self.debug_monitor_extra_args = [ 187 "--log-file=" + log_file, "--log-flags=0x800000"] 188 189 def get_next_port(self): 190 return 12000 + random.randint(0, 3999) 191 192 def reset_test_sequence(self): 193 self.test_sequence = GdbRemoteTestSequence(self.logger) 194 195 196 def _init_llgs_test(self): 197 reverse_connect = True 198 if lldb.remote_platform: 199 # Reverse connections may be tricky due to firewalls/NATs. 200 reverse_connect = False 201 202 # FIXME: This is extremely linux-oriented 203 204 # Grab the ppid from /proc/[shell pid]/stat 205 err, retcode, shell_stat = self.run_platform_command( 206 "cat /proc/$$/stat") 207 self.assertTrue( 208 err.Success() and retcode == 0, 209 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 210 (err.GetCString(), 211 retcode)) 212 213 # [pid] ([executable]) [state] [*ppid*] 214 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 215 err, retcode, ls_output = self.run_platform_command( 216 "ls -l /proc/%s/exe" % pid) 217 self.assertTrue( 218 err.Success() and retcode == 0, 219 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 220 (pid, 221 err.GetCString(), 222 retcode)) 223 exe = ls_output.split()[-1] 224 225 # If the binary has been deleted, the link name has " (deleted)" appended. 226 # Remove if it's there. 227 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 228 else: 229 self.debug_monitor_exe = get_lldb_server_exe() 230 231 self.debug_monitor_extra_args = ["gdbserver"] 232 self.setUpServerLogging(is_llgs=True) 233 234 self.reverse_connect = reverse_connect 235 236 def _init_debugserver_test(self): 237 self.debug_monitor_exe = get_debugserver_exe() 238 self.setUpServerLogging(is_llgs=False) 239 self.reverse_connect = True 240 241 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 242 # when the process truly dies. 243 self.stub_sends_two_stop_notifications_on_kill = True 244 245 def forward_adb_port(self, source, target, direction, device): 246 adb = ['adb'] + (['-s', device] if device else []) + [direction] 247 248 def remove_port_forward(): 249 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 250 251 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 252 self.addTearDownHook(remove_port_forward) 253 254 def _verify_socket(self, sock): 255 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 256 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 257 # the connect() will always be successful, but the connection will be immediately dropped 258 # if ADB could not connect on the remote side. This function tries to detect this 259 # situation, and report it as "connection refused" so that the upper layers attempt the 260 # connection again. 261 triple = self.dbg.GetSelectedPlatform().GetTriple() 262 if not re.match(".*-.*-.*-android", triple): 263 return # Not android. 264 can_read, _, _ = select.select([sock], [], [], 0.1) 265 if sock not in can_read: 266 return # Data is not available, but the connection is alive. 267 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 268 raise _ConnectionRefused() # Got EOF, connection dropped. 269 270 def create_socket(self): 271 try: 272 sock = socket.socket(family=socket.AF_INET) 273 except OSError as e: 274 if e.errno != errno.EAFNOSUPPORT: 275 raise 276 sock = socket.socket(family=socket.AF_INET6) 277 278 logger = self.logger 279 280 triple = self.dbg.GetSelectedPlatform().GetTriple() 281 if re.match(".*-.*-.*-android", triple): 282 self.forward_adb_port( 283 self.port, 284 self.port, 285 "forward", 286 self.stub_device) 287 288 logger.info( 289 "Connecting to debug monitor on %s:%d", 290 self.stub_hostname, 291 self.port) 292 connect_info = (self.stub_hostname, self.port) 293 try: 294 sock.connect(connect_info) 295 except socket.error as serr: 296 if serr.errno == errno.ECONNREFUSED: 297 raise _ConnectionRefused() 298 raise serr 299 300 def shutdown_socket(): 301 if sock: 302 try: 303 # send the kill packet so lldb-server shuts down gracefully 304 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 305 except: 306 logger.warning( 307 "failed to send kill packet to debug monitor: {}; ignoring".format( 308 sys.exc_info()[0])) 309 310 try: 311 sock.close() 312 except: 313 logger.warning( 314 "failed to close socket to debug monitor: {}; ignoring".format( 315 sys.exc_info()[0])) 316 317 self.addTearDownHook(shutdown_socket) 318 319 self._verify_socket(sock) 320 321 return sock 322 323 def set_inferior_startup_launch(self): 324 self._inferior_startup = self._STARTUP_LAUNCH 325 326 def set_inferior_startup_attach(self): 327 self._inferior_startup = self._STARTUP_ATTACH 328 329 def set_inferior_startup_attach_manually(self): 330 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 331 332 def get_debug_monitor_command_line_args(self, attach_pid=None): 333 commandline_args = self.debug_monitor_extra_args 334 if attach_pid: 335 commandline_args += ["--attach=%d" % attach_pid] 336 if self.reverse_connect: 337 commandline_args += ["--reverse-connect", self.connect_address] 338 else: 339 if lldb.remote_platform: 340 commandline_args += ["*:{}".format(self.port)] 341 else: 342 commandline_args += ["localhost:{}".format(self.port)] 343 344 return commandline_args 345 346 def get_target_byte_order(self): 347 inferior_exe_path = self.getBuildArtifact("a.out") 348 target = self.dbg.CreateTarget(inferior_exe_path) 349 return target.GetByteOrder() 350 351 def launch_debug_monitor(self, attach_pid=None, logfile=None): 352 if self.reverse_connect: 353 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 354 sock = socket.socket(family, type, proto) 355 sock.settimeout(self.DEFAULT_TIMEOUT) 356 357 sock.bind(addr) 358 sock.listen(1) 359 addr = sock.getsockname() 360 self.connect_address = "[{}]:{}".format(*addr) 361 362 363 # Create the command line. 364 commandline_args = self.get_debug_monitor_command_line_args( 365 attach_pid=attach_pid) 366 367 # Start the server. 368 server = self.spawnSubprocess( 369 self.debug_monitor_exe, 370 commandline_args, 371 install_remote=False) 372 self.assertIsNotNone(server) 373 374 if self.reverse_connect: 375 self.sock = sock.accept()[0] 376 self.sock.settimeout(self.DEFAULT_TIMEOUT) 377 378 return server 379 380 def connect_to_debug_monitor(self, attach_pid=None): 381 if self.reverse_connect: 382 # Create the stub. 383 server = self.launch_debug_monitor(attach_pid=attach_pid) 384 self.assertIsNotNone(server) 385 386 # Schedule debug monitor to be shut down during teardown. 387 logger = self.logger 388 389 self._server = Server(self.sock, server) 390 return server 391 392 # We're using a random port algorithm to try not to collide with other ports, 393 # and retry a max # times. 394 attempts = 0 395 MAX_ATTEMPTS = 20 396 397 while attempts < MAX_ATTEMPTS: 398 server = self.launch_debug_monitor(attach_pid=attach_pid) 399 400 # Schedule debug monitor to be shut down during teardown. 401 logger = self.logger 402 403 connect_attemps = 0 404 MAX_CONNECT_ATTEMPTS = 10 405 406 while connect_attemps < MAX_CONNECT_ATTEMPTS: 407 # Create a socket to talk to the server 408 try: 409 logger.info("Connect attempt %d", connect_attemps + 1) 410 self.sock = self.create_socket() 411 self._server = Server(self.sock, server) 412 return server 413 except _ConnectionRefused as serr: 414 # Ignore, and try again. 415 pass 416 time.sleep(0.5) 417 connect_attemps += 1 418 419 # We should close the server here to be safe. 420 server.terminate() 421 422 # Increment attempts. 423 print( 424 "connect to debug monitor on port %d failed, attempt #%d of %d" % 425 (self.port, attempts + 1, MAX_ATTEMPTS)) 426 attempts += 1 427 428 # And wait a random length of time before next attempt, to avoid 429 # collisions. 430 time.sleep(random.randint(1, 5)) 431 432 # Now grab a new port number. 433 self.port = self.get_next_port() 434 435 raise Exception( 436 "failed to create a socket to the launched debug monitor after %d tries" % 437 attempts) 438 439 def launch_process_for_attach( 440 self, 441 inferior_args=None, 442 sleep_seconds=3, 443 exe_path=None): 444 # We're going to start a child process that the debug monitor stub can later attach to. 445 # This process needs to be started so that it just hangs around for a while. We'll 446 # have it sleep. 447 if not exe_path: 448 exe_path = self.getBuildArtifact("a.out") 449 450 args = [] 451 if inferior_args: 452 args.extend(inferior_args) 453 if sleep_seconds: 454 args.append("sleep:%d" % sleep_seconds) 455 456 return self.spawnSubprocess(exe_path, args) 457 458 def prep_debug_monitor_and_inferior( 459 self, 460 inferior_args=None, 461 inferior_sleep_seconds=3, 462 inferior_exe_path=None, 463 inferior_env=None): 464 """Prep the debug monitor, the inferior, and the expected packet stream. 465 466 Handle the separate cases of using the debug monitor in attach-to-inferior mode 467 and in launch-inferior mode. 468 469 For attach-to-inferior mode, the inferior process is first started, then 470 the debug monitor is started in attach to pid mode (using --attach on the 471 stub command line), and the no-ack-mode setup is appended to the packet 472 stream. The packet stream is not yet executed, ready to have more expected 473 packet entries added to it. 474 475 For launch-inferior mode, the stub is first started, then no ack mode is 476 setup on the expected packet stream, then the verified launch packets are added 477 to the expected socket stream. The packet stream is not yet executed, ready 478 to have more expected packet entries added to it. 479 480 The return value is: 481 {inferior:<inferior>, server:<server>} 482 """ 483 inferior = None 484 attach_pid = None 485 486 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 487 # Launch the process that we'll use as the inferior. 488 inferior = self.launch_process_for_attach( 489 inferior_args=inferior_args, 490 sleep_seconds=inferior_sleep_seconds, 491 exe_path=inferior_exe_path) 492 self.assertIsNotNone(inferior) 493 self.assertTrue(inferior.pid > 0) 494 if self._inferior_startup == self._STARTUP_ATTACH: 495 # In this case, we want the stub to attach via the command 496 # line, so set the command line attach pid here. 497 attach_pid = inferior.pid 498 499 if self._inferior_startup == self._STARTUP_LAUNCH: 500 # Build launch args 501 if not inferior_exe_path: 502 inferior_exe_path = self.getBuildArtifact("a.out") 503 504 if lldb.remote_platform: 505 remote_path = lldbutil.append_to_process_working_directory(self, 506 os.path.basename(inferior_exe_path)) 507 remote_file_spec = lldb.SBFileSpec(remote_path, False) 508 err = lldb.remote_platform.Install(lldb.SBFileSpec( 509 inferior_exe_path, True), remote_file_spec) 510 if err.Fail(): 511 raise Exception( 512 "remote_platform.Install('%s', '%s') failed: %s" % 513 (inferior_exe_path, remote_path, err)) 514 inferior_exe_path = remote_path 515 516 launch_args = [inferior_exe_path] 517 if inferior_args: 518 launch_args.extend(inferior_args) 519 520 # Launch the debug monitor stub, attaching to the inferior. 521 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 522 self.assertIsNotNone(server) 523 524 self.do_handshake() 525 526 # Build the expected protocol stream 527 if inferior_env: 528 for name, value in inferior_env.items(): 529 self.add_set_environment_packets(name, value) 530 if self._inferior_startup == self._STARTUP_LAUNCH: 531 self.add_verified_launch_packets(launch_args) 532 533 return {"inferior": inferior, "server": server} 534 535 def do_handshake(self): 536 server = self._server 537 server.send_ack() 538 server.send_packet(b"QStartNoAckMode") 539 self.assertEqual(server.get_normal_packet(), b"+") 540 self.assertEqual(server.get_normal_packet(), b"OK") 541 server.send_ack() 542 543 def add_verified_launch_packets(self, launch_args): 544 self.test_sequence.add_log_lines( 545 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 546 "send packet: $OK#00", 547 "read packet: $qLaunchSuccess#a5", 548 "send packet: $OK#00"], 549 True) 550 551 def add_thread_suffix_request_packets(self): 552 self.test_sequence.add_log_lines( 553 ["read packet: $QThreadSuffixSupported#e4", 554 "send packet: $OK#00", 555 ], True) 556 557 def add_process_info_collection_packets(self): 558 self.test_sequence.add_log_lines( 559 ["read packet: $qProcessInfo#dc", 560 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 561 True) 562 563 def add_set_environment_packets(self, name, value): 564 self.test_sequence.add_log_lines( 565 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 566 "send packet: $OK#00", 567 ], True) 568 569 _KNOWN_PROCESS_INFO_KEYS = [ 570 "pid", 571 "parent-pid", 572 "real-uid", 573 "real-gid", 574 "effective-uid", 575 "effective-gid", 576 "cputype", 577 "cpusubtype", 578 "ostype", 579 "triple", 580 "vendor", 581 "endian", 582 "elf_abi", 583 "ptrsize" 584 ] 585 586 def parse_process_info_response(self, context): 587 # Ensure we have a process info response. 588 self.assertIsNotNone(context) 589 process_info_raw = context.get("process_info_raw") 590 self.assertIsNotNone(process_info_raw) 591 592 # Pull out key:value; pairs. 593 process_info_dict = { 594 match.group(1): match.group(2) for match in re.finditer( 595 r"([^:]+):([^;]+);", process_info_raw)} 596 597 # Validate keys are known. 598 for (key, val) in list(process_info_dict.items()): 599 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 600 self.assertIsNotNone(val) 601 602 return process_info_dict 603 604 def add_register_info_collection_packets(self): 605 self.test_sequence.add_log_lines( 606 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 607 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 608 "save_key": "reg_info_responses"}], 609 True) 610 611 def parse_register_info_packets(self, context): 612 """Return an array of register info dictionaries, one per register info.""" 613 reg_info_responses = context.get("reg_info_responses") 614 self.assertIsNotNone(reg_info_responses) 615 616 # Parse register infos. 617 return [parse_reg_info_response(reg_info_response) 618 for reg_info_response in reg_info_responses] 619 620 def expect_gdbremote_sequence(self): 621 return expect_lldb_gdbserver_replay( 622 self, 623 self._server, 624 self.test_sequence, 625 self.DEFAULT_TIMEOUT * len(self.test_sequence), 626 self.logger) 627 628 _KNOWN_REGINFO_KEYS = [ 629 "name", 630 "alt-name", 631 "bitsize", 632 "offset", 633 "encoding", 634 "format", 635 "set", 636 "gcc", 637 "ehframe", 638 "dwarf", 639 "generic", 640 "container-regs", 641 "invalidate-regs", 642 "dynamic_size_dwarf_expr_bytes", 643 "dynamic_size_dwarf_len" 644 ] 645 646 def assert_valid_reg_info(self, reg_info): 647 # Assert we know about all the reginfo keys parsed. 648 for key in reg_info: 649 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 650 651 # Check the bare-minimum expected set of register info keys. 652 self.assertTrue("name" in reg_info) 653 self.assertTrue("bitsize" in reg_info) 654 655 if not self.getArchitecture() == 'aarch64': 656 self.assertTrue("offset" in reg_info) 657 658 self.assertTrue("encoding" in reg_info) 659 self.assertTrue("format" in reg_info) 660 661 def find_pc_reg_info(self, reg_infos): 662 lldb_reg_index = 0 663 for reg_info in reg_infos: 664 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 665 return (lldb_reg_index, reg_info) 666 lldb_reg_index += 1 667 668 return (None, None) 669 670 def add_lldb_register_index(self, reg_infos): 671 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 672 673 We'll use this when we want to call packets like P/p with a register index but do so 674 on only a subset of the full register info set. 675 """ 676 self.assertIsNotNone(reg_infos) 677 678 reg_index = 0 679 for reg_info in reg_infos: 680 reg_info["lldb_register_index"] = reg_index 681 reg_index += 1 682 683 def add_query_memory_region_packets(self, address): 684 self.test_sequence.add_log_lines( 685 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 686 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 687 True) 688 689 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 690 self.assertIsNotNone(key_val_text) 691 kv_dict = {} 692 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 693 key = match.group(1) 694 val = match.group(2) 695 if key in kv_dict: 696 if allow_dupes: 697 if isinstance(kv_dict[key], list): 698 kv_dict[key].append(val) 699 else: 700 # Promote to list 701 kv_dict[key] = [kv_dict[key], val] 702 else: 703 self.fail( 704 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 705 key, val, key_val_text, kv_dict)) 706 else: 707 kv_dict[key] = val 708 return kv_dict 709 710 def parse_memory_region_packet(self, context): 711 # Ensure we have a context. 712 self.assertIsNotNone(context.get("memory_region_response")) 713 714 # Pull out key:value; pairs. 715 mem_region_dict = self.parse_key_val_dict( 716 context.get("memory_region_response")) 717 718 # Validate keys are known. 719 for (key, val) in list(mem_region_dict.items()): 720 self.assertIn(key, 721 ["start", 722 "size", 723 "permissions", 724 "flags", 725 "name", 726 "error", 727 "dirty-pages", 728 "type"]) 729 self.assertIsNotNone(val) 730 731 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 732 # Return the dictionary of key-value pairs for the memory region. 733 return mem_region_dict 734 735 def assert_address_within_memory_region( 736 self, test_address, mem_region_dict): 737 self.assertIsNotNone(mem_region_dict) 738 self.assertTrue("start" in mem_region_dict) 739 self.assertTrue("size" in mem_region_dict) 740 741 range_start = int(mem_region_dict["start"], 16) 742 range_size = int(mem_region_dict["size"], 16) 743 range_end = range_start + range_size 744 745 if test_address < range_start: 746 self.fail( 747 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 748 test_address, 749 range_start, 750 range_end, 751 range_size)) 752 elif test_address >= range_end: 753 self.fail( 754 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 755 test_address, 756 range_start, 757 range_end, 758 range_size)) 759 760 def add_threadinfo_collection_packets(self): 761 self.test_sequence.add_log_lines( 762 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 763 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 764 "save_key": "threadinfo_responses"}], 765 True) 766 767 def parse_threadinfo_packets(self, context): 768 """Return an array of thread ids (decimal ints), one per thread.""" 769 threadinfo_responses = context.get("threadinfo_responses") 770 self.assertIsNotNone(threadinfo_responses) 771 772 thread_ids = [] 773 for threadinfo_response in threadinfo_responses: 774 new_thread_infos = parse_threadinfo_response(threadinfo_response) 775 thread_ids.extend(new_thread_infos) 776 return thread_ids 777 778 def wait_for_thread_count(self, thread_count): 779 start_time = time.time() 780 timeout_time = start_time + self.DEFAULT_TIMEOUT 781 782 actual_thread_count = 0 783 while actual_thread_count < thread_count: 784 self.reset_test_sequence() 785 self.add_threadinfo_collection_packets() 786 787 context = self.expect_gdbremote_sequence() 788 self.assertIsNotNone(context) 789 790 threads = self.parse_threadinfo_packets(context) 791 self.assertIsNotNone(threads) 792 793 actual_thread_count = len(threads) 794 795 if time.time() > timeout_time: 796 raise Exception( 797 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 798 self.DEFAULT_TIMEOUT, thread_count, actual_thread_count)) 799 800 return threads 801 802 def add_set_breakpoint_packets( 803 self, 804 address, 805 z_packet_type=0, 806 do_continue=True, 807 breakpoint_kind=1): 808 self.test_sequence.add_log_lines( 809 [ # Set the breakpoint. 810 "read packet: $Z{2},{0:x},{1}#00".format( 811 address, breakpoint_kind, z_packet_type), 812 # Verify the stub could set it. 813 "send packet: $OK#00", 814 ], True) 815 816 if (do_continue): 817 self.test_sequence.add_log_lines( 818 [ # Continue the inferior. 819 "read packet: $c#63", 820 # Expect a breakpoint stop report. 821 {"direction": "send", 822 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 823 "capture": {1: "stop_signo", 824 2: "stop_thread_id"}}, 825 ], True) 826 827 def add_remove_breakpoint_packets( 828 self, 829 address, 830 z_packet_type=0, 831 breakpoint_kind=1): 832 self.test_sequence.add_log_lines( 833 [ # Remove the breakpoint. 834 "read packet: $z{2},{0:x},{1}#00".format( 835 address, breakpoint_kind, z_packet_type), 836 # Verify the stub could unset it. 837 "send packet: $OK#00", 838 ], True) 839 840 def add_qSupported_packets(self, client_features=[]): 841 features = ''.join(';' + x for x in client_features) 842 self.test_sequence.add_log_lines( 843 ["read packet: $qSupported{}#00".format(features), 844 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 845 ], True) 846 847 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 848 "augmented-libraries-svr4-read", 849 "PacketSize", 850 "QStartNoAckMode", 851 "QThreadSuffixSupported", 852 "QListThreadsInStopReply", 853 "qXfer:auxv:read", 854 "qXfer:libraries:read", 855 "qXfer:libraries-svr4:read", 856 "qXfer:features:read", 857 "qEcho", 858 "QPassSignals", 859 "multiprocess", 860 "fork-events", 861 "vfork-events", 862 "memory-tagging", 863 "qSaveCore", 864 ] 865 866 def parse_qSupported_response(self, context): 867 self.assertIsNotNone(context) 868 869 raw_response = context.get("qSupported_response") 870 self.assertIsNotNone(raw_response) 871 872 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 873 # +,-,? is stripped from the key and set as the value. 874 supported_dict = {} 875 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 876 key = match.group(1) 877 val = match.group(3) 878 879 # key=val: store as is 880 if val and len(val) > 0: 881 supported_dict[key] = val 882 else: 883 if len(key) < 2: 884 raise Exception( 885 "singular stub feature is too short: must be stub_feature{+,-,?}") 886 supported_type = key[-1] 887 key = key[:-1] 888 if not supported_type in ["+", "-", "?"]: 889 raise Exception( 890 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 891 supported_dict[key] = supported_type 892 # Ensure we know the supported element 893 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 894 raise Exception( 895 "unknown qSupported stub feature reported: %s" % 896 key) 897 898 return supported_dict 899 900 def run_process_then_stop(self, run_seconds=1): 901 # Tell the stub to continue. 902 self.test_sequence.add_log_lines( 903 ["read packet: $vCont;c#a8"], 904 True) 905 context = self.expect_gdbremote_sequence() 906 907 # Wait for run_seconds. 908 time.sleep(run_seconds) 909 910 # Send an interrupt, capture a T response. 911 self.reset_test_sequence() 912 self.test_sequence.add_log_lines( 913 ["read packet: {}".format(chr(3)), 914 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 915 True) 916 context = self.expect_gdbremote_sequence() 917 self.assertIsNotNone(context) 918 self.assertIsNotNone(context.get("stop_result")) 919 920 return context 921 922 def continue_process_and_wait_for_stop(self): 923 self.test_sequence.add_log_lines( 924 [ 925 "read packet: $vCont;c#a8", 926 { 927 "direction": "send", 928 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 929 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 930 }, 931 ], 932 True, 933 ) 934 context = self.expect_gdbremote_sequence() 935 self.assertIsNotNone(context) 936 return self.parse_interrupt_packets(context) 937 938 def select_modifiable_register(self, reg_infos): 939 """Find a register that can be read/written freely.""" 940 PREFERRED_REGISTER_NAMES = set(["rax", ]) 941 942 # First check for the first register from the preferred register name 943 # set. 944 alternative_register_index = None 945 946 self.assertIsNotNone(reg_infos) 947 for reg_info in reg_infos: 948 if ("name" in reg_info) and ( 949 reg_info["name"] in PREFERRED_REGISTER_NAMES): 950 # We found a preferred register. Use it. 951 return reg_info["lldb_register_index"] 952 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 953 reg_info["generic"] == "arg1"): 954 # A frame pointer or first arg register will do as a 955 # register to modify temporarily. 956 alternative_register_index = reg_info["lldb_register_index"] 957 958 # We didn't find a preferred register. Return whatever alternative register 959 # we found, if any. 960 return alternative_register_index 961 962 def extract_registers_from_stop_notification(self, stop_key_vals_text): 963 self.assertIsNotNone(stop_key_vals_text) 964 kv_dict = self.parse_key_val_dict(stop_key_vals_text) 965 966 registers = {} 967 for (key, val) in list(kv_dict.items()): 968 if re.match(r"^[0-9a-fA-F]+$", key): 969 registers[int(key, 16)] = val 970 return registers 971 972 def gather_register_infos(self): 973 self.reset_test_sequence() 974 self.add_register_info_collection_packets() 975 976 context = self.expect_gdbremote_sequence() 977 self.assertIsNotNone(context) 978 979 reg_infos = self.parse_register_info_packets(context) 980 self.assertIsNotNone(reg_infos) 981 self.add_lldb_register_index(reg_infos) 982 983 return reg_infos 984 985 def find_generic_register_with_name(self, reg_infos, generic_name): 986 self.assertIsNotNone(reg_infos) 987 for reg_info in reg_infos: 988 if ("generic" in reg_info) and ( 989 reg_info["generic"] == generic_name): 990 return reg_info 991 return None 992 993 def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num): 994 self.assertIsNotNone(reg_infos) 995 for reg_info in reg_infos: 996 if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num): 997 return reg_info 998 return None 999 1000 def decode_gdbremote_binary(self, encoded_bytes): 1001 decoded_bytes = "" 1002 i = 0 1003 while i < len(encoded_bytes): 1004 if encoded_bytes[i] == "}": 1005 # Handle escaped char. 1006 self.assertTrue(i + 1 < len(encoded_bytes)) 1007 decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20) 1008 i += 2 1009 elif encoded_bytes[i] == "*": 1010 # Handle run length encoding. 1011 self.assertTrue(len(decoded_bytes) > 0) 1012 self.assertTrue(i + 1 < len(encoded_bytes)) 1013 repeat_count = ord(encoded_bytes[i + 1]) - 29 1014 decoded_bytes += decoded_bytes[-1] * repeat_count 1015 i += 2 1016 else: 1017 decoded_bytes += encoded_bytes[i] 1018 i += 1 1019 return decoded_bytes 1020 1021 def build_auxv_dict(self, endian, word_size, auxv_data): 1022 self.assertIsNotNone(endian) 1023 self.assertIsNotNone(word_size) 1024 self.assertIsNotNone(auxv_data) 1025 1026 auxv_dict = {} 1027 1028 # PowerPC64le's auxvec has a special key that must be ignored. 1029 # This special key may be used multiple times, resulting in 1030 # multiple key/value pairs with the same key, which would otherwise 1031 # break this test check for repeated keys. 1032 # 1033 # AT_IGNOREPPC = 22 1034 ignored_keys_for_arch = { 'powerpc64le' : [22] } 1035 arch = self.getArchitecture() 1036 ignore_keys = None 1037 if arch in ignored_keys_for_arch: 1038 ignore_keys = ignored_keys_for_arch[arch] 1039 1040 while len(auxv_data) > 0: 1041 # Chop off key. 1042 raw_key = auxv_data[:word_size] 1043 auxv_data = auxv_data[word_size:] 1044 1045 # Chop of value. 1046 raw_value = auxv_data[:word_size] 1047 auxv_data = auxv_data[word_size:] 1048 1049 # Convert raw text from target endian. 1050 key = unpack_endian_binary_string(endian, raw_key) 1051 value = unpack_endian_binary_string(endian, raw_value) 1052 1053 if ignore_keys and key in ignore_keys: 1054 continue 1055 1056 # Handle ending entry. 1057 if key == 0: 1058 self.assertEqual(value, 0) 1059 return auxv_dict 1060 1061 # The key should not already be present. 1062 self.assertFalse(key in auxv_dict) 1063 auxv_dict[key] = value 1064 1065 self.fail( 1066 "should not reach here - implies required double zero entry not found") 1067 return auxv_dict 1068 1069 def read_binary_data_in_chunks(self, command_prefix, chunk_length): 1070 """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.""" 1071 offset = 0 1072 done = False 1073 decoded_data = "" 1074 1075 while not done: 1076 # Grab the next iteration of data. 1077 self.reset_test_sequence() 1078 self.test_sequence.add_log_lines( 1079 [ 1080 "read packet: ${}{:x},{:x}:#00".format( 1081 command_prefix, 1082 offset, 1083 chunk_length), 1084 { 1085 "direction": "send", 1086 "regex": re.compile( 1087 r"^\$([^E])(.*)#[0-9a-fA-F]{2}$", 1088 re.MULTILINE | re.DOTALL), 1089 "capture": { 1090 1: "response_type", 1091 2: "content_raw"}}], 1092 True) 1093 1094 context = self.expect_gdbremote_sequence() 1095 self.assertIsNotNone(context) 1096 1097 response_type = context.get("response_type") 1098 self.assertIsNotNone(response_type) 1099 self.assertTrue(response_type in ["l", "m"]) 1100 1101 # Move offset along. 1102 offset += chunk_length 1103 1104 # Figure out if we're done. We're done if the response type is l. 1105 done = response_type == "l" 1106 1107 # Decode binary data. 1108 content_raw = context.get("content_raw") 1109 if content_raw and len(content_raw) > 0: 1110 self.assertIsNotNone(content_raw) 1111 decoded_data += self.decode_gdbremote_binary(content_raw) 1112 return decoded_data 1113 1114 def add_interrupt_packets(self): 1115 self.test_sequence.add_log_lines([ 1116 # Send the intterupt. 1117 "read packet: {}".format(chr(3)), 1118 # And wait for the stop notification. 1119 {"direction": "send", 1120 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 1121 "capture": {1: "stop_signo", 1122 2: "stop_key_val_text"}}, 1123 ], True) 1124 1125 def parse_interrupt_packets(self, context): 1126 self.assertIsNotNone(context.get("stop_signo")) 1127 self.assertIsNotNone(context.get("stop_key_val_text")) 1128 return (int(context["stop_signo"], 16), self.parse_key_val_dict( 1129 context["stop_key_val_text"])) 1130 1131 def add_QSaveRegisterState_packets(self, thread_id): 1132 if thread_id: 1133 # Use the thread suffix form. 1134 request = "read packet: $QSaveRegisterState;thread:{:x}#00".format( 1135 thread_id) 1136 else: 1137 request = "read packet: $QSaveRegisterState#00" 1138 1139 self.test_sequence.add_log_lines([request, 1140 {"direction": "send", 1141 "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$", 1142 "capture": {1: "save_response"}}, 1143 ], 1144 True) 1145 1146 def parse_QSaveRegisterState_response(self, context): 1147 self.assertIsNotNone(context) 1148 1149 save_response = context.get("save_response") 1150 self.assertIsNotNone(save_response) 1151 1152 if len(save_response) < 1 or save_response[0] == "E": 1153 # error received 1154 return (False, None) 1155 else: 1156 return (True, int(save_response)) 1157 1158 def add_QRestoreRegisterState_packets(self, save_id, thread_id=None): 1159 if thread_id: 1160 # Use the thread suffix form. 1161 request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format( 1162 save_id, thread_id) 1163 else: 1164 request = "read packet: $QRestoreRegisterState:{}#00".format( 1165 save_id) 1166 1167 self.test_sequence.add_log_lines([ 1168 request, 1169 "send packet: $OK#00" 1170 ], True) 1171 1172 def flip_all_bits_in_each_register_value( 1173 self, reg_infos, endian, thread_id=None): 1174 self.assertIsNotNone(reg_infos) 1175 1176 successful_writes = 0 1177 failed_writes = 0 1178 1179 for reg_info in reg_infos: 1180 # Use the lldb register index added to the reg info. We're not necessarily 1181 # working off a full set of register infos, so an inferred register 1182 # index could be wrong. 1183 reg_index = reg_info["lldb_register_index"] 1184 self.assertIsNotNone(reg_index) 1185 1186 reg_byte_size = int(reg_info["bitsize"]) // 8 1187 self.assertTrue(reg_byte_size > 0) 1188 1189 # Handle thread suffix. 1190 if thread_id: 1191 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1192 reg_index, thread_id) 1193 else: 1194 p_request = "read packet: $p{:x}#00".format(reg_index) 1195 1196 # Read the existing value. 1197 self.reset_test_sequence() 1198 self.test_sequence.add_log_lines([ 1199 p_request, 1200 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1201 ], True) 1202 context = self.expect_gdbremote_sequence() 1203 self.assertIsNotNone(context) 1204 1205 # Verify the response length. 1206 p_response = context.get("p_response") 1207 self.assertIsNotNone(p_response) 1208 initial_reg_value = unpack_register_hex_unsigned( 1209 endian, p_response) 1210 1211 # Flip the value by xoring with all 1s 1212 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1213 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1214 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1215 1216 # Handle thread suffix for P. 1217 if thread_id: 1218 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1219 reg_index, pack_register_hex( 1220 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1221 else: 1222 P_request = "read packet: $P{:x}={}#00".format( 1223 reg_index, pack_register_hex( 1224 endian, flipped_bits_int, byte_size=reg_byte_size)) 1225 1226 # Write the flipped value to the register. 1227 self.reset_test_sequence() 1228 self.test_sequence.add_log_lines([P_request, 1229 {"direction": "send", 1230 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1231 "capture": {1: "P_response"}}, 1232 ], 1233 True) 1234 context = self.expect_gdbremote_sequence() 1235 self.assertIsNotNone(context) 1236 1237 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1238 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1239 # all flipping perfectly. 1240 P_response = context.get("P_response") 1241 self.assertIsNotNone(P_response) 1242 if P_response == "OK": 1243 successful_writes += 1 1244 else: 1245 failed_writes += 1 1246 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1247 1248 # Read back the register value, ensure it matches the flipped 1249 # value. 1250 if P_response == "OK": 1251 self.reset_test_sequence() 1252 self.test_sequence.add_log_lines([ 1253 p_request, 1254 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1255 ], True) 1256 context = self.expect_gdbremote_sequence() 1257 self.assertIsNotNone(context) 1258 1259 verify_p_response_raw = context.get("p_response") 1260 self.assertIsNotNone(verify_p_response_raw) 1261 verify_bits = unpack_register_hex_unsigned( 1262 endian, verify_p_response_raw) 1263 1264 if verify_bits != flipped_bits_int: 1265 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 1266 # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits)) 1267 successful_writes -= 1 1268 failed_writes += 1 1269 1270 return (successful_writes, failed_writes) 1271 1272 def is_bit_flippable_register(self, reg_info): 1273 if not reg_info: 1274 return False 1275 if not "set" in reg_info: 1276 return False 1277 if reg_info["set"] != "General Purpose Registers": 1278 return False 1279 if ("container-regs" in reg_info) and ( 1280 len(reg_info["container-regs"]) > 0): 1281 # Don't try to bit flip registers contained in another register. 1282 return False 1283 if re.match("^.s$", reg_info["name"]): 1284 # This is a 2-letter register name that ends in "s", like a segment register. 1285 # Don't try to bit flip these. 1286 return False 1287 if re.match("^(c|)psr$", reg_info["name"]): 1288 # This is an ARM program status register; don't flip it. 1289 return False 1290 # Okay, this looks fine-enough. 1291 return True 1292 1293 def read_register_values(self, reg_infos, endian, thread_id=None): 1294 self.assertIsNotNone(reg_infos) 1295 values = {} 1296 1297 for reg_info in reg_infos: 1298 # We append a register index when load reg infos so we can work 1299 # with subsets. 1300 reg_index = reg_info.get("lldb_register_index") 1301 self.assertIsNotNone(reg_index) 1302 1303 # Handle thread suffix. 1304 if thread_id: 1305 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1306 reg_index, thread_id) 1307 else: 1308 p_request = "read packet: $p{:x}#00".format(reg_index) 1309 1310 # Read it with p. 1311 self.reset_test_sequence() 1312 self.test_sequence.add_log_lines([ 1313 p_request, 1314 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1315 ], True) 1316 context = self.expect_gdbremote_sequence() 1317 self.assertIsNotNone(context) 1318 1319 # Convert value from target endian to integral. 1320 p_response = context.get("p_response") 1321 self.assertIsNotNone(p_response) 1322 self.assertTrue(len(p_response) > 0) 1323 self.assertFalse(p_response[0] == "E") 1324 1325 values[reg_index] = unpack_register_hex_unsigned( 1326 endian, p_response) 1327 1328 return values 1329 1330 def add_vCont_query_packets(self): 1331 self.test_sequence.add_log_lines(["read packet: $vCont?#49", 1332 {"direction": "send", 1333 "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$", 1334 "capture": {2: "vCont_query_response"}}, 1335 ], 1336 True) 1337 1338 def parse_vCont_query_response(self, context): 1339 self.assertIsNotNone(context) 1340 vCont_query_response = context.get("vCont_query_response") 1341 1342 # Handle case of no vCont support at all - in which case the capture 1343 # group will be none or zero length. 1344 if not vCont_query_response or len(vCont_query_response) == 0: 1345 return {} 1346 1347 return {key: 1 for key in vCont_query_response.split( 1348 ";") if key and len(key) > 0} 1349 1350 def count_single_steps_until_true( 1351 self, 1352 thread_id, 1353 predicate, 1354 args, 1355 max_step_count=100, 1356 use_Hc_packet=True, 1357 step_instruction="s"): 1358 """Used by single step test that appears in a few different contexts.""" 1359 single_step_count = 0 1360 1361 while single_step_count < max_step_count: 1362 self.assertIsNotNone(thread_id) 1363 1364 # Build the packet for the single step instruction. We replace 1365 # {thread}, if present, with the thread_id. 1366 step_packet = "read packet: ${}#00".format( 1367 re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction)) 1368 # print("\nstep_packet created: {}\n".format(step_packet)) 1369 1370 # Single step. 1371 self.reset_test_sequence() 1372 if use_Hc_packet: 1373 self.test_sequence.add_log_lines( 1374 [ # Set the continue thread. 1375 "read packet: $Hc{0:x}#00".format(thread_id), 1376 "send packet: $OK#00", 1377 ], True) 1378 self.test_sequence.add_log_lines([ 1379 # Single step. 1380 step_packet, 1381 # "read packet: $vCont;s:{0:x}#00".format(thread_id), 1382 # Expect a breakpoint stop report. 1383 {"direction": "send", 1384 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 1385 "capture": {1: "stop_signo", 1386 2: "stop_thread_id"}}, 1387 ], True) 1388 context = self.expect_gdbremote_sequence() 1389 self.assertIsNotNone(context) 1390 self.assertIsNotNone(context.get("stop_signo")) 1391 self.assertEqual(int(context.get("stop_signo"), 16), 1392 lldbutil.get_signal_number('SIGTRAP')) 1393 1394 single_step_count += 1 1395 1396 # See if the predicate is true. If so, we're done. 1397 if predicate(args): 1398 return (True, single_step_count) 1399 1400 # The predicate didn't return true within the runaway step count. 1401 return (False, single_step_count) 1402 1403 def g_c1_c2_contents_are(self, args): 1404 """Used by single step test that appears in a few different contexts.""" 1405 g_c1_address = args["g_c1_address"] 1406 g_c2_address = args["g_c2_address"] 1407 expected_g_c1 = args["expected_g_c1"] 1408 expected_g_c2 = args["expected_g_c2"] 1409 1410 # Read g_c1 and g_c2 contents. 1411 self.reset_test_sequence() 1412 self.test_sequence.add_log_lines( 1413 ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1), 1414 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}}, 1415 "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1), 1416 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}], 1417 True) 1418 1419 # Run the packet stream. 1420 context = self.expect_gdbremote_sequence() 1421 self.assertIsNotNone(context) 1422 1423 # Check if what we read from inferior memory is what we are expecting. 1424 self.assertIsNotNone(context.get("g_c1_contents")) 1425 self.assertIsNotNone(context.get("g_c2_contents")) 1426 1427 return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and ( 1428 seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2) 1429 1430 def single_step_only_steps_one_instruction( 1431 self, use_Hc_packet=True, step_instruction="s"): 1432 """Used by single step test that appears in a few different contexts.""" 1433 # Start up the inferior. 1434 procs = self.prep_debug_monitor_and_inferior( 1435 inferior_args=[ 1436 "get-code-address-hex:swap_chars", 1437 "get-data-address-hex:g_c1", 1438 "get-data-address-hex:g_c2", 1439 "sleep:1", 1440 "call-function:swap_chars", 1441 "sleep:5"]) 1442 1443 # Run the process 1444 self.test_sequence.add_log_lines( 1445 [ # Start running after initial stop. 1446 "read packet: $c#63", 1447 # Match output line that prints the memory address of the function call entry point. 1448 # Note we require launch-only testing so we can get inferior otuput. 1449 {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$", 1450 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}}, 1451 # Now stop the inferior. 1452 "read packet: {}".format(chr(3)), 1453 # And wait for the stop notification. 1454 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}], 1455 True) 1456 1457 # Run the packet stream. 1458 context = self.expect_gdbremote_sequence() 1459 self.assertIsNotNone(context) 1460 1461 # Grab the main thread id. 1462 self.assertIsNotNone(context.get("stop_thread_id")) 1463 main_thread_id = int(context.get("stop_thread_id"), 16) 1464 1465 # Grab the function address. 1466 self.assertIsNotNone(context.get("function_address")) 1467 function_address = int(context.get("function_address"), 16) 1468 1469 # Grab the data addresses. 1470 self.assertIsNotNone(context.get("g_c1_address")) 1471 g_c1_address = int(context.get("g_c1_address"), 16) 1472 1473 self.assertIsNotNone(context.get("g_c2_address")) 1474 g_c2_address = int(context.get("g_c2_address"), 16) 1475 1476 # Set a breakpoint at the given address. 1477 if self.getArchitecture().startswith("arm"): 1478 # TODO: Handle case when setting breakpoint in thumb code 1479 BREAKPOINT_KIND = 4 1480 else: 1481 BREAKPOINT_KIND = 1 1482 self.reset_test_sequence() 1483 self.add_set_breakpoint_packets( 1484 function_address, 1485 do_continue=True, 1486 breakpoint_kind=BREAKPOINT_KIND) 1487 context = self.expect_gdbremote_sequence() 1488 self.assertIsNotNone(context) 1489 1490 # Remove the breakpoint. 1491 self.reset_test_sequence() 1492 self.add_remove_breakpoint_packets( 1493 function_address, breakpoint_kind=BREAKPOINT_KIND) 1494 context = self.expect_gdbremote_sequence() 1495 self.assertIsNotNone(context) 1496 1497 # Verify g_c1 and g_c2 match expected initial state. 1498 args = {} 1499 args["g_c1_address"] = g_c1_address 1500 args["g_c2_address"] = g_c2_address 1501 args["expected_g_c1"] = "0" 1502 args["expected_g_c2"] = "1" 1503 1504 self.assertTrue(self.g_c1_c2_contents_are(args)) 1505 1506 # Verify we take only a small number of steps to hit the first state. 1507 # Might need to work through function entry prologue code. 1508 args["expected_g_c1"] = "1" 1509 args["expected_g_c2"] = "1" 1510 (state_reached, 1511 step_count) = self.count_single_steps_until_true(main_thread_id, 1512 self.g_c1_c2_contents_are, 1513 args, 1514 max_step_count=25, 1515 use_Hc_packet=use_Hc_packet, 1516 step_instruction=step_instruction) 1517 self.assertTrue(state_reached) 1518 1519 # Verify we hit the next state. 1520 args["expected_g_c1"] = "1" 1521 args["expected_g_c2"] = "0" 1522 (state_reached, 1523 step_count) = self.count_single_steps_until_true(main_thread_id, 1524 self.g_c1_c2_contents_are, 1525 args, 1526 max_step_count=5, 1527 use_Hc_packet=use_Hc_packet, 1528 step_instruction=step_instruction) 1529 self.assertTrue(state_reached) 1530 expected_step_count = 1 1531 arch = self.getArchitecture() 1532 1533 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1534 # of variable value 1535 if re.match("mips", arch): 1536 expected_step_count = 3 1537 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1538 # variable value 1539 if re.match("s390x", arch): 1540 expected_step_count = 2 1541 # ARM64 requires "4" instructions: 2 to compute the address (adrp, add), 1542 # one to materialize the constant (mov) and the store 1543 if re.match("arm64", arch): 1544 expected_step_count = 4 1545 1546 self.assertEqual(step_count, expected_step_count) 1547 1548 # ARM64: Once addresses and constants are materialized, only one 1549 # instruction is needed. 1550 if re.match("arm64", arch): 1551 expected_step_count = 1 1552 1553 # Verify we hit the next state. 1554 args["expected_g_c1"] = "0" 1555 args["expected_g_c2"] = "0" 1556 (state_reached, 1557 step_count) = self.count_single_steps_until_true(main_thread_id, 1558 self.g_c1_c2_contents_are, 1559 args, 1560 max_step_count=5, 1561 use_Hc_packet=use_Hc_packet, 1562 step_instruction=step_instruction) 1563 self.assertTrue(state_reached) 1564 self.assertEqual(step_count, expected_step_count) 1565 1566 # Verify we hit the next state. 1567 args["expected_g_c1"] = "0" 1568 args["expected_g_c2"] = "1" 1569 (state_reached, 1570 step_count) = self.count_single_steps_until_true(main_thread_id, 1571 self.g_c1_c2_contents_are, 1572 args, 1573 max_step_count=5, 1574 use_Hc_packet=use_Hc_packet, 1575 step_instruction=step_instruction) 1576 self.assertTrue(state_reached) 1577 self.assertEqual(step_count, expected_step_count) 1578 1579 def maybe_strict_output_regex(self, regex): 1580 return '.*' + regex + \ 1581 '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$' 1582 1583 def install_and_create_launch_args(self): 1584 exe_path = self.getBuildArtifact("a.out") 1585 if not lldb.remote_platform: 1586 return [exe_path] 1587 remote_path = lldbutil.append_to_process_working_directory(self, 1588 os.path.basename(exe_path)) 1589 remote_file_spec = lldb.SBFileSpec(remote_path, False) 1590 err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True), 1591 remote_file_spec) 1592 if err.Fail(): 1593 raise Exception("remote_platform.Install('%s', '%s') failed: %s" % 1594 (exe_path, remote_path, err)) 1595 return [remote_path] 1596