1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
class _ConnectionRefused(IOError):
    """Raised when a connection to the debug stub is refused or immediately
    dropped (used so upper layers retry the connection)."""
    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands every "test*" method into one variant per debug
    server category ("debugserver" and/or "llgs").

    Each generated method is named "<original>_<category>", tagged with that
    test category, and carries a ``debug_server`` attribute that setUp() later
    reads to pick which stub to launch.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes are copied through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(
                getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # The attrvalue=attrvalue default binds the current loop
                # value, avoiding the late-binding-closure pitfall.
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                # Consumed by GdbRemoteTestCaseBase.getDebugServer().
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(
                cls, name, bases, newattrs)
61
@add_metaclass(GdbRemoteTestCaseFactory)
class GdbRemoteTestCaseBase(Base):
    """Base class for gdb-remote protocol tests; the metaclass generates one
    test variant per debug server (debugserver / lldb-server gdbserver)."""

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # Pre-computed 'k' (kill) packet including its checksum, sent on teardown
    # so the stub shuts down gracefully.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test file handler installed by setUp() when verbose logging is on.
    _verbose_log_handler = None
    # Shared formatter for both the stderr and the per-test file handlers.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
94
95    def setUpBaseLogging(self):
96        self.logger = logging.getLogger(__name__)
97
98        if len(self.logger.handlers) > 0:
99            return  # We have set up this handler already
100
101        self.logger.propagate = False
102        self.logger.setLevel(logging.DEBUG)
103
104        # log all warnings to stderr
105        handler = logging.StreamHandler()
106        handler.setLevel(logging.WARNING)
107        handler.setFormatter(self._log_formatter)
108        self.logger.addHandler(handler)
109
110    def isVerboseLoggingRequested(self):
111        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
112        # logged.
113        return any(("gdb-remote" in channel)
114                   for channel in lldbtest_config.channels)
115
116    def getDebugServer(self):
117        method = getattr(self, self.testMethodName)
118        return getattr(method, "debug_server", None)
119
120    def setUp(self):
121        super(GdbRemoteTestCaseBase, self).setUp()
122
123        self.setUpBaseLogging()
124        self.debug_monitor_extra_args = []
125
126        if self.isVerboseLoggingRequested():
127            # If requested, full logs go to a log file
128            self._verbose_log_handler = logging.FileHandler(
129                self.getLogBasenameForCurrentTest() + "-host.log")
130            self._verbose_log_handler.setFormatter(self._log_formatter)
131            self._verbose_log_handler.setLevel(logging.DEBUG)
132            self.logger.addHandler(self._verbose_log_handler)
133
134        self.test_sequence = GdbRemoteTestSequence(self.logger)
135        self.set_inferior_startup_launch()
136        self.port = self.get_next_port()
137        self.stub_sends_two_stop_notifications_on_kill = False
138        if configuration.lldb_platform_url:
139            if configuration.lldb_platform_url.startswith('unix-'):
140                url_pattern = '(.+)://\[?(.+?)\]?/.*'
141            else:
142                url_pattern = '(.+)://(.+):\d+'
143            scheme, host = re.match(
144                url_pattern, configuration.lldb_platform_url).groups()
145            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
146                self.stub_device = host
147                self.stub_hostname = 'localhost'
148            else:
149                self.stub_device = None
150                self.stub_hostname = host
151        else:
152            self.stub_hostname = "localhost"
153
154        debug_server = self.getDebugServer()
155        if debug_server == "debugserver":
156            self._init_debugserver_test()
157        else:
158            self._init_llgs_test()
159
    def tearDown(self):
        # Detach the per-test verbose file handler installed by setUp().
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): calls TestBase.tearDown directly, while setUp() uses
        # super() on a Base-derived class — confirm TestBase is the intended
        # target here.
        TestBase.tearDown(self)
164
165    def build(self, *args, **kwargs):
166        self.buildDefault(*args, **kwargs)
167
168    def getLocalServerLogFile(self):
169        return self.getLogBasenameForCurrentTest() + "-server.log"
170
171    def setUpServerLogging(self, is_llgs):
172        if len(lldbtest_config.channels) == 0:
173            return  # No logging requested
174
175        if lldb.remote_platform:
176            log_file = lldbutil.join_remote_paths(
177                lldb.remote_platform.GetWorkingDirectory(), "server.log")
178        else:
179            log_file = self.getLocalServerLogFile()
180
181        if is_llgs:
182            self.debug_monitor_extra_args.append("--log-file=" + log_file)
183            self.debug_monitor_extra_args.append(
184                "--log-channels={}".format(":".join(lldbtest_config.channels)))
185        else:
186            self.debug_monitor_extra_args = [
187                "--log-file=" + log_file, "--log-flags=0x800000"]
188
189    def get_next_port(self):
190        return 12000 + random.randint(0, 3999)
191
    def reset_test_sequence(self):
        # Discard any queued packets and start a fresh expected sequence.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
