"""
Utility functions for minions
"""


import logging
import os
import threading

import salt.payload
import salt.utils.files
import salt.utils.platform
import salt.utils.process

log = logging.getLogger(__name__)


def running(opts):
    """
    Return the running jobs on this minion

    Every entry of ``<cachedir>/proc`` is passed to ``_read_proc_file``;
    each non-``None`` result is collected.

    :param opts: Minion options; ``opts["cachedir"]`` is read here and the
        whole mapping is forwarded to ``_read_proc_file``.
    :return: A list of job metadata dicts. Empty when the proc directory
        does not exist.
    """
    ret = []
    proc_dir = os.path.join(opts["cachedir"], "proc")
    if not os.path.isdir(proc_dir):
        return ret
    for fn_ in os.listdir(proc_dir):
        path = os.path.join(proc_dir, fn_)
        try:
            data = _read_proc_file(path, opts)
        except OSError:
            # proc files may be removed at any time during this process by
            # the minion process that is executing the JID in question, so
            # we must ignore ENOENT during this process
            continue
        if data is not None:
            ret.append(data)
    return ret


def cache_jobs(opts, jid, ret):
    """
    Write job information to cache

    The serialized return is written to
    ``<cachedir>/minion_jobs/<jid>/return.p``; the job directory is created
    when it is missing.

    :param opts: Minion options; only ``opts["cachedir"]`` is read.
    :param jid: Job id, used as the name of the per-job directory.
    :param ret: Job return data, serialized with ``salt.payload.dumps``.
    """
    fn_ = os.path.join(opts["cachedir"], "minion_jobs", jid, "return.p")
    # exist_ok avoids the check-then-create race when another writer
    # creates the same job directory concurrently.
    os.makedirs(os.path.dirname(fn_), exist_ok=True)
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(salt.payload.dumps(ret))


def _remove_proc_file(path):
    """
    Best-effort removal of a proc file; a failure is only logged at debug
    level.
    """
    try:
        os.remove(path)
    except OSError:
        log.debug("Unable to remove proc file %s.", path)


def _read_proc_file(path, opts):
    """
    Return a dict of JID metadata, or None

    ``None`` is returned (and, where noted below, the proc file is removed)
    when the file does not describe a job that should be reported.

    :param path: Path of the proc file to read.
    :param opts: Minion options; only ``opts.get("multiprocessing")`` is
        read.
    :raises OSError: If the proc file cannot be opened or read.
    """
    current_thread = threading.current_thread().name
    own_pid = os.getpid()

    with salt.utils.files.fopen(path, "rb") as fp_:
        buf = fp_.read()
    # The file handle is closed from here on, so the removals below never
    # operate on a file this function still holds open.

    if not buf:
        # Proc file is empty, remove
        _remove_proc_file(path)
        return None

    data = salt.payload.loads(buf)
    if not isinstance(data, dict):
        # Invalid serial object
        return None

    job_pid = data.get("pid")
    if job_pid is None:
        # Without a pid the job cannot be checked; ignore the entry rather
        # than letting a KeyError abort the caller's whole listing.
        log.debug("Proc file %s has no pid, ignoring.", path)
        return None

    if not salt.utils.process.os_is_running(job_pid):
        # The process is no longer running, clear out the file and
        # continue
        _remove_proc_file(path)
        return None

    if opts.get("multiprocessing"):
        if job_pid == own_pid:
            # The file belongs to the calling process itself.
            return None
    else:
        if job_pid != own_pid:
            # Written by a different process than the one asking.
            _remove_proc_file(path)
            return None
        jid = data.get("jid")
        thread_name = "{}-Job-{}".format(jid, jid)
        if current_thread in (jid, thread_name):
            # The file belongs to the calling thread itself.
            return None
        # One snapshot of the live threads; a list (not a set) so that an
        # unhashable jid cannot raise during the membership test.
        live_names = [thread.name for thread in threading.enumerate()]
        if jid not in live_names and thread_name not in live_names:
            # No live thread carries this job's name.
            _remove_proc_file(path)
            return None

    if not _check_cmdline(data):
        if job_pid:
            log.warning(
                "PID %s exists but does not appear to be a salt process.", job_pid
            )
        _remove_proc_file(path)
        return None
    return data


def _check_cmdline(data):
    """
    In some cases where there are an insane number of processes being created
    on a system a PID can get recycled or assigned to a non-Salt process.
    On Linux this fn checks to make sure the PID we are checking on is actually
    a Salt process.

    For non-Linux systems we punt and just return True

    :param data: Job metadata dict; only ``data.get("pid")`` is read.
    :return: ``True`` when the platform is not Linux, when ``/proc`` is not
        a directory, or when ``/proc/<pid>/cmdline`` contains ``b"salt"``;
        ``False`` otherwise.
    """
    if not salt.utils.platform.is_linux():
        return True
    pid = data.get("pid")
    if not pid:
        return False
    if not os.path.isdir("/proc"):
        return True
    path = os.path.join("/proc", str(pid), "cmdline")
    if not os.path.isfile(path):
        return False
    try:
        with salt.utils.files.fopen(path, "rb") as fp_:
            return b"salt" in fp_.read()
    except OSError:
        return False