1#
2# CDDL HEADER START
3#
4# The contents of this file are subject to the terms of the
5# Common Development and Distribution License (the "License").
6# You may not use this file except in compliance with the License.
7#
8# See LICENSE.txt included in this distribution for the specific
9# language governing permissions and limitations under the License.
10#
11# When distributing Covered Code, include this CDDL HEADER in each
12# file and include the License file at LICENSE.txt.
13# If applicable, add the following below this CDDL HEADER, with the
14# fields enclosed by brackets "[]" replaced with your own identifying
15# information: Portions Copyright [yyyy] [name of copyright owner]
16#
17# CDDL HEADER END
18#
19
20#
21# Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
22#
23
24import logging
25import os
26import signal
27import subprocess
28import threading
29import time
30
31
class TimeoutException(Exception):
    """
    Signals that a command did not complete within its allotted timeout.
    """
37
38
class Command:
    """
    Wrapper for synchronous execution of commands via subprocess.Popen()
    and getting their output (stderr is redirected to stdout by default)
    and exit value.

    Usage: construct the object, call execute(), then inspect the result
    with getstate(), getretcode(), getoutput()/getoutputstr() and, when
    stderr is captured separately, geterroutput()/geterroutputstr().
    """

    # state definitions
    FINISHED = "finished"
    INTERRUPTED = "interrupted"
    ERRORED = "errored"
    TIMEDOUT = "timed out"

    def __init__(self, cmd, args_subst=None, args_append=None, logger=None,
                 excl_subst=False, work_dir=None, env_vars=None, timeout=None,
                 redirect_stderr=True, resource_limits=None, doprint=False):
        """
        :param cmd: command vector (program followed by its arguments)
        :param args_subst: optional dict mapping placeholder substrings
                           found in cmd arguments to replacement strings
        :param args_append: optional list of arguments to append to cmd
        :param logger: logger to use; defaults to this module's logger
        :param excl_subst: if true, args_append is appended only when no
                           substitution from args_subst took place
        :param work_dir: directory to run the command in (the process-wide
                         current directory is switched during execute())
        :param env_vars: dict of variables added on top of os.environ
        :param timeout: seconds after which the command is terminated;
                        None or 0 means no timeout
        :param redirect_stderr: merge stderr into stdout if true, otherwise
                                capture stderr separately
        :param resource_limits: dict of resource name (an RLIMIT_* name of
                                the resource module) -> limit, applied to
                                the child process
        :param doprint: log every output line as it is read; a list is
                        reduced to its first element (empty list: False)
        """
        if doprint is None:
            doprint = False

        if isinstance(doprint, list):
            doprint = doprint[0] if doprint else False

        self.cmd = cmd
        self.state = "notrun"
        self.excl_subst = excl_subst
        self.work_dir = work_dir
        self.env_vars = env_vars
        self.timeout = timeout
        self.pid = None
        self.redirect_stderr = redirect_stderr
        self.limits = resource_limits
        self.doprint = doprint
        self.returncode = None
        self.out = None
        self.err = None

        self.logger = logger or logging.getLogger(__name__)

        if args_subst or args_append:
            self.fill_arg(args_append, args_subst)

    def __str__(self):
        # str() every element so that non-string arguments (e.g. numbers)
        # do not make formatting the command fail.
        return " ".join(str(arg) for arg in self.cmd)

    class _TimeoutThread(threading.Thread):
        """
        Wait until the timeout specified in seconds expires and kill
        the process specified by the Popen object after that, unless
        done_event gets set first.
        If the timeout expires, TimeoutException is stored in the object
        and can be retrieved by the caller via get_exception().
        """

        def __init__(self, logger, timeout, done_event, p):
            super().__init__()
            self.timeout = timeout
            self.popen = p
            self.done_event = done_event
            self.logger = logger
            # Must be initialized before start(): run() may assign it as
            # soon as the thread is running.
            self.exception = None
            self.start()

        def terminate(self, p):
            """
            Make sure the process goes away: terminate() it and, on POSIX,
            escalate to SIGINT and then SIGKILL, waiting half of the
            previous interval before each step. Each wait ends early once
            done_event is set (the caller stopped waiting for the process).
            """
            self.logger.info("Terminating PID {}".format(p.pid))
            p.terminate()

            # The following code tries more methods to terminate
            # the process and is specific to Unix.
            if os.name == 'posix':
                timeout = self.timeout
                # disable E1101 - non existent attribute SIGKILL on windows
                # pylint: disable=E1101
                term_signals = [signal.SIGINT, signal.SIGKILL]
                # pylint: enable=E1101
                for sig in term_signals:
                    timeout = timeout / 2  # exponential back-off
                    self.logger.info("Waiting up to {} seconds".
                                     format(timeout))
                    self.done_event.wait(timeout)

                    if p.poll() is None:
                        self.logger.info("Command with PID {} still alive,"
                                         " killing with signal {}".
                                         format(p.pid, sig))
                        p.send_signal(sig)
                    else:
                        self.logger.info("Command with PID {} is gone".
                                         format(p.pid))
                        break

        def run(self):
            # An Event is sticky, unlike Condition.notify(): if the process
            # ends before this thread reaches wait(), the wake-up is not
            # lost and the finished process is not "timed out".
            if self.done_event.wait(self.timeout):
                return

            p = self.popen
            self.logger.info("Terminating command {} with PID {} "
                             "after timeout of {} seconds".
                             format(p.args, p.pid, self.timeout))
            self.exception = TimeoutException("Command {} with pid"
                                              " {} timed out".
                                              format(p.args, p.pid))
            self.terminate(p)

        def get_exception(self):
            return self.exception

    class _OutputThread(threading.Thread):
        """
        Capture data from subprocess.Popen(). This avoids hangs when
        stdout/stderr buffers fill up.

        The object is handed to Popen() in place of a file: fileno()
        returns the write end of a private pipe, which this thread reads
        line by line until EOF, after which the supplied event is set.
        """

        def __init__(self, event, logger, doprint=False):
            super().__init__()
            self.read_fd, self.write_fd = os.pipe()
            # errors='replace': output that is not valid UTF-8 must not
            # kill this thread - the event would never be set and
            # execute() would wait for it forever.
            self.pipe_fobj = os.fdopen(self.read_fd, encoding='utf8',
                                       errors='replace')
            self.out = []
            self.event = event
            self.logger = logger
            self.doprint = doprint

            # Start the thread now.
            self.start()

        def run(self):
            """
            It might happen that after the process is gone, the thread
            still has data to read from the pipe. Hence, event is used
            to synchronize with the caller. It is set on every exit path
            so that the caller can never block on it forever.
            """
            try:
                while True:
                    line = self.pipe_fobj.readline()
                    if not line:
                        self.logger.debug("end of output")
                        return

                    self.out.append(line)

                    if self.doprint:
                        # Even if logging below fails, the thread has to
                        # keep running to avoid hangups of the executed
                        # command.
                        try:
                            self.logger.info(line.rstrip())
                        except Exception as print_exc:
                            self.logger.error(print_exc)
            finally:
                try:
                    self.pipe_fobj.close()
                finally:
                    self.event.set()

        def getoutput(self):
            return self.out

        def fileno(self):
            return self.write_fd

        def close(self):
            self.logger.debug("closed")
            os.close(self.write_fd)

    def execute(self):
        """
        Execute the command and capture its output and return code.

        If work_dir was given, the process-wide current directory is
        switched to it for the duration of the call and switched back
        afterwards, also when an unexpected exception escapes (e.g.
        ValueError raised by Popen for invalid arguments).

        Afterwards getstate() is one of FINISHED, INTERRUPTED (keyboard
        interrupt while waiting), TIMEDOUT or ERRORED (working directory
        problems or OSError while spawning).
        """
        orig_work_dir = None
        if self.work_dir:
            try:
                orig_work_dir = os.getcwd()
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot get working directory",
                                  exc_info=True)
                return

            try:
                os.chdir(self.work_dir)
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot change working directory to {}".
                                  format(self.work_dir), exc_info=True)
                return

        try:
            self._run_and_collect()
        finally:
            if orig_work_dir:
                self._restore_work_dir(orig_work_dir)

    def _run_and_collect(self):
        """
        Spawn the command in the current directory, wait for it (enforcing
        self.timeout if set) and store its output in self.out and, when
        stderr is not redirected, in self.err. Sets self.state and, on
        success, self.returncode.
        Exceptions other than KeyboardInterrupt, OSError and
        TimeoutException propagate to the caller.
        """
        timeout_thread = None
        # Set once waiting for the process is over (normally or not) so
        # that the timeout thread stops waiting.
        done_event = threading.Event()
        output_event = threading.Event()
        output_thread = Command._OutputThread(output_event, self.logger,
                                              doprint=self.doprint)

        # If stderr redirection is off, set up a thread that will capture
        # stderr data.
        stderr_thread = None
        stderr_event = None
        if self.redirect_stderr:
            stderr_dest = subprocess.STDOUT
        else:
            stderr_event = threading.Event()
            stderr_thread = Command._OutputThread(stderr_event, self.logger,
                                                  doprint=self.doprint)
            stderr_dest = stderr_thread

        start_time = time.time()
        try:
            try:
                self.logger.debug("working directory = {}".
                                  format(os.getcwd()))
            except OSError:
                # Purely informational; an inaccessible current directory
                # must not prevent the command from running.
                pass
            self.logger.debug("command = '{}'".format(self))
            my_args = {'stderr': stderr_dest,
                       'stdout': output_thread}
            if self.env_vars:
                my_env = os.environ.copy()
                my_env.update(self.env_vars)
                self.logger.debug("environment variables: {}".format(my_env))
                my_args['env'] = my_env
            if self.limits:
                my_args['preexec_fn'] = \
                    lambda: self.set_resource_limits(self.limits)

            # Actually run the command.
            p = subprocess.Popen(self.cmd, **my_args)

            self.pid = p.pid

            if self.timeout:
                self.logger.debug("Setting timeout to {} seconds".
                                  format(self.timeout))
                timeout_thread = Command._TimeoutThread(self.logger,
                                                        self.timeout,
                                                        done_event, p)

            self.logger.debug("Waiting for process with PID {}".format(p.pid))
            p.wait()
            self.logger.debug("done waiting")

            if timeout_thread is not None:
                e = timeout_thread.get_exception()
                if e:
                    raise e  # pylint: disable=E0702

        except KeyboardInterrupt:
            self.logger.info("Got KeyboardInterrupt while processing '{}'".
                             format(self), exc_info=True)
            self.state = Command.INTERRUPTED
        except OSError:
            self.logger.error("Got OS error", exc_info=True)
            self.state = Command.ERRORED
        except TimeoutException:
            self.logger.error("Timed out")
            self.state = Command.TIMEDOUT
        else:
            self.state = Command.FINISHED
            self.returncode = int(p.returncode)
            self.logger.debug("'{}' -> {}".format(self, self.getretcode()))
        finally:
            # Release the timeout thread (if any) and wait for it, so no
            # watcher outlives this call. After the event is set, its
            # remaining waits return immediately.
            done_event.set()
            if timeout_thread is not None:
                timeout_thread.join()

            # The subprocess module does not close the write pipe descriptor
            # it fetched via _OutputThread's fileno() so in order to
            # gracefully exit the read loop we have to close it here
            # ourselves.
            output_thread.close()
            self.logger.debug("Waiting on output thread to finish reading")
            output_event.wait()
            self.out = output_thread.getoutput()

            if stderr_thread is not None:
                stderr_thread.close()
                self.logger.debug("Waiting on stderr thread to finish reading")
                stderr_event.wait()
                self.err = stderr_thread.getoutput()

            elapsed_time = time.time() - start_time
            self.logger.debug("Command '{}' took {} seconds".
                              format(self, int(elapsed_time)))

    def _restore_work_dir(self, orig_work_dir):
        """
        Change back to orig_work_dir; on failure mark the command ERRORED.
        """
        try:
            os.chdir(orig_work_dir)
        except OSError:
            self.state = Command.ERRORED
            self.logger.error("Cannot change working directory back to {}".
                              format(orig_work_dir), exc_info=True)

    def fill_arg(self, args_append=None, args_subst=None):
        """
        Replace argument names with actual values or append arguments
        to the command vector.

        Every pattern of args_subst found in an argument is replaced in
        that same argument (in the dict's iteration order), so a single
        argument may carry several placeholders.

        The action depends whether exclusive substitution is on.
        If yes, arguments will be appended only if no substitution was
        performed.
        """
        newcmd = []
        subst_done = False
        for cmdarg in self.cmd:
            newarg = cmdarg
            if args_subst:
                for pattern, value in args_subst.items():
                    if pattern in newarg:
                        newarg = newarg.replace(pattern, value)
                        self.logger.debug("replacing cmdarg with {}".
                                          format(newarg))
                        subst_done = True
            newcmd.append(newarg)

        if args_append and (not self.excl_subst or not subst_done):
            self.logger.debug("appending {}".format(args_append))
            newcmd.extend(args_append)

        self.cmd = newcmd

    def get_resource(self, name):
        """
        Return the resource module constant for a name like
        "RLIMIT_NOFILE".

        :raises NotImplementedError: if the resource module is unavailable
            or name is not an RLIMIT_* constant of it
        """
        try:
            import resource
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

        if isinstance(name, str) and name.startswith("RLIMIT_"):
            value = getattr(resource, name, None)
            if isinstance(value, int):
                return value

        raise NotImplementedError("unknown resource")

    def set_resource_limit(self, name, value):
        """
        Set both the soft and the hard limit of resource name to value.
        """
        try:
            import resource
            self.logger.debug("Setting resource {} to {}"
                              .format(name, value))
            resource.setrlimit(self.get_resource(name), (value, value))
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

    def set_resource_limits(self, limits):
        """
        Apply every name -> value pair of limits via set_resource_limit().
        Used as preexec_fn, i.e. it runs in the child before exec.
        """
        self.logger.debug("Setting resource limits")
        for name, value in limits.items():
            self.set_resource_limit(name, value)

    def getretcode(self):
        """Return the exit code, or None unless the command FINISHED."""
        if self.state != Command.FINISHED:
            return None
        else:
            return self.returncode

    def getoutputstr(self):
        """Return the stripped output as one string, or None."""
        if self.state == Command.FINISHED:
            return "".join(self.out).strip()
        else:
            return None

    def getoutput(self):
        """Return the list of output lines, or None unless FINISHED."""
        if self.state == Command.FINISHED:
            return self.out
        else:
            return None

    def geterroutput(self):
        """Return the stderr lines (None when stderr was redirected)."""
        return self.err

    def geterroutputstr(self):
        """Return stripped stderr as one string ("" if there is none)."""
        if self.err:
            return "".join(self.err).strip()
        else:
            return ""

    def getstate(self):
        return self.state

    def getpid(self):
        return self.pid

    def log_error(self, msg):
        """
        Log msg along with the command, its directory and the exit code
        (or the state if the command did not finish).
        """
        if self.state == Command.FINISHED:
            self.logger.error("{}: command {} in directory {} exited with {}".
                              format(msg, self.cmd, self.work_dir,
                                     self.getretcode()))
        else:
            self.logger.error("{}: command {} in directory {} ended with "
                              "invalid state {}".
                              format(msg, self.cmd, self.work_dir, self.state))
424