1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
class _ConnectionRefused(IOError):
    """Internal marker exception: the stub connection was refused or dropped,
    so upper layers should retry the connect attempt."""
    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands each "test*" method into one concrete test per
    debug server flavor ("debugserver" and/or "llgs").

    Non-test attributes pass through unchanged. Each generated test gets the
    category appended to its name, is tagged with that test category, and
    carries a ``debug_server`` attribute that setUp() later inspects.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(
                getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # NOTE: attrvalue is bound as a default argument so each
                # generated method captures the current loop value (avoids
                # Python's late-binding closure pitfall).
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(
                cls, name, bases, newattrs)
61
@add_metaclass(GdbRemoteTestCaseFactory)
class GdbRemoteTestCaseBase(Base):
    """Base class for gdb-remote protocol test cases.

    The GdbRemoteTestCaseFactory metaclass expands every "test*" method into
    per-debug-server variants (debugserver / llgs).
    """

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # The gdb-remote "k" (kill) packet; 0x6b is the checksum of 'k'.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose log handler; installed by setUp() when requested,
    # removed again in tearDown().
    _verbose_log_handler = None
    # Formatter shared by the stderr handler and the per-test file handler.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
94
95    def setUpBaseLogging(self):
96        self.logger = logging.getLogger(__name__)
97
98        if len(self.logger.handlers) > 0:
99            return  # We have set up this handler already
100
101        self.logger.propagate = False
102        self.logger.setLevel(logging.DEBUG)
103
104        # log all warnings to stderr
105        handler = logging.StreamHandler()
106        handler.setLevel(logging.WARNING)
107        handler.setFormatter(self._log_formatter)
108        self.logger.addHandler(handler)
109
110    def isVerboseLoggingRequested(self):
111        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
112        # logged.
113        return any(("gdb-remote" in channel)
114                   for channel in lldbtest_config.channels)
115
116    def getDebugServer(self):
117        method = getattr(self, self.testMethodName)
118        return getattr(method, "debug_server", None)
119
120    def setUp(self):
121        super(GdbRemoteTestCaseBase, self).setUp()
122
123        self.setUpBaseLogging()
124        self.debug_monitor_extra_args = []
125
126        if self.isVerboseLoggingRequested():
127            # If requested, full logs go to a log file
128            self._verbose_log_handler = logging.FileHandler(
129                self.getLogBasenameForCurrentTest() + "-host.log")
130            self._verbose_log_handler.setFormatter(self._log_formatter)
131            self._verbose_log_handler.setLevel(logging.DEBUG)
132            self.logger.addHandler(self._verbose_log_handler)
133
134        self.test_sequence = GdbRemoteTestSequence(self.logger)
135        self.set_inferior_startup_launch()
136        self.port = self.get_next_port()
137        self.stub_sends_two_stop_notifications_on_kill = False
138        if configuration.lldb_platform_url:
139            if configuration.lldb_platform_url.startswith('unix-'):
140                url_pattern = '(.+)://\[?(.+?)\]?/.*'
141            else:
142                url_pattern = '(.+)://(.+):\d+'
143            scheme, host = re.match(
144                url_pattern, configuration.lldb_platform_url).groups()
145            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
146                self.stub_device = host
147                self.stub_hostname = 'localhost'
148            else:
149                self.stub_device = None
150                self.stub_hostname = host
151        else:
152            self.stub_hostname = "localhost"
153
154        debug_server = self.getDebugServer()
155        if debug_server == "debugserver":
156            self._init_debugserver_test()
157        else:
158            self._init_llgs_test()
159
160    def tearDown(self):
161        self.logger.removeHandler(self._verbose_log_handler)
162        self._verbose_log_handler = None
163        TestBase.tearDown(self)
164
    def build(self, *args, **kwargs):
        """Build the test inferior; thin wrapper forwarding to buildDefault."""
        self.buildDefault(*args, **kwargs)
167
168    def getLocalServerLogFile(self):
169        return self.getLogBasenameForCurrentTest() + "-server.log"
170
171    def setUpServerLogging(self, is_llgs):
172        if len(lldbtest_config.channels) == 0:
173            return  # No logging requested
174
175        if lldb.remote_platform:
176            log_file = lldbutil.join_remote_paths(
177                lldb.remote_platform.GetWorkingDirectory(), "server.log")
178        else:
179            log_file = self.getLocalServerLogFile()
180
181        if is_llgs:
182            self.debug_monitor_extra_args.append("--log-file=" + log_file)
183            self.debug_monitor_extra_args.append(
184                "--log-channels={}".format(":".join(lldbtest_config.channels)))
185        else:
186            self.debug_monitor_extra_args = [
187                "--log-file=" + log_file, "--log-flags=0x800000"]
188
189    def get_next_port(self):
190        return 12000 + random.randint(0, 3999)
191
    def reset_test_sequence(self):
        """Discard accumulated packet expectations; start a fresh sequence."""
        self.test_sequence = GdbRemoteTestSequence(self.logger)
194
195
    def _init_llgs_test(self):
        """Locate lldb-server (local or remote) and configure this test to
        drive its "gdbserver" subcommand."""
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            # Resolve the platform's lldb-server binary via its exe symlink.
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            # Last whitespace-separated token of "ls -l" output is the
            # symlink target.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()
            if not self.debug_monitor_exe:
                self.skipTest("lldb-server exe not found")

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
237
238    def _init_debugserver_test(self):
239        self.debug_monitor_exe = get_debugserver_exe()
240        if not self.debug_monitor_exe:
241            self.skipTest("debugserver exe not found")
242        self.setUpServerLogging(is_llgs=False)
243        self.reverse_connect = True
244
245        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
246        # when the process truly dies.
247        self.stub_sends_two_stop_notifications_on_kill = True
248
249    def forward_adb_port(self, source, target, direction, device):
250        adb = ['adb'] + (['-s', device] if device else []) + [direction]
251
252        def remove_port_forward():
253            subprocess.call(adb + ["--remove", "tcp:%d" % source])
254
255        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
256        self.addTearDownHook(remove_port_forward)
257
258    def _verify_socket(self, sock):
259        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
260        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
261        # the connect() will always be successful, but the connection will be immediately dropped
262        # if ADB could not connect on the remote side. This function tries to detect this
263        # situation, and report it as "connection refused" so that the upper layers attempt the
264        # connection again.
265        triple = self.dbg.GetSelectedPlatform().GetTriple()
266        if not re.match(".*-.*-.*-android", triple):
267            return  # Not android.
268        can_read, _, _ = select.select([sock], [], [], 0.1)
269        if sock not in can_read:
270            return  # Data is not available, but the connection is alive.
271        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
272            raise _ConnectionRefused()  # Got EOF, connection dropped.
273
274    def create_socket(self):
275        try:
276            sock = socket.socket(family=socket.AF_INET)
277        except OSError as e:
278            if e.errno != errno.EAFNOSUPPORT:
279                raise
280            sock = socket.socket(family=socket.AF_INET6)
281
282        logger = self.logger
283
284        triple = self.dbg.GetSelectedPlatform().GetTriple()
285        if re.match(".*-.*-.*-android", triple):
286            self.forward_adb_port(
287                self.port,
288                self.port,
289                "forward",
290                self.stub_device)
291
292        logger.info(
293            "Connecting to debug monitor on %s:%d",
294            self.stub_hostname,
295            self.port)
296        connect_info = (self.stub_hostname, self.port)
297        try:
298            sock.connect(connect_info)
299        except socket.error as serr:
300            if serr.errno == errno.ECONNREFUSED:
301                raise _ConnectionRefused()
302            raise serr
303
304        def shutdown_socket():
305            if sock:
306                try:
307                    # send the kill packet so lldb-server shuts down gracefully
308                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
309                except:
310                    logger.warning(
311                        "failed to send kill packet to debug monitor: {}; ignoring".format(
312                            sys.exc_info()[0]))
313
314                try:
315                    sock.close()
316                except:
317                    logger.warning(
318                        "failed to close socket to debug monitor: {}; ignoring".format(
319                            sys.exc_info()[0]))
320
321        self.addTearDownHook(shutdown_socket)
322
323        self._verify_socket(sock)
324
325        return sock
326
    def set_inferior_startup_launch(self):
        """Use $A-packet launch mode for the inferior (the default)."""
        self._inferior_startup = self._STARTUP_LAUNCH
329
    def set_inferior_startup_attach(self):
        """Attach to a pre-launched inferior via the stub command line."""
        self._inferior_startup = self._STARTUP_ATTACH
332
    def set_inferior_startup_attach_manually(self):
        """Pre-launch the inferior; the test attaches however it wants
        (e.g. $vAttach;pid)."""
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
335
336    def get_debug_monitor_command_line_args(self, attach_pid=None):
337        commandline_args = self.debug_monitor_extra_args
338        if attach_pid:
339            commandline_args += ["--attach=%d" % attach_pid]
340        if self.reverse_connect:
341            commandline_args += ["--reverse-connect", self.connect_address]
342        else:
343            if lldb.remote_platform:
344                commandline_args += ["*:{}".format(self.port)]
345            else:
346                commandline_args += ["localhost:{}".format(self.port)]
347
348        return commandline_args
349
350    def get_target_byte_order(self):
351        inferior_exe_path = self.getBuildArtifact("a.out")
352        target = self.dbg.CreateTarget(inferior_exe_path)
353        return target.GetByteOrder()
354
355    def launch_debug_monitor(self, attach_pid=None, logfile=None):
356        if self.reverse_connect:
357            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
358            sock = socket.socket(family, type, proto)
359            sock.settimeout(self.DEFAULT_TIMEOUT)
360
361            sock.bind(addr)
362            sock.listen(1)
363            addr = sock.getsockname()
364            self.connect_address = "[{}]:{}".format(*addr)
365
366
367        # Create the command line.
368        commandline_args = self.get_debug_monitor_command_line_args(
369            attach_pid=attach_pid)
370
371        # Start the server.
372        server = self.spawnSubprocess(
373            self.debug_monitor_exe,
374            commandline_args,
375            install_remote=False)
376        self.assertIsNotNone(server)
377
378        if self.reverse_connect:
379            self.sock = sock.accept()[0]
380            self.sock.settimeout(self.DEFAULT_TIMEOUT)
381
382        return server
383
384    def connect_to_debug_monitor(self, attach_pid=None):
385        if self.reverse_connect:
386            # Create the stub.
387            server = self.launch_debug_monitor(attach_pid=attach_pid)
388            self.assertIsNotNone(server)
389
390            # Schedule debug monitor to be shut down during teardown.
391            logger = self.logger
392
393            self._server = Server(self.sock, server)
394            return server
395
396        # We're using a random port algorithm to try not to collide with other ports,
397        # and retry a max # times.
398        attempts = 0
399        MAX_ATTEMPTS = 20
400
401        while attempts < MAX_ATTEMPTS:
402            server = self.launch_debug_monitor(attach_pid=attach_pid)
403
404            # Schedule debug monitor to be shut down during teardown.
405            logger = self.logger
406
407            connect_attemps = 0
408            MAX_CONNECT_ATTEMPTS = 10
409
410            while connect_attemps < MAX_CONNECT_ATTEMPTS:
411                # Create a socket to talk to the server
412                try:
413                    logger.info("Connect attempt %d", connect_attemps + 1)
414                    self.sock = self.create_socket()
415                    self._server = Server(self.sock, server)
416                    return server
417                except _ConnectionRefused as serr:
418                    # Ignore, and try again.
419                    pass
420                time.sleep(0.5)
421                connect_attemps += 1
422
423            # We should close the server here to be safe.
424            server.terminate()
425
426            # Increment attempts.
427            print(
428                "connect to debug monitor on port %d failed, attempt #%d of %d" %
429                (self.port, attempts + 1, MAX_ATTEMPTS))
430            attempts += 1
431
432            # And wait a random length of time before next attempt, to avoid
433            # collisions.
434            time.sleep(random.randint(1, 5))
435
436            # Now grab a new port number.
437            self.port = self.get_next_port()
438
439        raise Exception(
440            "failed to create a socket to the launched debug monitor after %d tries" %
441            attempts)
442
443    def launch_process_for_attach(
444            self,
445            inferior_args=None,
446            sleep_seconds=3,
447            exe_path=None):
448        # We're going to start a child process that the debug monitor stub can later attach to.
449        # This process needs to be started so that it just hangs around for a while.  We'll
450        # have it sleep.
451        if not exe_path:
452            exe_path = self.getBuildArtifact("a.out")
453
454        args = []
455        if inferior_args:
456            args.extend(inferior_args)
457        if sleep_seconds:
458            args.append("sleep:%d" % sleep_seconds)
459
460        return self.spawnSubprocess(exe_path, args)
461
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the inferior to the remote's working directory and
                # launch it from there.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Build the expected protocol stream
        self.add_no_ack_remote_stream()
        if inferior_env:
            # Environment variables are passed via QEnvironment packets
            # before the launch.
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
537
538    def expect_socket_recv(
539            self,
540            sock,
541            expected_content_regex
542            ):
543        response = ""
544        timeout_time = time.time() + self.DEFAULT_TIMEOUT
545
546        while not expected_content_regex.match(
547                response) and time.time() < timeout_time:
548            can_read, _, _ = select.select([sock], [], [], self.DEFAULT_TIMEOUT)
549            if can_read and sock in can_read:
550                recv_bytes = sock.recv(4096)
551                if recv_bytes:
552                    response += seven.bitcast_to_string(recv_bytes)
553
554        self.assertTrue(expected_content_regex.match(response))
555
556    def expect_socket_send(self, sock, content):
557        request_bytes_remaining = content
558        timeout_time = time.time() + self.DEFAULT_TIMEOUT
559
560        while len(request_bytes_remaining) > 0 and time.time() < timeout_time:
561            _, can_write, _ = select.select([], [sock], [], self.DEFAULT_TIMEOUT)
562            if can_write and sock in can_write:
563                written_byte_count = sock.send(request_bytes_remaining.encode())
564                request_bytes_remaining = request_bytes_remaining[
565                    written_byte_count:]
566        self.assertEqual(len(request_bytes_remaining), 0)
567
568    def do_handshake(self, stub_socket):
569        # Write the ack.
570        self.expect_socket_send(stub_socket, "+")
571
572        # Send the start no ack mode packet.
573        NO_ACK_MODE_REQUEST = "$QStartNoAckMode#b0"
574        bytes_sent = stub_socket.send(NO_ACK_MODE_REQUEST.encode())
575        self.assertEqual(bytes_sent, len(NO_ACK_MODE_REQUEST))
576
577        # Receive the ack and "OK"
578        self.expect_socket_recv(stub_socket, re.compile(
579            r"^\+\$OK#[0-9a-fA-F]{2}$"))
580
581        # Send the final ack.
582        self.expect_socket_send(stub_socket, "+")
583
    def add_no_ack_remote_stream(self):
        """Append the standard QStartNoAckMode handshake to the expected
        packet stream."""
        self.test_sequence.add_log_lines(
            ["read packet: +",
             "read packet: $QStartNoAckMode#b0",
             "send packet: +",
             "send packet: $OK#9a",
             "read packet: +"],
            True)
592
    def add_verified_launch_packets(self, launch_args):
        """Append an $A launch for launch_args, verified via qLaunchSuccess,
        to the expected packet stream."""
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)
600
    def add_thread_suffix_request_packets(self):
        """Append a QThreadSuffixSupported request (expected to succeed) to
        the expected packet stream."""
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)
606
    def add_process_info_collection_packets(self):
        """Append a qProcessInfo request, capturing the raw reply under the
        "process_info_raw" key."""
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)
612
    def add_set_environment_packets(self, name, value):
        """Append a QEnvironment packet setting name=value in the inferior's
        environment."""
        self.test_sequence.add_log_lines(
            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
             "send packet: $OK#00",
             ], True)
