1""" 2 :codeauthor: Pedro Algarvio (pedro@algarvio.me) 3 4 5 salt.utils.nb_popen 6 ~~~~~~~~~~~~~~~~~~~ 7 8 Non blocking subprocess Popen. 9 10 This functionality has been adapted to work on windows following the recipe 11 found on: 12 13 http://code.activestate.com/recipes/440554/ 14""" 15 16import errno 17import logging 18import os 19import select 20import subprocess 21import sys 22import tempfile 23import time 24 25mswindows = sys.platform == "win32" 26 27try: 28 from win32file import ReadFile, WriteFile 29 from win32pipe import PeekNamedPipe 30 import msvcrt 31except ImportError: 32 import fcntl 33 34log = logging.getLogger(__name__) 35 36 37class NonBlockingPopen(subprocess.Popen): 38 39 # _stdin_logger_name_ = 'salt.utils.nb_popen.STDIN.PID-{pid}' 40 _stdout_logger_name_ = "salt.utils.nb_popen.STDOUT.PID-{pid}" 41 _stderr_logger_name_ = "salt.utils.nb_popen.STDERR.PID-{pid}" 42 43 def __init__(self, *args, **kwargs): 44 self.stream_stds = kwargs.pop("stream_stds", False) 45 46 # Half a megabyte in memory is more than enough to start writing to 47 # a temporary file. 48 self.max_size_in_mem = kwargs.pop("max_size_in_mem", 512000) 49 50 # Let's configure the std{in, out,err} logging handler names 51 # self._stdin_logger_name_ = kwargs.pop( 52 # 'stdin_logger_name', self._stdin_logger_name_ 53 # ) 54 self._stdout_logger_name_ = kwargs.pop( 55 "stdout_logger_name", self._stdout_logger_name_ 56 ) 57 self._stderr_logger_name_ = kwargs.pop( 58 "stderr_logger_name", self._stderr_logger_name_ 59 ) 60 61 logging_command = kwargs.pop("logging_command", None) 62 stderr = kwargs.get("stderr", None) 63 64 super().__init__(*args, **kwargs) 65 66 # self._stdin_logger = logging.getLogger( 67 # self._stdin_logger_name_.format(pid=self.pid) 68 # ) 69 70 self.stdout_buff = tempfile.SpooledTemporaryFile(self.max_size_in_mem) 71 self._stdout_logger = logging.getLogger( 72 self._stdout_logger_name_.format(pid=self.pid) 73 ) 74 75 if stderr is subprocess.STDOUT: 76 self.stderr_buff = self.stdout_buff 77 self._stderr_logger = self._stdout_logger 78 else: 79 self.stderr_buff = tempfile.SpooledTemporaryFile(self.max_size_in_mem) 80 self._stderr_logger = logging.getLogger( 81 self._stderr_logger_name_.format(pid=self.pid) 82 ) 83 84 log.info( 85 "Running command under pid %s: '%s'", 86 self.pid, 87 args if logging_command is None else logging_command, 88 ) 89 90 def recv(self, maxsize=None): 91 return self._recv("stdout", maxsize) 92 93 def recv_err(self, maxsize=None): 94 return self._recv("stderr", maxsize) 95 96 def send_recv(self, input="", maxsize=None): 97 return self.send(input), self.recv(maxsize), self.recv_err(maxsize) 98 99 def get_conn_maxsize(self, which, maxsize): 100 if maxsize is None: 101 maxsize = 1024 102 elif maxsize < 1: 103 maxsize = 1 104 return getattr(self, which), maxsize 105 106 def _close(self, which): 107 getattr(self, which).close() 108 setattr(self, which, None) 109 110 if mswindows: 111 112 def send(self, input): 113 if not self.stdin: 114 return None 115 116 try: 117 x = msvcrt.get_osfhandle(self.stdin.fileno()) 118 (errCode, written) = WriteFile(x, input) 119 # self._stdin_logger.debug(input.rstrip()) 120 except ValueError: 121 return self._close("stdin") 122 except (subprocess.pywintypes.error, Exception) as why: 123 if why.args[0] in (109, errno.ESHUTDOWN): 124 return self._close("stdin") 125 raise 126 127 return written 128 129 def _recv(self, which, maxsize): 130 conn, maxsize = self.get_conn_maxsize(which, maxsize) 131 if conn is None: 132 return None 133 134 try: 135 x = msvcrt.get_osfhandle(conn.fileno()) 136 (read, nAvail, nMessage) = PeekNamedPipe(x, 0) 137 if maxsize < nAvail: 138 nAvail = maxsize 139 if nAvail > 0: 140 (errCode, read) = ReadFile(x, nAvail, None) 141 except ValueError: 142 return self._close(which) 143 except (subprocess.pywintypes.error, Exception) as why: 144 if why.args[0] in (109, errno.ESHUTDOWN): 145 return self._close(which) 146 raise 147 148 getattr(self, "{}_buff".format(which)).write(read) 149 getattr(self, "_{}_logger".format(which)).debug(read.rstrip()) 150 if self.stream_stds: 151 getattr(sys, which).write(read) 152 153 if self.universal_newlines: 154 read = self._translate_newlines(read) 155 return read 156 157 else: 158 159 def send(self, input): 160 if not self.stdin: 161 return None 162 163 if not select.select([], [self.stdin], [], 0)[1]: 164 return 0 165 166 try: 167 written = os.write(self.stdin.fileno(), input) 168 # self._stdin_logger.debug(input.rstrip()) 169 except OSError as why: 170 if why.args[0] == errno.EPIPE: # broken pipe 171 return self._close("stdin") 172 raise 173 174 return written 175 176 def _recv(self, which, maxsize): 177 conn, maxsize = self.get_conn_maxsize(which, maxsize) 178 if conn is None: 179 return None 180 181 flags = fcntl.fcntl(conn, fcntl.F_GETFL) 182 if not conn.closed: 183 fcntl.fcntl(conn, fcntl.F_SETFL, flags | os.O_NONBLOCK) 184 185 try: 186 if not select.select([conn], [], [], 0)[0]: 187 return "" 188 189 buff = conn.read(maxsize) 190 if not buff: 191 return self._close(which) 192 193 if self.universal_newlines: 194 buff = self._translate_newlines(buff) 195 196 getattr(self, "{}_buff".format(which)).write(buff) 197 getattr(self, "_{}_logger".format(which)).debug(buff.rstrip()) 198 if self.stream_stds: 199 getattr(sys, which).write(buff) 200 201 return buff 202 finally: 203 if not conn.closed: 204 fcntl.fcntl(conn, fcntl.F_SETFL, flags) 205 206 def poll_and_read_until_finish(self, interval=0.01): 207 silent_iterations = 0 208 while self.poll() is None: 209 if self.stdout is not None: 210 silent_iterations = 0 211 self.recv() 212 213 if self.stderr is not None: 214 silent_iterations = 0 215 self.recv_err() 216 217 silent_iterations += 1 218 219 if silent_iterations > 100: 220 silent_iterations = 0 221 (stdoutdata, stderrdata) = self.communicate() 222 if stdoutdata: 223 log.debug(stdoutdata) 224 if stderrdata: 225 log.error(stderrdata) 226 time.sleep(interval) 227 228 def communicate(self, input=None): # pylint: disable=arguments-differ 229 super().communicate(input) 230 self.stdout_buff.flush() 231 self.stdout_buff.seek(0) 232 self.stderr_buff.flush() 233 self.stderr_buff.seek(0) 234 return self.stdout_buff.read(), self.stderr_buff.read() 235