1"""
2Base class for gdb-remote test cases.
3"""
4
5from __future__ import division, print_function
6
7
8import errno
9import os
10import os.path
11import random
12import re
13import select
14import socket
15import subprocess
16import sys
17import tempfile
18import time
19from lldbsuite.test import configuration
20from lldbsuite.test.lldbtest import *
21from lldbsuite.support import seven
22from lldbgdbserverutils import *
23import logging
24
25
class _ConnectionRefused(IOError):
    """Internal signal that connecting to the debug monitor was refused.

    Raised by create_socket()/_verify_socket() so connect_to_debug_monitor()
    can treat the attempt as refused and retry.
    """
    pass
28
29
class GdbRemoteTestCaseFactory(type):
    """Metaclass expanding each "test*" method into per-server variants.

    Every attribute whose name starts with "test" is replaced by one
    generated method per debug server category ("debugserver" / "llgs"),
    named "<original>_<category>" and tagged with that category; the
    original "test*" attribute itself is not kept on the class.
    """

    def __new__(cls, name, bases, attrs):
        newattrs = {}
        for attrname, attrvalue in attrs.items():
            # Non-test attributes are copied through unchanged.
            if not attrname.startswith("test"):
                newattrs[attrname] = attrvalue
                continue

            # If any debug server categories were explicitly tagged, assume
            # that list to be authoritative. If none were specified, try
            # all of them.
            all_categories = set(["debugserver", "llgs"])
            categories = set(
                getattr(attrvalue, "categories", [])) & all_categories
            if not categories:
                categories = all_categories

            for cat in categories:
                # The original test is bound via a default argument so each
                # generated wrapper calls the right one (avoids the
                # late-binding-closure pitfall).
                @decorators.add_test_categories([cat])
                @wraps(attrvalue)
                def test_method(self, attrvalue=attrvalue):
                    return attrvalue(self)

                method_name = attrname + "_" + cat
                test_method.__name__ = method_name
                # Read back by GdbRemoteTestCaseBase.getDebugServer() to pick
                # which stub to launch.
                test_method.debug_server = cat
                newattrs[method_name] = test_method

        return super(GdbRemoteTestCaseFactory, cls).__new__(
                cls, name, bases, newattrs)
61
@add_metaclass(GdbRemoteTestCaseFactory)
class GdbRemoteTestCaseBase(Base):
    """Base class for gdb-remote protocol tests against debugserver/llgs."""

    # Default time out in seconds. The timeout is increased tenfold under Asan.
    DEFAULT_TIMEOUT =  20 * (10 if ('ASAN_OPTIONS' in os.environ) else 1)
    # Default sleep time in seconds. The sleep time is doubled under Asan.
    DEFAULT_SLEEP   =  5  * (2  if ('ASAN_OPTIONS' in os.environ) else 1)

    # Raw gdb-remote 'k' (kill) packet, with checksum; sent at teardown so
    # the stub shuts down gracefully.
    _GDBREMOTE_KILL_PACKET = b"$k#6b"

    # Start the inferior separately, attach to the inferior on the stub
    # command line.
    _STARTUP_ATTACH = "attach"
    # Start the inferior separately, start the stub without attaching, allow
    # the test to attach to the inferior however it wants (e.g. $vAttach;pid).
    _STARTUP_ATTACH_MANUALLY = "attach_manually"
    # Start the stub, and launch the inferior with an $A packet via the
    # initial packet stream.
    _STARTUP_LAUNCH = "launch"

    # GDB Signal numbers that are not target-specific used for common
    # exceptions
    TARGET_EXC_BAD_ACCESS = 0x91
    TARGET_EXC_BAD_INSTRUCTION = 0x92
    TARGET_EXC_ARITHMETIC = 0x93
    TARGET_EXC_EMULATION = 0x94
    TARGET_EXC_SOFTWARE = 0x95
    TARGET_EXC_BREAKPOINT = 0x96

    # Per-test verbose FileHandler; installed in setUp when requested and
    # removed again in tearDown.
    _verbose_log_handler = None
    _log_formatter = logging.Formatter(
        fmt='%(asctime)-15s %(levelname)-8s %(message)s')
94
95    def setUpBaseLogging(self):
96        self.logger = logging.getLogger(__name__)
97
98        if len(self.logger.handlers) > 0:
99            return  # We have set up this handler already
100
101        self.logger.propagate = False
102        self.logger.setLevel(logging.DEBUG)
103
104        # log all warnings to stderr
105        handler = logging.StreamHandler()
106        handler.setLevel(logging.WARNING)
107        handler.setFormatter(self._log_formatter)
108        self.logger.addHandler(handler)
109
110    def isVerboseLoggingRequested(self):
111        # We will report our detailed logs if the user requested that the "gdb-remote" channel is
112        # logged.
113        return any(("gdb-remote" in channel)
114                   for channel in lldbtest_config.channels)
115
116    def getDebugServer(self):
117        method = getattr(self, self.testMethodName)
118        return getattr(method, "debug_server", None)
119
120    def setUp(self):
121        super(GdbRemoteTestCaseBase, self).setUp()
122
123        self.setUpBaseLogging()
124        self.debug_monitor_extra_args = []
125
126        if self.isVerboseLoggingRequested():
127            # If requested, full logs go to a log file
128            self._verbose_log_handler = logging.FileHandler(
129                self.getLogBasenameForCurrentTest() + "-host.log")
130            self._verbose_log_handler.setFormatter(self._log_formatter)
131            self._verbose_log_handler.setLevel(logging.DEBUG)
132            self.logger.addHandler(self._verbose_log_handler)
133
134        self.test_sequence = GdbRemoteTestSequence(self.logger)
135        self.set_inferior_startup_launch()
136        self.port = self.get_next_port()
137        self.stub_sends_two_stop_notifications_on_kill = False
138        if configuration.lldb_platform_url:
139            if configuration.lldb_platform_url.startswith('unix-'):
140                url_pattern = '(.+)://\[?(.+?)\]?/.*'
141            else:
142                url_pattern = '(.+)://(.+):\d+'
143            scheme, host = re.match(
144                url_pattern, configuration.lldb_platform_url).groups()
145            if configuration.lldb_platform_name == 'remote-android' and host != 'localhost':
146                self.stub_device = host
147                self.stub_hostname = 'localhost'
148            else:
149                self.stub_device = None
150                self.stub_hostname = host
151        else:
152            self.stub_hostname = "localhost"
153
154        debug_server = self.getDebugServer()
155        if debug_server == "debugserver":
156            self._init_debugserver_test()
157        else:
158            self._init_llgs_test()
159
    def tearDown(self):
        # Detach the per-test verbose file handler installed in setUp (when
        # verbose logging was requested); removeHandler(None) is a no-op.
        self.logger.removeHandler(self._verbose_log_handler)
        self._verbose_log_handler = None
        # NOTE(review): setUp chains through super() but this calls
        # TestBase.tearDown directly even though the class derives from
        # Base — confirm the asymmetry is intended.
        TestBase.tearDown(self)
164
    def build(self, *args, **kwargs):
        # Tests in this suite always build the default configuration.
        self.buildDefault(*args, **kwargs)
167
168    def getLocalServerLogFile(self):
169        return self.getLogBasenameForCurrentTest() + "-server.log"
170
171    def setUpServerLogging(self, is_llgs):
172        if len(lldbtest_config.channels) == 0:
173            return  # No logging requested
174
175        if lldb.remote_platform:
176            log_file = lldbutil.join_remote_paths(
177                lldb.remote_platform.GetWorkingDirectory(), "server.log")
178        else:
179            log_file = self.getLocalServerLogFile()
180
181        if is_llgs:
182            self.debug_monitor_extra_args.append("--log-file=" + log_file)
183            self.debug_monitor_extra_args.append(
184                "--log-channels={}".format(":".join(lldbtest_config.channels)))
185        else:
186            self.debug_monitor_extra_args = [
187                "--log-file=" + log_file, "--log-flags=0x800000"]
188
189    def get_next_port(self):
190        return 12000 + random.randint(0, 3999)
191
    def reset_test_sequence(self):
        # Drop any accumulated expected-packet entries and start a fresh
        # sequence bound to this test's logger.
        self.test_sequence = GdbRemoteTestSequence(self.logger)
