1# Copyright (C) 2010 Google Inc. All rights reserved. 2# 3# Redistribution and use in source and binary forms, with or without 4# modification, are permitted provided that the following conditions are 5# met: 6# 7# * Redistributions of source code must retain the above copyright 8# notice, this list of conditions and the following disclaimer. 9# * Redistributions in binary form must reproduce the above 10# copyright notice, this list of conditions and the following disclaimer 11# in the documentation and/or other materials provided with the 12# distribution. 13# * Neither the Google name nor the names of its 14# contributors may be used to endorse or promote products derived from 15# this software without specific prior written permission. 16# 17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS 18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT 19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR 20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT 21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, 22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT 23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, 24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY 25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT 26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE 27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE. 28 29"""Package that implements the ServerProcess wrapper class""" 30 31import errno 32import logging 33import re 34import signal 35import sys 36import time 37 38# Note that although win32 python does provide an implementation of 39# the win32 select API, it only works on sockets, and not on the named pipes 40# used by subprocess, so we have to use the native APIs directly. 
_quote_cmd = None

if sys.platform == 'win32':
    import msvcrt
    import win32pipe
    import win32file
    import subprocess
    _quote_cmd = subprocess.list2cmdline
else:
    import fcntl
    import os
    import select
    try:
        # shlex.quote is the supported spelling on Python 3; the pipes module
        # was deprecated in Python 3.11 and removed in 3.13.
        from shlex import quote as _shell_quote
    except ImportError:  # Python 2 has no shlex.quote.
        from pipes import quote as _shell_quote

    def _quote_cmd(cmdline):
        """Return the argument list |cmdline| as one POSIX-shell-quoted string."""
        return ' '.join(_shell_quote(arg) for arg in cmdline)


_log = logging.getLogger(__name__)


# Matches a line ending in one or more spaces: group(1) is the text before the
# spaces (None when the line is only spaces) and group(2) is the spaces.
_trailing_spaces_re = re.compile('(.*[^ ])?( +)$')


def quote_data(data):
    """Return |data| as a list of printable, escaped lines for logging.

    The data is escaped with repr(); every escaped newline ('\\n') ends a
    line, and trailing spaces on a line are written as '\\x20' so that they
    remain visible in the log.
    """
    txt = repr(data)
    # Drop a string-literal prefix (b'...' for bytes on Python 3, u'...' for
    # unicode on Python 2) so only the quoted contents remain below.
    if txt[:1] in ('b', 'u'):
        txt = txt[1:]
    # Strip the surrounding quotes, then break after each escaped newline.
    txt = txt[1:-1].replace('\\n', '\\n\n')
    lines = []
    for line in txt.splitlines():
        match = _trailing_spaces_re.match(line)
        if match:
            # group(1) is None when the whole line consists of spaces.
            line = (match.group(1) or '') + match.group(2).replace(' ', '\\x20')
        lines.append(line)
    return lines


