1# -*- coding: utf-8 -*-
2"""The base class for xonsh shell"""
3import io
4import os
5import sys
6import time
7import builtins
8
9from xonsh.tools import (
10    XonshError,
11    print_exception,
12    DefaultNotGiven,
13    check_for_partial_string,
14    format_std_prepost,
15    get_line_continuation,
16)
17from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS
18from xonsh.codecache import (
19    should_use_cache,
20    code_cache_name,
21    code_cache_check,
22    get_cache_filename,
23    update_cache,
24    run_compiled_code,
25)
26from xonsh.completer import Completer
27from xonsh.prompt.base import multiline_prompt, PromptFormatter
28from xonsh.events import events
29from xonsh.shell import transform_command
30from xonsh.lazyimps import pygments, pyghooks
31from xonsh.ansi_colors import ansi_partial_color_format
32
if ON_WINDOWS:
    # ctypes is only needed for the Win32 console-title call, so it is
    # imported on Windows only.
    import ctypes

    # Module-level handle to kernel32.dll; BaseShell.settitle() in this file
    # calls SetConsoleTitleW through it.
    kernel32 = ctypes.windll.kernel32
    # Declare the single argument as a wide-character string pointer.
    kernel32.SetConsoleTitleW.argtypes = [ctypes.c_wchar_p]
38
39
class _TeeStdBuf(io.RawIOBase):
    """A dispatcher for bytes to two buffers, as std stream buffer and an
    in memory buffer.
    """

    def __init__(
        self, stdbuf, membuf, encoding=None, errors=None, prestd=b"", poststd=b""
    ):
        """
        Parameters
        ----------
        stdbuf : BytesIO-like or StringIO-like
            The std stream buffer.
        membuf : BytesIO-like
            The in memory stream buffer.
        encoding : str or None, optional
            The encoding of the stream. Only used if stdbuf is a text stream,
            rather than a binary one. Defaults to $XONSH_ENCODING if None.
        errors : str or None, optional
            The error form for the encoding of the stream. Only used if stdbuf
            is a text stream, rather than a binary one. Defaults to
            $XONSH_ENCODING_ERRORS if None.
        prestd : bytes, optional
            The prefix to prepend to the standard buffer.
        poststd : bytes, optional
            The postfix to append to the standard buffer.
        """
        self.stdbuf = stdbuf
        self.membuf = membuf
        # Only consult the xonsh environment for values that were not given,
        # so explicit arguments work even before the environment exists.
        if encoding is None:
            encoding = builtins.__xonsh_env__.get("XONSH_ENCODING")
        if errors is None:
            errors = builtins.__xonsh_env__.get("XONSH_ENCODING_ERRORS")
        self.encoding = encoding
        self.errors = errors
        self.prestd = prestd
        self.poststd = poststd
        # A stream without an ``encoding`` attribute is treated as binary.
        self._std_is_binary = not hasattr(stdbuf, "encoding")

    def fileno(self):
        """Returns the file descriptor of the std buffer."""
        return self.stdbuf.fileno()

    def seek(self, offset, whence=io.SEEK_SET):
        """Sets the location in both the stdbuf and the membuf.

        Returns the value reported by the in-memory buffer's ``seek()``, so
        that the inherited ``tell()`` (which is implemented via ``seek()``)
        yields a real position.
        """
        self.stdbuf.seek(offset, whence)
        return self.membuf.seek(offset, whence)

    def truncate(self, size=None):
        """Truncate both buffers and return the in-memory buffer's result."""
        self.stdbuf.truncate(size)
        return self.membuf.truncate(size)

    def readinto(self, b):
        """Read bytes into buffer from both streams.

        When the std buffer is binary it is read into ``b`` first; the same
        ``b`` is then filled from the in-memory buffer, whose return value is
        returned.
        """
        if self._std_is_binary:
            self.stdbuf.readinto(b)
        return self.membuf.readinto(b)

    def write(self, b):
        """Write bytes into both buffers.

        The std buffer receives ``prestd + b + poststd`` (decoded first when
        it is a text stream); the in-memory buffer receives ``b`` unchanged.
        Returns the result of the in-memory buffer's ``write()``.
        """
        if self.prestd or self.poststd:
            # Build a fresh bytes object rather than using ``+=`` on ``b``:
            # in-place concatenation would mutate a caller's bytearray (and
            # leak the postfix into membuf) and fails for memoryview.
            std_b = b"".join((self.prestd, b, self.poststd))
        else:
            std_b = b
        # write to stdbuf
        if self._std_is_binary:
            self.stdbuf.write(std_b)
        else:
            # bytes() lets any bytes-like object be decoded.
            text = bytes(std_b).decode(encoding=self.encoding, errors=self.errors)
            self.stdbuf.write(text)
        return self.membuf.write(b)
