1# Copyright (C) 2010 Google Inc. All rights reserved.
2#
3# Redistribution and use in source and binary forms, with or without
4# modification, are permitted provided that the following conditions are
5# met:
6#
7#     * Redistributions of source code must retain the above copyright
8# notice, this list of conditions and the following disclaimer.
9#     * Redistributions in binary form must reproduce the above
10# copyright notice, this list of conditions and the following disclaimer
11# in the documentation and/or other materials provided with the
12# distribution.
13#     * Neither the Google name nor the names of its
14# contributors may be used to endorse or promote products derived from
15# this software without specific prior written permission.
16#
17# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
18# "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
19# LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
20# A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
21# OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
22# SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
23# LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
24# DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
25# THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
26# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
27# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
28
29"""Package that implements the ServerProcess wrapper class"""
30
31import errno
32import logging
33import re
34import signal
35import sys
36import time
37
# Note that although win32 python does provide an implementation of
# the win32 select API, it only works on sockets, and not on the named pipes
# used by subprocess, so we have to use the native APIs directly.
if sys.platform == 'win32':
    import msvcrt
    import subprocess
    import win32file
    import win32pipe

    # Joins an argument list into one string using Windows command-line
    # quoting rules (used when logging the command that is launched).
    _quote_cmd = subprocess.list2cmdline
else:
    import fcntl
    import os
    import select
    try:
        from shlex import quote as _shell_quote
    except ImportError:
        # Python 2 only provides quote() in the pipes module; pipes itself
        # was removed from the standard library in Python 3.13, so it must
        # not be imported unconditionally.
        from pipes import quote as _shell_quote

    def _quote_cmd(cmdline):
        """Returns |cmdline| (a sequence of arguments) joined into a single
        string with each argument shell-quoted, suitable for logging."""
        return ' '.join(_shell_quote(arg) for arg in cmdline)


_log = logging.getLogger(__name__)
58
59
# Matches a line that ends in one or more spaces. group(1) is everything
# before the trailing run (None when the line consists only of spaces) and
# group(2) is the run of trailing spaces itself.
_trailing_spaces_re = re.compile('(.*[^ ])?( +)$')


def quote_data(data):
    """Returns |data| as a list of escaped, printable lines for logging.

    The data is passed through repr() so control characters are escaped,
    the surrounding quotes (and any b''/u'' prefix) are stripped, and the
    text is split after every escaped newline. Trailing spaces on each line
    are rewritten as the literal escape sequence \\x20 so they stay visible
    in the log.
    """
    text = repr(data)
    if text[-1:] in ('"', "'"):
        # Skip any string prefix (e.g. the b in b'...') up to and including
        # the opening quote, which is the same character as the closing one.
        start = text.index(text[-1]) + 1
    else:
        start = 1
    text = text[start:-1].replace('\\n', '\\n\n')

    lines = []
    for line in text.splitlines():
        match = _trailing_spaces_re.match(line)
        if match:
            # group(1) is None for a line made up entirely of spaces.
            line = (match.group(1) or '') + match.group(2).replace(' ', '\\x20')
        lines.append(line)
    return lines