194
195
    def _init_llgs_test(self):
        """Configure this test to run against lldb-server ("llgs").

        Locally, uses the built lldb-server with a reverse connection.  On a
        remote platform, reverse connections are avoided and the server
        binary is discovered by resolving the remote shell's parent process
        executable (assumed to be the platform's lldb-server).
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            # The fourth field is the parent pid, i.e. the process that
            # spawned the remote shell.
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            # Last whitespace-separated token of "ls -l" is the link target.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        # lldb-server is invoked in its "gdbserver" subcommand mode.
        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
235
    def _init_debugserver_test(self):
        """Configure this test to run against Apple's debugserver."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        # debugserver always supports reverse connections.
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
244
245    def forward_adb_port(self, source, target, direction, device):
246        adb = ['adb'] + (['-s', device] if device else []) + [direction]
247
248        def remove_port_forward():
249            subprocess.call(adb + ["--remove", "tcp:%d" % source])
250
251        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
252        self.addTearDownHook(remove_port_forward)
253
254    def _verify_socket(self, sock):
255        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
256        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
257        # the connect() will always be successful, but the connection will be immediately dropped
258        # if ADB could not connect on the remote side. This function tries to detect this
259        # situation, and report it as "connection refused" so that the upper layers attempt the
260        # connection again.
261        triple = self.dbg.GetSelectedPlatform().GetTriple()
262        if not re.match(".*-.*-.*-android", triple):
263            return  # Not android.
264        can_read, _, _ = select.select([sock], [], [], 0.1)
265        if sock not in can_read:
266            return  # Data is not available, but the connection is alive.
267        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
268            raise _ConnectionRefused()  # Got EOF, connection dropped.
269
    def create_socket(self):
        """Create a socket connected to the debug stub and register a
        teardown hook that kills the stub and closes the socket.

        Raises _ConnectionRefused when the stub is not (yet) accepting
        connections so callers can retry.
        """
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            # IPv4 unavailable on this host; fall back to IPv6.
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        # Android targets are reached through an adb port forward.
        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            # Translate so upper layers retry with a new attempt/port.
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            # Best-effort cleanup: the bare excepts are deliberate so that
            # teardown never raises, only logs.
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        # Catch the android "connected but immediately dropped" case.
        self._verify_socket(sock)

        return sock
