1"""
2Render the pillar data
3"""
4
5import collections
6import copy
7import fnmatch
8import logging
9import os
10import sys
11import traceback
12import uuid
13
14import salt.ext.tornado.gen
15import salt.fileclient
16import salt.loader
17import salt.minion
18import salt.transport.client
19import salt.utils.args
20import salt.utils.cache
21import salt.utils.crypt
22import salt.utils.data
23import salt.utils.dictupdate
24import salt.utils.url
25from salt.exceptions import SaltClientError
26from salt.template import compile_template
27
28# Even though dictupdate is imported, invoking salt.utils.dictupdate.merge here
29# causes an UnboundLocalError. This should be investigated and fixed, but until
30# then, leave the import directly below this comment intact.
31from salt.utils.dictupdate import merge
32from salt.utils.odict import OrderedDict
33from salt.version import __version__
34
35log = logging.getLogger(__name__)
36
37
38def get_pillar(
39    opts,
40    grains,
41    minion_id,
42    saltenv=None,
43    ext=None,
44    funcs=None,
45    pillar_override=None,
46    pillarenv=None,
47    extra_minion_data=None,
48):
49    """
50    Return the correct pillar driver based on the file_client option
51    """
52    # When file_client is 'local' this makes the minion masterless
53    # but sometimes we want the minion to read its files from the local
54    # filesystem instead of asking for them from the master, but still
55    # get commands from the master.
56    # To enable this functionality set file_client=local and
57    # use_master_when_local=True in the minion config.  Then here we override
58    # the file client to be 'remote' for getting pillar.  If we don't do this
59    # then the minion never sends the event that the master uses to update
60    # its minion_data_cache.  If the master doesn't update the minion_data_cache
61    # then the SSE salt-master plugin won't see any grains for those minions.
62    file_client = opts["file_client"]
63    if opts.get("master_type") == "disable" and file_client == "remote":
64        file_client = "local"
65    elif file_client == "local" and opts.get("use_master_when_local"):
66        file_client = "remote"
67
68    ptype = {"remote": RemotePillar, "local": Pillar}.get(file_client, Pillar)
69    # If local pillar and we're caching, run through the cache system first
70    log.debug("Determining pillar cache")
71    if opts["pillar_cache"]:
72        log.debug("get_pillar using pillar cache with ext: %s", ext)
73        return PillarCache(
74            opts,
75            grains,
76            minion_id,
77            saltenv,
78            ext=ext,
79            functions=funcs,
80            pillar_override=pillar_override,
81            pillarenv=pillarenv,
82        )
83    return ptype(
84        opts,
85        grains,
86        minion_id,
87        saltenv,
88        ext,
89        functions=funcs,
90        pillar_override=pillar_override,
91        pillarenv=pillarenv,
92        extra_minion_data=extra_minion_data,
93    )
94
95
96# TODO: migrate everyone to this one!
97def get_async_pillar(
98    opts,
99    grains,
100    minion_id,
101    saltenv=None,
102    ext=None,
103    funcs=None,
104    pillar_override=None,
105    pillarenv=None,
106    extra_minion_data=None,
107):
108    """
109    Return the correct pillar driver based on the file_client option
110    """
111    file_client = opts["file_client"]
112    if opts.get("master_type") == "disable" and file_client == "remote":
113        file_client = "local"
114    elif file_client == "local" and opts.get("use_master_when_local"):
115        file_client = "remote"
116    ptype = {"remote": AsyncRemotePillar, "local": AsyncPillar}.get(
117        file_client, AsyncPillar
118    )
119    return ptype(
120        opts,
121        grains,
122        minion_id,
123        saltenv,
124        ext,
125        functions=funcs,
126        pillar_override=pillar_override,
127        pillarenv=pillarenv,
128        extra_minion_data=extra_minion_data,
129    )
130
131
132class RemotePillarMixin:
133    """
134    Common remote pillar functionality
135    """
136
137    def get_ext_pillar_extra_minion_data(self, opts):
138        """
139        Returns the extra data from the minion's opts dict (the config file).
140
141        This data will be passed to external pillar functions.
142        """
143
144        def get_subconfig(opts_key):
145            """
146            Returns a dict containing the opts key subtree, while maintaining
147            the opts structure
148            """
149            ret_dict = aux_dict = {}
150            config_val = opts
151            subkeys = opts_key.split(":")
152            # Build an empty dict with the opts path
153            for subkey in subkeys[:-1]:
154                aux_dict[subkey] = {}
155                aux_dict = aux_dict[subkey]
156                if not config_val.get(subkey):
157                    # The subkey is not in the config
158                    return {}
159                config_val = config_val[subkey]
160            if subkeys[-1] not in config_val:
161                return {}
162            aux_dict[subkeys[-1]] = config_val[subkeys[-1]]
163            return ret_dict
164
165        extra_data = {}
166        if "pass_to_ext_pillars" in opts:
167            if not isinstance(opts["pass_to_ext_pillars"], list):
168                log.exception("'pass_to_ext_pillars' config is malformed.")
169                raise SaltClientError("'pass_to_ext_pillars' config is malformed.")
170            for key in opts["pass_to_ext_pillars"]:
171                salt.utils.dictupdate.update(
172                    extra_data,
173                    get_subconfig(key),
174                    recursive_update=True,
175                    merge_lists=True,
176                )
177        log.trace("ext_pillar_extra_data = %s", extra_data)
178        return extra_data
179
180
181class AsyncRemotePillar(RemotePillarMixin):
182    """
183    Get the pillar from the master
184    """
185
186    def __init__(
187        self,
188        opts,
189        grains,
190        minion_id,
191        saltenv,
192        ext=None,
193        functions=None,
194        pillar_override=None,
195        pillarenv=None,
196        extra_minion_data=None,
197    ):
198        self.opts = opts
199        self.opts["saltenv"] = saltenv
200        self.ext = ext
201        self.grains = grains
202        self.minion_id = minion_id
203        self.channel = salt.transport.client.AsyncReqChannel.factory(opts)
204        if pillarenv is not None:
205            self.opts["pillarenv"] = pillarenv
206        self.pillar_override = pillar_override or {}
207        if not isinstance(self.pillar_override, dict):
208            self.pillar_override = {}
209            log.error("Pillar data must be a dictionary")
210        self.extra_minion_data = extra_minion_data or {}
211        if not isinstance(self.extra_minion_data, dict):
212            self.extra_minion_data = {}
213            log.error("Extra minion data must be a dictionary")
214        salt.utils.dictupdate.update(
215            self.extra_minion_data,
216            self.get_ext_pillar_extra_minion_data(opts),
217            recursive_update=True,
218            merge_lists=True,
219        )
220        self._closing = False
221
222    @salt.ext.tornado.gen.coroutine
223    def compile_pillar(self):
224        """
225        Return a future which will contain the pillar data from the master
226        """
227        load = {
228            "id": self.minion_id,
229            "grains": self.grains,
230            "saltenv": self.opts["saltenv"],
231            "pillarenv": self.opts["pillarenv"],
232            "pillar_override": self.pillar_override,
233            "extra_minion_data": self.extra_minion_data,
234            "ver": "2",
235            "cmd": "_pillar",
236        }
237        if self.ext:
238            load["ext"] = self.ext
239        try:
240            ret_pillar = yield self.channel.crypted_transfer_decode_dictentry(
241                load,
242                dictkey="pillar",
243            )
244        except salt.crypt.AuthenticationError as exc:
245            log.error(exc.message)
246            raise SaltClientError("Exception getting pillar.")
247        except Exception:  # pylint: disable=broad-except
248            log.exception("Exception getting pillar:")
249            raise SaltClientError("Exception getting pillar.")
250
251        if not isinstance(ret_pillar, dict):
252            msg = "Got a bad pillar from master, type {}, expecting dict: {}".format(
253                type(ret_pillar).__name__, ret_pillar
254            )
255            log.error(msg)
256            # raise an exception! Pillar isn't empty, we can't sync it!
257            raise SaltClientError(msg)
258        raise salt.ext.tornado.gen.Return(ret_pillar)
259
260    def destroy(self):
261        if self._closing:
262            return
263
264        self._closing = True
265        self.channel.close()
266
267    # pylint: disable=W1701
268    def __del__(self):
269        self.destroy()
270
271    # pylint: enable=W1701
272
273
274class RemotePillar(RemotePillarMixin):
275    """
276    Get the pillar from the master
277    """
278
279    def __init__(
280        self,
281        opts,
282        grains,
283        minion_id,
284        saltenv,
285        ext=None,
286        functions=None,
287        pillar_override=None,
288        pillarenv=None,
289        extra_minion_data=None,
290    ):
291        self.opts = opts
292        self.opts["saltenv"] = saltenv
293        self.ext = ext
294        self.grains = grains
295        self.minion_id = minion_id
296        self.channel = salt.transport.client.ReqChannel.factory(opts)
297        if pillarenv is not None:
298            self.opts["pillarenv"] = pillarenv
299        self.pillar_override = pillar_override or {}
300        if not isinstance(self.pillar_override, dict):
301            self.pillar_override = {}
302            log.error("Pillar data must be a dictionary")
303        self.extra_minion_data = extra_minion_data or {}
304        if not isinstance(self.extra_minion_data, dict):
305            self.extra_minion_data = {}
306            log.error("Extra minion data must be a dictionary")
307        salt.utils.dictupdate.update(
308            self.extra_minion_data,
309            self.get_ext_pillar_extra_minion_data(opts),
310            recursive_update=True,
311            merge_lists=True,
312        )
313        self._closing = False
314
315    def compile_pillar(self):
316        """
317        Return the pillar data from the master
318        """
319        load = {
320            "id": self.minion_id,
321            "grains": self.grains,
322            "saltenv": self.opts["saltenv"],
323            "pillarenv": self.opts["pillarenv"],
324            "pillar_override": self.pillar_override,
325            "extra_minion_data": self.extra_minion_data,
326            "ver": "2",
327            "cmd": "_pillar",
328        }
329        if self.ext:
330            load["ext"] = self.ext
331        ret_pillar = self.channel.crypted_transfer_decode_dictentry(
332            load,
333            dictkey="pillar",
334        )
335
336        if not isinstance(ret_pillar, dict):
337            log.error(
338                "Got a bad pillar from master, type %s, expecting dict: %s",
339                type(ret_pillar).__name__,
340                ret_pillar,
341            )
342            return {}
343        return ret_pillar
344
345    def destroy(self):
346        if hasattr(self, "_closing") and self._closing:
347            return
348
349        self._closing = True
350        self.channel.close()
351
352    # pylint: disable=W1701
353    def __del__(self):
354        self.destroy()
355
356    # pylint: enable=W1701
357
358
359class PillarCache:
360    """
361    Return a cached pillar if it exists, otherwise cache it.
362
363    Pillar caches are structed in two diminensions: minion_id with a dict of
364    saltenvs. Each saltenv contains a pillar dict
365
366    Example data structure:
367
368    ```
369    {'minion_1':
370        {'base': {'pilar_key_1' 'pillar_val_1'}
371    }
372    """
373
374    # TODO ABC?
375    def __init__(
376        self,
377        opts,
378        grains,
379        minion_id,
380        saltenv,
381        ext=None,
382        functions=None,
383        pillar_override=None,
384        pillarenv=None,
385        extra_minion_data=None,
386    ):
387        # Yes, we need all of these because we need to route to the Pillar object
388        # if we have no cache. This is another refactor target.
389
390        # Go ahead and assign these because they may be needed later
391        self.opts = opts
392        self.grains = grains
393        self.minion_id = minion_id
394        self.ext = ext
395        self.functions = functions
396        self.pillar_override = pillar_override
397        self.pillarenv = pillarenv
398
399        if saltenv is None:
400            self.saltenv = "base"
401        else:
402            self.saltenv = saltenv
403
404        # Determine caching backend
405        self.cache = salt.utils.cache.CacheFactory.factory(
406            self.opts["pillar_cache_backend"],
407            self.opts["pillar_cache_ttl"],
408            minion_cache_path=self._minion_cache_path(minion_id),
409        )
410
411    def _minion_cache_path(self, minion_id):
412        """
413        Return the path to the cache file for the minion.
414
415        Used only for disk-based backends
416        """
417        return os.path.join(self.opts["cachedir"], "pillar_cache", minion_id)
418
419    def fetch_pillar(self):
420        """
421        In the event of a cache miss, we need to incur the overhead of caching
422        a new pillar.
423        """
424        log.debug("Pillar cache getting external pillar with ext: %s", self.ext)
425        fresh_pillar = Pillar(
426            self.opts,
427            self.grains,
428            self.minion_id,
429            self.saltenv,
430            ext=self.ext,
431            functions=self.functions,
432            pillar_override=self.pillar_override,
433            pillarenv=self.pillarenv,
434        )
435        return fresh_pillar.compile_pillar()
436
437    def clear_pillar(self):
438        """
439        Clear the cache
440        """
441        self.cache.clear()
442
443        return True
444
445    def compile_pillar(self, *args, **kwargs):  # Will likely just be pillar_dirs
446        log.debug(
447            "Scanning pillar cache for information about minion %s and pillarenv %s",
448            self.minion_id,
449            self.pillarenv,
450        )
451        if self.opts["pillar_cache_backend"] == "memory":
452            cache_dict = self.cache
453        else:
454            cache_dict = self.cache._dict
455
456        log.debug("Scanning cache: %s", cache_dict)
457        # Check the cache!
458        if self.minion_id in self.cache:  # Keyed by minion_id
459            # TODO Compare grains, etc?
460            if self.pillarenv in self.cache[self.minion_id]:
461                # We have a cache hit! Send it back.
462                log.debug(
463                    "Pillar cache hit for minion %s and pillarenv %s",
464                    self.minion_id,
465                    self.pillarenv,
466                )
467                return self.cache[self.minion_id][self.pillarenv]
468            else:
469                # We found the minion but not the env. Store it.
470                fresh_pillar = self.fetch_pillar()
471
472                minion_cache = self.cache[self.minion_id]
473                minion_cache[self.pillarenv] = fresh_pillar
474                self.cache[self.minion_id] = minion_cache
475
476                log.debug(
477                    "Pillar cache miss for pillarenv %s for minion %s",
478                    self.pillarenv,
479                    self.minion_id,
480                )
481                return fresh_pillar
482        else:
483            # We haven't seen this minion yet in the cache. Store it.
484            fresh_pillar = self.fetch_pillar()
485            self.cache[self.minion_id] = {self.pillarenv: fresh_pillar}
486            log.debug("Pillar cache miss for minion %s", self.minion_id)
487            log.debug("Current pillar cache: %s", cache_dict)  # FIXME hack!
488            return fresh_pillar
489
490
491class Pillar:
492    """
493    Read over the pillar top files and render the pillar data
494    """
495
496    def __init__(
497        self,
498        opts,
499        grains,
500        minion_id,
501        saltenv,
502        ext=None,
503        functions=None,
504        pillar_override=None,
505        pillarenv=None,
506        extra_minion_data=None,
507    ):
508        self.minion_id = minion_id
509        self.ext = ext
510        if pillarenv is None:
511            if opts.get("pillarenv_from_saltenv", False):
512                opts["pillarenv"] = saltenv
513        # use the local file client
514        self.opts = self.__gen_opts(opts, grains, saltenv=saltenv, pillarenv=pillarenv)
515        self.saltenv = saltenv
516        self.client = salt.fileclient.get_file_client(self.opts, True)
517        self.avail = self.__gather_avail()
518
519        if opts.get("file_client", "") == "local" and not opts.get(
520            "use_master_when_local", False
521        ):
522            opts["grains"] = grains
523
524        # if we didn't pass in functions, lets load them
525        if functions is None:
526            utils = salt.loader.utils(opts)
527            if opts.get("file_client", "") == "local":
528                self.functions = salt.loader.minion_mods(opts, utils=utils)
529            else:
530                self.functions = salt.loader.minion_mods(self.opts, utils=utils)
531        else:
532            self.functions = functions
533
534        self.opts["minion_id"] = minion_id
535        self.matchers = salt.loader.matchers(self.opts)
536        self.rend = salt.loader.render(self.opts, self.functions)
537        ext_pillar_opts = copy.deepcopy(self.opts)
538        # Keep the incoming opts ID intact, ie, the master id
539        if "id" in opts:
540            ext_pillar_opts["id"] = opts["id"]
541        self.merge_strategy = "smart"
542        if opts.get("pillar_source_merging_strategy"):
543            self.merge_strategy = opts["pillar_source_merging_strategy"]
544
545        self.ext_pillars = salt.loader.pillars(ext_pillar_opts, self.functions)
546        self.ignored_pillars = {}
547        self.pillar_override = pillar_override or {}
548        if not isinstance(self.pillar_override, dict):
549            self.pillar_override = {}
550            log.error("Pillar data must be a dictionary")
551        self.extra_minion_data = extra_minion_data or {}
552        if not isinstance(self.extra_minion_data, dict):
553            self.extra_minion_data = {}
554            log.error("Extra minion data must be a dictionary")
555        self._closing = False
556
557    def __valid_on_demand_ext_pillar(self, opts):
558        """
559        Check to see if the on demand external pillar is allowed
560        """
561        if not isinstance(self.ext, dict):
562            log.error("On-demand pillar %s is not formatted as a dictionary", self.ext)
563            return False
564
565        on_demand = opts.get("on_demand_ext_pillar", [])
566        try:
567            invalid_on_demand = {x for x in self.ext if x not in on_demand}
568        except TypeError:
569            # Prevent traceback when on_demand_ext_pillar option is malformed
570            log.error(
571                "The 'on_demand_ext_pillar' configuration option is "
572                "malformed, it should be a list of ext_pillar module names"
573            )
574            return False
575
576        if invalid_on_demand:
577            log.error(
578                "The following ext_pillar modules are not allowed for "
579                "on-demand pillar data: %s. Valid on-demand ext_pillar "
580                "modules are: %s. The valid modules can be adjusted by "
581                "setting the 'on_demand_ext_pillar' config option.",
582                ", ".join(sorted(invalid_on_demand)),
583                ", ".join(on_demand),
584            )
585            return False
586        return True
587
588    def __gather_avail(self):
589        """
590        Gather the lists of available sls data from the master
591        """
592        avail = {}
593        for saltenv in self._get_envs():
594            avail[saltenv] = self.client.list_states(saltenv)
595        return avail
596
597    def __gen_opts(self, opts_in, grains, saltenv=None, ext=None, pillarenv=None):
598        """
599        The options need to be altered to conform to the file client
600        """
601        opts = copy.deepcopy(opts_in)
602        opts["file_client"] = "local"
603        if not grains:
604            opts["grains"] = {}
605        else:
606            opts["grains"] = grains
607        # Allow minion/CLI saltenv/pillarenv to take precedence over master
608        opts["saltenv"] = saltenv if saltenv is not None else opts.get("saltenv")
609        opts["pillarenv"] = (
610            pillarenv if pillarenv is not None else opts.get("pillarenv")
611        )
612        opts["id"] = self.minion_id
613        if opts["state_top"].startswith("salt://"):
614            opts["state_top"] = opts["state_top"]
615        elif opts["state_top"].startswith("/"):
616            opts["state_top"] = salt.utils.url.create(opts["state_top"][1:])
617        else:
618            opts["state_top"] = salt.utils.url.create(opts["state_top"])
619        if self.ext and self.__valid_on_demand_ext_pillar(opts):
620            if "ext_pillar" in opts:
621                opts["ext_pillar"].append(self.ext)
622            else:
623                opts["ext_pillar"] = [self.ext]
624        if "__env__" in opts["pillar_roots"]:
625            env = opts.get("pillarenv") or opts.get("saltenv") or "base"
626            if env not in opts["pillar_roots"]:
627                log.debug(
628                    "pillar environment '%s' maps to __env__ pillar_roots directory",
629                    env,
630                )
631                opts["pillar_roots"][env] = opts["pillar_roots"].pop("__env__")
632            else:
633                log.debug(
634                    "pillar_roots __env__ ignored (environment '%s' found in pillar_roots)",
635                    env,
636                )
637                opts["pillar_roots"].pop("__env__")
638        return opts
639
640    def _get_envs(self):
641        """
642        Pull the file server environments out of the master options
643        """
644        envs = ["base"]
645        if "pillar_roots" in self.opts:
646            envs.extend([x for x in list(self.opts["pillar_roots"]) if x not in envs])
647        return envs
648
649    def get_tops(self):
650        """
651        Gather the top files
652        """
653        tops = collections.defaultdict(list)
654        include = collections.defaultdict(list)
655        done = collections.defaultdict(list)
656        errors = []
657        # Gather initial top files
658        try:
659            saltenvs = set()
660            if self.opts["pillarenv"]:
661                # If the specified pillarenv is not present in the available
662                # pillar environments, do not cache the pillar top file.
663                if self.opts["pillarenv"] not in self.opts["pillar_roots"]:
664                    log.debug(
665                        "pillarenv '%s' not found in the configured pillar "
666                        "environments (%s)",
667                        self.opts["pillarenv"],
668                        ", ".join(self.opts["pillar_roots"]),
669                    )
670                else:
671                    saltenvs.add(self.opts["pillarenv"])
672            else:
673                saltenvs = self._get_envs()
674                if self.opts.get("pillar_source_merging_strategy", None) == "none":
675                    saltenvs &= {self.saltenv or "base"}
676
677            for saltenv in saltenvs:
678                top = self.client.cache_file(self.opts["state_top"], saltenv)
679                if top:
680                    tops[saltenv].append(
681                        compile_template(
682                            top,
683                            self.rend,
684                            self.opts["renderer"],
685                            self.opts["renderer_blacklist"],
686                            self.opts["renderer_whitelist"],
687                            saltenv=saltenv,
688                            _pillar_rend=True,
689                        )
690                    )
691        except Exception as exc:  # pylint: disable=broad-except
692            errors.append(
693                "Rendering Primary Top file failed, render error:\n{}".format(exc)
694            )
695            log.exception("Pillar rendering failed for minion %s", self.minion_id)
696
697        # Search initial top files for includes
698        for saltenv, ctops in tops.items():
699            for ctop in ctops:
700                if "include" not in ctop:
701                    continue
702                for sls in ctop["include"]:
703                    include[saltenv].append(sls)
704                ctop.pop("include")
705        # Go through the includes and pull out the extra tops and add them
706        while include:
707            pops = []
708            for saltenv, states in include.items():
709                pops.append(saltenv)
710                if not states:
711                    continue
712                for sls in states:
713                    if sls in done[saltenv]:
714                        continue
715                    try:
716                        tops[saltenv].append(
717                            compile_template(
718                                self.client.get_state(sls, saltenv).get("dest", False),
719                                self.rend,
720                                self.opts["renderer"],
721                                self.opts["renderer_blacklist"],
722                                self.opts["renderer_whitelist"],
723                                saltenv=saltenv,
724                                _pillar_rend=True,
725                            )
726                        )
727                    except Exception as exc:  # pylint: disable=broad-except
728                        errors.append(
729                            "Rendering Top file {} failed, render error:\n{}".format(
730                                sls, exc
731                            )
732                        )
733                    done[saltenv].append(sls)
734            for saltenv in pops:
735                if saltenv in include:
736                    include.pop(saltenv)
737
738        return tops, errors
739
740    def merge_tops(self, tops):
741        """
742        Cleanly merge the top files
743        """
744        top = collections.defaultdict(OrderedDict)
745        orders = collections.defaultdict(OrderedDict)
746        for ctops in tops.values():
747            for ctop in ctops:
748                for saltenv, targets in ctop.items():
749                    if saltenv == "include":
750                        continue
751                    for tgt in targets:
752                        matches = []
753                        states = OrderedDict()
754                        orders[saltenv][tgt] = 0
755                        ignore_missing = False
756                        for comp in ctop[saltenv][tgt]:
757                            if isinstance(comp, dict):
758                                if "match" in comp:
759                                    matches.append(comp)
760                                if "order" in comp:
761                                    order = comp["order"]
762                                    if not isinstance(order, int):
763                                        try:
764                                            order = int(order)
765                                        except ValueError:
766                                            order = 0
767                                    orders[saltenv][tgt] = order
768                                if comp.get("ignore_missing", False):
769                                    ignore_missing = True
770                            if isinstance(comp, str):
771                                states[comp] = True
772                        if ignore_missing:
773                            if saltenv not in self.ignored_pillars:
774                                self.ignored_pillars[saltenv] = []
775                            self.ignored_pillars[saltenv].extend(states.keys())
776                        top[saltenv][tgt] = matches
777                        top[saltenv][tgt].extend(states)
778        return self.sort_top_targets(top, orders)
779
780    def sort_top_targets(self, top, orders):
781        """
782        Returns the sorted high data from the merged top files
783        """
784        sorted_top = collections.defaultdict(OrderedDict)
785        # pylint: disable=cell-var-from-loop
786        for saltenv, targets in top.items():
787            sorted_targets = sorted(targets, key=lambda target: orders[saltenv][target])
788            for target in sorted_targets:
789                sorted_top[saltenv][target] = targets[target]
790        # pylint: enable=cell-var-from-loop
791        return sorted_top
792
793    def get_top(self):
794        """
795        Returns the high data derived from the top file
796        """
797        tops, errors = self.get_tops()
798        try:
799            merged_tops = self.merge_tops(tops)
800        except TypeError as err:
801            merged_tops = OrderedDict()
802            errors.append("Error encountered while rendering pillar top file.")
803        return merged_tops, errors
804
805    def top_matches(self, top, reload=False):
806        """
807        Search through the top high data for matches and return the states
808        that this minion needs to execute.
809
810        Returns:
811        {'saltenv': ['state1', 'state2', ...]}
812
813        reload
814            Reload the matcher loader
815        """
816        matches = {}
817        if reload:
818            self.matchers = salt.loader.matchers(self.opts)
819        for saltenv, body in top.items():
820            if self.opts["pillarenv"]:
821                if saltenv != self.opts["pillarenv"]:
822                    continue
823            for match, data in body.items():
824                if self.matchers["confirm_top.confirm_top"](
825                    match,
826                    data,
827                    self.opts.get("nodegroups", {}),
828                ):
829                    if saltenv not in matches:
830                        matches[saltenv] = env_matches = []
831                    else:
832                        env_matches = matches[saltenv]
833                    for item in data:
834                        if isinstance(item, str) and item not in env_matches:
835                            env_matches.append(item)
836        return matches
837
838    def render_pstate(self, sls, saltenv, mods, defaults=None):
839        """
840        Collect a single pillar sls file and render it
841        """
842        if defaults is None:
843            defaults = {}
844        err = ""
845        errors = []
846        state_data = self.client.get_state(sls, saltenv)
847        fn_ = state_data.get("dest", False)
848        if not fn_:
849            if sls in self.ignored_pillars.get(saltenv, []):
850                log.debug(
851                    "Skipping ignored and missing SLS '%s' in environment '%s'",
852                    sls,
853                    saltenv,
854                )
855                return None, mods, errors
856            elif self.opts["pillar_roots"].get(saltenv):
857                msg = (
858                    "Specified SLS '{}' in environment '{}' is not"
859                    " available on the salt master".format(sls, saltenv)
860                )
861                log.error(msg)
862                errors.append(msg)
863            else:
864                msg = "Specified SLS '{}' in environment '{}' was not found. ".format(
865                    sls, saltenv
866                )
867                if self.opts.get("__git_pillar", False) is True:
868                    msg += (
869                        "This is likely caused by a git_pillar top file "
870                        "containing an environment other than the one for the "
871                        "branch in which it resides. Each git_pillar "
872                        "branch/tag must have its own top file."
873                    )
874                else:
875                    msg += (
876                        "This could be because SLS '{0}' is in an "
877                        "environment other than '{1}', but '{1}' is "
878                        "included in that environment's Pillar top file. It "
879                        "could also be due to environment '{1}' not being "
880                        "defined in 'pillar_roots'.".format(sls, saltenv)
881                    )
882                log.debug(msg)
883                # return state, mods, errors
884                return None, mods, errors
885        state = None
886        try:
887            state = compile_template(
888                fn_,
889                self.rend,
890                self.opts["renderer"],
891                self.opts["renderer_blacklist"],
892                self.opts["renderer_whitelist"],
893                saltenv,
894                sls,
895                _pillar_rend=True,
896                **defaults
897            )
898        except Exception as exc:  # pylint: disable=broad-except
899            msg = "Rendering SLS '{}' failed, render error:\n{}".format(sls, exc)
900            log.critical(msg, exc_info=True)
901            if self.opts.get("pillar_safe_render_error", True):
902                errors.append(
903                    "Rendering SLS '{}' failed. Please see master log for "
904                    "details.".format(sls)
905                )
906            else:
907                errors.append(msg)
908        mods[sls] = state
909        nstate = None
910        if state:
911            if not isinstance(state, dict):
912                msg = "SLS '{}' does not render to a dictionary".format(sls)
913                log.error(msg)
914                errors.append(msg)
915            else:
916                if "include" in state:
917                    if not isinstance(state["include"], list):
918                        msg = (
919                            "Include Declaration in SLS '{}' is not "
920                            "formed as a list".format(sls)
921                        )
922                        log.error(msg)
923                        errors.append(msg)
924                    else:
925                        # render included state(s)
926                        include_states = []
927                        for sub_sls in state.pop("include"):
928                            if isinstance(sub_sls, dict):
929                                sub_sls, v = next(iter(sub_sls.items()))
930                                defaults = v.get("defaults", {})
931                                key = v.get("key", None)
932                            else:
933                                key = None
934                            try:
935                                matched_pstates = fnmatch.filter(
936                                    self.avail[saltenv],
937                                    sub_sls.lstrip(".").replace("/", "."),
938                                )
939                                if sub_sls.startswith("."):
940                                    if state_data.get("source", "").endswith(
941                                        "/init.sls"
942                                    ):
943                                        include_parts = sls.split(".")
944                                    else:
945                                        include_parts = sls.split(".")[:-1]
946                                    sub_sls = ".".join(include_parts + [sub_sls[1:]])
947                                matches = fnmatch.filter(
948                                    self.avail[saltenv],
949                                    sub_sls,
950                                )
951                                matched_pstates.extend(matches)
952                            except KeyError:
953                                errors.extend(
954                                    [
955                                        "No matching pillar environment for environment"
956                                        " '{}' found".format(saltenv)
957                                    ]
958                                )
959                                matched_pstates = [sub_sls]
960                            # If matched_pstates is empty, set to sub_sls
961                            if len(matched_pstates) < 1:
962                                matched_pstates = [sub_sls]
963                            for m_sub_sls in matched_pstates:
964                                if m_sub_sls not in mods:
965                                    nstate, mods, err = self.render_pstate(
966                                        m_sub_sls, saltenv, mods, defaults
967                                    )
968                                else:
969                                    nstate = mods[m_sub_sls]
970                                if nstate:
971                                    if key:
972                                        # If key is x:y, convert it to {x: {y: nstate}}
973                                        for key_fragment in reversed(key.split(":")):
974                                            nstate = {key_fragment: nstate}
975                                    if not self.opts.get(
976                                        "pillar_includes_override_sls", False
977                                    ):
978                                        include_states.append(nstate)
979                                    else:
980                                        state = merge(
981                                            state,
982                                            nstate,
983                                            self.merge_strategy,
984                                            self.opts.get("renderer", "yaml"),
985                                            self.opts.get("pillar_merge_lists", False),
986                                        )
987                                if err:
988                                    errors += err
989                        if not self.opts.get("pillar_includes_override_sls", False):
990                            # merge included state(s) with the current state
991                            # merged last to ensure that its values are
992                            # authoritative.
993                            include_states.append(state)
994                            state = None
995                            for s in include_states:
996                                if state is None:
997                                    state = s
998                                else:
999                                    state = merge(
1000                                        state,
1001                                        s,
1002                                        self.merge_strategy,
1003                                        self.opts.get("renderer", "yaml"),
1004                                        self.opts.get("pillar_merge_lists", False),
1005                                    )
1006        return state, mods, errors
1007
1008    def render_pillar(self, matches, errors=None):
1009        """
1010        Extract the sls pillar files from the matches and render them into the
1011        pillar
1012        """
1013        pillar = copy.copy(self.pillar_override)
1014        if errors is None:
1015            errors = []
1016        for saltenv, pstates in matches.items():
1017            pstatefiles = []
1018            mods = {}
1019            for sls_match in pstates:
1020                matched_pstates = []
1021                try:
1022                    matched_pstates = fnmatch.filter(self.avail[saltenv], sls_match)
1023                except KeyError:
1024                    errors.extend(
1025                        [
1026                            "No matching pillar environment for environment "
1027                            "'{}' found".format(saltenv)
1028                        ]
1029                    )
1030                if matched_pstates:
1031                    pstatefiles.extend(matched_pstates)
1032                else:
1033                    pstatefiles.append(sls_match)
1034
1035            for sls in pstatefiles:
1036                pstate, mods, err = self.render_pstate(sls, saltenv, mods)
1037
1038                if err:
1039                    errors += err
1040
1041                if pstate is not None:
1042                    if not isinstance(pstate, dict):
1043                        log.error(
1044                            "The rendered pillar sls file, '%s' state did "
1045                            "not return the expected data format. This is "
1046                            "a sign of a malformed pillar sls file. Returned "
1047                            "errors: %s",
1048                            sls,
1049                            ", ".join(["'{}'".format(e) for e in errors]),
1050                        )
1051                        continue
1052                    pillar = merge(
1053                        pillar,
1054                        pstate,
1055                        self.merge_strategy,
1056                        self.opts.get("renderer", "yaml"),
1057                        self.opts.get("pillar_merge_lists", False),
1058                    )
1059
1060        return pillar, errors
1061
1062    def _external_pillar_data(self, pillar, val, key):
1063        """
1064        Builds actual pillar data structure and updates the ``pillar`` variable
1065        """
1066        ext = None
1067        args = salt.utils.args.get_function_argspec(self.ext_pillars[key]).args
1068
1069        if isinstance(val, dict):
1070            if ("extra_minion_data" in args) and self.extra_minion_data:
1071                ext = self.ext_pillars[key](
1072                    self.minion_id,
1073                    pillar,
1074                    extra_minion_data=self.extra_minion_data,
1075                    **val
1076                )
1077            else:
1078                ext = self.ext_pillars[key](self.minion_id, pillar, **val)
1079        elif isinstance(val, list):
1080            if ("extra_minion_data" in args) and self.extra_minion_data:
1081                ext = self.ext_pillars[key](
1082                    self.minion_id,
1083                    pillar,
1084                    *val,
1085                    extra_minion_data=self.extra_minion_data
1086                )
1087            else:
1088                ext = self.ext_pillars[key](self.minion_id, pillar, *val)
1089        else:
1090            if ("extra_minion_data" in args) and self.extra_minion_data:
1091                ext = self.ext_pillars[key](
1092                    self.minion_id,
1093                    pillar,
1094                    val,
1095                    extra_minion_data=self.extra_minion_data,
1096                )
1097            else:
1098                ext = self.ext_pillars[key](self.minion_id, pillar, val)
1099        return ext
1100
1101    def ext_pillar(self, pillar, errors=None):
1102        """
1103        Render the external pillar data
1104        """
1105        if errors is None:
1106            errors = []
1107        try:
1108            # Make sure that on-demand git_pillar is fetched before we try to
1109            # compile the pillar data. git_pillar will fetch a remote when
1110            # the git ext_pillar() func is run, but only for masterless.
1111            if self.ext and "git" in self.ext and self.opts.get("__role") != "minion":
1112                # Avoid circular import
1113                import salt.utils.gitfs
1114                import salt.pillar.git_pillar
1115
1116                git_pillar = salt.utils.gitfs.GitPillar(
1117                    self.opts,
1118                    self.ext["git"],
1119                    per_remote_overrides=salt.pillar.git_pillar.PER_REMOTE_OVERRIDES,
1120                    per_remote_only=salt.pillar.git_pillar.PER_REMOTE_ONLY,
1121                    global_only=salt.pillar.git_pillar.GLOBAL_ONLY,
1122                )
1123                git_pillar.fetch_remotes()
1124        except TypeError:
1125            # Handle malformed ext_pillar
1126            pass
1127        if "ext_pillar" not in self.opts:
1128            return pillar, errors
1129        if not isinstance(self.opts["ext_pillar"], list):
1130            errors.append('The "ext_pillar" option is malformed')
1131            log.critical(errors[-1])
1132            return pillar, errors
1133        ext = None
1134        # Bring in CLI pillar data
1135        if self.pillar_override:
1136            pillar = merge(
1137                pillar,
1138                self.pillar_override,
1139                self.merge_strategy,
1140                self.opts.get("renderer", "yaml"),
1141                self.opts.get("pillar_merge_lists", False),
1142            )
1143
1144        for run in self.opts["ext_pillar"]:
1145            if not isinstance(run, dict):
1146                errors.append('The "ext_pillar" option is malformed')
1147                log.critical(errors[-1])
1148                return {}, errors
1149            if next(iter(run.keys())) in self.opts.get("exclude_ext_pillar", []):
1150                continue
1151            for key, val in run.items():
1152                if key not in self.ext_pillars:
1153                    log.critical(
1154                        "Specified ext_pillar interface %s is unavailable", key
1155                    )
1156                    continue
1157                try:
1158                    ext = self._external_pillar_data(pillar, val, key)
1159                except Exception as exc:  # pylint: disable=broad-except
1160                    errors.append(
1161                        "Failed to load ext_pillar {}: {}".format(
1162                            key,
1163                            exc.__str__(),
1164                        )
1165                    )
1166                    log.error(
1167                        "Exception caught loading ext_pillar '%s':\n%s",
1168                        key,
1169                        "".join(traceback.format_tb(sys.exc_info()[2])),
1170                    )
1171            if ext:
1172                pillar = merge(
1173                    pillar,
1174                    ext,
1175                    self.merge_strategy,
1176                    self.opts.get("renderer", "yaml"),
1177                    self.opts.get("pillar_merge_lists", False),
1178                )
1179                ext = None
1180        return pillar, errors
1181
1182    def compile_pillar(self, ext=True):
1183        """
1184        Render the pillar data and return
1185        """
1186        top, top_errors = self.get_top()
1187        if ext:
1188            if self.opts.get("ext_pillar_first", False):
1189                self.opts["pillar"], errors = self.ext_pillar(self.pillar_override)
1190                self.rend = salt.loader.render(self.opts, self.functions)
1191                matches = self.top_matches(top, reload=True)
1192                pillar, errors = self.render_pillar(matches, errors=errors)
1193                pillar = merge(
1194                    self.opts["pillar"],
1195                    pillar,
1196                    self.merge_strategy,
1197                    self.opts.get("renderer", "yaml"),
1198                    self.opts.get("pillar_merge_lists", False),
1199                )
1200            else:
1201                matches = self.top_matches(top)
1202                pillar, errors = self.render_pillar(matches)
1203                pillar, errors = self.ext_pillar(pillar, errors=errors)
1204        else:
1205            matches = self.top_matches(top)
1206            pillar, errors = self.render_pillar(matches)
1207        errors.extend(top_errors)
1208        if self.opts.get("pillar_opts", False):
1209            mopts = dict(self.opts)
1210            if "grains" in mopts:
1211                mopts.pop("grains")
1212            mopts["saltversion"] = __version__
1213            pillar["master"] = mopts
1214        if "pillar" in self.opts and self.opts.get("ssh_merge_pillar", False):
1215            pillar = merge(
1216                self.opts["pillar"],
1217                pillar,
1218                self.merge_strategy,
1219                self.opts.get("renderer", "yaml"),
1220                self.opts.get("pillar_merge_lists", False),
1221            )
1222        if errors:
1223            for error in errors:
1224                log.critical("Pillar render error: %s", error)
1225            pillar["_errors"] = errors
1226
1227        if self.pillar_override:
1228            pillar = merge(
1229                pillar,
1230                self.pillar_override,
1231                self.merge_strategy,
1232                self.opts.get("renderer", "yaml"),
1233                self.opts.get("pillar_merge_lists", False),
1234            )
1235
1236        decrypt_errors = self.decrypt_pillar(pillar)
1237        if decrypt_errors:
1238            pillar.setdefault("_errors", []).extend(decrypt_errors)
1239        return pillar
1240
1241    def decrypt_pillar(self, pillar):
1242        """
1243        Decrypt the specified pillar dictionary items, if configured to do so
1244        """
1245        errors = []
1246        if self.opts.get("decrypt_pillar"):
1247            decrypt_pillar = self.opts["decrypt_pillar"]
1248            if not isinstance(decrypt_pillar, dict):
1249                decrypt_pillar = salt.utils.data.repack_dictlist(
1250                    self.opts["decrypt_pillar"]
1251                )
1252            if not decrypt_pillar:
1253                errors.append("decrypt_pillar config option is malformed")
1254            for key, rend in decrypt_pillar.items():
1255                ptr = salt.utils.data.traverse_dict(
1256                    pillar,
1257                    key,
1258                    default=None,
1259                    delimiter=self.opts["decrypt_pillar_delimiter"],
1260                )
1261                if ptr is None:
1262                    log.debug("Pillar key %s not present", key)
1263                    continue
1264                try:
1265                    hash(ptr)
1266                    immutable = True
1267                except TypeError:
1268                    immutable = False
1269                try:
1270                    ret = salt.utils.crypt.decrypt(
1271                        ptr,
1272                        rend or self.opts["decrypt_pillar_default"],
1273                        renderers=self.rend,
1274                        opts=self.opts,
1275                        valid_rend=self.opts["decrypt_pillar_renderers"],
1276                    )
1277                    if immutable:
1278                        # Since the key pointed to an immutable type, we need
1279                        # to replace it in the pillar dict. First we will find
1280                        # the parent, and then we will replace the child key
1281                        # with the return data from the renderer.
1282                        parent, _, child = key.rpartition(
1283                            self.opts["decrypt_pillar_delimiter"]
1284                        )
1285                        if not parent:
1286                            # key is a top-level key, so the pointer to the
1287                            # parent is the pillar dict itself.
1288                            ptr = pillar
1289                        else:
1290                            ptr = salt.utils.data.traverse_dict(
1291                                pillar,
1292                                parent,
1293                                default=None,
1294                                delimiter=self.opts["decrypt_pillar_delimiter"],
1295                            )
1296                        if ptr is not None:
1297                            ptr[child] = ret
1298                except Exception as exc:  # pylint: disable=broad-except
1299                    msg = "Failed to decrypt pillar key '{}': {}".format(key, exc)
1300                    errors.append(msg)
1301                    log.error(msg, exc_info=True)
1302        return errors
1303
1304    def destroy(self):
1305        """
1306        This method exist in order to be API compatible with RemotePillar
1307        """
1308        if self._closing:
1309            return
1310        self._closing = True
1311
1312    # pylint: disable=W1701
1313    def __del__(self):
1314        self.destroy()
1315
1316    # pylint: enable=W1701
1317
1318
1319# TODO: actually migrate from Pillar to AsyncPillar to allow for futures in
1320# ext_pillar etc.
1321class AsyncPillar(Pillar):
1322    @salt.ext.tornado.gen.coroutine
1323    def compile_pillar(self, ext=True):
1324        ret = super().compile_pillar(ext=ext)
1325        raise salt.ext.tornado.gen.Return(ret)
1326