1# -*- coding: utf-8 -*-
2"""The readline based xonsh shell.
3
4Portions of this code related to initializing the readline library
5are included from the IPython project.  The IPython project is:
6
7* Copyright (c) 2008-2014, IPython Development Team
8* Copyright (c) 2001-2007, Fernando Perez <fernando.perez@colorado.edu>
9* Copyright (c) 2001, Janko Hauser <jhauser@zscout.de>
10* Copyright (c) 2001, Nathaniel Gray <n8gray@caltech.edu>
11
12"""
13import os
14import sys
15import cmd
16import select
17import shutil
18import builtins
19import importlib
20import threading
21import subprocess
22import collections
23
24from xonsh.lazyasd import LazyObject, lazyobject
25from xonsh.base_shell import BaseShell
26from xonsh.ansi_colors import (
27    ansi_partial_color_format,
28    ansi_color_style_names,
29    ansi_color_style,
30)
31from xonsh.prompt.base import multiline_prompt
32from xonsh.tools import (
33    print_exception,
34    check_for_partial_string,
35    to_bool,
36    columnize,
37    carriage_return,
38)
39from xonsh.platform import (
40    ON_WINDOWS,
41    ON_CYGWIN,
42    ON_MSYS,
43    ON_DARWIN,
44    ON_POSIX,
45    os_environ,
46)
47from xonsh.lazyimps import pygments, pyghooks, winutils
48from xonsh.events import events
49
50readline = None
51RL_COMPLETION_SUPPRESS_APPEND = RL_LIB = RL_STATE = None
52RL_COMPLETION_QUERY_ITEMS = None
53RL_CAN_RESIZE = False
54RL_DONE = None
55RL_VARIABLE_VALUE = None
56_RL_STATE_DONE = 0x1000000
57_RL_STATE_ISEARCH = 0x0000080
58
59_RL_PREV_CASE_SENSITIVE_COMPLETIONS = "to-be-set"
60
61
62def setup_readline():
63    """Sets up the readline module and completion suppression, if available."""
64    global RL_COMPLETION_SUPPRESS_APPEND, RL_LIB, RL_CAN_RESIZE, RL_STATE, readline, RL_COMPLETION_QUERY_ITEMS
65    if RL_COMPLETION_SUPPRESS_APPEND is not None:
66        return
67    for _rlmod_name in ("gnureadline", "readline"):
68        try:
69            readline = importlib.import_module(_rlmod_name)
70            sys.modules["readline"] = readline
71        except ImportError:
72            pass
73        else:
74            break
75
76    if readline is None:
77        print(
78            """Skipping setup. Because no `readline` implementation available.
79            Please install a backend (`readline`, `prompt-toolkit`, etc) to use
80            `xonsh` interactively.
81            See https://github.com/xonsh/xonsh/issues/1170"""
82        )
83        return
84
85    import ctypes
86    import ctypes.util
87
88    uses_libedit = readline.__doc__ and "libedit" in readline.__doc__
89    readline.set_completer_delims(" \t\n")
90    # Cygwin seems to hang indefinitely when querying the readline lib
91    if (not ON_CYGWIN) and (not ON_MSYS) and (not readline.__file__.endswith(".py")):
92        RL_LIB = lib = ctypes.cdll.LoadLibrary(readline.__file__)
93        try:
94            RL_COMPLETION_SUPPRESS_APPEND = ctypes.c_int.in_dll(
95                lib, "rl_completion_suppress_append"
96            )
97        except ValueError:
98            # not all versions of readline have this symbol, ie Macs sometimes
99            RL_COMPLETION_SUPPRESS_APPEND = None
100        try:
101            RL_COMPLETION_QUERY_ITEMS = ctypes.c_int.in_dll(
102                lib, "rl_completion_query_items"
103            )
104        except ValueError:
105            # not all versions of readline have this symbol, ie Macs sometimes
106            RL_COMPLETION_QUERY_ITEMS = None
107        try:
108            RL_STATE = ctypes.c_int.in_dll(lib, "rl_readline_state")
109        except Exception:
110            pass
111        RL_CAN_RESIZE = hasattr(lib, "rl_reset_screen_size")
112    env = builtins.__xonsh_env__
113    # reads in history
114    readline.set_history_length(-1)
115    ReadlineHistoryAdder()
116    # sets up IPython-like history matching with up and down
117    readline.parse_and_bind('"\e[B": history-search-forward')
118    readline.parse_and_bind('"\e[A": history-search-backward')
119    # Setup Shift-Tab to indent
120    readline.parse_and_bind('"\e[Z": "{0}"'.format(env.get("INDENT")))
121
122    # handle tab completion differences found in libedit readline compatibility
123    # as discussed at http://stackoverflow.com/a/7116997
124    if uses_libedit and ON_DARWIN:
125        readline.parse_and_bind("bind ^I rl_complete")
126        print(
127            "\n".join(
128                [
129                    "",
130                    "*" * 78,
131                    "libedit detected - readline will not be well behaved, including but not limited to:",
132                    "   * crashes on tab completion",
133                    "   * incorrect history navigation",
134                    "   * corrupting long-lines",
135                    "   * failure to wrap or indent lines properly",
136                    "",
137                    "It is highly recommended that you install gnureadline, which is installable with:",
138                    "     xpip install gnureadline",
139                    "*" * 78,
140                ]
141            ),
142            file=sys.stderr,
143        )
144    else:
145        readline.parse_and_bind("tab: complete")
146    # try to load custom user settings
147    inputrc_name = os_environ.get("INPUTRC")
148    if inputrc_name is None:
149        if uses_libedit:
150            inputrc_name = ".editrc"
151        else:
152            inputrc_name = ".inputrc"
153        inputrc_name = os.path.join(os.path.expanduser("~"), inputrc_name)
154    if (not ON_WINDOWS) and (not os.path.isfile(inputrc_name)):
155        inputrc_name = "/etc/inputrc"
156    if ON_WINDOWS:
157        winutils.enable_virtual_terminal_processing()
158    if os.path.isfile(inputrc_name):
159        try:
160            readline.read_init_file(inputrc_name)
161        except Exception:
162            # this seems to fail with libedit
163            print_exception("xonsh: could not load readline default init file.")
164    # properly reset input typed before the first prompt
165    readline.set_startup_hook(carriage_return)
166
167
168def teardown_readline():
169    """Tears down up the readline module, if available."""
170    try:
171        import readline
172    except (ImportError, TypeError):
173        return
174
175
176def _rebind_case_sensitive_completions():
177    # handle case sensitive, see Github issue #1342 for details
178    global _RL_PREV_CASE_SENSITIVE_COMPLETIONS
179    env = builtins.__xonsh_env__
180    case_sensitive = env.get("CASE_SENSITIVE_COMPLETIONS")
181    if case_sensitive is _RL_PREV_CASE_SENSITIVE_COMPLETIONS:
182        return
183    if case_sensitive:
184        readline.parse_and_bind("set completion-ignore-case off")
185    else:
186        readline.parse_and_bind("set completion-ignore-case on")
187    _RL_PREV_CASE_SENSITIVE_COMPLETIONS = case_sensitive
188
189
190def fix_readline_state_after_ctrl_c():
191    """
192    Fix to allow Ctrl-C to exit reverse-i-search.
193
194    Based on code from:
195        http://bugs.python.org/file39467/raw_input__workaround_demo.py
196    """
197    if ON_WINDOWS:
198        # hack to make pyreadline mimic the desired behavior
199        try:
200            _q = readline.rl.mode.process_keyevent_queue
201            if len(_q) > 1:
202                _q.pop()
203        except Exception:
204            pass
205    if RL_STATE is None:
206        return
207    if RL_STATE.value & _RL_STATE_ISEARCH:
208        RL_STATE.value &= ~_RL_STATE_ISEARCH
209    if not RL_STATE.value & _RL_STATE_DONE:
210        RL_STATE.value |= _RL_STATE_DONE
211
212
213def rl_completion_suppress_append(val=1):
214    """Sets the rl_completion_suppress_append variable, if possible.
215    A value of 1 (default) means to suppress, a value of 0 means to enable.
216    """
217    if RL_COMPLETION_SUPPRESS_APPEND is None:
218        return
219    RL_COMPLETION_SUPPRESS_APPEND.value = val
220
221
222def rl_completion_query_items(val=None):
223    """Sets the rl_completion_query_items variable, if possible.
224    A None value will set this to $COMPLETION_QUERY_LIMIT, otherwise any integer
225    is accepted.
226    """
227    if RL_COMPLETION_QUERY_ITEMS is None:
228        return
229    if val is None:
230        val = builtins.__xonsh_env__.get("COMPLETION_QUERY_LIMIT")
231    RL_COMPLETION_QUERY_ITEMS.value = val
232
233
234def rl_variable_dumper(readable=True):
235    """Dumps the currently set readline variables. If readable is True, then this
236    output may be used in an inputrc file.
237    """
238    RL_LIB.rl_variable_dumper(int(readable))
239
240
241def rl_variable_value(variable):
242    """Returns the currently set value for a readline configuration variable."""
243    global RL_VARIABLE_VALUE
244    if RL_VARIABLE_VALUE is None:
245        import ctypes
246
247        RL_VARIABLE_VALUE = RL_LIB.rl_variable_value
248        RL_VARIABLE_VALUE.restype = ctypes.c_char_p
249    env = builtins.__xonsh_env__
250    enc, errors = env.get("XONSH_ENCODING"), env.get("XONSH_ENCODING_ERRORS")
251    if isinstance(variable, str):
252        variable = variable.encode(encoding=enc, errors=errors)
253    rtn = RL_VARIABLE_VALUE(variable)
254    return rtn.decode(encoding=enc, errors=errors)
255
256
257@lazyobject
258def rl_on_new_line():
259    """Grabs one of a few possible redisplay functions in readline."""
260    names = ["rl_on_new_line", "rl_forced_update_display", "rl_redisplay"]
261    for name in names:
262        func = getattr(RL_LIB, name, None)
263        if func is not None:
264            break
265    else:
266
267        def print_for_newline():
268            print()
269
270        func = print_for_newline
271    return func
272
273
274def _insert_text_func(s, readline):
275    """Creates a function to insert text via readline."""
276
277    def inserter():
278        readline.insert_text(s)
279        readline.redisplay()
280
281    return inserter
282
283
284DEDENT_TOKENS = LazyObject(
285    lambda: frozenset(["raise", "return", "pass", "break", "continue"]),
286    globals(),
287    "DEDENT_TOKENS",
288)
289
290
291class ReadlineShell(BaseShell, cmd.Cmd):
292    """The readline based xonsh shell."""
293
294    def __init__(self, completekey="tab", stdin=None, stdout=None, **kwargs):
295        super().__init__(completekey=completekey, stdin=stdin, stdout=stdout, **kwargs)
296        setup_readline()
297        self._current_indent = ""
298        self._current_prompt = ""
299        self._force_hide = None
300        self._complete_only_last_table = {
301            # Truth table for completions, keys are:
302            # (prefix_begs_quote, prefix_ends_quote, i_ends_quote,
303            #  last_starts_with_prefix, i_has_space)
304            (True, True, True, True, True): True,
305            (True, True, True, True, False): True,
306            (True, True, True, False, True): False,
307            (True, True, True, False, False): True,
308            (True, True, False, True, True): False,
309            (True, True, False, True, False): False,
310            (True, True, False, False, True): False,
311            (True, True, False, False, False): False,
312            (True, False, True, True, True): True,
313            (True, False, True, True, False): False,
314            (True, False, True, False, True): False,
315            (True, False, True, False, False): True,
316            (True, False, False, True, True): False,
317            (True, False, False, True, False): False,
318            (True, False, False, False, True): False,
319            (True, False, False, False, False): False,
320            (False, True, True, True, True): True,
321            (False, True, True, True, False): True,
322            (False, True, True, False, True): True,
323            (False, True, True, False, False): True,
324            (False, True, False, True, True): False,
325            (False, True, False, True, False): False,
326            (False, True, False, False, True): False,
327            (False, True, False, False, False): False,
328            (False, False, True, True, True): False,
329            (False, False, True, True, False): False,
330            (False, False, True, False, True): False,
331            (False, False, True, False, False): True,
332            (False, False, False, True, True): True,
333            (False, False, False, True, False): False,
334            (False, False, False, False, True): False,
335            (False, False, False, False, False): False,
336        }
337        self.cmdqueue = collections.deque()
338
339    def __del__(self):
340        teardown_readline()
341
342    def singleline(self, store_in_history=True, **kwargs):
343        """Reads a single line of input. The store_in_history kwarg
344        flags whether the input should be stored in readline's in-memory
345        history.
346        """
347        if not store_in_history:  # store current position to remove it later
348            try:
349                import readline
350            except ImportError:
351                store_in_history = True
352            pos = readline.get_current_history_length() - 1
353        events.on_pre_prompt.fire()
354        rtn = input(self.prompt)
355        events.on_post_prompt.fire()
356        if not store_in_history and pos >= 0:
357            readline.remove_history_item(pos)
358        return rtn
359
360    def parseline(self, line):
361        """Overridden to no-op."""
362        return "", line, line
363
364    def _querycompletions(self, completions, loc):
365        """Returns whether or not we should show completions. 0 means that prefixes
366        should not be shown, 1 means that there is a common prefix among all completions
367        and they should be shown, while 2 means that there is no common prefix but
368        we are under the query limit and they should be shown.
369        """
370        if os.path.commonprefix([c[loc:] for c in completions]):
371            return 1
372        elif len(completions) <= builtins.__xonsh_env__.get("COMPLETION_QUERY_LIMIT"):
373            return 2
374        msg = "\nDisplay all {} possibilities? ".format(len(completions))
375        msg += "({GREEN}y{NO_COLOR} or {RED}n{NO_COLOR})"
376        self.print_color(msg, end="", flush=True, file=sys.stderr)
377        yn = "x"
378        while yn not in "yn":
379            yn = sys.stdin.read(1)
380        show_completions = to_bool(yn)
381        print()
382        if not show_completions:
383            rl_on_new_line()
384            return 0
385        w, h = shutil.get_terminal_size()
386        lines = columnize(completions, width=w)
387        more_msg = self.format_color(
388            "{YELLOW}==={NO_COLOR} more or "
389            "{PURPLE}({NO_COLOR}q{PURPLE}){NO_COLOR}uit "
390            "{YELLOW}==={NO_COLOR}"
391        )
392        while len(lines) > h - 1:
393            print("".join(lines[: h - 1]), end="", flush=True, file=sys.stderr)
394            lines = lines[h - 1 :]
395            print(more_msg, end="", flush=True, file=sys.stderr)
396            q = sys.stdin.read(1).lower()
397            print(flush=True, file=sys.stderr)
398            if q == "q":
399                rl_on_new_line()
400                return 0
401        print("".join(lines), end="", flush=True, file=sys.stderr)
402        rl_on_new_line()
403        return 0
404
405    def completedefault(self, prefix, line, begidx, endidx):
406        """Implements tab-completion for text."""
407        if self.completer is None:
408            return []
409        rl_completion_suppress_append()  # this needs to be called each time
410        _rebind_case_sensitive_completions()
411        rl_completion_query_items(val=999999999)
412        completions, l = self.completer.complete(
413            prefix, line, begidx, endidx, ctx=self.ctx
414        )
415        chopped = prefix[:-l]
416        if chopped:
417            rtn_completions = [chopped + i for i in completions]
418        else:
419            rtn_completions = completions
420        rtn = []
421        prefix_begs_quote = prefix.startswith("'") or prefix.startswith('"')
422        prefix_ends_quote = prefix.endswith("'") or prefix.endswith('"')
423        for i in rtn_completions:
424            i_ends_quote = i.endswith("'") or i.endswith('"')
425            last = i.rsplit(" ", 1)[-1]
426            last_starts_prefix = last.startswith(prefix)
427            i_has_space = " " in i
428            key = (
429                prefix_begs_quote,
430                prefix_ends_quote,
431                i_ends_quote,
432                last_starts_prefix,
433                i_has_space,
434            )
435            rtn.append(last if self._complete_only_last_table[key] else i)
436        # return based on show completions
437        show_completions = self._querycompletions(completions, endidx - begidx)
438        if show_completions == 0:
439            return []
440        elif show_completions == 1:
441            return rtn
442        elif show_completions == 2:
443            return completions
444        else:
445            raise ValueError("query completions flag not understood.")
446
447    # tab complete on first index too
448    completenames = completedefault
449
450    def _load_remaining_input_into_queue(self):
451        buf = b""
452        while True:
453            r, w, x = select.select([self.stdin], [], [], 1e-6)
454            if len(r) == 0:
455                break
456            buf += os.read(self.stdin.fileno(), 1024)
457        if len(buf) > 0:
458            buf = buf.decode().replace("\r\n", "\n").replace("\r", "\n")
459            self.cmdqueue.extend(buf.splitlines(keepends=True))
460
461    def postcmd(self, stop, line):
462        """Called just before execution of line. For readline, this handles the
463        automatic indentation of code blocks.
464        """
465        try:
466            import readline
467        except ImportError:
468            return stop
469        if self.need_more_lines:
470            if len(line.strip()) == 0:
471                readline.set_pre_input_hook(None)
472                self._current_indent = ""
473            elif line.rstrip()[-1] == ":":
474                ind = line[: len(line) - len(line.lstrip())]
475                ind += builtins.__xonsh_env__.get("INDENT")
476                readline.set_pre_input_hook(_insert_text_func(ind, readline))
477                self._current_indent = ind
478            elif line.split(maxsplit=1)[0] in DEDENT_TOKENS:
479                env = builtins.__xonsh_env__
480                ind = self._current_indent[: -len(env.get("INDENT"))]
481                readline.set_pre_input_hook(_insert_text_func(ind, readline))
482                self._current_indent = ind
483            else:
484                ind = line[: len(line) - len(line.lstrip())]
485                if ind != self._current_indent:
486                    insert_func = _insert_text_func(ind, readline)
487                    readline.set_pre_input_hook(insert_func)
488                    self._current_indent = ind
489        else:
490            readline.set_pre_input_hook(None)
491        return stop
492
493    def _cmdloop(self, intro=None):
494        """Repeatedly issue a prompt, accept input, parse an initial prefix
495        off the received input, and dispatch to action methods, passing them
496        the remainder of the line as argument.
497
498        This was forked from Lib/cmd.py from the Python standard library v3.4.3,
499        (C) Python Software Foundation, 2015.
500        """
501        self.preloop()
502        if self.use_rawinput and self.completekey:
503            try:
504                import readline
505
506                self.old_completer = readline.get_completer()
507                readline.set_completer(self.complete)
508                readline.parse_and_bind(self.completekey + ": complete")
509                have_readline = True
510            except ImportError:
511                have_readline = False
512        try:
513            if intro is not None:
514                self.intro = intro
515            if self.intro:
516                self.stdout.write(str(self.intro) + "\n")
517            stop = None
518            while not stop:
519                line = None
520                exec_now = False
521                if len(self.cmdqueue) > 0:
522                    line = self.cmdqueue.popleft()
523                    exec_now = line.endswith("\n")
524                if self.use_rawinput and not exec_now:
525                    inserter = (
526                        None if line is None else _insert_text_func(line, readline)
527                    )
528                    if inserter is not None:
529                        readline.set_pre_input_hook(inserter)
530                    try:
531                        line = self.singleline()
532                    except EOFError:
533                        if builtins.__xonsh_env__.get("IGNOREEOF"):
534                            self.stdout.write('Use "exit" to leave the shell.' "\n")
535                            line = ""
536                        else:
537                            line = "EOF"
538                    if inserter is not None:
539                        readline.set_pre_input_hook(None)
540                else:
541                    self.print_color(self.prompt, file=self.stdout)
542                    if line is not None:
543                        os.write(self.stdin.fileno(), line.encode())
544                    if not exec_now:
545                        line = self.stdin.readline()
546                    if len(line) == 0:
547                        line = "EOF"
548                    else:
549                        line = line.rstrip("\r\n")
550                    if have_readline and line != "EOF":
551                        readline.add_history(line)
552                if not ON_WINDOWS:
553                    # select() is not fully functional on windows
554                    self._load_remaining_input_into_queue()
555                line = self.precmd(line)
556                stop = self.onecmd(line)
557                stop = self.postcmd(stop, line)
558                if ON_WINDOWS:
559                    winutils.enable_virtual_terminal_processing()
560            self.postloop()
561        finally:
562            if self.use_rawinput and self.completekey:
563                try:
564                    import readline
565
566                    readline.set_completer(self.old_completer)
567                except ImportError:
568                    pass
569
570    def cmdloop(self, intro=None):
571        while not builtins.__xonsh_exit__:
572            try:
573                self._cmdloop(intro=intro)
574            except (KeyboardInterrupt, SystemExit):
575                print()  # Gives a newline
576                fix_readline_state_after_ctrl_c()
577                self.reset_buffer()
578                intro = None
579
580    @property
581    def prompt(self):
582        """Obtains the current prompt string."""
583        global RL_LIB, RL_CAN_RESIZE
584        if RL_CAN_RESIZE:
585            # This is needed to support some system where line-wrapping doesn't
586            # work. This is a bug in upstream Python, or possibly readline.
587            RL_LIB.rl_reset_screen_size()
588        if self.need_more_lines:
589            if self.mlprompt is None:
590                try:
591                    self.mlprompt = multiline_prompt(curr=self._current_prompt)
592                except Exception:  # pylint: disable=broad-except
593                    print_exception()
594                    self.mlprompt = "<multiline prompt error> "
595            return self.mlprompt
596        env = builtins.__xonsh_env__  # pylint: disable=no-member
597        p = env.get("PROMPT")
598        try:
599            p = self.prompt_formatter(p)
600        except Exception:  # pylint: disable=broad-except
601            print_exception()
602        hide = True if self._force_hide is None else self._force_hide
603        p = ansi_partial_color_format(p, style=env.get("XONSH_COLOR_STYLE"), hide=hide)
604        self._current_prompt = p
605        self.settitle()
606        return p
607
608    def format_color(self, string, hide=False, force_string=False, **kwargs):
609        """Readline implementation of color formatting. This uses ANSI color
610        codes.
611        """
612        hide = hide if self._force_hide is None else self._force_hide
613        style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE")
614        return ansi_partial_color_format(string, hide=hide, style=style)
615
616    def print_color(self, string, hide=False, **kwargs):
617        if isinstance(string, str):
618            s = self.format_color(string, hide=hide)
619        else:
620            # assume this is a list of (Token, str) tuples and format it
621            env = builtins.__xonsh_env__
622            self.styler.style_name = env.get("XONSH_COLOR_STYLE")
623            style_proxy = pyghooks.xonsh_style_proxy(self.styler)
624            formatter = pyghooks.XonshTerminal256Formatter(style=style_proxy)
625            s = pygments.format(string, formatter).rstrip()
626        print(s, **kwargs)
627
628    def color_style_names(self):
629        """Returns an iterable of all available style names."""
630        return ansi_color_style_names()
631
632    def color_style(self):
633        """Returns the current color map."""
634        style = style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE")
635        return ansi_color_style(style=style)
636
637    def restore_tty_sanity(self):
638        """An interface for resetting the TTY stdin mode. This is highly
639        dependent on the shell backend. Also it is mostly optional since
640        it only affects ^Z backgrounding behaviour.
641        """
642        if not ON_POSIX:
643            return
644        stty, _ = builtins.__xonsh_commands_cache__.lazyget("stty", None)
645        if stty is None:
646            return
647        # If available, we should just call the stty utility. This call should
648        # not throw even if stty fails. It should also be noted that subprocess
649        # calls, like the following, seem to be ineffective:
650        #       subprocess.call([stty, 'sane'], shell=True)
651        # My guess is that this is because Popen does some crazy redirecting
652        # under the covers. This effectively hides the true TTY stdin handle
653        # from stty. To get around this we have to use the lower level
654        # os.system() function.
655        os.system(stty + " sane")
656
657
658class ReadlineHistoryAdder(threading.Thread):
659    def __init__(self, wait_for_gc=True, *args, **kwargs):
660        """Thread responsible for adding inputs from history to the
661        current readline instance. May wait for the history garbage
662        collector to finish.
663        """
664        super(ReadlineHistoryAdder, self).__init__(*args, **kwargs)
665        self.daemon = True
666        self.wait_for_gc = wait_for_gc
667        self.start()
668
669    def run(self):
670        try:
671            import readline
672        except ImportError:
673            return
674        hist = builtins.__xonsh_history__
675        if hist is None:
676            return
677        i = 1
678        for h in hist.all_items():
679            line = h["inp"].rstrip()
680            if i == 1:
681                pass
682            elif line == readline.get_history_item(i - 1):
683                continue
684            readline.add_history(line)
685            if RL_LIB is not None:
686                RL_LIB.history_set_pos(i)
687            i += 1
688