1"""
2Functions which implement running reactor jobs
3"""
4import fnmatch
5import glob
6import logging
7import os
8
9import salt.client
10import salt.defaults.exitcodes
11import salt.runner
12import salt.state
13import salt.utils.args
14import salt.utils.cache
15import salt.utils.data
16import salt.utils.event
17import salt.utils.files
18import salt.utils.master
19import salt.utils.process
20import salt.utils.yaml
21import salt.wheel
22
23log = logging.getLogger(__name__)
24
25REACTOR_INTERNAL_KEYWORDS = frozenset(
26    ["__id__", "__sls__", "name", "order", "fun", "key", "state"]
27)
28
29
30class Reactor(salt.utils.process.SignalHandlingProcess, salt.state.Compiler):
31    """
32    Read in the reactor configuration variable and compare it to events
33    processed on the master.
34    The reactor has the capability to execute pre-programmed executions
35    as reactions to events
36    """
37
38    aliases = {
39        "cmd": "local",
40    }
41
42    def __init__(self, opts, **kwargs):
43        super().__init__(**kwargs)
44        local_minion_opts = opts.copy()
45        local_minion_opts["file_client"] = "local"
46        self.minion = salt.minion.MasterMinion(local_minion_opts)
47        salt.state.Compiler.__init__(self, opts, self.minion.rend)
48        self.is_leader = True
49
50    def render_reaction(self, glob_ref, tag, data):
51        """
52        Execute the render system against a single reaction file and return
53        the data structure
54        """
55        react = {}
56
57        if glob_ref.startswith("salt://"):
58            glob_ref = self.minion.functions["cp.cache_file"](glob_ref) or ""
59        globbed_ref = glob.glob(glob_ref)
60        if not globbed_ref:
61            log.error(
62                "Can not render SLS %s for tag %s. File missing or not found.",
63                glob_ref,
64                tag,
65            )
66        for fn_ in globbed_ref:
67            try:
68                res = self.render_template(fn_, tag=tag, data=data)
69
70                # for #20841, inject the sls name here since verify_high()
71                # assumes it exists in case there are any errors
72                for name in res:
73                    res[name]["__sls__"] = fn_
74
75                react.update(res)
76            except Exception:  # pylint: disable=broad-except
77                log.exception('Failed to render "%s": ', fn_)
78        return react
79
80    def list_reactors(self, tag):
81        """
82        Take in the tag from an event and return a list of the reactors to
83        process
84        """
85        log.debug("Gathering reactors for tag %s", tag)
86        reactors = []
87        if isinstance(self.opts["reactor"], str):
88            try:
89                with salt.utils.files.fopen(self.opts["reactor"]) as fp_:
90                    react_map = salt.utils.yaml.safe_load(fp_)
91            except OSError:
92                log.error('Failed to read reactor map: "%s"', self.opts["reactor"])
93            except Exception:  # pylint: disable=broad-except
94                log.error(
95                    'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"]
96                )
97        else:
98            react_map = self.opts["reactor"]
99        for ropt in react_map:
100            if not isinstance(ropt, dict):
101                continue
102            if len(ropt) != 1:
103                continue
104            key = next(iter(ropt.keys()))
105            val = ropt[key]
106            if fnmatch.fnmatch(tag, key):
107                if isinstance(val, str):
108                    reactors.append(val)
109                elif isinstance(val, list):
110                    reactors.extend(val)
111        return reactors
112
113    def list_all(self):
114        """
115        Return a list of the reactors
116        """
117        if isinstance(self.minion.opts["reactor"], str):
118            log.debug("Reading reactors from yaml %s", self.opts["reactor"])
119            try:
120                with salt.utils.files.fopen(self.opts["reactor"]) as fp_:
121                    react_map = salt.utils.yaml.safe_load(fp_)
122            except OSError:
123                log.error('Failed to read reactor map: "%s"', self.opts["reactor"])
124            except Exception:  # pylint: disable=broad-except
125                log.error(
126                    'Failed to parse YAML in reactor map: "%s"', self.opts["reactor"]
127                )
128        else:
129            log.debug("Not reading reactors from yaml")
130            react_map = self.minion.opts["reactor"]
131        return react_map
132
133    def add_reactor(self, tag, reaction):
134        """
135        Add a reactor
136        """
137        reactors = self.list_all()
138        for reactor in reactors:
139            _tag = next(iter(reactor.keys()))
140            if _tag == tag:
141                return {"status": False, "comment": "Reactor already exists."}
142
143        self.minion.opts["reactor"].append({tag: reaction})
144        return {"status": True, "comment": "Reactor added."}
145
146    def delete_reactor(self, tag):
147        """
148        Delete a reactor
149        """
150        reactors = self.list_all()
151        for reactor in reactors:
152            _tag = next(iter(reactor.keys()))
153            if _tag == tag:
154                self.minion.opts["reactor"].remove(reactor)
155                return {"status": True, "comment": "Reactor deleted."}
156
157        return {"status": False, "comment": "Reactor does not exists."}
158
159    def resolve_aliases(self, chunks):
160        """
161        Preserve backward compatibility by rewriting the 'state' key in the low
162        chunks if it is using a legacy type.
163        """
164        for idx, _ in enumerate(chunks):
165            new_state = self.aliases.get(chunks[idx]["state"])
166            if new_state is not None:
167                chunks[idx]["state"] = new_state
168
169    def reactions(self, tag, data, reactors):
170        """
171        Render a list of reactor files and returns a reaction struct
172        """
173        log.debug("Compiling reactions for tag %s", tag)
174        high = {}
175        chunks = []
176        try:
177            for fn_ in reactors:
178                high.update(self.render_reaction(fn_, tag, data))
179            if high:
180                errors = self.verify_high(high)
181                if errors:
182                    log.error(
183                        "Unable to render reactions for event %s due to "
184                        "errors (%s) in one or more of the sls files (%s)",
185                        tag,
186                        errors,
187                        reactors,
188                    )
189                    return []  # We'll return nothing since there was an error
190                chunks = self.order_chunks(self.compile_high_data(high))
191        except Exception as exc:  # pylint: disable=broad-except
192            log.exception("Exception encountered while compiling reactions")
193
194        self.resolve_aliases(chunks)
195        return chunks
196
197    def call_reactions(self, chunks):
198        """
199        Execute the reaction state
200        """
201        for chunk in chunks:
202            self.wrap.run(chunk)
203
204    def run(self):
205        """
206        Enter into the server loop
207        """
208        salt.utils.process.appendproctitle(self.__class__.__name__)
209
210        if self.opts["reactor_niceness"] and not salt.utils.platform.is_windows():
211            log.info("Reactor setting niceness to %i", self.opts["reactor_niceness"])
212            os.nice(self.opts["reactor_niceness"])
213
214        # instantiate some classes inside our new process
215        with salt.utils.event.get_event(
216            self.opts["__role"],
217            self.opts["sock_dir"],
218            self.opts["transport"],
219            opts=self.opts,
220            listen=True,
221        ) as event:
222            self.wrap = ReactWrap(self.opts)
223
224            for data in event.iter_events(full=True):
225                # skip all events fired by ourselves
226                if data["data"].get("user") == self.wrap.event_user:
227                    continue
228
229                # NOTE: these events must contain the masters key in order to be accepted
230                # see salt.runners.reactor for the requesting interface
231                if "salt/reactors/manage" in data["tag"]:
232                    master_key = salt.utils.master.get_master_key("root", self.opts)
233                    if data["data"].get("key") != master_key:
234                        log.error(
235                            "received salt/reactors/manage event without matching"
236                            " master_key. discarding"
237                        )
238                        continue
239                if data["tag"].endswith("salt/reactors/manage/is_leader"):
240                    event.fire_event(
241                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
242                    )
243                if data["tag"].endswith("salt/reactors/manage/set_leader"):
244                    # we only want to register events from the local master
245                    if data["data"].get("id") == self.opts["id"]:
246                        self.is_leader = data["data"]["value"]
247                    event.fire_event(
248                        {"result": self.is_leader}, "salt/reactors/manage/leader/value"
249                    )
250                if data["tag"].endswith("salt/reactors/manage/add"):
251                    _data = data["data"]
252                    res = self.add_reactor(_data["event"], _data["reactors"])
253                    event.fire_event(
254                        {"reactors": self.list_all(), "result": res},
255                        "salt/reactors/manage/add-complete",
256                    )
257                elif data["tag"].endswith("salt/reactors/manage/delete"):
258                    _data = data["data"]
259                    res = self.delete_reactor(_data["event"])
260                    event.fire_event(
261                        {"reactors": self.list_all(), "result": res},
262                        "salt/reactors/manage/delete-complete",
263                    )
264                elif data["tag"].endswith("salt/reactors/manage/list"):
265                    event.fire_event(
266                        {"reactors": self.list_all()},
267                        "salt/reactors/manage/list-results",
268                    )
269                else:
270                    # do not handle any reactions if not leader in cluster
271                    if not self.is_leader:
272                        continue
273                    else:
274                        reactors = self.list_reactors(data["tag"])
275                        if not reactors:
276                            continue
277                        chunks = self.reactions(data["tag"], data["data"], reactors)
278                        if chunks:
279                            try:
280                                self.call_reactions(chunks)
281                            except SystemExit:
282                                log.warning("Exit ignored by reactor")
283
284
285class ReactWrap:
286    """
287    Wrapper that executes low data for the Reactor System
288    """
289
290    # class-wide cache of clients
291    client_cache = None
292    event_user = "Reactor"
293
294    reaction_class = {
295        "local": salt.client.LocalClient,
296        "runner": salt.runner.RunnerClient,
297        "wheel": salt.wheel.Wheel,
298        "caller": salt.client.Caller,
299    }
300
301    def __init__(self, opts):
302        self.opts = opts
303        if ReactWrap.client_cache is None:
304            ReactWrap.client_cache = salt.utils.cache.CacheDict(
305                opts["reactor_refresh_interval"]
306            )
307
308        self.pool = salt.utils.process.ThreadPool(
309            self.opts["reactor_worker_threads"],  # number of workers for runner/wheel
310            queue_size=self.opts["reactor_worker_hwm"],  # queue size for those workers
311        )
312
313    def populate_client_cache(self, low):
314        """
315        Populate the client cache with an instance of the specified type
316        """
317        reaction_type = low["state"]
318        if reaction_type not in self.client_cache:
319            log.debug("Reactor is populating %s client cache", reaction_type)
320            if reaction_type in ("runner", "wheel"):
321                # Reaction types that run locally on the master want the full
322                # opts passed.
323                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
324                    self.opts
325                )
326                # The len() function will cause the module functions to load if
327                # they aren't already loaded. We want to load them so that the
328                # spawned threads don't need to load them. Loading in the
329                # spawned threads creates race conditions such as sometimes not
330                # finding the required function because another thread is in
331                # the middle of loading the functions.
332                len(self.client_cache[reaction_type].functions)
333            else:
334                # Reactions which use remote pubs only need the conf file when
335                # instantiating a client instance.
336                self.client_cache[reaction_type] = self.reaction_class[reaction_type](
337                    self.opts["conf_file"]
338                )
339
340    def run(self, low):
341        """
342        Execute a reaction by invoking the proper wrapper func
343        """
344        self.populate_client_cache(low)
345        try:
346            l_fun = getattr(self, low["state"])
347        except AttributeError:
348            log.error("ReactWrap is missing a wrapper function for '%s'", low["state"])
349
350        try:
351            wrap_call = salt.utils.args.format_call(l_fun, low)
352            args = wrap_call.get("args", ())
353            kwargs = wrap_call.get("kwargs", {})
354            # TODO: Setting user doesn't seem to work for actual remote pubs
355            if low["state"] in ("runner", "wheel"):
356                # Update called function's low data with event user to
357                # segregate events fired by reactor and avoid reaction loops
358                kwargs["__user__"] = self.event_user
359                # Replace ``state`` kwarg which comes from high data compiler.
360                # It breaks some runner functions and seems unnecessary.
361                kwargs["__state__"] = kwargs.pop("state")
362                # NOTE: if any additional keys are added here, they will also
363                # need to be added to filter_kwargs()
364
365            if "args" in kwargs:
366                # New configuration
367                reactor_args = kwargs.pop("args")
368                for item in ("arg", "kwarg"):
369                    if item in low:
370                        log.warning(
371                            "Reactor '%s' is ignoring '%s' param %s due to "
372                            "presence of 'args' param. Check the Reactor System "
373                            "documentation for the correct argument format.",
374                            low["__id__"],
375                            item,
376                            low[item],
377                        )
378                if (
379                    low["state"] == "caller"
380                    and isinstance(reactor_args, list)
381                    and not salt.utils.data.is_dictlist(reactor_args)
382                ):
383                    # Legacy 'caller' reactors were already using the 'args'
384                    # param, but only supported a list of positional arguments.
385                    # If low['args'] is a list but is *not* a dictlist, then
386                    # this is actually using the legacy configuration. So, put
387                    # the reactor args into kwarg['arg'] so that the wrapper
388                    # interprets them as positional args.
389                    kwargs["arg"] = reactor_args
390                    kwargs["kwarg"] = {}
391                else:
392                    kwargs["arg"] = ()
393                    kwargs["kwarg"] = reactor_args
394                if not isinstance(kwargs["kwarg"], dict):
395                    kwargs["kwarg"] = salt.utils.data.repack_dictlist(kwargs["kwarg"])
396                    if not kwargs["kwarg"]:
397                        log.error(
398                            "Reactor '%s' failed to execute %s '%s': "
399                            "Incorrect argument format, check the Reactor System "
400                            "documentation for the correct format.",
401                            low["__id__"],
402                            low["state"],
403                            low["fun"],
404                        )
405                        return
406            else:
407                # Legacy configuration
408                react_call = {}
409                if low["state"] in ("runner", "wheel"):
410                    if "arg" not in kwargs or "kwarg" not in kwargs:
411                        # Runner/wheel execute on the master, so we can use
412                        # format_call to get the functions args/kwargs
413                        react_fun = self.client_cache[low["state"]].functions.get(
414                            low["fun"]
415                        )
416                        if react_fun is None:
417                            log.error(
418                                "Reactor '%s' failed to execute %s '%s': "
419                                "function not available",
420                                low["__id__"],
421                                low["state"],
422                                low["fun"],
423                            )
424                            return
425
426                        react_call = salt.utils.args.format_call(
427                            react_fun, low, expected_extra_kws=REACTOR_INTERNAL_KEYWORDS
428                        )
429
430                if "arg" not in kwargs:
431                    kwargs["arg"] = react_call.get("args", ())
432                if "kwarg" not in kwargs:
433                    kwargs["kwarg"] = react_call.get("kwargs", {})
434
435            # Execute the wrapper with the proper args/kwargs. kwargs['arg']
436            # and kwargs['kwarg'] contain the positional and keyword arguments
437            # that will be passed to the client interface to execute the
438            # desired runner/wheel/remote-exec/etc. function.
439            ret = l_fun(*args, **kwargs)
440
441            if ret is False:
442                log.error(
443                    "Reactor '%s' failed  to execute %s '%s': "
444                    "TaskPool queue is full!"
445                    "Consider tuning reactor_worker_threads and/or"
446                    " reactor_worker_hwm",
447                    low["__id__"],
448                    low["state"],
449                    low["fun"],
450                )
451
452        except SystemExit:
453            log.warning("Reactor '%s' attempted to exit. Ignored.", low["__id__"])
454        except Exception:  # pylint: disable=broad-except
455            log.error(
456                "Reactor '%s' failed to execute %s '%s'",
457                low["__id__"],
458                low["state"],
459                low["fun"],
460                exc_info=True,
461            )
462
463    def runner(self, fun, **kwargs):
464        """
465        Wrap RunnerClient for executing :ref:`runner modules <all-salt.runners>`
466        """
467        return self.pool.fire_async(self.client_cache["runner"].low, args=(fun, kwargs))
468
469    def wheel(self, fun, **kwargs):
470        """
471        Wrap Wheel to enable executing :ref:`wheel modules <all-salt.wheel>`
472        """
473        return self.pool.fire_async(self.client_cache["wheel"].low, args=(fun, kwargs))
474
475    def local(self, fun, tgt, **kwargs):
476        """
477        Wrap LocalClient for running :ref:`execution modules <all-salt.modules>`
478        """
479        self.client_cache["local"].cmd_async(tgt, fun, **kwargs)
480
481    def caller(self, fun, **kwargs):
482        """
483        Wrap LocalCaller to execute remote exec functions locally on the Minion
484        """
485        self.client_cache["caller"].cmd(fun, *kwargs["arg"], **kwargs["kwarg"])
486