1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
26class _ConnectionRefused(IOError):
27    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass that expands every "test*" method into one concrete test
    method per debug server category ("debugserver" and/or "llgs")."""

    def __new__(cls, name, bases, attrs):
        new_attrs = {}
        for attr_name, attr_value in attrs.items():
            # Non-test attributes are carried over unchanged.
            if not attr_name.startswith("test"):
                new_attrs[attr_name] = attr_value
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = {"debugserver", "llgs"}
            categories = set(
                getattr(attr_value, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # Bind attr_value via a default argument so each generated
                # method captures the current value, not the loop variable.
                @decorators.add_test_categories([cat])
                @wraps(attr_value)
                def test_method(self, attrvalue=attr_value):
                    return attrvalue(self)

                generated_name = attr_name + "_" + cat
                test_method.__name__ = generated_name
                test_method.debug_server = cat
                new_attrs[generated_name] = test_method

        return super().__new__(cls, name, bases, new_attrs)
61
class GdbRemoteTestCaseBase(Base, metaclass=GdbRemoteTestCaseFactory):
    """Base class for gdb-remote protocol test cases.

    The metaclass expands each "test*" method into per-debug-server
    variants (see GdbRemoteTestCaseFactory).
    """

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # Pre-encoded 'k' (kill) packet, including its checksum.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose log handler; installed in setUp(), removed in tearDown().
    _verbose_log_handler = None
    # Shared formatter for both the stderr and the verbose file handlers.
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
93
94    def setUpBaseLogging(self):
95        self.logger = logging.getLogger(__name__)
96
97        if len(self.logger.handlers) > 0:
98            return  # We have set up this handler already
99
100        self.logger.propagate = False
101        self.logger.setLevel(logging.DEBUG)
102
103        # log all warnings to stderr
104        handler = logging.StreamHandler()
105        handler.setLevel(logging.WARNING)
106        handler.setFormatter(self._log_formatter)
107        self.logger.addHandler(handler)
108
109    def isVerboseLoggingRequested(self):
110        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
111        # logged.
112        return any(("gdb-remote" in channel)
113                   for channel in lldbtest_config.channels)
114
115    def getDebugServer(self):
116        method = getattr(self, self.testMethodName)
117        return getattr(method, "debug_server", None)
118
119    def setUp(self):
120        super(GdbRemoteTestCaseBase, self).setUp()
121
122        self.setUpBaseLogging()
123        self.debug_monitor_extra_args = []
124
125        if self.isVerboseLoggingRequested():
126            # If requested, full logs go to a log file
127            self._verbose_log_handler = logging.FileHandler(
128                self.getLogBasenameForCurrentTest() + "-host.log")
129            self._verbose_log_handler.setFormatter(self._log_formatter)
130            self._verbose_log_handler.setLevel(logging.DEBUG)
131            self.logger.addHandler(self._verbose_log_handler)
132
133        self.test_sequence = GdbRemoteTestSequence(self.logger)
134        self.set_inferior_startup_launch()
135        self.port = self.get_next_port()
136        self.stub_sends_two_stop_notifications_on_kill = False
137        if configuration.lldb_platform_url:
138            if configuration.lldb_platform_url.startswith('unix-'):
139                url_pattern = '(.+)://\[?(.+?)\]?/.*'
140            else:
141                url_pattern = '(.+)://(.+):\d+'
142            scheme, host = re.match(
143                url_pattern, configuration.lldb_platform_url).groups()
144            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
145                self.stub_device = host
146                self.stub_hostname = 'localhost'
147            else:
148                self.stub_device = None
149                self.stub_hostname = host
150        else:
151            self.stub_hostname = "localhost"
152
153        debug_server = self.getDebugServer()
154        if debug_server == "debugserver":
155            self._init_debugserver_test()
156        else:
157            self._init_llgs_test()
158
    def tearDown(self):
        # Remove the per-test verbose file handler installed by setUp();
        # removeHandler() is a no-op when the handler is None/absent.
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): this invokes TestBase.tearDown although the class
        # derives from Base (and setUp() uses super()); confirm calling the
        # TestBase variant on a Base-derived instance is intentional.
        TestBase.tearDown(self)
163
164    def getLocalServerLogFile(self):
165        return self.getLogBasenameForCurrentTest() + "-server.log"
166
167    def setUpServerLogging(self, is_llgs):
168        if len(lldbtest_config.channels) == 0:
169            return  # No logging requested
170
171        if lldb.remote_platform:
172            log_file = lldbutil.join_remote_paths(
173                lldb.remote_platform.GetWorkingDirectory(), "server.log")
174        else:
175            log_file = self.getLocalServerLogFile()
176
177        if is_llgs:
178            self.debug_monitor_extra_args.append("--log-file=" + log_file)
179            self.debug_monitor_extra_args.append(
180                "--log-channels={}".format(":".join(lldbtest_config.channels)))
181        else:
182            self.debug_monitor_extra_args = [
183                "--log-file=" + log_file, "--log-flags=0x800000"]
184
185    def get_next_port(self):
186        return 12000 + random.randint(0, 3999)
187
    def reset_test_sequence(self):
        # Discard any accumulated packet expectations and start a fresh
        # sequence.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
190
191
    def _init_llgs_test(self):
        """Locate lldb-server for this test and set up llgs-mode arguments.

        On a remote platform, discovers the server binary by resolving
        /proc/<shell ppid>/exe on the target (linux-specific) and disables
        reverse connect; locally, uses get_lldb_server_exe().
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            # The symlink target is the last whitespace-separated token of
            # the `ls -l` output.
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        # Run lldb-server in its gdbserver subcommand mode.
        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