618
    # Keys a qProcessInfo response may contain; anything else fails
    # parse_process_info_response().
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
635
636    def parse_process_info_response(self, context):
637        # Ensure we have a process info response.
638        self.assertIsNotNone(context)
639        process_info_raw = context.get("process_info_raw")
640        self.assertIsNotNone(process_info_raw)
641
642        # Pull out key:value; pairs.
643        process_info_dict = {
644            match.group(1): match.group(2) for match in re.finditer(
645                r"([^:]+):([^;]+);", process_info_raw)}
646
647        # Validate keys are known.
648        for (key, val) in list(process_info_dict.items()):
649            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
650            self.assertIsNotNone(val)
651
652        return process_info_dict
653
    def add_register_info_collection_packets(self):
        """Append iterated qRegisterInfo queries, collecting all replies under
        the "reg_info_responses" key until an error/empty reply ends the
        sequence."""
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
                "save_key": "reg_info_responses"}],
            True)
660
661    def parse_register_info_packets(self, context):
662        """Return an array of register info dictionaries, one per register info."""
663        reg_info_responses = context.get("reg_info_responses")
664        self.assertIsNotNone(reg_info_responses)
665
666        # Parse register infos.
667        return [parse_reg_info_response(reg_info_response)
668                for reg_info_response in reg_info_responses]
669
    def expect_gdbremote_sequence(self):
        """Run the accumulated test_sequence against the connected stub;
        timeout scales with the number of sequence entries."""
        return expect_lldb_gdbserver_replay(
            self,
            self._server,
            self.test_sequence,
            self.DEFAULT_TIMEOUT * len(self.test_sequence),
            self.logger)