109
110
class _TeeStd(io.TextIOBase):
    """Tees a std stream into an in-memory container and the original stream.

    Creating an instance replaces ``sys.<name>`` with the instance; ``close()``
    (or finalization) puts the original stream back.
    """

    def __init__(self, name, mem, prestd="", poststd=""):
        """
        Parameters
        ----------
        name : str
            The name of the buffer in the sys module, e.g. 'stdout'.
        mem : io.TextIOBase-like
            The in-memory text-based representation.
        prestd : str, optional
            The prefix to prepend to the standard stream.
        poststd : str, optional
            The postfix to append to the standard stream.
        """
        self._name = name
        self.std = std = getattr(sys, name)
        self.mem = mem
        self.prestd = prestd
        self.poststd = poststd
        # Byte versions of the prefix/postfix for the binary-level tee.
        preb = prestd.encode(encoding=mem.encoding, errors=mem.errors)
        postb = poststd.encode(encoding=mem.encoding, errors=mem.errors)
        if hasattr(std, "buffer"):
            buffer = _TeeStdBuf(std.buffer, mem.buffer, prestd=preb, poststd=postb)
        else:
            # TextIO does not have buffer as part of the API, so std streams
            # may not either.
            buffer = _TeeStdBuf(
                std,
                mem.buffer,
                encoding=mem.encoding,
                errors=mem.errors,
                prestd=preb,
                poststd=postb,
            )
        self.buffer = buffer
        # Install this tee in place of the original stream; done last so a
        # failure above leaves sys untouched.
        setattr(sys, name, self)

    @property
    def encoding(self):
        """The encoding of the in-memory buffer."""
        return self.mem.encoding

    @property
    def errors(self):
        """The errors of the in-memory buffer."""
        return self.mem.errors

    @property
    def newlines(self):
        """The newlines of the in-memory buffer."""
        return self.mem.newlines

    def _replace_std(self):
        """Put the original stream back into sys; a no-op once already done."""
        # getattr with a default: __init__ may have failed before ``std`` was
        # assigned, and this is also reached from __del__.
        std = getattr(self, "std", None)
        if std is None:
            return
        setattr(sys, self._name, std)
        self.std = self._name = None

    def __del__(self):
        self._replace_std()

    def close(self):
        """Restores the original std stream."""
        self._replace_std()

    def write(self, s):
        """Writes data to the original std stream and the in-memory object.

        The in-memory object receives ``s`` unchanged; the std stream receives
        it wrapped in the configured prefix/postfix. After ``close()`` only
        the in-memory object is written. Returns the number of characters
        of ``s``.
        """
        self.mem.write(s)
        nchars = len(s)
        if self.std is None:
            return nchars
        std_s = s
        if self.prestd:
            std_s = self.prestd + std_s
        if self.poststd:
            std_s += self.poststd
        self.std.write(std_s)
        return nchars

    def flush(self):
        """Flushes both the original stdout and the buffer."""
        # The std stream is None once the tee has been closed; write() already
        # tolerates that state, so flush() must as well.
        if self.std is not None:
            self.std.flush()
        self.mem.flush()

    def fileno(self):
        """Tunnel fileno() calls to the std stream."""
        return self.std.fileno()

    def seek(self, offset, whence=io.SEEK_SET):
        """Seek to a location in both streams."""
        self.std.seek(offset, whence)
        self.mem.seek(offset, whence)

    def truncate(self, size=None):
        """Truncate both streams."""
        self.std.truncate(size)
        self.mem.truncate(size)

    def detach(self):
        """This operation is not supported."""
        raise io.UnsupportedOperation

    def read(self, size=None):
        """Read from the in-memory stream and seek to a new location in the
        std stream.
        """
        s = self.mem.read(size)
        # Advance the std stream by the number of characters just read.
        loc = self.std.tell()
        self.std.seek(loc + len(s))
        return s

    def readline(self, size=-1):
        """Read a line from the in-memory stream and seek to a new location
        in the std stream.
        """
        s = self.mem.readline(size)
        # Advance the std stream by the number of characters just read.
        loc = self.std.tell()
        self.std.seek(loc + len(s))
        return s
