"""
For running command line executables with a timeout
"""

import shlex
import subprocess
import threading

import salt.exceptions
import salt.utils.data
import salt.utils.stringutils


class TimedProc:
    """
    Create a TimedProc object, calls subprocess.Popen with passed args and **kwargs

    The following keyword arguments are consumed here and are NOT forwarded
    to ``subprocess.Popen``:

    bg
        If true, the process is not waited for (``self.wait`` becomes
        ``False``); stdin is dropped and ``communicate()`` is never used.
    stdin
        Data handed to ``Popen.communicate(input=...)`` by :meth:`run`.
    with_communicate
        Whether :meth:`run` collects output through ``communicate()``.
        Defaults to the value of ``self.wait``.
    timeout
        Number of seconds :meth:`run` waits before giving up. A falsy value
        (``None``, ``0``) means "no timeout".
    stdin_raw_newlines
        If false (the default), literal ``\\n`` sequences in ``stdin`` are
        turned into real newline characters.

    Every other keyword argument is passed to ``subprocess.Popen`` unchanged.
    """

    def __init__(self, args, **kwargs):
        """
        Start the subprocess described by ``args``.

        Raises ``salt.exceptions.TimedProcTimeoutError`` if a truthy
        ``timeout`` is given that is not an ``int`` or ``float``.
        """
        self.wait = not kwargs.pop("bg", False)
        self.stdin = kwargs.pop("stdin", None)
        self.with_communicate = kwargs.pop("with_communicate", self.wait)
        self.timeout = kwargs.pop("timeout", None)
        self.stdin_raw_newlines = kwargs.pop("stdin_raw_newlines", False)

        # If you're not willing to wait for the process
        # you can't define any stdin, stdout or stderr
        if not self.wait:
            self.stdin = kwargs["stdin"] = None
            self.with_communicate = False
        elif self.stdin is not None:
            if not self.stdin_raw_newlines:
                # Translate a newline submitted as '\n' on the CLI to an actual
                # newline character.
                self.stdin = salt.utils.stringutils.to_bytes(
                    self.stdin.replace("\\n", "\n")
                )
            kwargs["stdin"] = subprocess.PIPE

        if not self.with_communicate:
            self.stdout = kwargs["stdout"] = None
            self.stderr = kwargs["stderr"] = None

        # Only truthy timeouts are validated; None/0 disable the timeout.
        if self.timeout and not isinstance(self.timeout, (int, float)):
            raise salt.exceptions.TimedProcTimeoutError(
                "Error: timeout {} must be a number".format(self.timeout)
            )
        if kwargs.get("shell", False):
            args = salt.utils.data.decode(args, to_str=True)

        try:
            self.process = subprocess.Popen(args, **kwargs)
        except (AttributeError, TypeError):
            # Popen rejected the argument types: coerce the command and the
            # environment to strings, then try exactly once more.
            if not kwargs.get("shell", False):
                if not isinstance(args, (list, tuple)):
                    try:
                        args = shlex.split(args)
                    except AttributeError:
                        args = shlex.split(str(args))
                args = [arg if isinstance(arg, str) else str(arg) for arg in args]
            elif not isinstance(args, (list, tuple, str)):
                # Handle corner case where someone does a 'cmd.run 3'
                args = str(args)

            # Ensure that environment variables are strings. A new dict is
            # built instead of renaming keys in place: popping/inserting keys
            # while iterating over the same dict raises RuntimeError. An
            # explicit ``env=None`` (valid for Popen) is left untouched.
            env = kwargs.get("env")
            if env:
                kwargs["env"] = {
                    str(key): val if isinstance(val, str) else str(val)
                    for key, val in env.items()
                }

            args = salt.utils.data.decode(args)
            self.process = subprocess.Popen(args, **kwargs)
        self.command = args

    def run(self):
        """
        wait for subprocess to terminate and return subprocess' return code.
        If timeout is reached, throw TimedProcTimeoutError

        When ``with_communicate`` is set, the captured output is stored on
        ``self.stdout`` and ``self.stderr``. When the process is neither
        communicated with nor waited for, nothing blocks here and
        ``self.process.returncode`` is returned as-is.
        """

        def receive():
            if self.with_communicate:
                self.stdout, self.stderr = self.process.communicate(input=self.stdin)
            elif self.wait:
                self.process.wait()

        if not self.timeout:
            receive()
        else:
            # Do the blocking wait in a helper thread so that the join below
            # can enforce the timeout.
            rt = threading.Thread(target=receive)
            rt.start()
            rt.join(self.timeout)
            if rt.is_alive():
                # Subprocess cleanup (best effort)
                self.process.kill()

                def terminate():
                    # Second attempt, only if the receiver thread is still
                    # blocked when the timer fires.
                    if rt.is_alive():
                        self.process.terminate()

                threading.Timer(10, terminate).start()
                raise salt.exceptions.TimedProcTimeoutError(
                    "{} : Timed out after {} seconds".format(
                        self.command,
                        str(self.timeout),
                    )
                )
        return self.process.returncode