72
73
class ServerProcess(object):
    """This class provides a wrapper around a subprocess that
    implements a simple request/response usage model. The primary benefit
    is that reading responses takes a deadline, so that we don't ever block
    indefinitely. The class also handles transparently restarting processes
    as necessary to keep issuing commands.

    Output from the child is accumulated in self._output (stdout) and
    self._error (stderr); the read_* methods pop complete lines or a fixed
    number of bytes from those buffers, waiting for more data when needed.
    Deadlines are absolute times comparable with time.time().
    """

    def __init__(self, port_obj, name, cmd, env=None, treat_no_data_as_crash=False,
                 more_logging=False):
        """Creates the wrapper; the subprocess is not started until start()
        or write() is called.

        Args:
            port_obj: port object; its .host supplies .platform and
                .executive, and its sample_process() is called on timeouts.
            name: the command name (e.g. content_shell, image_diff).
            cmd: the command line, as a sequence of arguments.
            env: optional environment for the subprocess.
            treat_no_data_as_crash: if True, an empty read from a pipe that
                select() reported as readable marks the process as crashed.
            more_logging: if True, log the command line and all data
                exchanged with the process.
        """
        self._port = port_obj
        self._name = name  # Should be the command name (e.g. content_shell, image_diff)
        self._cmd = cmd
        self._env = env
        self._treat_no_data_as_crash = treat_no_data_as_crash
        self._logging = more_logging
        self._host = self._port.host
        self._proc = None
        self._pid = None
        self._reset()

        # See comment in imports for why we need the win32 APIs and can't just use select.
        self._use_win32_apis = sys.platform == 'win32'

    def name(self):
        """Returns the name this wrapper was created with."""
        return self._name

    def pid(self):
        """Returns the pid of the most recently started process, or None if
        none was started. The value is not cleared by stop()."""
        return self._pid

    def _reset(self):
        """Closes any pipes of the current process and clears per-process
        state: the process handle, both output buffers, and the crash and
        timeout flags."""
        if getattr(self, '_proc', None):
            if self._proc.stdin:
                self._proc.stdin.close()
                self._proc.stdin = None
            if self._proc.stdout:
                self._proc.stdout.close()
                self._proc.stdout = None
            if self._proc.stderr:
                self._proc.stderr.close()
                self._proc.stderr = None

        self._proc = None
        self._output = str()  # bytesarray() once we require Python 2.6
        self._error = str()  # bytesarray() once we require Python 2.6
        self._crashed = False
        self.timed_out = False

    def process_name(self):
        """Returns the name this wrapper was created with."""
        return self._name

    def _start(self):
        """Launches the subprocess with stdin, stdout and stderr piped.

        Raises:
            ValueError: if a process is already running.
        """
        if self._proc:
            raise ValueError('%s already running' % self._name)
        self._reset()
        # close_fds is a workaround for http://bugs.python.org/issue2320
        close_fds = not self._host.platform.is_win()
        if self._logging:
            env_str = ''
            if self._env:
                env_str += '\n'.join('%s=%s' % (k, v) for k, v in self._env.items()) + '\n'
            _log.info('CMD: \n%s%s\n', env_str, _quote_cmd(self._cmd))
        proc = self._host.executive.popen(self._cmd, stdin=self._host.executive.PIPE,
                                          stdout=self._host.executive.PIPE,
                                          stderr=self._host.executive.PIPE,
                                          close_fds=close_fds,
                                          env=self._env)
        self._set_proc(proc)

    def _set_proc(self, proc):
        """Adopts |proc| as the managed process. Outside Windows, its stdout
        and stderr pipes are put in non-blocking mode so reads never block."""
        assert not self._proc
        self._proc = proc
        self._pid = self._proc.pid
        if not self._use_win32_apis:
            fd = self._proc.stdout.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)
            fd = self._proc.stderr.fileno()
            fl = fcntl.fcntl(fd, fcntl.F_GETFL)
            fcntl.fcntl(fd, fcntl.F_SETFL, fl | os.O_NONBLOCK)

    def _handle_possible_interrupt(self):
        """This routine checks to see if the process crashed or exited
        because of a keyboard interrupt and raises KeyboardInterrupt
        accordingly.
        """
        # FIXME: Linux and Mac set the returncode to -signal.SIGINT if a
        # subprocess is killed with a ctrl^C.  Previous comments in this
        # routine said that supposedly Windows returns 0xc000001d, but that's
        # not what -1073741510 evaluates to. Figure out what the right value
        # is for win32 here ...
        if self._proc.returncode in (-1073741510, -signal.SIGINT):
            raise KeyboardInterrupt

    def poll(self):
        """Check to see if the underlying process is running; returns None
        if it still is (wrapper around subprocess.poll). Also returns None
        when no process has been started.
        """
        if self._proc:
            return self._proc.poll()
        return None

    def write(self, bytes):
        """Write a request to the subprocess. The subprocess is (re-)start()'ed
        if it is not already running.

        An IOError while writing does not propagate: the process is stopped
        and marked as crashed instead.
        """
        if not self._proc:
            self._start()
        try:
            self._log_data(' IN', bytes)
            self._proc.stdin.write(bytes)
        except IOError:
            self.stop(0.0)
            # stop() calls _reset(), so we have to set crashed to True after calling stop().
            self._crashed = True

    def _pop_stdout_line_if_ready(self):
        """Pops and returns the first complete line (including its newline)
        from the stdout buffer, or None if no full line is buffered."""
        index_after_newline = self._output.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_output_bytes(index_after_newline)
        return None

    def _pop_stderr_line_if_ready(self):
        """Pops and returns the first complete line (including its newline)
        from the stderr buffer, or None if no full line is buffered."""
        index_after_newline = self._error.find('\n') + 1
        if index_after_newline > 0:
            return self._pop_error_bytes(index_after_newline)
        return None

    def pop_all_buffered_stderr(self):
        """Empties the stderr buffer and returns its former contents."""
        return self._pop_error_bytes(len(self._error))

    def read_stdout_line(self, deadline):
        """Returns the next stdout line, or None on crash or timeout."""
        return self._read(deadline, self._pop_stdout_line_if_ready)

    def read_stderr_line(self, deadline):
        """Returns the next stderr line, or None on crash or timeout."""
        return self._read(deadline, self._pop_stderr_line_if_ready)

    def read_either_stdout_or_stderr_line(self, deadline):
        """Returns a (stdout_line, stderr_line) pair in which at most one
        element is set; stdout is preferred when both have a full line.
        Returns (None, None) on crash or timeout."""
        def retrieve_bytes_from_buffers():
            stdout_line = self._pop_stdout_line_if_ready()
            if stdout_line:
                return stdout_line, None
            stderr_line = self._pop_stderr_line_if_ready()
            if stderr_line:
                return None, stderr_line
            return None  # Instructs the caller to keep waiting.

        return_value = self._read(deadline, retrieve_bytes_from_buffers)
        # FIXME: This is a bit of a hack around the fact that _read normally only
        # returns one value, but this caller wants it to return two.
        if return_value is None:
            return None, None
        return return_value

    def read_stdout(self, deadline, size):
        """Returns exactly |size| bytes from stdout, or None on crash or
        timeout.

        Raises:
            ValueError: if |size| is not positive.
        """
        if size <= 0:
            raise ValueError('ServerProcess.read() called with a non-positive size: %d ' % size)

        def retrieve_bytes_from_stdout_buffer():
            if len(self._output) >= size:
                return self._pop_output_bytes(size)
            return None

        return self._read(deadline, retrieve_bytes_from_stdout_buffer)

    def _log(self, message):
        # This is a bit of a hack, but we first log a blank line to avoid
        # messing up the master process's output.
        _log.info('')
        _log.info(message)

    def _log_data(self, prefix, data):
        """Logs |data| line by line under |prefix| when verbose logging is
        enabled and |data| is non-empty."""
        if self._logging and data:
            for line in quote_data(data):
                _log.info('%s: %s', prefix, line)

    def _handle_timeout(self):
        """Sets the timed_out flag and asks the port to sample the process."""
        self.timed_out = True
        self._port.sample_process(self._name, self._proc.pid)

    def _split_string_after_index(self, string, index):
        return string[:index], string[index:]

    def _pop_output_bytes(self, bytes_count):
        """Removes and returns the first |bytes_count| bytes of the stdout buffer."""
        output, self._output = self._split_string_after_index(self._output, bytes_count)
        return output

    def _pop_error_bytes(self, bytes_count):
        """Removes and returns the first |bytes_count| bytes of the stderr buffer."""
        output, self._error = self._split_string_after_index(self._error, bytes_count)
        return output

    def _wait_for_data_and_update_buffers_using_select(self, deadline, stopping=False):
        """Waits (via select) until stdout or stderr is readable or
        |deadline| passes, then appends whatever is available to the buffers.

        Unless |stopping| is set, an empty read marks the process as crashed
        when treat_no_data_as_crash was requested or the process has exited
        with a nonzero return code.
        """
        if self._proc.stdout.closed or self._proc.stderr.closed:
            # If the process crashed and is using FIFOs, like Chromium Android, the
            # stdout and stderr pipes will be closed.
            return

        out_fd = self._proc.stdout.fileno()
        err_fd = self._proc.stderr.fileno()
        select_fds = (out_fd, err_fd)
        try:
            read_fds, _, _ = select.select(select_fds, [], select_fds, max(deadline - time.time(), 0))
        except select.error as error:
            # We can ignore EINVAL since it's likely the process just crashed and we'll
            # figure that out the next time through the loop in _read().
            if error.args[0] == errno.EINVAL:
                return
            raise

        try:
            # Note that we may get no data during read() even though
            # select says we got something; see the select() man page
            # on linux. I don't know if this happens on Mac OS and
            # other Unixen as well, but we don't bother special-casing
            # Linux because it's relatively harmless either way.
            if out_fd in read_fds:
                data = self._proc.stdout.read()
                if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
                    self._crashed = True
                self._log_data('OUT', data)
                self._output += data

            if err_fd in read_fds:
                data = self._proc.stderr.read()
                if not data and not stopping and (self._treat_no_data_as_crash or self._proc.poll()):
                    self._crashed = True
                self._log_data('ERR', data)
                self._error += data
        except IOError:
            # We can ignore the IOErrors because we will detect if the
            # subprocess crashed the next time through the loop in _read().
            pass

    def _wait_for_data_and_update_buffers_using_win32_apis(self, deadline):
        """Windows counterpart of _wait_for_data_and_update_buffers_using_select.

        Polls both pipes every 10ms until some data arrives, the process
        exits, or |deadline| passes. At least one read pass is always made,
        so output written before the process exited -- or before an
        already-expired deadline, as passed by stop() -- is still collected.
        """
        # See http://code.activestate.com/recipes/440554-module-to-allow-asynchronous-subprocess-use-on-win/
        # and http://docs.activestate.com/activepython/2.6/pywin32/modules.html
        # for documentation on all of these win32-specific modules.
        out_fh = msvcrt.get_osfhandle(self._proc.stdout.fileno())
        err_fh = msvcrt.get_osfhandle(self._proc.stderr.fileno())
        while True:
            output = self._non_blocking_read_win32(out_fh)
            self._log_data('OUT', output)
            error = self._non_blocking_read_win32(err_fh)
            self._log_data('ERR', error)
            if output or error:
                if output:
                    self._output += output
                if error:
                    self._error += error
                return
            if self._proc.poll() is not None or time.time() >= deadline:
                return
            time.sleep(0.01)

    def _non_blocking_read_win32(self, handle):
        """Returns the data currently available on pipe |handle|, or None if
        there is none or the pipe is broken/shut down."""
        try:
            _, avail, _ = win32pipe.PeekNamedPipe(handle, 0)
            if avail > 0:
                _, buf = win32file.ReadFile(handle, avail, None)
                return buf
        except Exception as error:  # pylint: disable=broad-except
            # Use error.args rather than indexing the exception directly:
            # exception objects are not subscriptable on Python 3.
            code = error.args[0] if error.args else None
            if code not in (109, errno.ESHUTDOWN):  # 109 == win32 ERROR_BROKEN_PIPE
                raise
        return None

    def has_crashed(self):
        """Returns True if the process was marked as crashed or has exited
        with a nonzero return code.

        Raises:
            KeyboardInterrupt: if the exit code indicates a ctrl-C.
        """
        if not self._crashed and self.poll():
            self._crashed = True
            self._handle_possible_interrupt()
        return self._crashed

    # This read function is a bit oddly-designed, as it polls both stdout and stderr, yet
    # only reads/returns from one of them (buffering both in local self._output/self._error).
    # It might be cleaner to pass in the file descriptor to poll instead.
    def _read(self, deadline, fetch_bytes_from_buffers_callback):
        """Repeatedly calls |fetch_bytes_from_buffers_callback| and returns
        its first non-None result, refilling the buffers in between.

        Returns None if the process crashes or |deadline| passes; the
        deadline is checked before the buffers, so data already buffered is
        not returned once the deadline has expired.
        """
        while True:
            if self.has_crashed():
                return None

            if time.time() > deadline:
                self._handle_timeout()
                return None

            data = fetch_bytes_from_buffers_callback()
            if data is not None:
                return data

            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(deadline)
            else:
                self._wait_for_data_and_update_buffers_using_select(deadline)

    def start(self):
        """Starts the subprocess unless it is already running."""
        if not self._proc:
            self._start()

    def stop(self, timeout_secs=0.0, kill_tree=True):
        """Closes stdin and stops the process.

        The process is given up to |timeout_secs| to exit on its own, then it
        is killed (|kill_tree| is forwarded to executive.kill_process()). If
        it exited without being killed, any output remaining on its pipes is
        collected first.

        Returns:
            A (stdout, stderr) tuple of all buffered output, or (None, None)
            if no process was running.
        """
        if not self._proc:
            return (None, None)

        now = time.time()
        if self._proc.stdin:
            if self._logging:
                _log.info(' IN: ^D')
            self._proc.stdin.close()
            self._proc.stdin = None
        killed = False
        if timeout_secs:
            deadline = now + timeout_secs
            while self._proc.poll() is None and time.time() < deadline:
                time.sleep(0.01)
            if self._proc.poll() is None:
                _log.warning('stopping %s(pid %d) timed out, killing it', self._name, self._proc.pid)

        if self._proc.poll() is None:
            self._kill(kill_tree)
            killed = True
            _log.debug('killed pid %d', self._proc.pid)

        # read any remaining data on the pipes and return it.
        if not killed:
            if self._use_win32_apis:
                self._wait_for_data_and_update_buffers_using_win32_apis(now)
            else:
                self._wait_for_data_and_update_buffers_using_select(now, stopping=True)
        out, err = self._output, self._error
        self._reset()
        return (out, err)

    def kill(self):
        """Stops the process immediately, without a grace period."""
        self.stop(0.0)

    def _kill(self, kill_tree=True):
        """Kills the process via executive.kill_process(), forwarding
        |kill_tree|."""
        self._host.executive.kill_process(self._proc.pid, kill_tree)
        if self._proc.poll() is not None:
            self._proc.wait()

    def replace_input(self, stdin):
        """Closes the current stdin pipe and substitutes |stdin|, if given."""
        assert self._proc
        if stdin:
            self._proc.stdin.close()
            self._proc.stdin = stdin

    def replace_outputs(self, stdout, stderr):
        """Closes and substitutes the stdout and/or stderr pipes with the
        given replacements; a falsy argument leaves that pipe unchanged."""
        assert self._proc
        if stdout:
            self._proc.stdout.close()
            self._proc.stdout = stdout
        if stderr:
            self._proc.stderr.close()
            self._proc.stderr = stderr
426