"""
Functions which implement running reactor jobs
"""
import fnmatch
import glob
import logging
import os

import salt.client
import salt.defaults.exitcodes
import salt.runner
import salt.state
import salt.utils.args
import salt.utils.cache
import salt.utils.data
import salt.utils.event
import salt.utils.files
import salt.utils.master
import salt.utils.process
import salt.utils.yaml
import salt.wheel

log = logging.getLogger(__name__)

# Keys of a low chunk that belong to the reactor/state machinery itself and
# must not be treated as unexpected arguments of the called function.
REACTOR_INTERNAL_KEYWORDS = frozenset(
    ["__id__", "__sls__", "name", "order", "fun", "key", "state"]
)


class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
    """
    Read in the reactor configuration variable and compare it to events
    processed on the master.
    The reactor has the capability to execute pre-programmed executions
    as reactions to events
    """

    # Legacy reaction type names mapped to the names used by ReactWrap.
    aliases = {
        "cmd": "local",
    }

    def __init__(self, opts, **kwargs):
        """
        Set up the process and the state compiler.

        :param opts: The configuration dictionary. A shallow copy with
            ``file_client`` forced to ``local`` is used to build the
            ``MasterMinion`` whose renderers drive the compiler.
        :param kwargs: Passed through to the process base class.
        """
        super().__init__(**kwargs)
        local_minion_opts = opts.copy()
        local_minion_opts["file_client"] = "local"
        # NOTE(review): salt.minion is not imported explicitly by this module;
        # it is presumably loaded as a side effect of another import - confirm.
        self.minion = salt.minion.MasterMinion(local_minion_opts)
        salt.state.Compiler.__init__(self, opts, self.minion.rend)
        self.is_leader = True

    def render_reaction(self, glob_ref, tag, data):
        """
        Execute the render system against a single reaction file and return
        the data structure

        :param glob_ref: A glob pattern, or a ``salt://`` URL which is cached
            locally first, naming the reaction SLS file(s).
        :param tag: The event tag, passed to the renderer.
        :param data: The event data, passed to the renderer.
        :return: A dict of the merged rendered data of every matched file.
            Files which fail to render are logged and skipped.
        """
        react = {}

        if glob_ref.startswith("salt://"):
            # A failed cache lookup yields a falsy value; fall back to an
            # empty pattern so that the glob below simply matches nothing.
            glob_ref = self.minion.functions["cp.cache_file"](glob_ref) or ""
        globbed_ref = glob.glob(glob_ref)
        if not globbed_ref:
            log.error(
                "Can not render SLS %s for tag %s. File missing or not found.",
                glob_ref,
                tag,
            )
        for fn_ in globbed_ref:
            try:
                res = self.render_template(fn_, tag=tag, data=data)

                # for #20841, inject the sls name here since verify_high()
                # assumes it exists in case there are any errors
                for name in res:
                    res[name]["__sls__"] = fn_

                react.update(res)
            except Exception:  # pylint: disable=broad-except
                log.exception('Failed to render "%s": ', fn_)
        return react

    def _load_reactor_map_file(self):
        """
        Load the reactor map from the YAML file named by ``opts["reactor"]``.

        :return: The parsed map, or an empty list when the file can not be
            read, can not be parsed, or is empty. Failures are logged.
        """
        path = self.opts["reactor"]
        try:
            with salt.utils.files.fopen(path) as fp_:
                # An empty document loads as None; normalize to an empty map
                # so callers can always iterate over the result.
                return salt.utils.yaml.safe_load(fp_) or []
        except OSError:
            log.error('Failed to read reactor map: "%s"', path)
        except Exception:  # pylint: disable=broad-except
            log.error('Failed to parse YAML in reactor map: "%s"', path)
        return []

    def list_reactors(self, tag):
        """
        Take in the tag from an event and return a list of the reactors to
        process

        :param tag: The event tag, matched against every map key with
            ``fnmatch``.
        :return: A list of the reaction file references whose key matches.
        """
        log.debug("Gathering reactors for tag %s", tag)
        reactors = []
        if isinstance(self.opts["reactor"], str):
            react_map = self._load_reactor_map_file()
        else:
            react_map = self.opts["reactor"]
        for ropt in react_map or []:
            # Only single-key dicts are valid reactor map entries
            if not isinstance(ropt, dict):
                continue
            if len(ropt) != 1:
                continue
            key, val = next(iter(ropt.items()))
            if fnmatch.fnmatch(tag, key):
                if isinstance(val, str):
                    reactors.append(val)
                elif isinstance(val, list):
                    reactors.extend(val)
        return reactors

    def list_all(self):
        """
        Return a list of the reactors

        :return: The reactor map, read from the YAML file when the option is
            a string (an empty list if that file can not be loaded),
            otherwise the value held in the minion opts.
        """
        if isinstance(self.minion.opts["reactor"], str):
            log.debug("Reading reactors from yaml %s", self.opts["reactor"])
            react_map = self._load_reactor_map_file()
        else:
            log.debug("Not reading reactors from yaml")
            react_map = self.minion.opts["reactor"]
        return react_map

    def add_reactor(self, tag, reaction):
        """
        Add a reactor

        :param tag: The event tag (map key) to register.
        :param reaction: The reaction(s) to associate with the tag.
        :return: A dict with a boolean ``status`` and a ``comment``.
        """
        reactors = self.list_all()
        for reactor in reactors:
            # Skip malformed entries instead of failing on them
            if not isinstance(reactor, dict) or not reactor:
                continue
            _tag = next(iter(reactor))
            if _tag == tag:
                return {"status": False, "comment": "Reactor already exists."}

        # NOTE(review): when opts["reactor"] is a file path (the str case
        # handled by list_all) this append raises AttributeError - confirm
        # whether adding reactors to a file based map should be supported.
        self.minion.opts["reactor"].append({tag: reaction})
        return {"status": True, "comment": "Reactor added."}

    def delete_reactor(self, tag):
        """
        Delete a reactor

        :param tag: The event tag (map key) to remove.
        :return: A dict with a boolean ``status`` and a ``comment``.
        """
        reactors = self.list_all()
        for reactor in reactors:
            # Skip malformed entries instead of failing on them
            if not isinstance(reactor, dict) or not reactor:
                continue
            _tag = next(iter(reactor))
            if _tag == tag:
                # Returning right away makes removing while iterating safe
                self.minion.opts["reactor"].remove(reactor)
                return {"status": True, "comment": "Reactor deleted."}

        return {"status": False, "comment": "Reactor does not exists."}

    def resolve_aliases(self, chunks):
        """
        Preserve backward compatibility by rewriting the 'state' key in the low
        chunks if it is using a legacy type.

        :param chunks: The list of low chunks, modified in place.
        """
        for chunk in chunks:
            new_state = self.aliases.get(chunk["state"])
            if new_state is not None:
                chunk["state"] = new_state

    def reactions(self, tag, data, reactors):
        """
        Render a list of reactor files and returns a reaction struct

        :param tag: The event tag.
        :param data: The event data.
        :param reactors: The reaction file references to render.
        :return: The ordered list of low chunks. The list is empty when
            nothing was rendered, when the high data failed verification or
            when compiling raised an exception.
        """
        log.debug("Compiling reactions for tag %s", tag)
        high = {}
        chunks = []
        try:
            for fn_ in reactors:
                high.update(self.render_reaction(fn_, tag, data))
            if high:
                errors = self.verify_high(high)
                if errors:
                    log.error(
                        "Unable to render reactions for event %s due to "
                        "errors (%s) in one or more of the sls files (%s)",
                        tag,
                        errors,
                        reactors,
                    )
                    return []  # We'll return nothing since there was an error
                chunks = self.order_chunks(self.compile_high_data(high))
        except Exception:  # pylint: disable=broad-except
            log.exception("Exception encountered while compiling reactions")

        self.resolve_aliases(chunks)
        return chunks

    def call_reactions(self, chunks):
        """
        Execute the reaction state

        :param chunks: The low chunks, each handed to ``ReactWrap.run``.
        """
        for chunk in chunks:
            self.wrap.run(chunk)

    def run(self):
        """
        Enter into the server loop
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        # NOTE(review): salt.utils.platform is not imported explicitly by this
        # module; it is presumably loaded by another import - confirm.
        if self.opts["reactor_niceness"] and not salt.utils.platform.is_windows():
            log.info("Reactor setting niceness to %i", self.opts["reactor_niceness"])
            os.nice(self.opts["reactor_niceness"])

        # instantiate some classes inside our new process
        with salt.utils.event.get_event(
            self.opts["__role"],
            self.opts["sock_dir"],
            self.opts["transport"],
            opts=self.opts,
            listen=True,
        ) as event:
            self.wrap = ReactWrap(self.opts)

            for data in event.iter_events(full=True):
                # skip all events fired by ourselves
                if data["data"].get("user") == self.wrap.event_user:
                    continue

                # NOTE: these events must contain the masters key in order to be accepted
                # see salt.runners.reactor for the requesting interface
                if "salt/reactors/manage" in data["tag"]:
                    master_key = salt.utils.master.get_master_key("root", self.opts)
                    if data["data"].get("key") != master_key:
                        log.error(
                            "received salt/reactors/manage event without matching"
                            " master_key. discarding"
                        )
                        continue
                if data["tag"].endswith("salt/reactors/manage/is_leader"):
                    event.fire_event(
                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
                    )
                if data["tag"].endswith("salt/reactors/manage/set_leader"):
                    # we only want to register events from the local master
                    if data["data"].get("id") == self.opts["id"]:
                        self.is_leader = data["data"]["value"]
                    event.fire_event(
                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
                    )
                if data["tag"].endswith("salt/reactors/manage/add"):
                    _data = data["data"]
                    res = self.add_reactor(_data["event"], _data["reactors"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/add-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/delete"):
                    _data = data["data"]
                    res = self.delete_reactor(_data["event"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/delete-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/list"):
                    event.fire_event(
                        {"reactors": self.list_all()},
                        "salt/reactors/manage/list-results",
                    )
                else:
                    # do not handle any reactions if not leader in cluster
                    if not self.is_leader:
                        continue
                    reactors = self.list_reactors(data["tag"])
                    if not reactors:
                        continue
                    chunks = self.reactions(data["tag"], data["data"], reactors)
                    if chunks:
                        try:
                            self.call_reactions(chunks)
                        except SystemExit:
                            log.warning("Exit ignored by reactor")


class ReactWrap:
    """
    Wrapper that executes low data for the Reactor System
    """

    # class-wide cache of clients
    client_cache = None
    # user name attached to runner/wheel reactions; Reactor.run skips events
    # carrying it
    event_user = "Reactor"

    # Maps each supported reaction type to the client class which executes
    # it. Every key has a wrapper method of the same name on this class.
    reaction_class = {
        "local": salt.client.LocalClient,
        "runner": salt.runner.RunnerClient,
        "wheel": salt.wheel.Wheel,
        "caller": salt.client.Caller,
    }

    def __init__(self, opts):
        """
        Create the class-wide client cache, if needed, and the thread pool.

        :param opts: The configuration dictionary.
        """
        self.opts = opts
        if ReactWrap.client_cache is None:
            ReactWrap.client_cache = salt.utils.cache.CacheDict(
                opts["reactor_refresh_interval"]
            )

        self.pool = salt.utils.process.ThreadPool(
            self.opts["reactor_worker_threads"],  # number of workers for runner/wheel
            queue_size=self.opts["reactor_worker_hwm"],  # queue size for those workers
        )

    def populate_client_cache(self, low):
        """
        Populate the client cache with an instance of the specified type

        :param low: The low chunk; ``low["state"]`` selects the client type
            and must be a key of ``reaction_class``.
        """
        reaction_type = low["state"]
        if reaction_type not in self.client_cache:
            log.debug("Reactor is populating %s client cache", reaction_type)
            if reaction_type in ("runner", "wheel"):
                # Reaction types that run locally on the master want the full
                # opts passed.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts
                )
                # The len() function will cause the module functions to load if
                # they aren't already loaded. We want to load them so that the
                # spawned threads don't need to load them. Loading in the
                # spawned threads creates race conditions such as sometimes not
                # finding the required function because another thread is in
                # the middle of loading the functions.
                len(self.client_cache[reaction_type].functions)
            else:
                # Reactions which use remote pubs only need the conf file when
                # instantiating a client instance.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts["conf_file"]
                )

    def run(self, low):
        """
        Execute a reaction by invoking the proper wrapper func

        :param low: The low chunk describing the reaction. ``low["state"]``
            names the reaction type. An unknown type is logged and ignored.
        """
        reaction_type = low["state"]
        # Validate against the supported types first. A plain getattr() would
        # also resolve unrelated attributes (e.g. "run" or "pool"), and an
        # unsupported type would make populate_client_cache() raise KeyError.
        l_fun = (
            getattr(self, reaction_type, None)
            if reaction_type in self.reaction_class
            else None
        )
        if l_fun is None:
            log.error("ReactWrap is missing a wrapper function for '%s'", low["state"])
            return

        self.populate_client_cache(low)

        try:
            wrap_call = salt.utils.args.format_call(l_fun, low)
            args = wrap_call.get("args", ())
            kwargs = wrap_call.get("kwargs", {})
            # TODO: Setting user doesn't seem to work for actual remote pubs
            if low["state"] in ("runner", "wheel"):
                # Update called function's low data with event user to
                # segregate events fired by reactor and avoid reaction loops
                kwargs["__user__"] = self.event_user
                # Replace ``state`` kwarg which comes from high data compiler.
                # It breaks some runner functions and seems unnecessary.
                kwargs["__state__"] = kwargs.pop("state")
                # NOTE: if any additional keys are added here, they will also
                # need to be added to filter_kwargs()

            if "args" in kwargs:
                # New configuration
                reactor_args = kwargs.pop("args")
                for item in ("arg", "kwarg"):
                    if item in low:
                        log.warning(
                            "Reactor '%s' is ignoring '%s' param %s due to "
                            "presence of 'args' param. Check the Reactor System "
                            "documentation for the correct argument format.",
                            low["__id__"],
                            item,
                            low[item],
                        )
                if (
                    low["state"] == "caller"
                    and isinstance(reactor_args, list)
                    and not salt.utils.data.is_dictlist(reactor_args)
                ):
                    # Legacy 'caller' reactors were already using the 'args'
                    # param, but only supported a list of positional arguments.
                    # If low['args'] is a list but is *not* a dictlist, then
                    # this is actually using the legacy configuration. So, put
                    # the reactor args into kwarg['arg'] so that the wrapper
                    # interprets them as positional args.
                    kwargs["arg"] = reactor_args
                    kwargs["kwarg"] = {}
                else:
                    kwargs["arg"] = ()
                    kwargs["kwarg"] = reactor_args
                if not isinstance(kwargs["kwarg"], dict):
                    kwargs["kwarg"] = salt.utils.data.repack_dictlist(kwargs["kwarg"])
                    if not kwargs["kwarg"]:
                        log.error(
                            "Reactor '%s' failed to execute %s '%s': "
                            "Incorrect argument format, check the Reactor System "
                            "documentation for the correct format.",
                            low["__id__"],
                            low["state"],
                            low["fun"],
                        )
                        return
            else:
                # Legacy configuration
                react_call = {}
                if low["state"] in ("runner", "wheel"):
                    if "arg" not in kwargs or "kwarg" not in kwargs:
                        # Runner/wheel execute on the master, so we can use
                        # format_call to get the functions args/kwargs
                        react_fun = self.client_cache[low["state"]].functions.get(
                            low["fun"]
                        )
                        if react_fun is None:
                            log.error(
                                "Reactor '%s' failed to execute %s '%s': "
                                "function not available",
                                low["__id__"],
                                low["state"],
                                low["fun"],
                            )
                            return

                        react_call = salt.utils.args.format_call(
                            react_fun, low, expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
                        )

                if "arg" not in kwargs:
                    kwargs["arg"] = react_call.get("args", ())
                if "kwarg" not in kwargs:
                    kwargs["kwarg"] = react_call.get("kwargs", {})

            # Execute the wrapper with the proper args/kwargs. kwargs['arg']
            # and kwargs['kwarg'] contain the positional and keyword arguments
            # that will be passed to the client interface to execute the
            # desired runner/wheel/remote-exec/etc. function.
            ret = l_fun(*args, **kwargs)

            # Only the pool backed wrappers (runner/wheel) return a value;
            # False is taken to mean that the job could not be queued.
            if ret is False:
                log.error(
                    "Reactor '%s' failed to execute %s '%s': "
                    "TaskPool queue is full! "
                    "Consider tuning reactor_worker_threads and/or"
                    " reactor_worker_hwm",
                    low["__id__"],
                    low["state"],
                    low["fun"],
                )

        except SystemExit:
            log.warning("Reactor '%s' attempted to exit. Ignored.", low["__id__"])
        except Exception:  # pylint: disable=broad-except
            log.error(
                "Reactor '%s' failed to execute %s '%s'",
                low["__id__"],
                low["state"],
                low["fun"],
                exc_info=True,
            )

    def runner(self, fun, **kwargs):
        """
        Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`

        :param fun: The name of the runner function.
        :param kwargs: The low data handed to the client.
        :return: The result of queueing the job on the thread pool.
        """
        return self.pool.fire_async(self.client_cache["runner"].low, args=(fun, kwargs))

    def wheel(self, fun, **kwargs):
        """
        Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`

        :param fun: The name of the wheel function.
        :param kwargs: The low data handed to the client.
        :return: The result of queueing the job on the thread pool.
        """
        return self.pool.fire_async(self.client_cache["wheel"].low, args=(fun, kwargs))

    def local(self, fun, tgt, **kwargs):
        """
        Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`

        :param fun: The name of the execution function.
        :param tgt: The target expression.
        :param kwargs: Passed through to ``cmd_async``.
        """
        self.client_cache["local"].cmd_async(tgt, fun, **kwargs)

    def caller(self, fun, **kwargs):
        """
        Wrap LocalCaller to execute remote exec functions locally on the Minion

        :param fun: The name of the execution function.
        :param kwargs: Must hold ``arg`` (positional arguments) and ``kwarg``
            (keyword arguments) for the called function.
        """
        self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])