322
    def set_inferior_startup_launch(self):
        # The stub launches the inferior itself (via an $A packet).
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        # The stub attaches on its command line (--attach=<pid>).
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        # The test attaches explicitly itself (e.g. $vAttach;pid).
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
331
332    def get_debug_monitor_command_line_args(self, attach_pid=None):
333        commandline_args = self.debug_monitor_extra_args
334        if attach_pid:
335            commandline_args += ["--attach=%d" % attach_pid]
336        if self.reverse_connect:
337            commandline_args += ["--reverse-connect", self.connect_address]
338        else:
339            if lldb.remote_platform:
340                commandline_args += ["*:{}".format(self.port)]
341            else:
342                commandline_args += ["localhost:{}".format(self.port)]
343
344        return commandline_args
345
346    def get_target_byte_order(self):
347        inferior_exe_path = self.getBuildArtifact("a.out")
348        target = self.dbg.CreateTarget(inferior_exe_path)
349        return target.GetByteOrder()
350
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Start the debug stub process.

        In reverse-connect mode, first opens a listening socket on an
        ephemeral localhost port, passes its address to the stub, and accepts
        the stub's connection into self.sock.  Returns the server process.
        """
        if self.reverse_connect:
            # Resolve localhost and bind to an OS-assigned (port 0) port.
            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            sock.bind(addr)
            sock.listen(1)
            # Re-read the address to learn the actual port assigned.
            addr = sock.getsockname()
            self.connect_address = "[{}]:{}".format(*addr)


        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.assertIsNotNone(server)

        if self.reverse_connect:
            # Wait for the stub to connect back to us.
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server
379
    def connect_to_debug_monitor(self, attach_pid=None):
        """Launch the stub and establish a connection, retrying as needed.

        Reverse-connect mode needs no retries (launch_debug_monitor already
        accepted the stub's connection).  Otherwise, retry over several
        randomly chosen ports, with several connect attempts per port.
        Sets self.sock/self._server and returns the server process.
        """
        if self.reverse_connect:
            # Create the stub.
            server = self.launch_debug_monitor(attach_pid=attach_pid)
            self.assertIsNotNone(server)

            # Schedule debug monitor to be shut down during teardown.
            # NOTE(review): 'logger' is unused here — looks like a leftover
            # from an earlier teardown-hook implementation.
            logger = self.logger

            self._server = Server(self.sock, server)
            return server

        # We're using a random port algorithm to try not to collide with other ports,
        # and retry a max # times.
        attempts = 0
        MAX_ATTEMPTS = 20

        while attempts < MAX_ATTEMPTS:
            server = self.launch_debug_monitor(attach_pid=attach_pid)

            # Schedule debug monitor to be shut down during teardown.
            logger = self.logger

            connect_attemps = 0
            MAX_CONNECT_ATTEMPTS = 10

            while connect_attemps < MAX_CONNECT_ATTEMPTS:
                # Create a socket to talk to the server
                try:
                    logger.info("Connect attempt %d", connect_attemps + 1)
                    self.sock = self.create_socket()
                    self._server = Server(self.sock, server)
                    return server
                except _ConnectionRefused as serr:
                    # Ignore, and try again.
                    pass
                time.sleep(0.5)
                connect_attemps += 1

            # We should close the server here to be safe.
            server.terminate()

            # Increment attempts.
            print(
                "connect to debug monitor on port %d failed, attempt #%d of %d" %
                (self.port, attempts + 1, MAX_ATTEMPTS))
            attempts += 1

            # And wait a random length of time before next attempt, to avoid
            # collisions.
            time.sleep(random.randint(1, 5))

            # Now grab a new port number.
            self.port = self.get_next_port()

        raise Exception(
            "failed to create a socket to the launched debug monitor after %d tries" %
            attempts)
438
439    def launch_process_for_attach(
440            self,
441            inferior_args=None,
442            sleep_seconds=3,
443            exe_path=None):
444        # We're going to start a child process that the debug monitor stub can later attach to.
445        # This process needs to be started so that it just hangs around for a while.  We'll
446        # have it sleep.
447        if not exe_path:
448            exe_path = self.getBuildArtifact("a.out")
449
450        args = []
451        if inferior_args:
452            args.extend(inferior_args)
453        if sleep_seconds:
454            args.append("sleep:%d" % sleep_seconds)
455
456        return self.spawnSubprocess(exe_path, args)
457
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the inferior binary to the remote working directory
                # and launch it from there.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Enter no-ack mode before queuing further packets.
        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
534
535    def do_handshake(self):
536        server = self._server
537        server.send_ack()
538        server.send_packet(b"QStartNoAckMode")
539        self.assertEqual(server.get_normal_packet(), b"+")
540        self.assertEqual(server.get_normal_packet(), b"OK")
541        server.send_ack()
542
543    def add_verified_launch_packets(self, launch_args):
544        self.test_sequence.add_log_lines(
545            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
546             "send packet: $OK#00",
547             "read packet: $qLaunchSuccess#a5",
548             "send packet: $OK#00"],
549            True)
550
551    def add_thread_suffix_request_packets(self):
552        self.test_sequence.add_log_lines(
553            ["read packet: $QThreadSuffixSupported#e4",
554             "send packet: $OK#00",
555             ], True)
556
557    def add_process_info_collection_packets(self):
558        self.test_sequence.add_log_lines(
559            ["read packet: $qProcessInfo#dc",
560             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
561            True)
562
563    def add_set_environment_packets(self, name, value):
564        self.test_sequence.add_log_lines(
565            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
566             "send packet: $OK#00",
567             ], True)
568
    # Keys a qProcessInfo response is allowed to contain; validated by
    # parse_process_info_response().
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
585
586    def parse_process_info_response(self, context):
587        # Ensure we have a process info response.
588        self.assertIsNotNone(context)
589        process_info_raw = context.get("process_info_raw")
590        self.assertIsNotNone(process_info_raw)
591
592        # Pull out key:value; pairs.
593        process_info_dict = {
594            match.group(1): match.group(2) for match in re.finditer(
595                r"([^:]+):([^;]+);", process_info_raw)}
596
597        # Validate keys are known.
598        for (key, val) in list(process_info_dict.items()):
599            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
600            self.assertIsNotNone(val)
601
602        return process_info_dict
603
604    def add_register_info_collection_packets(self):
605        self.test_sequence.add_log_lines(
606            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
607                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
608                "save_key": "reg_info_responses"}],
609            True)
610
611    def parse_register_info_packets(self, context):
612        """Return an array of register info dictionaries, one per register info."""
613        reg_info_responses = context.get("reg_info_responses")
614        self.assertIsNotNone(reg_info_responses)
615
616        # Parse register infos.
617        return [parse_reg_info_response(reg_info_response)
618                for reg_info_response in reg_info_responses]
619
620    def expect_gdbremote_sequence(self):
621        return expect_lldb_gdbserver_replay(
622            self,
623            self._server,
624            self.test_sequence,
625            self.DEFAULT_TIMEOUT * len(self.test_sequence),
626            self.logger)
627
    # Keys a qRegisterInfo response is allowed to contain; validated by
    # assert_valid_reg_info().
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
645
646    def assert_valid_reg_info(self, reg_info):
647        # Assert we know about all the reginfo keys parsed.
648        for key in reg_info:
649            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
650
651        # Check the bare-minimum expected set of register info keys.
652        self.assertTrue("name" in reg_info)
653        self.assertTrue("bitsize" in reg_info)
654
655        if not self.getArchitecture() == 'aarch64':
656            self.assertTrue("offset" in reg_info)
657
658        self.assertTrue("encoding" in reg_info)
659        self.assertTrue("format" in reg_info)
660
661    def find_pc_reg_info(self, reg_infos):
662        lldb_reg_index = 0
663        for reg_info in reg_infos:
664            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
665                return (lldb_reg_index, reg_info)
666            lldb_reg_index += 1
667
668        return (None, None)
669
670    def add_lldb_register_index(self, reg_infos):
671        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
672
673        We'll use this when we want to call packets like P/p with a register index but do so
674        on only a subset of the full register info set.
675        """
676        self.assertIsNotNone(reg_infos)
677
678        reg_index = 0
679        for reg_info in reg_infos:
680            reg_info["lldb_register_index"] = reg_index
681            reg_index += 1
682
683    def add_query_memory_region_packets(self, address):
684        self.test_sequence.add_log_lines(
685            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
686             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
687            True)
688
689    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
690        self.assertIsNotNone(key_val_text)
691        kv_dict = {}
692        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
693            key = match.group(1)
694            val = match.group(2)
695            if key in kv_dict:
696                if allow_dupes:
697                    if isinstance(kv_dict[key], list):
698                        kv_dict[key].append(val)
699                    else:
700                        # Promote to list
701                        kv_dict[key] = [kv_dict[key], val]
702                else:
703                    self.fail(
704                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
705                            key, val, key_val_text, kv_dict))
706            else:
707                kv_dict[key] = val
708        return kv_dict
709
710    def parse_memory_region_packet(self, context):
711        # Ensure we have a context.
712        self.assertIsNotNone(context.get("memory_region_response"))
713
714        # Pull out key:value; pairs.
715        mem_region_dict = self.parse_key_val_dict(
716            context.get("memory_region_response"))
717
718        # Validate keys are known.
719        for (key, val) in list(mem_region_dict.items()):
720            self.assertIn(key,
721                ["start",
722                 "size",
723                 "permissions",
724                 "flags",
725                 "name",
726                 "error",
727                 "dirty-pages",
728                 "type"])
729            self.assertIsNotNone(val)
730
731        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
732        # Return the dictionary of key-value pairs for the memory region.
733        return mem_region_dict
734
735    def assert_address_within_memory_region(
736            self, test_address, mem_region_dict):
737        self.assertIsNotNone(mem_region_dict)
738        self.assertTrue("start" in mem_region_dict)
739        self.assertTrue("size" in mem_region_dict)
740
741        range_start = int(mem_region_dict["start"], 16)
742        range_size = int(mem_region_dict["size"], 16)
743        range_end = range_start + range_size
744
745        if test_address < range_start:
746            self.fail(
747                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
748                    test_address,
749                    range_start,
750                    range_end,
751                    range_size))
752        elif test_address >= range_end:
753            self.fail(
754                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
755                    test_address,
756                    range_start,
757                    range_end,
758                    range_size))
759
760    def add_threadinfo_collection_packets(self):
761        self.test_sequence.add_log_lines(
762            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
763                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
764                "save_key": "threadinfo_responses"}],
765            True)
766
767    def parse_threadinfo_packets(self, context):
768        """Return an array of thread ids (decimal ints), one per thread."""
769        threadinfo_responses = context.get("threadinfo_responses")
770        self.assertIsNotNone(threadinfo_responses)
771
772        thread_ids = []
773        for threadinfo_response in threadinfo_responses:
774            new_thread_infos = parse_threadinfo_response(threadinfo_response)
775            thread_ids.extend(new_thread_infos)
776        return thread_ids
777
778    def wait_for_thread_count(self, thread_count):
779        start_time = time.time()
780        timeout_time = start_time + self.DEFAULT_TIMEOUT
781
782        actual_thread_count = 0
783        while actual_thread_count < thread_count:
784            self.reset_test_sequence()
785            self.add_threadinfo_collection_packets()
786
787            context = self.expect_gdbremote_sequence()
788            self.assertIsNotNone(context)
789
790            threads = self.parse_threadinfo_packets(context)
791            self.assertIsNotNone(threads)
792
793            actual_thread_count = len(threads)
794
795            if time.time() > timeout_time:
796                raise Exception(
797                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
798                        self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))
799
800        return threads
801
802    def add_set_breakpoint_packets(
803            self,
804            address,
805            z_packet_type=0,
806            do_continue=True,
807            breakpoint_kind=1):
808        self.test_sequence.add_log_lines(
809            [  # Set the breakpoint.
810                "read packet: $Z{2},{0:x},{1}#00".format(
811                    address, breakpoint_kind, z_packet_type),
812                # Verify the stub could set it.
813                "send packet: $OK#00",
814            ], True)
815
816        if (do_continue):
817            self.test_sequence.add_log_lines(
818                [  # Continue the inferior.
819                    "read packet: $c#63",
820                    # Expect a breakpoint stop report.
821                    {"direction": "send",
822                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
823                     "capture": {1: "stop_signo",
824                                 2: "stop_thread_id"}},
825                ], True)
826
827    def add_remove_breakpoint_packets(
828            self,
829            address,
830            z_packet_type=0,
831            breakpoint_kind=1):
832        self.test_sequence.add_log_lines(
833            [  # Remove the breakpoint.
834                "read packet: $z{2},{0:x},{1}#00".format(
835                    address, breakpoint_kind, z_packet_type),
836                # Verify the stub could unset it.
837                "send packet: $OK#00",
838            ], True)
839
840    def add_qSupported_packets(self, client_features=[]):
841        features = ''.join(';' + x for x in client_features)
842        self.test_sequence.add_log_lines(
843            ["read packet: $qSupported{}#00".format(features),
844             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
845             ], True)
846
    # Feature names a stub may legitimately report in its qSupported reply.
    # parse_qSupported_response() raises on any feature not listed here, so
    # newly-added stub features must also be added to this list.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
        "qSaveCore",
    ]