194
195
    def _init_llgs_test(self):
        """Configure this test to run against lldb-server gdbserver.

        Locally, reverse-connect is used and lldb-server is located on the
        host. On a remote platform we connect forward instead, and locate
        the lldb-server binary by resolving the remote shell's parent
        process through /proc (Linux-specific, see FIXME below).
        """
        reverse_connect = True
        if lldb.remote_platform:
            # Reverse connections may be tricky due to firewalls/NATs.
            reverse_connect = False

            # FIXME: This is extremely linux-oriented

            # Grab the ppid from /proc/[shell pid]/stat
            err, retcode, shell_stat = self.run_platform_command(
                "cat /proc/$$/stat")
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/$$/stat: %s, retcode: %d" %
                (err.GetCString(),
                 retcode))

            # [pid] ([executable]) [state] [*ppid*]
            pid = re.match(r"^\d+ \(.+\) . (\d+)", shell_stat).group(1)
            # The shell's parent is the remote lldb-server platform; its
            # /proc/<pid>/exe symlink points at the binary we want to run.
            err, retcode, ls_output = self.run_platform_command(
                "ls -l /proc/%s/exe" % pid)
            self.assertTrue(
                err.Success() and retcode == 0,
                "Failed to read file /proc/%s/exe: %s, retcode: %d" %
                (pid,
                 err.GetCString(),
                 retcode))
            exe = ls_output.split()[-1]

            # If the binary has been deleted, the link name has " (deleted)" appended.
            # Remove if it's there.
            self.debug_monitor_exe = re.sub(r' \(deleted\)$', '', exe)
        else:
            self.debug_monitor_exe = get_lldb_server_exe()

        self.debug_monitor_extra_args = ["gdbserver"]
        self.setUpServerLogging(is_llgs=True)

        self.reverse_connect = reverse_connect
235
    def _init_debugserver_test(self):
        """Configure this test to run against debugserver."""
        self.debug_monitor_exe = get_debugserver_exe()
        self.setUpServerLogging(is_llgs=False)
        # debugserver always connects back to the test (reverse connect).
        self.reverse_connect = True

        # The debugserver stub has a race on handling the 'k' command, so it sends an X09 right away, then sends the real X notification
        # when the process truly dies.
        self.stub_sends_two_stop_notifications_on_kill = True
244
245    def forward_adb_port(self, source, target, direction, device):
246        adb = ['adb'] + (['-s', device] if device else []) + [direction]
247
248        def remove_port_forward():
249            subprocess.call(adb + ["--remove", "tcp:%d" % source])
250
251        subprocess.call(adb + ["tcp:%d" % source, "tcp:%d" % target])
252        self.addTearDownHook(remove_port_forward)
253
254    def _verify_socket(self, sock):
255        # Normally, when the remote stub is not ready, we will get ECONNREFUSED during the
256        # connect() attempt. However, due to the way how ADB forwarding works, on android targets
257        # the connect() will always be successful, but the connection will be immediately dropped
258        # if ADB could not connect on the remote side. This function tries to detect this
259        # situation, and report it as "connection refused" so that the upper layers attempt the
260        # connection again.
261        triple = self.dbg.GetSelectedPlatform().GetTriple()
262        if not re.match(".*-.*-.*-android", triple):
263            return  # Not android.
264        can_read, _, _ = select.select([sock], [], [], 0.1)
265        if sock not in can_read:
266            return  # Data is not available, but the connection is alive.
267        if len(sock.recv(1, socket.MSG_PEEK)) == 0:
268            raise _ConnectionRefused()  # Got EOF, connection dropped.
269
270    def create_socket(self):
271        try:
272            sock = socket.socket(family=socket.AF_INET)
273        except OSError as e:
274            if e.errno != errno.EAFNOSUPPORT:
275                raise
276            sock = socket.socket(family=socket.AF_INET6)
277
278        logger = self.logger
279
280        triple = self.dbg.GetSelectedPlatform().GetTriple()
281        if re.match(".*-.*-.*-android", triple):
282            self.forward_adb_port(
283                self.port,
284                self.port,
285                "forward",
286                self.stub_device)
287
288        logger.info(
289            "Connecting to debug monitor on %s:%d",
290            self.stub_hostname,
291            self.port)
292        connect_info = (self.stub_hostname, self.port)
293        try:
294            sock.connect(connect_info)
295        except socket.error as serr:
296            if serr.errno == errno.ECONNREFUSED:
297                raise _ConnectionRefused()
298            raise serr
299
300        def shutdown_socket():
301            if sock:
302                try:
303                    # send the kill packet so lldb-server shuts down gracefully
304                    sock.sendall(GdbRemoteTestCaseBase._GDBREMOTE_KILL_PACKET)
305                except:
306                    logger.warning(
307                        "failed to send kill packet to debug monitor: {}; ignoring".format(
308                            sys.exc_info()[0]))
309
310                try:
311                    sock.close()
312                except:
313                    logger.warning(
314                        "failed to close socket to debug monitor: {}; ignoring".format(
315                            sys.exc_info()[0]))
316
317        self.addTearDownHook(shutdown_socket)
318
319        self._verify_socket(sock)
320
321        return sock
322
    def set_inferior_startup_launch(self):
        # The stub launches the inferior itself (via the $A packet).
        self._inferior_startup = self._STARTUP_LAUNCH

    def set_inferior_startup_attach(self):
        # The inferior is launched separately; the stub attaches via
        # --attach=<pid> on its command line.
        self._inferior_startup = self._STARTUP_ATTACH

    def set_inferior_startup_attach_manually(self):
        # The inferior is launched separately; the test attaches however it
        # wants (e.g. $vAttach;pid).
        self._inferior_startup = self._STARTUP_ATTACH_MANUALLY
