#
# Proxy minion metaproxy modules
#

import logging
import os
import signal
import sys
import threading
import traceback
import types

# pylint: disable=3rd-party-module-not-gated
import salt
import salt.beacons
import salt.cli.daemons
import salt.client
import salt.crypt
import salt.defaults.exitcodes
import salt.engines
import salt.ext.tornado.gen  # pylint: disable=F0401
import salt.ext.tornado.ioloop  # pylint: disable=F0401
import salt.ext.tornado.stack_context  # pylint: disable=F0401
import salt.loader
import salt.log.setup
import salt.minion
import salt.payload
import salt.pillar
import salt.serializers.msgpack
import salt.syspaths
import salt.utils.args
import salt.utils.context
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.minion
import salt.utils.minions
import salt.utils.network
import salt.utils.platform
import salt.utils.process
import salt.utils.schedule
import salt.utils.ssdp
import salt.utils.user
import salt.utils.zeromq
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.exceptions import (
    CommandExecutionError,
    CommandNotFoundError,
    SaltInvocationError,
    SaltSystemExit,
)
from salt.minion import ProxyMinion
from salt.utils.event import tagify
from salt.utils.process import SignalHandlingProcess, default_signals

log = logging.getLogger(__name__)


def post_master_init(self, master):
    """
    Function to finish init after a proxy
    minion has finished connecting to a master.

    This is primarily loading modules, pillars, etc. (since they need
    to know which master they connected to)

    This is a generator function: it yields the object returned by
    ``compile_pillar()`` and expects the compiled pillar to be sent back.
    NOTE(review): presumably the caller wraps it as a tornado coroutine --
    confirm against the ProxyMinion class.

    :param self: the proxy minion instance being initialised
    :param master: the master the minion connected to; stored in
        ``self.opts["master"]`` when ``self.connected`` is true
    :raises SaltSystemExit: when no ``proxy`` key exists in pillar or opts,
        or when the proxy module lacks ``init()`` or ``shutdown()``
    """

    log.debug("subclassed LazyLoaded _post_master_init")
    if self.connected:
        self.opts["master"] = master

        self.opts["pillar"] = yield salt.pillar.get_async_pillar(
            self.opts,
            self.opts["grains"],
            self.opts["id"],
            saltenv=self.opts["saltenv"],
            pillarenv=self.opts.get("pillarenv"),
        ).compile_pillar()

    if "proxy" not in self.opts["pillar"] and "proxy" not in self.opts:
        errmsg = (
            "No proxy key found in pillar or opts for id "
            + self.opts["id"]
            + ". "
            + "Check your pillar/opts configuration and contents. Salt-proxy aborted."
        )
        log.error(errmsg)
        self._running = False
        raise SaltSystemExit(code=-1, msg=errmsg)

    if "proxy" not in self.opts:
        self.opts["proxy"] = self.opts["pillar"]["proxy"]

    if self.opts.get("proxy_merge_pillar_in_opts"):
        # Override proxy opts with pillar data when the user required.
        self.opts = salt.utils.dictupdate.merge(
            self.opts,
            self.opts["pillar"],
            strategy=self.opts.get("proxy_merge_pillar_in_opts_strategy"),
            merge_lists=self.opts.get("proxy_deep_merge_pillar_in_opts", False),
        )
    elif self.opts.get("proxy_mines_pillar"):
        # Even when not required, some details such as mine configuration
        # should be merged anyway whenever possible.
        if "mine_interval" in self.opts["pillar"]:
            self.opts["mine_interval"] = self.opts["pillar"]["mine_interval"]
        if "mine_functions" in self.opts["pillar"]:
            general_proxy_mines = self.opts.get("mine_functions", {})
            specific_proxy_mines = self.opts["pillar"]["mine_functions"]
            if not general_proxy_mines:
                # Nothing configured in the opts, so the pillar value is the
                # whole result. (A plain ``{} + x`` would raise TypeError.)
                self.opts["mine_functions"] = specific_proxy_mines
            elif isinstance(general_proxy_mines, dict) and isinstance(
                specific_proxy_mines, dict
            ):
                # Dicts do not support ``+``; the pillar (proxy specific)
                # entries take precedence over the general ones.
                merged_mines = dict(general_proxy_mines)
                merged_mines.update(specific_proxy_mines)
                self.opts["mine_functions"] = merged_mines
            else:
                try:
                    self.opts["mine_functions"] = (
                        general_proxy_mines + specific_proxy_mines
                    )
                except TypeError:
                    log.error(
                        "Unable to merge mine functions from the pillar in the opts, for proxy %s",
                        self.opts["id"],
                    )

    fq_proxyname = self.opts["proxy"]["proxytype"]

    # Need to load the modules so they get all the dunder variables
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()

    # we can then sync any proxymodules down from the master
    # we do a sync_all here in case proxy code was installed by
    # SPM or was manually placed in /srv/salt/_modules etc.
    self.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"])

    # Pull in the utils
    self.utils = salt.loader.utils(self.opts)

    # Then load the proxy module
    self.proxy = salt.loader.proxy(self.opts, utils=self.utils)

    # And re-load the modules so the __proxy__ variable gets injected
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()
    self.functions.pack["__proxy__"] = self.proxy
    self.proxy.pack["__salt__"] = self.functions
    self.proxy.pack["__ret__"] = self.returners
    self.proxy.pack["__pillar__"] = self.opts["pillar"]

    # Reload utils as well (chicken and egg, __utils__ needs __proxy__ and
    # __proxy__ needs __utils__)
    self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
    self.proxy.pack["__utils__"] = self.utils

    # Reload all modules so all dunder variables are injected
    self.proxy.reload_modules()

    # Start engines here instead of in the Minion superclass __init__
    # This is because we need to inject the __proxy__ variable but
    # it is not setup until now.
    self.io_loop.spawn_callback(
        salt.engines.start_engines, self.opts, self.process_manager, proxy=self.proxy
    )

    if (
        "{}.init".format(fq_proxyname) not in self.proxy
        or "{}.shutdown".format(fq_proxyname) not in self.proxy
    ):
        errmsg = (
            "Proxymodule {} is missing an init() or a shutdown() or both. ".format(
                fq_proxyname
            )
            + "Check your proxymodule. Salt-proxy aborted."
        )
        log.error(errmsg)
        self._running = False
        raise SaltSystemExit(code=-1, msg=errmsg)

    # A proxy module may optionally expose ``module_executors()``; fall back
    # to an empty list when it does not.
    self.module_executors = self.proxy.get(
        "{}.module_executors".format(fq_proxyname), lambda: []
    )()
    proxy_init_fn = self.proxy[fq_proxyname + ".init"]
    proxy_init_fn(self.opts)

    self.opts["grains"] = salt.loader.grains(self.opts, proxy=self.proxy)

    self.mod_opts = self._prep_mod_opts()
    self.matchers = salt.loader.matchers(self.opts)
    self.beacons = salt.beacons.Beacon(self.opts, self.functions)
    uid = salt.utils.user.get_uid(user=self.opts.get("user", None))
    self.proc_dir = salt.minion.get_proc_dir(self.opts["cachedir"], uid=uid)

    if self.connected and self.opts["pillar"]:
        # The pillar has changed due to the connection to the master.
        # Reload the functions so that they can use the new pillar data.
        (
            self.functions,
            self.returners,
            self.function_errors,
            self.executors,
        ) = self._load_modules()
        if hasattr(self, "schedule"):
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

    if not hasattr(self, "schedule"):
        self.schedule = salt.utils.schedule.Schedule(
            self.opts,
            self.functions,
            self.returners,
            cleanup=[salt.minion.master_event(type="alive")],
            proxy=self.proxy,
        )

    # add default scheduling jobs to the minions scheduler
    if self.opts["mine_enabled"] and "mine.update" in self.functions:
        self.schedule.add_job(
            {
                "__mine_interval": {
                    "function": "mine.update",
                    "minutes": self.opts["mine_interval"],
                    "jid_include": True,
                    "maxrunning": 2,
                    "run_on_start": True,
                    "return_job": self.opts.get("mine_return_job", False),
                }
            },
            persist=True,
        )
        log.info("Added mine.update to scheduler")
    else:
        self.schedule.delete_job("__mine_interval", persist=True)

    # add master_alive job if enabled
    if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
        self.schedule.add_job(
            {
                salt.minion.master_event(type="alive", master=self.opts["master"]): {
                    "function": "status.master",
                    "seconds": self.opts["master_alive_interval"],
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"master": self.opts["master"], "connected": True},
                }
            },
            persist=True,
        )
        if (
            self.opts["master_failback"]
            and "master_list" in self.opts
            and self.opts["master"] != self.opts["master_list"][0]
        ):
            self.schedule.add_job(
                {
                    salt.minion.master_event(type="failback"): {
                        "function": "status.ping_master",
                        "seconds": self.opts["master_failback_interval"],
                        "jid_include": True,
                        "maxrunning": 1,
                        "return_job": False,
                        "kwargs": {"master": self.opts["master_list"][0]},
                    }
                },
                persist=True,
            )
        else:
            self.schedule.delete_job(
                salt.minion.master_event(type="failback"), persist=True
            )
    else:
        self.schedule.delete_job(
            salt.minion.master_event(type="alive", master=self.opts["master"]),
            persist=True,
        )
        self.schedule.delete_job(
            salt.minion.master_event(type="failback"), persist=True
        )

    # proxy keepalive
    proxy_alive_fn = fq_proxyname + ".alive"
    if (
        proxy_alive_fn in self.proxy
        and "status.proxy_reconnect" in self.functions
        and self.opts.get("proxy_keep_alive", True)
    ):
        # The keepalive job is only scheduled when `proxy_keep_alive` is
        # unset (defaults to True) or truthy; when it is set to False no
        # reconnect is attempted.
        self.schedule.add_job(
            {
                "__proxy_keepalive": {
                    "function": "status.proxy_reconnect",
                    "minutes": self.opts.get(
                        "proxy_keep_alive_interval", 1
                    ),  # by default, check once per minute
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"proxy_name": fq_proxyname},
                }
            },
            persist=True,
        )
        self.schedule.enable_schedule()
    else:
        self.schedule.delete_job("__proxy_keepalive", persist=True)

    # Sync the grains here so the proxy can communicate them to the master
    self.functions["saltutil.sync_grains"](saltenv="base")
    self.grains_cache = self.opts["grains"]
    self.ready = True


