1# -*- coding: utf-8 -*- 2"""Interface for running Python functions as subprocess-mode commands. 3 4Code for several helper methods in the `ProcProxy` class have been reproduced 5without modification from `subprocess.py` in the Python 3.4.2 standard library. 6The contents of `subprocess.py` (and, thus, the reproduced methods) are 7Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> and were 8licensed to the Python Software foundation under a Contributor Agreement. 9""" 10import io 11import os 12import re 13import sys 14import time 15import queue 16import array 17import ctypes 18import signal 19import inspect 20import builtins 21import functools 22import threading 23import subprocess 24import collections.abc as cabc 25 26from xonsh.platform import ( 27 ON_WINDOWS, 28 ON_POSIX, 29 ON_MSYS, 30 ON_CYGWIN, 31 CAN_RESIZE_WINDOW, 32 LFLAG, 33 CC, 34) 35from xonsh.tools import ( 36 redirect_stdout, 37 redirect_stderr, 38 print_exception, 39 XonshCalledProcessError, 40 findfirst, 41 on_main_thread, 42 XonshError, 43 format_std_prepost, 44) 45from xonsh.lazyasd import lazyobject, LazyObject 46from xonsh.jobs import wait_for_active_job, give_terminal_to, _continue 47from xonsh.lazyimps import fcntl, termios, _winapi, msvcrt, winutils 48 49# these decorators are imported for users back-compatible 50from xonsh.tools import unthreadable, uncapturable # NOQA 51 52# foreground has be deprecated 53foreground = unthreadable 54 55 56@lazyobject 57def STDOUT_CAPTURE_KINDS(): 58 return frozenset(["stdout", "object"]) 59 60 61# The following escape codes are xterm codes. 62# See http://rtfm.etla.org/xterm/ctlseq.html for more. 63MODE_NUMS = ("1049", "47", "1047") 64START_ALTERNATE_MODE = LazyObject( 65 lambda: frozenset("\x1b[?{0}h".format(i).encode() for i in MODE_NUMS), 66 globals(), 67 "START_ALTERNATE_MODE", 68) 69END_ALTERNATE_MODE = LazyObject( 70 lambda: frozenset("\x1b[?{0}l".format(i).encode() for i in MODE_NUMS), 71 globals(), 72 "END_ALTERNATE_MODE", 73) 74ALTERNATE_MODE_FLAGS = LazyObject( 75 lambda: tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE), 76 globals(), 77 "ALTERNATE_MODE_FLAGS", 78) 79RE_HIDDEN_BYTES = LazyObject( 80 lambda: re.compile(b"(\001.*?\002)"), globals(), "RE_HIDDEN" 81) 82 83 84@lazyobject 85def RE_VT100_ESCAPE(): 86 return re.compile(b"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]") 87 88 89@lazyobject 90def RE_HIDE_ESCAPE(): 91 return re.compile( 92 b"(" + RE_HIDDEN_BYTES.pattern + b"|" + RE_VT100_ESCAPE.pattern + b")" 93 ) 94 95 96class QueueReader: 97 """Provides a file-like interface to reading from a queue.""" 98 99 def __init__(self, fd, timeout=None): 100 """ 101 Parameters 102 ---------- 103 fd : int 104 A file descriptor 105 timeout : float or None, optional 106 The queue reading timeout. 107 """ 108 self.fd = fd 109 self.timeout = timeout 110 self.closed = False 111 self.queue = queue.Queue() 112 self.thread = None 113 114 def close(self): 115 """close the reader""" 116 self.closed = True 117 118 def is_fully_read(self): 119 """Returns whether or not the queue is fully read and the reader is 120 closed. 121 """ 122 return ( 123 self.closed 124 and (self.thread is None or not self.thread.is_alive()) 125 and self.queue.empty() 126 ) 127 128 def read_queue(self): 129 """Reads a single chunk from the queue. This is blocking if 130 the timeout is None and non-blocking otherwise. 131 """ 132 try: 133 return self.queue.get(block=True, timeout=self.timeout) 134 except queue.Empty: 135 return b"" 136 137 def read(self, size=-1): 138 """Reads bytes from the file.""" 139 i = 0 140 buf = b"" 141 while size < 0 or i != size: 142 line = self.read_queue() 143 if line: 144 buf += line 145 else: 146 break 147 i += len(line) 148 return buf 149 150 def readline(self, size=-1): 151 """Reads a line, or a partial line from the file descriptor.""" 152 i = 0 153 nl = b"\n" 154 buf = b"" 155 while size < 0 or i != size: 156 line = self.read_queue() 157 if line: 158 buf += line 159 if line.endswith(nl): 160 break 161 else: 162 break 163 i += len(line) 164 return buf 165 166 def _read_all_lines(self): 167 """This reads all remaining lines in a blocking fashion.""" 168 lines = [] 169 while not self.is_fully_read(): 170 chunk = self.read_queue() 171 lines.extend(chunk.splitlines(keepends=True)) 172 return lines 173 174 def readlines(self, hint=-1): 175 """Reads lines from the file descriptor. This is blocking for negative 176 hints (i.e. read all the remaining lines) and non-blocking otherwise. 177 """ 178 if hint == -1: 179 return self._read_all_lines() 180 lines = [] 181 while len(lines) != hint: 182 chunk = self.read_queue() 183 if not chunk: 184 break 185 lines.extend(chunk.splitlines(keepends=True)) 186 return lines 187 188 def fileno(self): 189 """Returns the file descriptor number.""" 190 return self.fd 191 192 @staticmethod 193 def readable(): 194 """Returns true, because this object is always readable.""" 195 return True 196 197 def iterqueue(self): 198 """Iterates through all remaining chunks in a blocking fashion.""" 199 while not self.is_fully_read(): 200 chunk = self.read_queue() 201 if not chunk: 202 continue 203 yield chunk 204 205 206def populate_fd_queue(reader, fd, queue): 207 """Reads 1 kb of data from a file descriptor into a queue. 208 If this ends or fails, it flags the calling reader object as closed. 209 """ 210 while True: 211 try: 212 c = os.read(fd, 1024) 213 except OSError: 214 reader.closed = True 215 break 216 if c: 217 queue.put(c) 218 else: 219 reader.closed = True 220 break 221 222 223class NonBlockingFDReader(QueueReader): 224 """A class for reading characters from a file descriptor on a background 225 thread. This has the advantages that the calling thread can close the 226 file and that the reading does not block the calling thread. 227 """ 228 229 def __init__(self, fd, timeout=None): 230 """ 231 Parameters 232 ---------- 233 fd : int 234 A file descriptor 235 timeout : float or None, optional 236 The queue reading timeout. 237 """ 238 super().__init__(fd, timeout=timeout) 239 # start reading from stream 240 self.thread = threading.Thread( 241 target=populate_fd_queue, args=(self, self.fd, self.queue) 242 ) 243 self.thread.daemon = True 244 self.thread.start() 245 246 247def populate_buffer(reader, fd, buffer, chunksize): 248 """Reads bytes from the file descriptor and copies them into a buffer. 249 250 The reads happen in parallel using the pread() syscall; which is only 251 available on POSIX systems. If the read fails for any reason, the reader is 252 flagged as closed. 253 """ 254 offset = 0 255 while True: 256 try: 257 buf = os.pread(fd, chunksize, offset) 258 except OSError: 259 reader.closed = True 260 break 261 if buf: 262 buffer.write(buf) 263 offset += len(buf) 264 else: 265 reader.closed = True 266 break 267 268 269class BufferedFDParallelReader: 270 """Buffered, parallel background thread reader.""" 271 272 def __init__(self, fd, buffer=None, chunksize=1024): 273 """ 274 Parameters 275 ---------- 276 fd : int 277 File descriptor from which to read. 278 buffer : binary file-like or None, optional 279 A buffer to write bytes into. If None, a new BytesIO object 280 is created. 281 chunksize : int, optional 282 The max size of the parallel reads, default 1 kb. 283 """ 284 self.fd = fd 285 self.buffer = io.BytesIO() if buffer is None else buffer 286 self.chunksize = chunksize 287 self.closed = False 288 # start reading from stream 289 self.thread = threading.Thread( 290 target=populate_buffer, args=(self, fd, self.buffer, chunksize) 291 ) 292 self.thread.daemon = True 293 294 self.thread.start() 295 296 297def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd): 298 # if we are getting close to the end of the console buffer, 299 # expand it so that we can read from it successfully. 300 if cols == 0: 301 return orig_posize[-1], max_offset, orig_posize 302 rows = ((max_offset + expandsize) // cols) + 1 303 winutils.set_console_screen_buffer_size(cols, rows, fd=fd) 304 orig_posize = orig_posize[:3] + (rows,) 305 max_offset = (rows - 1) * cols 306 return rows, max_offset, orig_posize 307 308 309def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None): 310 """Reads bytes from the file descriptor and puts lines into the queue. 311 The reads happened in parallel, 312 using xonsh.winutils.read_console_output_character(), 313 and is thus only available on windows. If the read fails for any reason, 314 the reader is flagged as closed. 315 """ 316 # OK, so this function is super annoying because Windows stores its 317 # buffers as a 2D regular, dense array -- without trailing newlines. 318 # Meanwhile, we want to add *lines* to the queue. Also, as is typical 319 # with parallel reads, the entire buffer that you ask for may not be 320 # filled. Thus we have to deal with the full generality. 321 # 1. reads may end in the middle of a line 322 # 2. excess whitespace at the end of a line may not be real, unless 323 # 3. you haven't read to the end of the line yet! 324 # So there are alignment issues everywhere. Also, Windows will automatically 325 # read past the current cursor position, even though there is presumably 326 # nothing to see there. 327 # 328 # These chunked reads basically need to happen like this because, 329 # a. The default buffer size is HUGE for the console (90k lines x 120 cols) 330 # as so we can't just read in everything at the end and see what we 331 # care about without a noticeable performance hit. 332 # b. Even with this huge size, it is still possible to write more lines than 333 # this, so we should scroll along with the console. 334 # Unfortunately, because we do not have control over the terminal emulator, 335 # It is not possible to compute how far back we should set the beginning 336 # read position because we don't know how many characters have been popped 337 # off the top of the buffer. If we did somehow know this number we could do 338 # something like the following: 339 # 340 # new_offset = (y*cols) + x 341 # if new_offset == max_offset: 342 # new_offset -= scrolled_offset 343 # x = new_offset%cols 344 # y = new_offset//cols 345 # continue 346 # 347 # So this method is imperfect and only works as long as the screen has 348 # room to expand to. Thus the trick here is to expand the screen size 349 # when we get close enough to the end of the screen. There remain some 350 # async issues related to not being able to set the cursor position. 351 # but they just affect the alignment / capture of the output of the 352 # first command run after a screen resize. 353 if expandsize is None: 354 expandsize = 100 * chunksize 355 x, y, cols, rows = posize = winutils.get_position_size(fd) 356 pre_x = pre_y = -1 357 orig_posize = posize 358 offset = (cols * y) + x 359 max_offset = (rows - 1) * cols 360 # I believe that there is a bug in PTK that if we reset the 361 # cursor position, the cursor on the next prompt is accidentally on 362 # the next line. If this is fixed, uncomment the following line. 363 # if max_offset < offset + expandsize: 364 # rows, max_offset, orig_posize = _expand_console_buffer( 365 # cols, max_offset, expandsize, 366 # orig_posize, fd) 367 # winutils.set_console_cursor_position(x, y, fd=fd) 368 while True: 369 posize = winutils.get_position_size(fd) 370 offset = (cols * y) + x 371 if ((posize[1], posize[0]) <= (y, x) and posize[2:] == (cols, rows)) or ( 372 pre_x == x and pre_y == y 373 ): 374 # already at or ahead of the current cursor position. 375 if reader.closed: 376 break 377 else: 378 time.sleep(reader.timeout) 379 continue 380 elif max_offset <= offset + expandsize: 381 ecb = _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd) 382 rows, max_offset, orig_posize = ecb 383 continue 384 elif posize[2:] == (cols, rows): 385 # cursor updated but screen size is the same. 386 pass 387 else: 388 # screen size changed, which is offset preserving 389 orig_posize = posize 390 cols, rows = posize[2:] 391 x = offset % cols 392 y = offset // cols 393 pre_x = pre_y = -1 394 max_offset = (rows - 1) * cols 395 continue 396 try: 397 buf = winutils.read_console_output_character( 398 x=x, y=y, fd=fd, buf=buffer, bufsize=chunksize, raw=True 399 ) 400 except (OSError, IOError): 401 reader.closed = True 402 break 403 # cursor position and offset 404 if not reader.closed: 405 buf = buf.rstrip() 406 nread = len(buf) 407 if nread == 0: 408 time.sleep(reader.timeout) 409 continue 410 cur_x, cur_y = posize[0], posize[1] 411 cur_offset = (cols * cur_y) + cur_x 412 beg_offset = (cols * y) + x 413 end_offset = beg_offset + nread 414 if end_offset > cur_offset and cur_offset != max_offset: 415 buf = buf[: cur_offset - end_offset] 416 # convert to lines 417 xshift = cols - x 418 yshift = (nread // cols) + (1 if nread % cols > 0 else 0) 419 lines = [buf[:xshift]] 420 lines += [ 421 buf[l * cols + xshift : (l + 1) * cols + xshift] for l in range(yshift) 422 ] 423 lines = [line for line in lines if line] 424 if not lines: 425 time.sleep(reader.timeout) 426 continue 427 # put lines in the queue 428 nl = b"\n" 429 for line in lines[:-1]: 430 queue.put(line.rstrip() + nl) 431 if len(lines[-1]) == xshift: 432 queue.put(lines[-1].rstrip() + nl) 433 else: 434 queue.put(lines[-1]) 435 # update x and y locations 436 if (beg_offset + len(buf)) % cols == 0: 437 new_offset = beg_offset + len(buf) 438 else: 439 new_offset = beg_offset + len(buf.rstrip()) 440 pre_x = x 441 pre_y = y 442 x = new_offset % cols 443 y = new_offset // cols 444 time.sleep(reader.timeout) 445 446 447class ConsoleParallelReader(QueueReader): 448 """Parallel reader for consoles that runs in a background thread. 449 This is only needed, available, and useful on Windows. 450 """ 451 452 def __init__(self, fd, buffer=None, chunksize=1024, timeout=None): 453 """ 454 Parameters 455 ---------- 456 fd : int 457 Standard buffer file descriptor, 0 for stdin, 1 for stdout (default), 458 and 2 for stderr. 459 buffer : ctypes.c_wchar_p, optional 460 An existing buffer to (re-)use. 461 chunksize : int, optional 462 The max size of the parallel reads, default 1 kb. 463 timeout : float, optional 464 The queue reading timeout. 465 """ 466 timeout = timeout or builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY") 467 super().__init__(fd, timeout=timeout) 468 self._buffer = buffer # this cannot be public 469 if buffer is None: 470 self._buffer = ctypes.c_char_p(b" " * chunksize) 471 self.chunksize = chunksize 472 # start reading from stream 473 self.thread = threading.Thread( 474 target=populate_console, 475 args=(self, fd, self._buffer, chunksize, self.queue), 476 ) 477 self.thread.daemon = True 478 self.thread.start() 479 480 481def safe_fdclose(handle, cache=None): 482 """Closes a file handle in the safest way possible, and potentially 483 storing the result. 484 """ 485 if cache is not None and cache.get(handle, False): 486 return 487 status = True 488 if handle is None: 489 pass 490 elif isinstance(handle, int): 491 if handle >= 3: 492 # don't close stdin, stdout, stderr, -1 493 try: 494 os.close(handle) 495 except OSError: 496 status = False 497 elif handle is sys.stdin or handle is sys.stdout or handle is sys.stderr: 498 # don't close stdin, stdout, or stderr 499 pass 500 else: 501 try: 502 handle.close() 503 except OSError: 504 status = False 505 if cache is not None: 506 cache[handle] = status 507 508 509def safe_flush(handle): 510 """Attempts to safely flush a file handle, returns success bool.""" 511 status = True 512 try: 513 handle.flush() 514 except OSError: 515 status = False 516 return status 517 518 519def still_writable(fd): 520 """Determines whether a file descriptor is still writable by trying to 521 write an empty string and seeing if it fails. 522 """ 523 try: 524 os.write(fd, b"") 525 status = True 526 except OSError: 527 status = False 528 return status 529 530 531class PopenThread(threading.Thread): 532 """A thread for running and managing subprocess. This allows reading 533 from the stdin, stdout, and stderr streams in a non-blocking fashion. 534 535 This takes the same arguments and keyword arguments as regular Popen. 536 This requires that the captured_stdout and captured_stderr attributes 537 to be set following instantiation. 538 """ 539 540 def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs): 541 super().__init__() 542 self.lock = threading.RLock() 543 env = builtins.__xonsh_env__ 544 # stdin setup 545 self.orig_stdin = stdin 546 if stdin is None: 547 self.stdin_fd = 0 548 elif isinstance(stdin, int): 549 self.stdin_fd = stdin 550 else: 551 self.stdin_fd = stdin.fileno() 552 self.store_stdin = env.get("XONSH_STORE_STDIN") 553 self.timeout = env.get("XONSH_PROC_FREQUENCY") 554 self.in_alt_mode = False 555 self.stdin_mode = None 556 # stdout setup 557 self.orig_stdout = stdout 558 self.stdout_fd = 1 if stdout is None else stdout.fileno() 559 self._set_pty_size() 560 # stderr setup 561 self.orig_stderr = stderr 562 # Set some signal handles, if we can. Must come before process 563 # is started to prevent deadlock on windows 564 self.proc = None # has to be here for closure for handles 565 self.old_int_handler = self.old_winch_handler = None 566 self.old_tstp_handler = self.old_quit_handler = None 567 if on_main_thread(): 568 self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int) 569 if ON_POSIX: 570 self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp) 571 self.old_quit_handler = signal.signal(signal.SIGQUIT, self._signal_quit) 572 if CAN_RESIZE_WINDOW: 573 self.old_winch_handler = signal.signal( 574 signal.SIGWINCH, self._signal_winch 575 ) 576 # start up process 577 if ON_WINDOWS and stdout is not None: 578 os.set_inheritable(stdout.fileno(), False) 579 580 try: 581 self.proc = proc = subprocess.Popen( 582 *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs 583 ) 584 except Exception: 585 self._clean_up() 586 raise 587 588 self.pid = proc.pid 589 self.universal_newlines = uninew = proc.universal_newlines 590 if uninew: 591 self.encoding = enc = env.get("XONSH_ENCODING") 592 self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS") 593 self.stdin = io.BytesIO() # stdin is always bytes! 594 self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err) 595 self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err) 596 else: 597 self.encoding = self.encoding_errors = None 598 self.stdin = io.BytesIO() 599 self.stdout = io.BytesIO() 600 self.stderr = io.BytesIO() 601 self.suspended = False 602 self.prevs_are_closed = False 603 self.start() 604 605 def run(self): 606 """Runs the subprocess by performing a parallel read on stdin if allowed, 607 and copying bytes from captured_stdout to stdout and bytes from 608 captured_stderr to stderr. 609 """ 610 proc = self.proc 611 spec = self._wait_and_getattr("spec") 612 # get stdin and apply parallel reader if needed. 613 stdin = self.stdin 614 if self.orig_stdin is None: 615 origin = None 616 elif ON_POSIX and self.store_stdin: 617 origin = self.orig_stdin 618 origfd = origin if isinstance(origin, int) else origin.fileno() 619 origin = BufferedFDParallelReader(origfd, buffer=stdin) 620 else: 621 origin = None 622 # get non-blocking stdout 623 stdout = self.stdout.buffer if self.universal_newlines else self.stdout 624 capout = spec.captured_stdout 625 if capout is None: 626 procout = None 627 else: 628 procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout) 629 # get non-blocking stderr 630 stderr = self.stderr.buffer if self.universal_newlines else self.stderr 631 caperr = spec.captured_stderr 632 if caperr is None: 633 procerr = None 634 else: 635 procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout) 636 # initial read from buffer 637 self._read_write(procout, stdout, sys.__stdout__) 638 self._read_write(procerr, stderr, sys.__stderr__) 639 # loop over reads while process is running. 640 i = j = cnt = 1 641 while proc.poll() is None: 642 # this is here for CPU performance reasons. 643 if i + j == 0: 644 cnt = min(cnt + 1, 1000) 645 tout = self.timeout * cnt 646 if procout is not None: 647 procout.timeout = tout 648 if procerr is not None: 649 procerr.timeout = tout 650 elif cnt == 1: 651 pass 652 else: 653 cnt = 1 654 if procout is not None: 655 procout.timeout = self.timeout 656 if procerr is not None: 657 procerr.timeout = self.timeout 658 # redirect some output! 659 i = self._read_write(procout, stdout, sys.__stdout__) 660 j = self._read_write(procerr, stderr, sys.__stderr__) 661 if self.suspended: 662 break 663 if self.suspended: 664 return 665 # close files to send EOF to non-blocking reader. 666 # capout & caperr seem to be needed only by Windows, while 667 # orig_stdout & orig_stderr are need by posix and Windows. 668 # Also, order seems to matter here, 669 # with orig_* needed to be closed before cap* 670 safe_fdclose(self.orig_stdout) 671 safe_fdclose(self.orig_stderr) 672 if ON_WINDOWS: 673 safe_fdclose(capout) 674 safe_fdclose(caperr) 675 # read in the remaining data in a blocking fashion. 676 while (procout is not None and not procout.is_fully_read()) or ( 677 procerr is not None and not procerr.is_fully_read() 678 ): 679 self._read_write(procout, stdout, sys.__stdout__) 680 self._read_write(procerr, stderr, sys.__stderr__) 681 # kill the process if it is still alive. Happens when piping. 682 if proc.poll() is None: 683 proc.terminate() 684 685 def _wait_and_getattr(self, name): 686 """make sure the instance has a certain attr, and return it.""" 687 while not hasattr(self, name): 688 time.sleep(1e-7) 689 return getattr(self, name) 690 691 def _read_write(self, reader, writer, stdbuf): 692 """Reads a chunk of bytes from a buffer and write into memory or back 693 down to the standard buffer, as appropriate. Returns the number of 694 successful reads. 695 """ 696 if reader is None: 697 return 0 698 i = -1 699 for i, chunk in enumerate(iter(reader.read_queue, b"")): 700 self._alt_mode_switch(chunk, writer, stdbuf) 701 if i >= 0: 702 writer.flush() 703 stdbuf.flush() 704 return i + 1 705 706 def _alt_mode_switch(self, chunk, membuf, stdbuf): 707 """Enables recursively switching between normal capturing mode 708 and 'alt' mode, which passes through values to the standard 709 buffer. Pagers, text editors, curses applications, etc. use 710 alternate mode. 711 """ 712 i, flag = findfirst(chunk, ALTERNATE_MODE_FLAGS) 713 if flag is None: 714 self._alt_mode_writer(chunk, membuf, stdbuf) 715 else: 716 # This code is executed when the child process switches the 717 # terminal into or out of alternate mode. The line below assumes 718 # that the user has opened vim, less, or similar, and writes writes 719 # to stdin. 720 j = i + len(flag) 721 # write the first part of the chunk in the current mode. 722 self._alt_mode_writer(chunk[:i], membuf, stdbuf) 723 # switch modes 724 # write the flag itself the current mode where alt mode is on 725 # so that it is streamed to the terminal ASAP. 726 # this is needed for terminal emulators to find the correct 727 # positions before and after alt mode. 728 alt_mode = flag in START_ALTERNATE_MODE 729 if alt_mode: 730 self.in_alt_mode = alt_mode 731 self._alt_mode_writer(flag, membuf, stdbuf) 732 self._enable_cbreak_stdin() 733 else: 734 self._alt_mode_writer(flag, membuf, stdbuf) 735 self.in_alt_mode = alt_mode 736 self._disable_cbreak_stdin() 737 # recurse this function, but without the current flag. 738 self._alt_mode_switch(chunk[j:], membuf, stdbuf) 739 740 def _alt_mode_writer(self, chunk, membuf, stdbuf): 741 """Write bytes to the standard buffer if in alt mode or otherwise 742 to the in-memory buffer. 743 """ 744 if not chunk: 745 pass # don't write empty values 746 elif self.in_alt_mode: 747 stdbuf.buffer.write(chunk) 748 else: 749 with self.lock: 750 p = membuf.tell() 751 membuf.seek(0, io.SEEK_END) 752 membuf.write(chunk) 753 membuf.seek(p) 754 755 # 756 # Window resize handlers 757 # 758 759 def _signal_winch(self, signum, frame): 760 """Signal handler for SIGWINCH - window size has changed.""" 761 self.send_signal(signal.SIGWINCH) 762 self._set_pty_size() 763 764 def _set_pty_size(self): 765 """Sets the window size of the child pty based on the window size of 766 our own controlling terminal. 767 """ 768 if ON_WINDOWS or not os.isatty(self.stdout_fd): 769 return 770 # Get the terminal size of the real terminal, set it on the 771 # pseudoterminal. 772 buf = array.array("h", [0, 0, 0, 0]) 773 # 1 = stdout here 774 try: 775 fcntl.ioctl(1, termios.TIOCGWINSZ, buf, True) 776 fcntl.ioctl(self.stdout_fd, termios.TIOCSWINSZ, buf) 777 except OSError: 778 pass 779 780 # 781 # SIGINT handler 782 # 783 784 def _signal_int(self, signum, frame): 785 """Signal handler for SIGINT - Ctrl+C may have been pressed.""" 786 self.send_signal(signum) 787 if self.proc is not None and self.proc.poll() is not None: 788 self._restore_sigint(frame=frame) 789 if on_main_thread(): 790 signal.pthread_kill(threading.get_ident(), signal.SIGINT) 791 792 def _restore_sigint(self, frame=None): 793 old = self.old_int_handler 794 if old is not None: 795 if on_main_thread(): 796 signal.signal(signal.SIGINT, old) 797 self.old_int_handler = None 798 if frame is not None: 799 self._disable_cbreak_stdin() 800 if old is not None and old is not self._signal_int: 801 old(signal.SIGINT, frame) 802 803 # 804 # SIGTSTP handler 805 # 806 807 def _signal_tstp(self, signum, frame): 808 """Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed. 809 """ 810 self.suspended = True 811 self.send_signal(signum) 812 self._restore_sigtstp(frame=frame) 813 814 def _restore_sigtstp(self, frame=None): 815 old = self.old_tstp_handler 816 if old is not None: 817 if on_main_thread(): 818 signal.signal(signal.SIGTSTP, old) 819 self.old_tstp_handler = None 820 if frame is not None: 821 self._disable_cbreak_stdin() 822 823 # 824 # SIGQUIT handler 825 # 826 827 def _signal_quit(self, signum, frame): 828 """Signal handler for quiting SIGQUIT - Ctrl+\ may have been pressed. 829 """ 830 self.send_signal(signum) 831 self._restore_sigquit(frame=frame) 832 833 def _restore_sigquit(self, frame=None): 834 old = self.old_quit_handler 835 if old is not None: 836 if on_main_thread(): 837 signal.signal(signal.SIGQUIT, old) 838 self.old_quit_handler = None 839 if frame is not None: 840 self._disable_cbreak_stdin() 841 842 # 843 # cbreak mode handlers 844 # 845 846 def _enable_cbreak_stdin(self): 847 if not ON_POSIX: 848 return 849 try: 850 self.stdin_mode = termios.tcgetattr(self.stdin_fd)[:] 851 except termios.error: 852 # this can happen for cases where another process is controlling 853 # xonsh's tty device, such as in testing. 854 self.stdin_mode = None 855 return 856 new = self.stdin_mode[:] 857 new[LFLAG] &= ~(termios.ECHO | termios.ICANON) 858 new[CC][termios.VMIN] = 1 859 new[CC][termios.VTIME] = 0 860 try: 861 # termios.TCSAFLUSH may be less reliable than termios.TCSANOW 862 termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new) 863 except termios.error: 864 self._disable_cbreak_stdin() 865 866 def _disable_cbreak_stdin(self): 867 if not ON_POSIX or self.stdin_mode is None: 868 return 869 new = self.stdin_mode[:] 870 new[LFLAG] |= termios.ECHO | termios.ICANON 871 new[CC][termios.VMIN] = 1 872 new[CC][termios.VTIME] = 0 873 try: 874 termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new) 875 except termios.error: 876 pass 877 878 # 879 # Dispatch methods 880 # 881 882 def poll(self): 883 """Dispatches to Popen.returncode.""" 884 return self.proc.returncode 885 886 def wait(self, timeout=None): 887 """Dispatches to Popen.wait(), but also does process cleanup such as 888 joining this thread and replacing the original window size signal 889 handler. 890 """ 891 self._disable_cbreak_stdin() 892 rtn = self.proc.wait(timeout=timeout) 893 self.join() 894 # need to replace the old signal handlers somewhere... 895 if self.old_winch_handler is not None and on_main_thread(): 896 signal.signal(signal.SIGWINCH, self.old_winch_handler) 897 self.old_winch_handler = None 898 self._clean_up() 899 return rtn 900 901 def _clean_up(self): 902 self._restore_sigint() 903 self._restore_sigtstp() 904 self._restore_sigquit() 905 906 @property 907 def returncode(self): 908 """Process return code.""" 909 return self.proc.returncode 910 911 @returncode.setter 912 def returncode(self, value): 913 """Process return code.""" 914 self.proc.returncode = value 915 916 @property 917 def signal(self): 918 """Process signal, or None.""" 919 s = getattr(self.proc, "signal", None) 920 if s is None: 921 rtn = self.returncode 922 if rtn is not None and rtn != 0: 923 s = (-1 * rtn, rtn < 0 if ON_WINDOWS else os.WCOREDUMP(rtn)) 924 return s 925 926 @signal.setter 927 def signal(self, value): 928 """Process signal, or None.""" 929 self.proc.signal = value 930 931 def send_signal(self, signal): 932 """Dispatches to Popen.send_signal().""" 933 dt = 0.0 934 while self.proc is None and dt < self.timeout: 935 time.sleep(1e-7) 936 dt += 1e-7 937 if self.proc is None: 938 return 939 try: 940 rtn = self.proc.send_signal(signal) 941 except ProcessLookupError: 942 # This can happen in the case of !(cmd) when the command has ended 943 rtn = None 944 return rtn 945 946 def terminate(self): 947 """Dispatches to Popen.terminate().""" 948 return self.proc.terminate() 949 950 def kill(self): 951 """Dispatches to Popen.kill().""" 952 return self.proc.kill() 953 954 955class Handle(int): 956 closed = False 957 958 def Close(self, CloseHandle=None): 959 CloseHandle = CloseHandle or _winapi.CloseHandle 960 if not self.closed: 961 self.closed = True 962 CloseHandle(self) 963 964 def Detach(self): 965 if not self.closed: 966 self.closed = True 967 return int(self) 968 raise ValueError("already closed") 969 970 def __repr__(self): 971 return "Handle(%d)" % int(self) 972 973 __del__ = Close 974 __str__ = __repr__ 975 976 977class FileThreadDispatcher: 978 """Dispatches to different file handles depending on the 979 current thread. Useful if you want file operation to go to different 980 places for different threads. 981 """ 982 983 def __init__(self, default=None): 984 """ 985 Parameters 986 ---------- 987 default : file-like or None, optional 988 The file handle to write to if a thread cannot be found in 989 the registry. If None, a new in-memory instance. 990 991 Attributes 992 ---------- 993 registry : dict 994 Maps thread idents to file handles. 995 """ 996 if default is None: 997 default = io.TextIOWrapper(io.BytesIO()) 998 self.default = default 999 self.registry = {} 1000 1001 def register(self, handle): 1002 """Registers a file handle for the current thread. Returns self so 1003 that this method can be used in a with-statement. 1004 """ 1005 self.registry[threading.get_ident()] = handle 1006 return self 1007 1008 def deregister(self): 1009 """Removes the current thread from the registry.""" 1010 del self.registry[threading.get_ident()] 1011 1012 @property 1013 def available(self): 1014 """True if the thread is available in the registry.""" 1015 return threading.get_ident() in self.registry 1016 1017 @property 1018 def handle(self): 1019 """Gets the current handle for the thread.""" 1020 return self.registry.get(threading.get_ident(), self.default) 1021 1022 def __enter__(self): 1023 pass 1024 1025 def __exit__(self, ex_type, ex_value, ex_traceback): 1026 self.deregister() 1027 1028 # 1029 # io.TextIOBase interface 1030 # 1031 1032 @property 1033 def encoding(self): 1034 """Gets the encoding for this thread's handle.""" 1035 return self.handle.encoding 1036 1037 @property 1038 def errors(self): 1039 """Gets the errors for this thread's handle.""" 1040 return self.handle.errors 1041 1042 @property 1043 def newlines(self): 1044 """Gets the newlines for this thread's handle.""" 1045 return self.handle.newlines 1046 1047 @property 1048 def buffer(self): 1049 """Gets the buffer for this thread's handle.""" 1050 return self.handle.buffer 1051 1052 def detach(self): 1053 """Detaches the buffer for the current thread.""" 1054 return self.handle.detach() 1055 1056 def read(self, size=None): 1057 """Reads from the handle for the current thread.""" 1058 return self.handle.read(size) 1059 1060 def readline(self, size=-1): 1061 """Reads a line from the handle for the current thread.""" 1062 return self.handle.readline(size) 1063 1064 def readlines(self, hint=-1): 1065 """Reads lines from the handle for the current thread.""" 1066 return self.handle.readlines(hint) 1067 1068 def seek(self, offset, whence=io.SEEK_SET): 1069 """Seeks the current file.""" 1070 return self.handle.seek(offset, whence) 1071 1072 def tell(self): 1073 """Reports the current position in the handle for the current thread.""" 1074 return self.handle.tell() 1075 1076 def write(self, s): 1077 """Writes to this thread's handle. This also flushes, just to be 1078 extra sure the string was written. 1079 """ 1080 h = self.handle 1081 try: 1082 r = h.write(s) 1083 h.flush() 1084 except OSError: 1085 r = None 1086 return r 1087 1088 @property 1089 def line_buffering(self): 1090 """Gets if line buffering for this thread's handle enabled.""" 1091 return self.handle.line_buffering 1092 1093 # 1094 # io.IOBase interface 1095 # 1096 1097 def close(self): 1098 """Closes the current thread's handle.""" 1099 return self.handle.close() 1100 1101 @property 1102 def closed(self): 1103 """Is the thread's handle closed.""" 1104 return self.handle.closed 1105 1106 def fileno(self): 1107 """Returns the file descriptor for the current thread.""" 1108 return self.handle.fileno() 1109 1110 def flush(self): 1111 """Flushes the file descriptor for the current thread.""" 1112 return safe_flush(self.handle) 1113 1114 def isatty(self): 1115 """Returns if the file descriptor for the current thread is a tty.""" 1116 return self.handle.isatty() 1117 1118 def readable(self): 1119 """Returns if file descriptor for the current thread is readable.""" 1120 return self.handle.readable() 1121 1122 def seekable(self): 1123 """Returns if file descriptor for the current thread is seekable.""" 1124 return self.handle.seekable() 1125 1126 def truncate(self, size=None): 1127 """Truncates the file for for the current thread.""" 1128 return self.handle.truncate() 1129 1130 def writable(self, size=None): 1131 """Returns if file descriptor for the current thread is writable.""" 1132 return self.handle.writable(size) 1133 1134 def writelines(self): 1135 """Writes lines for the file descriptor for the current thread.""" 1136 return self.handle.writelines() 1137 1138 1139# These should NOT be lazy since they *need* to get the true stdout from the 1140# main thread. Also their creation time should be negligible. 1141STDOUT_DISPATCHER = FileThreadDispatcher(default=sys.stdout) 1142STDERR_DISPATCHER = FileThreadDispatcher(default=sys.stderr) 1143 1144 1145def parse_proxy_return(r, stdout, stderr): 1146 """Proxies may return a variety of outputs. This handles them generally. 1147 1148 Parameters 1149 ---------- 1150 r : tuple, str, int, or None 1151 Return from proxy function 1152 stdout : file-like 1153 Current stdout stream 1154 stdout : file-like 1155 Current stderr stream 1156 1157 Returns 1158 ------- 1159 cmd_result : int 1160 The return code of the proxy 1161 """ 1162 cmd_result = 0 1163 if isinstance(r, str): 1164 stdout.write(r) 1165 stdout.flush() 1166 elif isinstance(r, int): 1167 cmd_result = r 1168 elif isinstance(r, cabc.Sequence): 1169 rlen = len(r) 1170 if rlen > 0 and r[0] is not None: 1171 stdout.write(r[0]) 1172 stdout.flush() 1173 if rlen > 1 and r[1] is not None: 1174 stderr.write(r[1]) 1175 stderr.flush() 1176 if rlen > 2 and r[2] is not None: 1177 cmd_result = r[2] 1178 elif r is not None: 1179 # for the random object... 1180 stdout.write(str(r)) 1181 stdout.flush() 1182 return cmd_result 1183 1184 1185def proxy_zero(f, args, stdin, stdout, stderr, spec, stack): 1186 """Calls a proxy function which takes no parameters.""" 1187 return f() 1188 1189 1190def proxy_one(f, args, stdin, stdout, stderr, spec, stack): 1191 """Calls a proxy function which takes one parameter: args""" 1192 return f(args) 1193 1194 1195def proxy_two(f, args, stdin, stdout, stderr, spec, stack): 1196 """Calls a proxy function which takes two parameter: args and stdin.""" 1197 return f(args, stdin) 1198 1199 1200def proxy_three(f, args, stdin, stdout, stderr, spec, stack): 1201 """Calls a proxy function which takes three parameter: args, stdin, stdout. 1202 """ 1203 return f(args, stdin, stdout) 1204 1205 1206def proxy_four(f, args, stdin, stdout, stderr, spec, stack): 1207 """Calls a proxy function which takes four parameter: args, stdin, stdout, 1208 and stderr. 1209 """ 1210 return f(args, stdin, stdout, stderr) 1211 1212 1213def proxy_five(f, args, stdin, stdout, stderr, spec, stack): 1214 """Calls a proxy function which takes four parameter: args, stdin, stdout, 1215 stderr, and spec. 1216 """ 1217 return f(args, stdin, stdout, stderr, spec) 1218 1219 1220PROXIES = (proxy_zero, proxy_one, proxy_two, proxy_three, proxy_four, proxy_five) 1221PROXY_KWARG_NAMES = frozenset(["args", "stdin", "stdout", "stderr", "spec", "stack"]) 1222 1223 1224def partial_proxy(f): 1225 """Dispatches the appropriate proxy function based on the number of args.""" 1226 numargs = 0 1227 for name, param in inspect.signature(f).parameters.items(): 1228 if ( 1229 param.kind == param.POSITIONAL_ONLY 1230 or param.kind == param.POSITIONAL_OR_KEYWORD 1231 ): 1232 numargs += 1 1233 elif name in PROXY_KWARG_NAMES and param.kind == param.KEYWORD_ONLY: 1234 numargs += 1 1235 if numargs < 6: 1236 return functools.partial(PROXIES[numargs], f) 1237 elif numargs == 6: 1238 # don't need to partial. 1239 return f 1240 else: 1241 e = "Expected proxy with 6 or fewer arguments for {}, not {}" 1242 raise XonshError(e.format(", ".join(PROXY_KWARG_NAMES), numargs)) 1243 1244 1245class ProcProxyThread(threading.Thread): 1246 """ 1247 Class representing a function to be run as a subprocess-mode command. 1248 """ 1249 1250 def __init__( 1251 self, 1252 f, 1253 args, 1254 stdin=None, 1255 stdout=None, 1256 stderr=None, 1257 universal_newlines=False, 1258 env=None, 1259 ): 1260 """Parameters 1261 ---------- 1262 f : function 1263 The function to be executed. 1264 args : list 1265 A (possibly empty) list containing the arguments that were given on 1266 the command line 1267 stdin : file-like, optional 1268 A file-like object representing stdin (input can be read from 1269 here). If `stdin` is not provided or if it is explicitly set to 1270 `None`, then an instance of `io.StringIO` representing an empty 1271 file is used. 1272 stdout : file-like, optional 1273 A file-like object representing stdout (normal output can be 1274 written here). If `stdout` is not provided or if it is explicitly 1275 set to `None`, then `sys.stdout` is used. 1276 stderr : file-like, optional 1277 A file-like object representing stderr (error output can be 1278 written here). If `stderr` is not provided or if it is explicitly 1279 set to `None`, then `sys.stderr` is used. 1280 universal_newlines : bool, optional 1281 Whether or not to use universal newlines. 1282 env : Mapping, optional 1283 Environment mapping. 1284 """ 1285 self.orig_f = f 1286 self.f = partial_proxy(f) 1287 self.args = args 1288 self.pid = None 1289 self.returncode = None 1290 self._closed_handle_cache = {} 1291 1292 handles = self._get_handles(stdin, stdout, stderr) 1293 ( 1294 self.p2cread, 1295 self.p2cwrite, 1296 self.c2pread, 1297 self.c2pwrite, 1298 self.errread, 1299 self.errwrite, 1300 ) = handles 1301 1302 # default values 1303 self.stdin = stdin 1304 self.stdout = stdout 1305 self.stderr = stderr 1306 self.env = env or builtins.__xonsh_env__ 1307 self._interrupted = False 1308 1309 if ON_WINDOWS: 1310 if self.p2cwrite != -1: 1311 self.p2cwrite = msvcrt.open_osfhandle(self.p2cwrite.Detach(), 0) 1312 if self.c2pread != -1: 1313 self.c2pread = msvcrt.open_osfhandle(self.c2pread.Detach(), 0) 1314 if self.errread != -1: 1315 self.errread = msvcrt.open_osfhandle(self.errread.Detach(), 0) 1316 1317 if self.p2cwrite != -1: 1318 self.stdin = io.open(self.p2cwrite, "wb", -1) 1319 if universal_newlines: 1320 self.stdin = io.TextIOWrapper( 1321 self.stdin, write_through=True, line_buffering=False 1322 ) 1323 elif isinstance(stdin, int) and stdin != 0: 1324 self.stdin = io.open(stdin, "wb", -1) 1325 1326 if self.c2pread != -1: 1327 self.stdout = io.open(self.c2pread, "rb", -1) 1328 if universal_newlines: 1329 self.stdout = io.TextIOWrapper(self.stdout) 1330 1331 if self.errread != -1: 1332 self.stderr = io.open(self.errread, "rb", -1) 1333 if universal_newlines: 1334 self.stderr = io.TextIOWrapper(self.stderr) 1335 1336 # Set some signal handles, if we can. Must come before process 1337 # is started to prevent deadlock on windows 1338 self.old_int_handler = None 1339 if on_main_thread(): 1340 self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int) 1341 # start up the proc 1342 super().__init__() 1343 self.start() 1344 1345 def __del__(self): 1346 self._restore_sigint() 1347 1348 def run(self): 1349 """Set up input/output streams and execute the child function in a new 1350 thread. This is part of the `threading.Thread` interface and should 1351 not be called directly. 1352 """ 1353 if self.f is None: 1354 return 1355 spec = self._wait_and_getattr("spec") 1356 last_in_pipeline = spec.last_in_pipeline 1357 if last_in_pipeline: 1358 capout = spec.captured_stdout # NOQA 1359 caperr = spec.captured_stderr # NOQA 1360 env = builtins.__xonsh_env__ 1361 enc = env.get("XONSH_ENCODING") 1362 err = env.get("XONSH_ENCODING_ERRORS") 1363 if ON_WINDOWS: 1364 if self.p2cread != -1: 1365 self.p2cread = msvcrt.open_osfhandle(self.p2cread.Detach(), 0) 1366 if self.c2pwrite != -1: 1367 self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0) 1368 if self.errwrite != -1: 1369 self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0) 1370 # get stdin 1371 if self.stdin is None: 1372 sp_stdin = None 1373 elif self.p2cread != -1: 1374 sp_stdin = io.TextIOWrapper( 1375 io.open(self.p2cread, "rb", -1), encoding=enc, errors=err 1376 ) 1377 else: 1378 sp_stdin = sys.stdin 1379 # stdout 1380 if self.c2pwrite != -1: 1381 sp_stdout = io.TextIOWrapper( 1382 io.open(self.c2pwrite, "wb", -1), encoding=enc, errors=err 1383 ) 1384 else: 1385 sp_stdout = sys.stdout 1386 # stderr 1387 if self.errwrite == self.c2pwrite: 1388 sp_stderr = sp_stdout 1389 elif self.errwrite != -1: 1390 sp_stderr = io.TextIOWrapper( 1391 io.open(self.errwrite, "wb", -1), encoding=enc, errors=err 1392 ) 1393 else: 1394 sp_stderr = sys.stderr 1395 # run the function itself 1396 try: 1397 with STDOUT_DISPATCHER.register(sp_stdout), STDERR_DISPATCHER.register( 1398 sp_stderr 1399 ), redirect_stdout(STDOUT_DISPATCHER), redirect_stderr(STDERR_DISPATCHER): 1400 r = self.f(self.args, sp_stdin, sp_stdout, sp_stderr, spec, spec.stack) 1401 except SystemExit as e: 1402 r = e.code if isinstance(e.code, int) else int(bool(e.code)) 1403 except OSError as e: 1404 status = still_writable(self.c2pwrite) and still_writable(self.errwrite) 1405 if status: 1406 # stdout and stderr are still writable, so error must 1407 # come from function itself. 1408 print_exception() 1409 r = 1 1410 else: 1411 # stdout and stderr are no longer writable, so error must 1412 # come from the fact that the next process in the pipeline 1413 # has closed the other side of the pipe. The function then 1414 # attempted to write to this side of the pipe anyway. This 1415 # is not truly an error and we should exit gracefully. 1416 r = 0 1417 except Exception: 1418 print_exception() 1419 r = 1 1420 safe_flush(sp_stdout) 1421 safe_flush(sp_stderr) 1422 self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr) 1423 if not last_in_pipeline and not ON_WINDOWS: 1424 # mac requires us *not to* close the handles here while 1425 # windows requires us *to* close the handles here 1426 return 1427 # clean up 1428 # scopz: not sure why this is needed, but stdin cannot go here 1429 # and stdout & stderr must. 1430 handles = [self.stdout, self.stderr] 1431 for handle in handles: 1432 safe_fdclose(handle, cache=self._closed_handle_cache) 1433 1434 def _wait_and_getattr(self, name): 1435 """make sure the instance has a certain attr, and return it.""" 1436 while not hasattr(self, name): 1437 time.sleep(1e-7) 1438 return getattr(self, name) 1439 1440 def poll(self): 1441 """Check if the function has completed. 1442 1443 Returns 1444 ------- 1445 None if the function is still executing, and the returncode otherwise 1446 """ 1447 return self.returncode 1448 1449 def wait(self, timeout=None): 1450 """Waits for the process to finish and returns the return code.""" 1451 self.join() 1452 self._restore_sigint() 1453 return self.returncode 1454 1455 # 1456 # SIGINT handler 1457 # 1458 1459 def _signal_int(self, signum, frame): 1460 """Signal handler for SIGINT - Ctrl+C may have been pressed.""" 1461 # Check if we have already been interrupted. This should prevent 1462 # the possibility of infinite recursion. 1463 if self._interrupted: 1464 return 1465 self._interrupted = True 1466 # close file handles here to stop an processes piped to us. 1467 handles = ( 1468 self.p2cread, 1469 self.p2cwrite, 1470 self.c2pread, 1471 self.c2pwrite, 1472 self.errread, 1473 self.errwrite, 1474 ) 1475 for handle in handles: 1476 safe_fdclose(handle) 1477 if self.poll() is not None: 1478 self._restore_sigint(frame=frame) 1479 if on_main_thread(): 1480 signal.pthread_kill(threading.get_ident(), signal.SIGINT) 1481 1482 def _restore_sigint(self, frame=None): 1483 old = self.old_int_handler 1484 if old is not None: 1485 if on_main_thread(): 1486 signal.signal(signal.SIGINT, old) 1487 self.old_int_handler = None 1488 if frame is not None: 1489 if old is not None and old is not self._signal_int: 1490 old(signal.SIGINT, frame) 1491 if self._interrupted: 1492 self.returncode = 1 1493 1494 # The code below (_get_devnull, _get_handles, and _make_inheritable) comes 1495 # from subprocess.py in the Python 3.4.2 Standard Library 1496 def _get_devnull(self): 1497 if not hasattr(self, "_devnull"): 1498 self._devnull = os.open(os.devnull, os.O_RDWR) 1499 return self._devnull 1500 1501 if ON_WINDOWS: 1502 1503 def _make_inheritable(self, handle): 1504 """Return a duplicate of handle, which is inheritable""" 1505 h = _winapi.DuplicateHandle( 1506 _winapi.GetCurrentProcess(), 1507 handle, 1508 _winapi.GetCurrentProcess(), 1509 0, 1510 1, 1511 _winapi.DUPLICATE_SAME_ACCESS, 1512 ) 1513 return Handle(h) 1514 1515 def _get_handles(self, stdin, stdout, stderr): 1516 """Construct and return tuple with IO objects: 1517 p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite 1518 """ 1519 if stdin is None and stdout is None and stderr is None: 1520 return (-1, -1, -1, -1, -1, -1) 1521 1522 p2cread, p2cwrite = -1, -1 1523 c2pread, c2pwrite = -1, -1 1524 errread, errwrite = -1, -1 1525 1526 if stdin is None: 1527 p2cread = _winapi.GetStdHandle(_winapi.STD_INPUT_HANDLE) 1528 if p2cread is None: 1529 p2cread, _ = _winapi.CreatePipe(None, 0) 1530 p2cread = Handle(p2cread) 1531 _winapi.CloseHandle(_) 1532 elif stdin == subprocess.PIPE: 1533 p2cread, p2cwrite = Handle(p2cread), Handle(p2cwrite) 1534 elif stdin == subprocess.DEVNULL: 1535 p2cread = msvcrt.get_osfhandle(self._get_devnull()) 1536 elif isinstance(stdin, int): 1537 p2cread = msvcrt.get_osfhandle(stdin) 1538 else: 1539 # Assuming file-like object 1540 p2cread = msvcrt.get_osfhandle(stdin.fileno()) 1541 p2cread = self._make_inheritable(p2cread) 1542 1543 if stdout is None: 1544 c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE) 1545 if c2pwrite is None: 1546 _, c2pwrite = _winapi.CreatePipe(None, 0) 1547 c2pwrite = Handle(c2pwrite) 1548 _winapi.CloseHandle(_) 1549 elif stdout == subprocess.PIPE: 1550 c2pread, c2pwrite = _winapi.CreatePipe(None, 0) 1551 c2pread, c2pwrite = Handle(c2pread), Handle(c2pwrite) 1552 elif stdout == subprocess.DEVNULL: 1553 c2pwrite = msvcrt.get_osfhandle(self._get_devnull()) 1554 elif isinstance(stdout, int): 1555 c2pwrite = msvcrt.get_osfhandle(stdout) 1556 else: 1557 # Assuming file-like object 1558 c2pwrite = msvcrt.get_osfhandle(stdout.fileno()) 1559 c2pwrite = self._make_inheritable(c2pwrite) 1560 1561 if stderr is None: 1562 errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE) 1563 if errwrite is None: 1564 _, errwrite = _winapi.CreatePipe(None, 0) 1565 errwrite = Handle(errwrite) 1566 _winapi.CloseHandle(_) 1567 elif stderr == subprocess.PIPE: 1568 errread, errwrite = _winapi.CreatePipe(None, 0) 1569 errread, errwrite = Handle(errread), Handle(errwrite) 1570 elif stderr == subprocess.STDOUT: 1571 errwrite = c2pwrite 1572 elif stderr == subprocess.DEVNULL: 1573 errwrite = msvcrt.get_osfhandle(self._get_devnull()) 1574 elif isinstance(stderr, int): 1575 errwrite = msvcrt.get_osfhandle(stderr) 1576 else: 1577 # Assuming file-like object 1578 errwrite = msvcrt.get_osfhandle(stderr.fileno()) 1579 errwrite = self._make_inheritable(errwrite) 1580 1581 return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite) 1582 1583 else: 1584 # POSIX versions 1585 def _get_handles(self, stdin, stdout, stderr): 1586 """Construct and return tuple with IO objects: 1587 p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite 1588 """ 1589 p2cread, p2cwrite = -1, -1 1590 c2pread, c2pwrite = -1, -1 1591 errread, errwrite = -1, -1 1592 1593 if stdin is None: 1594 pass 1595 elif stdin == subprocess.PIPE: 1596 p2cread, p2cwrite = os.pipe() 1597 elif stdin == subprocess.DEVNULL: 1598 p2cread = self._get_devnull() 1599 elif isinstance(stdin, int): 1600 p2cread = stdin 1601 else: 1602 # Assuming file-like object 1603 p2cread = stdin.fileno() 1604 1605 if stdout is None: 1606 pass 1607 elif stdout == subprocess.PIPE: 1608 c2pread, c2pwrite = os.pipe() 1609 elif stdout == subprocess.DEVNULL: 1610 c2pwrite = self._get_devnull() 1611 elif isinstance(stdout, int): 1612 c2pwrite = stdout 1613 else: 1614 # Assuming file-like object 1615 c2pwrite = stdout.fileno() 1616 1617 if stderr is None: 1618 pass 1619 elif stderr == subprocess.PIPE: 1620 errread, errwrite = os.pipe() 1621 elif stderr == subprocess.STDOUT: 1622 errwrite = c2pwrite 1623 elif stderr == subprocess.DEVNULL: 1624 errwrite = self._get_devnull() 1625 elif isinstance(stderr, int): 1626 errwrite = stderr 1627 else: 1628 # Assuming file-like object 1629 errwrite = stderr.fileno() 1630 1631 return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite) 1632 1633 1634# 1635# Foreground Thread Process Proxies 1636# 1637 1638 1639class ProcProxy(object): 1640 """This is process proxy class that runs its alias functions on the 1641 same thread that it was called from, which is typically the main thread. 1642 This prevents the process from running on a background thread, but enables 1643 debugger and profiler tools (functions) be run on the same thread that they 1644 are attempting to debug. 1645 """ 1646 1647 def __init__( 1648 self, 1649 f, 1650 args, 1651 stdin=None, 1652 stdout=None, 1653 stderr=None, 1654 universal_newlines=False, 1655 env=None, 1656 ): 1657 self.orig_f = f 1658 self.f = partial_proxy(f) 1659 self.args = args 1660 self.pid = os.getpid() 1661 self.returncode = None 1662 self.stdin = stdin 1663 self.stdout = stdout 1664 self.stderr = stderr 1665 self.universal_newlines = universal_newlines 1666 self.env = env 1667 1668 def poll(self): 1669 """Check if the function has completed via the returncode or None. 1670 """ 1671 return self.returncode 1672 1673 def wait(self, timeout=None): 1674 """Runs the function and returns the result. Timeout argument only 1675 present for API compatibility. 1676 """ 1677 if self.f is None: 1678 return 0 1679 env = builtins.__xonsh_env__ 1680 enc = env.get("XONSH_ENCODING") 1681 err = env.get("XONSH_ENCODING_ERRORS") 1682 spec = self._wait_and_getattr("spec") 1683 # set file handles 1684 if self.stdin is None: 1685 stdin = None 1686 else: 1687 if isinstance(self.stdin, int): 1688 inbuf = io.open(self.stdin, "rb", -1) 1689 else: 1690 inbuf = self.stdin 1691 stdin = io.TextIOWrapper(inbuf, encoding=enc, errors=err) 1692 stdout = self._pick_buf(self.stdout, sys.stdout, enc, err) 1693 stderr = self._pick_buf(self.stderr, sys.stderr, enc, err) 1694 # run the actual function 1695 try: 1696 r = self.f(self.args, stdin, stdout, stderr, spec, spec.stack) 1697 except Exception: 1698 print_exception() 1699 r = 1 1700 self.returncode = parse_proxy_return(r, stdout, stderr) 1701 safe_flush(stdout) 1702 safe_flush(stderr) 1703 return self.returncode 1704 1705 @staticmethod 1706 def _pick_buf(handle, sysbuf, enc, err): 1707 if handle is None or handle is sysbuf: 1708 buf = sysbuf 1709 elif isinstance(handle, int): 1710 if handle < 3: 1711 buf = sysbuf 1712 else: 1713 buf = io.TextIOWrapper( 1714 io.open(handle, "wb", -1), encoding=enc, errors=err 1715 ) 1716 elif hasattr(handle, "encoding"): 1717 # must be a text stream, no need to wrap. 1718 buf = handle 1719 else: 1720 # must be a binary stream, should wrap it. 1721 buf = io.TextIOWrapper(handle, encoding=enc, errors=err) 1722 return buf 1723 1724 def _wait_and_getattr(self, name): 1725 """make sure the instance has a certain attr, and return it.""" 1726 while not hasattr(self, name): 1727 time.sleep(1e-7) 1728 return getattr(self, name) 1729 1730 1731@lazyobject 1732def SIGNAL_MESSAGES(): 1733 sm = { 1734 signal.SIGABRT: "Aborted", 1735 signal.SIGFPE: "Floating point exception", 1736 signal.SIGILL: "Illegal instructions", 1737 signal.SIGTERM: "Terminated", 1738 signal.SIGSEGV: "Segmentation fault", 1739 } 1740 if ON_POSIX: 1741 sm.update( 1742 {signal.SIGQUIT: "Quit", signal.SIGHUP: "Hangup", signal.SIGKILL: "Killed"} 1743 ) 1744 return sm 1745 1746 1747def safe_readlines(handle, hint=-1): 1748 """Attempts to read lines without throwing an error.""" 1749 try: 1750 lines = handle.readlines(hint) 1751 except OSError: 1752 lines = [] 1753 return lines 1754 1755 1756def safe_readable(handle): 1757 """Attempts to find if the handle is readable without throwing an error.""" 1758 try: 1759 status = handle.readable() 1760 except (OSError, ValueError): 1761 status = False 1762 return status 1763 1764 1765def update_fg_process_group(pipeline_group, background): 1766 if background: 1767 return False 1768 if not ON_POSIX: 1769 return False 1770 env = builtins.__xonsh_env__ 1771 if not env.get("XONSH_INTERACTIVE"): 1772 return False 1773 return give_terminal_to(pipeline_group) 1774 1775 1776class CommandPipeline: 1777 """Represents a subprocess-mode command pipeline.""" 1778 1779 attrnames = ( 1780 "stdin", 1781 "stdout", 1782 "stderr", 1783 "pid", 1784 "returncode", 1785 "args", 1786 "alias", 1787 "stdin_redirect", 1788 "stdout_redirect", 1789 "stderr_redirect", 1790 "timestamps", 1791 "executed_cmd", 1792 "input", 1793 "output", 1794 "errors", 1795 ) 1796 1797 nonblocking = (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader) 1798 1799 def __init__(self, specs): 1800 """ 1801 Parameters 1802 ---------- 1803 specs : list of SubprocSpec 1804 Process specifications 1805 1806 Attributes 1807 ---------- 1808 spec : SubprocSpec 1809 The last specification in specs 1810 proc : Popen-like 1811 The process in procs 1812 ended : bool 1813 Boolean for if the command has stopped executing. 1814 input : str 1815 A string of the standard input. 1816 output : str 1817 A string of the standard output. 1818 errors : str 1819 A string of the standard error. 1820 lines : list of str 1821 The output lines 1822 starttime : floats or None 1823 Pipeline start timestamp. 1824 """ 1825 self.starttime = None 1826 self.ended = False 1827 self.procs = [] 1828 self.specs = specs 1829 self.spec = specs[-1] 1830 self.captured = specs[-1].captured 1831 self.input = self._output = self.errors = self.endtime = None 1832 self._closed_handle_cache = {} 1833 self.lines = [] 1834 self._stderr_prefix = self._stderr_postfix = None 1835 self.term_pgid = None 1836 1837 background = self.spec.background 1838 pipeline_group = None 1839 for spec in specs: 1840 if self.starttime is None: 1841 self.starttime = time.time() 1842 try: 1843 proc = spec.run(pipeline_group=pipeline_group) 1844 except Exception: 1845 print_exception() 1846 self._return_terminal() 1847 self.proc = None 1848 return 1849 if ( 1850 proc.pid 1851 and pipeline_group is None 1852 and not spec.is_proxy 1853 and self.captured != "object" 1854 ): 1855 pipeline_group = proc.pid 1856 if update_fg_process_group(pipeline_group, background): 1857 self.term_pgid = pipeline_group 1858 self.procs.append(proc) 1859 self.proc = self.procs[-1] 1860 1861 def __repr__(self): 1862 s = self.__class__.__name__ + "(" 1863 s += ", ".join(a + "=" + str(getattr(self, a)) for a in self.attrnames) 1864 s += ")" 1865 return s 1866 1867 def __bool__(self): 1868 return self.returncode == 0 1869 1870 def __len__(self): 1871 return len(self.procs) 1872 1873 def __iter__(self): 1874 """Iterates through stdout and returns the lines, converting to 1875 strings and universal newlines if needed. 1876 """ 1877 if self.ended: 1878 yield from iter(self.lines) 1879 else: 1880 yield from self.tee_stdout() 1881 1882 def iterraw(self): 1883 """Iterates through the last stdout, and returns the lines 1884 exactly as found. 1885 """ 1886 # get appropriate handles 1887 spec = self.spec 1888 proc = self.proc 1889 if proc is None: 1890 return 1891 timeout = builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY") 1892 # get the correct stdout 1893 stdout = proc.stdout 1894 if ( 1895 stdout is None or spec.stdout is None or not safe_readable(stdout) 1896 ) and spec.captured_stdout is not None: 1897 stdout = spec.captured_stdout 1898 if hasattr(stdout, "buffer"): 1899 stdout = stdout.buffer 1900 if stdout is not None and not isinstance(stdout, self.nonblocking): 1901 stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout) 1902 if ( 1903 not stdout 1904 or self.captured == "stdout" 1905 or not safe_readable(stdout) 1906 or not spec.threadable 1907 ): 1908 # we get here if the process is not threadable or the 1909 # class is the real Popen 1910 PrevProcCloser(pipeline=self) 1911 task = wait_for_active_job() 1912 if task is None or task["status"] != "stopped": 1913 proc.wait() 1914 self._endtime() 1915 if self.captured == "object": 1916 self.end(tee_output=False) 1917 elif self.captured == "hiddenobject" and stdout: 1918 b = stdout.read() 1919 lines = b.splitlines(keepends=True) 1920 yield from lines 1921 self.end(tee_output=False) 1922 elif self.captured == "stdout": 1923 b = stdout.read() 1924 s = self._decode_uninew(b, universal_newlines=True) 1925 self.lines = s.splitlines(keepends=True) 1926 return 1927 # get the correct stderr 1928 stderr = proc.stderr 1929 if ( 1930 stderr is None or spec.stderr is None or not safe_readable(stderr) 1931 ) and spec.captured_stderr is not None: 1932 stderr = spec.captured_stderr 1933 if hasattr(stderr, "buffer"): 1934 stderr = stderr.buffer 1935 if stderr is not None and not isinstance(stderr, self.nonblocking): 1936 stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout) 1937 # read from process while it is running 1938 check_prev_done = len(self.procs) == 1 1939 prev_end_time = None 1940 i = j = cnt = 1 1941 while proc.poll() is None: 1942 if getattr(proc, "suspended", False): 1943 return 1944 elif getattr(proc, "in_alt_mode", False): 1945 time.sleep(0.1) # probably not leaving any time soon 1946 continue 1947 elif not check_prev_done: 1948 # In the case of pipelines with more than one command 1949 # we should give the commands a little time 1950 # to start up fully. This is particularly true for 1951 # GNU Parallel, which has a long startup time. 1952 pass 1953 elif self._prev_procs_done(): 1954 self._close_prev_procs() 1955 proc.prevs_are_closed = True 1956 break 1957 stdout_lines = safe_readlines(stdout, 1024) 1958 i = len(stdout_lines) 1959 if i != 0: 1960 yield from stdout_lines 1961 stderr_lines = safe_readlines(stderr, 1024) 1962 j = len(stderr_lines) 1963 if j != 0: 1964 self.stream_stderr(stderr_lines) 1965 if not check_prev_done: 1966 # if we are piping... 1967 if stdout_lines or stderr_lines: 1968 # see if we have some output. 1969 check_prev_done = True 1970 elif prev_end_time is None: 1971 # or see if we already know that the next-to-last 1972 # proc in the pipeline has ended. 1973 if self._prev_procs_done(): 1974 # if it has, record the time 1975 prev_end_time = time.time() 1976 elif time.time() - prev_end_time >= 0.1: 1977 # if we still don't have any output, even though the 1978 # next-to-last proc has finished, wait a bit to make 1979 # sure we have fully started up, etc. 1980 check_prev_done = True 1981 # this is for CPU usage 1982 if i + j == 0: 1983 cnt = min(cnt + 1, 1000) 1984 else: 1985 cnt = 1 1986 time.sleep(timeout * cnt) 1987 # read from process now that it is over 1988 yield from safe_readlines(stdout) 1989 self.stream_stderr(safe_readlines(stderr)) 1990 proc.wait() 1991 self._endtime() 1992 yield from safe_readlines(stdout) 1993 self.stream_stderr(safe_readlines(stderr)) 1994 if self.captured == "object": 1995 self.end(tee_output=False) 1996 1997 def itercheck(self): 1998 """Iterates through the command lines and throws an error if the 1999 returncode is non-zero. 2000 """ 2001 yield from self 2002 if self.returncode: 2003 # I included self, as providing access to stderr and other details 2004 # useful when instance isn't assigned to a variable in the shell. 2005 raise XonshCalledProcessError( 2006 self.returncode, self.executed_cmd, self.stdout, self.stderr, self 2007 ) 2008 2009 def tee_stdout(self): 2010 """Writes the process stdout to the output variable, line-by-line, and 2011 yields each line. This may optionally accept lines (in bytes) to iterate 2012 over, in which case it does not call iterraw(). 2013 """ 2014 env = builtins.__xonsh_env__ 2015 enc = env.get("XONSH_ENCODING") 2016 err = env.get("XONSH_ENCODING_ERRORS") 2017 lines = self.lines 2018 stream = self.captured not in STDOUT_CAPTURE_KINDS 2019 if stream and not self.spec.stdout: 2020 stream = False 2021 stdout_has_buffer = hasattr(sys.stdout, "buffer") 2022 nl = b"\n" 2023 cr = b"\r" 2024 crnl = b"\r\n" 2025 for line in self.iterraw(): 2026 # write to stdout line ASAP, if needed 2027 if stream: 2028 if stdout_has_buffer: 2029 sys.stdout.buffer.write(line) 2030 else: 2031 sys.stdout.write(line.decode(encoding=enc, errors=err)) 2032 sys.stdout.flush() 2033 # do some munging of the line before we return it 2034 if line.endswith(crnl): 2035 line = line[:-2] + nl 2036 elif line.endswith(cr): 2037 line = line[:-1] + nl 2038 line = RE_HIDE_ESCAPE.sub(b"", line) 2039 line = line.decode(encoding=enc, errors=err) 2040 # tee it up! 2041 lines.append(line) 2042 yield line 2043 2044 def stream_stderr(self, lines): 2045 """Streams lines to sys.stderr and the errors attribute.""" 2046 if not lines: 2047 return 2048 env = builtins.__xonsh_env__ 2049 enc = env.get("XONSH_ENCODING") 2050 err = env.get("XONSH_ENCODING_ERRORS") 2051 b = b"".join(lines) 2052 if self.stderr_prefix: 2053 b = self.stderr_prefix + b 2054 if self.stderr_postfix: 2055 b += self.stderr_postfix 2056 stderr_has_buffer = hasattr(sys.stderr, "buffer") 2057 # write bytes to std stream 2058 if stderr_has_buffer: 2059 sys.stderr.buffer.write(b) 2060 else: 2061 sys.stderr.write(b.decode(encoding=enc, errors=err)) 2062 sys.stderr.flush() 2063 # do some munging of the line before we save it to the attr 2064 b = b.replace(b"\r\n", b"\n").replace(b"\r", b"\n") 2065 b = RE_HIDE_ESCAPE.sub(b"", b) 2066 env = builtins.__xonsh_env__ 2067 s = b.decode( 2068 encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS") 2069 ) 2070 # set the errors 2071 if self.errors is None: 2072 self.errors = s 2073 else: 2074 self.errors += s 2075 2076 def _decode_uninew(self, b, universal_newlines=None): 2077 """Decode bytes into a str and apply universal newlines as needed.""" 2078 if not b: 2079 return "" 2080 if isinstance(b, bytes): 2081 env = builtins.__xonsh_env__ 2082 s = b.decode( 2083 encoding=env.get("XONSH_ENCODING"), 2084 errors=env.get("XONSH_ENCODING_ERRORS"), 2085 ) 2086 else: 2087 s = b 2088 if universal_newlines or self.spec.universal_newlines: 2089 s = s.replace("\r\n", "\n").replace("\r", "\n") 2090 return s 2091 2092 # 2093 # Ending methods 2094 # 2095 2096 def end(self, tee_output=True): 2097 """ 2098 End the pipeline, return the controlling terminal if needed. 2099 2100 Main things done in self._end(). 2101 """ 2102 if self.ended: 2103 return 2104 self._end(tee_output=tee_output) 2105 self._return_terminal() 2106 2107 def _end(self, tee_output): 2108 """Waits for the command to complete and then runs any closing and 2109 cleanup procedures that need to be run. 2110 """ 2111 if tee_output: 2112 for _ in self.tee_stdout(): 2113 pass 2114 self._endtime() 2115 # since we are driven by getting output, input may not be available 2116 # until the command has completed. 2117 self._set_input() 2118 self._close_prev_procs() 2119 self._close_proc() 2120 self._check_signal() 2121 self._apply_to_history() 2122 self.ended = True 2123 self._raise_subproc_error() 2124 2125 def _return_terminal(self): 2126 if ON_WINDOWS or not ON_POSIX: 2127 return 2128 pgid = os.getpgid(0) 2129 if self.term_pgid is None or pgid == self.term_pgid: 2130 return 2131 if give_terminal_to(pgid): # if gave term succeed 2132 self.term_pgid = pgid 2133 if hasattr(builtins, "__xonsh_shell__"): 2134 # restoring sanity could probably be called whenever we return 2135 # control to the shell. But it only seems to matter after a 2136 # ^Z event. This *has* to be called after we give the terminal 2137 # back to the shell. 2138 builtins.__xonsh_shell__.shell.restore_tty_sanity() 2139 2140 def resume(self, job, tee_output=True): 2141 self.ended = False 2142 if give_terminal_to(job["pgrp"]): 2143 self.term_pgid = job["pgrp"] 2144 _continue(job) 2145 self.end(tee_output=tee_output) 2146 2147 def _endtime(self): 2148 """Sets the closing timestamp if it hasn't been already.""" 2149 if self.endtime is None: 2150 self.endtime = time.time() 2151 2152 def _safe_close(self, handle): 2153 safe_fdclose(handle, cache=self._closed_handle_cache) 2154 2155 def _prev_procs_done(self): 2156 """Boolean for if all previous processes have completed. If there 2157 is only a single process in the pipeline, this returns False. 2158 """ 2159 any_running = False 2160 for s, p in zip(self.specs[:-1], self.procs[:-1]): 2161 if p.poll() is None: 2162 any_running = True 2163 continue 2164 self._safe_close(s.stdin) 2165 self._safe_close(s.stdout) 2166 self._safe_close(s.stderr) 2167 if p is None: 2168 continue 2169 self._safe_close(p.stdin) 2170 self._safe_close(p.stdout) 2171 self._safe_close(p.stderr) 2172 return False if any_running else (len(self) > 1) 2173 2174 def _close_prev_procs(self): 2175 """Closes all but the last proc's stdout.""" 2176 for s, p in zip(self.specs[:-1], self.procs[:-1]): 2177 self._safe_close(s.stdin) 2178 self._safe_close(s.stdout) 2179 self._safe_close(s.stderr) 2180 if p is None: 2181 continue 2182 self._safe_close(p.stdin) 2183 self._safe_close(p.stdout) 2184 self._safe_close(p.stderr) 2185 2186 def _close_proc(self): 2187 """Closes last proc's stdout.""" 2188 s = self.spec 2189 p = self.proc 2190 self._safe_close(s.stdin) 2191 self._safe_close(s.stdout) 2192 self._safe_close(s.stderr) 2193 self._safe_close(s.captured_stdout) 2194 self._safe_close(s.captured_stderr) 2195 if p is None: 2196 return 2197 self._safe_close(p.stdin) 2198 self._safe_close(p.stdout) 2199 self._safe_close(p.stderr) 2200 2201 def _set_input(self): 2202 """Sets the input variable.""" 2203 if self.proc is None: 2204 return 2205 stdin = self.proc.stdin 2206 if ( 2207 stdin is None 2208 or isinstance(stdin, int) 2209 or stdin.closed 2210 or not stdin.seekable() 2211 or not safe_readable(stdin) 2212 ): 2213 input = b"" 2214 else: 2215 stdin.seek(0) 2216 input = stdin.read() 2217 self.input = self._decode_uninew(input) 2218 2219 def _check_signal(self): 2220 """Checks if a signal was received and issues a message.""" 2221 proc_signal = getattr(self.proc, "signal", None) 2222 if proc_signal is None: 2223 return 2224 sig, core = proc_signal 2225 sig_str = SIGNAL_MESSAGES.get(sig) 2226 if sig_str: 2227 if core: 2228 sig_str += " (core dumped)" 2229 print(sig_str, file=sys.stderr) 2230 if self.errors is not None: 2231 self.errors += sig_str + "\n" 2232 2233 def _apply_to_history(self): 2234 """Applies the results to the current history object.""" 2235 hist = builtins.__xonsh_history__ 2236 if hist is not None: 2237 hist.last_cmd_rtn = 1 if self.proc is None else self.proc.returncode 2238 2239 def _raise_subproc_error(self): 2240 """Raises a subprocess error, if we are supposed to.""" 2241 spec = self.spec 2242 rtn = self.returncode 2243 if ( 2244 not spec.is_proxy 2245 and rtn is not None 2246 and rtn > 0 2247 and builtins.__xonsh_env__.get("RAISE_SUBPROC_ERROR") 2248 ): 2249 try: 2250 raise subprocess.CalledProcessError(rtn, spec.cmd, output=self.output) 2251 finally: 2252 # this is need to get a working terminal in interactive mode 2253 self._return_terminal() 2254 2255 # 2256 # Properties 2257 # 2258 2259 @property 2260 def stdin(self): 2261 """Process stdin.""" 2262 return self.proc.stdin 2263 2264 @property 2265 def stdout(self): 2266 """Process stdout.""" 2267 return self.proc.stdout 2268 2269 @property 2270 def stderr(self): 2271 """Process stderr.""" 2272 return self.proc.stderr 2273 2274 @property 2275 def inp(self): 2276 """Creates normalized input string from args.""" 2277 return " ".join(self.args) 2278 2279 @property 2280 def output(self): 2281 """Non-blocking, lazy access to output""" 2282 if self.ended: 2283 if self._output is None: 2284 self._output = "".join(self.lines) 2285 return self._output 2286 else: 2287 return "".join(self.lines) 2288 2289 @property 2290 def out(self): 2291 """Output value as a str.""" 2292 self.end() 2293 return self.output 2294 2295 @property 2296 def err(self): 2297 """Error messages as a string.""" 2298 self.end() 2299 return self.errors 2300 2301 @property 2302 def pid(self): 2303 """Process identifier.""" 2304 return self.proc.pid 2305 2306 @property 2307 def returncode(self): 2308 """Process return code, waits until command is completed.""" 2309 self.end() 2310 if self.proc is None: 2311 return 1 2312 return self.proc.returncode 2313 2314 rtn = returncode 2315 2316 @property 2317 def args(self): 2318 """Arguments to the process.""" 2319 return self.spec.args 2320 2321 @property 2322 def rtn(self): 2323 """Alias to return code.""" 2324 return self.returncode 2325 2326 @property 2327 def alias(self): 2328 """Alias the process used.""" 2329 return self.spec.alias 2330 2331 @property 2332 def stdin_redirect(self): 2333 """Redirection used for stdin.""" 2334 stdin = self.spec.stdin 2335 name = getattr(stdin, "name", "<stdin>") 2336 mode = getattr(stdin, "mode", "r") 2337 return [name, mode] 2338 2339 @property 2340 def stdout_redirect(self): 2341 """Redirection used for stdout.""" 2342 stdout = self.spec.stdout 2343 name = getattr(stdout, "name", "<stdout>") 2344 mode = getattr(stdout, "mode", "a") 2345 return [name, mode] 2346 2347 @property 2348 def stderr_redirect(self): 2349 """Redirection used for stderr.""" 2350 stderr = self.spec.stderr 2351 name = getattr(stderr, "name", "<stderr>") 2352 mode = getattr(stderr, "mode", "r") 2353 return [name, mode] 2354 2355 @property 2356 def timestamps(self): 2357 """The start and end time stamps.""" 2358 return [self.starttime, self.endtime] 2359 2360 @property 2361 def executed_cmd(self): 2362 """The resolve and executed command.""" 2363 return self.spec.cmd 2364 2365 @property 2366 def stderr_prefix(self): 2367 """Prefix to print in front of stderr, as bytes.""" 2368 p = self._stderr_prefix 2369 if p is None: 2370 env = builtins.__xonsh_env__ 2371 t = env.get("XONSH_STDERR_PREFIX") 2372 s = format_std_prepost(t, env=env) 2373 p = s.encode( 2374 encoding=env.get("XONSH_ENCODING"), 2375 errors=env.get("XONSH_ENCODING_ERRORS"), 2376 ) 2377 self._stderr_prefix = p 2378 return p 2379 2380 @property 2381 def stderr_postfix(self): 2382 """Postfix to print after stderr, as bytes.""" 2383 p = self._stderr_postfix 2384 if p is None: 2385 env = builtins.__xonsh_env__ 2386 t = env.get("XONSH_STDERR_POSTFIX") 2387 s = format_std_prepost(t, env=env) 2388 p = s.encode( 2389 encoding=env.get("XONSH_ENCODING"), 2390 errors=env.get("XONSH_ENCODING_ERRORS"), 2391 ) 2392 self._stderr_postfix = p 2393 return p 2394 2395 2396class HiddenCommandPipeline(CommandPipeline): 2397 def __repr__(self): 2398 return "" 2399 2400 2401def pause_call_resume(p, f, *args, **kwargs): 2402 """For a process p, this will call a function f with the remaining args and 2403 and kwargs. If the process cannot accept signals, the function will be called. 2404 2405 Parameters 2406 ---------- 2407 p : Popen object or similar 2408 f : callable 2409 args : remaining arguments 2410 kwargs : keyword arguments 2411 """ 2412 can_send_signal = ( 2413 hasattr(p, "send_signal") and ON_POSIX and not ON_MSYS and not ON_CYGWIN 2414 ) 2415 if can_send_signal: 2416 p.send_signal(signal.SIGSTOP) 2417 try: 2418 f(*args, **kwargs) 2419 except Exception: 2420 pass 2421 if can_send_signal: 2422 p.send_signal(signal.SIGCONT) 2423 2424 2425class PrevProcCloser(threading.Thread): 2426 """Previous process closer thread for pipelines whose last command 2427 is itself unthreadable. This makes sure that the pipeline is 2428 driven forward and does not deadlock. 2429 """ 2430 2431 def __init__(self, pipeline): 2432 """ 2433 Parameters 2434 ---------- 2435 pipeline : CommandPipeline 2436 The pipeline whose prev procs we should close. 2437 """ 2438 self.pipeline = pipeline 2439 super().__init__() 2440 self.daemon = True 2441 self.start() 2442 2443 def run(self): 2444 """Runs the closing algorithm.""" 2445 pipeline = self.pipeline 2446 check_prev_done = len(pipeline.procs) == 1 2447 if check_prev_done: 2448 return 2449 proc = pipeline.proc 2450 prev_end_time = None 2451 timeout = builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY") 2452 sleeptime = min(timeout * 1000, 0.1) 2453 while proc.poll() is None: 2454 if not check_prev_done: 2455 # In the case of pipelines with more than one command 2456 # we should give the commands a little time 2457 # to start up fully. This is particularly true for 2458 # GNU Parallel, which has a long startup time. 2459 pass 2460 elif pipeline._prev_procs_done(): 2461 pipeline._close_prev_procs() 2462 proc.prevs_are_closed = True 2463 break 2464 if not check_prev_done: 2465 # if we are piping... 2466 if prev_end_time is None: 2467 # or see if we already know that the next-to-last 2468 # proc in the pipeline has ended. 2469 if pipeline._prev_procs_done(): 2470 # if it has, record the time 2471 prev_end_time = time.time() 2472 elif time.time() - prev_end_time >= 0.1: 2473 # if we still don't have any output, even though the 2474 # next-to-last proc has finished, wait a bit to make 2475 # sure we have fully started up, etc. 2476 check_prev_done = True 2477 # this is for CPU usage 2478 time.sleep(sleeptime) 2479