class ServerProcess(object):
    """This class provides a wrapper around a subprocess that
    implements a simple request/response usage model. The primary benefit
    is that reading responses takes a deadline, so that we don't ever block
    indefinitely. The class also handles transparently restarting processes
    as necessary to keep issuing commands.
    """

    def __init__(self, port_obj, name, cmd, env=None, treat_no_data_as_crash=False,
                 more_logging=False):
        """Configure (but do not start) a server process.

        Args:
            port_obj: port object; its .host supplies platform/executive helpers.
            name: command name used in log messages (e.g. content_shell).
            cmd: argument list passed to executive.popen().
            env: optional environment for the child process.
            treat_no_data_as_crash: if True, an empty read on a readable pipe
                marks the process as crashed even if it has not exited.
            more_logging: if True, log the command line and all I/O.
        """
        self._port = port_obj
        self._name = name  # Should be the command name (e.g. content_shell, image_diff)
        self._cmd = cmd
        self._env = env
        self._treat_no_data_as_crash = treat_no_data_as_crash
        self._logging = more_logging
        self._host = self._port.host
        self._proc = None
        self._pid = None
        self._reset()

        # See comment in imports for why we need the win32 APIs and can't just use select.
        self._use_win32_apis = sys.platform == 'win32'

    def name(self):
        """Return the command name given to the constructor."""
        return self._name

    def pid(self):
        """Return the pid of the most recently started process (or None)."""
        return self._pid

    def _reset(self):
        """Close the current process's pipes and clear all per-run state."""
        proc = getattr(self, '_proc', None)
        if proc:
            for stream_name in ('stdin', 'stdout', 'stderr'):
                stream = getattr(proc, stream_name)
                if stream:
                    stream.close()
                    setattr(proc, stream_name, None)

        self._proc = None
        # NOTE(review): the buffers are text strings, which assumes pipe reads
        # return str (true on Python 2); confirm before running on Python 3.
        self._output = str()
        self._error = str()
        self._crashed = False
        self.timed_out = False

    def process_name(self):
        """Return the command name given to the constructor."""
        return self._name

    def _start(self):
        """Launch the process with piped stdin/stdout/stderr.

        Raises:
            ValueError: if a process is already running.
        """
        if self._proc:
            raise ValueError('%s already running' % self._name)
        self._reset()
        # close_fds is a workaround for http://bugs.python.org/issue2320
        close_fds = not self._host.platform.is_win()
        if self._logging:
            env_str = ''
            if self._env:
                env_str += '\n'.join('%s=%s' % (k, v) for k, v in self._env.items()) + '\n'
            _log.info('CMD: \n%s%s\n', env_str, _quote_cmd(self._cmd))
        proc = self._host.executive.popen(self._cmd, stdin=self._host.executive.PIPE,
                                          stdout=self._host.executive.PIPE,
                                          stderr=self._host.executive.PIPE,
                                          close_fds=close_fds,
                                          env=self._env)
        self._set_proc(proc)

    def _set_proc(self, proc):
        """Adopt |proc| as the running process.

        On non-Windows platforms stdout and stderr are switched to
        non-blocking mode so reads return only what is already available.
        """
        assert not self._proc
        self._proc = proc
        self._pid = self._proc.pid
        if not self._use_win32_apis:
            for pipe in (self._proc.stdout, self._proc.stderr):
                fd = pipe.fileno()
                flags = fcntl.fcntl(fd, fcntl.F_GETFL)
                fcntl.fcntl(fd, fcntl.F_SETFL, flags | os.O_NONBLOCK)

    def _handle_possible_interrupt(self):
        """Raise KeyboardInterrupt if the process exited because of Ctrl-C.

        Linux and Mac set the return code to -SIGINT when a subprocess is
        killed with Ctrl-C. On Windows the exit status is
        STATUS_CONTROL_C_EXIT (0xC000013A). Depending on how the exit code is
        converted, it may appear either as the signed 32-bit value
        -1073741510 or as the unsigned value 0xC000013A, so both are accepted.
        """
        interrupt_codes = (-signal.SIGINT, -1073741510, 0xC000013A)
        if self._proc.returncode in interrupt_codes:
            raise KeyboardInterrupt

    def poll(self):
        """Check to see if the underlying process is running; returns None
        if it still is (wrapper around subprocess.poll).
        """
        if self._proc:
            return self._proc.poll()
        return None

    def write(self, bytes):
        """Write a request to the subprocess. The subprocess is (re-)start()'ed
        if is not already running. An IOError while writing stops the process
        and marks it as crashed.
        """
        if not self._proc:
            self._start()
        try:
            self._log_data(' IN', bytes)
            self._proc.stdin.write(bytes)
        except IOError:
            self.stop(0.0)
            # stop() calls _reset(), so we have to set crashed to True after calling stop().
            self._crashed = True

    def _pop_stdout_line_if_ready(self):
        """Pop and return one complete line (with its '\\n') from the stdout
        buffer, or None if no complete line is buffered."""
        index_after_newline = self._output.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_output_bytes(index_after_newline)
        return None

    def _pop_stderr_line_if_ready(self):
        """Pop and return one complete line (with its '\\n') from the stderr
        buffer, or None if no complete line is buffered."""
        index_after_newline = self._error.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_error_bytes(index_after_newline)
        return None

    def pop_all_buffered_stderr(self):
        """Return and clear everything currently buffered from stderr."""
        return self._pop_error_bytes(len(self._error))

    def read_stdout_line(self, deadline):
        """Return the next stdout line, or None on crash or timeout."""
        return self._read(deadline, self._pop_stdout_line_if_ready)

    def read_stderr_line(self, deadline):
        """Return the next stderr line, or None on crash or timeout."""
        return self._read(deadline, self._pop_stderr_line_if_ready)

    def read_either_stdout_or_stderr_line(self, deadline):
        """Return (stdout_line, None) or (None, stderr_line), whichever line
        is available first (stdout is preferred); (None, None) on crash or
        timeout."""
        def retrieve_bytes_from_buffers():
            stdout_line = self._pop_stdout_line_if_ready()
            if stdout_line:
                return stdout_line, None
            stderr_line = self._pop_stderr_line_if_ready()
            if stderr_line:
                return None, stderr_line
            return None  # Instructs the caller to keep waiting.

        return_value = self._read(deadline, retrieve_bytes_from_buffers)
        # FIXME: This is a bit of a hack around the fact that _read normally only
        # returns one value, but this caller wants it to return two.
        if return_value is None:
            return None, None
        return return_value

    def read_stdout(self, deadline, size):
        """Return exactly |size| bytes of stdout, or None on crash or timeout.

        Raises:
            ValueError: if |size| is not positive.
        """
        if size <= 0:
            raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)

        def retrieve_bytes_from_stdout_buffer():
            if len(self._output) >= size:
                return self._pop_output_bytes(size)
            return None

        return self._read(deadline, retrieve_bytes_from_stdout_buffer)

    def _log(self, message):
        """Log |message| at INFO level, preceded by a blank line."""
        # This is a bit of a hack, but we first log a blank line to avoid
        # messing up the master process's output.
        _log.info('')
        _log.info(message)

    def _log_data(self, prefix, data):
        """When extra logging is enabled, log non-empty |data| line by line."""
        if self._logging and data:
            for line in quote_data(data):
                _log.info('%s: %s', prefix, line)

    def _handle_timeout(self):
        """Mark the read as timed out and ask the port to sample the process."""
        self.timed_out = True
        self._port.sample_process(self._name, self._proc.pid)

    def _split_string_after_index(self, string, index):
        """Return (string[:index], string[index:])."""
        return string[:index], string[index:]

    def _pop_output_bytes(self, bytes_count):
        """Remove and return the first |bytes_count| characters of stdout data."""
        output, self._output = self._split_string_after_index(self._output, bytes_count)
        return output

    def _pop_error_bytes(self, bytes_count):
        """Remove and return the first |bytes_count| characters of stderr data."""
        output, self._error = self._split_string_after_index(self._error, bytes_count)
        return output

    def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False):
        """Wait (until |deadline| at most) for either pipe to become readable
        and append whatever can be read to the stdout/stderr buffers.

        An empty read marks the process as crashed when |stopping| is False
        and either treat_no_data_as_crash is set or the process has exited
        with a non-zero status.
        """
        if self._proc.stdout.closed or self._proc.stderr.closed:
            # If the process crashed and is using FIFOs, like Chromium Android, the
            # stdout and stderr pipes will be closed.
            return

        out_fd = self._proc.stdout.fileno()
        err_fd = self._proc.stderr.fileno()
        select_fds = (out_fd, err_fd)
        try:
            read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0))
        except select.error as error:
            # We can ignore EINVAL since it's likely the process just crashed and we'll
            # figure that out the next time through the loop in _read().
            if error.args[0] == errno.EINVAL:
                return
            raise

        try:
            # Note that we may get no data during read() even though
            # select says we got something; see the select() man page
            # on linux. I don't know if this happens on Mac OS and
            # other Unixen as well, but we don't bother special-casing
            # Linux because it's relatively harmless either way.
            if out_fd in read_fds:
                data = self._proc.stdout.read()
                if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
                    self._crashed = True
                self._log_data('OUT', data)
                # A non-blocking read() may return None instead of data.
                if data:
                    self._output += data

            if err_fd in read_fds:
                data = self._proc.stderr.read()
                if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
                    self._crashed = True
                self._log_data('ERR', data)
                if data:
                    self._error += data
        except IOError:
            # We can ignore the IOErrors because we will detect if the
            # subprocess crashed the next time through the loop in _read().
            pass

    def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline):
        """Poll both pipes until data arrives, the process exits, or |deadline|.

        Data read is appended to the stdout/stderr buffers. The pipes are
        always read at least once, so output still buffered after the process
        has exited is collected too.
        """
        # See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
        # and http://docs.activestate.com/activepython/2.6/pywin32/modules.html
        # for documentation on all of these win32-specific modules.
        out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno())
        err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno())
        while True:
            output = self._non_blocking_read_win32(out_fh)
            self._log_data('OUT', output)
            error = self._non_blocking_read_win32(err_fh)
            self._log_data('ERR', error)
            if output or error:
                if output:
                    self._output += output
                if error:
                    self._error += error
                return
            # Nothing available: give up once the process is gone or the
            # deadline has passed, otherwise back off briefly and poll again.
            if self._proc.poll() is not None or time.time() >= deadline:
                return
            time.sleep(0.01)

    def _non_blocking_read_win32(self, handle):
        """Return whatever is currently available on pipe |handle|, or None.

        A broken/shut-down pipe is treated as "no data"; other errors propagate.
        """
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(handle, 0)
            if avail > 0:
                _, buf = win32file.ReadFile(handle, avail, None)
                return buf
        except Exception as error:  # pylint: disable=broad-except
            # Use error.args: indexing the exception itself only works on Python 2.
            code = error.args[0] if error.args else None
            if code not in (109, errno.ESHUTDOWN):  # 109 == win32 ERROR_BROKEN_PIPE
                raise
        return None

    def has_crashed(self):
        """Return True if the process has crashed (exited non-zero, or was
        flagged by a failed read/write).

        Raises:
            KeyboardInterrupt: if the process exited because of Ctrl-C.
        """
        if not self._crashed and self.poll():
            self._crashed = True
            self._handle_possible_interrupt()
        return self._crashed

    # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
    # only reads/returns from one of them (buffering both in local self._output/self._error).
    # It might be cleaner to pass in the file descriptor to poll instead.
    def _read(self, deadline, fetch_bytes_from_buffers_callback):
        """Wait until |fetch_bytes_from_buffers_callback| returns non-None and
        return that value; return None on crash or once |deadline| passes."""
        while True:
            if self.has_crashed():
                return None

            if time.time() > deadline:
                self._handle_timeout()
                return None

            data = fetch_bytes_from_buffers_callback()
            if data is not None:
                return data

            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(deadline)
            else:
                self._wait_for_data_and_update_buffers_using_select(deadline)

    def start(self):
        """Start the process unless it is already running."""
        if not self._proc:
            self._start()

    def stop(self, timeout_secs=0.0, kill_tree=True):
        """Stop the process and return the (stdout, stderr) data buffered for it.

        stdin is closed first. If |timeout_secs| is non-zero, the process is
        given that long to exit on its own; if it is still running afterwards
        it is killed. Returns (None, None) when no process is running.
        """
        if not self._proc:
            return (None, None)

        now = time.time()
        if self._proc.stdin:
            if self._logging:
                _log.info(' IN: ^D')
            self._proc.stdin.close()
            self._proc.stdin = None
        killed = False
        if timeout_secs:
            deadline = now + timeout_secs
            while self._proc.poll() is None and time.time() < deadline:
                time.sleep(0.01)
            if self._proc.poll() is None:
                _log.warning('stopping %s(pid %d) timed out, killing it', self._name, self._proc.pid)

        if self._proc.poll() is None:
            self._kill(kill_tree)
            killed = True
            _log.debug('killed pid %d', self._proc.pid)

        # Read any remaining data on the pipes and return it. The deadline
        # |now| is already in the past, so this only collects what is
        # immediately available.
        if not killed:
            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(now)
            else:
                self._wait_for_data_and_update_buffers_using_select(now, stopping=True)
        out, err = self._output, self._error
        self._reset()
        return (out, err)

    def kill(self):
        """Stop the process immediately (no grace period)."""
        self.stop(0.0)

    def _kill(self, kill_tree=True):
        """Kill the process (and its children when |kill_tree|) via the executive."""
        self._host.executive.kill_process(self._proc.pid, kill_tree)
        # wait() is only called once poll() reports an exit, so it cannot block.
        if self._proc.poll() is not None:
            self._proc.wait()

    def replace_input(self, stdin):
        """Replace the process's stdin with |stdin| (closing the old one), if given."""
        assert self._proc
        if stdin:
            self._proc.stdin.close()
            self._proc.stdin = stdin

    def replace_outputs(self, stdout, stderr):
        """Replace the process's stdout/stderr (closing the old ones), where given."""
        assert self._proc
        if stdout:
            self._proc.stdout.close()
            self._proc.stdout = stdout
        if stderr:
            self._proc.stderr.close()
            self._proc.stderr = stderr