# -*- coding: utf-8 -*-
"""The readline based xonsh shell.

Portions of this code related to initializing the readline library
are included from the IPython project.  The IPython project is:

* Copyright (c) 2008-2014, IPython Development Team
* Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
* Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
* Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>

"""
import os
import sys
import cmd
import select
import shutil
import builtins
import importlib
import threading
import subprocess
import collections

from xonsh.lazyasd import LazyObject, lazyobject
from xonsh.base_shell import BaseShell
from xonsh.ansi_colors import (
    ansi_partial_color_format,
    ansi_color_style_names,
    ansi_color_style,
)
from xonsh.prompt.base import multiline_prompt
from xonsh.tools import (
    print_exception,
    check_for_partial_string,
    to_bool,
    columnize,
    carriage_return,
)
from xonsh.platform import (
    ON_WINDOWS,
    ON_CYGWIN,
    ON_MSYS,
    ON_DARWIN,
    ON_POSIX,
    os_environ,
)
from xonsh.lazyimps import pygments, pyghooks, winutils
from xonsh.events import events

# Module-level handles that setup_readline() fills in.  They stay None (or
# False) when the corresponding library / symbol is unavailable.
readline = None
RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = RL_STATE = None
RL_COMPLETION_QUERY_ITEMS = None
RL_CAN_RESIZE = False
RL_DONE = None
RL_VARIABLE_VALUE = None
# Bit masks tested against / applied to the ``rl_readline_state`` integer.
_RL_STATE_DONE = 0x1000000
_RL_STATE_ISEARCH = 0x0000080

# Sentinel that can never be identical to a real $CASE_SENSITIVE_COMPLETIONS
# value, so the first call to _rebind_case_sensitive_completions() rebinds.
_RL_PREV_CASE_SENSITIVE_COMPLETIONS = "to-be-set"


