"""
Render the pillar data
"""

import collections
import copy
import fnmatch
import logging
import os
import sys
import traceback
import uuid

import salt.ext.tornado.gen
import salt.fileclient
import salt.loader
import salt.minion
import salt.transport.client
import salt.utils.args
import salt.utils.cache
import salt.utils.crypt
import salt.utils.data
import salt.utils.dictupdate
import salt.utils.url
from salt.exceptions import SaltClientError
from salt.template import compile_template

# Even though dictupdate is imported, invoking salt.utils.dictupdate.merge here
# causes an UnboundLocalError. This should be investigated and fixed, but until
# then, leave the import directly below this comment intact.
from salt.utils.dictupdate import merge
from salt.utils.odict import OrderedDict
from salt.version import __version__

log = logging.getLogger(__name__)


def get_pillar(
    opts,
    grains,
    minion_id,
    saltenv=None,
    ext=None,
    funcs=None,
    pillar_override=None,
    pillarenv=None,
    extra_minion_data=None,
):
    """
    Return the correct pillar driver based on the file_client option

    When ``opts["pillar_cache"]`` is truthy a :class:`PillarCache` is returned
    regardless of the file client; otherwise a :class:`RemotePillar` or
    :class:`Pillar` instance is returned.
    """
    # When file_client is 'local' this makes the minion masterless
    # but sometimes we want the minion to read its files from the local
    # filesystem instead of asking for them from the master, but still
    # get commands from the master.
    # To enable this functionality set file_client=local and
    # use_master_when_local=True in the minion config. Then here we override
    # the file client to be 'remote' for getting pillar. If we don't do this
    # then the minion never sends the event that the master uses to update
    # its minion_data_cache. If the master doesn't update the minion_data_cache
    # then the SSE salt-master plugin won't see any grains for those minions.
    file_client = opts["file_client"]
    if opts.get("master_type") == "disable" and file_client == "remote":
        file_client = "local"
    elif file_client == "local" and opts.get("use_master_when_local"):
        file_client = "remote"

    ptype = {"remote": RemotePillar, "local": Pillar}.get(file_client, Pillar)
    # If local pillar and we're caching, run through the cache system first
    log.debug("Determining pillar cache")
    if opts["pillar_cache"]:
        log.debug("get_pillar using pillar cache with ext: %s", ext)
        return PillarCache(
            opts,
            grains,
            minion_id,
            saltenv,
            ext=ext,
            functions=funcs,
            pillar_override=pillar_override,
            pillarenv=pillarenv,
            # Forward the extra minion data so a cache miss renders the pillar
            # with the same inputs as the uncached path below.
            extra_minion_data=extra_minion_data,
        )
    return ptype(
        opts,
        grains,
        minion_id,
        saltenv,
        ext,
        functions=funcs,
        pillar_override=pillar_override,
        pillarenv=pillarenv,
        extra_minion_data=extra_minion_data,
    )


# TODO: migrate everyone to this one!
def get_async_pillar(
    opts,
    grains,
    minion_id,
    saltenv=None,
    ext=None,
    funcs=None,
    pillar_override=None,
    pillarenv=None,
    extra_minion_data=None,
):
    """
    Return the correct async pillar driver based on the file_client option

    Uses the same file client selection rules as :func:`get_pillar`, but
    returns an :class:`AsyncRemotePillar` or :class:`AsyncPillar` and never
    consults the pillar cache.
    """
    file_client = opts["file_client"]
    if opts.get("master_type") == "disable" and file_client == "remote":
        file_client = "local"
    elif file_client == "local" and opts.get("use_master_when_local"):
        file_client = "remote"
    ptype = {"remote": AsyncRemotePillar, "local": AsyncPillar}.get(
        file_client, AsyncPillar
    )
    return ptype(
        opts,
        grains,
        minion_id,
        saltenv,
        ext,
        functions=funcs,
        pillar_override=pillar_override,
        pillarenv=pillarenv,
        extra_minion_data=extra_minion_data,
    )


class RemotePillarMixin:
    """
    Common remote pillar functionality
    """

    def get_ext_pillar_extra_minion_data(self, opts):
        """
        Returns the extra data from the minion's opts dict (the config file).

        This data will be passed to external pillar functions.

        Each entry of ``opts["pass_to_ext_pillars"]`` is a colon-delimited
        path into ``opts``; the matching subtrees are merged into one dict.

        Raises ``SaltClientError`` if ``pass_to_ext_pillars`` is not a list.
        """

        def get_subconfig(opts_key):
            """
            Returns a dict containing the opts key subtree, while maintaining
            the opts structure
            """
            ret_dict = aux_dict = {}
            config_val = opts
            subkeys = opts_key.split(":")
            # Build an empty dict with the opts path
            for subkey in subkeys[:-1]:
                aux_dict[subkey] = {}
                aux_dict = aux_dict[subkey]
                if not config_val.get(subkey):
                    # The subkey is not in the config
                    return {}
                config_val = config_val[subkey]
            if subkeys[-1] not in config_val:
                return {}
            aux_dict[subkeys[-1]] = config_val[subkeys[-1]]
            return ret_dict

        extra_data = {}
        if "pass_to_ext_pillars" in opts:
            if not isinstance(opts["pass_to_ext_pillars"], list):
                # No exception is being handled here, so use log.error rather
                # than log.exception (which would append a bogus traceback).
                log.error("'pass_to_ext_pillars' config is malformed.")
                raise SaltClientError("'pass_to_ext_pillars' config is malformed.")
            for key in opts["pass_to_ext_pillars"]:
                salt.utils.dictupdate.update(
                    extra_data,
                    get_subconfig(key),
                    recursive_update=True,
                    merge_lists=True,
                )
        log.trace("ext_pillar_extra_data = %s", extra_data)
        return extra_data


