"""
Makes it possible to do the compiled analysis in a subprocess. This has two
goals:

1. Making it safer - Segfaults and RuntimeErrors as well as stdout/stderr can
   be ignored and dealt with.
2. Making it possible to handle different Python versions as well as
   virtualenvs.
"""

import os
import sys
import queue
import subprocess
import traceback
import weakref
from collections import deque
from functools import partial
from threading import Thread

from jedi._compatibility import pickle_dump, pickle_load
from jedi import debug
from jedi.cache import memoize_method
from jedi.inference.compiled.subprocess import functions
from jedi.inference.compiled.access import DirectObjectAccess, AccessPath, \
    SignatureParam
from jedi.api.exceptions import InternalError


# Script that the child interpreter is started with (see
# CompiledSubprocess._get_process).
_MAIN_PATH = os.path.join(os.path.dirname(__file__), '__main__.py')
# Pickle protocol used for both directions of the stdin/stdout IPC channel.
PICKLE_PROTOCOL = 4


def _GeneralizedPopen(*args, **kwargs):
    """
    Start a process with :class:`subprocess.Popen`, adding platform specific
    options.

    On Windows (``os.name == 'nt'``) the child is created with the
    ``CREATE_NO_WINDOW`` flag. ``close_fds`` is set to ``True`` only on POSIX
    systems and ``False`` elsewhere; any caller-supplied ``creationflags`` /
    ``close_fds`` are overridden.

    :returns: the started :class:`subprocess.Popen` object.
    """
    if os.name == 'nt':
        try:
            # Was introduced in Python 3.7.
            CREATE_NO_WINDOW = subprocess.CREATE_NO_WINDOW
        except AttributeError:
            CREATE_NO_WINDOW = 0x08000000
        kwargs['creationflags'] = CREATE_NO_WINDOW
    # The child process doesn't need file descriptors except 0, 1, 2.
    # Closing the others is only requested on POSIX systems.
    kwargs['close_fds'] = 'posix' in sys.builtin_module_names

    return subprocess.Popen(*args, **kwargs)


def _enqueue_output(out, queue_):
    """
    Copy lines from the binary stream ``out`` into ``queue_`` until EOF.

    Used as the target of a daemon thread that consumes the child's stderr,
    so the main thread never has to block reading it.
    """
    for line in iter(out.readline, b''):
        queue_.put(line)


def _add_stderr_to_debug(stderr_queue):
    """
    Log every stderr line collected so far from the subprocess as a debug
    warning. Never blocks: returns as soon as ``stderr_queue`` is empty.
    """
    while True:
        # Try to do some error reporting from the subprocess and print its
        # stderr contents.
        try:
            line = stderr_queue.get_nowait()
        except queue.Empty:
            break
        line = line.decode('utf-8', 'replace')
        debug.warning('stderr output: %s' % line.rstrip('\n'))


def _get_function(name):
    """
    Return the attribute ``name`` of the ``functions`` module.

    :raises AttributeError: if ``functions`` has no such attribute.
    """
    return getattr(functions, name)


def _cleanup_process(process, thread):
    """
    Tear down a child process started by ``CompiledSubprocess``.

    Kills and waits for ``process``, joins its stderr reader ``thread`` (which
    returns once the stream reaches EOF), then closes stdin/stdout/stderr.
    ``OSError`` raised by an already dead process or a broken stream is
    ignored.
    """
    try:
        process.kill()
        process.wait()
    except OSError:
        # Raised if the process is already killed.
        pass
    thread.join()
    for stream in [process.stdin, process.stdout, process.stderr]:
        try:
            stream.close()
        except OSError:
            # Raised if the stream is broken.
            pass


class _InferenceStateProcess:
    """
    Common base of the same-process and subprocess front ends.

    Holds a weak reference to the owning inference state and a registry of
    :class:`AccessHandle` objects keyed by their ``id``.
    """
    def __init__(self, inference_state):
        # Weak, so this object does not keep the inference state alive.
        self._inference_state_weakref = weakref.ref(inference_state)
        # Kept separately because it is still needed after the inference
        # state is gone (InferenceStateSubprocess.__del__ uses it).
        self._inference_state_id = id(inference_state)
        self._handles = {}

    def get_or_create_access_handle(self, obj):
        """
        Return the handle registered for ``id(obj)``, creating, registering
        and returning a new one wrapping a ``DirectObjectAccess`` if needed.
        """
        id_ = id(obj)
        try:
            return self.get_access_handle(id_)
        except KeyError:
            access = DirectObjectAccess(self._inference_state_weakref(), obj)
            handle = AccessHandle(self, access, id_)
            self.set_access_handle(handle)
            return handle

    def get_access_handle(self, id_):
        """Return the handle with id ``id_``; raises ``KeyError`` if unknown."""
        return self._handles[id_]

    def set_access_handle(self, handle):
        """Register ``handle`` under ``handle.id``, replacing any previous one."""
        self._handles[handle.id] = handle