677
    # Keys a qRegisterInfo response may contain; anything else fails
    # assert_valid_reg_info().
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
695
696    def assert_valid_reg_info(self, reg_info):
697        # Assert we know about all the reginfo keys parsed.
698        for key in reg_info:
699            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
700
701        # Check the bare-minimum expected set of register info keys.
702        self.assertTrue("name" in reg_info)
703        self.assertTrue("bitsize" in reg_info)
704
705        if not self.getArchitecture() == 'aarch64':
706            self.assertTrue("offset" in reg_info)
707
708        self.assertTrue("encoding" in reg_info)
709        self.assertTrue("format" in reg_info)
710
711    def find_pc_reg_info(self, reg_infos):
712        lldb_reg_index = 0
713        for reg_info in reg_infos:
714            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
715                return (lldb_reg_index, reg_info)
716            lldb_reg_index += 1
717
718        return (None, None)
719
720    def add_lldb_register_index(self, reg_infos):
721        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
722
723        We'll use this when we want to call packets like P/p with a register index but do so
724        on only a subset of the full register info set.
725        """
726        self.assertIsNotNone(reg_infos)
727
728        reg_index = 0
729        for reg_info in reg_infos:
730            reg_info["lldb_register_index"] = reg_index
731            reg_index += 1
732
    def add_query_memory_region_packets(self, address):
        """Append a qMemoryRegionInfo query for address (hex-encoded),
        capturing the reply under the "memory_region_response" key."""
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)
738
739    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
740        self.assertIsNotNone(key_val_text)
741        kv_dict = {}
742        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
743            key = match.group(1)
744            val = match.group(2)
745            if key in kv_dict:
746                if allow_dupes:
747                    if isinstance(kv_dict[key], list):
748                        kv_dict[key].append(val)
749                    else:
750                        # Promote to list
751                        kv_dict[key] = [kv_dict[key], val]
752                else:
753                    self.fail(
754                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
755                            key, val, key_val_text, kv_dict))
756            else:
757                kv_dict[key] = val
758        return kv_dict
759
760    def parse_memory_region_packet(self, context):
761        # Ensure we have a context.
762        self.assertIsNotNone(context.get("memory_region_response"))
763
764        # Pull out key:value; pairs.
765        mem_region_dict = self.parse_key_val_dict(
766            context.get("memory_region_response"))
767
768        # Validate keys are known.
769        for (key, val) in list(mem_region_dict.items()):
770            self.assertIn(key,
771                ["start",
772                 "size",
773                 "permissions",
774                 "flags",
775                 "name",
776                 "error"])
777            self.assertIsNotNone(val)
778
779        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
780        # Return the dictionary of key-value pairs for the memory region.
781        return mem_region_dict
782
783    def assert_address_within_memory_region(
784            self, test_address, mem_region_dict):
785        self.assertIsNotNone(mem_region_dict)
786        self.assertTrue("start" in mem_region_dict)
787        self.assertTrue("size" in mem_region_dict)
788
789        range_start = int(mem_region_dict["start"], 16)
790        range_size = int(mem_region_dict["size"], 16)
791        range_end = range_start + range_size
792
793        if test_address < range_start:
794            self.fail(
795                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
796                    test_address,
797                    range_start,
798                    range_end,
799                    range_size))
800        elif test_address >= range_end:
801            self.fail(
802                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
803                    test_address,
804                    range_start,
805                    range_end,
806                    range_size))
807
    def add_threadinfo_collection_packets(self):
        """Queue qfThreadInfo/qsThreadInfo queries that gather all thread ids.

        Responses accumulate under the "threadinfo_responses" key of the
        sequence context until the stub sends the terminating bare/'l' packet.
        """
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
                "save_key": "threadinfo_responses"}],
            True)
814
815    def parse_threadinfo_packets(self, context):
816        """Return an array of thread ids (decimal ints), one per thread."""
817        threadinfo_responses = context.get("threadinfo_responses")
818        self.assertIsNotNone(threadinfo_responses)
819
820        thread_ids = []
821        for threadinfo_response in threadinfo_responses:
822            new_thread_infos = parse_threadinfo_response(threadinfo_response)
823            thread_ids.extend(new_thread_infos)
824        return thread_ids
825
826    def wait_for_thread_count(self, thread_count):
827        start_time = time.time()
828        timeout_time = start_time + self.DEFAULT_TIMEOUT
829
830        actual_thread_count = 0
831        while actual_thread_count < thread_count:
832            self.reset_test_sequence()
833            self.add_threadinfo_collection_packets()
834
835            context = self.expect_gdbremote_sequence()
836            self.assertIsNotNone(context)
837
838            threads = self.parse_threadinfo_packets(context)
839            self.assertIsNotNone(threads)
840
841            actual_thread_count = len(threads)
842
843            if time.time() > timeout_time:
844                raise Exception(
845                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
846                        self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))
847
848        return threads
849
    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        """Queue packets that set a Z-packet breakpoint at address.

        z_packet_type selects the breakpoint flavor (0 = software breakpoint).
        breakpoint_kind is target-specific (typically the breakpoint
        instruction size).  When do_continue is True, also queue a continue
        and expect a T-stop reply, capturing "stop_signo" and
        "stop_thread_id" in the sequence context.
        """
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could set it.
                "send packet: $OK#00",
            ], True)

        if (do_continue):
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)
874
    def add_remove_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            breakpoint_kind=1):
        """Queue packets that remove a z-packet breakpoint at address.

        Arguments mirror add_set_breakpoint_packets; the stub is expected
        to acknowledge removal with OK.
        """
        self.test_sequence.add_log_lines(
            [  # Remove the breakpoint.
                "read packet: $z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could unset it.
                "send packet: $OK#00",
            ], True)
887
    def add_qSupported_packets(self):
        """Queue a qSupported query, capturing the raw reply as
        "qSupported_response" for parse_qSupported_response()."""
        self.test_sequence.add_log_lines(
            ["read packet: $qSupported#00",
             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
             ], True)
893
    # Stub features parse_qSupported_response will accept; any feature the
    # stub reports outside this list is treated as an error.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals"
    ]
907
908    def parse_qSupported_response(self, context):
909        self.assertIsNotNone(context)
910
911        raw_response = context.get("qSupported_response")
912        self.assertIsNotNone(raw_response)
913
914        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
915        # +,-,? is stripped from the key and set as the value.
916        supported_dict = {}
917        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
918            key = match.group(1)
919            val = match.group(3)
920
921            # key=val: store as is
922            if val and len(val) > 0:
923                supported_dict[key] = val
924            else:
925                if len(key) < 2:
926                    raise Exception(
927                        "singular stub feature is too short: must be stub_feature{+,-,?}")
928                supported_type = key[-1]
929                key = key[:-1]
930                if not supported_type in ["+", "-", "?"]:
931                    raise Exception(
932                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
933                supported_dict[key] = supported_type
934            # Ensure we know the supported element
935            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
936                raise Exception(
937                    "unknown qSupported stub feature reported: %s" %
938                    key)
939
940        return supported_dict
941
    def run_process_then_stop(self, run_seconds=1):
        """Continue the inferior, let it run run_seconds, then interrupt it.

        Returns the sequence context; the "stop_result" key holds the
        signal number captured from the T-stop reply.
        """
        # Tell the stub to continue.
        self.test_sequence.add_log_lines(
            ["read packet: $vCont;c#a8"],
            True)
        context = self.expect_gdbremote_sequence()

        # Wait for run_seconds.
        time.sleep(run_seconds)

        # Send an interrupt, capture a T response.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: {}".format(chr(3)),
             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
            True)
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        self.assertIsNotNone(context.get("stop_result"))

        return context
963
    def continue_process_and_wait_for_stop(self):
        """Continue the inferior and wait for the next stop notification.

        Returns (stop_signo, stop_key_val_dict) as produced by
        parse_interrupt_packets().
        """
        self.test_sequence.add_log_lines(
            [
                "read packet: $vCont;c#a8",
                {
                    "direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
                },
            ],
            True,
        )
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)
        return self.parse_interrupt_packets(context)
979
980    def select_modifiable_register(self, reg_infos):
981        """Find a register that can be read/written freely."""
982        PREFERRED_REGISTER_NAMES = set(["rax", ])
983
984        # First check for the first register from the preferred register name
985        # set.
986        alternative_register_index = None
987
988        self.assertIsNotNone(reg_infos)
989        for reg_info in reg_infos:
990            if ("name" in reg_info) and (
991                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
992                # We found a preferred register.  Use it.
993                return reg_info["lldb_register_index"]
994            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
995                    reg_info["generic"] == "arg1"):
996                # A frame pointer or first arg register will do as a
997                # register to modify temporarily.
998                alternative_register_index = reg_info["lldb_register_index"]
999
1000        # We didn't find a preferred register.  Return whatever alternative register
1001        # we found, if any.
1002        return alternative_register_index
1003
1004    def extract_registers_from_stop_notification(self, stop_key_vals_text):
1005        self.assertIsNotNone(stop_key_vals_text)
1006        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
1007
1008        registers = {}
1009        for (key, val) in list(kv_dict.items()):
1010            if re.match(r"^[0-9a-fA-F]+$", key):
1011                registers[int(key, 16)] = val
1012        return registers
1013
    def gather_register_infos(self):
        """Query the stub for all register infos and return the parsed list.

        Each entry is annotated with an "lldb_register_index" key via
        add_lldb_register_index() so subsets of the list stay addressable.
        """
        self.reset_test_sequence()
        self.add_register_info_collection_packets()

        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        reg_infos = self.parse_register_info_packets(context)
        self.assertIsNotNone(reg_infos)
        self.add_lldb_register_index(reg_infos)

        return reg_infos
1026
1027    def find_generic_register_with_name(self, reg_infos, generic_name):
1028        self.assertIsNotNone(reg_infos)
1029        for reg_info in reg_infos:
1030            if ("generic" in reg_info) and (
1031                    reg_info["generic"] == generic_name):
1032                return reg_info
1033        return None
1034
1035    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
1036        self.assertIsNotNone(reg_infos)
1037        for reg_info in reg_infos:
1038            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
1039                return reg_info
1040        return None
1041
1042    def decode_gdbremote_binary(self, encoded_bytes):
1043        decoded_bytes = ""
1044        i = 0
1045        while i < len(encoded_bytes):
1046            if encoded_bytes[i] == "}":
1047                # Handle escaped char.
1048                self.assertTrue(i + 1 < len(encoded_bytes))
1049                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1050                i += 2
1051            elif encoded_bytes[i] == "*":
1052                # Handle run length encoding.
1053                self.assertTrue(len(decoded_bytes) > 0)
1054                self.assertTrue(i + 1 < len(encoded_bytes))
1055                repeat_count = ord(encoded_bytes[i + 1]) - 29
1056                decoded_bytes += decoded_bytes[-1] * repeat_count
1057                i += 2
1058            else:
1059                decoded_bytes += encoded_bytes[i]
1060                i += 1
1061        return decoded_bytes
1062
    def build_auxv_dict(self, endian, word_size, auxv_data):
        """Parse raw auxv bytes into a {key: value} dict.

        auxv_data is a flat string of (key, value) machine-word pairs in the
        target's endianness; word_size is the word size in bytes.  Parsing
        stops at the terminating entry (key == 0, value == 0), which must be
        present; the test fails otherwise.  Duplicate keys fail the test,
        except for arch-specific keys that are explicitly ignored.
        """
        self.assertIsNotNone(endian)
        self.assertIsNotNone(word_size)
        self.assertIsNotNone(auxv_data)

        auxv_dict = {}

        # PowerPC64le's auxvec has a special key that must be ignored.
        # This special key may be used multiple times, resulting in
        # multiple key/value pairs with the same key, which would otherwise
        # break this test check for repeated keys.
        #
        # AT_IGNOREPPC = 22
        ignored_keys_for_arch = { 'powerpc64le' : [22] }
        arch = self.getArchitecture()
        ignore_keys = None
        if arch in ignored_keys_for_arch:
            ignore_keys = ignored_keys_for_arch[arch]

        while len(auxv_data) > 0:
            # Chop off key.
            raw_key = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Chop off value.
            raw_value = auxv_data[:word_size]
            auxv_data = auxv_data[word_size:]

            # Convert raw text from target endian.
            key = unpack_endian_binary_string(endian, raw_key)
            value = unpack_endian_binary_string(endian, raw_value)

            # Skip arch-specific keys that may legitimately repeat.
            if ignore_keys and key in ignore_keys:
                continue

            # Handle ending entry.
            if key == 0:
                self.assertEqual(value, 0)
                return auxv_dict

            # The key should not already be present.
            self.assertFalse(key in auxv_dict)
            auxv_dict[key] = value

        self.fail(
            "should not reach here - implies required double zero entry not found")
        return auxv_dict
1110
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Repeatedly send command_prefix{offset:x},{chunk_length:x} requests
        until the stub reports completion.

        Each reply starts with 'm' (more data follows) or 'l' (last chunk,
        bare or with data).  Returns the concatenated, binary-decoded
        payload of all chunks.
        """
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            # NOTE(review): the packet carries a trailing ':' before '#00' —
            # confirm this is the form the stub expects for this prefix.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        "direction": "send",
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data
1155
    def add_interrupt_packets(self):
        """Queue a ^C interrupt and the expected T-stop notification,
        capturing "stop_signo" and "stop_key_val_text"."""
        self.test_sequence.add_log_lines([
            # Send the interrupt (raw 0x03 byte).
            "read packet: {}".format(chr(3)),
            # And wait for the stop notification.
            {"direction": "send",
             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
             "capture": {1: "stop_signo",
                         2: "stop_key_val_text"}},
        ], True)
