2Functions which implement running reactor jobs
4import fnmatch
5import glob
6import logging
7import os
9import salt.client
10import salt.defaults.exitcodes
11import salt.runner
12import salt.state
13import salt.utils.args
14import salt.utils.cache
15import salt.utils.data
16import salt.utils.event
17import salt.utils.files
18import salt.utils.master
19import salt.utils.process
20import salt.utils.yaml
21import salt.wheel
# Module-level logger, named after this module; used by every class below.
log = logging.getLogger(__name__)
26    ["__id__", "__sls__", "name", "order", "fun", "key", "state"]
class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
    """
    Read in the reactor configuration variable and compare it to events
    processed on the master.
    The reactor has the capability to execute pre-programmed executions
    as reactions to events
    """

    # Legacy reaction type names mapped to the name they are rewritten to
    # by resolve_aliases()
    aliases = {
        "cmd": "local",
    }

    def __init__(self, opts, **kwargs):
        """
        Set up the process, a local-file-client MasterMinion used for
        rendering, and the state compiler.

        :param opts: The configuration dictionary
        :param kwargs: Passed through to the parent process class
        """
        super().__init__(**kwargs)
        # Work on a copy so that forcing the local file client does not
        # alter the opts handed in by the caller
        local_minion_opts = opts.copy()
        local_minion_opts["file_client"] = "local"
        self.minion = salt.minion.MasterMinion(local_minion_opts)
        salt.state.Compiler.__init__(self, opts, self.minion.rend)
        self.is_leader = True

    def _read_reactor_map(self, path):
        """
        Load the reactor map from the YAML file at ``path``

        Returns the parsed data. An empty list is returned (after logging
        an error) when the file can not be read or parsed, and also when the
        file is empty, so that callers always get something they can iterate.
        """
        try:
            with salt.utils.files.fopen(path) as fp_:
                react_map = salt.utils.yaml.safe_load(fp_)
        except OSError:
            log.error('Failed to read reactor map: "%s"', path)
            return []
        except Exception:  # pylint: disable=broad-except
            log.error('Failed to parse YAML in reactor map: "%s"', path)
            return []
        # An empty YAML document loads as None
        return react_map or []

    def render_reaction(self, glob_ref, tag, data):
        """
        Execute the render system against a single reaction file and return
        the data structure

        :param glob_ref: Path, glob pattern or ``salt://`` URL of the SLS
        :param tag: The event tag, passed to the renderer
        :param data: The event data, passed to the renderer
        :return: Dictionary merged from every file that rendered; files that
                 fail to render are logged and skipped
        """
        react = {}
        # Remember what was asked for, the reference below is replaced by
        # the cached path (or an empty string when caching fails)
        requested_ref = glob_ref

        if glob_ref.startswith("salt://"):
            glob_ref = self.minion.functions["cp.cache_file"](glob_ref) or ""
        globbed_ref = glob.glob(glob_ref)
        if not globbed_ref:
            log.error(
                "Can not render SLS %s for tag %s. File missing or not found.",
                requested_ref,
                tag,
            )
        for fn_ in globbed_ref:
            try:
                res = self.render_template(fn_, tag=tag, data=data)

                # for #20841, inject the sls name here since verify_high()
                # assumes it exists in case there are any errors
                for name in res:
                    res[name]["__sls__"] = fn_

                react.update(res)
            except Exception:  # pylint: disable=broad-except
                log.exception('Failed to render "%s": ', fn_)
        return react

    def list_reactors(self, tag):
        """
        Take in the tag from an event and return a list of the reactors to
        process

        The ``reactor`` option is either the reactor map itself or the path
        of a YAML file holding it. Only single-key dictionaries in the map
        are considered; the key is matched against ``tag`` as a glob.
        """
        log.debug("Gathering reactors for tag %s", tag)
        reactors = []
        react_map = self.opts["reactor"]
        if isinstance(react_map, str):
            react_map = self._read_reactor_map(react_map)
        for ropt in react_map or []:
            if not isinstance(ropt, dict):
                continue
            if len(ropt) != 1:
                continue
            key, val = next(iter(ropt.items()))
            if fnmatch.fnmatch(tag, key):
                if isinstance(val, str):
                    reactors.append(val)
                elif isinstance(val, list):
                    reactors.extend(val)
        return reactors

    def list_all(self):
        """
        Return a list of the reactors

        When the ``reactor`` option is a string the map is read from that
        YAML file; an empty list is returned if the file can not be loaded.
        """
        if isinstance(self.minion.opts["reactor"], str):
            log.debug("Reading reactors from yaml %s", self.opts["reactor"])
            return self._read_reactor_map(self.opts["reactor"])
        log.debug("Not reading reactors from yaml")
        return self.minion.opts["reactor"]

    def add_reactor(self, tag, reaction):
        """
        Add a reactor

        :return: Dictionary with a boolean ``status`` and a ``comment``
        """
        for reactor in self.list_all() or []:
            # Skip malformed entries, same as list_reactors() does
            if not isinstance(reactor, dict) or not reactor:
                continue
            if next(iter(reactor)) == tag:
                return {"status": False, "comment": "Reactor already exists."}

        configured = self.minion.opts["reactor"]
        if isinstance(configured, str):
            # The option is a file path here, there is no list to append to
            return {
                "status": False,
                "comment": (
                    "Reactor map is loaded from a file and can not be "
                    "modified at runtime."
                ),
            }
        configured.append({tag: reaction})
        return {"status": True, "comment": "Reactor added."}

    def delete_reactor(self, tag):
        """
        Delete a reactor

        :return: Dictionary with a boolean ``status`` and a ``comment``
        """
        for reactor in self.list_all() or []:
            # Skip malformed entries, same as list_reactors() does
            if not isinstance(reactor, dict) or not reactor:
                continue
            if next(iter(reactor)) != tag:
                continue
            configured = self.minion.opts["reactor"]
            if isinstance(configured, str):
                # The option is a file path here, there is no list to
                # remove from
                return {
                    "status": False,
                    "comment": (
                        "Reactor map is loaded from a file and can not be "
                        "modified at runtime."
                    ),
                }
            configured.remove(reactor)
            return {"status": True, "comment": "Reactor deleted."}

        return {"status": False, "comment": "Reactor does not exists."}

    def resolve_aliases(self, chunks):
        """
        Preserve backward compatibility by rewriting the 'state' key in the low
        chunks if it is using a legacy type.

        The chunks are modified in place.
        """
        for chunk in chunks:
            new_state = self.aliases.get(chunk["state"])
            if new_state is not None:
                chunk["state"] = new_state

    def reactions(self, tag, data, reactors):
        """
        Render a list of reactor files and returns a reaction struct

        :param tag: The event tag
        :param data: The event data
        :param reactors: References of the reactor SLS files to render
        :return: Ordered list of low chunks. The list is empty when nothing
                 rendered, when verification reported errors or when
                 compiling raised an exception.
        """
        log.debug("Compiling reactions for tag %s", tag)
        high = {}
        chunks = []
        try:
            for fn_ in reactors:
                high.update(self.render_reaction(fn_, tag, data))
            if high:
                errors = self.verify_high(high)
                if errors:
                    log.error(
                        "Unable to render reactions for event %s due to "
                        "errors (%s) in one or more of the sls files (%s)",
                        tag,
                        errors,
                        reactors,
                    )
                    return []  # We'll return nothing since there was an error
                chunks = self.order_chunks(self.compile_high_data(high))
        except Exception:  # pylint: disable=broad-except
            log.exception("Exception encountered while compiling reactions")

        self.resolve_aliases(chunks)
        return chunks

    def call_reactions(self, chunks):
        """
        Execute the reaction state

        Every chunk is handed to the ReactWrap instance created in run().
        """
        for chunk in chunks:
            self.wrap.run(chunk)

    def run(self):
        """
        Enter into the server loop

        Listens on the event bus, answers the ``salt/reactors/manage/*``
        requests and, for the remaining events, renders and executes the
        reactors matching the event tag.
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        if self.opts["reactor_niceness"] and not salt.utils.platform.is_windows():
            log.info("Reactor setting niceness to %i", self.opts["reactor_niceness"])
            os.nice(self.opts["reactor_niceness"])

        # instantiate some classes inside our new process
        with salt.utils.event.get_event(
            self.opts["__role"],
            self.opts["sock_dir"],
            self.opts["transport"],
            opts=self.opts,
            listen=True,
        ) as event:
            self.wrap = ReactWrap(self.opts)

            for data in event.iter_events(full=True):
                # skip all events fired by ourselves
                if data["data"].get("user") == self.wrap.event_user:
                    continue

                # NOTE: these events must contain the masters key in order to be accepted
                # see salt.runners.reactor for the requesting interface
                if "salt/reactors/manage" in data["tag"]:
                    master_key = salt.utils.master.get_master_key("root", self.opts)
                    if data["data"].get("key") != master_key:
                        log.error(
                            "received salt/reactors/manage event without matching"
                            " master_key. discarding"
                        )
                        continue
                if data["tag"].endswith("salt/reactors/manage/is_leader"):
                    event.fire_event(
                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
                    )
                if data["tag"].endswith("salt/reactors/manage/set_leader"):
                    # we only want to register events from the local master
                    if data["data"].get("id") == self.opts["id"]:
                        self.is_leader = data["data"]["value"]
                    event.fire_event(
                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
                    )
                if data["tag"].endswith("salt/reactors/manage/add"):
                    _data = data["data"]
                    res = self.add_reactor(_data["event"], _data["reactors"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/add-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/delete"):
                    _data = data["data"]
                    res = self.delete_reactor(_data["event"])
                    event.fire_event(
                        {"reactors": self.list_all(), "result": res},
                        "salt/reactors/manage/delete-complete",
                    )
                elif data["tag"].endswith("salt/reactors/manage/list"):
                    event.fire_event(
                        {"reactors": self.list_all()},
                        "salt/reactors/manage/list-results",
                    )
                else:
                    # do not handle any reactions if not leader in cluster
                    if not self.is_leader:
                        continue
                    reactors = self.list_reactors(data["tag"])
                    if not reactors:
                        continue
                    chunks = self.reactions(data["tag"], data["data"], reactors)
                    if chunks:
                        try:
                            self.call_reactions(chunks)
                        except SystemExit:
                            # A reaction must not be able to stop the loop
                            log.warning("Exit ignored by reactor")
class ReactWrap:
    """
    Wrapper that executes low data for the Reactor System
    """

    # class-wide cache of clients
    client_cache = None
    # User name put on runner/wheel calls made by the reactor. Reactor.run()
    # skips events carrying it.
    event_user = "Reactor"

    # Client class used for each supported reaction type
    reaction_class = {
        "local": salt.client.LocalClient,
        "runner": salt.runner.RunnerClient,
        "wheel": salt.wheel.Wheel,
        "caller": salt.client.Caller,
    }

    def __init__(self, opts):
        """
        Create the shared client cache on first use and the worker pool
        used for runner and wheel reactions.

        :param opts: The configuration dictionary
        """
        self.opts = opts
        if ReactWrap.client_cache is None:
            ReactWrap.client_cache = salt.utils.cache.CacheDict(
                opts["reactor_refresh_interval"]
            )

        self.pool = salt.utils.process.ThreadPool(
            self.opts["reactor_worker_threads"],  # number of workers for runner/wheel
            queue_size=self.opts["reactor_worker_hwm"],  # queue size for those workers
        )

    def populate_client_cache(self, low):
        """
        Populate the client cache with an instance of the specified type

        :param low: Low chunk; ``low["state"]`` names the reaction type
        """
        reaction_type = low["state"]
        if reaction_type not in self.client_cache:
            log.debug("Reactor is populating %s client cache", reaction_type)
            if reaction_type in ("runner", "wheel"):
                # Reaction types that run locally on the master want the full
                # opts passed.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts
                )
                # The len() function will cause the module functions to load if
                # they aren't already loaded. We want to load them so that the
                # spawned threads don't need to load them. Loading in the
                # spawned threads creates race conditions such as sometimes not
                # finding the required function because another thread is in
                # the middle of loading the functions.
                len(self.client_cache[reaction_type].functions)
            else:
                # Reactions which use remote pubs only need the conf file when
                # instantiating a client instance.
                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
                    self.opts["conf_file"]
                )

    def run(self, low):
        """
        Execute a reaction by invoking the proper wrapper func

        :param low: Low chunk describing the reaction. ``state`` selects the
                    wrapper, ``fun`` the function to call and ``__id__`` is
                    used in log messages.

        Problems are logged rather than raised: an unsupported reaction type,
        a wrongly formatted ``args`` parameter, an unavailable runner/wheel
        function, a full worker queue or any exception raised while
        preparing or executing the call.
        """
        reaction_type = low["state"]
        if reaction_type not in self.reaction_class:
            # Checked before populating the cache, which would otherwise
            # fail with a KeyError on the unknown type
            log.error("ReactWrap is missing a wrapper function for '%s'", reaction_type)
            return

        self.populate_client_cache(low)
        l_fun = getattr(self, reaction_type, None)
        if l_fun is None:
            # Nothing to call, carrying on would only fail on the unbound
            # wrapper further down
            log.error("ReactWrap is missing a wrapper function for '%s'", reaction_type)
            return

        try:
            wrap_call = salt.utils.args.format_call(l_fun, low)
            args = wrap_call.get("args", ())
            kwargs = wrap_call.get("kwargs", {})
            # TODO: Setting user doesn't seem to work for actual remote pubs
            if low["state"] in ("runner", "wheel"):
                # Update called function's low data with event user to
                # segregate events fired by reactor and avoid reaction loops
                kwargs["__user__"] = self.event_user
                # Replace ``state`` kwarg which comes from high data compiler.
                # It breaks some runner functions and seems unnecessary.
                kwargs["__state__"] = kwargs.pop("state")
                # NOTE: if any additional keys are added here, they will also
                # need to be added to filter_kwargs()

            if "args" in kwargs:
                # New configuration
                reactor_args = kwargs.pop("args")
                for item in ("arg", "kwarg"):
                    if item in low:
                        log.warning(
                            "Reactor '%s' is ignoring '%s' param %s due to "
                            "presence of 'args' param. Check the Reactor System "
                            "documentation for the correct argument format.",
                            low["__id__"],
                            item,
                            low[item],
                        )
                if (
                    low["state"] == "caller"
                    and isinstance(reactor_args, list)
                    and not salt.utils.data.is_dictlist(reactor_args)
                ):
                    # Legacy 'caller' reactors were already using the 'args'
                    # param, but only supported a list of positional arguments.
                    # If low['args'] is a list but is *not* a dictlist, then
                    # this is actually using the legacy configuration. So, put
                    # the reactor args into kwarg['arg'] so that the wrapper
                    # interprets them as positional args.
                    kwargs["arg"] = reactor_args
                    kwargs["kwarg"] = {}
                else:
                    kwargs["arg"] = ()
                    kwargs["kwarg"] = reactor_args
                if not isinstance(kwargs["kwarg"], dict):
                    kwargs["kwarg"] = salt.utils.data.repack_dictlist(kwargs["kwarg"])
                    if not kwargs["kwarg"]:
                        log.error(
                            "Reactor '%s' failed to execute %s '%s': "
                            "Incorrect argument format, check the Reactor System "
                            "documentation for the correct format.",
                            low["__id__"],
                            low["state"],
                            low["fun"],
                        )
                        return
            else:
                # Legacy configuration
                react_call = {}
                if low["state"] in ("runner", "wheel"):
                    if "arg" not in kwargs or "kwarg" not in kwargs:
                        # Runner/wheel execute on the master, so we can use
                        # format_call to get the functions args/kwargs
                        react_fun = self.client_cache[low["state"]].functions.get(
                            low["fun"]
                        )
                        if react_fun is None:
                            log.error(
                                "Reactor '%s' failed to execute %s '%s': "
                                "function not available",
                                low["__id__"],
                                low["state"],
                                low["fun"],
                            )
                            return

                        react_call = salt.utils.args.format_call(
                            react_fun, low, expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
                        )

                if "arg" not in kwargs:
                    kwargs["arg"] = react_call.get("args", ())
                if "kwarg" not in kwargs:
                    kwargs["kwarg"] = react_call.get("kwargs", {})

            # Execute the wrapper with the proper args/kwargs. kwargs['arg']
            # and kwargs['kwarg'] contain the positional and keyword arguments
            # that will be passed to the client interface to execute the
            # desired runner/wheel/remote-exec/etc. function.
            ret = l_fun(*args, **kwargs)

            # Only the runner and wheel wrappers return the pool's result,
            # the other wrappers return None
            if ret is False:
                log.error(
                    "Reactor '%s' failed to execute %s '%s': "
                    "TaskPool queue is full! "
                    "Consider tuning reactor_worker_threads and/or"
                    " reactor_worker_hwm",
                    low["__id__"],
                    low["state"],
                    low["fun"],
                )

        except SystemExit:
            log.warning("Reactor '%s' attempted to exit. Ignored.", low["__id__"])
        except Exception:  # pylint: disable=broad-except
            log.error(
                "Reactor '%s' failed to execute %s '%s'",
                low["__id__"],
                low["state"],
                low["fun"],
                exc_info=True,
            )

    def runner(self, fun, **kwargs):
        """
        Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`

        The call is queued on the worker pool; the pool's result is returned.
        """
        return self.pool.fire_async(self.client_cache["runner"].low, args=(fun, kwargs))

    def wheel(self, fun, **kwargs):
        """
        Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`

        The call is queued on the worker pool; the pool's result is returned.
        """
        return self.pool.fire_async(self.client_cache["wheel"].low, args=(fun, kwargs))

    def local(self, fun, tgt, **kwargs):
        """
        Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
        """
        self.client_cache["local"].cmd_async(tgt, fun, **kwargs)

    def caller(self, fun, **kwargs):
        """
        Wrap LocalCaller to execute remote exec functions locally on the Minion

        ``kwargs["arg"]`` and ``kwargs["kwarg"]`` hold the positional and
        keyword arguments for ``fun``.
        """
        self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
485        self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])