class InferenceStateSameProcess(_InferenceStateProcess):
    """
    Basically just an easy access to functions.py. It has the same API
    as InferenceStateSubprocess and does the same thing without using a
    subprocess. This is necessary for the Interpreter process.
    """
    def __getattr__(self, name):
        # ``functions.<name>`` with the inference state bound as its first
        # argument. Note the weakref is dereferenced now, so this passes
        # ``None`` if the inference state has already been collected.
        return partial(_get_function(name), self._inference_state_weakref())


class InferenceStateSubprocess(_InferenceStateProcess):
    """
    Front end that forwards ``functions.<name>`` calls to a
    :class:`CompiledSubprocess` and maps returned access handles back onto
    handles known in this process.
    """
    def __init__(self, inference_state, compiled_subprocess):
        super().__init__(inference_state)
        # Set once any call went through; only then does the subprocess hold
        # state that needs deleting in __del__.
        self._used = False
        self._compiled_subprocess = compiled_subprocess

    def __getattr__(self, name):
        # Raises AttributeError for unknown names, as __getattr__ should.
        func = _get_function(name)

        def wrapper(*args, **kwargs):
            self._used = True

            result = self._compiled_subprocess.run(
                self._inference_state_weakref(),
                func,
                args=args,
                kwargs=kwargs,
            )
            # IMO it should be possible to create a hook in pickle.load to
            # mess with the loaded objects. However it's extremely complicated
            # to work around this so just do it with this call. ~ dave
            return self._convert_access_handles(result)

        return wrapper

    def _convert_access_handles(self, obj):
        """
        Recursively walk ``obj`` (SignatureParam, tuple, list, AccessPath)
        and replace every unpickled :class:`AccessHandle` with the handle
        already registered under the same id, or register it (bound to this
        front end) if it is new. Other objects are returned unchanged.
        """
        # SignatureParam is checked before the generic tuple branch;
        # presumably it is a tuple subclass (it supports tuple(obj)).
        if isinstance(obj, SignatureParam):
            return SignatureParam(*self._convert_access_handles(tuple(obj)))
        elif isinstance(obj, tuple):
            return tuple(self._convert_access_handles(o) for o in obj)
        elif isinstance(obj, list):
            return [self._convert_access_handles(o) for o in obj]
        elif isinstance(obj, AccessHandle):
            try:
                # Rewrite the access handle to one we're already having.
                obj = self.get_access_handle(obj.id)
            except KeyError:
                obj.add_subprocess(self)
                self.set_access_handle(obj)
        elif isinstance(obj, AccessPath):
            return AccessPath(self._convert_access_handles(obj.accesses))
        return obj

    def __del__(self):
        # Only queue a deletion if the subprocess actually saw this inference
        # state and is still usable.
        if self._used and not self._compiled_subprocess.is_crashed:
            self._compiled_subprocess.delete_inference_state(self._inference_state_id)


