1
2from lldbsuite.test.lldbtest import *
3import os
4import vscode
5import time
6
7
8class VSCodeTestCaseBase(TestBase):
9
10    NO_DEBUG_INFO_TESTCASE = True
11
12    def create_debug_adaptor(self):
13        '''Create the Visual Studio Code debug adaptor'''
14        self.assertTrue(os.path.exists(self.lldbVSCodeExec),
15                        'lldb-vscode must exist')
16        log_file_path = self.getBuildArtifact('vscode.txt')
17        self.vscode = vscode.DebugAdaptor(
18            executable=self.lldbVSCodeExec, init_commands=self.setUpCommands(),
19            log_file=log_file_path)
20
21    def build_and_create_debug_adaptor(self):
22        self.build()
23        self.create_debug_adaptor()
24
25    def set_source_breakpoints(self, source_path, lines, condition=None,
26                               hitCondition=None):
27        '''Sets source breakpoints and returns an array of strings containing
28           the breakpoint IDs ("1", "2") for each breakpoint that was set.
29        '''
30        response = self.vscode.request_setBreakpoints(
31            source_path, lines, condition=condition, hitCondition=hitCondition)
32        if response is None:
33            return []
34        breakpoints = response['body']['breakpoints']
35        breakpoint_ids = []
36        for breakpoint in breakpoints:
37            breakpoint_ids.append('%i' % (breakpoint['id']))
38        return breakpoint_ids
39
40    def set_function_breakpoints(self, functions, condition=None,
41                                 hitCondition=None):
42        '''Sets breakpoints by function name given an array of function names
43           and returns an array of strings containing the breakpoint IDs
44           ("1", "2") for each breakpoint that was set.
45        '''
46        response = self.vscode.request_setFunctionBreakpoints(
47            functions, condition=condition, hitCondition=hitCondition)
48        if response is None:
49            return []
50        breakpoints = response['body']['breakpoints']
51        breakpoint_ids = []
52        for breakpoint in breakpoints:
53            breakpoint_ids.append('%i' % (breakpoint['id']))
54        return breakpoint_ids
55
56    def waitUntil(self, condition_callback):
57        for _ in range(20):
58            if condition_callback():
59                return True
60            time.sleep(0.5)
61        return False
62
63    def verify_breakpoint_hit(self, breakpoint_ids):
64        '''Wait for the process we are debugging to stop, and verify we hit
65           any breakpoint location in the "breakpoint_ids" array.
66           "breakpoint_ids" should be a list of breakpoint ID strings
67           (["1", "2"]). The return value from self.set_source_breakpoints()
68           or self.set_function_breakpoints() can be passed to this function'''
69        stopped_events = self.vscode.wait_for_stopped()
70        for stopped_event in stopped_events:
71            if 'body' in stopped_event:
72                body = stopped_event['body']
73                if 'reason' not in body:
74                    continue
75                if body['reason'] != 'breakpoint':
76                    continue
77                if 'description' not in body:
78                    continue
79                # Descriptions for breakpoints will be in the form
80                # "breakpoint 1.1", so look for any description that matches
81                # ("breakpoint 1.") in the description field as verification
82                # that one of the breakpoint locations was hit. VSCode doesn't
83                # allow breakpoints to have multiple locations, but LLDB does.
84                # So when looking at the description we just want to make sure
85                # the right breakpoint matches and not worry about the actual
86                # location.
87                description = body['description']
88                print("description: %s" % (description))
89                for breakpoint_id in breakpoint_ids:
90                    match_desc = 'breakpoint %s.' % (breakpoint_id)
91                    if match_desc in description:
92                        return
93        self.assertTrue(False, "breakpoint not hit")
94
95    def verify_exception_breakpoint_hit(self, filter_label):
96        '''Wait for the process we are debugging to stop, and verify the stop
97           reason is 'exception' and that the description matches
98           'filter_label'
99        '''
100        stopped_events = self.vscode.wait_for_stopped()
101        for stopped_event in stopped_events:
102            if 'body' in stopped_event:
103                body = stopped_event['body']
104                if 'reason' not in body:
105                    continue
106                if body['reason'] != 'exception':
107                    continue
108                if 'description' not in body:
109                    continue
110                description = body['description']
111                if filter_label == description:
112                    return True
113        return False
114
115    def verify_commands(self, flavor, output, commands):
116        self.assertTrue(output and len(output) > 0, "expect console output")
117        lines = output.splitlines()
118        prefix = '(lldb) '
119        for cmd in commands:
120            found = False
121            for line in lines:
122                if line.startswith(prefix) and cmd in line:
123                    found = True
124                    break
125            self.assertTrue(found,
126                            "verify '%s' found in console output for '%s'" % (
127                                cmd, flavor))
128
129    def get_dict_value(self, d, key_path):
130        '''Verify each key in the key_path array is in contained in each
131           dictionary within "d". Assert if any key isn't in the
132           corresponding dictionary. This is handy for grabbing values from VS
133           Code response dictionary like getting
134           response['body']['stackFrames']
135        '''
136        value = d
137        for key in key_path:
138            if key in value:
139                value = value[key]
140            else:
141                self.assertTrue(key in value,
142                                'key "%s" from key_path "%s" not in "%s"' % (
143                                    key, key_path, d))
144        return value
145
146    def get_stackFrames_and_totalFramesCount(self, threadId=None, startFrame=None,
147                        levels=None, dump=False):
148        response = self.vscode.request_stackTrace(threadId=threadId,
149                                                  startFrame=startFrame,
150                                                  levels=levels,
151                                                  dump=dump)
152        if response:
153            stackFrames = self.get_dict_value(response, ['body', 'stackFrames'])
154            totalFrames = self.get_dict_value(response, ['body', 'totalFrames'])
155            self.assertTrue(totalFrames > 0,
156                    'verify totalFrames count is provided by extension that supports '
157                    'async frames loading')
158            return (stackFrames, totalFrames)
159        return (None, 0)
160
161    def get_stackFrames(self, threadId=None, startFrame=None, levels=None,
162                        dump=False):
163        (stackFrames, totalFrames) = self.get_stackFrames_and_totalFramesCount(
164                                                threadId=threadId,
165                                                startFrame=startFrame,
166                                                levels=levels,
167                                                dump=dump)
168        return stackFrames
169
170    def get_source_and_line(self, threadId=None, frameIndex=0):
171        stackFrames = self.get_stackFrames(threadId=threadId,
172                                           startFrame=frameIndex,
173                                           levels=1)
174        if stackFrames is not None:
175            stackFrame = stackFrames[0]
176            ['source', 'path']
177            if 'source' in stackFrame:
178                source = stackFrame['source']
179                if 'path' in source:
180                    if 'line' in stackFrame:
181                        return (source['path'], stackFrame['line'])
182        return ('', 0)
183
184    def get_stdout(self, timeout=0.0):
185        return self.vscode.get_output('stdout', timeout=timeout)
186
187    def get_console(self, timeout=0.0):
188        return self.vscode.get_output('console', timeout=timeout)
189
190    def collect_console(self, duration):
191        return self.vscode.collect_output('console', duration=duration)
192
193    def get_local_as_int(self, name, threadId=None):
194        value = self.vscode.get_local_variable_value(name, threadId=threadId)
195        if value.startswith('0x'):
196            return int(value, 16)
197        elif value.startswith('0'):
198            return int(value, 8)
199        else:
200            return int(value)
201
202    def set_local(self, name, value, id=None):
203        '''Set a top level local variable only.'''
204        return self.vscode.request_setVariable(1, name, str(value), id=id)
205
206    def set_global(self, name, value, id=None):
207        '''Set a top level global variable only.'''
208        return self.vscode.request_setVariable(2, name, str(value), id=id)
209
210    def stepIn(self, threadId=None, waitForStop=True):
211        self.vscode.request_stepIn(threadId=threadId)
212        if waitForStop:
213            return self.vscode.wait_for_stopped()
214        return None
215
216    def stepOver(self, threadId=None, waitForStop=True):
217        self.vscode.request_next(threadId=threadId)
218        if waitForStop:
219            return self.vscode.wait_for_stopped()
220        return None
221
222    def stepOut(self, threadId=None, waitForStop=True):
223        self.vscode.request_stepOut(threadId=threadId)
224        if waitForStop:
225            return self.vscode.wait_for_stopped()
226        return None
227
228    def continue_to_next_stop(self):
229        self.vscode.request_continue()
230        return self.vscode.wait_for_stopped()
231
232    def continue_to_breakpoints(self, breakpoint_ids):
233        self.vscode.request_continue()
234        self.verify_breakpoint_hit(breakpoint_ids)
235
236    def continue_to_exception_breakpoint(self, filter_label):
237        self.vscode.request_continue()
238        self.assertTrue(self.verify_exception_breakpoint_hit(filter_label),
239                        'verify we got "%s"' % (filter_label))
240
241    def continue_to_exit(self, exitCode=0):
242        self.vscode.request_continue()
243        stopped_events = self.vscode.wait_for_stopped()
244        self.assertEquals(len(stopped_events), 1,
245                        "stopped_events = {}".format(stopped_events))
246        self.assertEquals(stopped_events[0]['event'], 'exited',
247                        'make sure program ran to completion')
248        self.assertEquals(stopped_events[0]['body']['exitCode'], exitCode,
249                        'exitCode == %i' % (exitCode))
250
251    def attach(self, program=None, pid=None, waitFor=None, trace=None,
252               initCommands=None, preRunCommands=None, stopCommands=None,
253               exitCommands=None, attachCommands=None, coreFile=None,
254               disconnectAutomatically=True, terminateCommands=None):
255        '''Build the default Makefile target, create the VSCode debug adaptor,
256           and attach to the process.
257        '''
258        # Make sure we disconnect and terminate the VSCode debug adaptor even
259        # if we throw an exception during the test case.
260        def cleanup():
261            if disconnectAutomatically:
262                self.vscode.request_disconnect(terminateDebuggee=True)
263            self.vscode.terminate()
264
265        # Execute the cleanup function during test case tear down.
266        self.addTearDownHook(cleanup)
267        # Initialize and launch the program
268        self.vscode.request_initialize()
269        response = self.vscode.request_attach(
270            program=program, pid=pid, waitFor=waitFor, trace=trace,
271            initCommands=initCommands, preRunCommands=preRunCommands,
272            stopCommands=stopCommands, exitCommands=exitCommands,
273            attachCommands=attachCommands, terminateCommands=terminateCommands,
274            coreFile=coreFile)
275        if not (response and response['success']):
276            self.assertTrue(response['success'],
277                            'attach failed (%s)' % (response['message']))
278
279    def launch(self, program=None, args=None, cwd=None, env=None,
280               stopOnEntry=False, disableASLR=True,
281               disableSTDIO=False, shellExpandArguments=False,
282               trace=False, initCommands=None, preRunCommands=None,
283               stopCommands=None, exitCommands=None, terminateCommands=None,
284               sourcePath=None, debuggerRoot=None, launchCommands=None,
285               sourceMap=None, disconnectAutomatically=True, runInTerminal=False):
286        '''Sending launch request to vscode
287        '''
288
289        # Make sure we disconnect and terminate the VSCode debug adapter,
290        # if we throw an exception during the test case
291        def cleanup():
292            if disconnectAutomatically:
293                self.vscode.request_disconnect(terminateDebuggee=True)
294            self.vscode.terminate()
295
296        # Execute the cleanup function during test case tear down.
297        self.addTearDownHook(cleanup)
298
299        # Initialize and launch the program
300        self.vscode.request_initialize()
301        response = self.vscode.request_launch(
302            program,
303            args=args,
304            cwd=cwd,
305            env=env,
306            stopOnEntry=stopOnEntry,
307            disableASLR=disableASLR,
308            disableSTDIO=disableSTDIO,
309            shellExpandArguments=shellExpandArguments,
310            trace=trace,
311            initCommands=initCommands,
312            preRunCommands=preRunCommands,
313            stopCommands=stopCommands,
314            exitCommands=exitCommands,
315            terminateCommands=terminateCommands,
316            sourcePath=sourcePath,
317            debuggerRoot=debuggerRoot,
318            launchCommands=launchCommands,
319            sourceMap=sourceMap,
320            runInTerminal=runInTerminal)
321        if not (response and response['success']):
322            self.assertTrue(response['success'],
323                            'launch failed (%s)' % (response['message']))
324        # We need to trigger a request_configurationDone after we've successfully
325        # attached a runInTerminal process to finish initialization.
326        if runInTerminal:
327            self.vscode.request_configurationDone()
328
329
330    def build_and_launch(self, program, args=None, cwd=None, env=None,
331                         stopOnEntry=False, disableASLR=True,
332                         disableSTDIO=False, shellExpandArguments=False,
333                         trace=False, initCommands=None, preRunCommands=None,
334                         stopCommands=None, exitCommands=None,
335                         terminateCommands=None, sourcePath=None,
336                         debuggerRoot=None, runInTerminal=False):
337        '''Build the default Makefile target, create the VSCode debug adaptor,
338           and launch the process.
339        '''
340        self.build_and_create_debug_adaptor()
341        self.assertTrue(os.path.exists(program), 'executable must exist')
342
343        self.launch(program, args, cwd, env, stopOnEntry, disableASLR,
344                    disableSTDIO, shellExpandArguments, trace,
345                    initCommands, preRunCommands, stopCommands, exitCommands,
346                    terminateCommands, sourcePath, debuggerRoot, runInTerminal=runInTerminal)
347