231
    def _init_debugserver_test(self):
        """Locate debugserver and set up its arguments; debugserver tests
        always use a reverse connection."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
240
241    def forward_adb_port(self, source, target, direction, device):
242        adb = ['adb'] + (['-s', device] if device else []) + [direction]
243
244        def remove_port_forward():
245            subprocess.call(adb + ["--remove", "tcp:%d" % source])
246
247        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
248        self.addTearDownHook(remove_port_forward)
249
250    def _verify_socket(self, sock):
251        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
252        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
253        # the connect() will always be successful, but the connection will be immediately dropped
254        # if ADB could not connect on the remote side. This function tries to detect this
255        # situation, and report it as "connection refused" so that the upper layers attempt the
256        # connection again.
257        triple = self.dbg.GetSelectedPlatform().GetTriple()
258        if not re.match(".*-.*-.*-android", triple):
259            return  # Not android.
260        can_read, _, _ = select.select([sock], [], [], 0.1)
261        if sock not in can_read:
262            return  # Data is not available, but the connection is alive.
263        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
264            raise _ConnectionRefused()  # Got EOF, connection dropped.
265
    def create_socket(self):
        """Connect a TCP socket to the debug monitor and return it.

        Prefers IPv4 and falls back to IPv6 when IPv4 is unsupported. On
        android targets an adb port forward is set up first. Raises
        _ConnectionRefused when the stub is not (yet) accepting connections.
        A teardown hook sends the kill packet and closes the socket.
        """
        try:
            sock = socket.socket(family=socket.AF_INET)
        except OSError as e:
            if e.errno != errno.EAFNOSUPPORT:
                raise
            # No IPv4 support on this host; retry with IPv6.
            sock = socket.socket(family=socket.AF_INET6)

        logger = self.logger

        triple = self.dbg.GetSelectedPlatform().GetTriple()
        if re.match(".*-.*-.*-android", triple):
            # Route the connection to the device through adb.
            self.forward_adb_port(
                self.port,
                self.port,
                "forward",
                self.stub_device)

        logger.info(
            "Connecting to debug monitor on %s:%d",
            self.stub_hostname,
            self.port)
        connect_info = (self.stub_hostname, self.port)
        try:
            sock.connect(connect_info)
        except socket.error as serr:
            # Map ECONNREFUSED to the retryable exception the callers expect.
            if serr.errno == errno.ECONNREFUSED:
                raise _ConnectionRefused()
            raise serr

        def shutdown_socket():
            # Best-effort shutdown; failures are logged and ignored.
            if sock:
                try:
                    # send the kill packet so lldb-server shuts down gracefully
                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
                except:
                    logger.warning(
                        "failed to send kill packet to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

                try:
                    sock.close()
                except:
                    logger.warning(
                        "failed to close socket to debug monitor: {}; ignoring".format(
                            sys.exc_info()[0]))

        self.addTearDownHook(shutdown_socket)

        # On android, detect the connect-then-immediately-dropped case.
        self._verify_socket(sock)

        return sock
318
    def set_inferior_startup_launch(self):
        # Stub launches the inferior itself via an $A packet.
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        # Inferior is started separately; the stub attaches via --attach on
        # its command line.
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        # Inferior is started separately; the test attaches however it wants
        # (e.g. $vAttach;pid).
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
327
328    def get_debug_monitor_command_line_args(self, attach_pid=None):
329        commandline_args = self.debug_monitor_extra_args
330        if attach_pid:
331            commandline_args += ["--attach=%d" % attach_pid]
332        if self.reverse_connect:
333            commandline_args += ["--reverse-connect", self.connect_address]
334        else:
335            if lldb.remote_platform:
336                commandline_args += ["*:{}".format(self.port)]
337            else:
338                commandline_args += ["localhost:{}".format(self.port)]
339
340        return commandline_args
341
    def get_target_byte_order(self):
        # Determine target endianness by creating a target for the built
        # a.out and asking it.
        inferior_exe_path = self.getBuildArtifact("a.out")
        target = self.dbg.CreateTarget(inferior_exe_path)
        return target.GetByteOrder()
346
    def launch_debug_monitor(self, attach_pid=None, logfile=None):
        """Start the debug monitor subprocess and return it.

        In reverse-connect mode, first binds a listening socket on an
        ephemeral port (its address is passed to the stub via
        --reverse-connect), then accepts the stub's connection into
        self.sock. The logfile parameter is currently unused.
        """
        if self.reverse_connect:
            # NOTE: local name `type` shadows the builtin within this scope.
            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
            sock = socket.socket(family, type, proto)
            sock.settimeout(self.DEFAULT_TIMEOUT)

            # Bind to port 0 and read back the actual address chosen.
            sock.bind(addr)
            sock.listen(1)
            addr = sock.getsockname()
            self.connect_address = "[{}]:{}".format(*addr)


        # Create the command line.
        commandline_args = self.get_debug_monitor_command_line_args(
            attach_pid=attach_pid)

        # Start the server.
        server = self.spawnSubprocess(
            self.debug_monitor_exe,
            commandline_args,
            install_remote=False)
        self.assertIsNotNone(server)

        if self.reverse_connect:
            # Block (with timeout) until the stub connects back to us.
            self.sock = sock.accept()[0]
            self.sock.settimeout(self.DEFAULT_TIMEOUT)

        return server
375
376    def connect_to_debug_monitor(self, attach_pid=None):
377        if self.reverse_connect:
378            # Create the stub.
379            server = self.launch_debug_monitor(attach_pid=attach_pid)
380            self.assertIsNotNone(server)
381
382            # Schedule debug monitor to be shut down during teardown.
383            logger = self.logger
384
385            self._server = Server(self.sock, server)
386            return server
387
388        # We're using a random port algorithm to try not to collide with other ports,
389        # and retry a max # times.
390        attempts = 0
391        MAX_ATTEMPTS = 20
392
393        while attempts < MAX_ATTEMPTS:
394            server = self.launch_debug_monitor(attach_pid=attach_pid)
395
396            # Schedule debug monitor to be shut down during teardown.
397            logger = self.logger
398
399            connect_attemps = 0
400            MAX_CONNECT_ATTEMPTS = 10
401
402            while connect_attemps < MAX_CONNECT_ATTEMPTS:
403                # Create a socket to talk to the server
404                try:
405                    logger.info("Connect attempt %d", connect_attemps + 1)
406                    self.sock = self.create_socket()
407                    self._server = Server(self.sock, server)
408                    return server
409                except _ConnectionRefused as serr:
410                    # Ignore, and try again.
411                    pass
412                time.sleep(0.5)
413                connect_attemps += 1
414
415            # We should close the server here to be safe.
416            server.terminate()
417
418            # Increment attempts.
419            print(
420                "connect to debug monitor on port %d failed, attempt #%d of %d" %
421                (self.port, attempts + 1, MAX_ATTEMPTS))
422            attempts += 1
423
424            # And wait a random length of time before next attempt, to avoid
425            # collisions.
426            time.sleep(random.randint(1, 5))
427
428            # Now grab a new port number.
429            self.port = self.get_next_port()
430
431        raise Exception(
432            "failed to create a socket to the launched debug monitor after %d tries" %
433            attempts)
434
435    def launch_process_for_attach(
436            self,
437            inferior_args=None,
438            sleep_seconds=3,
439            exe_path=None):
440        # We're going to start a child process that the debug monitor stub can later attach to.
441        # This process needs to be started so that it just hangs around for a while.  We'll
442        # have it sleep.
443        if not exe_path:
444            exe_path = self.getBuildArtifact("a.out")
445
446        args = []
447        if inferior_args:
448            args.extend(inferior_args)
449        if sleep_seconds:
450            args.append("sleep:%d" % sleep_seconds)
451
452        return self.spawnSubprocess(exe_path, args)
453
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the inferior binary to the remote target's working
                # directory and launch it from there.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Enter no-ack mode before building further packet expectations.
        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
530
531    def do_handshake(self):
532        server = self._server
533        server.send_ack()
534        server.send_packet(b"QStartNoAckMode")
535        self.assertEqual(server.get_normal_packet(), b"+")
536        self.assertEqual(server.get_normal_packet(), b"OK")
537        server.send_ack()
538
    def add_verified_launch_packets(self, launch_args):
        # Expect the $A launch packet to be acknowledged, then verify launch
        # success via qLaunchSuccess.
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)

    def add_thread_suffix_request_packets(self):
        # Ask the stub to accept thread-suffixed register packets.
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)

    def add_process_info_collection_packets(self):
        # Request qProcessInfo; capture the raw reply as "process_info_raw".
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)

    def add_set_environment_packets(self, name, value):
        # Set one environment variable in the inferior via QEnvironment.
        self.test_sequence.add_log_lines(
            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
             "send packet: $OK#00",
             ], True)
564
    # Keys a stub may legally report in a qProcessInfo response; used by
    # parse_process_info_response() to validate replies.
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
581
582    def parse_process_info_response(self, context):
583        # Ensure we have a process info response.
584        self.assertIsNotNone(context)
585        process_info_raw = context.get("process_info_raw")
586        self.assertIsNotNone(process_info_raw)
587
588        # Pull out key:value; pairs.
589        process_info_dict = {
590            match.group(1): match.group(2) for match in re.finditer(
591                r"([^:]+):([^;]+);", process_info_raw)}
592
593        # Validate keys are known.
594        for (key, val) in list(process_info_dict.items()):
595            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
596            self.assertIsNotNone(val)
597
598        return process_info_dict
599
    def add_register_info_collection_packets(self):
        # Iterate qRegisterInfo0, qRegisterInfo1, ... until an error/empty
        # reply; save the raw responses under "reg_info_responses".
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
                "save_key": "reg_info_responses"}],
            True)
606
607    def parse_register_info_packets(self, context):
608        """Return an array of register info dictionaries, one per register info."""
609        reg_info_responses = context.get("reg_info_responses")
610        self.assertIsNotNone(reg_info_responses)
611
612        # Parse register infos.
613        return [parse_reg_info_response(reg_info_response)
614                for reg_info_response in reg_info_responses]
615
    def expect_gdbremote_sequence(self):
        # Replay the accumulated packet sequence against the live server;
        # the timeout scales with the number of expected packets.
        return expect_lldb_gdbserver_replay(
            self,
            self._server,
            self.test_sequence,
            self.DEFAULT_TIMEOUT * len(self.test_sequence),
            self.logger)
623
    # Keys a stub may legally report in a qRegisterInfo response; used by
    # assert_valid_reg_info() to validate parsed register infos.
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
641
642    def assert_valid_reg_info(self, reg_info):
643        # Assert we know about all the reginfo keys parsed.
644        for key in reg_info:
645            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
646
647        # Check the bare-minimum expected set of register info keys.
648        self.assertTrue("name" in reg_info)
649        self.assertTrue("bitsize" in reg_info)
650
651        if not self.getArchitecture() == 'aarch64':
652            self.assertTrue("offset" in reg_info)
653
654        self.assertTrue("encoding" in reg_info)
655        self.assertTrue("format" in reg_info)
656
657    def find_pc_reg_info(self, reg_infos):
658        lldb_reg_index = 0
659        for reg_info in reg_infos:
660            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
661                return (lldb_reg_index, reg_info)
662            lldb_reg_index += 1
663
664        return (None, None)
665
666    def add_lldb_register_index(self, reg_infos):
667        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
668
669        We'll use this when we want to call packets like P/p with a register index but do so
670        on only a subset of the full register info set.
671        """
672        self.assertIsNotNone(reg_infos)
673
674        reg_index = 0
675        for reg_info in reg_infos:
676            reg_info["lldb_register_index"] = reg_index
677            reg_index += 1
678
    def add_query_memory_region_packets(self, address):
        # Ask for the memory region containing `address`; capture the raw
        # reply as "memory_region_response".
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)
684
    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
        """Parse "key:value;"-style text into a dict.

        When allow_dupes is True, a repeated key promotes its value to a list
        of values in encounter order; otherwise a duplicate key fails the
        test.
        """
        self.assertIsNotNone(key_val_text)
        kv_dict = {}
        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
            key = match.group(1)
            val = match.group(2)
            if key in kv_dict:
                if allow_dupes:
                    if isinstance(kv_dict[key], list):
                        kv_dict[key].append(val)
                    else:
                        # Promote to list
                        kv_dict[key] = [kv_dict[key], val]
                else:
                    self.fail(
                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
                            key, val, key_val_text, kv_dict))
            else:
                kv_dict[key] = val
        return kv_dict