class CompiledSubprocess:
    """
    Parent-side handle for a child Python interpreter that runs
    ``functions`` calls on behalf of inference states.

    The child is started lazily on first use. Requests and replies are
    pickled over its stdin/stdout; its stderr is collected by a background
    thread and forwarded to ``debug.warning``.
    """
    # Set to True by _kill; afterwards every _send raises InternalError.
    is_crashed = False

    def __init__(self, executable, env_vars=None):
        """
        :param executable: Python interpreter to start the child with.
        :param env_vars: passed to Popen as ``env``; ``None`` lets the child
            inherit the current environment.
        """
        self._executable = executable
        self._env_vars = env_vars
        # Ids of inference states whose subprocess-side state should be
        # dropped. Deletion is deferred until the next run() call.
        self._inference_state_deletion_queue = deque()
        # Replaced by a weakref.finalize once the process has been started.
        self._cleanup_callable = lambda: None

    def __repr__(self):
        # NOTE(review): this is the pid of the *current* process
        # (os.getpid()), not of the child interpreter.
        pid = os.getpid()
        return '<%s _executable=%r, is_crashed=%r, pid=%r>' % (
            self.__class__.__name__,
            self._executable,
            self.is_crashed,
            pid,
        )

    @memoize_method
    def _get_process(self):
        """
        Start the child interpreter (memoized, so only once per instance) and
        return its ``Popen`` object.

        The child runs ``_MAIN_PATH`` with the directory containing the parso
        package and this interpreter's ``major.minor.micro`` version as
        arguments (presumably so it can import the same parso and check
        compatibility; see ``__main__.py``).
        """
        debug.dbg('Start environment subprocess %s', self._executable)
        parso_path = sys.modules['parso'].__file__
        args = (
            self._executable,
            _MAIN_PATH,
            os.path.dirname(os.path.dirname(parso_path)),
            '.'.join(str(x) for x in sys.version_info[:3]),
        )
        process = _GeneralizedPopen(
            args,
            stdin=subprocess.PIPE,
            stdout=subprocess.PIPE,
            stderr=subprocess.PIPE,
            env=self._env_vars
        )
        self._stderr_queue = queue.Queue()
        self._stderr_thread = t = Thread(
            target=_enqueue_output,
            args=(process.stderr, self._stderr_queue)
        )
        t.daemon = True
        t.start()
        # Ensure the subprocess is properly cleaned up when the object
        # is garbage collected.
        self._cleanup_callable = weakref.finalize(self,
                                                  _cleanup_process,
                                                  process,
                                                  t)
        return process

    def run(self, inference_state, function, args=(), kwargs=None):
        """
        Run ``function`` in the subprocess against the subprocess-side
        inference state associated with ``id(inference_state)`` and return
        the unpickled result.

        Deletions queued via :meth:`delete_inference_state` are sent first.

        :param kwargs: keyword arguments for ``function``; ``None`` means none.
        :raises InternalError: if the subprocess has crashed.
        """
        # Delete old inference_states.
        while True:
            try:
                inference_state_id = self._inference_state_deletion_queue.pop()
            except IndexError:
                break
            self._send(inference_state_id, None)

        assert callable(function)
        return self._send(id(inference_state), function, args, kwargs)

    def get_sys_path(self):
        """Return ``functions.get_sys_path()`` as evaluated in the subprocess."""
        # An inference_state_id of None makes the listener call the function
        # directly, without an inference state.
        return self._send(None, functions.get_sys_path, (), {})

    def _kill(self):
        """Mark this subprocess as crashed and tear the child down."""
        self.is_crashed = True
        self._cleanup_callable()

    def _send(self, inference_state_id, function, args=(), kwargs=None):
        """
        Send one pickled request to the subprocess and return its reply.

        :param inference_state_id: ``None`` for calls that need no inference
            state.
        :param function: function to call (pickled, so it must be picklable),
            or ``None`` to request deletion of ``inference_state_id``.
        :param kwargs: keyword arguments; ``None`` means none.
        :raises InternalError: if the subprocess has crashed or dies while
            handling the request.
        :raises Exception: the exception raised in the subprocess, re-raised
            here with its ``args`` replaced by the remote traceback text.
        """
        if self.is_crashed:
            raise InternalError("The subprocess %s has crashed." % self._executable)

        if kwargs is None:
            # The listener always unpacks kwargs with **, so send a dict.
            kwargs = {}
        data = inference_state_id, function, args, kwargs
        try:
            pickle_dump(data, self._get_process().stdin, PICKLE_PROTOCOL)
        except BrokenPipeError:
            self._kill()
            raise InternalError("The subprocess %s was killed. Maybe out of memory?"
                                % self._executable)

        try:
            is_exception, traceback_str, result = pickle_load(self._get_process().stdout)
        except EOFError as eof_error:
            # The child exited without answering; grab whatever stderr is
            # left (best effort) for the error message.
            try:
                stderr = self._get_process().stderr.read().decode('utf-8', 'replace')
            except Exception as exc:
                stderr = '<empty/not available (%r)>' % exc
            self._kill()
            _add_stderr_to_debug(self._stderr_queue)
            raise InternalError(
                "The subprocess %s has crashed (%r, stderr=%s)." % (
                    self._executable,
                    eof_error,
                    stderr,
                )) from eof_error

        _add_stderr_to_debug(self._stderr_queue)

        if is_exception:
            # Replace the exception's message with the remote traceback. It's
            # way more informative.
            result.args = (traceback_str,)
            raise result
        return result

    def delete_inference_state(self, inference_state_id):
        """
        Currently we are not deleting inference_state instantly. They only get
        deleted once the subprocess is used again. It would probably be a
        better solution to move all of this into a thread. However, the memory
        usage of a single inference_state shouldn't be that high.
        """
        # Flushed by run(), which sends (inference_state_id, None) - a request
        # with no function - telling the listener to drop that state.
        self._inference_state_deletion_queue.append(inference_state_id)