1166
1167    def parse_interrupt_packets(self, context):
1168        self.assertIsNotNone(context.get("stop_signo"))
1169        self.assertIsNotNone(context.get("stop_key_val_text"))
1170        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1171            context["stop_key_val_text"]))
1172
    def add_QSaveRegisterState_packets(self, thread_id):
        """Queue a QSaveRegisterState request (with optional thread suffix
        when thread_id is truthy), capturing the raw reply as
        "save_response" for parse_QSaveRegisterState_response()."""
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
                thread_id)
        else:
            request = "read packet: $QSaveRegisterState#00"

        self.test_sequence.add_log_lines([request,
                                          {"direction": "send",
                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
                                           "capture": {1: "save_response"}},
                                          ],
                                         True)
1187
1188    def parse_QSaveRegisterState_response(self, context):
1189        self.assertIsNotNone(context)
1190
1191        save_response = context.get("save_response")
1192        self.assertIsNotNone(save_response)
1193
1194        if len(save_response) < 1 or save_response[0] == "E":
1195            # error received
1196            return (False, None)
1197        else:
1198            return (True, int(save_response))
1199
    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
        """Queue a QRestoreRegisterState request for the given save_id
        (with optional thread suffix), expecting an OK reply."""
        if thread_id:
            # Use the thread suffix form.
            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
                save_id, thread_id)
        else:
            request = "read packet: $QRestoreRegisterState:{}#00".format(
                save_id)

        self.test_sequence.add_log_lines([
            request,
            "send packet: $OK#00"
        ], True)