331
332    def get_debug_monitor_command_line_args(self, attach_pid=None):
333        commandline_args = self.debug_monitor_extra_args
334        if attach_pid:
335            commandline_args += ["--attach=%d" % attach_pid]
336        if self.reverse_connect:
337            commandline_args += ["--reverse-connect", self.connect_address]
338        else:
339            if lldb.remote_platform:
340                commandline_args += ["*:{}".format(self.port)]
341            else:
342                commandline_args += ["localhost:{}".format(self.port)]
343
344        return commandline_args
345
346    def get_target_byte_order(self):
347        inferior_exe_path = self.getBuildArtifact("a.out")
348        target = self.dbg.CreateTarget(inferior_exe_path)
349        return target.GetByteOrder()
350
351    def launch_debug_monitor(self, attach_pid=None, logfile=None):
352        if self.reverse_connect:
353            family, type, proto, _, addr = socket.getaddrinfo("localhost", 0, proto=socket.IPPROTO_TCP)[0]
354            sock = socket.socket(family, type, proto)
355            sock.settimeout(self.DEFAULT_TIMEOUT)
356
357            sock.bind(addr)
358            sock.listen(1)
359            addr = sock.getsockname()
360            self.connect_address = "[{}]:{}".format(*addr)
361
362
363        # Create the command line.
364        commandline_args = self.get_debug_monitor_command_line_args(
365            attach_pid=attach_pid)
366
367        # Start the server.
368        server = self.spawnSubprocess(
369            self.debug_monitor_exe,
370            commandline_args,
371            install_remote=False)
372        self.assertIsNotNone(server)
373
374        if self.reverse_connect:
375            self.sock = sock.accept()[0]
376            self.sock.settimeout(self.DEFAULT_TIMEOUT)
377
378        return server
379
380    def connect_to_debug_monitor(self, attach_pid=None):
381        if self.reverse_connect:
382            # Create the stub.
383            server = self.launch_debug_monitor(attach_pid=attach_pid)
384            self.assertIsNotNone(server)
385
386            # Schedule debug monitor to be shut down during teardown.
387            logger = self.logger
388
389            self._server = Server(self.sock, server)
390            return server
391
392        # We're using a random port algorithm to try not to collide with other ports,
393        # and retry a max # times.
394        attempts = 0
395        MAX_ATTEMPTS = 20
396
397        while attempts < MAX_ATTEMPTS:
398            server = self.launch_debug_monitor(attach_pid=attach_pid)
399
400            # Schedule debug monitor to be shut down during teardown.
401            logger = self.logger
402
403            connect_attemps = 0
404            MAX_CONNECT_ATTEMPTS = 10
405
406            while connect_attemps < MAX_CONNECT_ATTEMPTS:
407                # Create a socket to talk to the server
408                try:
409                    logger.info("Connect attempt %d", connect_attemps + 1)
410                    self.sock = self.create_socket()
411                    self._server = Server(self.sock, server)
412                    return server
413                except _ConnectionRefused as serr:
414                    # Ignore, and try again.
415                    pass
416                time.sleep(0.5)
417                connect_attemps += 1
418
419            # We should close the server here to be safe.
420            server.terminate()
421
422            # Increment attempts.
423            print(
424                "connect to debug monitor on port %d failed, attempt #%d of %d" %
425                (self.port, attempts + 1, MAX_ATTEMPTS))
426            attempts += 1
427
428            # And wait a random length of time before next attempt, to avoid
429            # collisions.
430            time.sleep(random.randint(1, 5))
431
432            # Now grab a new port number.
433            self.port = self.get_next_port()
434
435        raise Exception(
436            "failed to create a socket to the launched debug monitor after %d tries" %
437            attempts)
438
439    def launch_process_for_attach(
440            self,
441            inferior_args=None,
442            sleep_seconds=3,
443            exe_path=None):
444        # We're going to start a child process that the debug monitor stub can later attach to.
445        # This process needs to be started so that it just hangs around for a while.  We'll
446        # have it sleep.
447        if not exe_path:
448            exe_path = self.getBuildArtifact("a.out")
449
450        args = []
451        if inferior_args:
452            args.extend(inferior_args)
453        if sleep_seconds:
454            args.append("sleep:%d" % sleep_seconds)
455
456        return self.spawnSubprocess(exe_path, args)
457
    def prep_debug_monitor_and_inferior(
            self,
            inferior_args=None,
            inferior_sleep_seconds=3,
            inferior_exe_path=None,
            inferior_env=None):
        """Prep the debug monitor, the inferior, and the expected packet stream.

        Handle the separate cases of using the debug monitor in attach-to-inferior mode
        and in launch-inferior mode.

        For attach-to-inferior mode, the inferior process is first started, then
        the debug monitor is started in attach to pid mode (using --attach on the
        stub command line), and the no-ack-mode setup is appended to the packet
        stream.  The packet stream is not yet executed, ready to have more expected
        packet entries added to it.

        For launch-inferior mode, the stub is first started, then no ack mode is
        setup on the expected packet stream, then the verified launch packets are added
        to the expected socket stream.  The packet stream is not yet executed, ready
        to have more expected packet entries added to it.

        The return value is:
        {inferior:<inferior>, server:<server>}
        """
        inferior = None
        attach_pid = None

        if self._inferior_startup == self._STARTUP_ATTACH or self._inferior_startup == self._STARTUP_ATTACH_MANUALLY:
            # Launch the process that we'll use as the inferior.
            inferior = self.launch_process_for_attach(
                inferior_args=inferior_args,
                sleep_seconds=inferior_sleep_seconds,
                exe_path=inferior_exe_path)
            self.assertIsNotNone(inferior)
            self.assertTrue(inferior.pid > 0)
            if self._inferior_startup == self._STARTUP_ATTACH:
                # In this case, we want the stub to attach via the command
                # line, so set the command line attach pid here.
                attach_pid = inferior.pid

        if self._inferior_startup == self._STARTUP_LAUNCH:
            # Build launch args
            if not inferior_exe_path:
                inferior_exe_path = self.getBuildArtifact("a.out")

            if lldb.remote_platform:
                # Copy the binary to the remote's working directory and
                # launch that copy instead of the local path.
                remote_path = lldbutil.append_to_process_working_directory(self,
                    os.path.basename(inferior_exe_path))
                remote_file_spec = lldb.SBFileSpec(remote_path, False)
                err = lldb.remote_platform.Install(lldb.SBFileSpec(
                    inferior_exe_path, True), remote_file_spec)
                if err.Fail():
                    raise Exception(
                        "remote_platform.Install('%s', '%s') failed: %s" %
                        (inferior_exe_path, remote_path, err))
                inferior_exe_path = remote_path

            launch_args = [inferior_exe_path]
            if inferior_args:
                launch_args.extend(inferior_args)

        # Launch the debug monitor stub, attaching to the inferior.
        server = self.connect_to_debug_monitor(attach_pid=attach_pid)
        self.assertIsNotNone(server)

        # Enter no-ack mode before queueing further packets.
        self.do_handshake()

        # Build the expected protocol stream
        if inferior_env:
            for name, value in inferior_env.items():
                self.add_set_environment_packets(name, value)
        if self._inferior_startup == self._STARTUP_LAUNCH:
            self.add_verified_launch_packets(launch_args)

        return {"inferior": inferior, "server": server}
534
    def do_handshake(self):
        """Perform the initial ack exchange and switch to no-ack mode."""
        server = self._server
        server.send_ack()
        server.send_packet(b"QStartNoAckMode")
        # Expect the stub's ack of our packet, then its OK response.
        self.assertEqual(server.get_normal_packet(), b"+")
        self.assertEqual(server.get_normal_packet(), b"OK")
        server.send_ack()
542
    def add_verified_launch_packets(self, launch_args):
        """Queue the $A launch packet for launch_args plus its success check."""
        self.test_sequence.add_log_lines(
            ["read packet: %s" % build_gdbremote_A_packet(launch_args),
             "send packet: $OK#00",
             "read packet: $qLaunchSuccess#a5",
             "send packet: $OK#00"],
            True)
550
    def add_thread_suffix_request_packets(self):
        """Queue the QThreadSuffixSupported request and its OK reply."""
        self.test_sequence.add_log_lines(
            ["read packet: $QThreadSuffixSupported#e4",
             "send packet: $OK#00",
             ], True)
556
    def add_process_info_collection_packets(self):
        """Queue qProcessInfo; the raw reply is captured as "process_info_raw"."""
        self.test_sequence.add_log_lines(
            ["read packet: $qProcessInfo#dc",
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "process_info_raw"}}],
            True)
562
    def add_set_environment_packets(self, name, value):
        """Queue a QEnvironment packet setting name=value for the inferior."""
        self.test_sequence.add_log_lines(
            ["read packet: $QEnvironment:" + name + "=" + value + "#00",
             "send packet: $OK#00",
             ], True)
568
    # Keys permitted in a qProcessInfo response; enforced by
    # parse_process_info_response().
    _KNOWN_PROCESS_INFO_KEYS = [
        "pid",
        "parent-pid",
        "real-uid",
        "real-gid",
        "effective-uid",
        "effective-gid",
        "cputype",
        "cpusubtype",
        "ostype",
        "triple",
        "vendor",
        "endian",
        "elf_abi",
        "ptrsize"
    ]
585
586    def parse_process_info_response(self, context):
587        # Ensure we have a process info response.
588        self.assertIsNotNone(context)
589        process_info_raw = context.get("process_info_raw")
590        self.assertIsNotNone(process_info_raw)
591
592        # Pull out key:value; pairs.
593        process_info_dict = {
594            match.group(1): match.group(2) for match in re.finditer(
595                r"([^:]+):([^;]+);", process_info_raw)}
596
597        # Validate keys are known.
598        for (key, val) in list(process_info_dict.items()):
599            self.assertTrue(key in self._KNOWN_PROCESS_INFO_KEYS)
600            self.assertIsNotNone(val)
601
602        return process_info_dict
603
    def add_register_info_collection_packets(self):
        """Queue qRegisterInfo queries until an error reply ends the series;
        the replies are saved under "reg_info_responses"."""
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "query": "qRegisterInfo", "append_iteration_suffix": True,
                "end_regex": re.compile(r"^\$(E\d+)?#[0-9a-fA-F]{2}$"),
                "save_key": "reg_info_responses"}],
            True)
