1"""
2    :codeauthor: Pedro Algarvio (pedro@algarvio.me)
3
4
5    salt.utils.nb_popen
6    ~~~~~~~~~~~~~~~~~~~
7
8    Non blocking subprocess Popen.
9
    This functionality has been adapted to work on Windows following the recipe
    found at:
12
13        http://code.activestate.com/recipes/440554/
14"""
15
16import errno
17import logging
18import os
19import select
20import subprocess
21import sys
22import tempfile
23import time
24
# True when running on Windows. NonBlockingPopen tests this flag while its
# class body is executed to decide which ``send``/``_recv`` pair to define.
mswindows = sys.platform == "win32"

# The Windows implementation relies on pywin32 (win32file, win32pipe) and on
# msvcrt. If any of these imports fails, fcntl is imported instead; it is what
# the POSIX implementation uses to toggle O_NONBLOCK on the pipes.
try:
    from win32file import ReadFile, WriteFile
    from win32pipe import PeekNamedPipe
    import msvcrt
except ImportError:
    import fcntl

# Module-level logger (used for the "Running command" message and for the
# output logged by poll_and_read_until_finish).
log = logging.getLogger(__name__)
35
36
class NonBlockingPopen(subprocess.Popen):
    """
    ``subprocess.Popen`` whose pipes can be read and written without blocking.

    Output obtained through :meth:`recv` / :meth:`recv_err` is appended to
    ``stdout_buff`` / ``stderr_buff`` (spooled temporary files), logged at
    debug level on a per-PID logger and, when ``stream_stds`` is true, echoed
    on ``sys.stdout`` / ``sys.stderr``. :meth:`communicate` returns the
    accumulated content of both buffers.

    Besides the regular ``subprocess.Popen`` arguments, the following keyword
    arguments are accepted; they are consumed here and not passed on:

    :param stream_stds: echo captured output on the matching ``sys`` stream
        (default ``False``).
    :param max_size_in_mem: size each buffer may reach in memory before it is
        moved to a temporary file (default ``512000``).
    :param stdout_logger_name: logger name template for stdout; formatted
        with ``pid``.
    :param stderr_logger_name: logger name template for stderr; formatted
        with ``pid``.
    :param logging_command: value logged instead of ``args`` in the
        "Running command" message.
    """

    _stdout_logger_name_ = "salt.utils.nb_popen.STDOUT.PID-{pid}"
    _stderr_logger_name_ = "salt.utils.nb_popen.STDERR.PID-{pid}"

    # Set by communicate() once the output still sitting in the pipes has been
    # appended to the buffers, so that a repeated call cannot append it twice.
    _tail_collected = False

    def __init__(self, *args, **kwargs):
        self.stream_stds = kwargs.pop("stream_stds", False)

        # Half a megabyte in memory is more than enough to start writing to
        # a temporary file.
        self.max_size_in_mem = kwargs.pop("max_size_in_mem", 512000)

        # Let's configure the std{out,err} logging handler names
        self._stdout_logger_name_ = kwargs.pop(
            "stdout_logger_name", self._stdout_logger_name_
        )
        self._stderr_logger_name_ = kwargs.pop(
            "stderr_logger_name", self._stderr_logger_name_
        )

        logging_command = kwargs.pop("logging_command", None)
        # Only peeked at: subprocess.Popen still needs to receive it.
        stderr = kwargs.get("stderr", None)

        super().__init__(*args, **kwargs)

        self.stdout_buff = self._new_buffer()
        self._stdout_logger = logging.getLogger(
            self._stdout_logger_name_.format(pid=self.pid)
        )

        if stderr is subprocess.STDOUT:
            # stderr is merged into stdout, so both share buffer and logger.
            self.stderr_buff = self.stdout_buff
            self._stderr_logger = self._stdout_logger
        else:
            self.stderr_buff = self._new_buffer()
            self._stderr_logger = logging.getLogger(
                self._stderr_logger_name_.format(pid=self.pid)
            )

        log.info(
            "Running command under pid %s: '%s'",
            self.pid,
            args if logging_command is None else logging_command,
        )

    def _new_buffer(self):
        """
        Return an empty spooled buffer whose mode matches the pipes.
        """
        if self.universal_newlines:
            # Text-mode pipes yield ``str``, which the default binary buffer
            # would reject. newline="" stores the text without translation.
            return tempfile.SpooledTemporaryFile(
                self.max_size_in_mem, mode="w+", encoding="utf-8", newline=""
            )
        return tempfile.SpooledTemporaryFile(self.max_size_in_mem)

    def recv(self, maxsize=None):
        """
        Read up to ``maxsize`` from the child's stdout without blocking.

        See ``_recv`` for the possible return values.
        """
        return self._recv("stdout", maxsize)

    def recv_err(self, maxsize=None):
        """
        Read up to ``maxsize`` from the child's stderr without blocking.

        See ``_recv`` for the possible return values.
        """
        return self._recv("stderr", maxsize)

    def send_recv(self, input="", maxsize=None):
        """
        Send ``input`` and return ``(send result, stdout data, stderr data)``.
        """
        return self.send(input), self.recv(maxsize), self.recv_err(maxsize)

    def get_conn_maxsize(self, which, maxsize):
        """
        Return the pipe named ``which`` and a sanitised ``maxsize``.

        ``None`` becomes 1024 and anything below 1 becomes 1.
        """
        if maxsize is None:
            maxsize = 1024
        elif maxsize < 1:
            maxsize = 1
        return getattr(self, which), maxsize

    def _close(self, which):
        """
        Close the pipe named ``which`` and replace the attribute with ``None``.

        Always returns ``None`` so callers can ``return self._close(...)``.
        """
        getattr(self, which).close()
        setattr(self, which, None)

    def _as_bytes(self, data):
        """
        Return ``data`` as bytes; ``send`` writes to the OS handle directly.

        ``str`` is encoded with the encoding of ``self.stdin`` when it has one
        (text mode) and with UTF-8 otherwise. Anything else is returned as is.
        """
        if isinstance(data, str):
            return data.encode(getattr(self.stdin, "encoding", None) or "utf-8")
        return data

    def _decode(self, data, conn):
        """
        In text mode, decode raw pipe bytes and normalise newlines to "\\n".

        ``data`` is returned untouched when the process is in binary mode or
        when it is already ``str``.
        """
        if not self.universal_newlines or isinstance(data, str):
            return data
        encoding = getattr(conn, "encoding", None) or "utf-8"
        errors = getattr(conn, "errors", None) or "strict"
        # NOTE(review): every chunk is decoded on its own, so a multi-byte
        # character or a "\r\n" pair split across two reads is not handled.
        text = data.decode(encoding, errors)
        return text.replace("\r\n", "\n").replace("\r", "\n")

    @staticmethod
    def _echo(stream, data):
        """
        Write ``data`` to ``stream``, coping with bytes on a text stream.
        """
        try:
            stream.write(data)
        except TypeError:
            if not isinstance(data, bytes):
                raise
            # ``sys.stdout``/``sys.stderr`` are normally text streams and
            # refuse bytes; use their binary layer when there is one.
            binary = getattr(stream, "buffer", None)
            if binary is None:
                stream.write(data.decode("utf-8", "replace"))
            else:
                stream.flush()
                binary.write(data)
                binary.flush()

    def _record(self, which, data):
        """
        Store ``data`` in the buffer for ``which``, log it and maybe echo it.
        """
        getattr(self, "{}_buff".format(which)).write(data)
        getattr(self, "_{}_logger".format(which)).debug(data.rstrip())
        if self.stream_stds:
            self._echo(getattr(sys, which), data)

    if mswindows:

        def send(self, input):
            """
            Write ``input`` to the child's stdin.

            Returns the number of bytes written, or ``None`` when stdin is not
            available or had to be closed (invalid handle, broken pipe).
            """
            if not self.stdin:
                return None

            try:
                handle = msvcrt.get_osfhandle(self.stdin.fileno())
                (_, written) = WriteFile(handle, self._as_bytes(input))
            except ValueError:
                return self._close("stdin")
            # pywintypes.error derives from Exception; naming it through
            # ``subprocess.pywintypes`` fails because that attribute does
            # not exist.
            except Exception as why:  # pylint: disable=broad-except
                # 109 is the Windows ERROR_BROKEN_PIPE code.
                if why.args and why.args[0] in (109, errno.ESHUTDOWN):
                    return self._close("stdin")
                raise

            return written

        def _recv(self, which, maxsize):
            """
            Read what is currently available on the pipe named ``which``.

            Returns the data read (possibly empty), or ``None`` when the pipe
            is not available or had to be closed.
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            try:
                handle = msvcrt.get_osfhandle(conn.fileno())
                # Peek first so ReadFile is only asked for bytes that are
                # already there and therefore cannot block.
                (read, available, _) = PeekNamedPipe(handle, 0)
                available = min(available, maxsize)
                if available > 0:
                    (_, read) = ReadFile(handle, available, None)
            except ValueError:
                return self._close(which)
            except Exception as why:  # pylint: disable=broad-except
                if why.args and why.args[0] in (109, errno.ESHUTDOWN):
                    return self._close(which)
                raise

            # Decode before recording so buffer, log and return value agree.
            read = self._decode(read, conn)
            self._record(which, read)
            return read

    else:

        def send(self, input):
            """
            Write ``input`` to the child's stdin if that will not block.

            Returns the number of bytes written, ``0`` when stdin is not ready
            for writing, or ``None`` when stdin is not available or had to be
            closed (already closed, broken pipe).
            """
            if not self.stdin:
                return None

            # select() raises ValueError on a closed file; treat it like the
            # Windows branch does and drop the pipe.
            if self.stdin.closed:
                return self._close("stdin")

            if not select.select([], [self.stdin], [], 0)[1]:
                return 0

            try:
                written = os.write(self.stdin.fileno(), self._as_bytes(input))
            except OSError as why:
                if why.errno == errno.EPIPE:  # broken pipe
                    return self._close("stdin")
                raise

            return written

        def _recv(self, which, maxsize):
            """
            Read what is currently available on the pipe named ``which``.

            Returns the data read, ``""`` when nothing is ready, or ``None``
            when the pipe is not available or was closed (already closed,
            end of file).
            """
            conn, maxsize = self.get_conn_maxsize(which, maxsize)
            if conn is None:
                return None

            # fcntl() raises ValueError on a closed file, so check first.
            if conn.closed:
                return self._close(which)

            flags = fcntl.fcntl(conn, fcntl.F_GETFL)
            fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK)

            try:
                if not select.select([conn], [], [], 0)[0]:
                    return ""

                if self.universal_newlines:
                    # The text wrapper decodes and translates newlines.
                    # NOTE(review): it may keep decoded text in its own
                    # buffer, which select() on the descriptor cannot see.
                    buff = conn.read(maxsize)
                else:
                    # Read the descriptor itself: a buffered read could
                    # prefetch more than ``maxsize`` and hide the surplus
                    # from the next select() call.
                    try:
                        buff = os.read(conn.fileno(), maxsize)
                    except BlockingIOError:
                        return ""

                if not buff:
                    return self._close(which)

                self._record(which, buff)
                return buff
            finally:
                # Restore the original flags unless the pipe was just closed.
                if not conn.closed:
                    fcntl.fcntl(conn, fcntl.F_SETFL, flags)

    def poll_and_read_until_finish(self, interval=0.01):
        """
        Keep reading stdout and stderr until the process has exited.

        :param interval: seconds to sleep between two polls.

        While at least one pipe is still open the silent counter is reset on
        every iteration. Once both pipes are gone and the process has still
        not exited after more than 100 iterations, ``communicate()`` is called
        and whatever it returns is logged.
        """
        silent_iterations = 0
        while self.poll() is None:
            if self.stdout is not None:
                silent_iterations = 0
                self.recv()

            if self.stderr is not None:
                silent_iterations = 0
                self.recv_err()

            silent_iterations += 1

            if silent_iterations > 100:
                silent_iterations = 0
                (stdoutdata, stderrdata) = self.communicate()
                if stdoutdata:
                    log.debug(stdoutdata)
                if stderrdata:
                    log.error(stderrdata)
            time.sleep(interval)

    def communicate(self, input=None):  # pylint: disable=arguments-differ
        """
        Wait for the process to finish and return all captured output.

        :param input: data handed to ``subprocess.Popen.communicate``.
        :return: ``(stdout, stderr)`` as accumulated in ``stdout_buff`` and
            ``stderr_buff``. When stderr was redirected to stdout both names
            refer to the same buffer, so the second element is empty.

        Output that was still in the pipes (never fetched through ``recv`` /
        ``recv_err``) is appended to the buffers first, so it is not lost. It
        is neither logged nor echoed.
        """
        leftovers = super().communicate(input)

        if not self._tail_collected:
            self._tail_collected = True
            buffers = (self.stdout_buff, self.stderr_buff)
            for buff, tail in zip(buffers, leftovers):
                if tail:
                    # Earlier reads may have moved the position; append.
                    buff.seek(0, os.SEEK_END)
                    buff.write(tail)

        self.stdout_buff.flush()
        self.stdout_buff.seek(0)
        self.stderr_buff.flush()
        self.stderr_buff.seek(0)
        return self.stdout_buff.read(), self.stderr_buff.read()
235