1213
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read, bit-invert, write back and re-verify each register.

        For every entry in reg_infos: read the current value with 'p',
        write back its bitwise complement with 'P', then read again to
        confirm the flip stuck.  Some registers legitimately refuse or
        permute writes (flags, segment selectors, ...), so failures are
        counted rather than asserted.

        Returns (successful_writes, failed_writes).
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1313
1314    def is_bit_flippable_register(self, reg_info):
1315        if not reg_info:
1316            return False
1317        if not "set" in reg_info:
1318            return False
1319        if reg_info["set"] != "General Purpose Registers":
1320            return False
1321        if ("container-regs" in reg_info) and (
1322                len(reg_info["container-regs"]) > 0):
1323            # Don't try to bit flip registers contained in another register.
1324            return False
1325        if re.match("^.s$", reg_info["name"]):
1326            # This is a 2-letter register name that ends in "s", like a segment register.
1327            # Don't try to bit flip these.
1328            return False
1329        if re.match("^(c|)psr$", reg_info["name"]):
1330            # This is an ARM program status register; don't flip it.
1331            return False
1332        # Okay, this looks fine-enough.
1333        return True
1334
    def read_register_values(self, reg_infos, endian, thread_id=None):
        """Read every register in reg_infos with 'p' packets.

        Returns a dict mapping lldb_register_index -> unsigned integer
        value (converted from target endianness).  Fails the test on any
        error ('E'-prefixed) reply.
        """
        self.assertIsNotNone(reg_infos)
        values = {}

        for reg_info in reg_infos:
            # We append a register index when load reg infos so we can work
            # with subsets.
            reg_index = reg_info.get("lldb_register_index")
            self.assertIsNotNone(reg_index)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read it with p.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Convert value from target endian to integral.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            self.assertTrue(len(p_response) > 0)
            self.assertFalse(p_response[0] == "E")

            values[reg_index] = unpack_register_hex_unsigned(
                endian, p_response)

        return values