610
611    def parse_register_info_packets(self, context):
612        """Return an array of register info dictionaries, one per register info."""
613        reg_info_responses = context.get("reg_info_responses")
614        self.assertIsNotNone(reg_info_responses)
615
616        # Parse register infos.
617        return [parse_reg_info_response(reg_info_response)
618                for reg_info_response in reg_info_responses]
619
    def expect_gdbremote_sequence(self):
        # Replay self.test_sequence against the live server; the timeout
        # scales with the number of entries in the sequence.
        return expect_lldb_gdbserver_replay(
            self,
            self._server,
            self.test_sequence,
            self.DEFAULT_TIMEOUT * len(self.test_sequence),
            self.logger)
627
    # Keys permitted in a qRegisterInfo response; enforced by
    # assert_valid_reg_info().
    _KNOWN_REGINFO_KEYS = [
        "name",
        "alt-name",
        "bitsize",
        "offset",
        "encoding",
        "format",
        "set",
        "gcc",
        "ehframe",
        "dwarf",
        "generic",
        "container-regs",
        "invalidate-regs",
        "dynamic_size_dwarf_expr_bytes",
        "dynamic_size_dwarf_len"
    ]
645
646    def assert_valid_reg_info(self, reg_info):
647        # Assert we know about all the reginfo keys parsed.
648        for key in reg_info:
649            self.assertTrue(key in self._KNOWN_REGINFO_KEYS)
650
651        # Check the bare-minimum expected set of register info keys.
652        self.assertTrue("name" in reg_info)
653        self.assertTrue("bitsize" in reg_info)
654
655        if not self.getArchitecture() == 'aarch64':
656            self.assertTrue("offset" in reg_info)
657
658        self.assertTrue("encoding" in reg_info)
659        self.assertTrue("format" in reg_info)
660
661    def find_pc_reg_info(self, reg_infos):
662        lldb_reg_index = 0
663        for reg_info in reg_infos:
664            if ("generic" in reg_info) and (reg_info["generic"] == "pc"):
665                return (lldb_reg_index, reg_info)
666            lldb_reg_index += 1
667
668        return (None, None)
669
670    def add_lldb_register_index(self, reg_infos):
671        """Add a "lldb_register_index" key containing the 0-baed index of each reg_infos entry.
672
673        We'll use this when we want to call packets like P/p with a register index but do so
674        on only a subset of the full register info set.
675        """
676        self.assertIsNotNone(reg_infos)
677
678        reg_index = 0
679        for reg_info in reg_infos:
680            reg_info["lldb_register_index"] = reg_index
681            reg_index += 1
682
    def add_query_memory_region_packets(self, address):
        """Queue qMemoryRegionInfo for address; the reply is captured as
        "memory_region_response"."""
        self.test_sequence.add_log_lines(
            ["read packet: $qMemoryRegionInfo:{0:x}#00".format(address),
             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "memory_region_response"}}],
            True)
688
689    def parse_key_val_dict(self, key_val_text, allow_dupes=True):
690        self.assertIsNotNone(key_val_text)
691        kv_dict = {}
692        for match in re.finditer(r";?([^:]+):([^;]+)", key_val_text):
693            key = match.group(1)
694            val = match.group(2)
695            if key in kv_dict:
696                if allow_dupes:
697                    if isinstance(kv_dict[key], list):
698                        kv_dict[key].append(val)
699                    else:
700                        # Promote to list
701                        kv_dict[key] = [kv_dict[key], val]
702                else:
703                    self.fail(
704                        "key '{}' already present when attempting to add value '{}' (text='{}', dict={})".format(
705                            key, val, key_val_text, kv_dict))
706            else:
707                kv_dict[key] = val
708        return kv_dict
709
    def parse_memory_region_packet(self, context):
        """Parse the captured qMemoryRegionInfo reply into a dict.

        Keys are validated against the known set; the "name" value is
        hex-decoded before returning.
        """
        # Ensure we have a context.
        self.assertIsNotNone(context.get("memory_region_response"))

        # Pull out key:value; pairs.
        mem_region_dict = self.parse_key_val_dict(
            context.get("memory_region_response"))

        # Validate keys are known.
        for (key, val) in list(mem_region_dict.items()):
            self.assertIn(key,
                ["start",
                 "size",
                 "permissions",
                 "flags",
                 "name",
                 "error",
                 "dirty-pages"])
            self.assertIsNotNone(val)

        mem_region_dict["name"] = seven.unhexlify(mem_region_dict.get("name", ""))
        # Return the dictionary of key-value pairs for the memory region.
        return mem_region_dict
733
    def assert_address_within_memory_region(
            self, test_address, mem_region_dict):
        """Fail unless test_address lies within [start, start + size).

        mem_region_dict is as returned by parse_memory_region_packet();
        its "start" and "size" values are hex strings.
        """
        self.assertIsNotNone(mem_region_dict)
        self.assertTrue("start" in mem_region_dict)
        self.assertTrue("size" in mem_region_dict)

        range_start = int(mem_region_dict["start"], 16)
        range_size = int(mem_region_dict["size"], 16)
        range_end = range_start + range_size

        if test_address < range_start:
            self.fail(
                "address 0x{0:x} comes before range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))
        elif test_address >= range_end:
            self.fail(
                "address 0x{0:x} comes after range 0x{1:x} - 0x{2:x} (size 0x{3:x})".format(
                    test_address,
                    range_start,
                    range_end,
                    range_size))
758
    def add_threadinfo_collection_packets(self):
        """Queue qfThreadInfo/qsThreadInfo until the "l" terminator; replies
        are saved under "threadinfo_responses"."""
        self.test_sequence.add_log_lines(
            [{"type": "multi_response", "first_query": "qfThreadInfo", "next_query": "qsThreadInfo",
                "append_iteration_suffix": False, "end_regex": re.compile(r"^\$(l)?#[0-9a-fA-F]{2}$"),
                "save_key": "threadinfo_responses"}],
            True)
765
766    def parse_threadinfo_packets(self, context):
767        """Return an array of thread ids (decimal ints), one per thread."""
768        threadinfo_responses = context.get("threadinfo_responses")
769        self.assertIsNotNone(threadinfo_responses)
770
771        thread_ids = []
772        for threadinfo_response in threadinfo_responses:
773            new_thread_infos = parse_threadinfo_response(threadinfo_response)
774            thread_ids.extend(new_thread_infos)
775        return thread_ids
776
777    def wait_for_thread_count(self, thread_count):
778        start_time = time.time()
779        timeout_time = start_time + self.DEFAULT_TIMEOUT
780
781        actual_thread_count = 0
782        while actual_thread_count < thread_count:
783            self.reset_test_sequence()
784            self.add_threadinfo_collection_packets()
785
786            context = self.expect_gdbremote_sequence()
787            self.assertIsNotNone(context)
788
789            threads = self.parse_threadinfo_packets(context)
790            self.assertIsNotNone(threads)
791
792            actual_thread_count = len(threads)
793
794            if time.time() > timeout_time:
795                raise Exception(
796                    'timed out after {} seconds while waiting for theads: waiting for at least {} threads, found {}'.format(
797                        self.DEFAULT_TIMEOUT, thread_count, actual_thread_count))
798
799        return threads
800
    def add_set_breakpoint_packets(
            self,
            address,
            z_packet_type=0,
            do_continue=True,
            breakpoint_kind=1):
        """Queue a Z packet setting a breakpoint of z_packet_type at address.

        With do_continue, also queue $c and expect a T-stop report whose
        signal number and thread id are captured as "stop_signo" and
        "stop_thread_id".
        """
        self.test_sequence.add_log_lines(
            [  # Set the breakpoint.
                "read packet: $Z{2},{0:x},{1}#00".format(
                    address, breakpoint_kind, z_packet_type),
                # Verify the stub could set it.
                "send packet: $OK#00",
            ], True)

        if (do_continue):
            self.test_sequence.add_log_lines(
                [  # Continue the inferior.
                    "read packet: $c#63",
                    # Expect a breakpoint stop report.
                    {"direction": "send",
                     "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                     "capture": {1: "stop_signo",
                                 2: "stop_thread_id"}},
                ], True)
