1# -*- coding: utf-8 -*-
2"""Interface for running Python functions as subprocess-mode commands.
3
4Code for several helper methods in the `ProcProxy` class have been reproduced
5without modification from `subprocess.py` in the Python 3.4.2 standard library.
6The contents of `subprocess.py` (and, thus, the reproduced methods) are
7Copyright (c) 2003-2005 by Peter Astrand <astrand@lysator.liu.se> and were
8licensed to the Python Software foundation under a Contributor Agreement.
9"""
10import io
11import os
12import re
13import sys
14import time
15import queue
16import array
17import ctypes
18import signal
19import inspect
20import builtins
21import functools
22import threading
23import subprocess
24import collections.abc as cabc
25
26from xonsh.platform import (
27    ON_WINDOWS,
28    ON_POSIX,
29    ON_MSYS,
30    ON_CYGWIN,
31    CAN_RESIZE_WINDOW,
32    LFLAG,
33    CC,
34)
35from xonsh.tools import (
36    redirect_stdout,
37    redirect_stderr,
38    print_exception,
39    XonshCalledProcessError,
40    findfirst,
41    on_main_thread,
42    XonshError,
43    format_std_prepost,
44)
45from xonsh.lazyasd import lazyobject, LazyObject
46from xonsh.jobs import wait_for_active_job, give_terminal_to, _continue
47from xonsh.lazyimps import fcntl, termios, _winapi, msvcrt, winutils
48
49# these decorators are imported for users back-compatible
50from xonsh.tools import unthreadable, uncapturable  # NOQA
51
# foreground has been deprecated
53foreground = unthreadable
54
55
@lazyobject
def STDOUT_CAPTURE_KINDS():
    """Capture kinds that route output through stdout (lazily built)."""
    return frozenset({"stdout", "object"})
59
60
# The following escape codes are xterm codes.
# See http://rtfm.etla.org/xterm/ctlseq.html for more.
# Private-mode numbers that xterm-compatible terminals use to enter or
# leave the alternate screen buffer.
MODE_NUMS = ("1049", "47", "1047")
# Byte escape sequences that *enter* alternate-screen mode, e.g. b"\x1b[?1049h".
START_ALTERNATE_MODE = LazyObject(
    lambda: frozenset("\x1b[?{0}h".format(i).encode() for i in MODE_NUMS),
    globals(),
    "START_ALTERNATE_MODE",
)
# Byte escape sequences that *leave* alternate-screen mode, e.g. b"\x1b[?1049l".
END_ALTERNATE_MODE = LazyObject(
    lambda: frozenset("\x1b[?{0}l".format(i).encode() for i in MODE_NUMS),
    globals(),
    "END_ALTERNATE_MODE",
)
# All enter + leave flags in one tuple, the form findfirst() expects.
ALTERNATE_MODE_FLAGS = LazyObject(
    lambda: tuple(START_ALTERNATE_MODE) + tuple(END_ALTERNATE_MODE),
    globals(),
    "ALTERNATE_MODE_FLAGS",
)
# Matches bytes wrapped in \001...\002, readline's markers for "invisible"
# prompt bytes that should not count toward the prompt's printed width.
RE_HIDDEN_BYTES = LazyObject(
    lambda: re.compile(b"(\001.*?\002)"), globals(), "RE_HIDDEN"
)
82
83
@lazyobject
def RE_VT100_ESCAPE():
    r"""Regex matching a single VT100/ANSI escape sequence (CSI form).

    The pattern is a raw bytes literal: ``\[`` and ``\/`` are not valid
    Python byte-string escapes, and the previous non-raw literal raised
    DeprecationWarning (a SyntaxError in future Python versions). The raw
    form compiles to the identical regex, since the regex engine itself
    interprets ``\x9B`` and ``\x1B``.
    """
    return re.compile(rb"(\x9B|\x1B\[)[0-?]*[ -\/]*[@-~]")
87
88
@lazyobject
def RE_HIDE_ESCAPE():
    """Regex matching either readline hidden-byte markers or VT100 escape
    sequences, built by alternating the two component patterns so that a
    single sub() call can strip both kinds at once.
    """
    return re.compile(
        b"(" + RE_HIDDEN_BYTES.pattern + b"|" + RE_VT100_ESCAPE.pattern + b")"
    )
94
95
class QueueReader:
    """Provides a file-like interface to reading from a queue."""

    def __init__(self, fd, timeout=None):
        """
        Parameters
        ----------
        fd : int
            A file descriptor
        timeout : float or None, optional
            The queue reading timeout.
        """
        self.fd = fd
        self.timeout = timeout
        self.closed = False
        self.queue = queue.Queue()
        self.thread = None

    def close(self):
        """Marks the reader as closed."""
        self.closed = True

    def is_fully_read(self):
        """True once the reader is closed, its worker thread (if any) has
        finished, and every queued chunk has been consumed.
        """
        if not self.closed:
            return False
        if self.thread is not None and self.thread.is_alive():
            return False
        return self.queue.empty()

    def read_queue(self):
        """Pops a single chunk off the queue. Blocks indefinitely when the
        timeout is None; otherwise waits at most ``timeout`` seconds and
        returns b"" on expiry.
        """
        try:
            return self.queue.get(block=True, timeout=self.timeout)
        except queue.Empty:
            return b""

    def read(self, size=-1):
        """Reads bytes from the file."""
        pieces = []
        total = 0
        while size < 0 or total != size:
            chunk = self.read_queue()
            if not chunk:
                break
            pieces.append(chunk)
            total += len(chunk)
        return b"".join(pieces)

    def readline(self, size=-1):
        """Reads a line, or a partial line, from the file descriptor."""
        newline = b"\n"
        pieces = []
        total = 0
        while size < 0 or total != size:
            chunk = self.read_queue()
            if not chunk:
                break
            pieces.append(chunk)
            if chunk.endswith(newline):
                break
            total += len(chunk)
        return b"".join(pieces)

    def _read_all_lines(self):
        """Blocking helper that drains every remaining chunk into lines."""
        lines = []
        while not self.is_fully_read():
            lines.extend(self.read_queue().splitlines(keepends=True))
        return lines

    def readlines(self, hint=-1):
        """Reads lines from the file descriptor. Blocking for negative
        hints (i.e. read all the remaining lines) and non-blocking otherwise.
        """
        if hint == -1:
            return self._read_all_lines()
        lines = []
        while len(lines) != hint:
            chunk = self.read_queue()
            if not chunk:
                break
            lines.extend(chunk.splitlines(keepends=True))
        return lines

    def fileno(self):
        """Returns the file descriptor number."""
        return self.fd

    @staticmethod
    def readable():
        """Returns true, because this object is always readable."""
        return True

    def iterqueue(self):
        """Iterates through all remaining chunks in a blocking fashion."""
        while not self.is_fully_read():
            chunk = self.read_queue()
            if chunk:
                yield chunk
204
205
def populate_fd_queue(reader, fd, queue):
    """Copies data from a file descriptor into a queue, 1 kb at a time.
    When the descriptor reaches EOF or the read fails, the calling reader
    object is flagged as closed.
    """
    chunk = b" "  # sentinel so the loop body runs at least once
    while chunk:
        try:
            chunk = os.read(fd, 1024)
        except OSError:
            break
        if chunk:
            queue.put(chunk)
    # EOF (empty read) or OSError: either way the stream is done
    reader.closed = True