705
    def parse_memory_region_packet(self, context):
        """Parse the captured qMemoryRegionInfo reply into a dict, validating
        keys and unhexlifying the region name."""
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertIn(key,
                ["start",
                 "size",
                 "permissions",
                 "flags",
                 "name",
                 "error",
                 "dirty-pages",
                 "type"])
            self.assertIsNotNone(val)

        # The name comes back hex-encoded; decode it (empty when absent).
        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict
730
731    def assert_address_within_memory_region(
732            self, test_address, mem_region_dict):
733        self.assertIsNotNone(mem_region_dict)
734        self.assertTrue("start" in mem_region_dict)
735        self.assertTrue("size" in mem_region_dict)
736
737        range_start = int(mem_region_dict["start"], 16)
738        range_size = int(mem_region_dict["size"], 16)
739        range_end = range_start + range_size
740
741        if test_address < range_start:
742            self.fail(
743                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
744                    test_address,
745                    range_start,
746                    range_end,
747                    range_size))
748        elif test_address >= range_end:
749            self.fail(
750                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
751                    test_address,
752                    range_start,
753                    range_end,
754                    range_size))
755
    def add_threadinfo_collection_packets(self):
        # Collect the full thread list via qfThreadInfo/qsThreadInfo until
        # the terminating "l" reply; save raw responses under
        # "threadinfo_responses".
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
                "save_key": "threadinfo_responses"}],
            True)