825
826    def add_remove_breakpoint_packets(
827            self,
828            address,
829            z_packet_type=0,
830            breakpoint_kind=1):
831        self.test_sequence.add_log_lines(
832            [  # Remove the breakpoint.
833                "read packet: $z{2},{0:x},{1}#00".format(
834                    address, breakpoint_kind, z_packet_type),
835                # Verify the stub could unset it.
836                "send packet: $OK#00",
837            ], True)
838
839    def add_qSupported_packets(self, client_features=[]):
840        features = ''.join(';' + x for x in client_features)
841        self.test_sequence.add_log_lines(
842            ["read packet: $qSupported{}#00".format(features),
843             {"direction": "send", "regex": r"^\$(.*)#[0-9a-fA-F]{2}", "capture": {1: "qSupported_response"}},
844             ], True)
845
    # Features that debug stubs are known to report in a qSupported reply.
    # parse_qSupported_response() raises on any feature not listed here, so
    # newly introduced stub features must be added to this list explicitly.
    _KNOWN_QSUPPORTED_STUB_FEATURES = [
        "augmented-libraries-svr4-read",
        "PacketSize",
        "QStartNoAckMode",
        "QThreadSuffixSupported",
        "QListThreadsInStopReply",
        "qXfer:auxv:read",
        "qXfer:libraries:read",
        "qXfer:libraries-svr4:read",
        "qXfer:features:read",
        "qEcho",
        "QPassSignals",
        "multiprocess",
        "fork-events",
        "vfork-events",
        "memory-tagging",
    ]