def setup_readline():
    """Sets up the readline module and completion suppression, if available.

    Imports ``gnureadline`` or, failing that, ``readline`` and registers the
    result as ``sys.modules["readline"]``.  Unless running on Cygwin/MSYS or
    with a pure-Python readline, the shared library is also loaded through
    ctypes so that a few C-level variables can be reached.  Finally, history,
    key bindings and the user's init file are configured.

    The function returns early when ``RL_COMPLETION_SUPPRESS_APPEND`` has
    already been set by a previous call, or when no readline implementation
    can be imported.
    """
    global RL_COMPLETION_SUPPRESS_APPEND, RL_LIB, RL_CAN_RESIZE, RL_STATE, readline, RL_COMPLETION_QUERY_ITEMS
    if RL_COMPLETION_SUPPRESS_APPEND is not None:
        return
    for _rlmod_name in ("gnureadline", "readline"):
        try:
            readline = importlib.import_module(_rlmod_name)
            sys.modules["readline"] = readline
        except ImportError:
            pass
        else:
            break

    if readline is None:
        print(
            """Skipping setup. Because no `readline` implementation available.
            Please install a backend (`readline`, `prompt-toolkit`, etc) to use
            `xonsh` interactively.
            See https://github.com/xonsh/xonsh/issues/1170"""
        )
        return

    import ctypes
    import ctypes.util

    uses_libedit = readline.__doc__ and "libedit" in readline.__doc__
    readline.set_completer_delims(" \t\n")
    # Cygwin seems to hang indefinitely when querying the readline lib
    if (not ON_CYGWIN) and (not ON_MSYS) and (not readline.__file__.endswith(".py")):
        RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
        try:
            RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
                lib, "rl_completion_suppress_append"
            )
        except ValueError:
            # not all versions of readline have this symbol, ie Macs sometimes
            RL_COMPLETION_SUPPRESS_APPEND = None
        try:
            RL_COMPLETION_QUERY_ITEMS = ctypes.c_int.in_dll(
                lib, "rl_completion_query_items"
            )
        except ValueError:
            # not all versions of readline have this symbol, ie Macs sometimes
            RL_COMPLETION_QUERY_ITEMS = None
        try:
            RL_STATE = ctypes.c_int.in_dll(lib, "rl_readline_state")
        except Exception:
            # best effort: without this symbol the Ctrl-C state fix is skipped
            pass
        RL_CAN_RESIZE = hasattr(lib, "rl_reset_screen_size")
    env = builtins.__xonsh_env__
    # reads in history
    readline.set_history_length(-1)
    ReadlineHistoryAdder()
    # sets up IPython-like history matching with up and down
    # NOTE: the backslash is doubled so that readline receives a literal
    # ``\e``; a bare "\e" is an invalid Python escape sequence.
    readline.parse_and_bind('"\\e[B": history-search-forward')
    readline.parse_and_bind('"\\e[A": history-search-backward')
    # Setup Shift-Tab to indent
    readline.parse_and_bind('"\\e[Z": "{0}"'.format(env.get("INDENT")))

    # handle tab completion differences found in libedit readline compatibility
    # as discussed at http://stackoverflow.com/a/7116997
    if uses_libedit and ON_DARWIN:
        readline.parse_and_bind("bind ^I rl_complete")
        print(
            "\n".join(
                [
                    "",
                    "*" * 78,
                    "libedit detected - readline will not be well behaved, including but not limited to:",
                    " * crashes on tab completion",
                    " * incorrect history navigation",
                    " * corrupting long-lines",
                    " * failure to wrap or indent lines properly",
                    "",
                    "It is highly recommended that you install gnureadline, which is installable with:",
                    " xpip install gnureadline",
                    "*" * 78,
                ]
            ),
            file=sys.stderr,
        )
    else:
        readline.parse_and_bind("tab: complete")
    # try to load custom user settings
    inputrc_name = os_environ.get("INPUTRC")
    if inputrc_name is None:
        if uses_libedit:
            inputrc_name = ".editrc"
        else:
            inputrc_name = ".inputrc"
        inputrc_name = os.path.join(os.path.expanduser("~"), inputrc_name)
        if (not ON_WINDOWS) and (not os.path.isfile(inputrc_name)):
            inputrc_name = "/etc/inputrc"
    if ON_WINDOWS:
        winutils.enable_virtual_terminal_processing()
    if os.path.isfile(inputrc_name):
        try:
            readline.read_init_file(inputrc_name)
        except Exception:
            # this seems to fail with libedit
            print_exception("xonsh: could not load readline default init file.")
    # properly reset input typed before the first prompt
    readline.set_startup_hook(carriage_return)


def teardown_readline():
    """Tears down the readline module, if available.

    Currently this only attempts the import and performs no further work.
    """
    try:
        import readline
    except (ImportError, TypeError):
        return


def _rebind_case_sensitive_completions():
    """Re-applies readline's ``completion-ignore-case`` setting whenever
    $CASE_SENSITIVE_COMPLETIONS differs (by identity) from the value seen on
    the previous call.
    """
    # handle case sensitive, see Github issue #1342 for details
    global _RL_PREV_CASE_SENSITIVE_COMPLETIONS
    env = builtins.__xonsh_env__
    case_sensitive = env.get("CASE_SENSITIVE_COMPLETIONS")
    if case_sensitive is _RL_PREV_CASE_SENSITIVE_COMPLETIONS:
        return
    if case_sensitive:
        readline.parse_and_bind("set completion-ignore-case off")
    else:
        readline.parse_and_bind("set completion-ignore-case on")
    _RL_PREV_CASE_SENSITIVE_COMPLETIONS = case_sensitive


def fix_readline_state_after_ctrl_c():
    """
    Fix to allow Ctrl-C to exit reverse-i-search.

    Based on code from:
        http://bugs.python.org/file39467/raw_input__workaround_demo.py
    """
    if ON_WINDOWS:
        # hack to make pyreadline mimic the desired behavior
        try:
            _q = readline.rl.mode.process_keyevent_queue
            if len(_q) > 1:
                _q.pop()
        except Exception:
            # deliberately best effort; the attribute chain may not exist
            pass
    if RL_STATE is None:
        return
    # clear the incremental-search flag and mark the current line as done
    if RL_STATE.value & _RL_STATE_ISEARCH:
        RL_STATE.value &= ~_RL_STATE_ISEARCH
    if not RL_STATE.value & _RL_STATE_DONE:
        RL_STATE.value |= _RL_STATE_DONE


