1""" 2 3We acquire the python information by running an interrogation script via subprocess trigger. This operation is not 4cheap, especially not on Windows. To not have to pay this hefty cost every time we apply multiple levels of 5caching. 6""" 7from __future__ import absolute_import, unicode_literals 8 9import logging 10import os 11import pipes 12import sys 13from collections import OrderedDict 14 15from virtualenv.app_data import AppDataDisabled 16from virtualenv.discovery.py_info import PythonInfo 17from virtualenv.info import PY2 18from virtualenv.util.path import Path 19from virtualenv.util.six import ensure_text 20from virtualenv.util.subprocess import Popen, subprocess 21 22_CACHE = OrderedDict() 23_CACHE[Path(sys.executable)] = PythonInfo() 24 25 26def from_exe(cls, app_data, exe, raise_on_error=True, ignore_cache=False): 27 """""" 28 result = _get_from_cache(cls, app_data, exe, ignore_cache=ignore_cache) 29 if isinstance(result, Exception): 30 if raise_on_error: 31 raise result 32 else: 33 logging.info("%s", str(result)) 34 result = None 35 return result 36 37 38def _get_from_cache(cls, app_data, exe, ignore_cache=True): 39 # note here we cannot resolve symlinks, as the symlink may trigger different prefix information if there's a 40 # pyenv.cfg somewhere alongside on python3.4+ 41 exe_path = Path(exe) 42 if not ignore_cache and exe_path in _CACHE: # check in the in-memory cache 43 result = _CACHE[exe_path] 44 else: # otherwise go through the app data cache 45 py_info = _get_via_file_cache(cls, app_data, exe_path, exe) 46 result = _CACHE[exe_path] = py_info 47 # independent if it was from the file or in-memory cache fix the original executable location 48 if isinstance(result, PythonInfo): 49 result.executable = exe 50 return result 51 52 53def _get_via_file_cache(cls, app_data, path, exe): 54 path_text = ensure_text(str(path)) 55 try: 56 path_modified = path.stat().st_mtime 57 except OSError: 58 path_modified = -1 59 if app_data is None: 60 app_data = AppDataDisabled() 61 py_info, py_info_store = None, app_data.py_info(path) 62 with py_info_store.locked(): 63 if py_info_store.exists(): # if exists and matches load 64 data = py_info_store.read() 65 of_path, of_st_mtime, of_content = data["path"], data["st_mtime"], data["content"] 66 if of_path == path_text and of_st_mtime == path_modified: 67 py_info = cls._from_dict({k: v for k, v in of_content.items()}) 68 else: 69 py_info_store.remove() 70 if py_info is None: # if not loaded run and save 71 failure, py_info = _run_subprocess(cls, exe, app_data) 72 if failure is None: 73 data = {"st_mtime": path_modified, "path": path_text, "content": py_info._to_dict()} 74 py_info_store.write(data) 75 else: 76 py_info = failure 77 return py_info 78 79 80def _run_subprocess(cls, exe, app_data): 81 py_info_script = Path(os.path.abspath(__file__)).parent / "py_info.py" 82 with app_data.ensure_extracted(py_info_script) as py_info_script: 83 cmd = [exe, str(py_info_script)] 84 # prevent sys.prefix from leaking into the child process - see https://bugs.python.org/issue22490 85 env = os.environ.copy() 86 env.pop("__PYVENV_LAUNCHER__", None) 87 logging.debug("get interpreter info via cmd: %s", LogCmd(cmd)) 88 try: 89 process = Popen( 90 cmd, 91 universal_newlines=True, 92 stdin=subprocess.PIPE, 93 stderr=subprocess.PIPE, 94 stdout=subprocess.PIPE, 95 env=env, 96 ) 97 out, err = process.communicate() 98 code = process.returncode 99 except OSError as os_error: 100 out, err, code = "", os_error.strerror, os_error.errno 101 result, failure = None, None 102 if code == 0: 103 result = cls._from_json(out) 104 result.executable = exe # keep original executable as this may contain initialization code 105 else: 106 msg = "failed to query {} with code {}{}{}".format( 107 exe, code, " out: {!r}".format(out) if out else "", " err: {!r}".format(err) if err else "", 108 ) 109 failure = RuntimeError(msg) 110 return failure, result 111 112 113class LogCmd(object): 114 def __init__(self, cmd, env=None): 115 self.cmd = cmd 116 self.env = env 117 118 def __repr__(self): 119 def e(v): 120 return v.decode("utf-8") if isinstance(v, bytes) else v 121 122 cmd_repr = e(" ").join(pipes.quote(e(c)) for c in self.cmd) 123 if self.env is not None: 124 cmd_repr += e(" env of {!r}").format(self.env) 125 if PY2: 126 return cmd_repr.encode("utf-8") 127 return cmd_repr 128 129 def __unicode__(self): 130 raw = repr(self) 131 if PY2: 132 return raw.decode("utf-8") 133 return raw 134 135 136def clear(app_data): 137 app_data.py_info_clear() 138 _CACHE.clear() 139 140 141___all___ = ( 142 "from_exe", 143 "clear", 144 "LogCmd", 145) 146