class AsyncRemotePillar(RemotePillarMixin):
    """
    Get the pillar from the master
    """

    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Store the request parameters and open an async request channel.

        Note that ``opts`` is mutated: ``saltenv`` (and ``pillarenv`` when
        given) are written into it. ``functions`` is accepted for signature
        compatibility with the other pillar classes and is not used here.
        """
        self.opts = opts
        self.opts["saltenv"] = saltenv
        self.ext = ext
        self.grains = grains
        self.minion_id = minion_id
        self.channel = salt.transport.client.AsyncReqChannel.factory(opts)
        if pillarenv is not None:
            self.opts["pillarenv"] = pillarenv
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        salt.utils.dictupdate.update(
            self.extra_minion_data,
            self.get_ext_pillar_extra_minion_data(opts),
            recursive_update=True,
            merge_lists=True,
        )
        self._closing = False

    @salt.ext.tornado.gen.coroutine
    def compile_pillar(self):
        """
        Return a future which will contain the pillar data from the master

        Raises ``SaltClientError`` if the request fails or if the master
        returns something other than a dictionary.
        """
        load = {
            "id": self.minion_id,
            "grains": self.grains,
            "saltenv": self.opts["saltenv"],
            "pillarenv": self.opts["pillarenv"],
            "pillar_override": self.pillar_override,
            "extra_minion_data": self.extra_minion_data,
            "ver": "2",
            "cmd": "_pillar",
        }
        if self.ext:
            load["ext"] = self.ext
        try:
            ret_pillar = yield self.channel.crypted_transfer_decode_dictentry(
                load,
                dictkey="pillar",
            )
        except salt.crypt.AuthenticationError as exc:
            log.error(exc.message)
            raise SaltClientError("Exception getting pillar.")
        except Exception:  # pylint: disable=broad-except
            log.exception("Exception getting pillar:")
            raise SaltClientError("Exception getting pillar.")

        if not isinstance(ret_pillar, dict):
            msg = "Got a bad pillar from master, type {}, expecting dict: {}".format(
                type(ret_pillar).__name__, ret_pillar
            )
            log.error(msg)
            # raise an exception! Pillar isn't empty, we can't sync it!
            raise SaltClientError(msg)
        raise salt.ext.tornado.gen.Return(ret_pillar)

    def destroy(self):
        """
        Close the request channel. Safe to call more than once.
        """
        # getattr guards: __del__ may run on an instance whose __init__ raised
        # before ``_closing`` or ``channel`` were assigned.
        if getattr(self, "_closing", False):
            return

        self._closing = True
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    # pylint: disable=W1701
    def __del__(self):
        self.destroy()

    # pylint: enable=W1701


class RemotePillar(RemotePillarMixin):
    """
    Get the pillar from the master
    """

    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Store the request parameters and open a request channel.

        Note that ``opts`` is mutated: ``saltenv`` (and ``pillarenv`` when
        given) are written into it. ``functions`` is accepted for signature
        compatibility with the other pillar classes and is not used here.
        """
        self.opts = opts
        self.opts["saltenv"] = saltenv
        self.ext = ext
        self.grains = grains
        self.minion_id = minion_id
        self.channel = salt.transport.client.ReqChannel.factory(opts)
        if pillarenv is not None:
            self.opts["pillarenv"] = pillarenv
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        salt.utils.dictupdate.update(
            self.extra_minion_data,
            self.get_ext_pillar_extra_minion_data(opts),
            recursive_update=True,
            merge_lists=True,
        )
        self._closing = False

    def compile_pillar(self):
        """
        Return the pillar data from the master

        An empty dict is returned (and an error logged) if the master returns
        something other than a dictionary.
        """
        load = {
            "id": self.minion_id,
            "grains": self.grains,
            "saltenv": self.opts["saltenv"],
            "pillarenv": self.opts["pillarenv"],
            "pillar_override": self.pillar_override,
            "extra_minion_data": self.extra_minion_data,
            "ver": "2",
            "cmd": "_pillar",
        }
        if self.ext:
            load["ext"] = self.ext
        ret_pillar = self.channel.crypted_transfer_decode_dictentry(
            load,
            dictkey="pillar",
        )

        if not isinstance(ret_pillar, dict):
            log.error(
                "Got a bad pillar from master, type %s, expecting dict: %s",
                type(ret_pillar).__name__,
                ret_pillar,
            )
            return {}
        return ret_pillar

    def destroy(self):
        """
        Close the request channel. Safe to call more than once.
        """
        # getattr guards: __del__ may run on an instance whose __init__ raised
        # before ``_closing`` or ``channel`` were assigned.
        if getattr(self, "_closing", False):
            return

        self._closing = True
        channel = getattr(self, "channel", None)
        if channel is not None:
            channel.close()

    # pylint: disable=W1701
    def __del__(self):
        self.destroy()

    # pylint: enable=W1701