def target(cls, minion_instance, opts, data, connected):
    """
    Handle targeting of the minion.

    Calling _thread_multi_return or _thread_return
    depending on a single or multiple commands.

    :param cls: class used to build a fresh minion when ``minion_instance``
        is falsy
    :param minion_instance: an existing minion object, or a falsy value to
        have one constructed (and its loaders/proxy module initialised) here
    :param opts: the minion configuration dictionary
    :param data: the job payload; ``data["fun"]`` is either a single
        function name or a tuple/list of names
    :param connected: stored on a newly built instance as ``connected``
    """
    if not minion_instance:
        minion_instance = cls(opts)
        minion_instance.connected = connected
        if not hasattr(minion_instance, "functions"):
            # Need to load the modules so they get all the dunder variables
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            # Pull in the utils
            minion_instance.utils = salt.loader.utils(minion_instance.opts)

            # Then load the proxy module
            minion_instance.proxy = salt.loader.proxy(
                minion_instance.opts, utils=minion_instance.utils
            )

            # And re-load the modules so the __proxy__ variable gets injected
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            minion_instance.functions.pack["__proxy__"] = minion_instance.proxy
            minion_instance.proxy.pack["__salt__"] = minion_instance.functions
            minion_instance.proxy.pack["__ret__"] = minion_instance.returners
            minion_instance.proxy.pack["__pillar__"] = minion_instance.opts["pillar"]

            # Reload utils as well (chicken and egg, __utils__ needs
            # __proxy__ and __proxy__ needs __utils__)
            minion_instance.utils = salt.loader.utils(
                minion_instance.opts, proxy=minion_instance.proxy
            )
            minion_instance.proxy.pack["__utils__"] = minion_instance.utils

            # Reload all modules so all dunder variables are injected
            minion_instance.proxy.reload_modules()

            fq_proxyname = opts["proxy"]["proxytype"]

            minion_instance.module_executors = minion_instance.proxy.get(
                "{}.module_executors".format(fq_proxyname), lambda: []
            )()

            proxy_init_fn = minion_instance.proxy[fq_proxyname + ".init"]
            proxy_init_fn(opts)
        if not hasattr(minion_instance, "proc_dir"):
            uid = salt.utils.user.get_uid(user=opts.get("user", None))
            minion_instance.proc_dir = salt.minion.get_proc_dir(
                opts["cachedir"], uid=uid
            )

    with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
        if isinstance(data["fun"], (tuple, list)):
            ProxyMinion._thread_multi_return(minion_instance, opts, data)
        else:
            ProxyMinion._thread_return(minion_instance, opts, data)


