# -*- coding: utf-8 -*-
"""Module for caching command & alias names as well as for predicting whether
a command will be able to be run in the background.

A background predictor is a function that accepts a single argument list
and returns whether or not the process can be run in the background (returns
True) or must be run in the foreground (returns False).
"""
import os
import time
import builtins
import argparse
import collections.abc as cabc

from xonsh.platform import ON_WINDOWS, ON_POSIX, pathbasename
from xonsh.tools import executables_in
from xonsh.lazyasd import lazyobject


class CommandsCache(cabc.Mapping):
    """A lazy cache representing the commands available on the file system.
    The keys are the command names and the values a tuple of (loc, has_alias)
    where loc is either a str pointing to the executable on the file system or
    None (if no executable exists) and has_alias is a boolean flag for whether
    the command has an alias.
    """

    def __init__(self):
        self._cmds_cache = {}
        # Fingerprints of the state the cache was last built from; compared
        # against fresh values in ``all_commands`` to decide on a rebuild.
        self._path_checksum = None
        self._alias_checksum = None
        self._path_mtime = -1
        self.threadable_predictors = default_threadable_predictors()

    def __contains__(self, key):
        # Accessing the property refreshes the cache if it is out of date.
        _ = self.all_commands
        return self.lazyin(key)

    def __iter__(self):
        for cmd, (path, is_alias) in self.all_commands.items():
            if ON_WINDOWS and path is not None:
                # All command keys are stored in uppercase on Windows.
                # This ensures the original command name is returned.
                cmd = pathbasename(path)
            yield cmd

    def __len__(self):
        return len(self.all_commands)

    def __getitem__(self, key):
        # Accessing the property refreshes the cache if it is out of date.
        _ = self.all_commands
        return self.lazyget(key)

    def is_empty(self):
        """Returns whether the cache is currently empty (i.e. it holds no
        entries), without triggering a refresh.
        """
        return not self._cmds_cache

    @staticmethod
    def get_possible_names(name):
        """Generates the possible `PATHEXT` extension variants of a given
        executable name on Windows as a list, conserving the ordering in
        `PATHEXT`. The upper-cased bare name is always the first item.
        Returns a list with `name` being the only item in it on other
        platforms.
        """
        if not ON_WINDOWS:
            return [name]
        pathext = builtins.__xonsh_env__.get("PATHEXT", [])
        name = name.upper()
        return [name] + [name + ext for ext in pathext]

    @staticmethod
    def remove_dups(p):
        """Returns a new list with the items of ``p`` in their original order,
        keeping only the first occurrence of each item.
        """
        seen = set()
        ret = []
        for e in p:
            if e not in seen:
                seen.add(e)
                ret.append(e)
        return ret

    @property
    def all_commands(self):
        """The mapping of command names to ``(location, alias)`` tuples.

        The cached mapping is returned as-is when the existing directories on
        ``$PATH``, the set of alias names, and the newest modification time of
        the ``$PATH`` directories are all unchanged since the previous access.
        Otherwise the mapping is rebuilt from the file system and the aliases.
        """
        paths = builtins.__xonsh_env__.get("PATH", [])
        paths = CommandsCache.remove_dups(paths)
        path_immut = tuple(x for x in paths if os.path.isdir(x))
        # did PATH change?
        path_hash = hash(path_immut)
        cache_valid = path_hash == self._path_checksum
        self._path_checksum = path_hash
        # did aliases change?
        alss = getattr(builtins, "aliases", dict())
        al_hash = hash(frozenset(alss))
        cache_valid = cache_valid and al_hash == self._alias_checksum
        self._alias_checksum = al_hash
        # did the contents of any directory in PATH change?
        max_mtime = 0
        for path in path_immut:
            max_mtime = max(max_mtime, os.stat(path).st_mtime)
        cache_valid = cache_valid and (max_mtime <= self._path_mtime)
        self._path_mtime = max_mtime
        if cache_valid:
            return self._cmds_cache
        allcmds = {}
        for path in reversed(path_immut):
            # iterate backwards so that entries at the front of PATH overwrite
            # entries at the back.
            for cmd in executables_in(path):
                key = cmd.upper() if ON_WINDOWS else cmd
                allcmds[key] = (os.path.join(path, cmd), alss.get(key, None))
        for cmd in alss:
            # Normalise the key *before* the membership test: the keys of
            # ``allcmds`` are upper-cased on Windows, so testing the raw alias
            # name there could let a pure-alias entry clobber an executable.
            key = cmd.upper() if ON_WINDOWS else cmd
            if key not in allcmds:
                allcmds[key] = (cmd, True)
        self._cmds_cache = allcmds
        return allcmds

    def cached_name(self, name):
        """Returns the name that would appear in the cache, if it exists."""
        if name is None:
            return None
        cached = pathbasename(name)
        if ON_WINDOWS:
            # Pick the first PATHEXT variant that is actually a cache key.
            keys = self.get_possible_names(cached)
            cached = next((k for k in keys if k in self._cmds_cache), None)
        return cached

    def lazyin(self, key):
        """Checks if the value is in the current cache without the potential to
        update the cache. It just says whether the value is known *now*. This
        may not reflect precisely what is on the $PATH.
        """
        return self.cached_name(key) in self._cmds_cache

    def lazyiter(self):
        """Returns an iterator over the current cache contents without the
        potential to update the cache. This may not reflect what is on the
        $PATH.
        """
        return iter(self._cmds_cache)

    def lazylen(self):
        """Returns the length of the current cache contents without the
        potential to update the cache. This may not reflect precisely
        what is on the $PATH.
        """
        return len(self._cmds_cache)

    def lazyget(self, key, default=None):
        """A lazy value getter: looks ``key`` up in the current cache contents
        (after mapping it through ``cached_name``) without the potential to
        update the cache, returning ``default`` when it is absent.
        """
        return self._cmds_cache.get(self.cached_name(key), default)

    def locate_binary(self, name, ignore_alias=False):
        """Locates an executable on the file system using the cache.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)

        Returns
        -------
        The result of ``lazy_locate_binary`` after refreshing the cache.
        """
        # make sure the cache is up to date by accessing the property
        _ = self.all_commands
        return self.lazy_locate_binary(name, ignore_alias)

    def lazy_locate_binary(self, name, ignore_alias=False):
        """Locates an executable in the cache, without checking its validity.

        Parameters
        ----------
        name : str
            name of binary to search for
        ignore_alias : bool, optional
            Force return of binary path even if alias of ``name`` exists
            (default ``False``)

        Returns
        -------
        The cached location for ``name``; or ``name`` itself when it is not in
        the cache but is an existing file given with more than its base name;
        otherwise ``None``. ``None`` is also returned for a cached pure alias
        when ``ignore_alias`` is true. On Windows, a matching file in the
        current directory takes precedence and its absolute path is returned.
        """
        possibilities = self.get_possible_names(name)
        if ON_WINDOWS:
            # Windows users expect to be able to execute files in the same
            # directory without `./`
            local_bin = next((fn for fn in possibilities if os.path.isfile(fn)), None)
            if local_bin:
                return os.path.abspath(local_bin)
        cached = next((cmd for cmd in possibilities if cmd in self._cmds_cache), None)
        if cached:
            (path, alias) = self._cmds_cache[cached]
            # An entry whose location is just a bare name has no executable
            # behind it, i.e. it was stored for an alias only.
            ispure = path == pathbasename(path)
            if alias and ignore_alias and ispure:
                # pure alias, which we are ignoring
                return None
            return path
        if os.path.isfile(name) and name != pathbasename(name):
            # not cached, but given as a path to an existing file
            return name
        return None

    def is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias.
        """
        _ = self.all_commands
        return self.lazy_is_only_functional_alias(name)

    def lazy_is_only_functional_alias(self, name):
        """Returns whether or not a command is only a functional alias, and has
        no underlying executable. For example, the "cd" command is only available
        as a functional alias. This search is performed lazily, i.e. only the
        current cache contents are consulted and the cache is never refreshed.
        """
        val = self._cmds_cache.get(name, None)
        if val is None:
            return False
        # Use the lazy lookup so that this method honours its no-refresh
        # contract; ``is_only_functional_alias`` refreshes before calling us.
        return (
            val == (name, True)
            and self.lazy_locate_binary(name, ignore_alias=True) is None
        )

    def predict_threadable(self, cmd):
        """Predicts whether a command list is able to be run on a background
        thread, rather than the main thread.

        ``cmd[0]`` selects the predictor and ``cmd[1:]`` is passed to it.
        Predictors resolved for previously unseen names are stored in
        ``self.threadable_predictors`` for later calls.
        """
        name = self.cached_name(cmd[0])
        predictors = self.threadable_predictors
        if ON_WINDOWS:
            # On Windows all names (keys) are stored in upper case, so instead
            # we get the original cmd or alias name
            path, _ = self.lazyget(name, (None, None))
            if path is None:
                return True
            else:
                name = pathbasename(path)
            if name not in predictors:
                # fall back to the predictor registered for the name without
                # its file extension
                pre, ext = os.path.splitext(name)
                if pre in predictors:
                    predictors[name] = predictors[pre]
        if name not in predictors:
            predictors[name] = self.default_predictor(name, cmd[0])
        predictor = predictors[name]
        return predictor(cmd[1:])

    #
    # Background Predictors (as methods)
    #

    def default_predictor(self, name, cmd0):
        """Returns the predictor to use for a command that has no registered
        predictor: the result of inspecting the binary on POSIX, and
        ``predict_true`` everywhere else.
        """
        if ON_POSIX:
            return self.default_predictor_readbin(
                name, cmd0, timeout=0.1, failure=predict_true
            )
        else:
            return predict_true

    def default_predictor_readbin(self, name, cmd0, timeout, failure):
        """Make a default predictor by analyzing the content of the binary.
        Should only work on POSIX.

        Parameters
        ----------
        name : str
            command name, looked up in the cache when ``cmd0`` is not a path
        cmd0 : str
            the command as typed; used directly as the file name when it is
            absolute or contains ``os.sep``
        timeout : float
            maximum number of seconds to spend reading the file
        failure : callable
            predictor to return if the analysis fails

        Returns
        -------
        ``predict_false`` as soon as every byte string of one of the searched
        groups has been seen in the file, ``predict_true`` if the end of the
        file is reached first, and ``failure`` if the file cannot be located,
        opened or read, or if ``timeout`` expires.
        """
        fname = cmd0 if os.path.isabs(cmd0) else None
        fname = cmd0 if fname is None and os.sep in cmd0 else fname
        fname = self.lazy_locate_binary(name) if fname is None else fname

        if fname is None:
            return failure
        if not os.path.isfile(fname):
            return failure

        try:
            fd = os.open(fname, os.O_RDONLY | os.O_NONBLOCK)
        except Exception:
            return failure  # opening error

        # Each key is a group of byte strings that must *all* occur in the
        # file; the value tracks which members have been seen so far.
        search_for = {
            (b"ncurses",): [False],
            (b"libgpm",): [False],
            (b"isatty", b"tcgetattr", b"tcsetattr"): [False, False, False],
        }
        deadline = time.time() + timeout
        block = b""
        try:
            while time.time() < deadline:
                previous_block = block
                try:
                    block = os.read(fd, 2048)
                except Exception:
                    # should not occur, except e.g. if a file is deleted and a
                    # dir is created with the same name between
                    # os.path.isfile and os.open
                    return failure
                if not block:
                    return predict_true  # no keys of search_for found
                # Join with the previous block so that a byte string that
                # straddles a block boundary is still detected.
                analyzed_block = previous_block + block
                for needles, found in search_for.items():
                    for i, needle in enumerate(needles):
                        if not found[i] and needle in analyzed_block:
                            found[i] = True
                    if all(found):
                        return predict_false  # use one key of search_for
            return failure  # timeout
        finally:
            # single exit point for releasing the descriptor
            os.close(fd)


#
# Background Predictors
#


def predict_true(args):
    """Always say the process is threadable."""
    return True


def predict_false(args):
    """Never say the process is threadable."""
    return False


@lazyobject
def SHELL_PREDICTOR_PARSER():
    p = argparse.ArgumentParser("shell", add_help=False)
    p.add_argument("-c", nargs="?", default=None)
    p.add_argument("filename", nargs="?", default=None)
    return p


def predict_shell(args):
    """Predict the backgroundability of the normal shell interface, which
    comes down to whether it is being run in subproc mode. Predicts True when
    either a ``-c`` value or a filename argument is present, False otherwise.
    """
    ns, _ = SHELL_PREDICTOR_PARSER.parse_known_args(args)
    return not (ns.c is None and ns.filename is None)


@lazyobject
def HELP_VER_PREDICTOR_PARSER():
    p = argparse.ArgumentParser("cmd", add_help=False)
    p.add_argument("-h", "--help", dest="help", action="store_true", default=None)
    p.add_argument(
        "-v", "-V", "--version", dest="version", action="store_true", default=None
    )
    return p


def predict_help_ver(args):
    """Predict the backgroundability of commands that have help & version
    switches: -h, --help, -v, -V, --version. If either of these options is
    present, the command is assumed to print to stdout normally and is therefore
    threadable. Otherwise, the command is assumed to not be threadable.
    This is useful for commands, like top, that normally enter alternate mode
    but may not in certain circumstances.
    """
    ns, _ = HELP_VER_PREDICTOR_PARSER.parse_known_args(args)
    pred = ns.help is not None or ns.version is not None
    return pred


@lazyobject
def HG_PREDICTOR_PARSER():
    p = argparse.ArgumentParser("hg", add_help=False)
    p.add_argument("command")
    p.add_argument(
        "-i", "--interactive", action="store_true", default=False, dest="interactive"
    )
    return p


def predict_hg(args):
    """Predict if mercurial is about to be run in interactive mode.
    If it is interactive, predict False. If it isn't, predict True.
    """
    ns, _ = HG_PREDICTOR_PARSER.parse_known_args(args)
    return not ns.interactive


def default_threadable_predictors():
    """Generates a new dict mapping known command names to their threadable
    predictors. Commands that are not listed here are handled by
    ``CommandsCache.default_predictor``.
    """
    # alphabetical, for what it is worth.
    predictors = {
        "bash": predict_shell,
        "csh": predict_shell,
        "clear": predict_false,
        "cls": predict_false,
        "cmd": predict_shell,
        "curl": predict_true,
        "ex": predict_false,
        "emacsclient": predict_false,
        "fish": predict_shell,
        "gvim": predict_help_ver,
        "hg": predict_hg,
        "htop": predict_help_ver,
        "ipython": predict_shell,
        "ksh": predict_shell,
        "less": predict_help_ver,
        "man": predict_help_ver,
        "more": predict_help_ver,
        "mvim": predict_help_ver,
        "mutt": predict_help_ver,
        "nano": predict_help_ver,
        "nvim": predict_false,
        "ponysay": predict_help_ver,
        "psql": predict_false,
        "python": predict_shell,
        "python2": predict_shell,
        "python3": predict_shell,
        "repo": predict_help_ver,
        "ranger": predict_help_ver,
        "rview": predict_false,
        "rvim": predict_false,
        "scp": predict_false,
        "sh": predict_shell,
        "ssh": predict_false,
        "startx": predict_false,
        "sudo": predict_help_ver,
        "tcsh": predict_shell,
        "telnet": predict_false,
        "top": predict_help_ver,
        "vi": predict_false,
        "view": predict_false,
        "vim": predict_false,
        "vimpager": predict_help_ver,
        "weechat": predict_help_ver,
        "xclip": predict_help_ver,
        "xo": predict_help_ver,
        "xonsh": predict_shell,
        "xon.sh": predict_shell,
        "zsh": predict_shell,
    }
    return predictors