def rl_completion_suppress_append(val=1):
    """Sets the rl_completion_suppress_append variable, if possible.
    A value of 1 (default) means to suppress, a value of 0 means to enable.
    """
    if RL_COMPLETION_SUPPRESS_APPEND is None:
        return
    RL_COMPLETION_SUPPRESS_APPEND.value = val


def rl_completion_query_items(val=None):
    """Sets the rl_completion_query_items variable, if possible.
    A None value will set this to $COMPLETION_QUERY_LIMIT, otherwise any integer
    is accepted.
    """
    if RL_COMPLETION_QUERY_ITEMS is None:
        return
    if val is None:
        val = builtins.__xonsh_env__.get("COMPLETION_QUERY_LIMIT")
    RL_COMPLETION_QUERY_ITEMS.value = val


def rl_variable_dumper(readable=True):
    """Dumps the currently set readline variables. If readable is True, then this
    output may be used in an inputrc file.

    Does nothing when the readline shared library has not been loaded, in
    line with the other ``rl_*`` helpers in this module.
    """
    if RL_LIB is None:
        return
    RL_LIB.rl_variable_dumper(int(readable))


def rl_variable_value(variable):
    """Returns the currently set value for a readline configuration variable.

    ``variable`` may be str or bytes; str is encoded with $XONSH_ENCODING and
    $XONSH_ENCODING_ERRORS, which are also used to decode the result.
    Returns None when the readline shared library has not been loaded or when
    the library call yields no value (a NULL pointer, which ctypes reports as
    None).
    """
    global RL_VARIABLE_VALUE
    if RL_LIB is None:
        return None
    if RL_VARIABLE_VALUE is None:
        import ctypes

        # cache the foreign function and declare that it returns a C string
        RL_VARIABLE_VALUE = RL_LIB.rl_variable_value
        RL_VARIABLE_VALUE.restype = ctypes.c_char_p
    env = builtins.__xonsh_env__
    enc, errors = env.get("XONSH_ENCODING"), env.get("XONSH_ENCODING_ERRORS")
    if isinstance(variable, str):
        variable = variable.encode(encoding=enc, errors=errors)
    rtn = RL_VARIABLE_VALUE(variable)
    if rtn is None:
        return None
    return rtn.decode(encoding=enc, errors=errors)


@lazyobject
def rl_on_new_line():
    """Grabs one of a few possible redisplay functions in readline."""
    names = ["rl_on_new_line", "rl_forced_update_display", "rl_redisplay"]
    for name in names:
        func = getattr(RL_LIB, name, None)
        if func is not None:
            break
    else:
        # none of the library functions exist (or RL_LIB is None):
        # fall back to simply emitting a newline

        def print_for_newline():
            print()

        func = print_for_newline
    return func


def _insert_text_func(s, readline):
    """Creates a function to insert text via readline."""

    def inserter():
        readline.insert_text(s)
        readline.redisplay()

    return inserter


# Leading tokens after which postcmd() removes one level of auto-indentation.
DEDENT_TOKENS = LazyObject(
    lambda: frozenset(["raise", "return", "pass", "break", "continue"]),
    globals(),
    "DEDENT_TOKENS",
)