231
232
class Tee:
    """Class that merges tee'd stdout and stderr into a single stream.

    This represents what a user would actually see on the command line.
    This class has the same interface as io.TextIOWrapper, except that
    the buffer is optional.
    """

    # pylint is stupid about counting public methods when using inheritance.
    # pylint: disable=too-few-public-methods

    def __init__(
        self,
        buffer=None,
        encoding=None,
        errors=None,
        newline=None,
        line_buffering=False,
        write_through=False,
    ):
        """
        Parameters
        ----------
        buffer : binary stream or None, optional
            Backing buffer for the captured output; a new ``io.BytesIO`` is
            created when None.
        encoding, errors, newline, line_buffering, write_through
            Passed through unchanged to ``io.TextIOWrapper``.
        """
        self.buffer = io.BytesIO() if buffer is None else buffer
        self.memory = io.TextIOWrapper(
            self.buffer,
            encoding=encoding,
            errors=errors,
            newline=newline,
            line_buffering=line_buffering,
            write_through=write_through,
        )
        # From here on sys.stdout is the tee. sys itself holds a reference to
        # it, so it is only ever restored by an explicit close().
        self.stdout = _TeeStd("stdout", self.memory)
        try:
            env = builtins.__xonsh_env__
            prestderr = format_std_prepost(env.get("XONSH_STDERR_PREFIX"))
            poststderr = format_std_prepost(env.get("XONSH_STDERR_POSTFIX"))
            self.stderr = _TeeStd(
                "stderr", self.memory, prestd=prestderr, poststd=poststderr
            )
        except BaseException:
            # Do not leave sys.stdout hijacked when the stderr half of the
            # tee could not be set up.
            self.stdout.close()
            raise

    @property
    def line_buffering(self):
        """The line_buffering setting of the in-memory text wrapper."""
        return self.memory.line_buffering

    def __del__(self):
        # Drop the references to the tees. Plain assignment (instead of
        # ``del``) cannot fail on a partially initialised instance.
        self.stdout = self.stderr = None

    def close(self):
        """Closes the buffer as well as the stdout and stderr tees."""
        # Nested finally blocks: every step is attempted even if an earlier
        # one raises, so the std streams are restored whenever possible.
        try:
            self.stdout.close()
        finally:
            try:
                self.stderr.close()
            finally:
                self.memory.close()

    def getvalue(self):
        """Gets the current contents of the in-memory buffer."""
        m = self.memory
        # Remember the current position, read everything from the start,
        # then return to where we were.
        loc = m.tell()
        m.seek(0)
        s = m.read()
        m.seek(loc)
        return s