221
222
class NonBlockingFDReader(QueueReader):
    """A class for reading characters from a file descriptor on a background
    thread. This has the advantages that the calling thread can close the
    file and that the reading does not block the calling thread.
    """

    def __init__(self, fd, timeout=None):
        """
        Parameters
        ----------
        fd : int
            A file descriptor
        timeout : float or None, optional
            The queue reading timeout.
        """
        super().__init__(fd, timeout=timeout)
        # spawn a daemon worker that drains the fd into our queue so
        # reads never block the calling thread
        worker = threading.Thread(
            target=populate_fd_queue, args=(self, self.fd, self.queue)
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
245
246
def populate_buffer(reader, fd, buffer, chunksize):
    """Copies bytes from a file descriptor into a buffer via pread().

    pread() reads at an explicit offset and is only available on POSIX
    systems. On EOF or any read failure the reader is flagged as closed.
    """
    pos = 0
    while True:
        try:
            data = os.pread(fd, chunksize, pos)
        except OSError:
            break
        if not data:
            break  # EOF
        buffer.write(data)
        pos += len(data)
    reader.closed = True
267
268
class BufferedFDParallelReader:
    """Buffered, parallel background thread reader."""

    def __init__(self, fd, buffer=None, chunksize=1024):
        """
        Parameters
        ----------
        fd : int
            File descriptor from which to read.
        buffer : binary file-like or None, optional
            A buffer to write bytes into. If None, a new BytesIO object
            is created.
        chunksize : int, optional
            The max size of the parallel reads, default 1 kb.
        """
        self.fd = fd
        if buffer is None:
            buffer = io.BytesIO()
        self.buffer = buffer
        self.chunksize = chunksize
        self.closed = False
        # daemon worker copies the fd's contents into the buffer via pread()
        worker = threading.Thread(
            target=populate_buffer, args=(self, fd, self.buffer, chunksize)
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
295
296
def _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd):
    """Grows the Windows console screen buffer when reads approach its end.

    Returns the new ``(rows, max_offset, orig_posize)`` triple. When the
    column count is zero there is nothing to expand, so the current values
    are returned unchanged.
    """
    # if we are getting close to the end of the console buffer,
    # expand it so that we can read from it successfully.
    if cols == 0:
        return orig_posize[-1], max_offset, orig_posize
    # choose enough rows to hold max_offset + expandsize character cells
    rows = ((max_offset + expandsize) // cols) + 1
    winutils.set_console_screen_buffer_size(cols, rows, fd=fd)
    orig_posize = orig_posize[:3] + (rows,)
    max_offset = (rows - 1) * cols
    return rows, max_offset, orig_posize
307
308
def populate_console(reader, fd, buffer, chunksize, queue, expandsize=None):
    """Reads bytes from the file descriptor and puts lines into the queue.
    The reads happened in parallel,
    using xonsh.winutils.read_console_output_character(),
    and is thus only available on windows. If the read fails for any reason,
    the reader is flagged as closed.

    Parameters
    ----------
    reader : QueueReader
        Reader whose ``closed`` flag and ``timeout`` are consulted.
    fd : int
        Console file descriptor to read from.
    buffer : ctypes.c_char_p
        Scratch buffer handed to read_console_output_character().
    chunksize : int
        Maximum number of characters per read.
    queue : queue.Queue
        Destination queue; receives newline-terminated byte lines.
    expandsize : int or None, optional
        How close to the end of the console buffer the read position may
        get before the buffer is expanded; defaults to ``100 * chunksize``.
    """
    # OK, so this function is super annoying because Windows stores its
    # buffers as a 2D regular, dense array -- without trailing newlines.
    # Meanwhile, we want to add *lines* to the queue. Also, as is typical
    # with parallel reads, the entire buffer that you ask for may not be
    # filled. Thus we have to deal with the full generality.
    #   1. reads may end in the middle of a line
    #   2. excess whitespace at the end of a line may not be real, unless
    #   3. you haven't read to the end of the line yet!
    # So there are alignment issues everywhere.  Also, Windows will automatically
    # read past the current cursor position, even though there is presumably
    # nothing to see there.
    #
    # These chunked reads basically need to happen like this because,
    #   a. The default buffer size is HUGE for the console (90k lines x 120 cols)
    #      as so we can't just read in everything at the end and see what we
    #      care about without a noticeable performance hit.
    #   b. Even with this huge size, it is still possible to write more lines than
    #      this, so we should scroll along with the console.
    # Unfortunately, because we do not have control over the terminal emulator,
    # It is not possible to compute how far back we should set the beginning
    # read position because we don't know how many characters have been popped
    # off the top of the buffer. If we did somehow know this number we could do
    # something like the following:
    #
    #    new_offset = (y*cols) + x
    #    if new_offset == max_offset:
    #        new_offset -= scrolled_offset
    #        x = new_offset%cols
    #        y = new_offset//cols
    #        continue
    #
    # So this method is imperfect and only works as long as the screen has
    # room to expand to.  Thus the trick here is to expand the screen size
    # when we get close enough to the end of the screen. There remain some
    # async issues related to not being able to set the cursor position.
    # but they just affect the alignment / capture of the output of the
    # first command run after a screen resize.
    if expandsize is None:
        expandsize = 100 * chunksize
    x, y, cols, rows = posize = winutils.get_position_size(fd)
    pre_x = pre_y = -1
    orig_posize = posize
    offset = (cols * y) + x
    max_offset = (rows - 1) * cols
    # I believe that there is a bug in PTK that if we reset the
    # cursor position, the cursor on the next prompt is accidentally on
    # the next line.  If this is fixed, uncomment the following line.
    # if max_offset < offset + expandsize:
    #     rows, max_offset, orig_posize = _expand_console_buffer(
    #                                        cols, max_offset, expandsize,
    #                                        orig_posize, fd)
    #     winutils.set_console_cursor_position(x, y, fd=fd)
    while True:
        posize = winutils.get_position_size(fd)
        offset = (cols * y) + x
        if ((posize[1], posize[0]) <= (y, x) and posize[2:] == (cols, rows)) or (
            pre_x == x and pre_y == y
        ):
            # already at or ahead of the current cursor position.
            if reader.closed:
                break
            else:
                time.sleep(reader.timeout)
                continue
        elif max_offset <= offset + expandsize:
            # running out of buffer; grow it and retry this iteration
            ecb = _expand_console_buffer(cols, max_offset, expandsize, orig_posize, fd)
            rows, max_offset, orig_posize = ecb
            continue
        elif posize[2:] == (cols, rows):
            # cursor updated but screen size is the same.
            pass
        else:
            # screen size changed, which is offset preserving
            orig_posize = posize
            cols, rows = posize[2:]
            x = offset % cols
            y = offset // cols
            pre_x = pre_y = -1
            max_offset = (rows - 1) * cols
            continue
        try:
            buf = winutils.read_console_output_character(
                x=x, y=y, fd=fd, buf=buffer, bufsize=chunksize, raw=True
            )
        except (OSError, IOError):
            reader.closed = True
            break
        # cursor position and offset
        if not reader.closed:
            buf = buf.rstrip()
        nread = len(buf)
        if nread == 0:
            time.sleep(reader.timeout)
            continue
        cur_x, cur_y = posize[0], posize[1]
        cur_offset = (cols * cur_y) + cur_x
        beg_offset = (cols * y) + x
        end_offset = beg_offset + nread
        if end_offset > cur_offset and cur_offset != max_offset:
            # we read past the cursor; trim the over-read tail
            buf = buf[: cur_offset - end_offset]
        # convert to lines
        xshift = cols - x
        yshift = (nread // cols) + (1 if nread % cols > 0 else 0)
        lines = [buf[:xshift]]
        lines += [
            buf[l * cols + xshift : (l + 1) * cols + xshift] for l in range(yshift)
        ]
        lines = [line for line in lines if line]
        if not lines:
            time.sleep(reader.timeout)
            continue
        # put lines in the queue
        nl = b"\n"
        for line in lines[:-1]:
            queue.put(line.rstrip() + nl)
        if len(lines[-1]) == xshift:
            # final piece exactly fills the row, so it is a complete line
            queue.put(lines[-1].rstrip() + nl)
        else:
            queue.put(lines[-1])
        # update x and y locations
        if (beg_offset + len(buf)) % cols == 0:
            new_offset = beg_offset + len(buf)
        else:
            new_offset = beg_offset + len(buf.rstrip())
        pre_x = x
        pre_y = y
        x = new_offset % cols
        y = new_offset // cols
        time.sleep(reader.timeout)
445
446
class ConsoleParallelReader(QueueReader):
    """Parallel reader for consoles that runs in a background thread.
    This is only needed, available, and useful on Windows.
    """

    def __init__(self, fd, buffer=None, chunksize=1024, timeout=None):
        """
        Parameters
        ----------
        fd : int
            Standard buffer file descriptor, 0 for stdin, 1 for stdout (default),
            and 2 for stderr.
        buffer : ctypes.c_wchar_p, optional
            An existing buffer to (re-)use.
        chunksize : int, optional
            The max size of the parallel reads, default 1 kb.
        timeout : float, optional
            The queue reading timeout.
        """
        timeout = timeout or builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY")
        super().__init__(fd, timeout=timeout)
        if buffer is None:
            # allocate a chunk-sized scratch buffer to reuse across reads
            buffer = ctypes.c_char_p(b" " * chunksize)
        self._buffer = buffer  # this cannot be public
        self.chunksize = chunksize
        # drain the console in a daemon thread so reads never block callers
        worker = threading.Thread(
            target=populate_console,
            args=(self, fd, self._buffer, chunksize, self.queue),
        )
        worker.daemon = True
        self.thread = worker
        worker.start()
479
480
def safe_fdclose(handle, cache=None):
    """Closes a file handle in the safest way possible, optionally recording
    whether the close succeeded in a cache dict keyed by handle.
    """
    if cache is not None and cache.get(handle, False):
        # a previous call already closed this handle successfully
        return
    ok = True
    if handle is None:
        # nothing to close
        pass
    elif isinstance(handle, int):
        # never close fds 0-2 (the std streams) or negative sentinels (-1)
        if handle >= 3:
            try:
                os.close(handle)
            except OSError:
                ok = False
    elif any(handle is s for s in (sys.stdin, sys.stdout, sys.stderr)):
        # don't close the interpreter's standard stream objects
        pass
    else:
        try:
            handle.close()
        except OSError:
            ok = False
    if cache is not None:
        cache[handle] = ok
507
508
def safe_flush(handle):
    """Attempts to safely flush a file handle, returns success bool."""
    try:
        handle.flush()
    except OSError:
        # the handle may be broken or already gone; report failure
        return False
    return True
517
518
def still_writable(fd):
    """Determines whether a file descriptor is still writable by trying to
    write an empty string and seeing if it fails.
    """
    try:
        # a zero-length write transfers no data but still validates the fd
        os.write(fd, b"")
    except OSError:
        return False
    return True
529
530
531class PopenThread(threading.Thread):
532    """A thread for running and managing subprocess. This allows reading
533    from the stdin, stdout, and stderr streams in a non-blocking fashion.
534
535    This takes the same arguments and keyword arguments as regular Popen.
536    This requires that the captured_stdout and captured_stderr attributes
537    to be set following instantiation.
538    """
539
    def __init__(self, *args, stdin=None, stdout=None, stderr=None, **kwargs):
        """Sets up capture buffers and signal handlers, launches the child
        via subprocess.Popen, and starts this thread.

        Parameters
        ----------
        *args, **kwargs
            Passed through to subprocess.Popen.
        stdin, stdout, stderr : file-like, int, or None, optional
            Standard stream targets for the child process.
        """
        super().__init__()
        self.lock = threading.RLock()
        env = builtins.__xonsh_env__
        # stdin setup
        self.orig_stdin = stdin
        if stdin is None:
            self.stdin_fd = 0
        elif isinstance(stdin, int):
            self.stdin_fd = stdin
        else:
            self.stdin_fd = stdin.fileno()
        self.store_stdin = env.get("XONSH_STORE_STDIN")
        self.timeout = env.get("XONSH_PROC_FREQUENCY")
        self.in_alt_mode = False
        self.stdin_mode = None  # saved termios state, set in cbreak handlers
        # stdout setup
        self.orig_stdout = stdout
        self.stdout_fd = 1 if stdout is None else stdout.fileno()
        self._set_pty_size()
        # stderr setup
        self.orig_stderr = stderr
        # Set some signal handles, if we can. Must come before process
        # is started to prevent deadlock on windows
        self.proc = None  # has to be here for closure for handles
        self.old_int_handler = self.old_winch_handler = None
        self.old_tstp_handler = self.old_quit_handler = None
        if on_main_thread():
            # signal handlers may only be installed from the main thread
            self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
            if ON_POSIX:
                self.old_tstp_handler = signal.signal(signal.SIGTSTP, self._signal_tstp)
                self.old_quit_handler = signal.signal(signal.SIGQUIT, self._signal_quit)
            if CAN_RESIZE_WINDOW:
                self.old_winch_handler = signal.signal(
                    signal.SIGWINCH, self._signal_winch
                )
        # start up process
        if ON_WINDOWS and stdout is not None:
            os.set_inheritable(stdout.fileno(), False)

        try:
            self.proc = proc = subprocess.Popen(
                *args, stdin=stdin, stdout=stdout, stderr=stderr, **kwargs
            )
        except Exception:
            # undo the signal-handler swaps before propagating
            self._clean_up()
            raise

        self.pid = proc.pid
        self.universal_newlines = uninew = proc.universal_newlines
        if uninew:
            # text mode: wrap the capture buffers with the xonsh encoding
            self.encoding = enc = env.get("XONSH_ENCODING")
            self.encoding_errors = err = env.get("XONSH_ENCODING_ERRORS")
            self.stdin = io.BytesIO()  # stdin is always bytes!
            self.stdout = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
            self.stderr = io.TextIOWrapper(io.BytesIO(), encoding=enc, errors=err)
        else:
            self.encoding = self.encoding_errors = None
            self.stdin = io.BytesIO()
            self.stdout = io.BytesIO()
            self.stderr = io.BytesIO()
        self.suspended = False
        self.prevs_are_closed = False
        self.start()
604
    def run(self):
        """Runs the subprocess by performing a parallel read on stdin if allowed,
        and copying bytes from captured_stdout to stdout and bytes from
        captured_stderr to stderr.
        """
        proc = self.proc
        # spec is attached by the caller after construction; block until set
        spec = self._wait_and_getattr("spec")
        # get stdin and apply parallel reader if needed.
        stdin = self.stdin
        if self.orig_stdin is None:
            origin = None
        elif ON_POSIX and self.store_stdin:
            # mirror the child's stdin into our buffer via a pread() thread
            origin = self.orig_stdin
            origfd = origin if isinstance(origin, int) else origin.fileno()
            origin = BufferedFDParallelReader(origfd, buffer=stdin)
        else:
            origin = None
        # get non-blocking stdout
        stdout = self.stdout.buffer if self.universal_newlines else self.stdout
        capout = spec.captured_stdout
        if capout is None:
            procout = None
        else:
            procout = NonBlockingFDReader(capout.fileno(), timeout=self.timeout)
        # get non-blocking stderr
        stderr = self.stderr.buffer if self.universal_newlines else self.stderr
        caperr = spec.captured_stderr
        if caperr is None:
            procerr = None
        else:
            procerr = NonBlockingFDReader(caperr.fileno(), timeout=self.timeout)
        # initial read from buffer
        self._read_write(procout, stdout, sys.__stdout__)
        self._read_write(procerr, stderr, sys.__stderr__)
        # loop over reads while process is running.
        i = j = cnt = 1
        while proc.poll() is None:
            # this is here for CPU performance reasons.
            if i + j == 0:
                # nothing was read last pass: back off exponentially
                # (capped) so an idle child doesn't spin the CPU
                cnt = min(cnt + 1, 1000)
                tout = self.timeout * cnt
                if procout is not None:
                    procout.timeout = tout
                if procerr is not None:
                    procerr.timeout = tout
            elif cnt == 1:
                pass
            else:
                # data flowed again: reset to the base polling frequency
                cnt = 1
                if procout is not None:
                    procout.timeout = self.timeout
                if procerr is not None:
                    procerr.timeout = self.timeout
            # redirect some output!
            i = self._read_write(procout, stdout, sys.__stdout__)
            j = self._read_write(procerr, stderr, sys.__stderr__)
            if self.suspended:
                break
        if self.suspended:
            # Ctrl+Z was seen; leave streams open for when the job resumes
            return
        # close files to send EOF to non-blocking reader.
        # capout & caperr seem to be needed only by Windows, while
        # orig_stdout & orig_stderr are need by posix and Windows.
        # Also, order seems to matter here,
        # with orig_* needed to be closed before cap*
        safe_fdclose(self.orig_stdout)
        safe_fdclose(self.orig_stderr)
        if ON_WINDOWS:
            safe_fdclose(capout)
            safe_fdclose(caperr)
        # read in the remaining data in a blocking fashion.
        while (procout is not None and not procout.is_fully_read()) or (
            procerr is not None and not procerr.is_fully_read()
        ):
            self._read_write(procout, stdout, sys.__stdout__)
            self._read_write(procerr, stderr, sys.__stderr__)
        # kill the process if it is still alive. Happens when piping.
        if proc.poll() is None:
            proc.terminate()
684
685    def _wait_and_getattr(self, name):
686        """make sure the instance has a certain attr, and return it."""
687        while not hasattr(self, name):
688            time.sleep(1e-7)
689        return getattr(self, name)
690
691    def _read_write(self, reader, writer, stdbuf):
692        """Reads a chunk of bytes from a buffer and write into memory or back
693        down to the standard buffer, as appropriate. Returns the number of
694        successful reads.
695        """
696        if reader is None:
697            return 0
698        i = -1
699        for i, chunk in enumerate(iter(reader.read_queue, b"")):
700            self._alt_mode_switch(chunk, writer, stdbuf)
701        if i >= 0:
702            writer.flush()
703            stdbuf.flush()
704        return i + 1
705
706    def _alt_mode_switch(self, chunk, membuf, stdbuf):
707        """Enables recursively switching between normal capturing mode
708        and 'alt' mode, which passes through values to the standard
709        buffer. Pagers, text editors, curses applications, etc. use
710        alternate mode.
711        """
712        i, flag = findfirst(chunk, ALTERNATE_MODE_FLAGS)
713        if flag is None:
714            self._alt_mode_writer(chunk, membuf, stdbuf)
715        else:
716            # This code is executed when the child process switches the
717            # terminal into or out of alternate mode. The line below assumes
718            # that the user has opened vim, less, or similar, and writes writes
719            # to stdin.
720            j = i + len(flag)
721            # write the first part of the chunk in the current mode.
722            self._alt_mode_writer(chunk[:i], membuf, stdbuf)
723            # switch modes
724            # write the flag itself the current mode where alt mode is on
725            # so that it is streamed to the terminal ASAP.
726            # this is needed for terminal emulators to find the correct
727            # positions before and after alt mode.
728            alt_mode = flag in START_ALTERNATE_MODE
729            if alt_mode:
730                self.in_alt_mode = alt_mode
731                self._alt_mode_writer(flag, membuf, stdbuf)
732                self._enable_cbreak_stdin()
733            else:
734                self._alt_mode_writer(flag, membuf, stdbuf)
735                self.in_alt_mode = alt_mode
736                self._disable_cbreak_stdin()
737            # recurse this function, but without the current flag.
738            self._alt_mode_switch(chunk[j:], membuf, stdbuf)
739
740    def _alt_mode_writer(self, chunk, membuf, stdbuf):
741        """Write bytes to the standard buffer if in alt mode or otherwise
742        to the in-memory buffer.
743        """
744        if not chunk:
745            pass  # don't write empty values
746        elif self.in_alt_mode:
747            stdbuf.buffer.write(chunk)
748        else:
749            with self.lock:
750                p = membuf.tell()
751                membuf.seek(0, io.SEEK_END)
752                membuf.write(chunk)
753                membuf.seek(p)
754
755    #
756    # Window resize handlers
757    #
758
    def _signal_winch(self, signum, frame):
        """Signal handler for SIGWINCH - window size has changed."""
        # forward the resize to the child, then mirror the new terminal
        # dimensions onto the child's pty
        self.send_signal(signal.SIGWINCH)
        self._set_pty_size()
763
    def _set_pty_size(self):
        """Sets the window size of the child pty based on the window size of
        our own controlling terminal.
        """
        # only meaningful on POSIX and when stdout is actually a tty
        if ON_WINDOWS or not os.isatty(self.stdout_fd):
            return
        # Get the terminal size of the real terminal, set it on the
        #       pseudoterminal.
        buf = array.array("h", [0, 0, 0, 0])
        # 1 = stdout here
        try:
            fcntl.ioctl(1, termios.TIOCGWINSZ, buf, True)
            fcntl.ioctl(self.stdout_fd, termios.TIOCSWINSZ, buf)
        except OSError:
            # best effort: the tty may be gone or not ours to resize
            pass
779
780    #
781    # SIGINT handler
782    #
783
    def _signal_int(self, signum, frame):
        """Signal handler for SIGINT - Ctrl+C may have been pressed."""
        # forward the interrupt to the child process
        self.send_signal(signum)
        if self.proc is not None and self.proc.poll() is not None:
            # child already exited; reinstall the original handler
            self._restore_sigint(frame=frame)
        if on_main_thread():
            # re-raise SIGINT on the main thread so xonsh itself reacts too
            signal.pthread_kill(threading.get_ident(), signal.SIGINT)
791
792    def _restore_sigint(self, frame=None):
793        old = self.old_int_handler
794        if old is not None:
795            if on_main_thread():
796                signal.signal(signal.SIGINT, old)
797            self.old_int_handler = None
798        if frame is not None:
799            self._disable_cbreak_stdin()
800            if old is not None and old is not self._signal_int:
801                old(signal.SIGINT, frame)
802
803    #
804    # SIGTSTP handler
805    #
806
    def _signal_tstp(self, signum, frame):
        """Signal handler for suspending SIGTSTP - Ctrl+Z may have been pressed.
        """
        # mark as suspended so run() stops pumping output, then forward
        # the stop signal to the child and restore the old handler
        self.suspended = True
        self.send_signal(signum)
        self._restore_sigtstp(frame=frame)
813
814    def _restore_sigtstp(self, frame=None):
815        old = self.old_tstp_handler
816        if old is not None:
817            if on_main_thread():
818                signal.signal(signal.SIGTSTP, old)
819            self.old_tstp_handler = None
820        if frame is not None:
821            self._disable_cbreak_stdin()
822
823    #
824    # SIGQUIT handler
825    #
826
    def _signal_quit(self, signum, frame):
        r"""Signal handler for quitting SIGQUIT - Ctrl+\ may have been pressed.
        """
        # forward the quit signal to the child, then restore the old handler
        self.send_signal(signum)
        self._restore_sigquit(frame=frame)
832
833    def _restore_sigquit(self, frame=None):
834        old = self.old_quit_handler
835        if old is not None:
836            if on_main_thread():
837                signal.signal(signal.SIGQUIT, old)
838            self.old_quit_handler = None
839        if frame is not None:
840            self._disable_cbreak_stdin()
841
842    #
843    # cbreak mode handlers
844    #
845
    def _enable_cbreak_stdin(self):
        """Puts stdin into cbreak (no-echo, non-canonical) mode so alt-mode
        applications receive keystrokes immediately. POSIX only; the prior
        termios state is saved for _disable_cbreak_stdin().
        """
        if not ON_POSIX:
            return
        try:
            self.stdin_mode = termios.tcgetattr(self.stdin_fd)[:]
        except termios.error:
            # this can happen for cases where another process is controlling
            # xonsh's tty device, such as in testing.
            self.stdin_mode = None
            return
        new = self.stdin_mode[:]
        # turn off echo and canonical (line-buffered) input
        new[LFLAG] &= ~(termios.ECHO | termios.ICANON)
        # deliver reads a byte at a time, with no inter-byte timer
        new[CC][termios.VMIN] = 1
        new[CC][termios.VTIME] = 0
        try:
            # termios.TCSAFLUSH may be less reliable than termios.TCSANOW
            termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new)
        except termios.error:
            self._disable_cbreak_stdin()
865
866    def _disable_cbreak_stdin(self):
867        if not ON_POSIX or self.stdin_mode is None:
868            return
869        new = self.stdin_mode[:]
870        new[LFLAG] |= termios.ECHO | termios.ICANON
871        new[CC][termios.VMIN] = 1
872        new[CC][termios.VTIME] = 0
873        try:
874            termios.tcsetattr(self.stdin_fd, termios.TCSANOW, new)
875        except termios.error:
876            pass
877
878    #
879    # Dispatch methods
880    #
881
882    def poll(self):
883        """Dispatches to Popen.returncode."""
884        return self.proc.returncode
885
    def wait(self, timeout=None):
        """Dispatches to Popen.wait(), but also does process cleanup such as
        joining this thread and replacing the original window size signal
        handler.

        Parameters
        ----------
        timeout : float or None, optional
            Forwarded to ``Popen.wait``.

        Returns
        -------
        int
            The process return code.
        """
        # leave cbreak mode before blocking so the terminal is usable again
        self._disable_cbreak_stdin()
        rtn = self.proc.wait(timeout=timeout)
        self.join()
        # need to replace the old signal handlers somewhere...
        if self.old_winch_handler is not None and on_main_thread():
            signal.signal(signal.SIGWINCH, self.old_winch_handler)
            self.old_winch_handler = None
        self._clean_up()
        return rtn
900
901    def _clean_up(self):
902        self._restore_sigint()
903        self._restore_sigtstp()
904        self._restore_sigquit()
905
906    @property
907    def returncode(self):
908        """Process return code."""
909        return self.proc.returncode
910
911    @returncode.setter
912    def returncode(self, value):
913        """Process return code."""
914        self.proc.returncode = value
915
916    @property
917    def signal(self):
918        """Process signal, or None."""
919        s = getattr(self.proc, "signal", None)
920        if s is None:
921            rtn = self.returncode
922            if rtn is not None and rtn != 0:
923                s = (-1 * rtn, rtn < 0 if ON_WINDOWS else os.WCOREDUMP(rtn))
924        return s
925
926    @signal.setter
927    def signal(self, value):
928        """Process signal, or None."""
929        self.proc.signal = value
930
931    def send_signal(self, signal):
932        """Dispatches to Popen.send_signal()."""
933        dt = 0.0
934        while self.proc is None and dt < self.timeout:
935            time.sleep(1e-7)
936            dt += 1e-7
937        if self.proc is None:
938            return
939        try:
940            rtn = self.proc.send_signal(signal)
941        except ProcessLookupError:
942            # This can happen in the case of !(cmd) when the command has ended
943            rtn = None
944        return rtn
945
946    def terminate(self):
947        """Dispatches to Popen.terminate()."""
948        return self.proc.terminate()
949
950    def kill(self):
951        """Dispatches to Popen.kill()."""
952        return self.proc.kill()
953
954
class Handle(int):
    """An ``int`` subclass wrapping a Windows HANDLE value that knows how
    to close itself exactly once.
    """

    # whether the underlying handle has been closed or detached
    closed = False

    def Close(self, CloseHandle=None):
        """Close the handle using *CloseHandle* (defaults to
        ``_winapi.CloseHandle``). Subsequent calls are no-ops.
        """
        if not self.closed:
            self.closed = True
            (CloseHandle or _winapi.CloseHandle)(self)

    def Detach(self):
        """Mark the handle closed without releasing it and return the raw
        integer value; raises ValueError if already closed/detached.
        """
        if self.closed:
            raise ValueError("already closed")
        self.closed = True
        return int(self)

    def __repr__(self):
        return "Handle(%d)" % int(self)

    __del__ = Close
    __str__ = __repr__
975
976
class FileThreadDispatcher:
    """Dispatches to different file handles depending on the
    current thread. Useful if you want file operation to go to different
    places for different threads.
    """

    def __init__(self, default=None):
        """
        Parameters
        ----------
        default : file-like or None, optional
            The file handle to write to if a thread cannot be found in
            the registry. If None, a new in-memory instance.

        Attributes
        ----------
        registry : dict
            Maps thread idents to file handles.
        """
        if default is None:
            default = io.TextIOWrapper(io.BytesIO())
        self.default = default
        self.registry = {}

    def register(self, handle):
        """Registers a file handle for the current thread. Returns self so
        that this method can be used in a with-statement.
        """
        self.registry[threading.get_ident()] = handle
        return self

    def deregister(self):
        """Removes the current thread from the registry."""
        del self.registry[threading.get_ident()]

    @property
    def available(self):
        """True if the thread is available in the registry."""
        return threading.get_ident() in self.registry

    @property
    def handle(self):
        """Gets the current handle for the thread."""
        return self.registry.get(threading.get_ident(), self.default)

    def __enter__(self):
        # register() already returned self; no value is needed here
        pass

    def __exit__(self, ex_type, ex_value, ex_traceback):
        self.deregister()

    #
    # io.TextIOBase interface
    #

    @property
    def encoding(self):
        """Gets the encoding for this thread's handle."""
        return self.handle.encoding

    @property
    def errors(self):
        """Gets the errors for this thread's handle."""
        return self.handle.errors

    @property
    def newlines(self):
        """Gets the newlines for this thread's handle."""
        return self.handle.newlines

    @property
    def buffer(self):
        """Gets the buffer for this thread's handle."""
        return self.handle.buffer

    def detach(self):
        """Detaches the buffer for the current thread."""
        return self.handle.detach()

    def read(self, size=None):
        """Reads from the handle for the current thread."""
        return self.handle.read(size)

    def readline(self, size=-1):
        """Reads a line from the handle for the current thread."""
        return self.handle.readline(size)

    def readlines(self, hint=-1):
        """Reads lines from the handle for the current thread."""
        return self.handle.readlines(hint)

    def seek(self, offset, whence=io.SEEK_SET):
        """Seeks the current file."""
        return self.handle.seek(offset, whence)

    def tell(self):
        """Reports the current position in the handle for the current thread."""
        return self.handle.tell()

    def write(self, s):
        """Writes to this thread's handle. This also flushes, just to be
        extra sure the string was written.
        """
        h = self.handle
        try:
            r = h.write(s)
            h.flush()
        except OSError:
            r = None
        return r

    @property
    def line_buffering(self):
        """Gets if line buffering for this thread's handle enabled."""
        return self.handle.line_buffering

    #
    # io.IOBase interface
    #

    def close(self):
        """Closes the current thread's handle."""
        return self.handle.close()

    @property
    def closed(self):
        """Is the thread's handle closed."""
        return self.handle.closed

    def fileno(self):
        """Returns the file descriptor for the current thread."""
        return self.handle.fileno()

    def flush(self):
        """Flushes the file descriptor for the current thread."""
        return safe_flush(self.handle)

    def isatty(self):
        """Returns if the file descriptor for the current thread is a tty."""
        return self.handle.isatty()

    def readable(self):
        """Returns if file descriptor for the current thread is readable."""
        return self.handle.readable()

    def seekable(self):
        """Returns if file descriptor for the current thread is seekable."""
        return self.handle.seekable()

    def truncate(self, size=None):
        """Truncates the file for the current thread to *size* bytes (or to
        the current position if *size* is None).
        """
        # bugfix: previously dropped the size argument, always truncating
        # at the current position
        return self.handle.truncate(size)

    def writable(self, size=None):
        """Returns if file descriptor for the current thread is writable.
        The *size* parameter is accepted for backwards compatibility and
        ignored -- file ``writable()`` takes no arguments.
        """
        # bugfix: previously forwarded size, raising TypeError on real files
        return self.handle.writable()

    def writelines(self, lines=()):
        """Writes *lines* to the file descriptor for the current thread."""
        # bugfix: previously accepted no lines and called
        # handle.writelines() with no argument, which always raised
        return self.handle.writelines(lines)
1137
1138
# These should NOT be lazy since they *need* to get the true stdout from the
# main thread. Also their creation time should be negligible.
# Module-level dispatchers that route writes to per-thread stdout/stderr
# handles; threaded proc proxies register their pipe ends with these so that
# prints inside alias functions land in the right pipeline.
STDOUT_DISPATCHER = FileThreadDispatcher(default=sys.stdout)
STDERR_DISPATCHER = FileThreadDispatcher(default=sys.stderr)
1143
1144
def parse_proxy_return(r, stdout, stderr):
    """Proxies may return a variety of outputs. This handles them generally.

    Parameters
    ----------
    r : tuple, str, int, or None
        Return from proxy function
    stdout : file-like
        Current stdout stream
    stderr : file-like
        Current stderr stream

    Returns
    -------
    cmd_result : int
        The return code of the proxy
    """
    cmd_result = 0
    if isinstance(r, str):
        # a bare string is written to stdout; the return code stays 0
        stdout.write(r)
        stdout.flush()
    elif isinstance(r, int):
        # a bare integer is the return code itself
        cmd_result = r
    elif isinstance(r, cabc.Sequence):
        # a sequence is interpreted as (stdout, stderr, returncode), with
        # trailing elements optional and None meaning "skip this stream"
        rlen = len(r)
        if rlen > 0 and r[0] is not None:
            stdout.write(r[0])
            stdout.flush()
        if rlen > 1 and r[1] is not None:
            stderr.write(r[1])
            stderr.flush()
        if rlen > 2 and r[2] is not None:
            cmd_result = r[2]
    elif r is not None:
        # for the random object...
        stdout.write(str(r))
        stdout.flush()
    return cmd_result
1183
1184
def proxy_zero(f, args, stdin, stdout, stderr, spec, stack):
    """Invoke a proxy function that accepts no arguments at all."""
    result = f()
    return result
1188
1189
def proxy_one(f, args, stdin, stdout, stderr, spec, stack):
    """Invoke a proxy function that accepts only the ``args`` list."""
    result = f(args)
    return result
1193
1194
def proxy_two(f, args, stdin, stdout, stderr, spec, stack):
    """Invoke a proxy function that accepts ``args`` and ``stdin``."""
    result = f(args, stdin)
    return result
1198
1199
def proxy_three(f, args, stdin, stdout, stderr, spec, stack):
    """Calls a proxy function which takes three parameters: args, stdin,
    stdout.
    """
    return f(args, stdin, stdout)
1204
1205
def proxy_four(f, args, stdin, stdout, stderr, spec, stack):
    """Calls a proxy function which takes four parameters: args, stdin,
    stdout, and stderr.
    """
    return f(args, stdin, stdout, stderr)
1211
1212
def proxy_five(f, args, stdin, stdout, stderr, spec, stack):
    """Calls a proxy function which takes five parameters: args, stdin,
    stdout, stderr, and spec. (The docstring previously miscounted these
    as four.)
    """
    return f(args, stdin, stdout, stderr, spec)
1218
1219
# Proxy callers indexed by arity: PROXIES[n] invokes an alias function that
# accepts n of the standard proxy arguments (0 through 5).
PROXIES = (proxy_zero, proxy_one, proxy_two, proxy_three, proxy_four, proxy_five)
# Keyword-only parameter names that count toward a proxy's argument count.
PROXY_KWARG_NAMES = frozenset(["args", "stdin", "stdout", "stderr", "spec", "stack"])
1222
1223
def partial_proxy(f):
    """Dispatches the appropriate proxy function based on the number of args."""
    arity = 0
    for name, param in inspect.signature(f).parameters.items():
        # positional parameters always count toward the proxy arity
        if param.kind in (param.POSITIONAL_ONLY, param.POSITIONAL_OR_KEYWORD):
            arity += 1
        # keyword-only parameters count only if they use a recognized name
        elif param.kind == param.KEYWORD_ONLY and name in PROXY_KWARG_NAMES:
            arity += 1
    if arity == 6:
        # don't need to partial.
        return f
    if arity < 6:
        return functools.partial(PROXIES[arity], f)
    e = "Expected proxy with 6 or fewer arguments for {}, not {}"
    raise XonshError(e.format(", ".join(PROXY_KWARG_NAMES), arity))
1243
1244
class ProcProxyThread(threading.Thread):
    """
    Class representing a function to be run as a subprocess-mode command.

    The wrapped function executes on a background thread (this object *is*
    the thread), with pipe ends set up so it can participate in pipelines
    like an external command would.
    """

    def __init__(
        self,
        f,
        args,
        stdin=None,
        stdout=None,
        stderr=None,
        universal_newlines=False,
        env=None,
    ):
        """Parameters
        ----------
        f : function
            The function to be executed.
        args : list
            A (possibly empty) list containing the arguments that were given on
            the command line
        stdin : file-like, optional
            A file-like object representing stdin (input can be read from
            here).  If `stdin` is not provided or if it is explicitly set to
            `None`, then an instance of `io.StringIO` representing an empty
            file is used.
        stdout : file-like, optional
            A file-like object representing stdout (normal output can be
            written here).  If `stdout` is not provided or if it is explicitly
            set to `None`, then `sys.stdout` is used.
        stderr : file-like, optional
            A file-like object representing stderr (error output can be
            written here).  If `stderr` is not provided or if it is explicitly
            set to `None`, then `sys.stderr` is used.
        universal_newlines : bool, optional
            Whether or not to use universal newlines.
        env : Mapping, optional
            Environment mapping.
        """
        self.orig_f = f
        # wrap f so it can always be called with the full 6-argument proxy
        # signature, regardless of how many parameters it declares
        self.f = partial_proxy(f)
        self.args = args
        self.pid = None
        self.returncode = None
        self._closed_handle_cache = {}

        # the six pipe ends, following subprocess.Popen naming:
        # p2c* = parent-to-child (stdin), c2p* = child-to-parent (stdout),
        # err* = stderr; -1 is the "not redirected" sentinel
        handles = self._get_handles(stdin, stdout, stderr)
        (
            self.p2cread,
            self.p2cwrite,
            self.c2pread,
            self.c2pwrite,
            self.errread,
            self.errwrite,
        ) = handles

        # default values
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.env = env or builtins.__xonsh_env__
        self._interrupted = False

        if ON_WINDOWS:
            # convert the parent-side Windows HANDLEs into C-runtime file
            # descriptors so they can be wrapped with io.open below
            if self.p2cwrite != -1:
                self.p2cwrite = msvcrt.open_osfhandle(self.p2cwrite.Detach(), 0)
            if self.c2pread != -1:
                self.c2pread = msvcrt.open_osfhandle(self.c2pread.Detach(), 0)
            if self.errread != -1:
                self.errread = msvcrt.open_osfhandle(self.errread.Detach(), 0)

        if self.p2cwrite != -1:
            self.stdin = io.open(self.p2cwrite, "wb", -1)
            if universal_newlines:
                self.stdin = io.TextIOWrapper(
                    self.stdin, write_through=True, line_buffering=False
                )
        elif isinstance(stdin, int) and stdin != 0:
            self.stdin = io.open(stdin, "wb", -1)

        if self.c2pread != -1:
            self.stdout = io.open(self.c2pread, "rb", -1)
            if universal_newlines:
                self.stdout = io.TextIOWrapper(self.stdout)

        if self.errread != -1:
            self.stderr = io.open(self.errread, "rb", -1)
            if universal_newlines:
                self.stderr = io.TextIOWrapper(self.stderr)

        # Set some signal handles, if we can. Must come before process
        # is started to prevent deadlock on windows
        self.old_int_handler = None
        if on_main_thread():
            self.old_int_handler = signal.signal(signal.SIGINT, self._signal_int)
        # start up the proc
        super().__init__()
        self.start()

    def __del__(self):
        # make sure the original SIGINT handler comes back even if the
        # proxy is garbage collected without wait() having been called
        self._restore_sigint()

    def run(self):
        """Set up input/output streams and execute the child function in a new
        thread.  This is part of the `threading.Thread` interface and should
        not be called directly.
        """
        if self.f is None:
            return
        spec = self._wait_and_getattr("spec")
        last_in_pipeline = spec.last_in_pipeline
        if last_in_pipeline:
            capout = spec.captured_stdout  # NOQA
            caperr = spec.captured_stderr  # NOQA
        env = builtins.__xonsh_env__
        enc = env.get("XONSH_ENCODING")
        err = env.get("XONSH_ENCODING_ERRORS")
        if ON_WINDOWS:
            # convert the child-side HANDLEs to C file descriptors,
            # mirroring what __init__ did for the parent-side ends
            if self.p2cread != -1:
                self.p2cread = msvcrt.open_osfhandle(self.p2cread.Detach(), 0)
            if self.c2pwrite != -1:
                self.c2pwrite = msvcrt.open_osfhandle(self.c2pwrite.Detach(), 0)
            if self.errwrite != -1:
                self.errwrite = msvcrt.open_osfhandle(self.errwrite.Detach(), 0)
        # get stdin
        if self.stdin is None:
            sp_stdin = None
        elif self.p2cread != -1:
            sp_stdin = io.TextIOWrapper(
                io.open(self.p2cread, "rb", -1), encoding=enc, errors=err
            )
        else:
            sp_stdin = sys.stdin
        # stdout
        if self.c2pwrite != -1:
            sp_stdout = io.TextIOWrapper(
                io.open(self.c2pwrite, "wb", -1), encoding=enc, errors=err
            )
        else:
            sp_stdout = sys.stdout
        # stderr
        if self.errwrite == self.c2pwrite:
            # stderr was redirected to stdout; share the same wrapper
            sp_stderr = sp_stdout
        elif self.errwrite != -1:
            sp_stderr = io.TextIOWrapper(
                io.open(self.errwrite, "wb", -1), encoding=enc, errors=err
            )
        else:
            sp_stderr = sys.stderr
        # run the function itself
        try:
            # route sys.stdout/sys.stderr through the per-thread dispatchers
            # so that prints inside f land in this command's pipes
            with STDOUT_DISPATCHER.register(sp_stdout), STDERR_DISPATCHER.register(
                sp_stderr
            ), redirect_stdout(STDOUT_DISPATCHER), redirect_stderr(STDERR_DISPATCHER):
                r = self.f(self.args, sp_stdin, sp_stdout, sp_stderr, spec, spec.stack)
        except SystemExit as e:
            r = e.code if isinstance(e.code, int) else int(bool(e.code))
        except OSError as e:
            status = still_writable(self.c2pwrite) and still_writable(self.errwrite)
            if status:
                # stdout and stderr are still writable, so error must
                # come from function itself.
                print_exception()
                r = 1
            else:
                # stdout and stderr are no longer writable, so error must
                # come from the fact that the next process in the pipeline
                # has closed the other side of the pipe. The function then
                # attempted to write to this side of the pipe anyway. This
                # is not truly an error and we should exit gracefully.
                r = 0
        except Exception:
            print_exception()
            r = 1
        safe_flush(sp_stdout)
        safe_flush(sp_stderr)
        self.returncode = parse_proxy_return(r, sp_stdout, sp_stderr)
        if not last_in_pipeline and not ON_WINDOWS:
            # mac requires us *not to* close the handles here while
            # windows requires us *to* close the handles here
            return
        # clean up
        # scopz: not sure why this is needed, but stdin cannot go here
        # and stdout & stderr must.
        handles = [self.stdout, self.stderr]
        for handle in handles:
            safe_fdclose(handle, cache=self._closed_handle_cache)

    def _wait_and_getattr(self, name):
        """make sure the instance has a certain attr, and return it."""
        while not hasattr(self, name):
            time.sleep(1e-7)
        return getattr(self, name)

    def poll(self):
        """Check if the function has completed.

        Returns
        -------
        None if the function is still executing, and the returncode otherwise
        """
        return self.returncode

    def wait(self, timeout=None):
        """Waits for the process to finish and returns the return code."""
        self.join()
        self._restore_sigint()
        return self.returncode

    #
    # SIGINT handler
    #

    def _signal_int(self, signum, frame):
        """Signal handler for SIGINT - Ctrl+C may have been pressed."""
        # Check if we have already been interrupted. This should prevent
        # the possibility of infinite recursion.
        if self._interrupted:
            return
        self._interrupted = True
        # close file handles here to stop an processes piped to us.
        handles = (
            self.p2cread,
            self.p2cwrite,
            self.c2pread,
            self.c2pwrite,
            self.errread,
            self.errwrite,
        )
        for handle in handles:
            safe_fdclose(handle)
        if self.poll() is not None:
            self._restore_sigint(frame=frame)
        if on_main_thread():
            # re-raise the interrupt in the main thread so the shell's own
            # machinery also sees it
            signal.pthread_kill(threading.get_ident(), signal.SIGINT)

    def _restore_sigint(self, frame=None):
        """Reinstall the pre-existing SIGINT handler; when a *frame* is
        given, forward the interrupt to that handler as well.
        """
        old = self.old_int_handler
        if old is not None:
            if on_main_thread():
                signal.signal(signal.SIGINT, old)
            self.old_int_handler = None
        if frame is not None:
            if old is not None and old is not self._signal_int:
                old(signal.SIGINT, frame)
        if self._interrupted:
            # an interrupted alias is reported as a failure
            self.returncode = 1

    # The code below (_get_devnull, _get_handles, and _make_inheritable) comes
    # from subprocess.py in the Python 3.4.2 Standard Library
    def _get_devnull(self):
        # lazily open os.devnull read/write and cache the file descriptor
        if not hasattr(self, "_devnull"):
            self._devnull = os.open(os.devnull, os.O_RDWR)
        return self._devnull

    if ON_WINDOWS:

        def _make_inheritable(self, handle):
            """Return a duplicate of handle, which is inheritable"""
            h = _winapi.DuplicateHandle(
                _winapi.GetCurrentProcess(),
                handle,
                _winapi.GetCurrentProcess(),
                0,
                1,
                _winapi.DUPLICATE_SAME_ACCESS,
            )
            return Handle(h)

        def _get_handles(self, stdin, stdout, stderr):
            """Construct and return tuple with IO objects:
            p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
            """
            if stdin is None and stdout is None and stderr is None:
                return (-1, -1, -1, -1, -1, -1)

            p2cread, p2cwrite = -1, -1
            c2pread, c2pwrite = -1, -1
            errread, errwrite = -1, -1

            if stdin is None:
                p2cread = _winapi.GetStdHandle(_winapi.STD_INPUT_HANDLE)
                if p2cread is None:
                    p2cread, _ = _winapi.CreatePipe(None, 0)
                    p2cread = Handle(p2cread)
                    _winapi.CloseHandle(_)
            elif stdin == subprocess.PIPE:
                # NOTE(review): unlike the stdout/stderr PIPE branches below,
                # this wraps the -1 sentinels instead of calling
                # _winapi.CreatePipe first -- looks like a bug; compare with
                # subprocess.Popen._get_handles before relying on it.
                p2cread, p2cwrite = Handle(p2cread), Handle(p2cwrite)
            elif stdin == subprocess.DEVNULL:
                p2cread = msvcrt.get_osfhandle(self._get_devnull())
            elif isinstance(stdin, int):
                p2cread = msvcrt.get_osfhandle(stdin)
            else:
                # Assuming file-like object
                p2cread = msvcrt.get_osfhandle(stdin.fileno())
            p2cread = self._make_inheritable(p2cread)

            if stdout is None:
                c2pwrite = _winapi.GetStdHandle(_winapi.STD_OUTPUT_HANDLE)
                if c2pwrite is None:
                    _, c2pwrite = _winapi.CreatePipe(None, 0)
                    c2pwrite = Handle(c2pwrite)
                    _winapi.CloseHandle(_)
            elif stdout == subprocess.PIPE:
                c2pread, c2pwrite = _winapi.CreatePipe(None, 0)
                c2pread, c2pwrite = Handle(c2pread), Handle(c2pwrite)
            elif stdout == subprocess.DEVNULL:
                c2pwrite = msvcrt.get_osfhandle(self._get_devnull())
            elif isinstance(stdout, int):
                c2pwrite = msvcrt.get_osfhandle(stdout)
            else:
                # Assuming file-like object
                c2pwrite = msvcrt.get_osfhandle(stdout.fileno())
            c2pwrite = self._make_inheritable(c2pwrite)

            if stderr is None:
                errwrite = _winapi.GetStdHandle(_winapi.STD_ERROR_HANDLE)
                if errwrite is None:
                    _, errwrite = _winapi.CreatePipe(None, 0)
                    errwrite = Handle(errwrite)
                    _winapi.CloseHandle(_)
            elif stderr == subprocess.PIPE:
                errread, errwrite = _winapi.CreatePipe(None, 0)
                errread, errwrite = Handle(errread), Handle(errwrite)
            elif stderr == subprocess.STDOUT:
                errwrite = c2pwrite
            elif stderr == subprocess.DEVNULL:
                errwrite = msvcrt.get_osfhandle(self._get_devnull())
            elif isinstance(stderr, int):
                errwrite = msvcrt.get_osfhandle(stderr)
            else:
                # Assuming file-like object
                errwrite = msvcrt.get_osfhandle(stderr.fileno())
            errwrite = self._make_inheritable(errwrite)

            return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)

    else:
        # POSIX versions
        def _get_handles(self, stdin, stdout, stderr):
            """Construct and return tuple with IO objects:
            p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite
            """
            p2cread, p2cwrite = -1, -1
            c2pread, c2pwrite = -1, -1
            errread, errwrite = -1, -1

            if stdin is None:
                pass
            elif stdin == subprocess.PIPE:
                p2cread, p2cwrite = os.pipe()
            elif stdin == subprocess.DEVNULL:
                p2cread = self._get_devnull()
            elif isinstance(stdin, int):
                p2cread = stdin
            else:
                # Assuming file-like object
                p2cread = stdin.fileno()

            if stdout is None:
                pass
            elif stdout == subprocess.PIPE:
                c2pread, c2pwrite = os.pipe()
            elif stdout == subprocess.DEVNULL:
                c2pwrite = self._get_devnull()
            elif isinstance(stdout, int):
                c2pwrite = stdout
            else:
                # Assuming file-like object
                c2pwrite = stdout.fileno()

            if stderr is None:
                pass
            elif stderr == subprocess.PIPE:
                errread, errwrite = os.pipe()
            elif stderr == subprocess.STDOUT:
                errwrite = c2pwrite
            elif stderr == subprocess.DEVNULL:
                errwrite = self._get_devnull()
            elif isinstance(stderr, int):
                errwrite = stderr
            else:
                # Assuming file-like object
                errwrite = stderr.fileno()

            return (p2cread, p2cwrite, c2pread, c2pwrite, errread, errwrite)
1633
1634#
1635# Foreground Thread Process Proxies
1636#
1637
1638
class ProcProxy(object):
    """A process proxy that executes its alias function synchronously on
    the thread it was called from, normally the main thread. This forgoes
    background execution, but it allows debugger and profiler tools
    (functions) to run on the very thread they are inspecting.
    """

    def __init__(
        self,
        f,
        args,
        stdin=None,
        stdout=None,
        stderr=None,
        universal_newlines=False,
        env=None,
    ):
        self.orig_f = f
        # wrap f so it is callable with the full 6-argument proxy signature
        self.f = partial_proxy(f)
        self.args = args
        self.pid = os.getpid()
        self.returncode = None
        self.stdin = stdin
        self.stdout = stdout
        self.stderr = stderr
        self.universal_newlines = universal_newlines
        self.env = env

    def poll(self):
        """Return the function's return code, or None if it has not run."""
        return self.returncode

    def wait(self, timeout=None):
        """Runs the function and returns the result. Timeout argument only
        present for API compatibility.
        """
        if self.f is None:
            return 0
        xenv = builtins.__xonsh_env__
        encoding = xenv.get("XONSH_ENCODING")
        errors = xenv.get("XONSH_ENCODING_ERRORS")
        spec = self._wait_and_getattr("spec")
        # resolve the file handles to use for this invocation
        if self.stdin is None:
            stdin = None
        else:
            inbuf = (
                io.open(self.stdin, "rb", -1)
                if isinstance(self.stdin, int)
                else self.stdin
            )
            stdin = io.TextIOWrapper(inbuf, encoding=encoding, errors=errors)
        stdout = self._pick_buf(self.stdout, sys.stdout, encoding, errors)
        stderr = self._pick_buf(self.stderr, sys.stderr, encoding, errors)
        # invoke the aliased function itself
        try:
            r = self.f(self.args, stdin, stdout, stderr, spec, spec.stack)
        except Exception:
            print_exception()
            r = 1
        self.returncode = parse_proxy_return(r, stdout, stderr)
        safe_flush(stdout)
        safe_flush(stderr)
        return self.returncode

    @staticmethod
    def _pick_buf(handle, sysbuf, enc, err):
        """Choose or wrap the stream to use in place of stdout/stderr."""
        if handle is None or handle is sysbuf:
            return sysbuf
        if isinstance(handle, int):
            # small fds are the standard streams; use the sys-level buffer
            if handle < 3:
                return sysbuf
            return io.TextIOWrapper(
                io.open(handle, "wb", -1), encoding=enc, errors=err
            )
        if hasattr(handle, "encoding"):
            # already a text stream, no wrapping required
            return handle
        # a binary stream; give it a text interface
        return io.TextIOWrapper(handle, encoding=enc, errors=err)

    def _wait_and_getattr(self, name):
        """Spin until this instance has the attribute *name*, then return it."""
        while not hasattr(self, name):
            time.sleep(1e-7)
        return getattr(self, name)
1729
1730
@lazyobject
def SIGNAL_MESSAGES():
    """Maps signal numbers to the message printed when a process dies
    from that signal (built lazily)."""
    messages = {
        signal.SIGABRT: "Aborted",
        signal.SIGFPE: "Floating point exception",
        signal.SIGILL: "Illegal instructions",
        signal.SIGTERM: "Terminated",
        signal.SIGSEGV: "Segmentation fault",
    }
    if ON_POSIX:
        # these signals are only defined on POSIX platforms
        messages[signal.SIGQUIT] = "Quit"
        messages[signal.SIGHUP] = "Hangup"
        messages[signal.SIGKILL] = "Killed"
    return messages
1745
1746
def safe_readlines(handle, hint=-1):
    """Read lines from *handle*, returning an empty list instead of
    raising when the underlying read fails with an OSError."""
    try:
        return handle.readlines(hint)
    except OSError:
        return []
1754
1755
def safe_readable(handle):
    """Report whether *handle* is readable, treating any OSError or
    ValueError (e.g. from a closed stream) as "not readable"."""
    try:
        return handle.readable()
    except (OSError, ValueError):
        return False
1763
1764
def update_fg_process_group(pipeline_group, background):
    """Hand the terminal to ``pipeline_group`` for an interactive,
    foreground job on POSIX.

    Returns True when the terminal was given away, False otherwise.
    """
    if background or not ON_POSIX:
        return False
    env = builtins.__xonsh_env__
    if not env.get("XONSH_INTERACTIVE"):
        return False
    return give_terminal_to(pipeline_group)
1774
1775
class CommandPipeline:
    """Represents a subprocess-mode command pipeline."""

    # Attribute names rendered by __repr__; a mix of plain attributes and
    # the properties defined below.
    attrnames = (
        "stdin",
        "stdout",
        "stderr",
        "pid",
        "returncode",
        "args",
        "alias",
        "stdin_redirect",
        "stdout_redirect",
        "stderr_redirect",
        "timestamps",
        "executed_cmd",
        "input",
        "output",
        "errors",
    )

    # Stream types that can already be polled without blocking; any other
    # stream gets wrapped in a NonBlockingFDReader before reading (iterraw).
    nonblocking = (io.BytesIO, NonBlockingFDReader, ConsoleParallelReader)
    def __init__(self, specs):
        """
        Parameters
        ----------
        specs : list of SubprocSpec
            Process specifications

        Attributes
        ----------
        spec : SubprocSpec
            The last specification in specs
        proc : Popen-like
            The process in procs
        ended : bool
            Boolean for if the command has stopped executing.
        input : str
            A string of the standard input.
        output : str
            A string of the standard output.
        errors : str
            A string of the standard error.
        lines : list of str
            The output lines
        starttime : floats or None
            Pipeline start timestamp.
        """
        self.starttime = None
        self.ended = False
        self.procs = []
        self.specs = specs
        self.spec = specs[-1]
        self.captured = specs[-1].captured
        self.input = self._output = self.errors = self.endtime = None
        # handles already closed by _safe_close(), so they are not re-closed
        self._closed_handle_cache = {}
        self.lines = []
        # computed lazily by the stderr_prefix/stderr_postfix properties
        self._stderr_prefix = self._stderr_postfix = None
        # process group that currently owns the terminal, if we gave it away
        self.term_pgid = None

        background = self.spec.background
        pipeline_group = None
        for spec in specs:
            if self.starttime is None:
                self.starttime = time.time()
            try:
                proc = spec.run(pipeline_group=pipeline_group)
            except Exception:
                # a spec failed to launch: report it, restore the terminal,
                # and leave self.proc as None so callers can detect failure
                print_exception()
                self._return_terminal()
                self.proc = None
                return
            if (
                proc.pid
                and pipeline_group is None
                and not spec.is_proxy
                and self.captured != "object"
            ):
                # the first real (non-proxy) process anchors the pipeline's
                # foreground process group
                pipeline_group = proc.pid
                if update_fg_process_group(pipeline_group, background):
                    self.term_pgid = pipeline_group
            self.procs.append(proc)
        self.proc = self.procs[-1]
1860
1861    def __repr__(self):
1862        s = self.__class__.__name__ + "("
1863        s += ", ".join(a + "=" + str(getattr(self, a)) for a in self.attrnames)
1864        s += ")"
1865        return s
1866
    def __bool__(self):
        """True if and only if the return code is 0. Accessing
        ``returncode`` blocks until the command has completed."""
        return self.returncode == 0
1869
    def __len__(self):
        """Number of processes in the pipeline."""
        return len(self.procs)
1872
1873    def __iter__(self):
1874        """Iterates through stdout and returns the lines, converting to
1875        strings and universal newlines if needed.
1876        """
1877        if self.ended:
1878            yield from iter(self.lines)
1879        else:
1880            yield from self.tee_stdout()
1881
    def iterraw(self):
        """Iterates through the last stdout, and returns the lines
        exactly as found (as bytes, newline endings untouched).
        """
        # get appropriate handles
        spec = self.spec
        proc = self.proc
        if proc is None:
            return
        timeout = builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY")
        # get the correct stdout: prefer the live pipe, fall back to the
        # captured stream when the pipe is missing or unreadable
        stdout = proc.stdout
        if (
            stdout is None or spec.stdout is None or not safe_readable(stdout)
        ) and spec.captured_stdout is not None:
            stdout = spec.captured_stdout
        if hasattr(stdout, "buffer"):
            stdout = stdout.buffer
        # wrap anything that cannot be polled without blocking
        if stdout is not None and not isinstance(stdout, self.nonblocking):
            stdout = NonBlockingFDReader(stdout.fileno(), timeout=timeout)
        if (
            not stdout
            or self.captured == "stdout"
            or not safe_readable(stdout)
            or not spec.threadable
        ):
            # we get here if the process is not threadable or the
            # class is the real Popen
            PrevProcCloser(pipeline=self)
            task = wait_for_active_job()
            if task is None or task["status"] != "stopped":
                proc.wait()
                self._endtime()
                if self.captured == "object":
                    self.end(tee_output=False)
                elif self.captured == "hiddenobject" and stdout:
                    b = stdout.read()
                    lines = b.splitlines(keepends=True)
                    yield from lines
                    self.end(tee_output=False)
                elif self.captured == "stdout":
                    b = stdout.read()
                    s = self._decode_uninew(b, universal_newlines=True)
                    self.lines = s.splitlines(keepends=True)
            return
        # get the correct stderr, mirroring the stdout selection above
        stderr = proc.stderr
        if (
            stderr is None or spec.stderr is None or not safe_readable(stderr)
        ) and spec.captured_stderr is not None:
            stderr = spec.captured_stderr
        if hasattr(stderr, "buffer"):
            stderr = stderr.buffer
        if stderr is not None and not isinstance(stderr, self.nonblocking):
            stderr = NonBlockingFDReader(stderr.fileno(), timeout=timeout)
        # read from process while it is running
        check_prev_done = len(self.procs) == 1
        prev_end_time = None
        i = j = cnt = 1
        while proc.poll() is None:
            if getattr(proc, "suspended", False):
                # suspended (e.g. ^Z): stop driving the pipeline
                return
            elif getattr(proc, "in_alt_mode", False):
                time.sleep(0.1)  # probably not leaving any time soon
                continue
            elif not check_prev_done:
                # In the case of pipelines with more than one command
                # we should give the commands a little time
                # to start up fully. This is particularly true for
                # GNU Parallel, which has a long startup time.
                pass
            elif self._prev_procs_done():
                self._close_prev_procs()
                proc.prevs_are_closed = True
                break
            stdout_lines = safe_readlines(stdout, 1024)
            i = len(stdout_lines)
            if i != 0:
                yield from stdout_lines
            stderr_lines = safe_readlines(stderr, 1024)
            j = len(stderr_lines)
            if j != 0:
                self.stream_stderr(stderr_lines)
            if not check_prev_done:
                # if we are piping...
                if stdout_lines or stderr_lines:
                    # see if we have some output.
                    check_prev_done = True
                elif prev_end_time is None:
                    # or see if we already know that the next-to-last
                    # proc in the pipeline has ended.
                    if self._prev_procs_done():
                        # if it has, record the time
                        prev_end_time = time.time()
                elif time.time() - prev_end_time >= 0.1:
                    # if we still don't have any output, even though the
                    # next-to-last proc has finished, wait a bit to make
                    # sure we have fully started up, etc.
                    check_prev_done = True
            # this is for CPU usage: back off (up to 1000x) while no output
            # is arriving, reset to the base frequency as soon as it does
            if i + j == 0:
                cnt = min(cnt + 1, 1000)
            else:
                cnt = 1
            time.sleep(timeout * cnt)
        # read from process now that it is over; drain once before and once
        # after wait() so nothing written in between is lost
        yield from safe_readlines(stdout)
        self.stream_stderr(safe_readlines(stderr))
        proc.wait()
        self._endtime()
        yield from safe_readlines(stdout)
        self.stream_stderr(safe_readlines(stderr))
        if self.captured == "object":
            self.end(tee_output=False)
1996
1997    def itercheck(self):
1998        """Iterates through the command lines and throws an error if the
1999        returncode is non-zero.
2000        """
2001        yield from self
2002        if self.returncode:
2003            # I included self, as providing access to stderr and other details
2004            # useful when instance isn't assigned to a variable in the shell.
2005            raise XonshCalledProcessError(
2006                self.returncode, self.executed_cmd, self.stdout, self.stderr, self
2007            )
2008
    def tee_stdout(self):
        """Writes the process stdout to the output variable, line-by-line, and
        yields each line. This may optionally accept lines (in bytes) to iterate
        over, in which case it does not call iterraw().
        """
        env = builtins.__xonsh_env__
        enc = env.get("XONSH_ENCODING")
        err = env.get("XONSH_ENCODING_ERRORS")
        lines = self.lines
        # stream to the terminal only when output is not being captured
        stream = self.captured not in STDOUT_CAPTURE_KINDS
        if stream and not self.spec.stdout:
            stream = False
        stdout_has_buffer = hasattr(sys.stdout, "buffer")
        nl = b"\n"
        cr = b"\r"
        crnl = b"\r\n"
        for line in self.iterraw():
            # write to stdout line ASAP, if needed
            if stream:
                if stdout_has_buffer:
                    sys.stdout.buffer.write(line)
                else:
                    sys.stdout.write(line.decode(encoding=enc, errors=err))
                sys.stdout.flush()
            # do some munging of the line before we return it:
            # normalize \r\n and bare \r endings to \n
            if line.endswith(crnl):
                line = line[:-2] + nl
            elif line.endswith(cr):
                line = line[:-1] + nl
            # strip alternate-mode escape sequences before caching
            line = RE_HIDE_ESCAPE.sub(b"", line)
            line = line.decode(encoding=enc, errors=err)
            # tee it up!
            lines.append(line)
            yield line
2043
2044    def stream_stderr(self, lines):
2045        """Streams lines to sys.stderr and the errors attribute."""
2046        if not lines:
2047            return
2048        env = builtins.__xonsh_env__
2049        enc = env.get("XONSH_ENCODING")
2050        err = env.get("XONSH_ENCODING_ERRORS")
2051        b = b"".join(lines)
2052        if self.stderr_prefix:
2053            b = self.stderr_prefix + b
2054        if self.stderr_postfix:
2055            b += self.stderr_postfix
2056        stderr_has_buffer = hasattr(sys.stderr, "buffer")
2057        # write bytes to std stream
2058        if stderr_has_buffer:
2059            sys.stderr.buffer.write(b)
2060        else:
2061            sys.stderr.write(b.decode(encoding=enc, errors=err))
2062        sys.stderr.flush()
2063        # do some munging of the line before we save it to the attr
2064        b = b.replace(b"\r\n", b"\n").replace(b"\r", b"\n")
2065        b = RE_HIDE_ESCAPE.sub(b"", b)
2066        env = builtins.__xonsh_env__
2067        s = b.decode(
2068            encoding=env.get("XONSH_ENCODING"), errors=env.get("XONSH_ENCODING_ERRORS")
2069        )
2070        # set the errors
2071        if self.errors is None:
2072            self.errors = s
2073        else:
2074            self.errors += s
2075
2076    def _decode_uninew(self, b, universal_newlines=None):
2077        """Decode bytes into a str and apply universal newlines as needed."""
2078        if not b:
2079            return ""
2080        if isinstance(b, bytes):
2081            env = builtins.__xonsh_env__
2082            s = b.decode(
2083                encoding=env.get("XONSH_ENCODING"),
2084                errors=env.get("XONSH_ENCODING_ERRORS"),
2085            )
2086        else:
2087            s = b
2088        if universal_newlines or self.spec.universal_newlines:
2089            s = s.replace("\r\n", "\n").replace("\r", "\n")
2090        return s
2091
2092    #
2093    # Ending methods
2094    #
2095
2096    def end(self, tee_output=True):
2097        """
2098        End the pipeline, return the controlling terminal if needed.
2099
2100        Main things done in self._end().
2101        """
2102        if self.ended:
2103            return
2104        self._end(tee_output=tee_output)
2105        self._return_terminal()
2106
    def _end(self, tee_output):
        """Waits for the command to complete and then runs any closing and
        cleanup procedures that need to be run.

        Parameters
        ----------
        tee_output : bool
            Whether to drain stdout (tee-ing it) in order to drive the
            pipeline to completion.
        """
        if tee_output:
            for _ in self.tee_stdout():
                pass
        self._endtime()
        # since we are driven by getting output, input may not be available
        # until the command has completed.
        self._set_input()
        self._close_prev_procs()
        self._close_proc()
        self._check_signal()
        self._apply_to_history()
        self.ended = True
        # raised last, so all state above is finalized even on failure
        self._raise_subproc_error()
2124
2125    def _return_terminal(self):
2126        if ON_WINDOWS or not ON_POSIX:
2127            return
2128        pgid = os.getpgid(0)
2129        if self.term_pgid is None or pgid == self.term_pgid:
2130            return
2131        if give_terminal_to(pgid):  # if gave term succeed
2132            self.term_pgid = pgid
2133            if hasattr(builtins, "__xonsh_shell__"):
2134                # restoring sanity could probably be called whenever we return
2135                # control to the shell. But it only seems to matter after a
2136                # ^Z event. This *has* to be called after we give the terminal
2137                # back to the shell.
2138                builtins.__xonsh_shell__.shell.restore_tty_sanity()
2139
2140    def resume(self, job, tee_output=True):
2141        self.ended = False
2142        if give_terminal_to(job["pgrp"]):
2143            self.term_pgid = job["pgrp"]
2144        _continue(job)
2145        self.end(tee_output=tee_output)
2146
    def _endtime(self):
        """Sets the closing timestamp if it hasn't been already."""
        # only the first call records a time; later calls are no-ops
        if self.endtime is None:
            self.endtime = time.time()
2151
    def _safe_close(self, handle):
        """Close ``handle`` without raising, passing the pipeline's cache
        of already-closed handles."""
        safe_fdclose(handle, cache=self._closed_handle_cache)
2154
2155    def _prev_procs_done(self):
2156        """Boolean for if all previous processes have completed. If there
2157        is only a single process in the pipeline, this returns False.
2158        """
2159        any_running = False
2160        for s, p in zip(self.specs[:-1], self.procs[:-1]):
2161            if p.poll() is None:
2162                any_running = True
2163                continue
2164            self._safe_close(s.stdin)
2165            self._safe_close(s.stdout)
2166            self._safe_close(s.stderr)
2167            if p is None:
2168                continue
2169            self._safe_close(p.stdin)
2170            self._safe_close(p.stdout)
2171            self._safe_close(p.stderr)
2172        return False if any_running else (len(self) > 1)
2173
2174    def _close_prev_procs(self):
2175        """Closes all but the last proc's stdout."""
2176        for s, p in zip(self.specs[:-1], self.procs[:-1]):
2177            self._safe_close(s.stdin)
2178            self._safe_close(s.stdout)
2179            self._safe_close(s.stderr)
2180            if p is None:
2181                continue
2182            self._safe_close(p.stdin)
2183            self._safe_close(p.stdout)
2184            self._safe_close(p.stderr)
2185
2186    def _close_proc(self):
2187        """Closes last proc's stdout."""
2188        s = self.spec
2189        p = self.proc
2190        self._safe_close(s.stdin)
2191        self._safe_close(s.stdout)
2192        self._safe_close(s.stderr)
2193        self._safe_close(s.captured_stdout)
2194        self._safe_close(s.captured_stderr)
2195        if p is None:
2196            return
2197        self._safe_close(p.stdin)
2198        self._safe_close(p.stdout)
2199        self._safe_close(p.stderr)
2200
2201    def _set_input(self):
2202        """Sets the input variable."""
2203        if self.proc is None:
2204            return
2205        stdin = self.proc.stdin
2206        if (
2207            stdin is None
2208            or isinstance(stdin, int)
2209            or stdin.closed
2210            or not stdin.seekable()
2211            or not safe_readable(stdin)
2212        ):
2213            input = b""
2214        else:
2215            stdin.seek(0)
2216            input = stdin.read()
2217        self.input = self._decode_uninew(input)
2218
2219    def _check_signal(self):
2220        """Checks if a signal was received and issues a message."""
2221        proc_signal = getattr(self.proc, "signal", None)
2222        if proc_signal is None:
2223            return
2224        sig, core = proc_signal
2225        sig_str = SIGNAL_MESSAGES.get(sig)
2226        if sig_str:
2227            if core:
2228                sig_str += " (core dumped)"
2229            print(sig_str, file=sys.stderr)
2230            if self.errors is not None:
2231                self.errors += sig_str + "\n"
2232
2233    def _apply_to_history(self):
2234        """Applies the results to the current history object."""
2235        hist = builtins.__xonsh_history__
2236        if hist is not None:
2237            hist.last_cmd_rtn = 1 if self.proc is None else self.proc.returncode
2238
2239    def _raise_subproc_error(self):
2240        """Raises a subprocess error, if we are supposed to."""
2241        spec = self.spec
2242        rtn = self.returncode
2243        if (
2244            not spec.is_proxy
2245            and rtn is not None
2246            and rtn > 0
2247            and builtins.__xonsh_env__.get("RAISE_SUBPROC_ERROR")
2248        ):
2249            try:
2250                raise subprocess.CalledProcessError(rtn, spec.cmd, output=self.output)
2251            finally:
2252                # this is need to get a working terminal in interactive mode
2253                self._return_terminal()
2254
2255    #
2256    # Properties
2257    #
2258
    @property
    def stdin(self):
        """Standard input stream of the last process in the pipeline."""
        return self.proc.stdin

    @property
    def stdout(self):
        """Standard output stream of the last process in the pipeline."""
        return self.proc.stdout

    @property
    def stderr(self):
        """Standard error stream of the last process in the pipeline."""
        return self.proc.stderr

    @property
    def inp(self):
        """Creates normalized input string from args."""
        # this is the command line itself, not the data fed to stdin
        return " ".join(self.args)
2278
2279    @property
2280    def output(self):
2281        """Non-blocking, lazy access to output"""
2282        if self.ended:
2283            if self._output is None:
2284                self._output = "".join(self.lines)
2285            return self._output
2286        else:
2287            return "".join(self.lines)
2288
2289    @property
2290    def out(self):
2291        """Output value as a str."""
2292        self.end()
2293        return self.output
2294
2295    @property
2296    def err(self):
2297        """Error messages as a string."""
2298        self.end()
2299        return self.errors
2300
2301    @property
2302    def pid(self):
2303        """Process identifier."""
2304        return self.proc.pid
2305
2306    @property
2307    def returncode(self):
2308        """Process return code, waits until command is completed."""
2309        self.end()
2310        if self.proc is None:
2311            return 1
2312        return self.proc.returncode
2313
2314    rtn = returncode
2315
2316    @property
2317    def args(self):
2318        """Arguments to the process."""
2319        return self.spec.args
2320
2321    @property
2322    def rtn(self):
2323        """Alias to return code."""
2324        return self.returncode
2325
2326    @property
2327    def alias(self):
2328        """Alias the process used."""
2329        return self.spec.alias
2330
2331    @property
2332    def stdin_redirect(self):
2333        """Redirection used for stdin."""
2334        stdin = self.spec.stdin
2335        name = getattr(stdin, "name", "<stdin>")
2336        mode = getattr(stdin, "mode", "r")
2337        return [name, mode]
2338
2339    @property
2340    def stdout_redirect(self):
2341        """Redirection used for stdout."""
2342        stdout = self.spec.stdout
2343        name = getattr(stdout, "name", "<stdout>")
2344        mode = getattr(stdout, "mode", "a")
2345        return [name, mode]
2346
2347    @property
2348    def stderr_redirect(self):
2349        """Redirection used for stderr."""
2350        stderr = self.spec.stderr
2351        name = getattr(stderr, "name", "<stderr>")
2352        mode = getattr(stderr, "mode", "r")
2353        return [name, mode]
2354
2355    @property
2356    def timestamps(self):
2357        """The start and end time stamps."""
2358        return [self.starttime, self.endtime]
2359
2360    @property
2361    def executed_cmd(self):
2362        """The resolve and executed command."""
2363        return self.spec.cmd
2364
2365    @property
2366    def stderr_prefix(self):
2367        """Prefix to print in front of stderr, as bytes."""
2368        p = self._stderr_prefix
2369        if p is None:
2370            env = builtins.__xonsh_env__
2371            t = env.get("XONSH_STDERR_PREFIX")
2372            s = format_std_prepost(t, env=env)
2373            p = s.encode(
2374                encoding=env.get("XONSH_ENCODING"),
2375                errors=env.get("XONSH_ENCODING_ERRORS"),
2376            )
2377            self._stderr_prefix = p
2378        return p
2379
2380    @property
2381    def stderr_postfix(self):
2382        """Postfix to print after stderr, as bytes."""
2383        p = self._stderr_postfix
2384        if p is None:
2385            env = builtins.__xonsh_env__
2386            t = env.get("XONSH_STDERR_POSTFIX")
2387            s = format_std_prepost(t, env=env)
2388            p = s.encode(
2389                encoding=env.get("XONSH_ENCODING"),
2390                errors=env.get("XONSH_ENCODING_ERRORS"),
2391            )
2392            self._stderr_postfix = p
2393        return p
2394
2395
class HiddenCommandPipeline(CommandPipeline):
    """A CommandPipeline whose repr is the empty string."""

    def __repr__(self):
        return ""
2399
2400
def pause_call_resume(p, f, *args, **kwargs):
    """Suspend process ``p`` (SIGSTOP), call ``f(*args, **kwargs)``, then
    resume it (SIGCONT). If ``p`` cannot accept signals, ``f`` is simply
    called. Exceptions raised by ``f`` are swallowed.

    Parameters
    ----------
    p : Popen object or similar
    f : callable
    args : remaining arguments
    kwargs : keyword arguments
    """
    can_signal = (
        hasattr(p, "send_signal") and ON_POSIX and not ON_MSYS and not ON_CYGWIN
    )
    if can_signal:
        p.send_signal(signal.SIGSTOP)
    try:
        f(*args, **kwargs)
    except Exception:
        # best-effort call: the caller does not care whether f failed
        pass
    if can_signal:
        p.send_signal(signal.SIGCONT)
2423
2424
class PrevProcCloser(threading.Thread):
    """Previous process closer thread for pipelines whose last command
    is itself unthreadable. This makes sure that the pipeline is
    driven forward and does not deadlock.
    """

    def __init__(self, pipeline):
        """
        Parameters
        ----------
        pipeline : CommandPipeline
            The pipeline whose prev procs we should close.
        """
        self.pipeline = pipeline
        super().__init__()
        # daemon thread: must not keep the interpreter alive on exit
        self.daemon = True
        self.start()

    def run(self):
        """Runs the closing algorithm."""
        pipeline = self.pipeline
        check_prev_done = len(pipeline.procs) == 1
        # single-command pipelines have no previous procs to close
        if check_prev_done:
            return
        proc = pipeline.proc
        prev_end_time = None
        timeout = builtins.__xonsh_env__.get("XONSH_PROC_FREQUENCY")
        # NOTE(review): the 1000x multiplier looks suspiciously large for a
        # polling interval cap of 0.1s -- confirm intent before changing
        sleeptime = min(timeout * 1000, 0.1)
        # check_prev_done starts False here, flipping True only after the
        # next-to-last proc has been done for at least 0.1s (see below)
        while proc.poll() is None:
            if not check_prev_done:
                # In the case of pipelines with more than one command
                # we should give the commands a little time
                # to start up fully. This is particularly true for
                # GNU Parallel, which has a long startup time.
                pass
            elif pipeline._prev_procs_done():
                pipeline._close_prev_procs()
                proc.prevs_are_closed = True
                break
            if not check_prev_done:
                # if we are piping...
                if prev_end_time is None:
                    # or see if we already know that the next-to-last
                    # proc in the pipeline has ended.
                    if pipeline._prev_procs_done():
                        # if it has, record the time
                        prev_end_time = time.time()
                elif time.time() - prev_end_time >= 0.1:
                    # if we still don't have any output, even though the
                    # next-to-last proc has finished, wait a bit to make
                    # sure we have fully started up, etc.
                    check_prev_done = True
            # this is for CPU usage
            time.sleep(sleeptime)
2479