#
# CDDL HEADER START
#
# The contents of this file are subject to the terms of the
# Common Development and Distribution License (the "License").
# You may not use this file except in compliance with the License.
#
# See LICENSE.txt included in this distribution for the specific
# language governing permissions and limitations under the License.
#
# When distributing Covered Code, include this CDDL HEADER in each
# file and include the License file at LICENSE.txt.
# If applicable, add the following below this CDDL HEADER, with the
# fields enclosed by brackets "[]" replaced with your own identifying
# information: Portions Copyright [yyyy] [name of copyright owner]
#
# CDDL HEADER END
#

#
# Copyright (c) 2017, 2020, Oracle and/or its affiliates. All rights reserved.
#

import logging
import os
import signal
import subprocess
import threading
import time


class TimeoutException(Exception):
    """
    Exception returned when command exceeded its timeout.
    """
    pass


class Command:
    """
    Wrapper for synchronous execution of commands via subprocess.Popen()
    and getting their output (stderr is redirected to stdout by default)
    and exit value.

    Typical usage is to construct the object, call execute() and then
    inspect the result via getstate(), getretcode(), getoutput() etc.
    """

    # state definitions
    FINISHED = "finished"
    INTERRUPTED = "interrupted"
    ERRORED = "errored"
    TIMEDOUT = "timed out"

    def __init__(self, cmd, args_subst=None, args_append=None, logger=None,
                 excl_subst=False, work_dir=None, env_vars=None, timeout=None,
                 redirect_stderr=True, resource_limits=None, doprint=False):
        """
        :param cmd: command vector (list of strings) passed to Popen()
        :param args_subst: dictionary mapping patterns to the strings they
                           are replaced with inside the command arguments
        :param args_append: list of arguments to append to the command
        :param logger: logger to use, defaults to the logger of this module
        :param excl_subst: if True, args_append is used only if no
                           substitution took place
        :param work_dir: directory to change to before running the command
        :param env_vars: dictionary of environment variables added on top
                         of the current environment
        :param timeout: timeout in seconds, no timeout if None or 0
        :param redirect_stderr: if True, stderr is merged into stdout,
                                otherwise it is captured separately
        :param resource_limits: dictionary of resource name -> value set
                                in the child before the command is run
        :param doprint: if True, log the output lines as they are read;
                        None is treated as False and for a list the first
                        item is used
        """

        if doprint is None:
            doprint = False

        if isinstance(doprint, list):
            doprint = doprint[0]

        self.cmd = cmd
        self.state = "notrun"
        self.excl_subst = excl_subst
        self.work_dir = work_dir
        self.env_vars = env_vars
        self.timeout = timeout
        self.pid = None
        self.redirect_stderr = redirect_stderr
        self.limits = resource_limits
        self.doprint = doprint
        self.err = None
        # Initialized here so that the attributes exist even if execute()
        # was not called or bailed out early.
        self.out = None
        self.returncode = None

        self.logger = logger or logging.getLogger(__name__)

        if args_subst or args_append:
            self.fill_arg(args_append, args_subst)

    def __str__(self):
        return " ".join(self.cmd)

    def execute(self):
        """
        Execute the command and capture its output and return code.

        The outcome is stored in the object: the state is set to one of
        FINISHED, INTERRUPTED, ERRORED or TIMEDOUT, the output lines are
        stored for getoutput()/geterroutput() and the return code for
        getretcode(). KeyboardInterrupt, OSError and TimeoutException are
        handled here, other exceptions propagate to the caller.
        """

        class TimeoutThread(threading.Thread):
            """
            Wait until the timeout specified in seconds expires and kill
            the process specified by the Popen object after that.
            If timeout expires, TimeoutException is stored in the object
            and can be retrieved by the caller.
            """

            def __init__(self, logger, timeout, done_event, p):
                super(TimeoutThread, self).__init__()
                self.timeout = timeout
                self.popen = p
                self.done_event = done_event
                self.logger = logger
                # This has to be set before the thread is started,
                # otherwise an exception stored by run() could be
                # overwritten here.
                self.exception = None
                self.start()

            def terminate(self, p):
                """
                Make sure the process goes away.
                """
                self.logger.info("Terminating PID {}".format(p.pid))
                p.terminate()

                # The following code tries more methods to terminate
                # the process and is specific to Unix.
                if os.name == 'posix':
                    timeout = self.timeout
                    # disable E1101 - non existent attribute SIGKILL on windows
                    # pylint: disable=E1101
                    term_signals = [signal.SIGINT, signal.SIGKILL]
                    # pylint: enable=E1101
                    for sig in term_signals:
                        timeout = timeout / 2  # exponential back-off
                        self.logger.info("Sleeping for {} seconds".
                                         format(timeout))
                        time.sleep(timeout)

                        if p.poll() is None:
                            self.logger.info("Command with PID {} still alive,"
                                             " killing with signal {}".
                                             format(p.pid, sig))
                            p.send_signal(sig)
                        else:
                            self.logger.info("Command with PID {} is gone".
                                             format(p.pid))
                            break

            def run(self):
                # The event stays set once the command is done so the
                # wake-up cannot be lost even if the command finishes
                # before this thread gets to wait.
                if self.done_event.wait(self.timeout):
                    return

                p = self.popen
                self.logger.info("Terminating command {} with PID {} "
                                 "after timeout of {} seconds".
                                 format(p.args, p.pid, self.timeout))
                self.exception = TimeoutException("Command {} with pid"
                                                  " {} timed out".
                                                  format(p.args,
                                                         p.pid))
                self.terminate(p)

            def get_exception(self):
                return self.exception

        class OutputThread(threading.Thread):
            """
            Capture data from subprocess.Popen(). This avoids hangs when
            stdout/stderr buffers fill up.
            """

            def __init__(self, event, logger, doprint=False):
                super(OutputThread, self).__init__()
                self.read_fd, self.write_fd = os.pipe()
                # Undecodable bytes are replaced rather than raising
                # in the reader thread which would leave the caller
                # waiting for the event forever.
                self.pipe_fobj = os.fdopen(self.read_fd, encoding='utf8',
                                           errors='replace')
                self.out = []
                self.event = event
                self.logger = logger
                self.doprint = doprint

                # Start the thread now.
                self.start()

            def run(self):
                """
                It might happen that after the process is gone, the thread
                still has data to read from the pipe. Hence, event is used
                to synchronize with the caller.
                """
                try:
                    while True:
                        line = self.pipe_fobj.readline()
                        if not line:
                            self.logger.debug("end of output")
                            return

                        self.out.append(line)

                        if self.doprint:
                            # Even if logging below fails, the thread has
                            # to keep running to avoid hangups of the
                            # executed command.
                            try:
                                self.logger.info(line.rstrip())
                            except Exception as print_exc:
                                self.logger.error(print_exc)
                finally:
                    # Signal the caller no matter how the loop ended.
                    self.pipe_fobj.close()
                    self.event.set()

            def getoutput(self):
                return self.out

            def fileno(self):
                # Lets subprocess.Popen() use this object as stdout/stderr.
                return self.write_fd

            def close(self):
                self.logger.debug("closed")
                os.close(self.write_fd)

        orig_work_dir = None
        if self.work_dir:
            try:
                orig_work_dir = os.getcwd()
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot get working directory",
                                  exc_info=True)
                return

            try:
                os.chdir(self.work_dir)
            except OSError:
                self.state = Command.ERRORED
                self.logger.error("Cannot change working directory to {}".
                                  format(self.work_dir), exc_info=True)
                return

        timeout_thread = None
        output_event = threading.Event()
        output_thread = OutputThread(output_event, self.logger,
                                     doprint=self.doprint)

        # If stderr redirection is off, setup a thread that will capture
        # stderr data.
        if self.redirect_stderr:
            stderr_dest = subprocess.STDOUT
        else:
            stderr_event = threading.Event()
            stderr_thread = OutputThread(stderr_event, self.logger,
                                         doprint=self.doprint)
            stderr_dest = stderr_thread

        try:
            start_time = time.time()
            try:
                self.logger.debug("working directory = {}".format(os.getcwd()))
            except PermissionError:
                pass
            self.logger.debug("command = '{}'".format(self))
            my_args = {'stderr': stderr_dest,
                       'stdout': output_thread}
            if self.env_vars:
                my_env = os.environ.copy()
                my_env.update(self.env_vars)
                self.logger.debug("environment variables: {}".format(my_env))
                my_args['env'] = my_env
            if self.limits:
                my_args['preexec_fn'] = \
                    lambda: self.set_resource_limits(self.limits)

            # Actually run the command.
            p = subprocess.Popen(self.cmd, **my_args)

            self.pid = p.pid

            if self.timeout:
                done_event = threading.Event()
                self.logger.debug("Setting timeout to {} seconds".
                                  format(self.timeout))
                timeout_thread = TimeoutThread(self.logger, self.timeout,
                                               done_event, p)

            self.logger.debug("Waiting for process with PID {}".format(p.pid))
            p.wait()
            self.logger.debug("done waiting")

            if self.timeout:
                e = timeout_thread.get_exception()
                if e:
                    raise e  # pylint: disable=E0702

        except KeyboardInterrupt:
            self.logger.info("Got KeyboardException while processing ",
                             exc_info=True)
            self.state = Command.INTERRUPTED
        except OSError:
            self.logger.error("Got OS error", exc_info=True)
            self.state = Command.ERRORED
        except TimeoutException:
            self.logger.error("Timed out")
            self.state = Command.TIMEDOUT
        else:
            self.state = Command.FINISHED
            self.returncode = int(p.returncode)
            self.logger.debug("'{}' -> {}".format(self, self.getretcode()))
        finally:
            if timeout_thread:
                # Let the timeout thread know there is nothing to wait for.
                done_event.set()

            # The subprocess module does not close the write pipe descriptor
            # it fetched via OutputThread's fileno() so in order to gracefully
            # exit the read loop we have to close it here ourselves.
            output_thread.close()
            self.logger.debug("Waiting on output thread to finish reading")
            output_event.wait()
            self.out = output_thread.getoutput()

            if not self.redirect_stderr:
                stderr_thread.close()
                self.logger.debug("Waiting on stderr thread to finish reading")
                stderr_event.wait()
                self.err = stderr_thread.getoutput()

            elapsed_time = time.time() - start_time
            self.logger.debug("Command '{}' took {} seconds".
                              format(self, int(elapsed_time)))

            # Done as part of the cleanup so that the working directory
            # is restored also when an unexpected exception propagates.
            if orig_work_dir:
                try:
                    os.chdir(orig_work_dir)
                except OSError:
                    self.state = Command.ERRORED
                    self.logger.error("Cannot change working directory "
                                      "back to {}".
                                      format(orig_work_dir), exc_info=True)

    def fill_arg(self, args_append=None, args_subst=None):
        """
        Replace argument names with actual values or append arguments
        to the command vector.

        The action depends whether exclusive substitution is on.
        If yes, arguments will be appended only if no substitution was
        performed.

        All the patterns found in a single argument are replaced in that
        argument; the number of arguments is not changed by substitution.

        :param args_append: list of arguments to append
        :param args_subst: dictionary of pattern -> replacement string
        """

        newcmd = []
        subst_done = False
        for cmdarg in self.cmd:
            newarg = cmdarg
            if args_subst:
                for pattern, value in args_subst.items():
                    if pattern in newarg:
                        newarg = newarg.replace(pattern, value)
                        self.logger.debug("replacing cmdarg with {}".
                                          format(newarg))
                        subst_done = True

            newcmd.append(newarg)

        if args_append and (not self.excl_subst or not subst_done):
            self.logger.debug("appending {}".format(args_append))
            newcmd.extend(args_append)

        self.cmd = newcmd

    def get_resource(self, name):
        """
        Translate resource name to the constant of the resource module.

        :param name: resource name, only "RLIMIT_NOFILE" is recognized
        :return: the matching constant from the resource module
        :raises NotImplementedError: if the resource module is not
                 available or the name is not recognized
        """
        try:
            import resource
            if name == "RLIMIT_NOFILE":
                return resource.RLIMIT_NOFILE
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

        raise NotImplementedError("unknown resource")

    def set_resource_limit(self, name, value):
        """
        Set both soft and hard limit of the named resource to the value.

        :raises NotImplementedError: if the resource module is not
                 available or the name is not recognized
        """
        try:
            import resource
            self.logger.debug("Setting resource {} to {}"
                              .format(name, value))
            resource.setrlimit(self.get_resource(name), (value, value))
        except ImportError:
            raise NotImplementedError("manipulating resources is not "
                                      "available on your platform")

    def set_resource_limits(self, limits):
        """
        Apply all the limits from the dictionary of name -> value.
        """
        self.logger.debug("Setting resource limits")
        for name, value in limits.items():
            self.set_resource_limit(name, value)

    def getretcode(self):
        """
        Return the exit value of the command or None if the command
        is not in the finished state.
        """
        if self.state != Command.FINISHED:
            return None
        else:
            return self.returncode

    def getoutputstr(self):
        """
        Return the captured output joined into single stripped string
        or None if the command is not in the finished state.
        """
        if self.state == Command.FINISHED:
            return "".join(self.out).strip()
        else:
            return None

    def getoutput(self):
        """
        Return the list of captured output lines or None if the command
        is not in the finished state.
        """
        if self.state == Command.FINISHED:
            return self.out
        else:
            return None

    def geterroutput(self):
        """
        Return the list of lines captured from stderr. This is None
        unless the command was run with stderr redirection turned off.
        """
        return self.err

    def geterroutputstr(self):
        """
        Return the captured stderr lines joined into single stripped
        string, empty string if there are none.
        """
        if self.err:
            return "".join(self.err).strip()
        else:
            return ""

    def getstate(self):
        return self.state

    def getpid(self):
        return self.pid

    def log_error(self, msg):
        """
        Log error message prefixed with msg that describes how
        the command ended up.
        """
        # The states are plain strings so compare them by value.
        if self.state == Command.FINISHED:
            self.logger.error("{}: command {} in directory {} exited with {}".
                              format(msg, self.cmd, self.work_dir,
                                     self.getretcode()))
        else:
            self.logger.error("{}: command {} in directory {} ended with "
                              "invalid state {}".
                              format(msg, self.cmd, self.work_dir, self.state))