863
864    def parse_qSupported_response(self, context):
865        self.assertIsNotNone(context)
866
867        raw_response = context.get("qSupported_response")
868        self.assertIsNotNone(raw_response)
869
870        # For values with key=val, the dict key and vals are set as expected.  For feature+, feature- and feature?, the
871        # +,-,? is stripped from the key and set as the value.
872        supported_dict = {}
873        for match in re.finditer(r";?([^=;]+)(=([^;]+))?", raw_response):
874            key = match.group(1)
875            val = match.group(3)
876
877            # key=val: store as is
878            if val and len(val) > 0:
879                supported_dict[key] = val
880            else:
881                if len(key) < 2:
882                    raise Exception(
883                        "singular stub feature is too short: must be stub_feature{+,-,?}")
884                supported_type = key[-1]
885                key = key[:-1]
886                if not supported_type in ["+", "-", "?"]:
887                    raise Exception(
888                        "malformed stub feature: final character {} not in expected set (+,-,?)".format(supported_type))
889                supported_dict[key] = supported_type
890            # Ensure we know the supported element
891            if key not in self._KNOWN_QSUPPORTED_STUB_FEATURES:
892                raise Exception(
893                    "unknown qSupported stub feature reported: %s" %
894                    key)
895
896        return supported_dict
897
898    def run_process_then_stop(self, run_seconds=1):
899        # Tell the stub to continue.
900        self.test_sequence.add_log_lines(
901            ["read packet: $vCont;c#a8"],
902            True)
903        context = self.expect_gdbremote_sequence()
904
905        # Wait for run_seconds.
906        time.sleep(run_seconds)
907
908        # Send an interrupt, capture a T response.
909        self.reset_test_sequence()
910        self.test_sequence.add_log_lines(
911            ["read packet: {}".format(chr(3)),
912             {"direction": "send", "regex": r"^\$T([0-9a-fA-F]+)([^#]+)#[0-9a-fA-F]{2}$", "capture": {1: "stop_result"}}],
913            True)
914        context = self.expect_gdbremote_sequence()
915        self.assertIsNotNone(context)
916        self.assertIsNotNone(context.get("stop_result"))
917
918        return context
919
920    def continue_process_and_wait_for_stop(self):
921        self.test_sequence.add_log_lines(
922            [
923                "read packet: $vCont;c#a8",
924                {
925                    "direction": "send",
926                    "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
927                    "capture": {1: "stop_signo", 2: "stop_key_val_text"},
928                },
929            ],
930            True,
931        )
932        context = self.expect_gdbremote_sequence()
933        self.assertIsNotNone(context)
934        return self.parse_interrupt_packets(context)
935
936    def select_modifiable_register(self, reg_infos):
937        """Find a register that can be read/written freely."""
938        PREFERRED_REGISTER_NAMES = set(["rax", ])
939
940        # First check for the first register from the preferred register name
941        # set.
942        alternative_register_index = None
943
944        self.assertIsNotNone(reg_infos)
945        for reg_info in reg_infos:
946            if ("name" in reg_info) and (
947                    reg_info["name"] in PREFERRED_REGISTER_NAMES):
948                # We found a preferred register.  Use it.
949                return reg_info["lldb_register_index"]
950            if ("generic" in reg_info) and (reg_info["generic"] == "fp" or
951                    reg_info["generic"] == "arg1"):
952                # A frame pointer or first arg register will do as a
953                # register to modify temporarily.
954                alternative_register_index = reg_info["lldb_register_index"]
955
956        # We didn't find a preferred register.  Return whatever alternative register
957        # we found, if any.
958        return alternative_register_index
959
960    def extract_registers_from_stop_notification(self, stop_key_vals_text):
961        self.assertIsNotNone(stop_key_vals_text)
962        kv_dict = self.parse_key_val_dict(stop_key_vals_text)
963
964        registers = {}
965        for (key, val) in list(kv_dict.items()):
966            if re.match(r"^[0-9a-fA-F]+$", key):
967                registers[int(key, 16)] = val
968        return registers
969
970    def gather_register_infos(self):
971        self.reset_test_sequence()
972        self.add_register_info_collection_packets()
973
974        context = self.expect_gdbremote_sequence()
975        self.assertIsNotNone(context)
976
977        reg_infos = self.parse_register_info_packets(context)
978        self.assertIsNotNone(reg_infos)
979        self.add_lldb_register_index(reg_infos)
980
981        return reg_infos
982
983    def find_generic_register_with_name(self, reg_infos, generic_name):
984        self.assertIsNotNone(reg_infos)
985        for reg_info in reg_infos:
986            if ("generic" in reg_info) and (
987                    reg_info["generic"] == generic_name):
988                return reg_info
989        return None
990
991    def find_register_with_name_and_dwarf_regnum(self, reg_infos, name, dwarf_num):
992        self.assertIsNotNone(reg_infos)
993        for reg_info in reg_infos:
994            if (reg_info["name"] == name) and (reg_info["dwarf"] == dwarf_num):
995                return reg_info
996        return None
997
998    def decode_gdbremote_binary(self, encoded_bytes):
999        decoded_bytes = ""
1000        i = 0
1001        while i < len(encoded_bytes):
1002            if encoded_bytes[i] == "}":
1003                # Handle escaped char.
1004                self.assertTrue(i + 1 < len(encoded_bytes))
1005                decoded_bytes += chr(ord(encoded_bytes[i + 1]) ^ 0x20)
1006                i += 2
1007            elif encoded_bytes[i] == "*":
1008                # Handle run length encoding.
1009                self.assertTrue(len(decoded_bytes) > 0)
1010                self.assertTrue(i + 1 < len(encoded_bytes))
1011                repeat_count = ord(encoded_bytes[i + 1]) - 29
1012                decoded_bytes += decoded_bytes[-1] * repeat_count
1013                i += 2
1014            else:
1015                decoded_bytes += encoded_bytes[i]
1016                i += 1
1017        return decoded_bytes
1018
1019    def build_auxv_dict(self, endian, word_size, auxv_data):
1020        self.assertIsNotNone(endian)
1021        self.assertIsNotNone(word_size)
1022        self.assertIsNotNone(auxv_data)
1023
1024        auxv_dict = {}
1025
1026        # PowerPC64le's auxvec has a special key that must be ignored.
1027        # This special key may be used multiple times, resulting in
1028        # multiple key/value pairs with the same key, which would otherwise
1029        # break this test check for repeated keys.
1030        #
1031        # AT_IGNOREPPC = 22
1032        ignored_keys_for_arch = { 'powerpc64le' : [22] }
1033        arch = self.getArchitecture()
1034        ignore_keys = None
1035        if arch in ignored_keys_for_arch:
1036            ignore_keys = ignored_keys_for_arch[arch]
1037
1038        while len(auxv_data) > 0:
1039            # Chop off key.
1040            raw_key = auxv_data[:word_size]
1041            auxv_data = auxv_data[word_size:]
1042
1043            # Chop of value.
1044            raw_value = auxv_data[:word_size]
1045            auxv_data = auxv_data[word_size:]
1046
1047            # Convert raw text from target endian.
1048            key = unpack_endian_binary_string(endian, raw_key)
1049            value = unpack_endian_binary_string(endian, raw_value)
1050
1051            if ignore_keys and key in ignore_keys:
1052                continue
1053
1054            # Handle ending entry.
1055            if key == 0:
1056                self.assertEqual(value, 0)
1057                return auxv_dict
1058
1059            # The key should not already be present.
1060            self.assertFalse(key in auxv_dict)
1061            auxv_dict[key] = value
1062
1063        self.fail(
1064            "should not reach here - implies required double zero entry not found")
1065        return auxv_dict
1066
1067    def read_binary_data_in_chunks(self, command_prefix, chunk_length):
1068        """Collect command_prefix{offset:x},{chunk_length:x} until a single 'l' or 'l' with data is returned."""
1069        offset = 0
1070        done = False
1071        decoded_data = ""
1072
1073        while not done:
1074            # Grab the next iteration of data.
1075            self.reset_test_sequence()
1076            self.test_sequence.add_log_lines(
1077                [
1078                    "read packet: ${}{:x},{:x}:#00".format(
1079                        command_prefix,
1080                        offset,
1081                        chunk_length),
1082                    {
1083                        "direction": "send",
1084                        "regex": re.compile(
1085                            r"^\$([^E])(.*)#[0-9a-fA-F]{2}$",
1086                            re.MULTILINE | re.DOTALL),
1087                        "capture": {
1088                            1: "response_type",
1089                            2: "content_raw"}}],
1090                True)
1091
1092            context = self.expect_gdbremote_sequence()
1093            self.assertIsNotNone(context)
1094
1095            response_type = context.get("response_type")
1096            self.assertIsNotNone(response_type)
1097            self.assertTrue(response_type in ["l", "m"])
1098
1099            # Move offset along.
1100            offset += chunk_length
1101
1102            # Figure out if we're done.  We're done if the response type is l.
1103            done = response_type == "l"
1104
1105            # Decode binary data.
1106            content_raw = context.get("content_raw")
1107            if content_raw and len(content_raw) > 0:
1108                self.assertIsNotNone(content_raw)
1109                decoded_data += self.decode_gdbremote_binary(content_raw)
1110        return decoded_data
1111
1112    def add_interrupt_packets(self):
1113        self.test_sequence.add_log_lines([
1114            # Send the intterupt.
1115            "read packet: {}".format(chr(3)),
1116            # And wait for the stop notification.
1117            {"direction": "send",
1118             "regex": r"^\$T([0-9a-fA-F]{2})(.*)#[0-9a-fA-F]{2}$",
1119             "capture": {1: "stop_signo",
1120                         2: "stop_key_val_text"}},
1121        ], True)
1122
1123    def parse_interrupt_packets(self, context):
1124        self.assertIsNotNone(context.get("stop_signo"))
1125        self.assertIsNotNone(context.get("stop_key_val_text"))
1126        return (int(context["stop_signo"], 16), self.parse_key_val_dict(
1127            context["stop_key_val_text"]))
1128
1129    def add_QSaveRegisterState_packets(self, thread_id):
1130        if thread_id:
1131            # Use the thread suffix form.
1132            request = "read packet: $QSaveRegisterState;thread:{:x}#00".format(
1133                thread_id)
1134        else:
1135            request = "read packet: $QSaveRegisterState#00"
1136
1137        self.test_sequence.add_log_lines([request,
1138                                          {"direction": "send",
1139                                           "regex": r"^\$(E?.*)#[0-9a-fA-F]{2}$",
1140                                           "capture": {1: "save_response"}},
1141                                          ],
1142                                         True)
1143
1144    def parse_QSaveRegisterState_response(self, context):
1145        self.assertIsNotNone(context)
1146
1147        save_response = context.get("save_response")
1148        self.assertIsNotNone(save_response)
1149
1150        if len(save_response) < 1 or save_response[0] == "E":
1151            # error received
1152            return (False, None)
1153        else:
1154            return (True, int(save_response))
1155
1156    def add_QRestoreRegisterState_packets(self, save_id, thread_id=None):
1157        if thread_id:
1158            # Use the thread suffix form.
1159            request = "read packet: $QRestoreRegisterState:{};thread:{:x}#00".format(
1160                save_id, thread_id)
1161        else:
1162            request = "read packet: $QRestoreRegisterState:{}#00".format(
1163                save_id)
1164
1165        self.test_sequence.add_log_lines([
1166            request,
1167            "send packet: $OK#00"
1168        ], True)
1169
    def flip_all_bits_in_each_register_value(
            self, reg_infos, endian, thread_id=None):
        """Read, bit-flip (XOR with all ones), write back, and verify each register.

        reg_infos: register infos; each must carry "lldb_register_index"
            and "bitsize".
        endian: target endianness used to pack/unpack register hex values.
        thread_id: when given, p/P packets carry a ;thread:<id> suffix.

        Returns (successful_writes, failed_writes). A write that reports OK
        but reads back a different value is moved from the success count to
        the failure count, since some registers constrain or permute the
        bits written to them.
        """
        self.assertIsNotNone(reg_infos)

        successful_writes = 0
        failed_writes = 0

        for reg_info in reg_infos:
            # Use the lldb register index added to the reg info.  We're not necessarily
            # working off a full set of register infos, so an inferred register
            # index could be wrong.
            reg_index = reg_info["lldb_register_index"]
            self.assertIsNotNone(reg_index)

            reg_byte_size = int(reg_info["bitsize"]) // 8
            self.assertTrue(reg_byte_size > 0)

            # Handle thread suffix.
            if thread_id:
                p_request = "read packet: $p{:x};thread:{:x}#00".format(
                    reg_index, thread_id)
            else:
                p_request = "read packet: $p{:x}#00".format(reg_index)

            # Read the existing value.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([
                p_request,
                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Verify the response length.
            p_response = context.get("p_response")
            self.assertIsNotNone(p_response)
            initial_reg_value = unpack_register_hex_unsigned(
                endian, p_response)

            # Flip the value by xoring with all 1s
            all_one_bits_raw = "ff" * (int(reg_info["bitsize"]) // 8)
            flipped_bits_int = initial_reg_value ^ int(all_one_bits_raw, 16)
            # print("reg (index={}, name={}): val={}, flipped bits (int={}, hex={:x})".format(reg_index, reg_info["name"], initial_reg_value, flipped_bits_int, flipped_bits_int))

            # Handle thread suffix for P.
            if thread_id:
                P_request = "read packet: $P{:x}={};thread:{:x}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size), thread_id)
            else:
                P_request = "read packet: $P{:x}={}#00".format(
                    reg_index, pack_register_hex(
                        endian, flipped_bits_int, byte_size=reg_byte_size))

            # Write the flipped value to the register.
            self.reset_test_sequence()
            self.test_sequence.add_log_lines([P_request,
                                              {"direction": "send",
                                               "regex": r"^\$(OK|E[0-9a-fA-F]+)#[0-9a-fA-F]{2}",
                                               "capture": {1: "P_response"}},
                                              ],
                                             True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)

            # Determine if the write succeeded.  There are a handful of registers that can fail, or partially fail
            # (e.g. flags, segment selectors, etc.) due to register value restrictions.  Don't worry about them
            # all flipping perfectly.
            P_response = context.get("P_response")
            self.assertIsNotNone(P_response)
            if P_response == "OK":
                successful_writes += 1
            else:
                failed_writes += 1
                # print("reg (index={}, name={}) write FAILED (error: {})".format(reg_index, reg_info["name"], P_response))

            # Read back the register value, ensure it matches the flipped
            # value.
            if P_response == "OK":
                self.reset_test_sequence()
                self.test_sequence.add_log_lines([
                    p_request,
                    {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
                ], True)
                context = self.expect_gdbremote_sequence()
                self.assertIsNotNone(context)

                verify_p_response_raw = context.get("p_response")
                self.assertIsNotNone(verify_p_response_raw)
                verify_bits = unpack_register_hex_unsigned(
                    endian, verify_p_response_raw)

                if verify_bits != flipped_bits_int:
                    # Some registers, like mxcsrmask and others, will permute what's written.  Adjust succeed/fail counts.
                    # print("reg (index={}, name={}): read verify FAILED: wrote {:x}, verify read back {:x}".format(reg_index, reg_info["name"], flipped_bits_int, verify_bits))
                    successful_writes -= 1
                    failed_writes += 1

        return (successful_writes, failed_writes)
1269
1270    def is_bit_flippable_register(self, reg_info):
1271        if not reg_info:
1272            return False
1273        if not "set" in reg_info:
1274            return False
1275        if reg_info["set"] != "General Purpose Registers":
1276            return False
1277        if ("container-regs" in reg_info) and (
1278                len(reg_info["container-regs"]) > 0):
1279            # Don't try to bit flip registers contained in another register.
1280            return False
1281        if re.match("^.s$", reg_info["name"]):
1282            # This is a 2-letter register name that ends in "s", like a segment register.
1283            # Don't try to bit flip these.
1284            return False
1285        if re.match("^(c|)psr$", reg_info["name"]):
1286            # This is an ARM program status register; don't flip it.
1287            return False
1288        # Okay, this looks fine-enough.
1289        return True
1290
1291    def read_register_values(self, reg_infos, endian, thread_id=None):
1292        self.assertIsNotNone(reg_infos)
1293        values = {}
1294
1295        for reg_info in reg_infos:
1296            # We append a register index when load reg infos so we can work
1297            # with subsets.
1298            reg_index = reg_info.get("lldb_register_index")
1299            self.assertIsNotNone(reg_index)
1300
1301            # Handle thread suffix.
1302            if thread_id:
1303                p_request = "read packet: $p{:x};thread:{:x}#00".format(
1304                    reg_index, thread_id)
1305            else:
1306                p_request = "read packet: $p{:x}#00".format(reg_index)
1307
1308            # Read it with p.
1309            self.reset_test_sequence()
1310            self.test_sequence.add_log_lines([
1311                p_request,
1312                {"direction": "send", "regex": r"^\$([0-9a-fA-F]+)#", "capture": {1: "p_response"}},
1313            ], True)
1314            context = self.expect_gdbremote_sequence()
1315            self.assertIsNotNone(context)
1316
1317            # Convert value from target endian to integral.
1318            p_response = context.get("p_response")
1319            self.assertIsNotNone(p_response)
1320            self.assertTrue(len(p_response) > 0)
1321            self.assertFalse(p_response[0] == "E")
1322
1323            values[reg_index] = unpack_register_hex_unsigned(
1324                endian, p_response)
1325
1326        return values
1327
1328    def add_vCont_query_packets(self):
1329        self.test_sequence.add_log_lines(["read packet: $vCont?#49",
1330                                          {"direction": "send",
1331                                           "regex": r"^\$(vCont)?(.*)#[0-9a-fA-F]{2}$",
1332                                           "capture": {2: "vCont_query_response"}},
1333                                          ],
1334                                         True)
1335
1336    def parse_vCont_query_response(self, context):
1337        self.assertIsNotNone(context)
1338        vCont_query_response = context.get("vCont_query_response")
1339
1340        # Handle case of no vCont support at all - in which case the capture
1341        # group will be none or zero length.
1342        if not vCont_query_response or len(vCont_query_response) == 0:
1343            return {}
1344
1345        return {key: 1 for key in vCont_query_response.split(
1346            ";") if key and len(key) > 0}
1347
    def count_single_steps_until_true(
            self,
            thread_id,
            predicate,
            args,
            max_step_count=100,
            use_Hc_packet=True,
            step_instruction="s"):
        """Single-step thread_id until predicate(args) is true or max_step_count is hit.

        Used by single step tests that appear in a few different contexts.

        thread_id: the thread to step; substituted for "{thread}" in
            step_instruction when that placeholder is present.
        predicate: callable invoked with args after every step.
        args: opaque argument passed through to predicate.
        max_step_count: cap on the number of steps attempted.
        use_Hc_packet: when True, select the continue thread with an Hc
            packet before each step; otherwise step_instruction itself must
            target the thread (e.g. a vCont form with "{thread}").
        step_instruction: packet payload sent for each single step.

        Returns (predicate_became_true, steps_taken). Each step is expected
        to stop with SIGTRAP.
        """
        single_step_count = 0

        while single_step_count < max_step_count:
            self.assertIsNotNone(thread_id)

            # Build the packet for the single step instruction.  We replace
            # {thread}, if present, with the thread_id.
            step_packet = "read packet: ${}#00".format(
                re.sub(r"{thread}", "{:x}".format(thread_id), step_instruction))
            # print("\nstep_packet created: {}\n".format(step_packet))

            # Single step.
            self.reset_test_sequence()
            if use_Hc_packet:
                self.test_sequence.add_log_lines(
                    [  # Set the continue thread.
                        "read packet: $Hc{0:x}#00".format(thread_id),
                        "send packet: $OK#00",
                    ], True)
            self.test_sequence.add_log_lines([
                # Single step.
                step_packet,
                # "read packet: $vCont;s:{0:x}#00".format(thread_id),
                # Expect a breakpoint stop report.
                {"direction": "send",
                 "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);",
                 "capture": {1: "stop_signo",
                             2: "stop_thread_id"}},
            ], True)
            context = self.expect_gdbremote_sequence()
            self.assertIsNotNone(context)
            self.assertIsNotNone(context.get("stop_signo"))
            self.assertEqual(int(context.get("stop_signo"), 16),
                             lldbutil.get_signal_number('SIGTRAP'))

            single_step_count += 1

            # See if the predicate is true.  If so, we're done.
            if predicate(args):
                return (True, single_step_count)

        # The predicate didn't return true within the runaway step count.
        return (False, single_step_count)
1400
1401    def g_c1_c2_contents_are(self, args):
1402        """Used by single step test that appears in a few different contexts."""
1403        g_c1_address = args["g_c1_address"]
1404        g_c2_address = args["g_c2_address"]
1405        expected_g_c1 = args["expected_g_c1"]
1406        expected_g_c2 = args["expected_g_c2"]
1407
1408        # Read g_c1 and g_c2 contents.
1409        self.reset_test_sequence()
1410        self.test_sequence.add_log_lines(
1411            ["read packet: $m{0:x},{1:x}#00".format(g_c1_address, 1),
1412             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c1_contents"}},
1413             "read packet: $m{0:x},{1:x}#00".format(g_c2_address, 1),
1414             {"direction": "send", "regex": r"^\$(.+)#[0-9a-fA-F]{2}$", "capture": {1: "g_c2_contents"}}],
1415            True)
1416
1417        # Run the packet stream.
1418        context = self.expect_gdbremote_sequence()
1419        self.assertIsNotNone(context)
1420
1421        # Check if what we read from inferior memory is what we are expecting.
1422        self.assertIsNotNone(context.get("g_c1_contents"))
1423        self.assertIsNotNone(context.get("g_c2_contents"))
1424
1425        return (seven.unhexlify(context.get("g_c1_contents")) == expected_g_c1) and (
1426            seven.unhexlify(context.get("g_c2_contents")) == expected_g_c2)
1427
1428    def single_step_only_steps_one_instruction(
1429            self, use_Hc_packet=True, step_instruction="s"):
1430        """Used by single step test that appears in a few different contexts."""
1431        # Start up the inferior.
1432        procs = self.prep_debug_monitor_and_inferior(
1433            inferior_args=[
1434                "get-code-address-hex:swap_chars",
1435                "get-data-address-hex:g_c1",
1436                "get-data-address-hex:g_c2",
1437                "sleep:1",
1438                "call-function:swap_chars",
1439                "sleep:5"])
1440
1441        # Run the process
1442        self.test_sequence.add_log_lines(
1443            [  # Start running after initial stop.
1444                "read packet: $c#63",
1445                # Match output line that prints the memory address of the function call entry point.
1446                # Note we require launch-only testing so we can get inferior otuput.
1447                {"type": "output_match", "regex": r"^code address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\ndata address: 0x([0-9a-fA-F]+)\r\n$",
1448                 "capture": {1: "function_address", 2: "g_c1_address", 3: "g_c2_address"}},
1449                # Now stop the inferior.
1450                "read packet: {}".format(chr(3)),
1451                # And wait for the stop notification.
1452                {"direction": "send", "regex": r"^\$T([0-9a-fA-F]{2})thread:([0-9a-fA-F]+);", "capture": {1: "stop_signo", 2: "stop_thread_id"}}],
1453            True)
1454
1455        # Run the packet stream.
1456        context = self.expect_gdbremote_sequence()
1457        self.assertIsNotNone(context)
1458
1459        # Grab the main thread id.
1460        self.assertIsNotNone(context.get("stop_thread_id"))
1461        main_thread_id = int(context.get("stop_thread_id"), 16)
1462
1463        # Grab the function address.
1464        self.assertIsNotNone(context.get("function_address"))
1465        function_address = int(context.get("function_address"), 16)
1466
1467        # Grab the data addresses.
1468        self.assertIsNotNone(context.get("g_c1_address"))
1469        g_c1_address = int(context.get("g_c1_address"), 16)
1470
1471        self.assertIsNotNone(context.get("g_c2_address"))
1472        g_c2_address = int(context.get("g_c2_address"), 16)
1473
1474        # Set a breakpoint at the given address.
1475        if self.getArchitecture().startswith("arm"):
1476            # TODO: Handle case when setting breakpoint in thumb code
1477            BREAKPOINT_KIND = 4
1478        else:
1479            BREAKPOINT_KIND = 1
1480        self.reset_test_sequence()
1481        self.add_set_breakpoint_packets(
1482            function_address,
1483            do_continue=True,
1484            breakpoint_kind=BREAKPOINT_KIND)
1485        context = self.expect_gdbremote_sequence()
1486        self.assertIsNotNone(context)
1487
1488        # Remove the breakpoint.
1489        self.reset_test_sequence()
1490        self.add_remove_breakpoint_packets(
1491            function_address, breakpoint_kind=BREAKPOINT_KIND)
1492        context = self.expect_gdbremote_sequence()
1493        self.assertIsNotNone(context)
1494
1495        # Verify g_c1 and g_c2 match expected initial state.
1496        args = {}
1497        args["g_c1_address"] = g_c1_address
1498        args["g_c2_address"] = g_c2_address
1499        args["expected_g_c1"] = "0"
1500        args["expected_g_c2"] = "1"
1501
1502        self.assertTrue(self.g_c1_c2_contents_are(args))
1503
1504        # Verify we take only a small number of steps to hit the first state.
1505        # Might need to work through function entry prologue code.
1506        args["expected_g_c1"] = "1"
1507        args["expected_g_c2"] = "1"
1508        (state_reached,
1509         step_count) = self.count_single_steps_until_true(main_thread_id,
1510                                                          self.g_c1_c2_contents_are,
1511                                                          args,
1512                                                          max_step_count=25,
1513                                                          use_Hc_packet=use_Hc_packet,
1514                                                          step_instruction=step_instruction)
1515        self.assertTrue(state_reached)
1516
1517        # Verify we hit the next state.
1518        args["expected_g_c1"] = "1"
1519        args["expected_g_c2"] = "0"
1520        (state_reached,
1521         step_count) = self.count_single_steps_until_true(main_thread_id,
1522                                                          self.g_c1_c2_contents_are,
1523                                                          args,
1524                                                          max_step_count=5,
1525                                                          use_Hc_packet=use_Hc_packet,
1526                                                          step_instruction=step_instruction)
1527        self.assertTrue(state_reached)
1528        expected_step_count = 1
1529        arch = self.getArchitecture()
1530
1531        # MIPS required "3" (ADDIU, SB, LD) machine instructions for updation
1532        # of variable value
1533        if re.match("mips", arch):
1534            expected_step_count = 3
1535        # S390X requires "2" (LARL, MVI) machine instructions for updation of
1536        # variable value
1537        if re.match("s390x", arch):
1538            expected_step_count = 2
1539        # ARM64 requires "4" instructions: 2 to compute the address (adrp, add),
1540        # one to materialize the constant (mov) and the store
1541        if re.match("arm64", arch):
1542            expected_step_count = 4
1543
1544        self.assertEqual(step_count, expected_step_count)
1545
1546        # ARM64: Once addresses and constants are materialized, only one
1547        # instruction is needed.
1548        if re.match("arm64", arch):
1549            expected_step_count = 1
1550
1551        # Verify we hit the next state.
1552        args["expected_g_c1"] = "0"
1553        args["expected_g_c2"] = "0"
1554        (state_reached,
1555         step_count) = self.count_single_steps_until_true(main_thread_id,
1556                                                          self.g_c1_c2_contents_are,
1557                                                          args,
1558                                                          max_step_count=5,
1559                                                          use_Hc_packet=use_Hc_packet,
1560                                                          step_instruction=step_instruction)
1561        self.assertTrue(state_reached)
1562        self.assertEqual(step_count, expected_step_count)
1563
1564        # Verify we hit the next state.
1565        args["expected_g_c1"] = "0"
1566        args["expected_g_c2"] = "1"
1567        (state_reached,
1568         step_count) = self.count_single_steps_until_true(main_thread_id,
1569                                                          self.g_c1_c2_contents_are,
1570                                                          args,
1571                                                          max_step_count=5,
1572                                                          use_Hc_packet=use_Hc_packet,
1573                                                          step_instruction=step_instruction)
1574        self.assertTrue(state_reached)
1575        self.assertEqual(step_count, expected_step_count)
1576
1577    def maybe_strict_output_regex(self, regex):
1578        return '.*' + regex + \
1579            '.*' if lldbplatformutil.hasChattyStderr(self) else '^' + regex + '$'
1580
1581    def install_and_create_launch_args(self):
1582        exe_path = self.getBuildArtifact("a.out")
1583        if not lldb.remote_platform:
1584            return [exe_path]
1585        remote_path = lldbutil.append_to_process_working_directory(self,
1586            os.path.basename(exe_path))
1587        remote_file_spec = lldb.SBFileSpec(remote_path, False)
1588        err = lldb.remote_platform.Install(lldb.SBFileSpec(exe_path, True),
1589                                           remote_file_spec)
1590        if err.Fail():
1591            raise Exception("remote_platform.Install('%s', '%s') failed: %s" %
1592                            (exe_path, remote_path, err))
1593        return [remote_path]
1594