class PillarCache:
    """
    Return a cached pillar if it exists, otherwise cache it.

    Pillar caches are structured in two dimensions: minion_id with a dict of
    pillar environments. Each environment contains a pillar dict.

    Example data structure:

    .. code-block:: python

        {'minion_1':
            {'base': {'pillar_key_1': 'pillar_val_1'}}
        }
    """

    # TODO ABC?
    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Store everything needed to build a :class:`Pillar` on a cache miss and
        create the cache backend named by ``opts["pillar_cache_backend"]``.
        """
        # Yes, we need all of these because we need to route to the Pillar object
        # if we have no cache. This is another refactor target.

        # Go ahead and assign these because they may be needed later
        self.opts = opts
        self.grains = grains
        self.minion_id = minion_id
        self.ext = ext
        self.functions = functions
        self.pillar_override = pillar_override
        self.pillarenv = pillarenv
        self.extra_minion_data = extra_minion_data

        if saltenv is None:
            self.saltenv = "base"
        else:
            self.saltenv = saltenv

        # Determine caching backend
        self.cache = salt.utils.cache.CacheFactory.factory(
            self.opts["pillar_cache_backend"],
            self.opts["pillar_cache_ttl"],
            minion_cache_path=self._minion_cache_path(minion_id),
        )

    def _minion_cache_path(self, minion_id):
        """
        Return the path to the cache file for the minion.

        Used only for disk-based backends
        """
        return os.path.join(self.opts["cachedir"], "pillar_cache", minion_id)

    def fetch_pillar(self):
        """
        In the event of a cache miss, we need to incur the overhead of caching
        a new pillar.

        Builds a :class:`Pillar` from the stored parameters and returns its
        compiled pillar data.
        """
        log.debug("Pillar cache getting external pillar with ext: %s", self.ext)
        fresh_pillar = Pillar(
            self.opts,
            self.grains,
            self.minion_id,
            self.saltenv,
            ext=self.ext,
            functions=self.functions,
            pillar_override=self.pillar_override,
            pillarenv=self.pillarenv,
            extra_minion_data=self.extra_minion_data,
        )
        return fresh_pillar.compile_pillar()

    def clear_pillar(self):
        """
        Clear the cache
        """
        self.cache.clear()

        return True

    def compile_pillar(self, *args, **kwargs):  # Will likely just be pillar_dirs
        """
        Return the pillar for this minion and pillarenv, from the cache when
        present, otherwise freshly rendered and stored in the cache.

        Positional and keyword arguments are accepted but ignored.
        """
        log.debug(
            "Scanning pillar cache for information about minion %s and pillarenv %s",
            self.minion_id,
            self.pillarenv,
        )
        # cache_dict is only used for the debug log output below
        if self.opts["pillar_cache_backend"] == "memory":
            cache_dict = self.cache
        else:
            cache_dict = self.cache._dict

        log.debug("Scanning cache: %s", cache_dict)
        # Check the cache!
        if self.minion_id in self.cache:  # Keyed by minion_id
            # TODO Compare grains, etc?
            if self.pillarenv in self.cache[self.minion_id]:
                # We have a cache hit! Send it back.
                log.debug(
                    "Pillar cache hit for minion %s and pillarenv %s",
                    self.minion_id,
                    self.pillarenv,
                )
                return self.cache[self.minion_id][self.pillarenv]
            else:
                # We found the minion but not the env. Store it.
                fresh_pillar = self.fetch_pillar()

                # Re-assign the per-minion dict so the backend sees the write
                minion_cache = self.cache[self.minion_id]
                minion_cache[self.pillarenv] = fresh_pillar
                self.cache[self.minion_id] = minion_cache

                log.debug(
                    "Pillar cache miss for pillarenv %s for minion %s",
                    self.pillarenv,
                    self.minion_id,
                )
                return fresh_pillar
        else:
            # We haven't seen this minion yet in the cache. Store it.
            fresh_pillar = self.fetch_pillar()
            self.cache[self.minion_id] = {self.pillarenv: fresh_pillar}
            log.debug("Pillar cache miss for minion %s", self.minion_id)
            log.debug("Current pillar cache: %s", cache_dict)  # FIXME hack!
            return fresh_pillar


class Pillar:
    """
    Read over the pillar top files and render the pillar data
    """

    def __init__(
        self,
        opts,
        grains,
        minion_id,
        saltenv,
        ext=None,
        functions=None,
        pillar_override=None,
        pillarenv=None,
        extra_minion_data=None,
    ):
        """
        Build the local file client, loaders and renderers used to compile
        the pillar.

        ``self.opts`` is a deep copy of ``opts`` adjusted by ``__gen_opts``.
        The incoming ``opts`` may still be mutated (``pillarenv`` and
        ``grains``) as shown below.
        """
        self.minion_id = minion_id
        self.ext = ext
        if pillarenv is None:
            if opts.get("pillarenv_from_saltenv", False):
                opts["pillarenv"] = saltenv
        # use the local file client
        self.opts = self.__gen_opts(opts, grains, saltenv=saltenv, pillarenv=pillarenv)
        self.saltenv = saltenv
        self.client = salt.fileclient.get_file_client(self.opts, True)
        self.avail = self.__gather_avail()

        if opts.get("file_client", "") == "local" and not opts.get(
            "use_master_when_local", False
        ):
            opts["grains"] = grains

        # if we didn't pass in functions, lets load them
        if functions is None:
            utils = salt.loader.utils(opts)
            if opts.get("file_client", "") == "local":
                self.functions = salt.loader.minion_mods(opts, utils=utils)
            else:
                self.functions = salt.loader.minion_mods(self.opts, utils=utils)
        else:
            self.functions = functions

        self.opts["minion_id"] = minion_id
        self.matchers = salt.loader.matchers(self.opts)
        self.rend = salt.loader.render(self.opts, self.functions)
        ext_pillar_opts = copy.deepcopy(self.opts)
        # Keep the incoming opts ID intact, ie, the master id
        if "id" in opts:
            ext_pillar_opts["id"] = opts["id"]
        self.merge_strategy = "smart"
        if opts.get("pillar_source_merging_strategy"):
            self.merge_strategy = opts["pillar_source_merging_strategy"]

        self.ext_pillars = salt.loader.pillars(ext_pillar_opts, self.functions)
        self.ignored_pillars = {}
        self.pillar_override = pillar_override or {}
        if not isinstance(self.pillar_override, dict):
            self.pillar_override = {}
            log.error("Pillar data must be a dictionary")
        self.extra_minion_data = extra_minion_data or {}
        if not isinstance(self.extra_minion_data, dict):
            self.extra_minion_data = {}
            log.error("Extra minion data must be a dictionary")
        self._closing = False

    def __valid_on_demand_ext_pillar(self, opts):
        """
        Check to see if the on demand external pillar is allowed

        Returns True only when ``self.ext`` is a dict whose keys are all
        listed in the ``on_demand_ext_pillar`` option.
        """
        if not isinstance(self.ext, dict):
            log.error("On-demand pillar %s is not formatted as a dictionary", self.ext)
            return False

        on_demand = opts.get("on_demand_ext_pillar", [])
        try:
            invalid_on_demand = {x for x in self.ext if x not in on_demand}
        except TypeError:
            # Prevent traceback when on_demand_ext_pillar option is malformed
            log.error(
                "The 'on_demand_ext_pillar' configuration option is "
                "malformed, it should be a list of ext_pillar module names"
            )
            return False

        if invalid_on_demand:
            log.error(
                "The following ext_pillar modules are not allowed for "
                "on-demand pillar data: %s. Valid on-demand ext_pillar "
                "modules are: %s. The valid modules can be adjusted by "
                "setting the 'on_demand_ext_pillar' config option.",
                ", ".join(sorted(invalid_on_demand)),
                ", ".join(on_demand),
            )
            return False
        return True

    def __gather_avail(self):
        """
        Gather the lists of available sls data from the master
        """
        avail = {}
        for saltenv in self._get_envs():
            avail[saltenv] = self.client.list_states(saltenv)
        return avail

    def __gen_opts(self, opts_in, grains, saltenv=None, ext=None, pillarenv=None):
        """
        The options need to be altered to conform to the file client

        Returns a deep copy of ``opts_in``; the input is not modified. The
        ``ext`` parameter is unused (``self.ext`` is consulted instead).
        """
        opts = copy.deepcopy(opts_in)
        opts["file_client"] = "local"
        if not grains:
            opts["grains"] = {}
        else:
            opts["grains"] = grains
        # Allow minion/CLI saltenv/pillarenv to take precedence over master
        opts["saltenv"] = saltenv if saltenv is not None else opts.get("saltenv")
        opts["pillarenv"] = (
            pillarenv if pillarenv is not None else opts.get("pillarenv")
        )
        opts["id"] = self.minion_id
        # Normalise state_top to a salt:// URL; values already in that form
        # are left untouched.
        state_top = opts["state_top"]
        if not state_top.startswith("salt://"):
            if state_top.startswith("/"):
                opts["state_top"] = salt.utils.url.create(state_top[1:])
            else:
                opts["state_top"] = salt.utils.url.create(state_top)
        if self.ext and self.__valid_on_demand_ext_pillar(opts):
            if "ext_pillar" in opts:
                opts["ext_pillar"].append(self.ext)
            else:
                opts["ext_pillar"] = [self.ext]
        if "__env__" in opts["pillar_roots"]:
            env = opts.get("pillarenv") or opts.get("saltenv") or "base"
            if env not in opts["pillar_roots"]:
                log.debug(
                    "pillar environment '%s' maps to __env__ pillar_roots directory",
                    env,
                )
                opts["pillar_roots"][env] = opts["pillar_roots"].pop("__env__")
            else:
                log.debug(
                    "pillar_roots __env__ ignored (environment '%s' found in pillar_roots)",
                    env,
                )
                opts["pillar_roots"].pop("__env__")
        return opts

    def _get_envs(self):
        """
        Pull the file server environments out of the master options

        Returns a list starting with ``"base"`` followed by the remaining
        ``pillar_roots`` keys in their configured order.
        """
        envs = ["base"]
        if "pillar_roots" in self.opts:
            envs.extend([x for x in list(self.opts["pillar_roots"]) if x not in envs])
        return envs

    def get_tops(self):
        """
        Gather the top files

        Returns a ``(tops, errors)`` tuple where ``tops`` maps each
        environment to its list of rendered top files and ``errors`` is a
        list of render error messages.
        """
        tops = collections.defaultdict(list)
        include = collections.defaultdict(list)
        done = collections.defaultdict(list)
        errors = []
        # Gather initial top files
        try:
            saltenvs = set()
            if self.opts["pillarenv"]:
                # If the specified pillarenv is not present in the available
                # pillar environments, do not cache the pillar top file.
                if self.opts["pillarenv"] not in self.opts["pillar_roots"]:
                    log.debug(
                        "pillarenv '%s' not found in the configured pillar "
                        "environments (%s)",
                        self.opts["pillarenv"],
                        ", ".join(self.opts["pillar_roots"]),
                    )
                else:
                    saltenvs.add(self.opts["pillarenv"])
            else:
                saltenvs = self._get_envs()
            if self.opts.get("pillar_source_merging_strategy", None) == "none":
                # saltenvs may be a list (from _get_envs), which does not
                # support the set-only ``&=`` operator, so filter instead.
                wanted_env = self.saltenv or "base"
                saltenvs = [env for env in saltenvs if env == wanted_env]

            for saltenv in saltenvs:
                top = self.client.cache_file(self.opts["state_top"], saltenv)
                if top:
                    tops[saltenv].append(
                        compile_template(
                            top,
                            self.rend,
                            self.opts["renderer"],
                            self.opts["renderer_blacklist"],
                            self.opts["renderer_whitelist"],
                            saltenv=saltenv,
                            _pillar_rend=True,
                        )
                    )
        except Exception as exc:  # pylint: disable=broad-except
            errors.append(
                "Rendering Primary Top file failed, render error:\n{}".format(exc)
            )
            log.exception("Pillar rendering failed for minion %s", self.minion_id)

        # Search initial top files for includes
        for saltenv, ctops in tops.items():
            for ctop in ctops:
                if "include" not in ctop:
                    continue
                for sls in ctop["include"]:
                    include[saltenv].append(sls)
                ctop.pop("include")
        # Go through the includes and pull out the extra tops and add them
        while include:
            pops = []
            for saltenv, states in include.items():
                pops.append(saltenv)
                if not states:
                    continue
                for sls in states:
                    if sls in done[saltenv]:
                        continue
                    try:
                        tops[saltenv].append(
                            compile_template(
                                self.client.get_state(sls, saltenv).get("dest", False),
                                self.rend,
                                self.opts["renderer"],
                                self.opts["renderer_blacklist"],
                                self.opts["renderer_whitelist"],
                                saltenv=saltenv,
                                _pillar_rend=True,
                            )
                        )
                    except Exception as exc:  # pylint: disable=broad-except
                        errors.append(
                            "Rendering Top file {} failed, render error:\n{}".format(
                                sls, exc
                            )
                        )
                    done[saltenv].append(sls)
            # Remove processed environments after iterating, so the dict is
            # not modified while it is being walked.
            for saltenv in pops:
                if saltenv in include:
                    include.pop(saltenv)

        return tops, errors

    def merge_tops(self, tops):
        """
        Cleanly merge the top files

        Also records, in ``self.ignored_pillars``, the sls names of targets
        flagged with ``ignore_missing``.
        """
        top = collections.defaultdict(OrderedDict)
        orders = collections.defaultdict(OrderedDict)
        for ctops in tops.values():
            for ctop in ctops:
                for saltenv, targets in ctop.items():
                    if saltenv == "include":
                        continue
                    for tgt in targets:
                        matches = []
                        states = OrderedDict()
                        orders[saltenv][tgt] = 0
                        ignore_missing = False
                        for comp in ctop[saltenv][tgt]:
                            if isinstance(comp, dict):
                                if "match" in comp:
                                    matches.append(comp)
                                if "order" in comp:
                                    order = comp["order"]
                                    if not isinstance(order, int):
                                        try:
                                            order = int(order)
                                        except ValueError:
                                            order = 0
                                    orders[saltenv][tgt] = order
                                if comp.get("ignore_missing", False):
                                    ignore_missing = True
                            if isinstance(comp, str):
                                states[comp] = True
                        if ignore_missing:
                            if saltenv not in self.ignored_pillars:
                                self.ignored_pillars[saltenv] = []
                            self.ignored_pillars[saltenv].extend(states.keys())
                        top[saltenv][tgt] = matches
                        top[saltenv][tgt].extend(states)
        return self.sort_top_targets(top, orders)

    def sort_top_targets(self, top, orders):
        """
        Returns the sorted high data from the merged top files

        Targets within each environment are ordered by their value in
        ``orders``.
        """
        sorted_top = collections.defaultdict(OrderedDict)
        # pylint: disable=cell-var-from-loop
        for saltenv, targets in top.items():
            sorted_targets = sorted(targets, key=lambda target: orders[saltenv][target])
            for target in sorted_targets:
                sorted_top[saltenv][target] = targets[target]
        # pylint: enable=cell-var-from-loop
        return sorted_top

    def get_top(self):
        """
        Returns the high data derived from the top file

        Returns a ``(merged_tops, errors)`` tuple. A ``TypeError`` raised
        while merging yields an empty result plus an error message.
        """
        tops, errors = self.get_tops()
        try:
            merged_tops = self.merge_tops(tops)
        except TypeError:
            merged_tops = OrderedDict()
            errors.append("Error encountered while rendering pillar top file.")
        return merged_tops, errors

    def top_matches(self, top, reload=False):
        """
        Search through the top high data for matches and return the states
        that this minion needs to execute.

        Returns:
        {'saltenv': ['state1', 'state2', ...]}

        reload
            Reload the matcher loader
        """
        matches = {}
        if reload:
            self.matchers = salt.loader.matchers(self.opts)
        for saltenv, body in top.items():
            if self.opts["pillarenv"]:
                if saltenv != self.opts["pillarenv"]:
                    continue
            for match, data in body.items():
                if self.matchers["confirm_top.confirm_top"](
                    match,
                    data,
                    self.opts.get("nodegroups", {}),
                ):
                    if saltenv not in matches:
                        matches[saltenv] = env_matches = []
                    else:
                        env_matches = matches[saltenv]
                    for item in data:
                        if isinstance(item, str) and item not in env_matches:
                            env_matches.append(item)
        return matches

    def render_pstate(self, sls, saltenv, mods, defaults=None):
        """
        Collect a single pillar sls file and render it

        Returns a ``(state, mods, errors)`` tuple. ``mods`` maps every sls
        rendered so far to its result and is updated in place; includes are
        rendered recursively.
        """
        if defaults is None:
            defaults = {}
        err = ""
        errors = []
        state_data = self.client.get_state(sls, saltenv)
        fn_ = state_data.get("dest", False)
        if not fn_:
            if sls in self.ignored_pillars.get(saltenv, []):
                log.debug(
                    "Skipping ignored and missing SLS '%s' in environment '%s'",
                    sls,
                    saltenv,
                )
                return None, mods, errors
            elif self.opts["pillar_roots"].get(saltenv):
                msg = (
                    "Specified SLS '{}' in environment '{}' is not"
                    " available on the salt master".format(sls, saltenv)
                )
                log.error(msg)
                errors.append(msg)
            else:
                msg = "Specified SLS '{}' in environment '{}' was not found. ".format(
                    sls, saltenv
                )
                if self.opts.get("__git_pillar", False) is True:
                    msg += (
                        "This is likely caused by a git_pillar top file "
                        "containing an environment other than the one for the "
                        "branch in which it resides. Each git_pillar "
                        "branch/tag must have its own top file."
                    )
                else:
                    msg += (
                        "This could be because SLS '{0}' is in an "
                        "environment other than '{1}', but '{1}' is "
                        "included in that environment's Pillar top file. It "
                        "could also be due to environment '{1}' not being "
                        "defined in 'pillar_roots'.".format(sls, saltenv)
                    )
                log.debug(msg)
                # return state, mods, errors
                return None, mods, errors
        state = None
        try:
            state = compile_template(
                fn_,
                self.rend,
                self.opts["renderer"],
                self.opts["renderer_blacklist"],
                self.opts["renderer_whitelist"],
                saltenv,
                sls,
                _pillar_rend=True,
                **defaults
            )
        except Exception as exc:  # pylint: disable=broad-except
            msg = "Rendering SLS '{}' failed, render error:\n{}".format(sls, exc)
            log.critical(msg, exc_info=True)
            if self.opts.get("pillar_safe_render_error", True):
                errors.append(
                    "Rendering SLS '{}' failed. Please see master log for "
                    "details.".format(sls)
                )
            else:
                errors.append(msg)
        mods[sls] = state
        nstate = None
        if state:
            if not isinstance(state, dict):
                msg = "SLS '{}' does not render to a dictionary".format(sls)
                log.error(msg)
                errors.append(msg)
            else:
                if "include" in state:
                    if not isinstance(state["include"], list):
                        msg = (
                            "Include Declaration in SLS '{}' is not "
                            "formed as a list".format(sls)
                        )
                        log.error(msg)
                        errors.append(msg)
                    else:
                        # render included state(s)
                        include_states = []
                        for sub_sls in state.pop("include"):
                            if isinstance(sub_sls, dict):
                                sub_sls, v = next(iter(sub_sls.items()))
                                defaults = v.get("defaults", {})
                                key = v.get("key", None)
                            else:
                                key = None
                            try:
                                matched_pstates = fnmatch.filter(
                                    self.avail[saltenv],
                                    sub_sls.lstrip(".").replace("/", "."),
                                )
                                if sub_sls.startswith("."):
                                    # Relative include: resolve against the
                                    # including sls's package path.
                                    if state_data.get("source", "").endswith(
                                        "/init.sls"
                                    ):
                                        include_parts = sls.split(".")
                                    else:
                                        include_parts = sls.split(".")[:-1]
                                    sub_sls = ".".join(include_parts + [sub_sls[1:]])
                                matches = fnmatch.filter(
                                    self.avail[saltenv],
                                    sub_sls,
                                )
                                matched_pstates.extend(matches)
                            except KeyError:
                                errors.extend(
                                    [
                                        "No matching pillar environment for environment"
                                        " '{}' found".format(saltenv)
                                    ]
                                )
                                matched_pstates = [sub_sls]
                            # If matched_pstates is empty, set to sub_sls
                            if len(matched_pstates) < 1:
                                matched_pstates = [sub_sls]
                            for m_sub_sls in matched_pstates:
                                if m_sub_sls not in mods:
                                    nstate, mods, err = self.render_pstate(
                                        m_sub_sls, saltenv, mods, defaults
                                    )
                                else:
                                    nstate = mods[m_sub_sls]
                                if nstate:
                                    if key:
                                        # If key is x:y, convert it to {x: {y: nstate}}
                                        for key_fragment in reversed(key.split(":")):
                                            nstate = {key_fragment: nstate}
                                    if not self.opts.get(
                                        "pillar_includes_override_sls", False
                                    ):
                                        include_states.append(nstate)
                                    else:
                                        state = merge(
                                            state,
                                            nstate,
                                            self.merge_strategy,
                                            self.opts.get("renderer", "yaml"),
                                            self.opts.get("pillar_merge_lists", False),
                                        )
                                if err:
                                    errors += err
                        if not self.opts.get("pillar_includes_override_sls", False):
                            # merge included state(s) with the current state
                            # merged last to ensure that its values are
                            # authoritative.
                            include_states.append(state)
                            state = None
                            for s in include_states:
                                if state is None:
                                    state = s
                                else:
                                    state = merge(
                                        state,
                                        s,
                                        self.merge_strategy,
                                        self.opts.get("renderer", "yaml"),
                                        self.opts.get("pillar_merge_lists", False),
                                    )
        return state, mods, errors

    def render_pillar(self, matches, errors=None):
        """
        Extract the sls pillar files from the matches and render them into the
        pillar

        Returns a ``(pillar, errors)`` tuple. The pillar starts as a shallow
        copy of ``self.pillar_override``.
        """
        pillar = copy.copy(self.pillar_override)
        if errors is None:
            errors = []
        for saltenv, pstates in matches.items():
            pstatefiles = []
            mods = {}
            for sls_match in pstates:
                matched_pstates = []
                try:
                    matched_pstates = fnmatch.filter(self.avail[saltenv], sls_match)
                except KeyError:
                    errors.extend(
                        [
                            "No matching pillar environment for environment "
                            "'{}' found".format(saltenv)
                        ]
                    )
                if matched_pstates:
                    pstatefiles.extend(matched_pstates)
                else:
                    pstatefiles.append(sls_match)

            for sls in pstatefiles:
                pstate, mods, err = self.render_pstate(sls, saltenv, mods)

                if err:
                    errors += err

                if pstate is not None:
                    if not isinstance(pstate, dict):
                        log.error(
                            "The rendered pillar sls file, '%s' state did "
                            "not return the expected data format. This is "
                            "a sign of a malformed pillar sls file. Returned "
                            "errors: %s",
                            sls,
                            ", ".join(["'{}'".format(e) for e in errors]),
                        )
                        continue
                    pillar = merge(
                        pillar,
                        pstate,
                        self.merge_strategy,
                        self.opts.get("renderer", "yaml"),
                        self.opts.get("pillar_merge_lists", False),
                    )

        return pillar, errors

    def _external_pillar_data(self, pillar, val, key):
        """
        Call the ``key`` external pillar and return the data it produces.

        ``val`` is the ext_pillar configuration: a dict is passed as keyword
        arguments, a list as positional arguments, anything else as a single
        positional argument. ``extra_minion_data`` is passed only when the
        function accepts it and there is data to pass.
        """
        ext_func = self.ext_pillars[key]
        args = salt.utils.args.get_function_argspec(ext_func).args

        extra_kwargs = {}
        if ("extra_minion_data" in args) and self.extra_minion_data:
            extra_kwargs["extra_minion_data"] = self.extra_minion_data

        if isinstance(val, dict):
            return ext_func(self.minion_id, pillar, **extra_kwargs, **val)
        if isinstance(val, list):
            return ext_func(self.minion_id, pillar, *val, **extra_kwargs)
        return ext_func(self.minion_id, pillar, val, **extra_kwargs)

    def ext_pillar(self, pillar, errors=None):
        """
        Render the external pillar data

        Returns a ``(pillar, errors)`` tuple. An empty pillar is returned if
        an ``ext_pillar`` entry is not a dictionary.
        """
        if errors is None:
            errors = []
        try:
            # Make sure that on-demand git_pillar is fetched before we try to
            # compile the pillar data. git_pillar will fetch a remote when
            # the git ext_pillar() func is run, but only for masterless.
            if self.ext and "git" in self.ext and self.opts.get("__role") != "minion":
                # Avoid circular import
                import salt.utils.gitfs
                import salt.pillar.git_pillar

                git_pillar = salt.utils.gitfs.GitPillar(
                    self.opts,
                    self.ext["git"],
                    per_remote_overrides=salt.pillar.git_pillar.PER_REMOTE_OVERRIDES,
                    per_remote_only=salt.pillar.git_pillar.PER_REMOTE_ONLY,
                    global_only=salt.pillar.git_pillar.GLOBAL_ONLY,
                )
                git_pillar.fetch_remotes()
        except TypeError:
            # Handle malformed ext_pillar: best effort, but leave a trace for
            # anyone debugging why the on-demand fetch did not happen.
            log.debug(
                "Skipping on-demand git_pillar fetch due to TypeError",
                exc_info=True,
            )
        if "ext_pillar" not in self.opts:
            return pillar, errors
        if not isinstance(self.opts["ext_pillar"], list):
            errors.append('The "ext_pillar" option is malformed')
            log.critical(errors[-1])
            return pillar, errors
        ext = None
        # Bring in CLI pillar data
        if self.pillar_override:
            pillar = merge(
                pillar,
                self.pillar_override,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )

        for run in self.opts["ext_pillar"]:
            if not isinstance(run, dict):
                errors.append('The "ext_pillar" option is malformed')
                log.critical(errors[-1])
                return {}, errors
            if next(iter(run.keys())) in self.opts.get("exclude_ext_pillar", []):
                continue
            for key, val in run.items():
                if key not in self.ext_pillars:
                    log.critical(
                        "Specified ext_pillar interface %s is unavailable", key
                    )
                    continue
                try:
                    ext = self._external_pillar_data(pillar, val, key)
                except Exception as exc:  # pylint: disable=broad-except
                    errors.append(
                        "Failed to load ext_pillar {}: {}".format(
                            key,
                            exc.__str__(),
                        )
                    )
                    log.error(
                        "Exception caught loading ext_pillar '%s':\n%s",
                        key,
                        "".join(traceback.format_tb(sys.exc_info()[2])),
                    )
            if ext:
                pillar = merge(
                    pillar,
                    ext,
                    self.merge_strategy,
                    self.opts.get("renderer", "yaml"),
                    self.opts.get("pillar_merge_lists", False),
                )
                ext = None
        return pillar, errors

    def compile_pillar(self, ext=True):
        """
        Render the pillar data and return

        ext
            When true, external pillars are rendered as well; with the
            ``ext_pillar_first`` option they are rendered before the sls
            pillar files. Any errors are stored under the ``_errors`` key.
        """
        top, top_errors = self.get_top()
        if ext:
            if self.opts.get("ext_pillar_first", False):
                self.opts["pillar"], errors = self.ext_pillar(self.pillar_override)
                # Rebuild the renderers so they see the ext_pillar data
                self.rend = salt.loader.render(self.opts, self.functions)
                matches = self.top_matches(top, reload=True)
                pillar, errors = self.render_pillar(matches, errors=errors)
                pillar = merge(
                    self.opts["pillar"],
                    pillar,
                    self.merge_strategy,
                    self.opts.get("renderer", "yaml"),
                    self.opts.get("pillar_merge_lists", False),
                )
            else:
                matches = self.top_matches(top)
                pillar, errors = self.render_pillar(matches)
                pillar, errors = self.ext_pillar(pillar, errors=errors)
        else:
            matches = self.top_matches(top)
            pillar, errors = self.render_pillar(matches)
        errors.extend(top_errors)
        if self.opts.get("pillar_opts", False):
            mopts = dict(self.opts)
            if "grains" in mopts:
                mopts.pop("grains")
            mopts["saltversion"] = __version__
            pillar["master"] = mopts
        if "pillar" in self.opts and self.opts.get("ssh_merge_pillar", False):
            pillar = merge(
                self.opts["pillar"],
                pillar,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )
        if errors:
            for error in errors:
                log.critical("Pillar render error: %s", error)
            pillar["_errors"] = errors

        if self.pillar_override:
            pillar = merge(
                pillar,
                self.pillar_override,
                self.merge_strategy,
                self.opts.get("renderer", "yaml"),
                self.opts.get("pillar_merge_lists", False),
            )

        decrypt_errors = self.decrypt_pillar(pillar)
        if decrypt_errors:
            pillar.setdefault("_errors", []).extend(decrypt_errors)
        return pillar

    def decrypt_pillar(self, pillar):
        """
        Decrypt the specified pillar dictionary items, if configured to do so

        The ``pillar`` dict is modified in place. Returns a list of error
        messages (empty when nothing failed).
        """
        errors = []
        if self.opts.get("decrypt_pillar"):
            decrypt_pillar = self.opts["decrypt_pillar"]
            if not isinstance(decrypt_pillar, dict):
                decrypt_pillar = salt.utils.data.repack_dictlist(
                    self.opts["decrypt_pillar"]
                )
            if not decrypt_pillar:
                errors.append("decrypt_pillar config option is malformed")
            for key, rend in decrypt_pillar.items():
                ptr = salt.utils.data.traverse_dict(
                    pillar,
                    key,
                    default=None,
                    delimiter=self.opts["decrypt_pillar_delimiter"],
                )
                if ptr is None:
                    log.debug("Pillar key %s not present", key)
                    continue
                # Hashability is used as the test for "immutable": such
                # values must be replaced in their parent (see below).
                try:
                    hash(ptr)
                    immutable = True
                except TypeError:
                    immutable = False
                try:
                    ret = salt.utils.crypt.decrypt(
                        ptr,
                        rend or self.opts["decrypt_pillar_default"],
                        renderers=self.rend,
                        opts=self.opts,
                        valid_rend=self.opts["decrypt_pillar_renderers"],
                    )
                    if immutable:
                        # Since the key pointed to an immutable type, we need
                        # to replace it in the pillar dict. First we will find
                        # the parent, and then we will replace the child key
                        # with the return data from the renderer.
                        parent, _, child = key.rpartition(
                            self.opts["decrypt_pillar_delimiter"]
                        )
                        if not parent:
                            # key is a top-level key, so the pointer to the
                            # parent is the pillar dict itself.
                            ptr = pillar
                        else:
                            ptr = salt.utils.data.traverse_dict(
                                pillar,
                                parent,
                                default=None,
                                delimiter=self.opts["decrypt_pillar_delimiter"],
                            )
                        if ptr is not None:
                            ptr[child] = ret
                except Exception as exc:  # pylint: disable=broad-except
                    msg = "Failed to decrypt pillar key '{}': {}".format(key, exc)
                    errors.append(msg)
                    log.error(msg, exc_info=True)
        return errors

    def destroy(self):
        """
        This method exist in order to be API compatible with RemotePillar
        """
        # getattr guard: __del__ may run on an instance whose __init__ raised
        # before ``_closing`` was assigned.
        if getattr(self, "_closing", False):
            return
        self._closing = True

    # pylint: disable=W1701
    def __del__(self):
        self.destroy()

    # pylint: enable=W1701


# TODO: actually migrate from Pillar to AsyncPillar to allow for futures in
# ext_pillar etc.
class AsyncPillar(Pillar):
    """
    Coroutine wrapper around :class:`Pillar`; the compile itself still runs
    synchronously.
    """

    @salt.ext.tornado.gen.coroutine
    def compile_pillar(self, ext=True):
        """
        Return a future containing the result of ``Pillar.compile_pillar``
        """
        ret = super().compile_pillar(ext=ext)
        raise salt.ext.tornado.gen.Return(ret)