865
866    def parse_qSupported_response(self, context):
867        self.assertIsNotNone(context)
868
869        raw_response = context.get("qSupported_response")
870        self.assertIsNotNone(raw_response)
871
872        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
873        # +,-,? is stripped from the key and set as the value.
874        supported_dict = {}
875        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
876            key = match.group(1)
877            val = match.group(3)
878
879            # key=val: store as is
880            if val and len(val) > 0:
881                supported_dict[key] = val
882            else:
883                if len(key) < 2:
884                    raise Exception(
885                        "singular stub feature is too short: must be stub_feature{+,-,?}")
886                supported_type = key[-1]
887                key = key[:-1]
888                if not supported_type in ["+", "-", "?"]:
889                    raise Exception(
890                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
891                supported_dict[key] = supported_type
892            # Ensure we know the supported element
893            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
894                raise Exception(
895                    "unknown qSupported stub feature reported: %s" %
896                    key)
897
898        return supported_dict
899
900    def run_process_then_stop(self, run_seconds=1):
901        # Tell the stub to continue.
902        self.test_sequence.add_log_lines(
903            ["read packet: $vCont;c#a8"],
904            True)
905        context = self.expect_gdbremote_sequence()
906
907        # Wait for run_seconds.
908        time.sleep(run_seconds)
909
910        # Send an interrupt, capture a T response.
911        self.reset_test_sequence()
912        self.test_sequence.add_log_lines(
913            ["read packet: {}".format(chr(3)),
914             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
915            True)
916        context = self.expect_gdbremote_sequence()
917        self.assertIsNotNone(context)
918        self.assertIsNotNone(context.get("stop_result"))
919
920        return context
921
922    def continue_process_and_wait_for_stop(self):
923        self.test_sequence.add_log_lines(
924            [
925                "read packet: $vCont;c#a8",
926                {
927                    "direction": "send",
928                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
929                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
930                },
931            ],
932            True,
933        )
934        context = self.expect_gdbremote_sequence()
935        self.assertIsNotNone(context)
936        return self.parse_interrupt_packets(context)
937
938    def select_modifiable_register(self, reg_infos):
939        """Find a register that can be read/written freely."""
940        PREFERRED_REGISTER_NAMES = set(["rax", ])
941
942        # First check for the first register from the preferred register name
943        # set.
944        alternative_register_index = None
945
946        self.assertIsNotNone(reg_infos)
947        for reg_info in reg_infos:
948            if ("name" in reg_info) and (
949                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
950                # We found a preferred register.  Use it.
951                return reg_info["lldb_register_index"]
952            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
953                    reg_info["generic"] == "arg1"):
954                # A frame pointer or first arg register will do as a
955                # register to modify temporarily.
956                alternative_register_index = reg_info["lldb_register_index"]
957
958        # We didn't find a preferred register.  Return whatever alternative register
959        # we found, if any.
960        return alternative_register_index
961
962    def extract_registers_from_stop_notification(self, stop_key_vals_text):
963        self.assertIsNotNone(stop_key_vals_text)
964        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
965
966        registers = {}
967        for (key, val) in list(kv_dict.items()):
968            if re.match(r"^[0-9a-fA-F]+$", key):
969                registers[int(key, 16)] = val
970        return registers
971
972    def gather_register_infos(self):
973        self.reset_test_sequence()
974        self.add_register_info_collection_packets()
975
976        context = self.expect_gdbremote_sequence()
977        self.assertIsNotNone(context)
978
979        reg_infos = self.parse_register_info_packets(context)
980        self.assertIsNotNone(reg_infos)
981        self.add_lldb_register_index(reg_infos)
982
983        return reg_infos
984
985    def find_generic_register_with_name(self, reg_infos, generic_name):
986        self.assertIsNotNone(reg_infos)
987        for reg_info in reg_infos:
988            if ("generic" in reg_info) and (
989                    reg_info["generic"] == generic_name):
990                return reg_info
991        return None
992
993    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
994        self.assertIsNotNone(reg_infos)
995        for reg_info in reg_infos:
996            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
997                return reg_info
998        return None
999
1000    def decode_gdbremote_binary(self, encoded_bytes):
1001        decoded_bytes = ""
1002        i = 0
1003        while i < len(encoded_bytes):
1004            if encoded_bytes[i] == "}":
1005                # Handle escaped char.
1006                self.assertTrue(i + 1 < len(encoded_bytes))
1007                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1008                i += 2
1009            elif encoded_bytes[i] == "*":
1010                # Handle run length encoding.
1011                self.assertTrue(len(decoded_bytes) > 0)
1012                self.assertTrue(i + 1 < len(encoded_bytes))
1013                repeat_count = ord(encoded_bytes[i + 1]) - 29
1014                decoded_bytes += decoded_bytes[-1] * repeat_count
1015                i += 2
1016            else:
1017                decoded_bytes += encoded_bytes[i]
1018                i += 1
1019        return decoded_bytes
1020
1021    def build_auxv_dict(self, endian, word_size, auxv_data):
1022        self.assertIsNotNone(endian)
1023        self.assertIsNotNone(word_size)
1024        self.assertIsNotNone(auxv_data)
1025
1026        auxv_dict = {}
1027
1028        # PowerPC64le's auxvec has a special key that must be ignored.
1029        # This special key may be used multiple times, resulting in
1030        # multiple key/value pairs with the same key, which would otherwise
1031        # break this test check for repeated keys.
1032        #
1033        # AT_IGNOREPPC = 22
1034        ignored_keys_for_arch = { 'powerpc64le' : [22] }
1035        arch = self.getArchitecture()
1036        ignore_keys = None
1037        if arch in ignored_keys_for_arch:
1038            ignore_keys = ignored_keys_for_arch[arch]
1039
1040        while len(auxv_data) > 0:
1041            # Chop off key.
1042            raw_key = auxv_data[:word_size]
1043            auxv_data = auxv_data[word_size:]
1044
1045            # Chop of value.
1046            raw_value = auxv_data[:word_size]
1047            auxv_data = auxv_data[word_size:]
1048
1049            # Convert raw text from target endian.
1050            key = unpack_endian_binary_string(endian, raw_key)
1051            value = unpack_endian_binary_string(endian, raw_value)
1052
1053            if ignore_keys and key in ignore_keys:
1054                continue
1055
1056            # Handle ending entry.
1057            if key == 0:
1058                self.assertEqual(value, 0)
1059                return auxv_dict
1060
1061            # The key should not already be present.
1062            self.assertFalse(key in auxv_dict)
1063            auxv_dict[key] = value
1064
1065        self.fail(
1066            "should not reach here - implies required double zero entry not found")
1067        return auxv_dict
1068
    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
        """Collect command_prefix{offset:x},{chunk_length:x} responses until done.

        Repeatedly sends the packet with an advancing offset and concatenates
        the binary-decoded payloads.  An 'm' response type means more data
        remains; an 'l' response type marks the final chunk (which may itself
        carry data).

        Args:
            command_prefix: packet text placed before the offset (e.g. a
                qXfer-style read command prefix).
            chunk_length: number of bytes requested per packet.

        Returns:
            The accumulated, binary-decoded payload string.
        """
        offset = 0
        done = False
        decoded_data = ""

        while not done:
            # Grab the next iteration of data.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines(
                [
                    "read packet: ${}{:x},{:x}:#00".format(
                        command_prefix,
                        offset,
                        chunk_length),
                    {
                        # First char is the response type; "E" (error) is
                        # deliberately excluded so errors fail the match.
                        "direction": "send",
                        "regex": re.compile(
                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
                            re.MULTILINE | re.DOTALL),
                        "capture": {
                            1: "response_type",
                            2: "content_raw"}}],
                True)

            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            response_type = context.get("response_type")
            self.assertIsNotNone(response_type)
            self.assertTrue(response_type in ["l", "m"])

            # Move offset along.
            offset += chunk_length

            # Figure out if we're done.  We're done if the response type is l.
            done = response_type == "l"

            # Decode binary data.
            content_raw = context.get("content_raw")
            if content_raw and len(content_raw) > 0:
                self.assertIsNotNone(content_raw)
                decoded_data += self.decode_gdbremote_binary(content_raw)
        return decoded_data
1113
1114    def add_interrupt_packets(self):
1115        self.test_sequence.add_log_lines([
1116            # Send the intterupt.
1117            "read packet: {}".format(chr(3)),
1118            # And wait for the stop notification.
1119            {"direction": "send",
1120             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1121             "capture": {1: "stop_signo",
1122                         2: "stop_key_val_text"}},
1123        ], True)
1124
1125    def parse_interrupt_packets(self, context):
1126        self.assertIsNotNone(context.get("stop_signo"))
1127        self.assertIsNotNone(context.get("stop_key_val_text"))
1128        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1129            context["stop_key_val_text"]))
1130
1131    def add_QSaveRegisterState_packets(self, thread_id):
1132        if thread_id:
1133            # Use the thread suffix form.
1134            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1135                thread_id)
1136        else:
1137            request = "read packet: $QSaveRegisterState#00"
1138
1139        self.test_sequence.add_log_lines([request,
1140                                          {"direction": "send",
1141                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1142                                           "capture": {1: "save_response"}},
1143                                          ],
1144                                         True)
1145
1146    def parse_QSaveRegisterState_response(self, context):
1147        self.assertIsNotNone(context)
1148
1149        save_response = context.get("save_response")
1150        self.assertIsNotNone(save_response)
1151
1152        if len(save_response) < 1 or save_response[0] == "E":
1153            # error received
1154            return (False, None)
1155        else:
1156            return (True, int(save_response))
1157
1158    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1159        if thread_id:
1160            # Use the thread suffix form.
1161            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1162                save_id, thread_id)
1163        else:
1164            request = "read packet: $QRestoreRegisterState:{}#00".format(
1165                save_id)
1166
1167        self.test_sequence.add_log_lines([
1168            request,
1169            "send packet: $OK#00"
1170        ], True)
1171
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read, bit-flip (XOR with all ones), write back and verify each
        register in reg_infos.

        Args:
            reg_infos: register info dicts, each carrying an
                "lldb_register_index" and a "bitsize".
            endian: target endianness code for the pack/unpack helpers.
            thread_id: optional thread to address via the thread suffix.

        Returns:
            Tuple (successful_writes, failed_writes).  A write that reports
            OK but does not read back as the flipped value is re-counted as
            a failure (some registers legitimately mask written bits).
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1271
1272    def is_bit_flippable_register(self, reg_info):
1273        if not reg_info:
1274            return False
1275        if not "set" in reg_info:
1276            return False
1277        if reg_info["set"] != "General Purpose Registers":
1278            return False
1279        if ("container-regs" in reg_info) and (
1280                len(reg_info["container-regs"]) > 0):
1281            # Don't try to bit flip registers contained in another register.
1282            return False
1283        if re.match("^.s$", reg_info["name"]):
1284            # This is a 2-letter register name that ends in "s", like a segment register.
1285            # Don't try to bit flip these.
1286            return False
1287        if re.match("^(c|)psr$", reg_info["name"]):
1288            # This is an ARM program status register; don't flip it.
1289            return False
1290        # Okay, this looks fine-enough.
1291        return True
1292
1293    def read_register_values(self, reg_infos, endian, thread_id=None):
1294        self.assertIsNotNone(reg_infos)
1295        values = {}
1296
1297        for reg_info in reg_infos:
1298            # We append a register index when load reg infos so we can work
1299            # with subsets.
1300            reg_index = reg_info.get("lldb_register_index")
1301            self.assertIsNotNone(reg_index)
1302
1303            # Handle thread suffix.
1304            if thread_id:
1305                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1306                    reg_index, thread_id)
1307            else:
1308                p_request = "read packet: $p{:x}#00".format(reg_index)
1309
1310            # Read it with p.
1311            self.reset_test_sequence()
1312            self.test_sequence.add_log_lines([
1313                p_request,
1314                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1315            ], True)
1316            context = self.expect_gdbremote_sequence()
1317            self.assertIsNotNone(context)
1318
1319            # Convert value from target endian to integral.
1320            p_response = context.get("p_response")
1321            self.assertIsNotNone(p_response)
1322            self.assertTrue(len(p_response) > 0)
1323            self.assertFalse(p_response[0] == "E")
1324
1325            values[reg_index] = unpack_register_hex_unsigned(
1326                endian, p_response)
1327
1328        return values
1329
1330    def add_vCont_query_packets(self):
1331        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1332                                          {"direction": "send",
1333                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1334                                           "capture": {2: "vCont_query_response"}},
1335                                          ],
1336                                         True)
1337
1338    def parse_vCont_query_response(self, context):
1339        self.assertIsNotNone(context)
1340        vCont_query_response = context.get("vCont_query_response")
1341
1342        # Handle case of no vCont support at all - in which case the capture
1343        # group will be none or zero length.
1344        if not vCont_query_response or len(vCont_query_response) == 0:
1345            return {}
1346
1347        return {key: 1 for key in vCont_query_response.split(
1348            ";") if key and len(key) > 0}
1349
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step the given thread until predicate(args) is true.

        Used by single step tests that appear in a few different contexts.

        Args:
            thread_id: thread to step; substituted for "{thread}" in
                step_instruction when that placeholder is present.
            predicate: callable evaluated after each step; stepping stops
                once it returns true.
            args: opaque argument object passed through to predicate.
            max_step_count: upper bound on steps before giving up.
            use_Hc_packet: when True, select the continue thread with an Hc
                packet before each step.
            step_instruction: packet body used to step (e.g. "s", or a
                vCont form containing "{thread}").

        Returns:
            Tuple (predicate_became_true, number_of_steps_taken).
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Each step must stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1402
1403    def g_c1_c2_contents_are(self, args):
1404        """Used by single step test that appears in a few different contexts."""
1405        g_c1_address = args["g_c1_address"]
1406        g_c2_address = args["g_c2_address"]
1407        expected_g_c1 = args["expected_g_c1"]
1408        expected_g_c2 = args["expected_g_c2"]
1409
1410        # Read g_c1 and g_c2 contents.
1411        self.reset_test_sequence()
1412        self.test_sequence.add_log_lines(
1413            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1414             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1415             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1416             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1417            True)
1418
1419        # Run the packet stream.
1420        context = self.expect_gdbremote_sequence()
1421        self.assertIsNotNone(context)
1422
1423        # Check if what we read from inferior memory is what we are expecting.
1424        self.assertIsNotNone(context.get("g_c1_contents"))
1425        self.assertIsNotNone(context.get("g_c2_contents"))
1426
1427        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1428            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1429
1430    def single_step_only_steps_one_instruction(
1431            self, use_Hc_packet=True, step_instruction="s"):
1432        """Used by single step test that appears in a few different contexts."""
1433        # Start up the inferior.
1434        procs = self.prep_debug_monitor_and_inferior(
1435            inferior_args=[
1436                "get-code-address-hex:swap_chars",
1437                "get-data-address-hex:g_c1",
1438                "get-data-address-hex:g_c2",
1439                "sleep:1",
1440                "call-function:swap_chars",
1441                "sleep:5"])
1442
1443        # Run the process
1444        self.test_sequence.add_log_lines(
1445            [  # Start running after initial stop.
1446                "read packet: $c#63",
1447                # Match output line that prints the memory address of the function call entry point.
1448                # Note we require launch-only testing so we can get inferior otuput.
1449                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1450                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1451                # Now stop the inferior.
1452                "read packet: {}".format(chr(3)),
1453                # And wait for the stop notification.
1454                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1455            True)
1456
1457        # Run the packet stream.
1458        context = self.expect_gdbremote_sequence()
1459        self.assertIsNotNone(context)
1460
1461        # Grab the main thread id.
1462        self.assertIsNotNone(context.get("stop_thread_id"))
1463        main_thread_id = int(context.get("stop_thread_id"), 16)
1464
1465        # Grab the function address.
1466        self.assertIsNotNone(context.get("function_address"))
1467        function_address = int(context.get("function_address"), 16)
1468
1469        # Grab the data addresses.
1470        self.assertIsNotNone(context.get("g_c1_address"))
1471        g_c1_address = int(context.get("g_c1_address"), 16)
1472
1473        self.assertIsNotNone(context.get("g_c2_address"))
1474        g_c2_address = int(context.get("g_c2_address"), 16)
1475
1476        # Set a breakpoint at the given address.
1477        if self.getArchitecture().startswith("arm"):
1478            # TODO: Handle case when setting breakpoint in thumb code
1479            BREAKPOINT_KIND = 4
1480        else:
1481            BREAKPOINT_KIND = 1
1482        self.reset_test_sequence()
1483        self.add_set_breakpoint_packets(
1484            function_address,
1485            do_continue=True,
1486            breakpoint_kind=BREAKPOINT_KIND)
1487        context = self.expect_gdbremote_sequence()
1488        self.assertIsNotNone(context)
1489
1490        # Remove the breakpoint.
1491        self.reset_test_sequence()
1492        self.add_remove_breakpoint_packets(
1493            function_address, breakpoint_kind=BREAKPOINT_KIND)
1494        context = self.expect_gdbremote_sequence()
1495        self.assertIsNotNone(context)
1496
1497        # Verify g_c1 and g_c2 match expected initial state.
1498        args = {}
1499        args["g_c1_address"] = g_c1_address
1500        args["g_c2_address"] = g_c2_address
1501        args["expected_g_c1"] = "0"
1502        args["expected_g_c2"] = "1"
1503
1504        self.assertTrue(self.g_c1_c2_contents_are(args))
1505
1506        # Verify we take only a small number of steps to hit the first state.
1507        # Might need to work through function entry prologue code.
1508        args["expected_g_c1"] = "1"
1509        args["expected_g_c2"] = "1"
1510        (state_reached,
1511         step_count) = self.count_single_steps_until_true(main_thread_id,
1512                                                          self.g_c1_c2_contents_are,
1513                                                          args,
1514                                                          max_step_count=25,
1515                                                          use_Hc_packet=use_Hc_packet,
1516                                                          step_instruction=step_instruction)
1517        self.assertTrue(state_reached)
1518
1519        # Verify we hit the next state.
1520        args["expected_g_c1"] = "1"
1521        args["expected_g_c2"] = "0"
1522        (state_reached,
1523         step_count) = self.count_single_steps_until_true(main_thread_id,
1524                                                          self.g_c1_c2_contents_are,
1525                                                          args,
1526                                                          max_step_count=5,
1527                                                          use_Hc_packet=use_Hc_packet,
1528                                                          step_instruction=step_instruction)
1529        self.assertTrue(state_reached)
1530        expected_step_count = 1
1531        arch = self.getArchitecture()
1532
1533        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1534        # of variable value
1535        if re.match("mips", arch):
1536            expected_step_count = 3
1537        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1538        # variable value
1539        if re.match("s390x", arch):
1540            expected_step_count = 2
1541        # ARM64 requires "4" instructions: 2 to compute the address (adrp, add),
1542        # one to materialize the constant (mov) and the store
1543        if re.match("arm64", arch):
1544            expected_step_count = 4
1545
1546        self.assertEqual(step_count, expected_step_count)
1547
1548        # ARM64: Once addresses and constants are materialized, only one
1549        # instruction is needed.
1550        if re.match("arm64", arch):
1551            expected_step_count = 1
1552
1553        # Verify we hit the next state.
1554        args["expected_g_c1"] = "0"
1555        args["expected_g_c2"] = "0"
1556        (state_reached,
1557         step_count) = self.count_single_steps_until_true(main_thread_id,
1558                                                          self.g_c1_c2_contents_are,
1559                                                          args,
1560                                                          max_step_count=5,
1561                                                          use_Hc_packet=use_Hc_packet,
1562                                                          step_instruction=step_instruction)
1563        self.assertTrue(state_reached)
1564        self.assertEqual(step_count, expected_step_count)
1565
1566        # Verify we hit the next state.
1567        args["expected_g_c1"] = "0"
1568        args["expected_g_c2"] = "1"
1569        (state_reached,
1570         step_count) = self.count_single_steps_until_true(main_thread_id,
1571                                                          self.g_c1_c2_contents_are,
1572                                                          args,
1573                                                          max_step_count=5,
1574                                                          use_Hc_packet=use_Hc_packet,
1575                                                          step_instruction=step_instruction)
1576        self.assertTrue(state_reached)
1577        self.assertEqual(step_count, expected_step_count)
1578
1579    def maybe_strict_output_regex(self, regex):
1580        return '.*' + regex + \
1581            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1582
1583    def install_and_create_launch_args(self):
1584        exe_path = self.getBuildArtifact("a.out")
1585        if not lldb.remote_platform:
1586            return [exe_path]
1587        remote_path = lldbutil.append_to_process_working_directory(self,
1588            os.path.basename(exe_path))
1589        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1590        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1591                                           remote_file_spec)
1592        if err.Fail():
1593            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1594                            (exe_path, remote_path, err))
1595        return [remote_path]
1596