1""" 2Base class for gdb-remote test cases. 3""" 4 5from __future__ import division, print_function 6 7 8import errno 9import os 10import os.path 11import random 12import re 13import select 14import socket 15import subprocess 16import sys 17import tempfile 18import time 19from lldbsuite.test import configuration 20from lldbsuite.test.lldbtest import * 21from lldbsuite.support import seven 22from lldbgdbserverutils import * 23import logging 24 25 26class _ConnectionRefused(IOError): 27 pass 28 29 30class GdbRemoteTestCaseFactory(type): 31 32 def __new__(cls, name, bases, attrs): 33 newattrs = {} 34 for attrname, attrvalue in attrs.items(): 35 if not attrname.startswith("test"): 36 newattrs[attrname] = attrvalue 37 continue 38 39 # If any debug server categories were explicitly tagged, assume 40 # that list to be authoritative. If none were specified, try 41 # all of them. 42 all_categories = set(["debugserver", "llgs"]) 43 categories = set( 44 getattr(attrvalue, "categories", [])) & all_categories 45 if not categories: 46 categories = all_categories 47 48 for cat in categories: 49 @decorators.add_test_categories([cat]) 50 @wraps(attrvalue) 51 def test_method(self, attrvalue=attrvalue): 52 return attrvalue(self) 53 54 method_name = attrname + "_" + cat 55 test_method.__name__ = method_name 56 test_method.debug_server = cat 57 newattrs[method_name] = test_method 58 59 return super(GdbRemoteTestCaseFactory, cls).__new__( 60 cls, name, bases, newattrs) 61 62@add_metaclass(GdbRemoteTestCaseFactory) 63class GdbRemoteTestCaseBase(Base): 64 65 # Default time out in seconds. The timeout is increased tenfold under Asan. 66 DEFAULT_TIMEOUT = 20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1) 67 # Default sleep time in seconds. The sleep time is doubled under Asan. 68 DEFAULT_SLEEP = 5 * (2 if ('ASAN_OPTIONS' in os.environ) else 1) 69 70 _GDBREMOTE_KILL_PACKET = b"$k#6b" 71 72 # Start the inferior separately, attach to the inferior on the stub 73 # command line. 
74 _STARTUP_ATTACH = "attach" 75 # Start the inferior separately, start the stub without attaching, allow 76 # the test to attach to the inferior however it wants (e.g. $vAttach;pid). 77 _STARTUP_ATTACH_MANUALLY = "attach_manually" 78 # Start the stub, and launch the inferior with an $A packet via the 79 # initial packet stream. 80 _STARTUP_LAUNCH = "launch" 81 82 # GDB Signal numbers that are not target-specific used for common 83 # exceptions 84 TARGET_EXC_BAD_ACCESS = 0x91 85 TARGET_EXC_BAD_INSTRUCTION = 0x92 86 TARGET_EXC_ARITHMETIC = 0x93 87 TARGET_EXC_EMULATION = 0x94 88 TARGET_EXC_SOFTWARE = 0x95 89 TARGET_EXC_BREAKPOINT = 0x96 90 91 _verbose_log_handler = None 92 _log_formatter = logging.Formatter( 93 fmt='%(asctime)-15s %(levelname)-8s %(message)s') 94 95 def setUpBaseLogging(self): 96 self.logger = logging.getLogger(__name__) 97 98 if len(self.logger.handlers) > 0: 99 return # We have set up this handler already 100 101 self.logger.propagate = False 102 self.logger.setLevel(logging.DEBUG) 103 104 # log all warnings to stderr 105 handler = logging.StreamHandler() 106 handler.setLevel(logging.WARNING) 107 handler.setFormatter(self._log_formatter) 108 self.logger.addHandler(handler) 109 110 def isVerboseLoggingRequested(self): 111 # We will report our detailed logs if the user requested that the "gdb-remote" channel is 112 # logged. 
113 return any(("gdb-remote" in channel) 114 for channel in lldbtest_config.channels) 115 116 def getDebugServer(self): 117 method = getattr(self, self.testMethodName) 118 return getattr(method, "debug_server", None) 119 120 def setUp(self): 121 super(GdbRemoteTestCaseBase, self).setUp() 122 123 self.setUpBaseLogging() 124 self.debug_monitor_extra_args = [] 125 126 if self.isVerboseLoggingRequested(): 127 # If requested, full logs go to a log file 128 self._verbose_log_handler = logging.FileHandler( 129 self.getLogBasenameForCurrentTest() + "-host.log") 130 self._verbose_log_handler.setFormatter(self._log_formatter) 131 self._verbose_log_handler.setLevel(logging.DEBUG) 132 self.logger.addHandler(self._verbose_log_handler) 133 134 self.test_sequence = GdbRemoteTestSequence(self.logger) 135 self.set_inferior_startup_launch() 136 self.port = self.get_next_port() 137 self.stub_sends_two_stop_notifications_on_kill = False 138 if configuration.lldb_platform_url: 139 if configuration.lldb_platform_url.startswith('unix-'): 140 url_pattern = '(.+)://\[?(.+?)\]?/.*' 141 else: 142 url_pattern = '(.+)://(.+):\d+' 143 scheme, host = re.match( 144 url_pattern, configuration.lldb_platform_url).groups() 145 if configuration.lldb_platform_name == 'remote-android' and host != 'localhost': 146 self.stub_device = host 147 self.stub_hostname = 'localhost' 148 else: 149 self.stub_device = None 150 self.stub_hostname = host 151 else: 152 self.stub_hostname = "localhost" 153 154 debug_server = self.getDebugServer() 155 if debug_server == "debugserver": 156 self._init_debugserver_test() 157 else: 158 self._init_llgs_test() 159 160 def tearDown(self): 161 self.logger.removeHandler(self._verbose_log_handler) 162 self._verbose_log_handler = None 163 TestBase.tearDown(self) 164 165 def build(self, *args, **kwargs): 166 self.buildDefault(*args, **kwargs) 167 168 def getLocalServerLogFile(self): 169 return self.getLogBasenameForCurrentTest() + "-server.log" 170 171 def setUpServerLogging(self, 
is_llgs): 172 if len(lldbtest_config.channels) == 0: 173 return # No logging requested 174 175 if lldb.remote_platform: 176 log_file = lldbutil.join_remote_paths( 177 lldb.remote_platform.GetWorkingDirectory(), "server.log") 178 else: 179 log_file = self.getLocalServerLogFile() 180 181 if is_llgs: 182 self.debug_monitor_extra_args.append("--log-file=" + log_file) 183 self.debug_monitor_extra_args.append( 184 "--log-channels={}".format(":".join(lldbtest_config.channels))) 185 else: 186 self.debug_monitor_extra_args = [ 187 "--log-file=" + log_file, "--log-flags=0x800000"] 188 189 def get_next_port(self): 190 return 12000 + random.randint(0, 3999) 191 192 def reset_test_sequence(self): 193 self.test_sequence = GdbRemoteTestSequence(self.logger) 194 195 196 def _init_llgs_test(self): 197 reverse_connect = True 198 if lldb.remote_platform: 199 # Reverse connections may be tricky due to firewalls/NATs. 200 reverse_connect = False 201 202 # FIXME: This is extremely linux-oriented 203 204 # Grab the ppid from /proc/[shell pid]/stat 205 err, retcode, shell_stat = self.run_platform_command( 206 "cat /proc/$$/stat") 207 self.assertTrue( 208 err.Success() and retcode == 0, 209 "Failed to read file /proc/$$/stat: %s, retcode: %d" % 210 (err.GetCString(), 211 retcode)) 212 213 # [pid] ([executable]) [state] [*ppid*] 214 pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1) 215 err, retcode, ls_output = self.run_platform_command( 216 "ls -l /proc/%s/exe" % pid) 217 self.assertTrue( 218 err.Success() and retcode == 0, 219 "Failed to read file /proc/%s/exe: %s, retcode: %d" % 220 (pid, 221 err.GetCString(), 222 retcode)) 223 exe = ls_output.split()[-1] 224 225 # If the binary has been deleted, the link name has " (deleted)" appended. 226 # Remove if it's there. 
227 self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe) 228 else: 229 self.debug_monitor_exe = get_lldb_server_exe() 230 if not self.debug_monitor_exe: 231 self.skipTest("lldb-server exe not found") 232 233 self.debug_monitor_extra_args = ["gdbserver"] 234 self.setUpServerLogging(is_llgs=True) 235 236 self.reverse_connect = reverse_connect 237 238 def _init_debugserver_test(self): 239 self.debug_monitor_exe = get_debugserver_exe() 240 if not self.debug_monitor_exe: 241 self.skipTest("debugserver exe not found") 242 self.setUpServerLogging(is_llgs=False) 243 self.reverse_connect = True 244 245 # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification 246 # when the process truly dies. 247 self.stub_sends_two_stop_notifications_on_kill = True 248 249 def forward_adb_port(self, source, target, direction, device): 250 adb = ['adb'] + (['-s', device] if device else []) + [direction] 251 252 def remove_port_forward(): 253 subprocess.call(adb + ["--remove", "tcp:%d" % source]) 254 255 subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target]) 256 self.addTearDownHook(remove_port_forward) 257 258 def _verify_socket(self, sock): 259 # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the 260 # connect() attempt. However, due to the way how ADB forwarding works, on android targets 261 # the connect() will always be successful, but the connection will be immediately dropped 262 # if ADB could not connect on the remote side. This function tries to detect this 263 # situation, and report it as "connection refused" so that the upper layers attempt the 264 # connection again. 265 triple = self.dbg.GetSelectedPlatform().GetTriple() 266 if not re.match(".*-.*-.*-android", triple): 267 return # Not android. 268 can_read, _, _ = select.select([sock], [], [], 0.1) 269 if sock not in can_read: 270 return # Data is not available, but the connection is alive. 
271 if len(sock.recv(1, socket.MSG_PEEK)) == 0: 272 raise _ConnectionRefused() # Got EOF, connection dropped. 273 274 def create_socket(self): 275 try: 276 sock = socket.socket(family=socket.AF_INET) 277 except OSError as e: 278 if e.errno != errno.EAFNOSUPPORT: 279 raise 280 sock = socket.socket(family=socket.AF_INET6) 281 282 logger = self.logger 283 284 triple = self.dbg.GetSelectedPlatform().GetTriple() 285 if re.match(".*-.*-.*-android", triple): 286 self.forward_adb_port( 287 self.port, 288 self.port, 289 "forward", 290 self.stub_device) 291 292 logger.info( 293 "Connecting to debug monitor on %s:%d", 294 self.stub_hostname, 295 self.port) 296 connect_info = (self.stub_hostname, self.port) 297 try: 298 sock.connect(connect_info) 299 except socket.error as serr: 300 if serr.errno == errno.ECONNREFUSED: 301 raise _ConnectionRefused() 302 raise serr 303 304 def shutdown_socket(): 305 if sock: 306 try: 307 # send the kill packet so lldb-server shuts down gracefully 308 sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET) 309 except: 310 logger.warning( 311 "failed to send kill packet to debug monitor: {}; ignoring".format( 312 sys.exc_info()[0])) 313 314 try: 315 sock.close() 316 except: 317 logger.warning( 318 "failed to close socket to debug monitor: {}; ignoring".format( 319 sys.exc_info()[0])) 320 321 self.addTearDownHook(shutdown_socket) 322 323 self._verify_socket(sock) 324 325 return sock 326 327 def set_inferior_startup_launch(self): 328 self._inferior_startup = self._STARTUP_LAUNCH 329 330 def set_inferior_startup_attach(self): 331 self._inferior_startup = self._STARTUP_ATTACH 332 333 def set_inferior_startup_attach_manually(self): 334 self._inferior_startup = self._STARTUP_ATTACH_MANUALLY 335 336 def get_debug_monitor_command_line_args(self, attach_pid=None): 337 commandline_args = self.debug_monitor_extra_args 338 if attach_pid: 339 commandline_args += ["--attach=%d" % attach_pid] 340 if self.reverse_connect: 341 commandline_args += 
["--reverse-connect", self.connect_address] 342 else: 343 if lldb.remote_platform: 344 commandline_args += ["*:{}".format(self.port)] 345 else: 346 commandline_args += ["localhost:{}".format(self.port)] 347 348 return commandline_args 349 350 def get_target_byte_order(self): 351 inferior_exe_path = self.getBuildArtifact("a.out") 352 target = self.dbg.CreateTarget(inferior_exe_path) 353 return target.GetByteOrder() 354 355 def launch_debug_monitor(self, attach_pid=None, logfile=None): 356 if self.reverse_connect: 357 family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0] 358 sock = socket.socket(family, type, proto) 359 sock.settimeout(self.DEFAULT_TIMEOUT) 360 361 sock.bind(addr) 362 sock.listen(1) 363 addr = sock.getsockname() 364 self.connect_address = "[{}]:{}".format(*addr) 365 366 367 # Create the command line. 368 commandline_args = self.get_debug_monitor_command_line_args( 369 attach_pid=attach_pid) 370 371 # Start the server. 372 server = self.spawnSubprocess( 373 self.debug_monitor_exe, 374 commandline_args, 375 install_remote=False) 376 self.assertIsNotNone(server) 377 378 if self.reverse_connect: 379 self.sock = sock.accept()[0] 380 self.sock.settimeout(self.DEFAULT_TIMEOUT) 381 382 return server 383 384 def connect_to_debug_monitor(self, attach_pid=None): 385 if self.reverse_connect: 386 # Create the stub. 387 server = self.launch_debug_monitor(attach_pid=attach_pid) 388 self.assertIsNotNone(server) 389 390 # Schedule debug monitor to be shut down during teardown. 391 logger = self.logger 392 393 self._server = Server(self.sock, server) 394 return server 395 396 # We're using a random port algorithm to try not to collide with other ports, 397 # and retry a max # times. 398 attempts = 0 399 MAX_ATTEMPTS = 20 400 401 while attempts < MAX_ATTEMPTS: 402 server = self.launch_debug_monitor(attach_pid=attach_pid) 403 404 # Schedule debug monitor to be shut down during teardown. 
405 logger = self.logger 406 407 connect_attemps = 0 408 MAX_CONNECT_ATTEMPTS = 10 409 410 while connect_attemps < MAX_CONNECT_ATTEMPTS: 411 # Create a socket to talk to the server 412 try: 413 logger.info("Connect attempt %d", connect_attemps + 1) 414 self.sock = self.create_socket() 415 self._server = Server(self.sock, server) 416 return server 417 except _ConnectionRefused as serr: 418 # Ignore, and try again. 419 pass 420 time.sleep(0.5) 421 connect_attemps += 1 422 423 # We should close the server here to be safe. 424 server.terminate() 425 426 # Increment attempts. 427 print( 428 "connect to debug monitor on port %d failed, attempt #%d of %d" % 429 (self.port, attempts + 1, MAX_ATTEMPTS)) 430 attempts += 1 431 432 # And wait a random length of time before next attempt, to avoid 433 # collisions. 434 time.sleep(random.randint(1, 5)) 435 436 # Now grab a new port number. 437 self.port = self.get_next_port() 438 439 raise Exception( 440 "failed to create a socket to the launched debug monitor after %d tries" % 441 attempts) 442 443 def launch_process_for_attach( 444 self, 445 inferior_args=None, 446 sleep_seconds=3, 447 exe_path=None): 448 # We're going to start a child process that the debug monitor stub can later attach to. 449 # This process needs to be started so that it just hangs around for a while. We'll 450 # have it sleep. 451 if not exe_path: 452 exe_path = self.getBuildArtifact("a.out") 453 454 args = [] 455 if inferior_args: 456 args.extend(inferior_args) 457 if sleep_seconds: 458 args.append("sleep:%d" % sleep_seconds) 459 460 return self.spawnSubprocess(exe_path, args) 461 462 def prep_debug_monitor_and_inferior( 463 self, 464 inferior_args=None, 465 inferior_sleep_seconds=3, 466 inferior_exe_path=None, 467 inferior_env=None): 468 """Prep the debug monitor, the inferior, and the expected packet stream. 469 470 Handle the separate cases of using the debug monitor in attach-to-inferior mode 471 and in launch-inferior mode. 
472 473 For attach-to-inferior mode, the inferior process is first started, then 474 the debug monitor is started in attach to pid mode (using --attach on the 475 stub command line), and the no-ack-mode setup is appended to the packet 476 stream. The packet stream is not yet executed, ready to have more expected 477 packet entries added to it. 478 479 For launch-inferior mode, the stub is first started, then no ack mode is 480 setup on the expected packet stream, then the verified launch packets are added 481 to the expected socket stream. The packet stream is not yet executed, ready 482 to have more expected packet entries added to it. 483 484 The return value is: 485 {inferior:<inferior>, server:<server>} 486 """ 487 inferior = None 488 attach_pid = None 489 490 if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY: 491 # Launch the process that we'll use as the inferior. 492 inferior = self.launch_process_for_attach( 493 inferior_args=inferior_args, 494 sleep_seconds=inferior_sleep_seconds, 495 exe_path=inferior_exe_path) 496 self.assertIsNotNone(inferior) 497 self.assertTrue(inferior.pid > 0) 498 if self._inferior_startup == self._STARTUP_ATTACH: 499 # In this case, we want the stub to attach via the command 500 # line, so set the command line attach pid here. 
501 attach_pid = inferior.pid 502 503 if self._inferior_startup == self._STARTUP_LAUNCH: 504 # Build launch args 505 if not inferior_exe_path: 506 inferior_exe_path = self.getBuildArtifact("a.out") 507 508 if lldb.remote_platform: 509 remote_path = lldbutil.append_to_process_working_directory(self, 510 os.path.basename(inferior_exe_path)) 511 remote_file_spec = lldb.SBFileSpec(remote_path, False) 512 err = lldb.remote_platform.Install(lldb.SBFileSpec( 513 inferior_exe_path, True), remote_file_spec) 514 if err.Fail(): 515 raise Exception( 516 "remote_platform.Install('%s', '%s') failed: %s" % 517 (inferior_exe_path, remote_path, err)) 518 inferior_exe_path = remote_path 519 520 launch_args = [inferior_exe_path] 521 if inferior_args: 522 launch_args.extend(inferior_args) 523 524 # Launch the debug monitor stub, attaching to the inferior. 525 server = self.connect_to_debug_monitor(attach_pid=attach_pid) 526 self.assertIsNotNone(server) 527 528 # Build the expected protocol stream 529 self.add_no_ack_remote_stream() 530 if inferior_env: 531 for name, value in inferior_env.items(): 532 self.add_set_environment_packets(name, value) 533 if self._inferior_startup == self._STARTUP_LAUNCH: 534 self.add_verified_launch_packets(launch_args) 535 536 return {"inferior": inferior, "server": server} 537 538 def expect_socket_recv( 539 self, 540 sock, 541 expected_content_regex 542 ): 543 response = "" 544 timeout_time = time.time() + self.DEFAULT_TIMEOUT 545 546 while not expected_content_regex.match( 547 response) and time.time() < timeout_time: 548 can_read, _, _ = select.select([sock], [], [], self.DEFAULT_TIMEOUT) 549 if can_read and sock in can_read: 550 recv_bytes = sock.recv(4096) 551 if recv_bytes: 552 response += seven.bitcast_to_string(recv_bytes) 553 554 self.assertTrue(expected_content_regex.match(response)) 555 556 def expect_socket_send(self, sock, content): 557 request_bytes_remaining = content 558 timeout_time = time.time() + self.DEFAULT_TIMEOUT 559 560 while 
len(request_bytes_remaining) > 0 and time.time() < timeout_time: 561 _, can_write, _ = select.select([], [sock], [], self.DEFAULT_TIMEOUT) 562 if can_write and sock in can_write: 563 written_byte_count = sock.send(request_bytes_remaining.encode()) 564 request_bytes_remaining = request_bytes_remaining[ 565 written_byte_count:] 566 self.assertEqual(len(request_bytes_remaining), 0) 567 568 def do_handshake(self, stub_socket): 569 # Write the ack. 570 self.expect_socket_send(stub_socket, "+") 571 572 # Send the start no ack mode packet. 573 NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0" 574 bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode()) 575 self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST)) 576 577 # Receive the ack and "OK" 578 self.expect_socket_recv(stub_socket, re.compile( 579 r"^\+\$OK#[0-9a-fA-F]{2}$")) 580 581 # Send the final ack. 582 self.expect_socket_send(stub_socket, "+") 583 584 def add_no_ack_remote_stream(self): 585 self.test_sequence.add_log_lines( 586 ["read packet: +", 587 "read packet: $QStartNoAckMode#b0", 588 "send packet: +", 589 "send packet: $OK#9a", 590 "read packet: +"], 591 True) 592 593 def add_verified_launch_packets(self, launch_args): 594 self.test_sequence.add_log_lines( 595 ["read packet: %s" % build_gdbremote_A_packet(launch_args), 596 "send packet: $OK#00", 597 "read packet: $qLaunchSuccess#a5", 598 "send packet: $OK#00"], 599 True) 600 601 def add_thread_suffix_request_packets(self): 602 self.test_sequence.add_log_lines( 603 ["read packet: $QThreadSuffixSupported#e4", 604 "send packet: $OK#00", 605 ], True) 606 607 def add_process_info_collection_packets(self): 608 self.test_sequence.add_log_lines( 609 ["read packet: $qProcessInfo#dc", 610 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}], 611 True) 612 613 def add_set_environment_packets(self, name, value): 614 self.test_sequence.add_log_lines( 615 ["read packet: $QEnvironment:" + name + "=" + value + "#00", 616 "send 
packet: $OK#00", 617 ], True) 618 619 _KNOWN_PROCESS_INFO_KEYS = [ 620 "pid", 621 "parent-pid", 622 "real-uid", 623 "real-gid", 624 "effective-uid", 625 "effective-gid", 626 "cputype", 627 "cpusubtype", 628 "ostype", 629 "triple", 630 "vendor", 631 "endian", 632 "elf_abi", 633 "ptrsize" 634 ] 635 636 def parse_process_info_response(self, context): 637 # Ensure we have a process info response. 638 self.assertIsNotNone(context) 639 process_info_raw = context.get("process_info_raw") 640 self.assertIsNotNone(process_info_raw) 641 642 # Pull out key:value; pairs. 643 process_info_dict = { 644 match.group(1): match.group(2) for match in re.finditer( 645 r"([^:]+):([^;]+);", process_info_raw)} 646 647 # Validate keys are known. 648 for (key, val) in list(process_info_dict.items()): 649 self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS) 650 self.assertIsNotNone(val) 651 652 return process_info_dict 653 654 def add_register_info_collection_packets(self): 655 self.test_sequence.add_log_lines( 656 [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True, 657 "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"), 658 "save_key": "reg_info_responses"}], 659 True) 660 661 def parse_register_info_packets(self, context): 662 """Return an array of register info dictionaries, one per register info.""" 663 reg_info_responses = context.get("reg_info_responses") 664 self.assertIsNotNone(reg_info_responses) 665 666 # Parse register infos. 
667 return [parse_reg_info_response(reg_info_response) 668 for reg_info_response in reg_info_responses] 669 670 def expect_gdbremote_sequence(self): 671 return expect_lldb_gdbserver_replay( 672 self, 673 self._server, 674 self.test_sequence, 675 self.DEFAULT_TIMEOUT * len(self.test_sequence), 676 self.logger) 677 678 _KNOWN_REGINFO_KEYS = [ 679 "name", 680 "alt-name", 681 "bitsize", 682 "offset", 683 "encoding", 684 "format", 685 "set", 686 "gcc", 687 "ehframe", 688 "dwarf", 689 "generic", 690 "container-regs", 691 "invalidate-regs", 692 "dynamic_size_dwarf_expr_bytes", 693 "dynamic_size_dwarf_len" 694 ] 695 696 def assert_valid_reg_info(self, reg_info): 697 # Assert we know about all the reginfo keys parsed. 698 for key in reg_info: 699 self.assertTrue(key in self._KNOWN_REGINFO_KEYS) 700 701 # Check the bare-minimum expected set of register info keys. 702 self.assertTrue("name" in reg_info) 703 self.assertTrue("bitsize" in reg_info) 704 705 if not self.getArchitecture() == 'aarch64': 706 self.assertTrue("offset" in reg_info) 707 708 self.assertTrue("encoding" in reg_info) 709 self.assertTrue("format" in reg_info) 710 711 def find_pc_reg_info(self, reg_infos): 712 lldb_reg_index = 0 713 for reg_info in reg_infos: 714 if ("generic" in reg_info) and (reg_info["generic"] == "pc"): 715 return (lldb_reg_index, reg_info) 716 lldb_reg_index += 1 717 718 return (None, None) 719 720 def add_lldb_register_index(self, reg_infos): 721 """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry. 722 723 We'll use this when we want to call packets like P/p with a register index but do so 724 on only a subset of the full register info set. 
725 """ 726 self.assertIsNotNone(reg_infos) 727 728 reg_index = 0 729 for reg_info in reg_infos: 730 reg_info["lldb_register_index"] = reg_index 731 reg_index += 1 732 733 def add_query_memory_region_packets(self, address): 734 self.test_sequence.add_log_lines( 735 ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address), 736 {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}], 737 True) 738 739 def parse_key_val_dict(self, key_val_text, allow_dupes=True): 740 self.assertIsNotNone(key_val_text) 741 kv_dict = {} 742 for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text): 743 key = match.group(1) 744 val = match.group(2) 745 if key in kv_dict: 746 if allow_dupes: 747 if isinstance(kv_dict[key], list): 748 kv_dict[key].append(val) 749 else: 750 # Promote to list 751 kv_dict[key] = [kv_dict[key], val] 752 else: 753 self.fail( 754 "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format( 755 key, val, key_val_text, kv_dict)) 756 else: 757 kv_dict[key] = val 758 return kv_dict 759 760 def parse_memory_region_packet(self, context): 761 # Ensure we have a context. 762 self.assertIsNotNone(context.get("memory_region_response")) 763 764 # Pull out key:value; pairs. 765 mem_region_dict = self.parse_key_val_dict( 766 context.get("memory_region_response")) 767 768 # Validate keys are known. 769 for (key, val) in list(mem_region_dict.items()): 770 self.assertIn(key, 771 ["start", 772 "size", 773 "permissions", 774 "flags", 775 "name", 776 "error"]) 777 self.assertIsNotNone(val) 778 779 mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", "")) 780 # Return the dictionary of key-value pairs for the memory region. 
781 return mem_region_dict 782 783 def assert_address_within_memory_region( 784 self, test_address, mem_region_dict): 785 self.assertIsNotNone(mem_region_dict) 786 self.assertTrue("start" in mem_region_dict) 787 self.assertTrue("size" in mem_region_dict) 788 789 range_start = int(mem_region_dict["start"], 16) 790 range_size = int(mem_region_dict["size"], 16) 791 range_end = range_start + range_size 792 793 if test_address < range_start: 794 self.fail( 795 "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 796 test_address, 797 range_start, 798 range_end, 799 range_size)) 800 elif test_address >= range_end: 801 self.fail( 802 "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format( 803 test_address, 804 range_start, 805 range_end, 806 range_size)) 807 808 def add_threadinfo_collection_packets(self): 809 self.test_sequence.add_log_lines( 810 [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo", 811 "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"), 812 "save_key": "threadinfo_responses"}], 813 True) 814 815 def parse_threadinfo_packets(self, context): 816 """Return an array of thread ids (decimal ints), one per thread.""" 817 threadinfo_responses = context.get("threadinfo_responses") 818 self.assertIsNotNone(threadinfo_responses) 819 820 thread_ids = [] 821 for threadinfo_response in threadinfo_responses: 822 new_thread_infos = parse_threadinfo_response(threadinfo_response) 823 thread_ids.extend(new_thread_infos) 824 return thread_ids 825 826 def wait_for_thread_count(self, thread_count): 827 start_time = time.time() 828 timeout_time = start_time + self.DEFAULT_TIMEOUT 829 830 actual_thread_count = 0 831 while actual_thread_count < thread_count: 832 self.reset_test_sequence() 833 self.add_threadinfo_collection_packets() 834 835 context = self.expect_gdbremote_sequence() 836 self.assertIsNotNone(context) 837 838 threads = 
self.parse_threadinfo_packets(context) 839 self.assertIsNotNone(threads) 840 841 actual_thread_count = len(threads) 842 843 if time.time() > timeout_time: 844 raise Exception( 845 'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format( 846 self.DEFAULT_TIMEOUT, thread_count, actual_thread_count)) 847 848 return threads 849 850 def add_set_breakpoint_packets( 851 self, 852 address, 853 z_packet_type=0, 854 do_continue=True, 855 breakpoint_kind=1): 856 self.test_sequence.add_log_lines( 857 [ # Set the breakpoint. 858 "read packet: $Z{2},{0:x},{1}#00".format( 859 address, breakpoint_kind, z_packet_type), 860 # Verify the stub could set it. 861 "send packet: $OK#00", 862 ], True) 863 864 if (do_continue): 865 self.test_sequence.add_log_lines( 866 [ # Continue the inferior. 867 "read packet: $c#63", 868 # Expect a breakpoint stop report. 869 {"direction": "send", 870 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", 871 "capture": {1: "stop_signo", 872 2: "stop_thread_id"}}, 873 ], True) 874 875 def add_remove_breakpoint_packets( 876 self, 877 address, 878 z_packet_type=0, 879 breakpoint_kind=1): 880 self.test_sequence.add_log_lines( 881 [ # Remove the breakpoint. 882 "read packet: $z{2},{0:x},{1}#00".format( 883 address, breakpoint_kind, z_packet_type), 884 # Verify the stub could unset it. 
885 "send packet: $OK#00", 886 ], True) 887 888 def add_qSupported_packets(self): 889 self.test_sequence.add_log_lines( 890 ["read packet: $qSupported#00", 891 {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}}, 892 ], True) 893 894 _KNOWN_QSUPPORTED_STUB_FEATURES = [ 895 "augmented-libraries-svr4-read", 896 "PacketSize", 897 "QStartNoAckMode", 898 "QThreadSuffixSupported", 899 "QListThreadsInStopReply", 900 "qXfer:auxv:read", 901 "qXfer:libraries:read", 902 "qXfer:libraries-svr4:read", 903 "qXfer:features:read", 904 "qEcho", 905 "QPassSignals" 906 ] 907 908 def parse_qSupported_response(self, context): 909 self.assertIsNotNone(context) 910 911 raw_response = context.get("qSupported_response") 912 self.assertIsNotNone(raw_response) 913 914 # For values with key=val, the dict key and vals are set as expected. For feature+, feature- and feature?, the 915 # +,-,? is stripped from the key and set as the value. 916 supported_dict = {} 917 for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response): 918 key = match.group(1) 919 val = match.group(3) 920 921 # key=val: store as is 922 if val and len(val) > 0: 923 supported_dict[key] = val 924 else: 925 if len(key) < 2: 926 raise Exception( 927 "singular stub feature is too short: must be stub_feature{+,-,?}") 928 supported_type = key[-1] 929 key = key[:-1] 930 if not supported_type in ["+", "-", "?"]: 931 raise Exception( 932 "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type)) 933 supported_dict[key] = supported_type 934 # Ensure we know the supported element 935 if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES: 936 raise Exception( 937 "unknown qSupported stub feature reported: %s" % 938 key) 939 940 return supported_dict 941 942 def run_process_then_stop(self, run_seconds=1): 943 # Tell the stub to continue. 
944 self.test_sequence.add_log_lines( 945 ["read packet: $vCont;c#a8"], 946 True) 947 context = self.expect_gdbremote_sequence() 948 949 # Wait for run_seconds. 950 time.sleep(run_seconds) 951 952 # Send an interrupt, capture a T response. 953 self.reset_test_sequence() 954 self.test_sequence.add_log_lines( 955 ["read packet: {}".format(chr(3)), 956 {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}], 957 True) 958 context = self.expect_gdbremote_sequence() 959 self.assertIsNotNone(context) 960 self.assertIsNotNone(context.get("stop_result")) 961 962 return context 963 964 def continue_process_and_wait_for_stop(self): 965 self.test_sequence.add_log_lines( 966 [ 967 "read packet: $vCont;c#a8", 968 { 969 "direction": "send", 970 "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$", 971 "capture": {1: "stop_signo", 2: "stop_key_val_text"}, 972 }, 973 ], 974 True, 975 ) 976 context = self.expect_gdbremote_sequence() 977 self.assertIsNotNone(context) 978 return self.parse_interrupt_packets(context) 979 980 def select_modifiable_register(self, reg_infos): 981 """Find a register that can be read/written freely.""" 982 PREFERRED_REGISTER_NAMES = set(["rax", ]) 983 984 # First check for the first register from the preferred register name 985 # set. 986 alternative_register_index = None 987 988 self.assertIsNotNone(reg_infos) 989 for reg_info in reg_infos: 990 if ("name" in reg_info) and ( 991 reg_info["name"] in PREFERRED_REGISTER_NAMES): 992 # We found a preferred register. Use it. 993 return reg_info["lldb_register_index"] 994 if ("generic" in reg_info) and (reg_info["generic"] == "fp" or 995 reg_info["generic"] == "arg1"): 996 # A frame pointer or first arg register will do as a 997 # register to modify temporarily. 998 alternative_register_index = reg_info["lldb_register_index"] 999 1000 # We didn't find a preferred register. Return whatever alternative register 1001 # we found, if any. 
def decode_gdbremote_binary(self, encoded_bytes):
    """Decode the escaping and run-length encoding of a gdb-remote payload.

    Two constructs are expanded; every other character is copied through:

    * "}" followed by a character yields that character XOR 0x20.
    * "*" followed by a character repeats the most recently decoded
      character (ord(character) - 29) additional times.

    encoded_bytes is indexed character by character and compared against
    one-character strings, so it is expected to be a text string.

    Returns the decoded string. Fails the test (via assertTrue) if an
    escape or repeat marker is the last character of the input, or if a
    repeat marker appears before any character has been decoded.
    """
    # Collect single characters and join once at the end instead of growing
    # a string with += on every iteration.
    decoded_chars = []
    encoded_length = len(encoded_bytes)
    i = 0
    while i < encoded_length:
        current = encoded_bytes[i]
        if current == "}":
            # Handle escaped char.
            self.assertTrue(i + 1 < encoded_length)
            decoded_chars.append(chr(ord(encoded_bytes[i + 1]) ^ 0x20))
            i += 2
        elif current == "*":
            # Handle run length encoding.
            self.assertTrue(len(decoded_chars) > 0)
            self.assertTrue(i + 1 < encoded_length)
            repeat_count = ord(encoded_bytes[i + 1]) - 29
            # extend() adds the repeated character one element at a time so
            # decoded_chars[-1] is always a single character.
            decoded_chars.extend(decoded_chars[-1] * repeat_count)
            i += 2
        else:
            decoded_chars.append(current)
            i += 1
    return "".join(decoded_chars)
def read_binary_data_in_chunks(self, command_prefix, chunk_length):
    """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned.

    One request is sent per loop iteration, with offset starting at 0. The
    first character of each reply must be "l" or "m" (asserted); "l" ends
    the loop. Any payload after that character is passed through
    self.decode_gdbremote_binary() and the decoded pieces are returned
    concatenated, in request order.
    """
    # The reply pattern is the same for every chunk, so build it once.
    reply_regex = re.compile(
        r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
        re.MULTILINE | re.DOTALL)

    decoded_chunks = []
    offset = 0
    done = False

    while not done:
        # Grab the next iteration of data.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            [
                "read packet: ${}{:x},{:x}:#00".format(
                    command_prefix,
                    offset,
                    chunk_length),
                {
                    "direction": "send",
                    "regex": reply_regex,
                    "capture": {
                        1: "response_type",
                        2: "content_raw"}}],
            True)

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        response_type = context.get("response_type")
        self.assertIsNotNone(response_type)
        self.assertTrue(response_type in ["l", "m"])

        # Move offset along.
        # NOTE(review): this advances by the requested length, not by the
        # amount of data actually returned; presumably the stub always fills
        # "m" replies completely - confirm.
        offset += chunk_length

        # Figure out if we're done. We're done if the response type is l.
        done = response_type == "l"

        # Decode binary data, if the reply carried any.
        content_raw = context.get("content_raw")
        if content_raw:
            decoded_chunks.append(self.decode_gdbremote_binary(content_raw))

    return "".join(decoded_chunks)
def parse_QSaveRegisterState_response(self, context):
    """Interpret the reply captured as context["save_response"].

    Returns (True, save_id) with save_id converted by int() when the reply
    is non-empty and does not begin with "E"; returns (False, None) for an
    empty reply or one that begins with "E".
    """
    self.assertIsNotNone(context)

    reply = context.get("save_response")
    self.assertIsNotNone(reply)

    # Anything that is non-empty and not an error reply is the save id.
    if reply and reply[0] != "E":
        return (True, int(reply))

    # error received
    return (False, None)
We're not necessarily 1223 # working off a full set of register infos, so an inferred register 1224 # index could be wrong. 1225 reg_index = reg_info["lldb_register_index"] 1226 self.assertIsNotNone(reg_index) 1227 1228 reg_byte_size = int(reg_info["bitsize"]) // 8 1229 self.assertTrue(reg_byte_size > 0) 1230 1231 # Handle thread suffix. 1232 if thread_id: 1233 p_request = "read packet: $p{:x};thread:{:x}#00".format( 1234 reg_index, thread_id) 1235 else: 1236 p_request = "read packet: $p{:x}#00".format(reg_index) 1237 1238 # Read the existing value. 1239 self.reset_test_sequence() 1240 self.test_sequence.add_log_lines([ 1241 p_request, 1242 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1243 ], True) 1244 context = self.expect_gdbremote_sequence() 1245 self.assertIsNotNone(context) 1246 1247 # Verify the response length. 1248 p_response = context.get("p_response") 1249 self.assertIsNotNone(p_response) 1250 initial_reg_value = unpack_register_hex_unsigned( 1251 endian, p_response) 1252 1253 # Flip the value by xoring with all 1s 1254 all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8) 1255 flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16) 1256 # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int)) 1257 1258 # Handle thread suffix for P. 1259 if thread_id: 1260 P_request = "read packet: $P{:x}={};thread:{:x}#00".format( 1261 reg_index, pack_register_hex( 1262 endian, flipped_bits_int, byte_size=reg_byte_size), thread_id) 1263 else: 1264 P_request = "read packet: $P{:x}={}#00".format( 1265 reg_index, pack_register_hex( 1266 endian, flipped_bits_int, byte_size=reg_byte_size)) 1267 1268 # Write the flipped value to the register. 
1269 self.reset_test_sequence() 1270 self.test_sequence.add_log_lines([P_request, 1271 {"direction": "send", 1272 "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}", 1273 "capture": {1: "P_response"}}, 1274 ], 1275 True) 1276 context = self.expect_gdbremote_sequence() 1277 self.assertIsNotNone(context) 1278 1279 # Determine if the write succeeded. There are a handful of registers that can fail, or partially fail 1280 # (e.g. flags, segment selectors, etc.) due to register value restrictions. Don't worry about them 1281 # all flipping perfectly. 1282 P_response = context.get("P_response") 1283 self.assertIsNotNone(P_response) 1284 if P_response == "OK": 1285 successful_writes += 1 1286 else: 1287 failed_writes += 1 1288 # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response)) 1289 1290 # Read back the register value, ensure it matches the flipped 1291 # value. 1292 if P_response == "OK": 1293 self.reset_test_sequence() 1294 self.test_sequence.add_log_lines([ 1295 p_request, 1296 {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}}, 1297 ], True) 1298 context = self.expect_gdbremote_sequence() 1299 self.assertIsNotNone(context) 1300 1301 verify_p_response_raw = context.get("p_response") 1302 self.assertIsNotNone(verify_p_response_raw) 1303 verify_bits = unpack_register_hex_unsigned( 1304 endian, verify_p_response_raw) 1305 1306 if verify_bits != flipped_bits_int: 1307 # Some registers, like mxcsrmask and others, will permute what's written. Adjust succeed/fail counts. 
def is_bit_flippable_register(self, reg_info):
    """Return True if reg_info describes a register the bit-flip test may write.

    A register qualifies only when all of the following hold:

    * reg_info is non-empty and its "set" is "General Purpose Registers";
    * it has no non-empty "container-regs" entry (it is not contained in
      another register);
    * its "name" is not a two-character name ending in "s" (like a segment
      register);
    * its "name" is not "psr" or "cpsr" (ARM program status register).
    """
    if not reg_info:
        return False
    if reg_info.get("set") != "General Purpose Registers":
        return False
    if len(reg_info.get("container-regs", ())) > 0:
        # Don't try to bit flip registers contained in another register.
        return False

    register_name = reg_info["name"]
    excluded_name_patterns = (
        "^.s$",         # 2-letter name ending in "s", like a segment register
        "^(c|)psr$",    # ARM program status register
    )
    for pattern in excluded_name_patterns:
        if re.match(pattern, register_name):
            return False

    # Okay, this looks fine-enough.
    return True
def parse_vCont_query_response(self, context):
    """Convert the captured vCont? reply into a dict of supported actions.

    context["vCont_query_response"] is split on ";" and every non-empty
    piece becomes a key mapped to 1. An absent or empty capture (no vCont
    support at all) yields an empty dict.
    """
    self.assertIsNotNone(context)

    reply = context.get("vCont_query_response")
    if not reply:
        # No vCont support at all - the capture group is None or empty.
        return {}

    supported_actions = (action for action in reply.split(";") if action)
    return dict.fromkeys(supported_actions, 1)
def g_c1_c2_contents_are(self, args):
    """Used by single step test that appears in a few different contexts.

    args must provide "g_c1_address", "g_c2_address", "expected_g_c1" and
    "expected_g_c2". An $m read of length 1 is issued at each address, and
    the result is True only when both unhexlified replies equal their
    expected values.
    """
    addresses = (args["g_c1_address"], args["g_c2_address"])
    expected_values = (args["expected_g_c1"], args["expected_g_c2"])
    capture_names = ("g_c1_contents", "g_c2_contents")

    # Queue one memory read plus its reply matcher per variable.
    memory_reply_regex = r"^\$(.+)#[0-9a-fA-F]{2}$"
    log_lines = []
    for address, capture_name in zip(addresses, capture_names):
        log_lines.append("read packet: $m{0:x},{1:x}#00".format(address, 1))
        log_lines.append({"direction": "send",
                          "regex": memory_reply_regex,
                          "capture": {1: capture_name}})

    self.reset_test_sequence()
    self.test_sequence.add_log_lines(log_lines, True)

    # Run the packet stream.
    context = self.expect_gdbremote_sequence()
    self.assertIsNotNone(context)

    # Both reads must have produced something before comparing.
    for capture_name in capture_names:
        self.assertIsNotNone(context.get(capture_name))

    # Check if what we read from inferior memory is what we are expecting.
    return all(
        seven.unhexlify(context.get(capture_name)) == expected
        for capture_name, expected in zip(capture_names, expected_values))
1512 self.assertIsNotNone(context.get("g_c1_address")) 1513 g_c1_address = int(context.get("g_c1_address"), 16) 1514 1515 self.assertIsNotNone(context.get("g_c2_address")) 1516 g_c2_address = int(context.get("g_c2_address"), 16) 1517 1518 # Set a breakpoint at the given address. 1519 if self.getArchitecture().startswith("arm"): 1520 # TODO: Handle case when setting breakpoint in thumb code 1521 BREAKPOINT_KIND = 4 1522 else: 1523 BREAKPOINT_KIND = 1 1524 self.reset_test_sequence() 1525 self.add_set_breakpoint_packets( 1526 function_address, 1527 do_continue=True, 1528 breakpoint_kind=BREAKPOINT_KIND) 1529 context = self.expect_gdbremote_sequence() 1530 self.assertIsNotNone(context) 1531 1532 # Remove the breakpoint. 1533 self.reset_test_sequence() 1534 self.add_remove_breakpoint_packets( 1535 function_address, breakpoint_kind=BREAKPOINT_KIND) 1536 context = self.expect_gdbremote_sequence() 1537 self.assertIsNotNone(context) 1538 1539 # Verify g_c1 and g_c2 match expected initial state. 1540 args = {} 1541 args["g_c1_address"] = g_c1_address 1542 args["g_c2_address"] = g_c2_address 1543 args["expected_g_c1"] = "0" 1544 args["expected_g_c2"] = "1" 1545 1546 self.assertTrue(self.g_c1_c2_contents_are(args)) 1547 1548 # Verify we take only a small number of steps to hit the first state. 1549 # Might need to work through function entry prologue code. 1550 args["expected_g_c1"] = "1" 1551 args["expected_g_c2"] = "1" 1552 (state_reached, 1553 step_count) = self.count_single_steps_until_true(main_thread_id, 1554 self.g_c1_c2_contents_are, 1555 args, 1556 max_step_count=25, 1557 use_Hc_packet=use_Hc_packet, 1558 step_instruction=step_instruction) 1559 self.assertTrue(state_reached) 1560 1561 # Verify we hit the next state. 
1562 args["expected_g_c1"] = "1" 1563 args["expected_g_c2"] = "0" 1564 (state_reached, 1565 step_count) = self.count_single_steps_until_true(main_thread_id, 1566 self.g_c1_c2_contents_are, 1567 args, 1568 max_step_count=5, 1569 use_Hc_packet=use_Hc_packet, 1570 step_instruction=step_instruction) 1571 self.assertTrue(state_reached) 1572 expected_step_count = 1 1573 arch = self.getArchitecture() 1574 1575 # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation 1576 # of variable value 1577 if re.match("mips", arch): 1578 expected_step_count = 3 1579 # S390X requires "2" (LARL, MVI) machine instructions for updation of 1580 # variable value 1581 if re.match("s390x", arch): 1582 expected_step_count = 2 1583 # ARM64 requires "4" instructions: 2 to compute the address (adrp, add), 1584 # one to materialize the constant (mov) and the store 1585 if re.match("arm64", arch): 1586 expected_step_count = 4 1587 1588 self.assertEqual(step_count, expected_step_count) 1589 1590 # ARM64: Once addresses and constants are materialized, only one 1591 # instruction is needed. 1592 if re.match("arm64", arch): 1593 expected_step_count = 1 1594 1595 # Verify we hit the next state. 1596 args["expected_g_c1"] = "0" 1597 args["expected_g_c2"] = "0" 1598 (state_reached, 1599 step_count) = self.count_single_steps_until_true(main_thread_id, 1600 self.g_c1_c2_contents_are, 1601 args, 1602 max_step_count=5, 1603 use_Hc_packet=use_Hc_packet, 1604 step_instruction=step_instruction) 1605 self.assertTrue(state_reached) 1606 self.assertEqual(step_count, expected_step_count) 1607 1608 # Verify we hit the next state. 
def maybe_strict_output_regex(self, regex):
    """Return regex adjusted for how noisy the platform's stderr is.

    When lldbplatformutil.hasChattyStderr(self) is true the pattern is
    wrapped in '.*' on both sides; otherwise it is anchored with '^' and
    '$'.
    """
    if lldbplatformutil.hasChattyStderr(self):
        return '.*' + regex + '.*'
    return '^' + regex + '$'