class ReadlineShell(BaseShell, cmd.Cmd):
    """The readline based xonsh shell."""

    def __init__(self, completekey="tab", stdin=None, stdout=None, **kwargs):
        """Initializes the shell, sets up readline, and prepares the
        indentation, prompt and completion bookkeeping used by the other
        methods.  All arguments are forwarded to the parent initializers.
        """
        super().__init__(completekey=completekey, stdin=stdin, stdout=stdout, **kwargs)
        setup_readline()
        self._current_indent = ""
        self._current_prompt = ""
        self._force_hide = None
        self._complete_only_last_table = {
            # Truth table for completions, keys are:
            # (prefix_begs_quote, prefix_ends_quote, i_ends_quote,
            #  last_starts_with_prefix, i_has_space)
            (True, True, True, True, True): True,
            (True, True, True, True, False): True,
            (True, True, True, False, True): False,
            (True, True, True, False, False): True,
            (True, True, False, True, True): False,
            (True, True, False, True, False): False,
            (True, True, False, False, True): False,
            (True, True, False, False, False): False,
            (True, False, True, True, True): True,
            (True, False, True, True, False): False,
            (True, False, True, False, True): False,
            (True, False, True, False, False): True,
            (True, False, False, True, True): False,
            (True, False, False, True, False): False,
            (True, False, False, False, True): False,
            (True, False, False, False, False): False,
            (False, True, True, True, True): True,
            (False, True, True, True, False): True,
            (False, True, True, False, True): True,
            (False, True, True, False, False): True,
            (False, True, False, True, True): False,
            (False, True, False, True, False): False,
            (False, True, False, False, True): False,
            (False, True, False, False, False): False,
            (False, False, True, True, True): False,
            (False, False, True, True, False): False,
            (False, False, True, False, True): False,
            (False, False, True, False, False): True,
            (False, False, False, True, True): True,
            (False, False, False, True, False): False,
            (False, False, False, False, True): False,
            (False, False, False, False, False): False,
        }
        self.cmdqueue = collections.deque()

    def __del__(self):
        """Runs the readline teardown hook when the shell is collected."""
        teardown_readline()

    def singleline(self, store_in_history=True, **kwargs):
        """Reads a single line of input. The store_in_history kwarg
        flags whether the input should be stored in readline's in-memory
        history.

        When readline cannot be imported the line cannot be removed from
        history afterwards, so ``store_in_history`` is treated as True.
        """
        pos = -1
        if not store_in_history:  # store current position to remove it later
            try:
                import readline
            except ImportError:
                store_in_history = True
            else:
                # only query readline when the import actually succeeded
                pos = readline.get_current_history_length() - 1
        events.on_pre_prompt.fire()
        rtn = input(self.prompt)
        events.on_post_prompt.fire()
        if not store_in_history and pos >= 0:
            readline.remove_history_item(pos)
        return rtn

    def parseline(self, line):
        """Overridden to no-op."""
        return "", line, line

    def _querycompletions(self, completions, loc):
        """Returns whether or not we should show completions. 0 means that prefixes
        should not be shown, 1 means that there is a common prefix among all completions
        and they should be shown, while 2 means that there is no common prefix but
        we are under the query limit and they should be shown.

        When the number of completions exceeds $COMPLETION_QUERY_LIMIT the user
        is asked on stderr; if they agree, the completions are paged here and
        0 is returned since they have already been displayed.
        """
        if os.path.commonprefix([c[loc:] for c in completions]):
            return 1
        elif len(completions) <= builtins.__xonsh_env__.get("COMPLETION_QUERY_LIMIT"):
            return 2
        msg = "\nDisplay all {} possibilities? ".format(len(completions))
        msg += "({GREEN}y{NO_COLOR} or {RED}n{NO_COLOR})"
        self.print_color(msg, end="", flush=True, file=sys.stderr)
        yn = "x"
        while yn not in "yn":
            yn = sys.stdin.read(1)
        show_completions = to_bool(yn)
        print()
        if not show_completions:
            rl_on_new_line()
            return 0
        w, h = shutil.get_terminal_size()
        lines = columnize(completions, width=w)
        more_msg = self.format_color(
            "{YELLOW}==={NO_COLOR} more or "
            "{PURPLE}({NO_COLOR}q{PURPLE}){NO_COLOR}uit "
            "{YELLOW}==={NO_COLOR}"
        )
        # page the output one screenful (minus the "more" line) at a time
        while len(lines) > h - 1:
            print("".join(lines[: h - 1]), end="", flush=True, file=sys.stderr)
            lines = lines[h - 1 :]
            print(more_msg, end="", flush=True, file=sys.stderr)
            q = sys.stdin.read(1).lower()
            print(flush=True, file=sys.stderr)
            if q == "q":
                rl_on_new_line()
                return 0
        print("".join(lines), end="", flush=True, file=sys.stderr)
        rl_on_new_line()
        return 0

    def completedefault(self, prefix, line, begidx, endidx):
        """Implements tab-completion for text."""
        if self.completer is None:
            return []
        rl_completion_suppress_append()  # this needs to be called each time
        _rebind_case_sensitive_completions()
        # effectively disables readline's own "display all N possibilities"
        # prompt; _querycompletions() asks the question instead
        rl_completion_query_items(val=999999999)
        completions, l = self.completer.complete(
            prefix, line, begidx, endidx, ctx=self.ctx
        )
        chopped = prefix[:-l]
        if chopped:
            rtn_completions = [chopped + i for i in completions]
        else:
            rtn_completions = completions
        rtn = []
        prefix_begs_quote = prefix.startswith("'") or prefix.startswith('"')
        prefix_ends_quote = prefix.endswith("'") or prefix.endswith('"')
        for i in rtn_completions:
            i_ends_quote = i.endswith("'") or i.endswith('"')
            last = i.rsplit(" ", 1)[-1]
            last_starts_prefix = last.startswith(prefix)
            i_has_space = " " in i
            key = (
                prefix_begs_quote,
                prefix_ends_quote,
                i_ends_quote,
                last_starts_prefix,
                i_has_space,
            )
            # the truth table decides whether only the last space-separated
            # word of the completion is handed back to readline
            rtn.append(last if self._complete_only_last_table[key] else i)
        # return based on show completions
        show_completions = self._querycompletions(completions, endidx - begidx)
        if show_completions == 0:
            return []
        elif show_completions == 1:
            return rtn
        elif show_completions == 2:
            return completions
        else:
            raise ValueError("query completions flag not understood.")

    # tab complete on first index too
    completenames = completedefault

    def _load_remaining_input_into_queue(self):
        """Drains any input already waiting on stdin into ``self.cmdqueue``.

        Bytes are read for as long as select() reports stdin readable, line
        endings are normalized to ``\\n``, and the text is queued line by line
        with the line endings kept.
        """
        buf = b""
        while True:
            r, w, x = select.select([self.stdin], [], [], 1e-6)
            if len(r) == 0:
                break
            buf += os.read(self.stdin.fileno(), 1024)
        if len(buf) > 0:
            buf = buf.decode().replace("\r\n", "\n").replace("\r", "\n")
            self.cmdqueue.extend(buf.splitlines(keepends=True))

    def postcmd(self, stop, line):
        """Called just after execution of line. For readline, this handles the
        automatic indentation of code blocks.
        """
        try:
            import readline
        except ImportError:
            return stop
        if self.need_more_lines:
            if len(line.strip()) == 0:
                # blank line: stop auto-indenting
                readline.set_pre_input_hook(None)
                self._current_indent = ""
            elif line.rstrip()[-1] == ":":
                # block opener: indent one level deeper than this line
                ind = line[: len(line) - len(line.lstrip())]
                ind += builtins.__xonsh_env__.get("INDENT")
                readline.set_pre_input_hook(_insert_text_func(ind, readline))
                self._current_indent = ind
            elif line.split(maxsplit=1)[0] in DEDENT_TOKENS:
                # block-terminating statement: remove one level
                env = builtins.__xonsh_env__
                ind = self._current_indent[: -len(env.get("INDENT"))]
                readline.set_pre_input_hook(_insert_text_func(ind, readline))
                self._current_indent = ind
            else:
                # otherwise follow whatever indentation the user typed
                ind = line[: len(line) - len(line.lstrip())]
                if ind != self._current_indent:
                    insert_func = _insert_text_func(ind, readline)
                    readline.set_pre_input_hook(insert_func)
                    self._current_indent = ind
        else:
            readline.set_pre_input_hook(None)
        return stop

    def _cmdloop(self, intro=None):
        """Repeatedly issue a prompt, accept input, parse an initial prefix
        off the received input, and dispatch to action methods, passing them
        the remainder of the line as argument.

        This was forked from Lib/cmd.py from the Python standard library v3.4.3,
        (C) Python Software Foundation, 2015.
        """
        self.preloop()
        # Always bind both names so that later branches cannot hit an unbound
        # local when readline is missing or the completer is not installed.
        try:
            import readline
        except ImportError:
            readline = None
        have_readline = False
        if self.use_rawinput and self.completekey and readline is not None:
            self.old_completer = readline.get_completer()
            readline.set_completer(self.complete)
            readline.parse_and_bind(self.completekey + ": complete")
            have_readline = True
        try:
            if intro is not None:
                self.intro = intro
            if self.intro:
                self.stdout.write(str(self.intro) + "\n")
            stop = None
            while not stop:
                line = None
                exec_now = False
                if len(self.cmdqueue) > 0:
                    line = self.cmdqueue.popleft()
                    # queued text ending in a newline is run without prompting
                    exec_now = line.endswith("\n")
                if self.use_rawinput and not exec_now:
                    # a queued partial line is pre-filled into the next prompt
                    if line is None or readline is None:
                        inserter = None
                    else:
                        inserter = _insert_text_func(line, readline)
                    if inserter is not None:
                        readline.set_pre_input_hook(inserter)
                    try:
                        line = self.singleline()
                    except EOFError:
                        if builtins.__xonsh_env__.get("IGNOREEOF"):
                            self.stdout.write('Use "exit" to leave the shell.\n')
                            line = ""
                        else:
                            line = "EOF"
                    if inserter is not None:
                        readline.set_pre_input_hook(None)
                else:
                    self.print_color(self.prompt, file=self.stdout)
                    if line is not None:
                        os.write(self.stdin.fileno(), line.encode())
                    if not exec_now:
                        line = self.stdin.readline()
                    if len(line) == 0:
                        line = "EOF"
                    else:
                        line = line.rstrip("\r\n")
                    if have_readline and line != "EOF":
                        readline.add_history(line)
                if not ON_WINDOWS:
                    # select() is not fully functional on windows
                    self._load_remaining_input_into_queue()
                line = self.precmd(line)
                stop = self.onecmd(line)
                stop = self.postcmd(stop, line)
                if ON_WINDOWS:
                    winutils.enable_virtual_terminal_processing()
            self.postloop()
        finally:
            # restore the completer that was active before this loop started
            if have_readline:
                readline.set_completer(self.old_completer)

    def cmdloop(self, intro=None):
        """Runs the command loop until ``builtins.__xonsh_exit__`` is set.

        KeyboardInterrupt and SystemExit raised inside the loop do not end
        the shell: a newline is printed, readline's state is repaired, the
        input buffer is reset and the loop is re-entered.  The intro is only
        passed to the first iteration.
        """
        while not builtins.__xonsh_exit__:
            try:
                self._cmdloop(intro=intro)
            except (KeyboardInterrupt, SystemExit):
                print()  # Gives a newline
                fix_readline_state_after_ctrl_c()
                self.reset_buffer()
                intro = None

    @property
    def prompt(self):
        """Obtains the current prompt string."""
        if RL_CAN_RESIZE:
            # This is needed to support some system where line-wrapping doesn't
            # work. This is a bug in upstream Python, or possibly readline.
            RL_LIB.rl_reset_screen_size()
        if self.need_more_lines:
            if self.mlprompt is None:
                try:
                    self.mlprompt = multiline_prompt(curr=self._current_prompt)
                except Exception:  # pylint: disable=broad-except
                    print_exception()
                    self.mlprompt = "<multiline prompt error> "
            return self.mlprompt
        env = builtins.__xonsh_env__  # pylint: disable=no-member
        p = env.get("PROMPT")
        try:
            p = self.prompt_formatter(p)
        except Exception:  # pylint: disable=broad-except
            # fall through with the unformatted $PROMPT value
            print_exception()
        hide = True if self._force_hide is None else self._force_hide
        p = ansi_partial_color_format(p, style=env.get("XONSH_COLOR_STYLE"), hide=hide)
        self._current_prompt = p
        self.settitle()
        return p

    def format_color(self, string, hide=False, force_string=False, **kwargs):
        """Readline implementation of color formatting. This uses ANSI color
        codes.

        ``self._force_hide``, when not None, overrides ``hide``.  The
        ``force_string`` argument and extra keyword arguments are accepted
        but not used by this implementation.
        """
        hide = hide if self._force_hide is None else self._force_hide
        style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE")
        return ansi_partial_color_format(string, hide=hide, style=style)

    def print_color(self, string, hide=False, **kwargs):
        """Prints ``string`` with color.

        A str is run through format_color(); anything else is handed to
        pygments with the current $XONSH_COLOR_STYLE.  Remaining keyword
        arguments are passed on to print().
        """
        if isinstance(string, str):
            s = self.format_color(string, hide=hide)
        else:
            # assume this is a list of (Token, str) tuples and format it
            env = builtins.__xonsh_env__
            self.styler.style_name = env.get("XONSH_COLOR_STYLE")
            style_proxy = pyghooks.xonsh_style_proxy(self.styler)
            formatter = pyghooks.XonshTerminal256Formatter(style=style_proxy)
            s = pygments.format(string, formatter).rstrip()
        print(s, **kwargs)

    def color_style_names(self):
        """Returns an iterable of all available style names."""
        return ansi_color_style_names()

    def color_style(self):
        """Returns the current color map."""
        style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE")
        return ansi_color_style(style=style)

    def restore_tty_sanity(self):
        """An interface for resetting the TTY stdin mode. This is highly
        dependent on the shell backend. Also it is mostly optional since
        it only affects ^Z backgrounding behaviour.
        """
        if not ON_POSIX:
            return
        # A two-item default keeps the unpacking valid when stty is unknown.
        # NOTE(review): assumes lazyget() returns its second argument on a
        # miss -- confirm against the commands cache implementation.
        stty, _ = builtins.__xonsh_commands_cache__.lazyget("stty", (None, None))
        if stty is None:
            return
        # If available, we should just call the stty utility. This call should
        # not throw even if stty fails. It should also be noted that subprocess
        # calls, like the following, seem to be ineffective:
        #       subprocess.call([stty, 'sane'], shell=True)
        # My guess is that this is because Popen does some crazy redirecting
        # under the covers. This effectively hides the true TTY stdin handle
        # from stty. To get around this we have to use the lower level
        # os.system() function.
        os.system(stty + " sane")


class ReadlineHistoryAdder(threading.Thread):
    """Daemon thread that copies xonsh history items into readline."""

    def __init__(self, wait_for_gc=True, *args, **kwargs):
        """Thread responsible for adding inputs from history to the
        current readline instance. May wait for the history garbage
        collector to finish.

        The thread is marked as a daemon and starts itself immediately.
        """
        super().__init__(*args, **kwargs)
        self.daemon = True
        self.wait_for_gc = wait_for_gc
        self.start()

    def run(self):
        """Adds every item of ``builtins.__xonsh_history__`` to readline,
        skipping an input that equals the entry added just before it.
        """
        try:
            import readline
        except ImportError:
            return
        hist = builtins.__xonsh_history__
        if hist is None:
            return
        i = 1  # 1-based index of the next readline history slot
        for h in hist.all_items():
            line = h["inp"].rstrip()
            if i == 1:
                pass
            elif line == readline.get_history_item(i - 1):
                continue
            readline.add_history(line)
            if RL_LIB is not None:
                RL_LIB.history_set_pos(i)
            i += 1