1# -*- coding: utf-8 -*- 2"""The base class for xonsh shell""" 3import io 4import os 5import sys 6import time 7import builtins 8 9from xonsh.tools import ( 10 XonshError, 11 print_exception, 12 DefaultNotGiven, 13 check_for_partial_string, 14 format_std_prepost, 15 get_line_continuation, 16) 17from xonsh.platform import HAS_PYGMENTS, ON_WINDOWS 18from xonsh.codecache import ( 19 should_use_cache, 20 code_cache_name, 21 code_cache_check, 22 get_cache_filename, 23 update_cache, 24 run_compiled_code, 25) 26from xonsh.completer import Completer 27from xonsh.prompt.base import multiline_prompt, PromptFormatter 28from xonsh.events import events 29from xonsh.shell import transform_command 30from xonsh.lazyimps import pygments, pyghooks 31from xonsh.ansi_colors import ansi_partial_color_format 32 33if ON_WINDOWS: 34 import ctypes 35 36 kernel32 = ctypes.windll.kernel32 37 kernel32.SetConsoleTitleW.argtypes = [ctypes.c_wchar_p] 38 39 40class _TeeStdBuf(io.RawIOBase): 41 """A dispatcher for bytes to two buffers, as std stream buffer and an 42 in memory buffer. 43 """ 44 45 def __init__( 46 self, stdbuf, membuf, encoding=None, errors=None, prestd=b"", poststd=b"" 47 ): 48 """ 49 Parameters 50 ---------- 51 stdbuf : BytesIO-like or StringIO-like 52 The std stream buffer. 53 membuf : BytesIO-like 54 The in memory stream buffer. 55 encoding : str or None, optional 56 The encoding of the stream. Only used if stdbuf is a text stream, 57 rather than a binary one. Defaults to $XONSH_ENCODING if None. 58 errors : str or None, optional 59 The error form for the encoding of the stream. Only used if stdbuf 60 is a text stream, rather than a binary one. Deafults to 61 $XONSH_ENCODING_ERRORS if None. 62 prestd : bytes, optional 63 The prefix to prepend to the standard buffer. 64 poststd : bytes, optional 65 The postfix to append to the standard buffer. 66 """ 67 self.stdbuf = stdbuf 68 self.membuf = membuf 69 env = builtins.__xonsh_env__ 70 self.encoding = env.get("XONSH_ENCODING") if encoding is None else encoding 71 self.errors = env.get("XONSH_ENCODING_ERRORS") if errors is None else errors 72 self.prestd = prestd 73 self.poststd = poststd 74 self._std_is_binary = not hasattr(stdbuf, "encoding") 75 76 def fileno(self): 77 """Returns the file descriptor of the std buffer.""" 78 return self.stdbuf.fileno() 79 80 def seek(self, offset, whence=io.SEEK_SET): 81 """Sets the location in both the stdbuf and the membuf.""" 82 self.stdbuf.seek(offset, whence) 83 self.membuf.seek(offset, whence) 84 85 def truncate(self, size=None): 86 """Truncate both buffers.""" 87 self.stdbuf.truncate(size) 88 self.membuf.truncate(size) 89 90 def readinto(self, b): 91 """Read bytes into buffer from both streams.""" 92 if self._std_is_binary: 93 self.stdbuf.readinto(b) 94 return self.membuf.readinto(b) 95 96 def write(self, b): 97 """Write bytes into both buffers.""" 98 std_b = b 99 if self.prestd: 100 std_b = self.prestd + b 101 if self.poststd: 102 std_b += self.poststd 103 # write to stdbuf 104 if self._std_is_binary: 105 self.stdbuf.write(std_b) 106 else: 107 self.stdbuf.write(std_b.decode(encoding=self.encoding, errors=self.errors)) 108 return self.membuf.write(b) 109 110 111class _TeeStd(io.TextIOBase): 112 """Tees a std stream into an in-memory container and the original stream.""" 113 114 def __init__(self, name, mem, prestd="", poststd=""): 115 """ 116 Parameters 117 ---------- 118 name : str 119 The name of the buffer in the sys module, e.g. 'stdout'. 120 mem : io.TextIOBase-like 121 The in-memory text-based representation. 122 prestd : str, optional 123 The prefix to prepend to the standard stream. 124 poststd : str, optional 125 The postfix to append to the standard stream. 126 """ 127 self._name = name 128 self.std = std = getattr(sys, name) 129 self.mem = mem 130 self.prestd = prestd 131 self.poststd = poststd 132 preb = prestd.encode(encoding=mem.encoding, errors=mem.errors) 133 postb = poststd.encode(encoding=mem.encoding, errors=mem.errors) 134 if hasattr(std, "buffer"): 135 buffer = _TeeStdBuf(std.buffer, mem.buffer, prestd=preb, poststd=postb) 136 else: 137 # TextIO does not have buffer as part of the API, so std streams 138 # may not either. 139 buffer = _TeeStdBuf( 140 std, 141 mem.buffer, 142 encoding=mem.encoding, 143 errors=mem.errors, 144 prestd=preb, 145 poststd=postb, 146 ) 147 self.buffer = buffer 148 setattr(sys, name, self) 149 150 @property 151 def encoding(self): 152 """The encoding of the in-memory buffer.""" 153 return self.mem.encoding 154 155 @property 156 def errors(self): 157 """The errors of the in-memory buffer.""" 158 return self.mem.errors 159 160 @property 161 def newlines(self): 162 """The newlines of the in-memory buffer.""" 163 return self.mem.newlines 164 165 def _replace_std(self): 166 std = self.std 167 if std is None: 168 return 169 setattr(sys, self._name, std) 170 self.std = self._name = None 171 172 def __del__(self): 173 self._replace_std() 174 175 def close(self): 176 """Restores the original std stream.""" 177 self._replace_std() 178 179 def write(self, s): 180 """Writes data to the original std stream and the in-memory object.""" 181 self.mem.write(s) 182 if self.std is None: 183 return 184 std_s = s 185 if self.prestd: 186 std_s = self.prestd + std_s 187 if self.poststd: 188 std_s += self.poststd 189 self.std.write(std_s) 190 191 def flush(self): 192 """Flushes both the original stdout and the buffer.""" 193 self.std.flush() 194 self.mem.flush() 195 196 def fileno(self): 197 """Tunnel fileno() calls to the std stream.""" 198 return self.std.fileno() 199 200 def seek(self, offset, whence=io.SEEK_SET): 201 """Seek to a location in both streams.""" 202 self.std.seek(offset, whence) 203 self.mem.seek(offset, whence) 204 205 def truncate(self, size=None): 206 """Seek to a location in both streams.""" 207 self.std.truncate(size) 208 self.mem.truncate(size) 209 210 def detach(self): 211 """This operation is not supported.""" 212 raise io.UnsupportedOperation 213 214 def read(self, size=None): 215 """Read from the in-memory stream and seek to a new location in the 216 std stream. 217 """ 218 s = self.mem.read(size) 219 loc = self.std.tell() 220 self.std.seek(loc + len(s)) 221 return s 222 223 def readline(self, size=-1): 224 """Read a line from the in-memory stream and seek to a new location 225 in the std stream. 226 """ 227 s = self.mem.readline(size) 228 loc = self.std.tell() 229 self.std.seek(loc + len(s)) 230 return s 231 232 233class Tee: 234 """Class that merges tee'd stdout and stderr into a single stream. 235 236 This represents what a user would actually see on the command line. 237 This class has the same interface as io.TextIOWrapper, except that 238 the buffer is optional. 239 """ 240 241 # pylint is a stupid about counting public methods when using inheritance. 242 # pylint: disable=too-few-public-methods 243 244 def __init__( 245 self, 246 buffer=None, 247 encoding=None, 248 errors=None, 249 newline=None, 250 line_buffering=False, 251 write_through=False, 252 ): 253 self.buffer = io.BytesIO() if buffer is None else buffer 254 self.memory = io.TextIOWrapper( 255 self.buffer, 256 encoding=encoding, 257 errors=errors, 258 newline=newline, 259 line_buffering=line_buffering, 260 write_through=write_through, 261 ) 262 self.stdout = _TeeStd("stdout", self.memory) 263 env = builtins.__xonsh_env__ 264 prestderr = format_std_prepost(env.get("XONSH_STDERR_PREFIX")) 265 poststderr = format_std_prepost(env.get("XONSH_STDERR_POSTFIX")) 266 self.stderr = _TeeStd( 267 "stderr", self.memory, prestd=prestderr, poststd=poststderr 268 ) 269 270 @property 271 def line_buffering(self): 272 return self.memory.line_buffering 273 274 def __del__(self): 275 del self.stdout, self.stderr 276 self.stdout = self.stderr = None 277 278 def close(self): 279 """Closes the buffer as well as the stdout and stderr tees.""" 280 self.stdout.close() 281 self.stderr.close() 282 self.memory.close() 283 284 def getvalue(self): 285 """Gets the current contents of the in-memory buffer.""" 286 m = self.memory 287 loc = m.tell() 288 m.seek(0) 289 s = m.read() 290 m.seek(loc) 291 return s 292 293 294class BaseShell(object): 295 """The xonsh shell.""" 296 297 def __init__(self, execer, ctx, **kwargs): 298 super().__init__() 299 self.execer = execer 300 self.ctx = ctx 301 self.completer = Completer() if kwargs.get("completer", True) else None 302 self.buffer = [] 303 self.need_more_lines = False 304 self.mlprompt = None 305 self._styler = DefaultNotGiven 306 self.prompt_formatter = PromptFormatter() 307 self.accumulated_inputs = "" 308 309 @property 310 def styler(self): 311 if self._styler is DefaultNotGiven: 312 if HAS_PYGMENTS: 313 from xonsh.pyghooks import XonshStyle 314 315 env = builtins.__xonsh_env__ 316 self._styler = XonshStyle(env.get("XONSH_COLOR_STYLE")) 317 else: 318 self._styler = None 319 return self._styler 320 321 @styler.setter 322 def styler(self, value): 323 self._styler = value 324 325 @styler.deleter 326 def styler(self): 327 self._styler = DefaultNotGiven 328 329 def emptyline(self): 330 """Called when an empty line has been entered.""" 331 self.need_more_lines = False 332 self.default("") 333 334 def singleline(self, **kwargs): 335 """Reads a single line of input from the shell.""" 336 msg = "{0} has not implemented singleline()." 337 raise RuntimeError(msg.format(self.__class__.__name__)) 338 339 def precmd(self, line): 340 """Called just before execution of line.""" 341 return line if self.need_more_lines else line.lstrip() 342 343 def default(self, line): 344 """Implements code execution.""" 345 line = line if line.endswith("\n") else line + "\n" 346 src, code = self.push(line) 347 if code is None: 348 return 349 350 events.on_precommand.fire(cmd=src) 351 352 env = builtins.__xonsh_env__ 353 hist = builtins.__xonsh_history__ # pylint: disable=no-member 354 ts1 = None 355 enc = env.get("XONSH_ENCODING") 356 err = env.get("XONSH_ENCODING_ERRORS") 357 tee = Tee(encoding=enc, errors=err) 358 try: 359 ts0 = time.time() 360 run_compiled_code(code, self.ctx, None, "single") 361 ts1 = time.time() 362 if hist is not None and hist.last_cmd_rtn is None: 363 hist.last_cmd_rtn = 0 # returncode for success 364 except XonshError as e: 365 print(e.args[0], file=sys.stderr) 366 if hist is not None and hist.last_cmd_rtn is None: 367 hist.last_cmd_rtn = 1 # return code for failure 368 except Exception: # pylint: disable=broad-except 369 print_exception() 370 if hist is not None and hist.last_cmd_rtn is None: 371 hist.last_cmd_rtn = 1 # return code for failure 372 finally: 373 ts1 = ts1 or time.time() 374 tee_out = tee.getvalue() 375 self._append_history(inp=src, ts=[ts0, ts1], tee_out=tee_out) 376 self.accumulated_inputs += src 377 if ( 378 tee_out 379 and env.get("XONSH_APPEND_NEWLINE") 380 and not tee_out.endswith(os.linesep) 381 ): 382 print(os.linesep, end="") 383 tee.close() 384 self._fix_cwd() 385 if builtins.__xonsh_exit__: # pylint: disable=no-member 386 return True 387 388 def _append_history(self, tee_out=None, **info): 389 """Append information about the command to the history. 390 391 This also handles on_postcommand because this is the place where all the 392 information is available. 393 """ 394 hist = builtins.__xonsh_history__ # pylint: disable=no-member 395 info["rtn"] = hist.last_cmd_rtn if hist is not None else None 396 tee_out = tee_out or None 397 last_out = hist.last_cmd_out if hist is not None else None 398 if last_out is None and tee_out is None: 399 pass 400 elif last_out is None and tee_out is not None: 401 info["out"] = tee_out 402 elif last_out is not None and tee_out is None: 403 info["out"] = last_out 404 else: 405 info["out"] = tee_out + "\n" + last_out 406 events.on_postcommand.fire( 407 cmd=info["inp"], rtn=info["rtn"], out=info.get("out", None), ts=info["ts"] 408 ) 409 if hist is not None: 410 hist.append(info) 411 hist.last_cmd_rtn = hist.last_cmd_out = None 412 413 def _fix_cwd(self): 414 """Check if the cwd changed out from under us.""" 415 env = builtins.__xonsh_env__ 416 try: 417 cwd = os.getcwd() 418 except (FileNotFoundError, OSError): 419 cwd = None 420 if cwd is None: 421 # directory has been deleted out from under us, most likely 422 pwd = env.get("PWD", None) 423 if pwd is None: 424 # we have no idea where we are 425 env["PWD"] = "<invalid directory>" 426 elif os.path.isdir(pwd): 427 # unclear why os.getcwd() failed. do nothing. 428 pass 429 else: 430 # OK PWD is really gone. 431 msg = "{UNDERLINE_INTENSE_WHITE}{BACKGROUND_INTENSE_BLACK}" 432 msg += "xonsh: working directory does not exist: " + pwd 433 msg += "{NO_COLOR}" 434 self.print_color(msg, file=sys.stderr) 435 elif "PWD" not in env: 436 # $PWD is missing from env, recreate it 437 env["PWD"] = cwd 438 elif os.path.realpath(cwd) != os.path.realpath(env["PWD"]): 439 # The working directory has changed without updating $PWD, fix this 440 old = env["PWD"] 441 env["PWD"] = cwd 442 env["OLDPWD"] = old 443 events.on_chdir.fire(olddir=old, newdir=cwd) 444 445 def push(self, line): 446 """Pushes a line onto the buffer and compiles the code in a way that 447 enables multiline input. 448 """ 449 self.buffer.append(line) 450 if self.need_more_lines: 451 return None, None 452 src = "".join(self.buffer) 453 src = transform_command(src) 454 return self.compile(src) 455 456 def compile(self, src): 457 """Compiles source code and returns the (possibly modified) source and 458 a valid code object. 459 """ 460 _cache = should_use_cache(self.execer, "single") 461 if _cache: 462 codefname = code_cache_name(src) 463 cachefname = get_cache_filename(codefname, code=True) 464 usecache, code = code_cache_check(cachefname) 465 if usecache: 466 self.reset_buffer() 467 return src, code 468 lincont = get_line_continuation() 469 if src.endswith(lincont + "\n"): 470 self.need_more_lines = True 471 return src, None 472 try: 473 code = self.execer.compile(src, mode="single", glbs=self.ctx, locs=None) 474 if _cache: 475 update_cache(code, cachefname) 476 self.reset_buffer() 477 except SyntaxError: 478 partial_string_info = check_for_partial_string(src) 479 in_partial_string = ( 480 partial_string_info[0] is not None and partial_string_info[1] is None 481 ) 482 if (src == "\n" or src.endswith("\n\n")) and not in_partial_string: 483 self.reset_buffer() 484 print_exception() 485 return src, None 486 self.need_more_lines = True 487 code = None 488 except Exception: # pylint: disable=broad-except 489 self.reset_buffer() 490 print_exception() 491 code = None 492 return src, code 493 494 def reset_buffer(self): 495 """Resets the line buffer.""" 496 self.buffer.clear() 497 self.need_more_lines = False 498 self.mlprompt = None 499 500 def settitle(self): 501 """Sets terminal title.""" 502 env = builtins.__xonsh_env__ # pylint: disable=no-member 503 term = env.get("TERM", None) 504 # Shells running in emacs sets TERM to "dumb" or "eterm-color". 505 # Do not set title for these to avoid garbled prompt. 506 if (term is None and not ON_WINDOWS) or term in [ 507 "dumb", 508 "eterm-color", 509 "linux", 510 ]: 511 return 512 t = env.get("TITLE") 513 if t is None: 514 return 515 t = self.prompt_formatter(t) 516 if ON_WINDOWS and "ANSICON" not in env: 517 kernel32.SetConsoleTitleW(t) 518 else: 519 with open(1, "wb", closefd=False) as f: 520 # prevent xonsh from answering interactive questions 521 # on the next command by writing the title 522 f.write("\x1b]0;{0}\x07".format(t).encode()) 523 f.flush() 524 525 @property 526 def prompt(self): 527 """Obtains the current prompt string.""" 528 if self.need_more_lines: 529 if self.mlprompt is None: 530 try: 531 self.mlprompt = multiline_prompt() 532 except Exception: # pylint: disable=broad-except 533 print_exception() 534 self.mlprompt = "<multiline prompt error> " 535 return self.mlprompt 536 env = builtins.__xonsh_env__ # pylint: disable=no-member 537 p = env.get("PROMPT") 538 try: 539 p = self.prompt_formatter(p) 540 except Exception: # pylint: disable=broad-except 541 print_exception() 542 self.settitle() 543 return p 544 545 def format_color(self, string, hide=False, force_string=False, **kwargs): 546 """Formats the colors in a string. ``BaseShell``'s default implementation 547 of this method uses colors based on ANSI color codes. 548 """ 549 style = builtins.__xonsh_env__.get("XONSH_COLOR_STYLE") 550 return ansi_partial_color_format(string, hide=hide, style=style) 551 552 def print_color(self, string, hide=False, **kwargs): 553 """Prints a string in color. This base implementation's colors are based 554 on ANSI color codes if a string was given as input. If a list of token 555 pairs is given, it will color based on pygments, if available. If 556 pygments is not available, it will print a colorless string. 557 """ 558 if isinstance(string, str): 559 s = self.format_color(string, hide=hide) 560 elif HAS_PYGMENTS: 561 # assume this is a list of (Token, str) tuples and format it 562 env = builtins.__xonsh_env__ 563 self.styler.style_name = env.get("XONSH_COLOR_STYLE") 564 style_proxy = pyghooks.xonsh_style_proxy(self.styler) 565 formatter = pyghooks.XonshTerminal256Formatter(style=style_proxy) 566 s = pygments.format(string, formatter).rstrip() 567 else: 568 # assume this is a list of (Token, str) tuples and remove color 569 s = "".join([x for _, x in string]) 570 print(s, **kwargs) 571 572 def color_style_names(self): 573 """Returns an iterable of all available style names.""" 574 return () 575 576 def color_style(self): 577 """Returns the current color map.""" 578 return {} 579 580 def restore_tty_sanity(self): 581 """An interface for resetting the TTY stdin mode. This is highly 582 dependent on the shell backend. Also it is mostly optional since 583 it only affects ^Z backgrounding behaviour. 584 """ 585 pass 586