class Listener:
    """
    Request loop that runs inside the subprocess.

    Reads pickled ``(inference_state_id, function, args, kwargs)`` tuples
    from stdin and writes pickled ``(is_exception, traceback, result)``
    tuples to stdout.
    """
    def __init__(self):
        # Subprocess-side inference states, keyed by the parent's
        # id(inference_state).
        self._inference_states = {}
        # TODO refactor so we don't need to process anymore just handle
        # controlling.
        self._process = _InferenceStateProcess(Listener)

    def _get_inference_state(self, function, inference_state_id):
        """
        Return the inference state for ``inference_state_id``, creating one
        (with ``project=None`` and an ``InterpreterEnvironment``) on first
        use. ``function`` is currently unused.
        """
        from jedi.inference import InferenceState

        try:
            inference_state = self._inference_states[inference_state_id]
        except KeyError:
            from jedi import InterpreterEnvironment
            inference_state = InferenceState(
                # The project is not actually needed. Nothing should need to
                # access it.
                project=None,
                environment=InterpreterEnvironment()
            )
            self._inference_states[inference_state_id] = inference_state
        return inference_state

    def _run(self, inference_state_id, function, args, kwargs):
        """
        Dispatch one request:

        * ``inference_state_id is None``: call ``function(*args, **kwargs)``.
        * ``function is None``: delete the stored inference state (raises
          ``KeyError`` if it is unknown).
        * otherwise: call ``function(inference_state, *args, **kwargs)`` after
          swapping top-level :class:`AccessHandle` arguments for the
          locally registered handles with the same id.
        """
        if inference_state_id is None:
            return function(*args, **kwargs)
        elif function is None:
            del self._inference_states[inference_state_id]
        else:
            inference_state = self._get_inference_state(function, inference_state_id)

            # Exchange all handles
            args = list(args)
            for i, arg in enumerate(args):
                if isinstance(arg, AccessHandle):
                    args[i] = inference_state.compiled_subprocess.get_access_handle(arg.id)
            for key, value in kwargs.items():
                if isinstance(value, AccessHandle):
                    kwargs[key] = inference_state.compiled_subprocess.get_access_handle(value.id)

            return function(inference_state, *args, **kwargs)

    def listen(self):
        """
        Serve requests until the parent closes stdin, then exit the process.

        Exceptions raised by a request are sent back (with their formatted
        traceback) instead of terminating the loop.
        """
        stdout = sys.stdout
        # Mute stdout. Nobody should actually be able to write to it,
        # because stdout is used for IPC.
        sys.stdout = open(os.devnull, 'w')
        stdin = sys.stdin
        stdout = stdout.buffer
        stdin = stdin.buffer

        while True:
            try:
                payload = pickle_load(stdin)
            except EOFError:
                # It looks like the parent process closed.
                # Don't make a big fuss here and just exit. sys.exit (unlike
                # the site-provided exit()) is always available.
                sys.exit(0)
            try:
                result = False, None, self._run(*payload)
            except Exception as e:
                result = True, traceback.format_exc(), e

            pickle_dump(result, stdout, PICKLE_PROTOCOL)


class AccessHandle:
    """
    Proxy for an object living in (possibly) another process.

    Only ``id`` survives pickling. Any public attribute that is not defined
    on the handle is turned into a call of that method on the remote object
    via ``subprocess.get_compiled_method_return``.
    """
    def __init__(self, subprocess, access, id_):
        self.access = access
        self._subprocess = subprocess
        self.id = id_

    def add_subprocess(self, subprocess):
        """Attach the front end used to forward method calls."""
        self._subprocess = subprocess

    def __repr__(self):
        try:
            detail = self.access
        except AttributeError:
            # Unpickled handles have no ``access`` attribute.
            detail = '#' + str(self.id)
        return '<%s of %s>' % (self.__class__.__name__, detail)

    def __getstate__(self):
        return self.id

    def __setstate__(self, state):
        self.id = state

    def __getattr__(self, name):
        # ``id``/``access`` and private names must never be forwarded: on an
        # unpickled handle (only ``id`` restored) they are simply missing, and
        # forwarding would hide that / recurse.
        if name in ('id', 'access') or name.startswith('_'):
            raise AttributeError("Something went wrong with unpickling")

        # print('getattr', name, file=sys.stderr)
        return partial(self._workaround, name)

    def _workaround(self, name, *args, **kwargs):
        """
        TODO Currently we're passing slice objects around. This should not
        happen. They are also the only unhashable objects that we're passing
        around.
        """
        # Unhashable slice arguments cannot be memoized, so bypass the cache.
        if args and isinstance(args[0], slice):
            return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)
        return self._cached_results(name, *args, **kwargs)

    @memoize_method
    def _cached_results(self, name, *args, **kwargs):
        """Memoized remote call of method ``name`` on the proxied object."""
        return self._subprocess.get_compiled_method_return(self.id, name, *args, **kwargs)