292
293
294class BaseShell(object):
295    """The xonsh shell."""
296
297    def __init__(self, execer, ctx, **kwargs):
298        super().__init__()
299        self.execer = execer
300        self.ctx = ctx
301        self.completer = Completer() if kwargs.get("completer", True) else None
302        self.buffer = []
303        self.need_more_lines = False
304        self.mlprompt = None
305        self._styler = DefaultNotGiven
306        self.prompt_formatter = PromptFormatter()
307        self.accumulated_inputs = ""
308
309    @property
310    def styler(self):
311        if self._styler is DefaultNotGiven:
312            if HAS_PYGMENTS:
313                from xonsh.pyghooks import XonshStyle
314
315                env = builtins.__xonsh_env__
316                self._styler = XonshStyle(env.get("XONSH_COLOR_STYLE"))
317            else:
318                self._styler = None
319        return self._styler
320
321    @styler.setter
322    def styler(self, value):
323        self._styler = value
324
325    @styler.deleter
326    def styler(self):
327        self._styler = DefaultNotGiven
328
329    def emptyline(self):
330        """Called when an empty line has been entered."""
331        self.need_more_lines = False
332        self.default("")
333
334    def singleline(self, **kwargs):
335        """Reads a single line of input from the shell."""
336        msg = "{0} has not implemented singleline()."
337        raise RuntimeError(msg.format(self.__class__.__name__))
338
339    def precmd(self, line):
340        """Called just before execution of line."""
341        return line if self.need_more_lines else line.lstrip()
342
343    def default(self, line):
344        """Implements code execution."""
345        line = line if line.endswith("\n") else line + "\n"
346        src, code = self.push(line)
347        if code is None:
348            return
349
350        events.on_precommand.fire(cmd=src)
351
352        env = builtins.__xonsh_env__
353        hist = builtins.__xonsh_history__  # pylint: disable=no-member
354        ts1 = None
355        enc = env.get("XONSH_ENCODING")
356        err = env.get("XONSH_ENCODING_ERRORS")
357        tee = Tee(encoding=enc, errors=err)
358        try:
359            ts0 = time.time()
360            run_compiled_code(code, self.ctx, None, "single")
361            ts1 = time.time()
362            if hist is not None and hist.last_cmd_rtn is None:
363                hist.last_cmd_rtn = 0  # returncode for success
364        except XonshError as e:
365            print(e.args[0], file=sys.stderr)
366            if hist is not None and hist.last_cmd_rtn is None:
367                hist.last_cmd_rtn = 1  # return code for failure
368        except Exception:  # pylint: disable=broad-except
369            print_exception()
370            if hist is not None and hist.last_cmd_rtn is None:
371                hist.last_cmd_rtn = 1  # return code for failure
372        finally:
373            ts1 = ts1 or time.time()
374            tee_out = tee.getvalue()
375            self._append_history(inp=src, ts=[ts0, ts1], tee_out=tee_out)
376            self.accumulated_inputs += src
377            if (
378                tee_out
379                and env.get("XONSH_APPEND_NEWLINE")
380                and not tee_out.endswith(os.linesep)
381            ):
382                print(os.linesep, end="")
383            tee.close()
384            self._fix_cwd()
385        if builtins.__xonsh_exit__:  # pylint: disable=no-member
386            return True
387
388    def _append_history(self, tee_out=None, **info):
389        """Append information about the command to the history.
390
391        This also handles on_postcommand because this is the place where all the
392        information is available.
393        """
394        hist = builtins.__xonsh_history__  # pylint: disable=no-member
395        info["rtn"] = hist.last_cmd_rtn if hist is not None else None
396        tee_out = tee_out or None
397        last_out = hist.last_cmd_out if hist is not None else None
398        if last_out is None and tee_out is None:
399            pass
400        elif last_out is None and tee_out is not None:
401            info["out"] = tee_out
402        elif last_out is not None and tee_out is None:
403            info["out"] = last_out
404        else:
405            info["out"] = tee_out + "\n" + last_out
406        events.on_postcommand.fire(
407            cmd=info["inp"], rtn=info["rtn"], out=info.get("out", None), ts=info["ts"]
408        )
409        if hist is not None:
410            hist.append(info)
411            hist.last_cmd_rtn = hist.last_cmd_out = None
412
413    def _fix_cwd(self):
414        """Check if the cwd changed out from under us."""
415        env = builtins.__xonsh_env__
416        try:
417            cwd = os.getcwd()
418        except (FileNotFoundError, OSError):
419            cwd = None
420        if cwd is None:
421            # directory has been deleted out from under us, most likely
422            pwd = env.get("PWD", None)
423            if pwd is None:
424                # we have no idea where we are
425                env["PWD"] = "<invalid directory>"
426            elif os.path.isdir(pwd):
427                # unclear why os.getcwd() failed. do nothing.
428                pass
429            else:
430                # OK PWD is really gone.
431                msg = "{UNDERLINE_INTENSE_WHITE}{BACKGROUND_INTENSE_BLACK}"
432                msg += "xonsh: working directory does not exist: " + pwd
433                msg += "{NO_COLOR}"
434                self.print_color(msg, file=sys.stderr)
435        elif "PWD" not in env:
436            # $PWD is missing from env, recreate it
437            env["PWD"] = cwd
438        elif os.path.realpath(cwd) != os.path.realpath(env["PWD"]):
439            # The working directory has changed without updating $PWD, fix this
440            old = env["PWD"]
441            env["PWD"] = cwd
442            env["OLDPWD"] = old
443            events.on_chdir.fire(olddir=old, newdir=cwd)
444
445    def push(self, line):
446        """Pushes a line onto the buffer and compiles the code in a way that
447        enables multiline input.
448        """
449        self.buffer.append(line)
450        if self.need_more_lines:
451            return None, None
452        src = "".join(self.buffer)
453        src = transform_command(src)
454        return self.compile(src)
455
456    def compile(self, src):
457        """Compiles source code and returns the (possibly modified) source and
458        a valid code object.
459        """
460        _cache = should_use_cache(self.execer, "single")
461        if _cache:
462            codefname = code_cache_name(src)
463            cachefname = get_cache_filename(codefname, code=True)
464            usecache, code = code_cache_check(cachefname)
465            if usecache:
466                self.reset_buffer()
467                return src, code
468        lincont = get_line_continuation()
469        if src.endswith(lincont + "\n"):
470            self.need_more_lines = True
471            return src, None
472        try:
473            code = self.execer.compile(src, mode="single", glbs=self.ctx, locs=None)
474            if _cache:
475                update_cache(code, cachefname)
476            self.reset_buffer()
477        except SyntaxError:
478            partial_string_info = check_for_partial_string(src)
479            in_partial_string = (
480                partial_string_info[0] is not None and partial_string_info[1] is None
481            )
482            if (src == "\n" or src.endswith("\n\n")) and not in_partial_string:
483                self.reset_buffer()
484                print_exception()
485                return src, None
486            self.need_more_lines = True
487            code = None
488        except Exception:  # pylint: disable=broad-except
489            self.reset_buffer()
490            print_exception()
491            code = None
492        return src, code
493
494    def reset_buffer(self):
495        """Resets the line buffer."""
496        self.buffer.clear()
497        self.need_more_lines = False
498        self.mlprompt = None
499
500    def settitle(self):
501        """Sets terminal title."""
502        env = builtins.__xonsh_env__  # pylint: disable=no-member
503        term = env.get("TERM", None)
504        # Shells running in emacs sets TERM to "dumb" or "eterm-color".
505        # Do not set title for these to avoid garbled prompt.
506        if (term is None and not ON_WINDOWS) or term in [
507            "dumb",
508            "eterm-color",
509            "linux",
510        ]:
511            return
512        t = env.get("TITLE")
513        if t is None:
514            return
515        t = self.prompt_formatter(t)
516        if ON_WINDOWS and "ANSICON" not in env:
517            kernel32.SetConsoleTitleW(t)
518        else:
519            with open(1, "wb", closefd=False) as f:
520                # prevent xonsh from answering interactive questions
521                # on the next command by writing the title
522                f.write("\x1b]0;{0}\x07".format(t).encode())
523                f.flush()
524
525    @property
526    def prompt(self):
527        """Obtains the current prompt string."""
528        if self.need_more_lines:
529            if self.mlprompt is None:
530                try:
531                    self.mlprompt = multiline_prompt()
532                except Exception:  # pylint: disable=broad-except
533                    print_exception()
534                    self.mlprompt = "<multiline prompt error> "
535            return self.mlprompt
536        env = builtins.__xonsh_env__  # pylint: disable=no-member
537        p = env.get("PROMPT")
538        try:
539            p = self.prompt_formatter(p)
540        except Exception:  # pylint: disable=broad-except
541            print_exception()
542        self.settitle()
543        return p
544
545    def format_color(self, string, hide=False, force_string=False, **kwargs):
546        """Formats the colors in a string. ``BaseShell``'s default implementation
547        of this method uses colors based on ANSI color codes.
548        """
549        style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE")
550        return ansi_partial_color_format(string, hide=hide, style=style)
551
552    def print_color(self, string, hide=False, **kwargs):
553        """Prints a string in color. This base implementation's colors are based
554        on ANSI color codes if a string was given as input. If a list of token
555        pairs is given, it will color based on pygments, if available. If
556        pygments is not available, it will print a colorless string.
557        """
558        if isinstance(string, str):
559            s = self.format_color(string, hide=hide)
560        elif HAS_PYGMENTS:
561            # assume this is a list of (Token, str) tuples and format it
562            env = builtins.__xonsh_env__
563            self.styler.style_name = env.get("XONSH_COLOR_STYLE")
564            style_proxy = pyghooks.xonsh_style_proxy(self.styler)
565            formatter = pyghooks.XonshTerminal256Formatter(style=style_proxy)
566            s = pygments.format(string, formatter).rstrip()
567        else:
568            # assume this is a list of (Token, str) tuples and remove color
569            s = "".join([x for _, x in string])
570        print(s, **kwargs)
571
572    def color_style_names(self):
573        """Returns an iterable of all available style names."""
574        return ()
575
576    def color_style(self):
577        """Returns the current color map."""
578        return {}
579
580    def restore_tty_sanity(self):
581        """An interface for resetting the TTY stdin mode. This is highly
582        dependent on the shell backend. Also it is mostly optional since
583        it only affects ^Z backgrounding behaviour.
584        """
585        pass
586