1"""
2Makes it possible to do the compiled analysis in a subprocess. This has two
3goals:
4
51. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can
6   be ignored and dealt with.
72. Make it possible to handle different Python versions as well as virtualenvs.
8"""

import os
import sys
import queue
import subprocess
import traceback
import weakref
from functools import partial
from threading import Thread

from jedi._compatibility import pickle_dump, pickle_load
from jedi import debug
from jedi.cache import memoize_method
from jedi.inference.compiled.subprocess import functions
from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \
    SignatureParam
from jedi.api.exceptions import InternalError


_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
PICKLE_PROTOCOL = 4


def _GeneralizedPopen(*args, **kwargs):
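    """
    A thin ``subprocess.Popen`` wrapper: hides the console window on
    Windows and closes inherited file descriptors on POSIX.
    """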
    if os.name == 'nt':
        try:
            # Was introduced in Python 3.7.
            CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW
        except AttributeError:
            CREATE_NO_WINDOW = 0x08000000
        kwargs['creationflags'] = CREATE_NO_WINDOW
    # The child process doesn't need file descriptors except 0, 1, 2.
    # This is unix only.
    kwargs['close_fds'] = 'posix' in sys.builtin_module_names

    return subprocess.Popen(*args, **kwargs)


def _enqueue_output(out, queue_):
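    """
    Reads lines from ``out`` until EOF and puts them on ``queue_``. Runs on
    a daemon thread (see ``CompiledSubprocess._get_process``), so the
    subprocess's stderr can be drained without blocking.
    """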
    for line in iter(out.readline, b''):
        queue_.put(line)


def _add_stderr_to_debug(stderr_queue):
    while True:
        # Try to do some error reporting from the subprocess and print its
        # stderr contents.
        try:
            line = stderr_queue.get_nowait()
            line = line.decode('utf-8', 'replace')
            debug.warning('stderr output: %s' % line.rstrip('\n'))
        except queue.Empty:
            break


def _get_function(name):
    return getattr(functions, name)


def _cleanup_process(process, thread):
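    """
    Kills the subprocess, joins the stderr thread and closes all of its
    pipes. Registered as a ``weakref.finalize`` callback, so it also runs
    when the owning ``CompiledSubprocess`` is garbage collected.
    """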
    try:
        process.kill()
        process.wait()
    except OSError:
        # Raised if the process is already killed.
        pass
    thread.join()
    for stream in [process.stdin, process.stdout, process.stderr]:
        try:
            stream.close()
        except OSError:
            # Raised if the stream is broken.
            pass


class _InferenceStateProcess:
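    """
    Common base for the same-process and subprocess variants. Caches
    ``AccessHandle`` objects by the ``id()`` of the wrapped compiled object,
    so the same object is always represented by the same handle.
    """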
    def __init__(self, inference_state):
        self._inference_state_weakref = weakref.ref(inference_state)
        self._inference_state_id = id(inference_state)
        self._handles = {}

    def get_or_create_access_handle(self, obj):
        id_ = id(obj)
        try:
            return self.get_access_handle(id_)
        except KeyError:
            access = DirectObjectAccess(self._inference_state_weakref(), obj)
            handle = AccessHandle(self, access, id_)
            self.set_access_handle(handle)
            return handle

    def get_access_handle(self, id_):
        return self._handles[id_]

    def set_access_handle(self, handle):
        self._handles[handle.id] = handle


class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Basically just easy access to functions.py. It has the same API as
    InferenceStateSubprocess and does the same thing without using a
    subprocess. This is necessary for the Interpreter process.
    """
    def __getattr__(self, name):
        return partial(_get_function(name), self._inference_state_weakref())


class InferenceStateSubprocess(_InferenceStateProcess):
    def __init__(self, inference_state, compiled_subprocess):
        super().__init__(inference_state)
        self._used = False
        self._compiled_subprocess = compiled_subprocess

    def __getattr__(self, name):
        func = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True

            result = self._compiled_subprocess.run(
                self._inference_state_weakref(),
                func,
                args=args,
                kwargs=kwargs,
            )
            # IMO it should be possible to create a hook in pickle.load to
            # mess with the loaded objects. However, it's extremely
            # complicated to work around that, so just do it with this call.
            # ~ dave
            return self._convert_access_handles(result)

        return wrapper

    def _convert_access_handles(self, obj):
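        """
        Recursively walks tuples, lists, ``SignatureParam``s and
        ``AccessPath``s coming back from the subprocess and replaces freshly
        unpickled ``AccessHandle``s with the ones already registered for
        this inference state.
        """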
        if isinstance(obj, SignatureParam):
            return SignatureParam(*self._convert_access_handles(tuple(obj)))
        elif isinstance(obj, tuple):
            return tuple(self._convert_access_handles(o) for o in obj)
        elif isinstance(obj, list):
            return [self._convert_access_handles(o) for o in obj]
        elif isinstance(obj, AccessHandle):
            try:
                # Rewrite the access handle to one we already have.
                obj = self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
        elif isinstance(obj, AccessPath):
            return AccessPath(self._convert_access_handles(obj.accesses))
        return obj

    def __del__(self):
        if self._used and not self._compiled_subprocess.is_crashed:
            self._compiled_subprocess.delete_inference_state(self._inference_state_id)


class CompiledSubprocess:
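    """
    Manages a long-running Python subprocess (started lazily in
    ``_get_process``) that executes ``Listener`` from ``__main__.py``.
    Requests and replies are exchanged as pickled tuples over the child's
    stdin/stdout; its stderr is drained on a thread and forwarded to
    ``debug.warning``.
    """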
    is_crashed = False

    def __init__(self, executable, env_vars=None):
        self._executable = executable
        self._env_vars = env_vars
        self._inference_state_deletion_queue = queue.deque()
        self._cleanup_callable = lambda: None

    def __repr__(self):
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            os.path.dirname(os.path.dirname(parso_path)),
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(self,
                                                  _cleanup_process,
                                                  process,
                                                  t)
        return process

    def run(self, inference_state, function, args=(), kwargs={}):
        # Delete old inference_states.
        while True:
            try:
                inference_state_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            else:
                self._send(inference_state_id, None)

        assert callable(function)
        return self._send(id(inference_state), function, args, kwargs)

    def get_sys_path(self):
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs={}):
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)

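        # The request tuple mirrors Listener._run's signature; a ``None``
        # function tells the child to drop the given inference state.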
        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            is_exception, traceback, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                ))

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the exception's error message with the traceback. It's
            # way more informative.
            result.args = (traceback,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Currently we are not deleting inference_states instantly. They only
        get deleted once the subprocess is used again. It would probably be
        a better solution to move all of this into a thread. However, the
        memory usage of a single inference_state shouldn't be that high.
        """
        # With an argument - the inference_state gets deleted.
        self._inference_state_deletion_queue.append(inference_state_id)


class Listener:
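    """
    The subprocess side of the protocol (presumably instantiated by
    ``__main__.py``): ``listen`` reads pickled requests from stdin, executes
    them via ``_run`` and writes pickled ``(is_exception, traceback,
    result)`` replies to stdout.
    """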
    def __init__(self):
        self._inference_states = {}
        # TODO refactor so we don't need a process here anymore, just handle
        # the controlling.
        self._process = _InferenceStateProcess(Listener)

    def _get_inference_state(self, function, inference_state_id):
        from jedi.inference import InferenceState

        try:
            inference_state = self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
        return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            del self._inference_states[inference_state_id]
        else:
            inference_state = self._get_inference_state(function, inference_state_id)

            # Exchange all handles
            args = list(args)
            for i, arg in enumerate(args):
                if isinstance(arg, AccessHandle):
                    args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
            for key, value in kwargs.items():
                if isinstance(value, AccessHandle):
                    kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

            return function(inference_state, *args, **kwargs)

    def listen(self):
        stdout = sys.stdout
        # Mute stdout. Nobody should actually be able to write to it,
        # because stdout is used for IPC.
        sys.stdout = open(os.devnull, 'w')
        stdin = sys.stdin
        stdout = stdout.buffer
        stdin = stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed.
                # Don't make a big fuss here and just exit.
                exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)


class AccessHandle:
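    """
    A picklable proxy for a ``DirectObjectAccess`` living in another
    process. Only ``id`` crosses the process boundary (see ``__getstate__``
    and ``__setstate__``); attribute access on an unpickled handle turns
    into ``get_compiled_method_return`` calls on the subprocess, memoized
    for hashable arguments.
    """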
    def __init__(self, subprocess, access, id_):
        self.access = access
        self._subprocess = subprocess
        self.id = id_

    def add_subprocess(self, subprocess):
        self._subprocess = subprocess

    def __repr__(self):
        try:
            detail = self.access
        except AttributeError:
            detail = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, detail)

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        if name in ('id', 'access') or name.startswith('_'):
            raise AttributeError("Something went wrong with unpickling")

        # print('getattr', name, file=sys.stderr)
        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        if args and isinstance(args[0], slice):
            return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
        return self._cached_results(name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)