def thread_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target, start the actual
    minion side execution.

    Writes the job data (plus the current PID) to a file named after the jid
    in ``minion_instance.proc_dir``, runs ``data["fun"]`` through the
    configured executors, builds the return dictionary, publishes it when the
    minion is connected and finally hands it to any requested returners.

    :param cls: class whose name is used for the process title
    :param minion_instance: the minion object holding the loaded functions,
        executors and returners
    :param opts: the minion configuration dictionary
    :param data: the job payload (``jid``, ``fun``, ``arg``, ``ret`` ...)
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(salt.payload.dumps(sdata))
    ret = {"success": False}
    function_name = data["fun"]
    # Executor precedence: job payload, then the proxy module's own list,
    # then the minion configuration.
    executors = (
        data.get("module_executors")
        or getattr(minion_instance, "module_executors", [])
        or opts.get("module_executors", ["direct_call"])
    )
    allow_missing_funcs = any(
        [
            minion_instance.executors["{}.allow_missing_func".format(executor)](
                function_name
            )
            for executor in executors
            if "{}.allow_missing_func".format(executor) in minion_instance.executors
        ]
    )
    if function_name in minion_instance.functions or allow_missing_funcs is True:
        # Pre-bind so the TypeError handler below can never hit an unbound
        # name when the error is raised before ``func`` is resolved.
        func = None
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            # use minion_blackout_whitelist from grains if it exists
            if minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            if function_name in minion_instance.functions:
                func = minion_instance.functions[function_name]
                args, kwargs = salt.minion.load_args_and_kwargs(func, data["arg"], data)
            else:
                # only run if function_name is not in minion_instance.functions and allow_missing_funcs is True
                func = function_name
                args, kwargs = data["arg"], data
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            if isinstance(executors, str):
                executors = [executors]
            elif not isinstance(executors, list) or not executors:
                raise SaltInvocationError(
                    "Wrong executors specification: {}. String or non-empty list"
                    " expected".format(executors)
                )
            if opts.get("sudo_user", "") and executors[-1] != "sudo":
                # replace the last one with sudo. Build a new list instead of
                # assigning in place: ``executors`` may be the very list
                # object held by the minion instance, the opts or the payload.
                executors = executors[:-1] + ["sudo"]
            log.trace("Executors list %s", executors)  # pylint: disable=no-member

            # The first executor returning something other than None wins.
            for name in executors:
                fname = "{}.execute".format(name)
                if fname not in minion_instance.executors:
                    raise SaltInvocationError(
                        "Executor '{}' is not available".format(name)
                    )
                return_data = minion_instance.executors[fname](
                    opts, data, func, args, kwargs
                )
                if return_data is not None:
                    break

            if isinstance(return_data, types.GeneratorType):
                # Stream every yielded item to the master as a progress
                # event while accumulating the final return value.
                ind = 0
                iret = {}
                for single in return_data:
                    if isinstance(single, dict) and isinstance(iret, dict):
                        iret.update(single)
                    else:
                        if not iret:
                            iret = []
                        iret.append(single)
                    tag = tagify([data["jid"], "prog", opts["id"], str(ind)], "job")
                    event_data = {"return": single}
                    minion_instance._fire_master(event_data, tag)
                    ind += 1
                ret["return"] = iret
            else:
                ret["return"] = return_data

            retcode = minion_instance.functions.pack["__context__"].get(
                "retcode", salt.defaults.exitcodes.EX_OK
            )
            if retcode == salt.defaults.exitcodes.EX_OK:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        return_data.get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = salt.defaults.exitcodes.EX_GENERIC

            ret["retcode"] = retcode
            ret["success"] = retcode == salt.defaults.exitcodes.EX_OK
        except CommandNotFoundError as exc:
            msg = "Command required for '{}' not found".format(function_name)
            log.debug(msg, exc_info=True)
            ret["return"] = "{}: {}".format(msg, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except CommandExecutionError as exc:
            log.error(
                "A command in '%s' had a problem: %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR: {}".format(exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except SaltInvocationError as exc:
            log.error(
                "Problem executing '%s': %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR executing '{}': {}".format(function_name, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except TypeError as exc:
            # Only a resolved, callable function has a meaningful docstring;
            # ``func`` may still be None or just the function name string.
            func_doc = func.__doc__ if callable(func) else None
            msg = "Passed invalid arguments to {}: {}\n{}".format(
                function_name, exc, func_doc or ""
            )
            log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
            ret["return"] = msg
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except Exception:  # pylint: disable=broad-except
            msg = "The minion function caused an exception"
            log.warning(msg, exc_info=True)
            salt.utils.error.fire_exception(
                salt.exceptions.MinionError(msg), opts, job=data
            )
            ret["return"] = "{}: {}".format(msg, traceback.format_exc())
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
    else:
        # Unknown function: return the docs of similarly named functions if
        # there are any, otherwise the loader's "missing function" message.
        docs = minion_instance.functions["sys.doc"]("{}*".format(function_name))
        if docs:
            docs[function_name] = minion_instance.functions.missing_fun_string(
                function_name
            )
            ret["return"] = docs
        else:
            ret["return"] = minion_instance.functions.missing_fun_string(function_name)
            mod_name = function_name.split(".")[0]
            if mod_name in minion_instance.function_errors:
                ret["return"] += " Possible reasons: '{}'".format(
                    minion_instance.function_errors[mod_name]
                )
        ret["success"] = False
        ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        ret["out"] = "nested"

    ret["jid"] = data["jid"]
    ret["fun"] = data["fun"]
    ret["fun_args"] = data["arg"]
    if "master_id" in data:
        ret["master_id"] = data["master_id"]
    if "metadata" in data:
        if isinstance(data["metadata"], dict):
            ret["metadata"] = data["metadata"]
        else:
            log.warning("The metadata parameter must be a dictionary. Ignoring.")
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())

    # Add default returners from minion config
    # Should have been converted to comma-delimited string already
    if isinstance(opts.get("return"), str):
        if data["ret"]:
            data["ret"] = ",".join((data["ret"], opts["return"]))
        else:
            data["ret"] = opts["return"]

    log.debug("minion return: %s", ret)
    # TODO: make a list? Seems odd to split it this late :/
    if data["ret"] and isinstance(data["ret"], str):
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        ret["id"] = opts["id"]
        for returner in set(data["ret"].split(",")):
            try:
                returner_str = "{}.returner".format(returner)
                if returner_str in minion_instance.returners:
                    minion_instance.returners[returner_str](ret)
                else:
                    returner_err = minion_instance.returners.missing_fun_string(
                        returner_str
                    )
                    log.error(
                        "Returner %s could not be loaded: %s",
                        returner_str,
                        returner_err,
                    )
            except Exception as exc:  # pylint: disable=broad-except
                log.exception("The return failed for job %s: %s", data["jid"], exc)


def thread_multi_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target, start the actual
    minion side execution.

    Multi-function variant: ``data["fun"]`` is a sequence of function names
    and ``data["arg"]`` the matching sequence of argument lists. Results are
    collected per function, in lists indexed by position when the
    ``multifunc_ordered`` option is set, otherwise in dicts keyed by function
    name.

    :param cls: class whose name is used for the process title
    :param minion_instance: the minion object holding the loaded functions
        and returners
    :param opts: the minion configuration dictionary
    :param data: the job payload (``jid``, ``fun``, ``arg``, ``ret`` ...)
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_multi_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(salt.payload.dumps(sdata))

    multifunc_ordered = opts.get("multifunc_ordered", False)
    num_funcs = len(data["fun"])
    if multifunc_ordered:
        ret = {
            "return": [None] * num_funcs,
            "retcode": [None] * num_funcs,
            "success": [False] * num_funcs,
        }
    else:
        ret = {"return": {}, "retcode": {}, "success": {}}

    for ind in range(0, num_funcs):
        fun_name = data["fun"][ind]
        # Resolve the result slot up front so that the error path below
        # writes to the same place as the success path.
        key = ind if multifunc_ordered else fun_name
        if not multifunc_ordered:
            ret["success"][fun_name] = False
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # this minion is blacked out. Only allow saltutil.refresh_pillar and the whitelist
                if (
                    fun_name != "saltutil.refresh_pillar"
                    and fun_name not in whitelist
                ):
                    minion_blackout_violation = True
            # NOTE(review): the single-function path (thread_return) checks
            # the grains blackout with an independent ``if``; here it is only
            # consulted when the pillar branch did not apply. Left unchanged
            # -- confirm which behaviour is intended.
            elif minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    fun_name != "saltutil.refresh_pillar"
                    and fun_name not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            func = minion_instance.functions[fun_name]

            args, kwargs = salt.minion.load_args_and_kwargs(
                func, data["arg"][ind], data
            )
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            ret["return"][key] = func(*args, **kwargs)
            retcode = minion_instance.functions.pack["__context__"].get(
                "retcode", salt.defaults.exitcodes.EX_OK
            )
            if retcode == salt.defaults.exitcodes.EX_OK:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        ret["return"][key].get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = salt.defaults.exitcodes.EX_GENERIC

            ret["retcode"][key] = retcode
            ret["success"][key] = retcode == salt.defaults.exitcodes.EX_OK
        except Exception as exc:  # pylint: disable=broad-except
            trb = traceback.format_exc()
            log.warning("The minion function caused an exception: %s", exc)
            ret["return"][key] = trb
            # Record a failing retcode too; otherwise a crashed function has
            # no retcode entry at all (or None in ordered mode).
            ret["retcode"][key] = salt.defaults.exitcodes.EX_GENERIC
    ret["jid"] = data["jid"]
    ret["fun"] = data["fun"]
    ret["fun_args"] = data["arg"]
    if "metadata" in data:
        ret["metadata"] = data["metadata"]
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())
    if data["ret"]:
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        for returner in set(data["ret"].split(",")):
            ret["id"] = opts["id"]
            try:
                minion_instance.returners["{}.returner".format(returner)](ret)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("The return failed for job %s: %s", data["jid"], exc)


def handle_payload(self, payload):
    """
    Verify the publication and then pass
    the payload along to _handle_decoded_payload.

    Only payloads whose ``enc`` is ``"aes"`` are considered; the load is
    forwarded when ``self._target_load`` accepts it.

    :param self: the proxy minion instance
    :param payload: the received payload (may be ``None``), carrying ``enc``
        and ``load`` keys
    """
    if payload is not None and payload["enc"] == "aes":
        if self._target_load(payload["load"]):

            self._handle_decoded_payload(payload["load"])
        elif self.opts["zmq_filtering"]:
            # In the filtering enabled case, we'd like to know when minion sees something it shouldnt
            log.trace(
                "Broadcast message received not for this minion, Load: %s",
                payload["load"],
            )
    # If it's not AES, and thus has not been verified, we do nothing.
    # In the future, we could add support for some clearfuncs, but
    # the minion currently has no need.


def handle_decoded_payload(self, data):
    """
    Override this method if you wish to handle the decoded data
    differently.

    Skips jobs whose jid was already seen, reloads the modules for
    ``sys.reload_modules``, waits while the ``process_count_max`` limit is
    reached and then runs ``self._target`` in a new process (or in a thread
    when the ``multiprocessing`` option is false).

    This is a generator function: it yields ``gen.sleep`` futures while
    waiting for a free process slot.
    NOTE(review): presumably wrapped as a tornado coroutine by the caller --
    confirm against the ProxyMinion class.

    :param self: the proxy minion instance
    :param data: the decoded job payload (``jid``, ``fun`` and optionally
        ``user``)
    """
    if "user" in data:
        log.info(
            "User %s Executing command %s with jid %s",
            data["user"],
            data["fun"],
            data["jid"],
        )
    else:
        log.info("Executing command %s with jid %s", data["fun"], data["jid"])
    log.debug("Command details %s", data)

    # Don't duplicate jobs
    log.trace("Started JIDs: %s", self.jid_queue)
    if self.jid_queue is not None:
        if data["jid"] in self.jid_queue:
            return
        else:
            self.jid_queue.append(data["jid"])
            # Keep the queue bounded: drop the oldest jid past the high
            # water mark.
            if len(self.jid_queue) > self.opts["minion_jid_queue_hwm"]:
                self.jid_queue.pop(0)

    if isinstance(data["fun"], str):
        if data["fun"] == "sys.reload_modules":
            (
                self.functions,
                self.returners,
                self.function_errors,
                self.executors,
            ) = self._load_modules()
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

    process_count_max = self.opts.get("process_count_max")
    # A missing option yields None, which cannot be compared with an int on
    # Python 3; treat it like a non-positive value (no limit).
    if process_count_max is not None and process_count_max > 0:
        process_count = len(salt.utils.minion.running(self.opts))
        while process_count >= process_count_max:
            log.warning(
                "Maximum number of processes reached while executing jid %s, waiting...",
                data["jid"],
            )
            yield salt.ext.tornado.gen.sleep(10)
            process_count = len(salt.utils.minion.running(self.opts))

    # We stash an instance references to allow for the socket
    # communication in Windows. You can't pickle functions, and thus
    # python needs to be able to reconstruct the reference on the other
    # side.
    instance = self
    multiprocessing_enabled = self.opts.get("multiprocessing", True)
    if multiprocessing_enabled:
        if sys.platform.startswith("win"):
            # let python reconstruct the minion on the other side if we're
            # running on windows
            instance = None
        with default_signals(signal.SIGINT, signal.SIGTERM):
            process = SignalHandlingProcess(
                target=self._target,
                name="ProcessPayload",
                args=(instance, self.opts, data, self.connected),
            )
            # Reset current signals before starting the process in
            # order not to inherit the current signal handlers
            process.start()
    else:
        process = threading.Thread(
            target=self._target,
            args=(instance, self.opts, data, self.connected),
            name=data["jid"],
        )
        process.start()
    process.name = "{}-Job-{}".format(process.name, data["jid"])
    self.subprocess_list.add(process)


def target_load(self, load):
    """
    Verify that the publication is valid.

    :param self: the proxy minion instance (its ``matchers`` are used)
    :param load: the publication; must contain ``tgt``, ``jid``, ``fun`` and
        ``arg``
    :return: ``True`` when the load is complete and its target matches this
        minion, ``False`` otherwise
    """
    if any(key not in load for key in ("tgt", "jid", "fun", "arg")):
        return False
    # Verify that the publication applies to this minion

    # It's important to note that the master does some pre-processing
    # to determine which minions to send a request to. So for example,
    # a "salt -G 'grain_key:grain_val' test.ping" will invoke some
    # pre-processing on the master and this minion should not see the
    # publication if the master does not determine that it should.
    if "tgt_type" in load:
        match_func = self.matchers.get("{}_match.match".format(load["tgt_type"]), None)
        if match_func is None:
            return False
        if load["tgt_type"] in ("grain", "grain_pcre", "pillar"):
            # These matchers additionally take the key delimiter.
            delimiter = load.get("delimiter", DEFAULT_TARGET_DELIM)
            if not match_func(load["tgt"], delimiter=delimiter):
                return False
        elif not match_func(load["tgt"]):
            return False
    else:
        # No explicit target type: fall back to glob matching.
        if not self.matchers["glob_match.match"](load["tgt"]):
            return False

    return True


# Main Minion Tune In
def tune_in(self, start=True):
    """
    Lock onto the publisher. This is the main event loop for the minion

    Delegates to the ``tune_in`` of ProxyMinion's parent class.

    :param start: passed through unchanged to the parent ``tune_in``
    :rtype : None
    """
    super(ProxyMinion, self).tune_in(start=start)