#
# Proxy minion metaproxy modules
#

import logging
import os
import signal
import sys
import threading
import traceback
import types

# pylint: disable=3rd-party-module-not-gated
import salt
import salt.beacons
import salt.cli.daemons
import salt.client
import salt.crypt
import salt.defaults.exitcodes
import salt.engines
import salt.ext.tornado.gen  # pylint: disable=F0401
import salt.ext.tornado.ioloop  # pylint: disable=F0401
import salt.ext.tornado.stack_context  # pylint: disable=F0401
import salt.loader
import salt.log.setup
import salt.minion
import salt.payload
import salt.pillar
import salt.serializers.msgpack
import salt.syspaths
import salt.utils.args
import salt.utils.context
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.error
import salt.utils.event
import salt.utils.files
import salt.utils.jid
import salt.utils.minion
import salt.utils.minions
import salt.utils.network
import salt.utils.platform
import salt.utils.process
import salt.utils.schedule
import salt.utils.ssdp
import salt.utils.user
import salt.utils.zeromq
from salt.defaults import DEFAULT_TARGET_DELIM
from salt.exceptions import (
    CommandExecutionError,
    CommandNotFoundError,
    SaltInvocationError,
    SaltSystemExit,
)
from salt.minion import ProxyMinion
from salt.utils.event import tagify
from salt.utils.process import SignalHandlingProcess, default_signals

log = logging.getLogger(__name__)


def post_master_init(self, master):
62    """
63    Function to finish init after a proxy
64    minion has finished connecting to a master.
65
66    This is primarily loading modules, pillars, etc. (since they need
67    to know which master they connected to)
68    """
69
70    log.debug("subclassed LazyLoaded _post_master_init")
71    if self.connected:
72        self.opts["master"] = master
73
74        self.opts["pillar"] = yield salt.pillar.get_async_pillar(
75            self.opts,
76            self.opts["grains"],
77            self.opts["id"],
78            saltenv=self.opts["saltenv"],
79            pillarenv=self.opts.get("pillarenv"),
80        ).compile_pillar()
81
82    if "proxy" not in self.opts["pillar"] and "proxy" not in self.opts:
83        errmsg = (
84            "No proxy key found in pillar or opts for id "
85            + self.opts["id"]
86            + ". "
87            + "Check your pillar/opts configuration and contents.  Salt-proxy aborted."
88        )
89        log.error(errmsg)
90        self._running = False
91        raise SaltSystemExit(code=-1, msg=errmsg)
92
93    if "proxy" not in self.opts:
94        self.opts["proxy"] = self.opts["pillar"]["proxy"]

    if self.opts.get("proxy_merge_pillar_in_opts"):
        # Override proxy opts with pillar data when the user requires it.
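        # Illustrative settings enabling the merge (the merge strategy
        # defaults to 'smart' upstream):
        #     proxy_merge_pillar_in_opts: true
        #     proxy_deep_merge_pillar_in_opts: true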
        self.opts = salt.utils.dictupdate.merge(
            self.opts,
            self.opts["pillar"],
            strategy=self.opts.get("proxy_merge_pillar_in_opts_strategy"),
            merge_lists=self.opts.get("proxy_deep_merge_pillar_in_opts", False),
        )
    elif self.opts.get("proxy_mines_pillar"):
        # Even when not required, some details such as mine configuration
        # should be merged anyway whenever possible.
        if "mine_interval" in self.opts["pillar"]:
            self.opts["mine_interval"] = self.opts["pillar"]["mine_interval"]
        if "mine_functions" in self.opts["pillar"]:
            general_proxy_mines = self.opts.get("mine_functions", {})
            specific_proxy_mines = self.opts["pillar"]["mine_functions"]
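            # mine_functions may be a list (the concatenation below succeeds)
            # or a dict (the + raises TypeError, caught and logged below).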
            try:
                self.opts["mine_functions"] = general_proxy_mines + specific_proxy_mines
            except TypeError:
                log.error(
                    "Unable to merge the mine functions from the pillar into the opts for proxy %s",
                    self.opts["id"],
                )

    fq_proxyname = self.opts["proxy"]["proxytype"]

    # Need to load the modules so they get all the dunder variables
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()

    # We can then sync any proxymodules down from the master.
    # We do a sync_all here in case proxy code was installed by
    # SPM or was manually placed in /srv/salt/_modules etc.
    self.functions["saltutil.sync_all"](saltenv=self.opts["saltenv"])

    # Pull in the utils
    self.utils = salt.loader.utils(self.opts)

    # Then load the proxy module
    self.proxy = salt.loader.proxy(self.opts, utils=self.utils)

    # And re-load the modules so the __proxy__ variable gets injected
    (
        self.functions,
        self.returners,
        self.function_errors,
        self.executors,
    ) = self._load_modules()
    self.functions.pack["__proxy__"] = self.proxy
    self.proxy.pack["__salt__"] = self.functions
    self.proxy.pack["__ret__"] = self.returners
    self.proxy.pack["__pillar__"] = self.opts["pillar"]

    # Reload utils as well (chicken and egg: __utils__ needs __proxy__ and
    # __proxy__ needs __utils__)
    self.utils = salt.loader.utils(self.opts, proxy=self.proxy)
    self.proxy.pack["__utils__"] = self.utils

    # Reload all modules so all dunder variables are injected
    self.proxy.reload_modules()

    # Start engines here instead of in the Minion superclass __init__.
    # This is because we need to inject the __proxy__ variable, and
    # it is not set up until now.
    self.io_loop.spawn_callback(
        salt.engines.start_engines, self.opts, self.process_manager, proxy=self.proxy
    )

    if (
        "{}.init".format(fq_proxyname) not in self.proxy
        or "{}.shutdown".format(fq_proxyname) not in self.proxy
    ):
        errmsg = (
            "Proxymodule {} is missing an init() or a shutdown() or both. ".format(
                fq_proxyname
            )
            + "Check your proxymodule. Salt-proxy aborted."
        )
        log.error(errmsg)
        self._running = False
        raise SaltSystemExit(code=-1, msg=errmsg)

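    # A proxymodule may optionally expose <proxytype>.module_executors to
    # force a specific executor chain for every call routed through this
    # proxy; fall back to an empty list when it does not.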
    self.module_executors = self.proxy.get(
        "{}.module_executors".format(fq_proxyname), lambda: []
    )()
    proxy_init_fn = self.proxy[fq_proxyname + ".init"]
    proxy_init_fn(self.opts)

    self.opts["grains"] = salt.loader.grains(self.opts, proxy=self.proxy)

    self.mod_opts = self._prep_mod_opts()
    self.matchers = salt.loader.matchers(self.opts)
    self.beacons = salt.beacons.Beacon(self.opts, self.functions)
    uid = salt.utils.user.get_uid(user=self.opts.get("user", None))
    self.proc_dir = salt.minion.get_proc_dir(self.opts["cachedir"], uid=uid)

    if self.connected and self.opts["pillar"]:
        # The pillar has changed due to the connection to the master.
        # Reload the functions so that they can use the new pillar data.
        (
            self.functions,
            self.returners,
            self.function_errors,
            self.executors,
        ) = self._load_modules()
        if hasattr(self, "schedule"):
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

    if not hasattr(self, "schedule"):
        self.schedule = salt.utils.schedule.Schedule(
            self.opts,
            self.functions,
            self.returners,
            cleanup=[salt.minion.master_event(type="alive")],
            proxy=self.proxy,
        )

    # Add the default scheduling jobs to the minion's scheduler
    if self.opts["mine_enabled"] and "mine.update" in self.functions:
        self.schedule.add_job(
            {
                "__mine_interval": {
                    "function": "mine.update",
                    "minutes": self.opts["mine_interval"],
                    "jid_include": True,
                    "maxrunning": 2,
                    "run_on_start": True,
                    "return_job": self.opts.get("mine_return_job", False),
                }
            },
            persist=True,
        )
        log.info("Added mine.update to scheduler")
    else:
        self.schedule.delete_job("__mine_interval", persist=True)

    # Add the master_alive job if enabled
    if self.opts["transport"] != "tcp" and self.opts["master_alive_interval"] > 0:
        self.schedule.add_job(
            {
                salt.minion.master_event(type="alive", master=self.opts["master"]): {
                    "function": "status.master",
                    "seconds": self.opts["master_alive_interval"],
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"master": self.opts["master"], "connected": True},
                }
            },
            persist=True,
        )
        if (
            self.opts["master_failback"]
            and "master_list" in self.opts
            and self.opts["master"] != self.opts["master_list"][0]
        ):
            self.schedule.add_job(
                {
                    salt.minion.master_event(type="failback"): {
                        "function": "status.ping_master",
                        "seconds": self.opts["master_failback_interval"],
                        "jid_include": True,
                        "maxrunning": 1,
                        "return_job": False,
                        "kwargs": {"master": self.opts["master_list"][0]},
                    }
                },
                persist=True,
            )
        else:
            self.schedule.delete_job(
                salt.minion.master_event(type="failback"), persist=True
            )
    else:
        self.schedule.delete_job(
            salt.minion.master_event(type="alive", master=self.opts["master"]),
            persist=True,
        )
        self.schedule.delete_job(
            salt.minion.master_event(type="failback"), persist=True
        )

    # Proxy keepalive
    proxy_alive_fn = fq_proxyname + ".alive"
    if (
        proxy_alive_fn in self.proxy
        and "status.proxy_reconnect" in self.functions
        and self.opts.get("proxy_keep_alive", True)
    ):
        # Schedule the keepalive only when `proxy_keep_alive` is enabled (the
        # default); when it is disabled, the proxy does not retry reconnecting.
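        # Illustrative keepalive tuning (set in the proxy config or pillar):
        #     proxy_keep_alive: true
        #     proxy_keep_alive_interval: 5  # minutes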
        self.schedule.add_job(
            {
                "__proxy_keepalive": {
                    "function": "status.proxy_reconnect",
                    "minutes": self.opts.get(
                        "proxy_keep_alive_interval", 1
                    ),  # by default, check once per minute
                    "jid_include": True,
                    "maxrunning": 1,
                    "return_job": False,
                    "kwargs": {"proxy_name": fq_proxyname},
                }
            },
            persist=True,
        )
        self.schedule.enable_schedule()
    else:
        self.schedule.delete_job("__proxy_keepalive", persist=True)

    # Sync the grains here so the proxy can communicate them to the master
    self.functions["saltutil.sync_grains"](saltenv="base")
    self.grains_cache = self.opts["grains"]
    self.ready = True


def target(cls, minion_instance, opts, data, connected):
    """
    Handle targeting of the minion.

    Calls _thread_multi_return or _thread_return depending on whether the
    job carries multiple commands or a single one.
    """
    if not minion_instance:
        minion_instance = cls(opts)
        minion_instance.connected = connected
        if not hasattr(minion_instance, "functions"):
            # Need to load the modules so they get all the dunder variables
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            # Pull in the utils
            minion_instance.utils = salt.loader.utils(minion_instance.opts)

            # Then load the proxy module
            minion_instance.proxy = salt.loader.proxy(
                minion_instance.opts, utils=minion_instance.utils
            )

            # And re-load the modules so the __proxy__ variable gets injected
            (
                functions,
                returners,
                function_errors,
                executors,
            ) = minion_instance._load_modules(grains=opts["grains"])
            minion_instance.functions = functions
            minion_instance.returners = returners
            minion_instance.function_errors = function_errors
            minion_instance.executors = executors

            minion_instance.functions.pack["__proxy__"] = minion_instance.proxy
            minion_instance.proxy.pack["__salt__"] = minion_instance.functions
            minion_instance.proxy.pack["__ret__"] = minion_instance.returners
            minion_instance.proxy.pack["__pillar__"] = minion_instance.opts["pillar"]

            # Reload utils as well (chicken and egg: __utils__ needs __proxy__
            # and __proxy__ needs __utils__)
            minion_instance.utils = salt.loader.utils(
                minion_instance.opts, proxy=minion_instance.proxy
            )
            minion_instance.proxy.pack["__utils__"] = minion_instance.utils

            # Reload all modules so all dunder variables are injected
            minion_instance.proxy.reload_modules()

            fq_proxyname = opts["proxy"]["proxytype"]

            minion_instance.module_executors = minion_instance.proxy.get(
                "{}.module_executors".format(fq_proxyname), lambda: []
            )()

            proxy_init_fn = minion_instance.proxy[fq_proxyname + ".init"]
            proxy_init_fn(opts)
        if not hasattr(minion_instance, "proc_dir"):
            uid = salt.utils.user.get_uid(user=opts.get("user", None))
            minion_instance.proc_dir = salt.minion.get_proc_dir(
                opts["cachedir"], uid=uid
            )

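    # Run the job inside the minion's stack context (minion_instance.ctx) so
    # that context set up at minion init is restored around job execution.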
    with salt.ext.tornado.stack_context.StackContext(minion_instance.ctx):
        if isinstance(data["fun"], (tuple, list)):
            ProxyMinion._thread_multi_return(minion_instance, opts, data)
        else:
            ProxyMinion._thread_return(minion_instance, opts, data)


def thread_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target; it starts the actual
    minion-side execution.
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(salt.payload.dumps(sdata))
    ret = {"success": False}
    function_name = data["fun"]
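    # Resolve the executor chain: per-job data wins, then executors forced by
    # the proxymodule, then the minion config (defaulting to direct_call).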
    executors = (
        data.get("module_executors")
        or getattr(minion_instance, "module_executors", [])
        or opts.get("module_executors", ["direct_call"])
    )
    allow_missing_funcs = any(
        minion_instance.executors["{}.allow_missing_func".format(executor)](
            function_name
        )
        for executor in executors
        if "{}.allow_missing_func".format(executor) in minion_instance.executors
    )
    if function_name in minion_instance.functions or allow_missing_funcs is True:
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # This minion is blacked out. Only allow
                # saltutil.refresh_pillar and the whitelist.
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            # Use minion_blackout_whitelist from grains if it exists
            if minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    function_name != "saltutil.refresh_pillar"
                    and function_name not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            if function_name in minion_instance.functions:
                func = minion_instance.functions[function_name]
                args, kwargs = salt.minion.load_args_and_kwargs(func, data["arg"], data)
            else:
                # Only reached when function_name is not in
                # minion_instance.functions and allow_missing_funcs is True.
                func = function_name
                args, kwargs = data["arg"], data
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            if isinstance(executors, str):
                executors = [executors]
            elif not isinstance(executors, list) or not executors:
                raise SaltInvocationError(
                    "Wrong executors specification: {}. String or non-empty list"
                    " expected".format(executors)
                )
            if opts.get("sudo_user", "") and executors[-1] != "sudo":
                executors[-1] = "sudo"  # replace the last one with sudo
            log.trace("Executors list %s", executors)  # pylint: disable=no-member

            for name in executors:
                fname = "{}.execute".format(name)
                if fname not in minion_instance.executors:
                    raise SaltInvocationError(
                        "Executor '{}' is not available".format(name)
                    )
                return_data = minion_instance.executors[fname](
                    opts, data, func, args, kwargs
                )
                if return_data is not None:
                    break

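            # A generator return means the function streams partial results:
            # fire each chunk to the master as a "job prog" event and
            # accumulate the pieces for the final return value.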
            if isinstance(return_data, types.GeneratorType):
                ind = 0
                iret = {}
                for single in return_data:
                    if isinstance(single, dict) and isinstance(iret, dict):
                        iret.update(single)
                    else:
                        if not iret:
                            iret = []
                        iret.append(single)
                    tag = tagify([data["jid"], "prog", opts["id"], str(ind)], "job")
                    event_data = {"return": single}
                    minion_instance._fire_master(event_data, tag)
                    ind += 1
                ret["return"] = iret
            else:
                ret["return"] = return_data

            retcode = minion_instance.functions.pack["__context__"].get(
                "retcode", salt.defaults.exitcodes.EX_OK
            )
            if retcode == salt.defaults.exitcodes.EX_OK:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        return_data.get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = salt.defaults.exitcodes.EX_GENERIC

            ret["retcode"] = retcode
            ret["success"] = retcode == salt.defaults.exitcodes.EX_OK
        except CommandNotFoundError as exc:
            msg = "Command required for '{}' not found".format(function_name)
            log.debug(msg, exc_info=True)
            ret["return"] = "{}: {}".format(msg, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except CommandExecutionError as exc:
            log.error(
                "A command in '%s' had a problem: %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR: {}".format(exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except SaltInvocationError as exc:
            log.error(
                "Problem executing '%s': %s",
                function_name,
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
            ret["return"] = "ERROR executing '{}': {}".format(function_name, exc)
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except TypeError as exc:
            msg = "Passed invalid arguments to {}: {}\n{}".format(
                function_name, exc, func.__doc__ or ""
            )
            log.warning(msg, exc_info_on_loglevel=logging.DEBUG)
            ret["return"] = msg
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        except Exception:  # pylint: disable=broad-except
            msg = "The minion function caused an exception"
            log.warning(msg, exc_info=True)
            salt.utils.error.fire_exception(
                salt.exceptions.MinionError(msg), opts, job=data
            )
            ret["return"] = "{}: {}".format(msg, traceback.format_exc())
            ret["out"] = "nested"
            ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
    else:
        docs = minion_instance.functions["sys.doc"]("{}*".format(function_name))
        if docs:
            docs[function_name] = minion_instance.functions.missing_fun_string(
                function_name
            )
            ret["return"] = docs
        else:
            ret["return"] = minion_instance.functions.missing_fun_string(function_name)
            mod_name = function_name.split(".")[0]
            if mod_name in minion_instance.function_errors:
                ret["return"] += " Possible reasons: '{}'".format(
                    minion_instance.function_errors[mod_name]
                )
        ret["success"] = False
        ret["retcode"] = salt.defaults.exitcodes.EX_GENERIC
        ret["out"] = "nested"

    ret["jid"] = data["jid"]
    ret["fun"] = data["fun"]
    ret["fun_args"] = data["arg"]
    if "master_id" in data:
        ret["master_id"] = data["master_id"]
    if "metadata" in data:
        if isinstance(data["metadata"], dict):
            ret["metadata"] = data["metadata"]
        else:
            log.warning("The metadata parameter must be a dictionary. Ignoring.")
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())

    # Add default returners from minion config.
    # Should have been converted to a comma-delimited string already.
    if isinstance(opts.get("return"), str):
        if data["ret"]:
            data["ret"] = ",".join((data["ret"], opts["return"]))
        else:
            data["ret"] = opts["return"]

    log.debug("minion return: %s", ret)
    # TODO: make a list? Seems odd to split it this late :/
    if data["ret"] and isinstance(data["ret"], str):
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        ret["id"] = opts["id"]
        for returner in set(data["ret"].split(",")):
            try:
                returner_str = "{}.returner".format(returner)
                if returner_str in minion_instance.returners:
                    minion_instance.returners[returner_str](ret)
                else:
                    returner_err = minion_instance.returners.missing_fun_string(
                        returner_str
                    )
                    log.error(
                        "Returner %s could not be loaded: %s",
                        returner_str,
                        returner_err,
                    )
            except Exception as exc:  # pylint: disable=broad-except
                log.exception("The return failed for job %s: %s", data["jid"], exc)


def thread_multi_return(cls, minion_instance, opts, data):
    """
    This method should be used as a threading target; it starts the actual
    minion-side execution for a multi-function job.
    """
    fn_ = os.path.join(minion_instance.proc_dir, data["jid"])

    salt.utils.process.appendproctitle(
        "{}._thread_multi_return {}".format(cls.__name__, data["jid"])
    )

    sdata = {"pid": os.getpid()}
    sdata.update(data)
    log.info("Starting a new job with PID %s", sdata["pid"])
    with salt.utils.files.fopen(fn_, "w+b") as fp_:
        fp_.write(salt.payload.dumps(sdata))

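    # With multifunc_ordered the results are positional lists matching the
    # order of data["fun"]; otherwise they are keyed by function name.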
    multifunc_ordered = opts.get("multifunc_ordered", False)
    num_funcs = len(data["fun"])
    if multifunc_ordered:
        ret = {
            "return": [None] * num_funcs,
            "retcode": [None] * num_funcs,
            "success": [False] * num_funcs,
        }
    else:
        ret = {"return": {}, "retcode": {}, "success": {}}

    for ind in range(0, num_funcs):
        if not multifunc_ordered:
            ret["success"][data["fun"][ind]] = False
        try:
            minion_blackout_violation = False
            if minion_instance.connected and minion_instance.opts["pillar"].get(
                "minion_blackout", False
            ):
                whitelist = minion_instance.opts["pillar"].get(
                    "minion_blackout_whitelist", []
                )
                # This minion is blacked out. Only allow
                # saltutil.refresh_pillar and the whitelist.
                if (
                    data["fun"][ind] != "saltutil.refresh_pillar"
                    and data["fun"][ind] not in whitelist
                ):
                    minion_blackout_violation = True
            elif minion_instance.opts["grains"].get("minion_blackout", False):
                whitelist = minion_instance.opts["grains"].get(
                    "minion_blackout_whitelist", []
                )
                if (
                    data["fun"][ind] != "saltutil.refresh_pillar"
                    and data["fun"][ind] not in whitelist
                ):
                    minion_blackout_violation = True
            if minion_blackout_violation:
                raise SaltInvocationError(
                    "Minion in blackout mode. Set 'minion_blackout' "
                    "to False in pillar or grains to resume operations. Only "
                    "saltutil.refresh_pillar allowed in blackout mode."
                )

            func = minion_instance.functions[data["fun"][ind]]

            args, kwargs = salt.minion.load_args_and_kwargs(
                func, data["arg"][ind], data
            )
            minion_instance.functions.pack["__context__"]["retcode"] = 0
            key = ind if multifunc_ordered else data["fun"][ind]
            ret["return"][key] = func(*args, **kwargs)
            retcode = minion_instance.functions.pack["__context__"].get("retcode", 0)
            if retcode == 0:
                # No nonzero retcode in __context__ dunder. Check if return
                # is a dictionary with a "result" or "success" key.
                try:
                    func_result = all(
                        ret["return"][key].get(x, True) for x in ("result", "success")
                    )
                except Exception:  # pylint: disable=broad-except
                    # return data is not a dict
                    func_result = True
                if not func_result:
                    retcode = 1

            ret["retcode"][key] = retcode
            ret["success"][key] = retcode == 0
        except Exception as exc:  # pylint: disable=broad-except
            trb = traceback.format_exc()
            log.warning("The minion function caused an exception: %s", exc)
            if multifunc_ordered:
                ret["return"][ind] = trb
            else:
                ret["return"][data["fun"][ind]] = trb
        ret["jid"] = data["jid"]
        ret["fun"] = data["fun"]
        ret["fun_args"] = data["arg"]
    if "metadata" in data:
        ret["metadata"] = data["metadata"]
    if minion_instance.connected:
        minion_instance._return_pub(ret, timeout=minion_instance._return_retry_timer())
    if data["ret"]:
        if "ret_config" in data:
            ret["ret_config"] = data["ret_config"]
        if "ret_kwargs" in data:
            ret["ret_kwargs"] = data["ret_kwargs"]
        for returner in set(data["ret"].split(",")):
            ret["id"] = opts["id"]
            try:
                minion_instance.returners["{}.returner".format(returner)](ret)
            except Exception as exc:  # pylint: disable=broad-except
                log.error("The return failed for job %s: %s", data["jid"], exc)


def handle_payload(self, payload):
    """
    Verify the publication and then pass
    the payload along to _handle_decoded_payload.
    """
    if payload is not None and payload["enc"] == "aes":
        if self._target_load(payload["load"]):
            self._handle_decoded_payload(payload["load"])
        elif self.opts["zmq_filtering"]:
            # In the filtering-enabled case, we'd like to know when the minion
            # sees something it shouldn't.
            log.trace(
                "Broadcast message received not for this minion, Load: %s",
                payload["load"],
            )
    # If it's not AES, and thus has not been verified, we do nothing.
    # In the future, we could add support for some clearfuncs, but
    # the minion currently has no need.


def handle_decoded_payload(self, data):
    """
    Override this method if you wish to handle the decoded data
    differently.
    """
    if "user" in data:
        log.info(
            "User %s Executing command %s with jid %s",
            data["user"],
            data["fun"],
            data["jid"],
        )
    else:
        log.info("Executing command %s with jid %s", data["fun"], data["jid"])
    log.debug("Command details %s", data)

    # Don't duplicate jobs
    log.trace("Started JIDs: %s", self.jid_queue)
    if self.jid_queue is not None:
        if data["jid"] in self.jid_queue:
            return
        else:
            self.jid_queue.append(data["jid"])
            if len(self.jid_queue) > self.opts["minion_jid_queue_hwm"]:
                self.jid_queue.pop(0)

    if isinstance(data["fun"], str):
        if data["fun"] == "sys.reload_modules":
            (
                self.functions,
                self.returners,
                self.function_errors,
                self.executors,
            ) = self._load_modules()
            self.schedule.functions = self.functions
            self.schedule.returners = self.returners

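    # Throttle job spawning: when process_count_max is set (> 0), wait until
    # the number of running jobs drops below the cap before starting this one.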
    process_count_max = self.opts.get("process_count_max", -1)
    if process_count_max > 0:
        process_count = len(salt.utils.minion.running(self.opts))
        while process_count >= process_count_max:
            log.warning(
                "Maximum number of processes reached while executing jid %s, waiting...",
                data["jid"],
            )
            yield salt.ext.tornado.gen.sleep(10)
            process_count = len(salt.utils.minion.running(self.opts))

    # We stash an instance reference to allow for the socket
    # communication on Windows. You can't pickle functions, and thus
    # Python needs to be able to reconstruct the reference on the other
    # side.
    instance = self
    multiprocessing_enabled = self.opts.get("multiprocessing", True)
    if multiprocessing_enabled:
        if sys.platform.startswith("win"):
            # Let Python reconstruct the minion on the other side if we're
            # running on Windows
            instance = None
        with default_signals(signal.SIGINT, signal.SIGTERM):
            process = SignalHandlingProcess(
                target=self._target,
                name="ProcessPayload",
                args=(instance, self.opts, data, self.connected),
            )
    else:
        process = threading.Thread(
            target=self._target,
            args=(instance, self.opts, data, self.connected),
            name=data["jid"],
        )

    if multiprocessing_enabled:
        with default_signals(signal.SIGINT, signal.SIGTERM):
            # Reset current signals before starting the process in
            # order not to inherit the current signal handlers
            process.start()
    else:
        process.start()
    process.name = "{}-Job-{}".format(process.name, data["jid"])
    self.subprocess_list.add(process)


def target_load(self, load):
    """
    Verify that the publication is valid.
    """
    if "tgt" not in load or "jid" not in load or "fun" not in load or "arg" not in load:
        return False
    # Verify that the publication applies to this minion.

    # It's important to note that the master does some pre-processing
    # to determine which minions to send a request to. So for example,
    # a "salt -G 'grain_key:grain_val' test.ping" will invoke some
    # pre-processing on the master and this minion should not see the
    # publication if the master does not determine that it should.
    if "tgt_type" in load:
        match_func = self.matchers.get("{}_match.match".format(load["tgt_type"]), None)
        if match_func is None:
            return False
        if load["tgt_type"] in ("grain", "grain_pcre", "pillar"):
            delimiter = load.get("delimiter", DEFAULT_TARGET_DELIM)
            if not match_func(load["tgt"], delimiter=delimiter):
                return False
        elif not match_func(load["tgt"]):
            return False
    else:
        if not self.matchers["glob_match.match"](load["tgt"]):
            return False

    return True


# Main Minion Tune In
def tune_in(self, start=True):
    """
    Lock onto the publisher. This is the main event loop for the minion.

    :rtype: None
    """
    super(ProxyMinion, self).tune_in(start=start)