762
763    def parse_threadinfo_packets(self, context):
764        """Return an array of thread ids (decimal ints), one per thread."""
765        threadinfo_responses = context.get("threadinfo_responses")
766        self.assertIsNotNone(threadinfo_responses)
767
768        thread_ids = []
769        for threadinfo_response in threadinfo_responses:
770            new_thread_infos = parse_threadinfo_response(threadinfo_response)
771            thread_ids.extend(new_thread_infos)
772        return thread_ids
773
    def launch_with_threads(self, thread_count):
        """Launch the inferior so it spawns thread_count threads (via
        "thread:new" args) and runs to the trap; return (context, threads)."""
        procs = self.prep_debug_monitor_and_inferior(
                inferior_args=["thread:new"]*(thread_count-1) + ["trap"])

        # Continue, then capture the stop-reply signal number and key/values.
        self.test_sequence.add_log_lines([
                "read packet: $c#00",
                {"direction": "send",
                    "regex": r"^\$T([0-9a-fA-F]{2})([^#]*)#..$",
                    "capture": {1: "stop_signo", 2: "stop_reply_kv"}}], True)
        self.add_threadinfo_collection_packets()
        context = self.expect_gdbremote_sequence()
        threads = self.parse_threadinfo_packets(context)
        # The stub may report more threads than we created, so >= not ==.
        self.assertGreaterEqual(len(threads), thread_count)
        return context, threads