1371
    def add_vCont_query_packets(self):
        """Queue a vCont? query, capturing the supported-actions text (the
        part after the echoed "vCont") as "vCont_query_response"."""
        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
                                          {"direction": "send",
                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
                                           "capture": {2: "vCont_query_response"}},
                                          ],
                                         True)
1379
1380    def parse_vCont_query_response(self, context):
1381        self.assertIsNotNone(context)
1382        vCont_query_response = context.get("vCont_query_response")
1383
1384        # Handle case of no vCont support at all - in which case the capture
1385        # group will be none or zero length.
1386        if not vCont_query_response or len(vCont_query_response) == 0:
1387            return {}
1388
1389        return {key: 1 for key in vCont_query_response.split(
1390            ";") if key and len(key) > 0}
1391
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) is true or the step
        budget runs out.

        step_instruction may contain a literal "{thread}" placeholder, which
        is replaced with the hex thread id.  When use_Hc_packet is True, an
        Hc packet selects the continue thread before each step.  Every step
        is expected to stop with SIGTRAP.

        Returns (predicate_became_true, steps_taken).

        Used by single step test that appears in a few different contexts.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1444
    def g_c1_c2_contents_are(self, args):
        """Return True if inferior globals g_c1/g_c2 hold the expected bytes.

        args is a dict with "g_c1_address"/"g_c2_address" (int addresses)
        and "expected_g_c1"/"expected_g_c2" (single-character strings).
        One byte is read from each address with an m packet and compared
        after hex-decoding.

        Used by single step test that appears in a few different contexts.
        """
        g_c1_address = args["g_c1_address"]
        g_c2_address = args["g_c2_address"]
        expected_g_c1 = args["expected_g_c1"]
        expected_g_c2 = args["expected_g_c2"]

        # Read g_c1 and g_c2 contents.
        self.reset_test_sequence()
        self.test_sequence.add_log_lines(
            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
            True)

        # Run the packet stream.
        context = self.expect_gdbremote_sequence()
        self.assertIsNotNone(context)

        # Check if what we read from inferior memory is what we are expecting.
        self.assertIsNotNone(context.get("g_c1_contents"))
        self.assertIsNotNone(context.get("g_c2_contents"))

        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1471