788
    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        """Expect a $Z (set breakpoint) packet at `address` to succeed; when
        do_continue is True, also continue and expect a stop report that
        captures "stop_signo" and "stop_thread_id"."""
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could set it.
                "send packet: $OK#00",
            ], True)

        if (do_continue):
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)
813
814    def add_remove_breakpoint_packets(
815            self,
816            address,
817            z_packet_type=0,
818            breakpoint_kind=1):
819        self.test_sequence.add_log_lines(
820            [  # Remove the breakpoint.
821                "read packet: $z{2},{0:x},{1}#00".format(
822                    address, breakpoint_kind, z_packet_type),
823                # Verify the stub could unset it.
824                "send packet: $OK#00",
825            ], True)
826
827    def add_qSupported_packets(self, client_features=[]):
828        features = ''.join(';' + x for x in client_features)
829        self.test_sequence.add_log_lines(
830            ["read packet: $qSupported{}#00".format(features),
831             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
832             ], True)
833
    # Closed set of qSupported stub features this test suite recognizes.
    # parse_qSupported_response() raises on any feature a stub reports that
    # is not listed here, so new stub features must be added to this list.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qXfer:siginfo:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
        "qSaveCore",
        "native-signals",
        "QNonStop",
    ]
855
856    def parse_qSupported_response(self, context):
857        self.assertIsNotNone(context)
858
859        raw_response = context.get("qSupported_response")
860        self.assertIsNotNone(raw_response)
861
862        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
863        # +,-,? is stripped from the key and set as the value.
864        supported_dict = {}
865        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
866            key = match.group(1)
867            val = match.group(3)
868
869            # key=val: store as is
870            if val and len(val) > 0:
871                supported_dict[key] = val
872            else:
873                if len(key) < 2:
874                    raise Exception(
875                        "singular stub feature is too short: must be stub_feature{+,-,?}")
876                supported_type = key[-1]
877                key = key[:-1]
878                if not supported_type in ["+", "-", "?"]:
879                    raise Exception(
880                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
881                supported_dict[key] = supported_type
882            # Ensure we know the supported element
883            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
884                raise Exception(
885                    "unknown qSupported stub feature reported: %s" %
886                    key)
887
888        return supported_dict
889
890    def continue_process_and_wait_for_stop(self):
891        self.test_sequence.add_log_lines(
892            [
893                "read packet: $vCont;c#a8",
894                {
895                    "direction": "send",
896                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
897                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
898                },
899            ],
900            True,
901        )
902        context = self.expect_gdbremote_sequence()
903        self.assertIsNotNone(context)
904        return self.parse_interrupt_packets(context)
905
906    def select_modifiable_register(self, reg_infos):
907        """Find a register that can be read/written freely."""
908        PREFERRED_REGISTER_NAMES = set(["rax", ])
909
910        # First check for the first register from the preferred register name
911        # set.
912        alternative_register_index = None
913
914        self.assertIsNotNone(reg_infos)
915        for reg_info in reg_infos:
916            if ("name" in reg_info) and (
917                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
918                # We found a preferred register.  Use it.
919                return reg_info["lldb_register_index"]
920            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
921                    reg_info["generic"] == "arg1"):
922                # A frame pointer or first arg register will do as a
923                # register to modify temporarily.
924                alternative_register_index = reg_info["lldb_register_index"]
925
926        # We didn't find a preferred register.  Return whatever alternative register
927        # we found, if any.
928        return alternative_register_index
929
930    def extract_registers_from_stop_notification(self, stop_key_vals_text):
931        self.assertIsNotNone(stop_key_vals_text)
932        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
933
934        registers = {}
935        for (key, val) in list(kv_dict.items()):
936            if re.match(r"^[0-9a-fA-F]+$", key):
937                registers[int(key, 16)] = val
938        return registers
939
940    def gather_register_infos(self):
941        self.reset_test_sequence()
942        self.add_register_info_collection_packets()
943
944        context = self.expect_gdbremote_sequence()
945        self.assertIsNotNone(context)
946
947        reg_infos = self.parse_register_info_packets(context)
948        self.assertIsNotNone(reg_infos)
949        self.add_lldb_register_index(reg_infos)
950
951        return reg_infos
952
953    def find_generic_register_with_name(self, reg_infos, generic_name):
954        self.assertIsNotNone(reg_infos)
955        for reg_info in reg_infos:
956            if ("generic" in reg_info) and (
957                    reg_info["generic"] == generic_name):
958                return reg_info
959        return None
960
961    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
962        self.assertIsNotNone(reg_infos)
963        for reg_info in reg_infos:
964            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
965                return reg_info
966        return None
967
968    def decode_gdbremote_binary(self, encoded_bytes):
969        decoded_bytes = ""
970        i = 0
971        while i < len(encoded_bytes):
972            if encoded_bytes[i] == "}":
973                # Handle escaped char.
974                self.assertTrue(i + 1 < len(encoded_bytes))
975                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
976                i += 2
977            elif encoded_bytes[i] == "*":
978                # Handle run length encoding.
979                self.assertTrue(len(decoded_bytes) > 0)
980                self.assertTrue(i + 1 < len(encoded_bytes))
981                repeat_count = ord(encoded_bytes[i + 1]) - 29
982                decoded_bytes += decoded_bytes[-1] * repeat_count
983                i += 2
984            else:
985                decoded_bytes += encoded_bytes[i]
986                i += 1
987        return decoded_bytes
988
989    def build_auxv_dict(self, endian, word_size, auxv_data):
990        self.assertIsNotNone(endian)
991        self.assertIsNotNone(word_size)
992        self.assertIsNotNone(auxv_data)
993
994        auxv_dict = {}
995
996        # PowerPC64le's auxvec has a special key that must be ignored.
997        # This special key may be used multiple times, resulting in
998        # multiple key/value pairs with the same key, which would otherwise
999        # break this test check for repeated keys.
1000        #
1001        # AT_IGNOREPPC = 22
1002        ignored_keys_for_arch = { 'powerpc64le' : [22] }
1003        arch = self.getArchitecture()
1004        ignore_keys = None
1005        if arch in ignored_keys_for_arch:
1006            ignore_keys = ignored_keys_for_arch[arch]
1007
1008        while len(auxv_data) > 0:
1009            # Chop off key.
1010            raw_key = auxv_data[:word_size]
1011            auxv_data = auxv_data[word_size:]
1012
1013            # Chop of value.
1014            raw_value = auxv_data[:word_size]
1015            auxv_data = auxv_data[word_size:]
1016
1017            # Convert raw text from target endian.
1018            key = unpack_endian_binary_string(endian, raw_key)
1019            value = unpack_endian_binary_string(endian, raw_value)
1020
1021            if ignore_keys and key in ignore_keys:
1022                continue
1023
1024            # Handle ending entry.
1025            if key == 0:
1026                self.assertEqual(value, 0)
1027                return auxv_dict
1028
1029            # The key should not already be present.
1030            self.assertFalse(key in auxv_dict)
1031            auxv_dict[key] = value
1032
1033        self.fail(
1034            "should not reach here - implies required double zero entry not found")
1035        return auxv_dict
1036
1037    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1038        """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1039        offset = 0
1040        done = False
1041        decoded_data = ""
1042
1043        while not done:
1044            # Grab the next iteration of data.
1045            self.reset_test_sequence()
1046            self.test_sequence.add_log_lines(
1047                [
1048                    "read packet: ${}{:x},{:x}:#00".format(
1049                        command_prefix,
1050                        offset,
1051                        chunk_length),
1052                    {
1053                        "direction": "send",
1054                        "regex": re.compile(
1055                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1056                            re.MULTILINE | re.DOTALL),
1057                        "capture": {
1058                            1: "response_type",
1059                            2: "content_raw"}}],
1060                True)
1061
1062            context = self.expect_gdbremote_sequence()
1063            self.assertIsNotNone(context)
1064
1065            response_type = context.get("response_type")
1066            self.assertIsNotNone(response_type)
1067            self.assertTrue(response_type in ["l", "m"])
1068
1069            # Move offset along.
1070            offset += chunk_length
1071
1072            # Figure out if we're done.  We're done if the response type is l.
1073            done = response_type == "l"
1074
1075            # Decode binary data.
1076            content_raw = context.get("content_raw")
1077            if content_raw and len(content_raw) > 0:
1078                self.assertIsNotNone(content_raw)
1079                decoded_data += self.decode_gdbremote_binary(content_raw)
1080        return decoded_data
1081
1082    def add_interrupt_packets(self):
1083        self.test_sequence.add_log_lines([
1084            # Send the intterupt.
1085            "read packet: {}".format(chr(3)),
1086            # And wait for the stop notification.
1087            {"direction": "send",
1088             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1089             "capture": {1: "stop_signo",
1090                         2: "stop_key_val_text"}},
1091        ], True)
1092
1093    def parse_interrupt_packets(self, context):
1094        self.assertIsNotNone(context.get("stop_signo"))
1095        self.assertIsNotNone(context.get("stop_key_val_text"))
1096        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1097            context["stop_key_val_text"]))
1098
1099    def add_QSaveRegisterState_packets(self, thread_id):
1100        if thread_id:
1101            # Use the thread suffix form.
1102            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1103                thread_id)
1104        else:
1105            request = "read packet: $QSaveRegisterState#00"
1106
1107        self.test_sequence.add_log_lines([request,
1108                                          {"direction": "send",
1109                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1110                                           "capture": {1: "save_response"}},
1111                                          ],
1112                                         True)
1113
1114    def parse_QSaveRegisterState_response(self, context):
1115        self.assertIsNotNone(context)
1116
1117        save_response = context.get("save_response")
1118        self.assertIsNotNone(save_response)
1119
1120        if len(save_response) < 1 or save_response[0] == "E":
1121            # error received
1122            return (False, None)
1123        else:
1124            return (True, int(save_response))
1125
1126    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1127        if thread_id:
1128            # Use the thread suffix form.
1129            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1130                save_id, thread_id)
1131        else:
1132            request = "read packet: $QRestoreRegisterState:{}#00".format(
1133                save_id)
1134
1135        self.test_sequence.add_log_lines([
1136            request,
1137            "send packet: $OK#00"
1138        ], True)
1139
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read, bit-flip, write back, and verify each register in reg_infos.

        For every register: read its value with 'p', write back the bitwise
        complement with 'P', and (when the write reports OK) read it again
        to confirm the flipped value stuck.

        Args:
            reg_infos: register info dicts; each must carry the
                "lldb_register_index" entry added by add_lldb_register_index().
            endian: target endianness used to pack/unpack register hex text.
            thread_id: if truthy, send the ";thread:{id:x}" suffix with the
                p/P packets.

        Returns:
            (successful_writes, failed_writes) counts.  Some registers
            (flags, segment selectors, etc.) legitimately reject or permute
            writes, so callers should tolerate a non-zero failure count.
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1239
1240    def is_bit_flippable_register(self, reg_info):
1241        if not reg_info:
1242            return False
1243        if not "set" in reg_info:
1244            return False
1245        if reg_info["set"] != "General Purpose Registers":
1246            return False
1247        if ("container-regs" in reg_info) and (
1248                len(reg_info["container-regs"]) > 0):
1249            # Don't try to bit flip registers contained in another register.
1250            return False
1251        if re.match("^.s$", reg_info["name"]):
1252            # This is a 2-letter register name that ends in "s", like a segment register.
1253            # Don't try to bit flip these.
1254            return False
1255        if re.match("^(c|)psr$", reg_info["name"]):
1256            # This is an ARM program status register; don't flip it.
1257            return False
1258        # Okay, this looks fine-enough.
1259        return True
1260
1261    def read_register_values(self, reg_infos, endian, thread_id=None):
1262        self.assertIsNotNone(reg_infos)
1263        values = {}
1264
1265        for reg_info in reg_infos:
1266            # We append a register index when load reg infos so we can work
1267            # with subsets.
1268            reg_index = reg_info.get("lldb_register_index")
1269            self.assertIsNotNone(reg_index)
1270
1271            # Handle thread suffix.
1272            if thread_id:
1273                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1274                    reg_index, thread_id)
1275            else:
1276                p_request = "read packet: $p{:x}#00".format(reg_index)
1277
1278            # Read it with p.
1279            self.reset_test_sequence()
1280            self.test_sequence.add_log_lines([
1281                p_request,
1282                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1283            ], True)
1284            context = self.expect_gdbremote_sequence()
1285            self.assertIsNotNone(context)
1286
1287            # Convert value from target endian to integral.
1288            p_response = context.get("p_response")
1289            self.assertIsNotNone(p_response)
1290            self.assertTrue(len(p_response) > 0)
1291            self.assertFalse(p_response[0] == "E")
1292
1293            values[reg_index] = unpack_register_hex_unsigned(
1294                endian, p_response)
1295
1296        return values
1297
1298    def add_vCont_query_packets(self):
1299        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1300                                          {"direction": "send",
1301                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1302                                           "capture": {2: "vCont_query_response"}},
1303                                          ],
1304                                         True)
1305
1306    def parse_vCont_query_response(self, context):
1307        self.assertIsNotNone(context)
1308        vCont_query_response = context.get("vCont_query_response")
1309
1310        # Handle case of no vCont support at all - in which case the capture
1311        # group will be none or zero length.
1312        if not vCont_query_response or len(vCont_query_response) == 0:
1313            return {}
1314
1315        return {key: 1 for key in vCont_query_response.split(
1316            ";") if key and len(key) > 0}
1317
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) returns true.

        Used by single step test that appears in a few different contexts.

        Args:
            thread_id: the thread to step; substituted for "{thread}" in
                step_instruction and used for the Hc packet.
            predicate: callable evaluated after every step; stepping stops
                once it returns true.
            args: opaque argument passed through to predicate.
            max_step_count: give up after this many steps.
            use_Hc_packet: if True, select the continue thread with $Hc
                before each step; otherwise step_instruction is expected to
                address the thread itself (e.g. "vCont;s:{thread}").
            step_instruction: the step packet body; any "{thread}" is
                replaced with the hex thread id.

        Returns:
            (predicate_became_true, number_of_steps_taken).
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            # Every single step should stop with SIGTRAP.
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1370
1371    def g_c1_c2_contents_are(self, args):
1372        """Used by single step test that appears in a few different contexts."""
1373        g_c1_address = args["g_c1_address"]
1374        g_c2_address = args["g_c2_address"]
1375        expected_g_c1 = args["expected_g_c1"]
1376        expected_g_c2 = args["expected_g_c2"]
1377
1378        # Read g_c1 and g_c2 contents.
1379        self.reset_test_sequence()
1380        self.test_sequence.add_log_lines(
1381            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1382             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1383             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1384             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1385            True)
1386
1387        # Run the packet stream.
1388        context = self.expect_gdbremote_sequence()
1389        self.assertIsNotNone(context)
1390
1391        # Check if what we read from inferior memory is what we are expecting.
1392        self.assertIsNotNone(context.get("g_c1_contents"))
1393        self.assertIsNotNone(context.get("g_c2_contents"))
1394
1395        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1396            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1397
1398    def single_step_only_steps_one_instruction(
1399            self, use_Hc_packet=True, step_instruction="s"):
1400        """Used by single step test that appears in a few different contexts."""
1401        # Start up the inferior.
1402        procs = self.prep_debug_monitor_and_inferior(
1403            inferior_args=[
1404                "get-code-address-hex:swap_chars",
1405                "get-data-address-hex:g_c1",
1406                "get-data-address-hex:g_c2",
1407                "sleep:1",
1408                "call-function:swap_chars",
1409                "sleep:5"])
1410
1411        # Run the process
1412        self.test_sequence.add_log_lines(
1413            [  # Start running after initial stop.
1414                "read packet: $c#63",
1415                # Match output line that prints the memory address of the function call entry point.
1416                # Note we require launch-only testing so we can get inferior otuput.
1417                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1418                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1419                # Now stop the inferior.
1420                "read packet: {}".format(chr(3)),
1421                # And wait for the stop notification.
1422                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1423            True)
1424
1425        # Run the packet stream.
1426        context = self.expect_gdbremote_sequence()
1427        self.assertIsNotNone(context)
1428
1429        # Grab the main thread id.
1430        self.assertIsNotNone(context.get("stop_thread_id"))
1431        main_thread_id = int(context.get("stop_thread_id"), 16)
1432
1433        # Grab the function address.
1434        self.assertIsNotNone(context.get("function_address"))
1435        function_address = int(context.get("function_address"), 16)
1436
1437        # Grab the data addresses.
1438        self.assertIsNotNone(context.get("g_c1_address"))
1439        g_c1_address = int(context.get("g_c1_address"), 16)
1440
1441        self.assertIsNotNone(context.get("g_c2_address"))
1442        g_c2_address = int(context.get("g_c2_address"), 16)
1443
1444        # Set a breakpoint at the given address.
1445        if self.getArchitecture().startswith("arm"):
1446            # TODO: Handle case when setting breakpoint in thumb code
1447            BREAKPOINT_KIND = 4
1448        else:
1449            BREAKPOINT_KIND = 1
1450        self.reset_test_sequence()
1451        self.add_set_breakpoint_packets(
1452            function_address,
1453            do_continue=True,
1454            breakpoint_kind=BREAKPOINT_KIND)
1455        context = self.expect_gdbremote_sequence()
1456        self.assertIsNotNone(context)
1457
1458        # Remove the breakpoint.
1459        self.reset_test_sequence()
1460        self.add_remove_breakpoint_packets(
1461            function_address, breakpoint_kind=BREAKPOINT_KIND)
1462        context = self.expect_gdbremote_sequence()
1463        self.assertIsNotNone(context)
1464
1465        # Verify g_c1 and g_c2 match expected initial state.
1466        args = {}
1467        args["g_c1_address"] = g_c1_address
1468        args["g_c2_address"] = g_c2_address
1469        args["expected_g_c1"] = "0"
1470        args["expected_g_c2"] = "1"
1471
1472        self.assertTrue(self.g_c1_c2_contents_are(args))
1473
1474        # Verify we take only a small number of steps to hit the first state.
1475        # Might need to work through function entry prologue code.
1476        args["expected_g_c1"] = "1"
1477        args["expected_g_c2"] = "1"
1478        (state_reached,
1479         step_count) = self.count_single_steps_until_true(main_thread_id,
1480                                                          self.g_c1_c2_contents_are,
1481                                                          args,
1482                                                          max_step_count=25,
1483                                                          use_Hc_packet=use_Hc_packet,
1484                                                          step_instruction=step_instruction)
1485        self.assertTrue(state_reached)
1486
1487        # Verify we hit the next state.
1488        args["expected_g_c1"] = "1"
1489        args["expected_g_c2"] = "0"
1490        (state_reached,
1491         step_count) = self.count_single_steps_until_true(main_thread_id,
1492                                                          self.g_c1_c2_contents_are,
1493                                                          args,
1494                                                          max_step_count=5,
1495                                                          use_Hc_packet=use_Hc_packet,
1496                                                          step_instruction=step_instruction)
1497        self.assertTrue(state_reached)
1498        expected_step_count = 1
1499        arch = self.getArchitecture()
1500
1501        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1502        # of variable value
1503        if re.match("mips", arch):
1504            expected_step_count = 3
1505        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1506        # variable value
1507        if re.match("s390x", arch):
1508            expected_step_count = 2
1509        # ARM64 requires "4" instructions: 2 to compute the address (adrp,
1510        # add), one to materialize the constant (mov) and the store. Once
1511        # addresses and constants are materialized, only one instruction is
1512        # needed.
1513        if re.match("arm64", arch):
1514            before_materialization_step_count = 4
1515            after_matrialization_step_count = 1
1516            self.assertIn(step_count, [before_materialization_step_count,
1517                                       after_matrialization_step_count])
1518            expected_step_count = after_matrialization_step_count
1519        else:
1520            self.assertEqual(step_count, expected_step_count)
1521
1522        # Verify we hit the next state.
1523        args["expected_g_c1"] = "0"
1524        args["expected_g_c2"] = "0"
1525        (state_reached,
1526         step_count) = self.count_single_steps_until_true(main_thread_id,
1527                                                          self.g_c1_c2_contents_are,
1528                                                          args,
1529                                                          max_step_count=5,
1530                                                          use_Hc_packet=use_Hc_packet,
1531                                                          step_instruction=step_instruction)
1532        self.assertTrue(state_reached)
1533        self.assertEqual(step_count, expected_step_count)
1534
1535        # Verify we hit the next state.
1536        args["expected_g_c1"] = "0"
1537        args["expected_g_c2"] = "1"
1538        (state_reached,
1539         step_count) = self.count_single_steps_until_true(main_thread_id,
1540                                                          self.g_c1_c2_contents_are,
1541                                                          args,
1542                                                          max_step_count=5,
1543                                                          use_Hc_packet=use_Hc_packet,
1544                                                          step_instruction=step_instruction)
1545        self.assertTrue(state_reached)
1546        self.assertEqual(step_count, expected_step_count)
1547
1548    def maybe_strict_output_regex(self, regex):
1549        return '.*' + regex + \
1550            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1551
1552    def install_and_create_launch_args(self):
1553        exe_path = self.getBuildArtifact("a.out")
1554        if not lldb.remote_platform:
1555            return [exe_path]
1556        remote_path = lldbutil.append_to_process_working_directory(self,
1557            os.path.basename(exe_path))
1558        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1559        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1560                                           remote_file_spec)
1561        if err.Fail():
1562            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1563                            (exe_path, remote_path, err))
1564        return [remote_path]
1565