1472    def single_step_only_steps_one_instruction(
1473            self, use_Hc_packet=True, step_instruction="s"):
1474        """Used by single step test that appears in a few different contexts."""
1475        # Start up the inferior.
1476        procs = self.prep_debug_monitor_and_inferior(
1477            inferior_args=[
1478                "get-code-address-hex:swap_chars",
1479                "get-data-address-hex:g_c1",
1480                "get-data-address-hex:g_c2",
1481                "sleep:1",
1482                "call-function:swap_chars",
1483                "sleep:5"])
1484
1485        # Run the process
1486        self.test_sequence.add_log_lines(
1487            [  # Start running after initial stop.
1488                "read packet: $c#63",
1489                # Match output line that prints the memory address of the function call entry point.
1490                # Note we require launch-only testing so we can get inferior otuput.
1491                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1492                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1493                # Now stop the inferior.
1494                "read packet: {}".format(chr(3)),
1495                # And wait for the stop notification.
1496                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1497            True)
1498
1499        # Run the packet stream.
1500        context = self.expect_gdbremote_sequence()
1501        self.assertIsNotNone(context)
1502
1503        # Grab the main thread id.
1504        self.assertIsNotNone(context.get("stop_thread_id"))
1505        main_thread_id = int(context.get("stop_thread_id"), 16)
1506
1507        # Grab the function address.
1508        self.assertIsNotNone(context.get("function_address"))
1509        function_address = int(context.get("function_address"), 16)
1510
1511        # Grab the data addresses.
1512        self.assertIsNotNone(context.get("g_c1_address"))
1513        g_c1_address = int(context.get("g_c1_address"), 16)
1514
1515        self.assertIsNotNone(context.get("g_c2_address"))
1516        g_c2_address = int(context.get("g_c2_address"), 16)
1517
1518        # Set a breakpoint at the given address.
1519        if self.getArchitecture().startswith("arm"):
1520            # TODO: Handle case when setting breakpoint in thumb code
1521            BREAKPOINT_KIND = 4
1522        else:
1523            BREAKPOINT_KIND = 1
1524        self.reset_test_sequence()
1525        self.add_set_breakpoint_packets(
1526            function_address,
1527            do_continue=True,
1528            breakpoint_kind=BREAKPOINT_KIND)
1529        context = self.expect_gdbremote_sequence()
1530        self.assertIsNotNone(context)
1531
1532        # Remove the breakpoint.
1533        self.reset_test_sequence()
1534        self.add_remove_breakpoint_packets(
1535            function_address, breakpoint_kind=BREAKPOINT_KIND)
1536        context = self.expect_gdbremote_sequence()
1537        self.assertIsNotNone(context)
1538
1539        # Verify g_c1 and g_c2 match expected initial state.
1540        args = {}
1541        args["g_c1_address"] = g_c1_address
1542        args["g_c2_address"] = g_c2_address
1543        args["expected_g_c1"] = "0"
1544        args["expected_g_c2"] = "1"
1545
1546        self.assertTrue(self.g_c1_c2_contents_are(args))
1547
1548        # Verify we take only a small number of steps to hit the first state.
1549        # Might need to work through function entry prologue code.
1550        args["expected_g_c1"] = "1"
1551        args["expected_g_c2"] = "1"
1552        (state_reached,
1553         step_count) = self.count_single_steps_until_true(main_thread_id,
1554                                                          self.g_c1_c2_contents_are,
1555                                                          args,
1556                                                          max_step_count=25,
1557                                                          use_Hc_packet=use_Hc_packet,
1558                                                          step_instruction=step_instruction)
1559        self.assertTrue(state_reached)
1560
1561        # Verify we hit the next state.
1562        args["expected_g_c1"] = "1"
1563        args["expected_g_c2"] = "0"
1564        (state_reached,
1565         step_count) = self.count_single_steps_until_true(main_thread_id,
1566                                                          self.g_c1_c2_contents_are,
1567                                                          args,
1568                                                          max_step_count=5,
1569                                                          use_Hc_packet=use_Hc_packet,
1570                                                          step_instruction=step_instruction)
1571        self.assertTrue(state_reached)
1572        expected_step_count = 1
1573        arch = self.getArchitecture()
1574
1575        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1576        # of variable value
1577        if re.match("mips", arch):
1578            expected_step_count = 3
1579        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1580        # variable value
1581        if re.match("s390x", arch):
1582            expected_step_count = 2
1583        # ARM64 requires "4" instructions: 2 to compute the address (adrp, add),
1584        # one to materialize the constant (mov) and the store
1585        if re.match("arm64", arch):
1586            expected_step_count = 4
1587
1588        self.assertEqual(step_count, expected_step_count)
1589
1590        # ARM64: Once addresses and constants are materialized, only one
1591        # instruction is needed.
1592        if re.match("arm64", arch):
1593            expected_step_count = 1
1594
1595        # Verify we hit the next state.
1596        args["expected_g_c1"] = "0"
1597        args["expected_g_c2"] = "0"
1598        (state_reached,
1599         step_count) = self.count_single_steps_until_true(main_thread_id,
1600                                                          self.g_c1_c2_contents_are,
1601                                                          args,
1602                                                          max_step_count=5,
1603                                                          use_Hc_packet=use_Hc_packet,
1604                                                          step_instruction=step_instruction)
1605        self.assertTrue(state_reached)
1606        self.assertEqual(step_count, expected_step_count)
1607
1608        # Verify we hit the next state.
1609        args["expected_g_c1"] = "0"
1610        args["expected_g_c2"] = "1"
1611        (state_reached,
1612         step_count) = self.count_single_steps_until_true(main_thread_id,
1613                                                          self.g_c1_c2_contents_are,
1614                                                          args,
1615                                                          max_step_count=5,
1616                                                          use_Hc_packet=use_Hc_packet,
1617                                                          step_instruction=step_instruction)
1618        self.assertTrue(state_reached)
1619        self.assertEqual(step_count, expected_step_count)
1620
1621    def maybe_strict_output_regex(self, regex):
1622        return '.*' + regex + \
1623            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1624
1625    def install_and_create_launch_args(self):
1626        exe_path = self.getBuildArtifact("a.out")
1627        if not lldb.remote_platform:
1628            return [exe_path]
1629        remote_path = lldbutil.append_to_process_working_directory(self,
1630            os.path.basename(exe_path))
1631        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1632        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1633                                           remote_file_spec)
1634        if err.Fail():
1635            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1636                            (exe_path, remote_path, err))
1637        return [remote_path]
1638