"""
    salt.utils.master
    -----------------

    Utilities that can only be used on a salt master.

"""
import logging
import os
import signal
from threading import Event, Thread

import salt.cache
import salt.client
import salt.config
import salt.log
import salt.payload
import salt.pillar
import salt.utils.atomicfile
import salt.utils.files
import salt.utils.minions
import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.verify
from salt.exceptions import SaltException
from salt.utils.cache import CacheCli as cache_cli
from salt.utils.process import Process
from salt.utils.zeromq import zmq

log = logging.getLogger(__name__)


def get_running_jobs(opts):
    """
    Return the running jobs recorded in this master's proc directory.

    Every file in ``<cachedir>/proc`` is read with ``_read_proc_file``; files
    that turn out to be empty, invalid or stale are skipped.

    :param opts: master options dict; only ``opts["cachedir"]`` is read here.
    :return: list of JID metadata dicts (empty if the proc dir is missing).
    """
    ret = []
    proc_dir = os.path.join(opts["cachedir"], "proc")
    if not os.path.isdir(proc_dir):
        return ret
    for fn_ in os.listdir(proc_dir):
        path = os.path.join(proc_dir, fn_)
        try:
            data = _read_proc_file(path, opts)
        except OSError:
            # proc files may be removed at any time during this process by
            # the master process that is executing the JID in question, so
            # we must ignore ENOENT during this process
            log.trace("%s removed during processing by master process", path)
            continue
        if data is not None:
            ret.append(data)
    return ret


def _remove_proc_file(path):
    """
    Best-effort removal of a proc file; a failure is only logged.
    """
    try:
        os.remove(path)
    except OSError:
        log.debug("Unable to remove proc file %s.", path)


def _read_proc_file(path, opts):
    """
    Return a dict of JID metadata, or None

    The proc file is deleted (best effort) when it is empty, when it carries
    no usable ``pid``, when that PID is no longer running, or when the PID
    does not look like a salt process (see ``_check_cmdline``).

    :param path: full path of the proc file to read.
    :param opts: master options dict (unused here, kept for the interface).
    :raises OSError: if the file cannot be opened or read.
    """
    with salt.utils.files.fopen(path, "rb") as fp_:
        buf = fp_.read()
    if not buf:
        # Proc file is empty, remove
        _remove_proc_file(path)
        return None
    data = salt.payload.loads(buf)
    if not isinstance(data, dict):
        # Invalid serial object
        return None
    # A proc file without a pid used to raise an uncaught KeyError here;
    # it cannot be matched to a process, so it is handled like a stale one.
    pid = data.get("pid")
    if not pid or not salt.utils.process.os_is_running(pid):
        # The process is no longer running, clear out the file and
        # continue
        _remove_proc_file(path)
        return None

    if not _check_cmdline(data):
        log.warning("PID %s exists but does not appear to be a salt process.", pid)
        _remove_proc_file(path)
        return None
    return data


def _check_cmdline(data):
    """
    In some cases where there are an insane number of processes being created
    on a system a PID can get recycled or assigned to a non-Salt process.
    On Linux this fn checks to make sure the PID we are checking on is actually
    a Salt process.

    For non-Linux systems we punt and just return True

    :param data: JID metadata dict; only ``data["pid"]`` is consulted.
    :return: True if the PID looks like a salt process (or cannot be checked),
        False otherwise.
    """
    if not salt.utils.platform.is_linux():
        return True
    pid = data.get("pid")
    if not pid:
        return False
    if not os.path.isdir("/proc"):
        # No procfs mounted, nothing to check against
        return True
    path = "/proc/{}/cmdline".format(pid)
    if not os.path.isfile(path):
        return False
    try:
        with salt.utils.files.fopen(path, "rb") as fp_:
            return b"salt" in fp_.read()
    except OSError:
        return False


class MasterPillarUtil:
    """
    Helper utility for easy access to targeted minion grain and
    pillar data, either from cached data on the master or retrieved
    on demand, or (by default) both.

    The minion pillar data returned in get_minion_pillar() is
    compiled directly from salt.pillar.Pillar on the master to
    avoid any possible 'pillar poisoning' from a compromised or
    untrusted minion.

    ** However, the minion grains are still possibly entirely
       supplied by the minion. **

    Example use case:
        For runner modules that need access minion pillar data,
        MasterPillarUtil.get_minion_pillar should be used instead
        of getting the pillar data by executing the "pillar" module
        on the minions:

        # my_runner.py
        tgt = 'web*'
        pillar_util = salt.utils.master.MasterPillarUtil(tgt, tgt_type='glob', opts=__opts__)
        pillar_data = pillar_util.get_minion_pillar()
    """

    def __init__(
        self,
        tgt="",
        tgt_type="glob",
        saltenv=None,
        use_cached_grains=True,
        use_cached_pillar=True,
        grains_fallback=True,
        pillar_fallback=True,
        opts=None,
    ):
        """
        Store the targeting and cache/fallback settings.

        :param tgt: minion target expression.
        :param tgt_type: how ``tgt`` is matched (e.g. ``glob``, ``list``).
        :param saltenv: environment handed to ``salt.pillar.Pillar``.
        :param use_cached_grains: prefer cached grains over live grains.
        :param use_cached_pillar: prefer cached pillar over compiled pillar.
        :param grains_fallback: use the other grains source for misses.
        :param pillar_fallback: use the other pillar source for misses.
        :param opts: master options dict (required).
        :raises SaltException: if ``opts`` is not supplied.
        """
        log.debug("New instance of %s created.", self.__class__.__name__)
        if opts is None:
            log.error("%s: Missing master opts init arg.", self.__class__.__name__)
            raise SaltException(
                "{}: Missing master opts init arg.".format(self.__class__.__name__)
            )
        else:
            self.opts = opts
        self.tgt = tgt
        self.tgt_type = tgt_type
        self.saltenv = saltenv
        self.use_cached_grains = use_cached_grains
        self.use_cached_pillar = use_cached_pillar
        self.grains_fallback = grains_fallback
        self.pillar_fallback = pillar_fallback
        self.cache = salt.cache.factory(opts)
        log.debug(
            "Init settings: tgt: '%s', tgt_type: '%s', saltenv: '%s', "
            "use_cached_grains: %s, use_cached_pillar: %s, "
            "grains_fallback: %s, pillar_fallback: %s",
            tgt,
            tgt_type,
            saltenv,
            use_cached_grains,
            use_cached_pillar,
            grains_fallback,
            pillar_fallback,
        )

    def _get_cached_mine_data(self, *minion_ids):
        """
        Return one dict with the cached mine data of the targeted minions.

        When no ids are given, every minion listed in the cache is used.
        Every requested id is present in the result, mapped to ``{}`` when
        nothing usable is cached for it.
        """
        mine_data = {minion_id: {} for minion_id in minion_ids}
        if not self.opts.get("minion_data_cache", False) and not self.opts.get(
            "enforce_mine_cache", False
        ):
            log.debug(
                "Skipping cached mine data: minion_data_cache "
                "and enforce_mine_cache are both disabled."
            )
            return mine_data
        if not minion_ids:
            minion_ids = self.cache.list("minions")
        for minion_id in minion_ids:
            if not salt.utils.verify.valid_id(self.opts, minion_id):
                continue
            mdata = self.cache.fetch("minions/{}".format(minion_id), "mine")
            if isinstance(mdata, dict):
                mine_data[minion_id] = mdata
        return mine_data

    def _get_cached_minion_data(self, *minion_ids):
        """
        Return two separate dicts of cached grains and pillar data of the
        minions, as a ``(grains, pillars)`` tuple.

        When no ids are given, every minion listed in the cache is used.
        """
        # Build the two dicts independently so that they never share the
        # same inner placeholder dict objects.
        grains = {minion_id: {} for minion_id in minion_ids}
        pillars = {minion_id: {} for minion_id in minion_ids}
        if not self.opts.get("minion_data_cache", False):
            log.debug("Skipping cached data because minion_data_cache is not enabled.")
            return grains, pillars
        if not minion_ids:
            minion_ids = self.cache.list("minions")
        for minion_id in minion_ids:
            if not salt.utils.verify.valid_id(self.opts, minion_id):
                continue
            mdata = self.cache.fetch("minions/{}".format(minion_id), "data")
            if not isinstance(mdata, dict):
                log.warning(
                    "cache.fetch should always return a dict. ReturnedType: %s,"
                    " MinionId: %s",
                    type(mdata).__name__,
                    minion_id,
                )
                continue
            if "grains" in mdata:
                grains[minion_id] = mdata["grains"]
            if "pillar" in mdata:
                pillars[minion_id] = mdata["pillar"]
        return grains, pillars

    def _get_live_minion_grains(self, minion_ids):
        """
        Return a dict of grains fetched directly from the minions by running
        ``grains.items`` on them.
        """
        if not minion_ids:
            # Nothing to ask for; do not publish a job with an empty target.
            return {}
        log.debug('Getting live grains for minions: "%s"', minion_ids)
        with salt.client.get_local_client(self.opts["conf_file"]) as client:
            return client.cmd(
                ",".join(minion_ids),
                "grains.items",
                timeout=self.opts["timeout"],
                tgt_type="list",
            )

    def _get_live_minion_pillar(self, minion_id=None, minion_grains=None):
        """
        Return a dict of pillar data for one minion, compiled on the master.

        Returns ``{}`` when no minion id or no grains are supplied.
        """
        if minion_id is None:
            return {}
        if not minion_grains:
            log.warning("Cannot get pillar data for %s: no grains supplied.", minion_id)
            return {}
        log.debug("Getting live pillar for %s", minion_id)
        pillar = salt.pillar.Pillar(
            self.opts, minion_grains, minion_id, self.saltenv, self.opts["ext_pillar"]
        )
        log.debug("Compiling pillar for %s", minion_id)
        return pillar.compile_pillar()

    @staticmethod
    def _merge_minion_data(minion_ids, cached, live, prefer_cached):
        """
        Combine cached and live per-minion data into one dict.

        Every id in ``minion_ids`` is present in the result (``{}`` when no
        data is available). With ``prefer_cached`` the cached entries win over
        live ones; otherwise live entries win and cached entries only fill in
        minions whose live data is missing or empty.
        """
        ret = {minion_id: {} for minion_id in minion_ids}
        ret.update(live)
        if prefer_cached:
            ret.update(cached)
        else:
            for minion_id, mcache in cached.items():
                if not ret.get(minion_id):
                    ret[minion_id] = mcache
        return ret

    def _get_minion_grains(self, *minion_ids, **kwargs):
        """
        Get the minion grains either from cache or from a direct query
        on the minion. By default try to use cached grains first, then
        fall back to querying the minion directly.

        :param cached_grains: (keyword) dict of minion id -> cached grains.
        """
        cached_grains = kwargs.get("cached_grains", {})
        cret = {}
        lret = {}
        if self.use_cached_grains:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_grains.items()
                if mcache
            }
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in cret
            ]
            log.debug("Missed cached minion grains for: %s", missed_minions)
            if self.grains_fallback:
                lret = self._get_live_minion_grains(missed_minions)
            return self._merge_minion_data(minion_ids, cret, lret, prefer_cached=True)

        lret = self._get_live_minion_grains(minion_ids)
        missed_minions = [
            minion_id for minion_id in minion_ids if minion_id not in lret
        ]
        log.debug("Missed live minion grains for: %s", missed_minions)
        if self.grains_fallback:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_grains.items()
                if mcache
            }
        # Live data was asked for first: the cache must only act as a
        # fallback and never overwrite freshly fetched grains.
        return self._merge_minion_data(minion_ids, cret, lret, prefer_cached=False)

    def _get_minion_pillar(self, *minion_ids, **kwargs):
        """
        Get the minion pillar either from cache or from a direct query
        on the minion. By default try use the cached pillar first, then
        fall back to rendering pillar on demand with the supplied grains.

        :param grains: (keyword) dict of minion id -> grains used to compile.
        :param cached_pillar: (keyword) dict of minion id -> cached pillar.
        """
        grains = kwargs.get("grains", {})
        cached_pillar = kwargs.get("cached_pillar", {})
        cret = {}
        lret = {}
        if self.use_cached_pillar:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_pillar.items()
                if mcache
            }
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in cret
            ]
            log.debug("Missed cached minion pillars for: %s", missed_minions)
            if self.pillar_fallback:
                lret = {
                    minion_id: self._get_live_minion_pillar(
                        minion_id, grains.get(minion_id, {})
                    )
                    for minion_id in missed_minions
                }
            return self._merge_minion_data(minion_ids, cret, lret, prefer_cached=True)

        lret = {
            minion_id: self._get_live_minion_pillar(
                minion_id, grains.get(minion_id, {})
            )
            for minion_id in minion_ids
        }
        missed_minions = [
            minion_id for minion_id in minion_ids if minion_id not in lret
        ]
        log.debug("Missed live minion pillars for: %s", missed_minions)
        if self.pillar_fallback:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_pillar.items()
                if mcache
            }
        # Freshly compiled pillar wins; the cache only fills in minions for
        # which no (or an empty) pillar could be compiled.
        return self._merge_minion_data(minion_ids, cret, lret, prefer_cached=False)

    def _tgt_to_list(self):
        """
        Return a list of minion ids that match the target and tgt_type.
        """
        ckminions = salt.utils.minions.CkMinions(self.opts)
        _res = ckminions.check_minions(self.tgt, self.tgt_type)
        minion_ids = _res["minions"]
        if not minion_ids:
            log.debug(
                'No minions matched for tgt="%s" and tgt_type="%s"',
                self.tgt,
                self.tgt_type,
            )
            return []
        log.debug(
            'Matching minions for tgt="%s" and tgt_type="%s": %s',
            self.tgt,
            self.tgt_type,
            minion_ids,
        )
        return minion_ids

    def get_minion_pillar(self):
        """
        Get pillar data for the targeted minions, either by fetching the
        cached minion data on the master, or by compiling the minion's
        pillar data on the master.

        For runner modules that need access minion pillar data, this
        function should be used instead of getting the pillar data by
        executing the pillar module on the minions.

        By default, this function tries hard to get the pillar data:
            - Try to get the cached minion grains and pillar if the
                master has minion_data_cache: True
            - If the pillar data for the minion is cached, use it.
            - If there is no cached grains/pillar data for a minion,
                then try to get the minion grains directly from the minion.
            - Use the minion grains to compile the pillar directly from the
                master using salt.pillar.Pillar
        """
        minion_ids = self._tgt_to_list()
        if self.tgt and not minion_ids:
            return {}
        if any(
            [
                self.use_cached_grains,
                self.use_cached_pillar,
                self.grains_fallback,
                self.pillar_fallback,
            ]
        ):
            log.debug("Getting cached minion data")
            cached_minion_grains, cached_minion_pillars = self._get_cached_minion_data(
                *minion_ids
            )
        else:
            cached_minion_grains = {}
            cached_minion_pillars = {}
        log.debug("Getting minion grain data for: %s", minion_ids)
        minion_grains = self._get_minion_grains(
            *minion_ids, cached_grains=cached_minion_grains
        )
        log.debug("Getting minion pillar data for: %s", minion_ids)
        minion_pillars = self._get_minion_pillar(
            *minion_ids, grains=minion_grains, cached_pillar=cached_minion_pillars
        )
        return minion_pillars

    def get_minion_grains(self):
        """
        Get grains data for the targeted minions, either by fetching the
        cached minion data on the master, or by fetching the grains
        directly on the minion.

        By default, this function tries hard to get the grains data:
            - Try to get the cached minion grains if the master
                has minion_data_cache: True
            - If the grains data for the minion is cached, use it.
            - If there is no cached grains data for a minion,
                then try to get the minion grains directly from the minion.
        """
        minion_ids = self._tgt_to_list()
        if not minion_ids:
            return {}
        if self.use_cached_grains or self.grains_fallback:
            log.debug("Getting cached minion data.")
            # Only the grains half of the cached data is needed here
            cached_minion_grains, _ = self._get_cached_minion_data(*minion_ids)
        else:
            cached_minion_grains = {}
        log.debug("Getting minion grain data for: %s", minion_ids)
        minion_grains = self._get_minion_grains(
            *minion_ids, cached_grains=cached_minion_grains
        )
        return minion_grains

    def get_cached_mine_data(self):
        """
        Get cached mine data for the targeted minions.
        """
        minion_ids = self._tgt_to_list()
        log.debug("Getting cached mine data for: %s", minion_ids)
        return self._get_cached_mine_data(*minion_ids)

    def clear_cached_minion_data(
        self,
        clear_pillar=False,
        clear_grains=False,
        clear_mine=False,
        clear_mine_func=None,
    ):
        """
        Clear the cached data/files for the targeted minions.

        :param clear_pillar: drop the cached pillar.
        :param clear_grains: drop the cached grains.
        :param clear_mine: drop the whole cached mine data.
        :param clear_mine_func: name of a single mine function to drop.
        :return: False if nothing was requested to be cleared, True otherwise.
        """
        clear_what = []
        if clear_pillar:
            clear_what.append("pillar")
        if clear_grains:
            clear_what.append("grains")
        if clear_mine:
            clear_what.append("mine")
        if clear_mine_func is not None:
            clear_what.append("mine_func: '{}'".format(clear_mine_func))
        if not clear_what:
            log.debug("No cached data types specified for clearing.")
            return False

        minion_ids = self._tgt_to_list()
        log.debug("Clearing cached %s data for: %s", ", ".join(clear_what), minion_ids)
        if clear_pillar == clear_grains:
            # clear_pillar and clear_grains are both True or both False.
            # This means we don't deal with pillar/grains caches at all.
            grains = {}
            pillars = {}
        else:
            # Unless both clear_pillar and clear_grains are True, we need
            # to read in the pillar/grains data since they are both stored
            # in the same file, 'data.p'
            grains, pillars = self._get_cached_minion_data(*minion_ids)
        try:
            c_minions = self.cache.list("minions")
            for minion_id in minion_ids:
                if not salt.utils.verify.valid_id(self.opts, minion_id):
                    continue

                if minion_id not in c_minions:
                    # Cache bank for this minion does not exist. Nothing to do.
                    continue
                bank = "minions/{}".format(minion_id)
                minion_pillar = pillars.pop(minion_id, False)
                minion_grains = grains.pop(minion_id, False)
                if (
                    (clear_pillar and clear_grains)
                    or (clear_pillar and not minion_grains)
                    or (clear_grains and not minion_pillar)
                ):
                    # Not saving pillar or grains, so just delete the cache file
                    self.cache.flush(bank, "data")
                elif clear_pillar and minion_grains:
                    self.cache.store(bank, "data", {"grains": minion_grains})
                elif clear_grains and minion_pillar:
                    self.cache.store(bank, "data", {"pillar": minion_pillar})
                if clear_mine:
                    # Delete the whole mine file
                    self.cache.flush(bank, "mine")
                elif clear_mine_func is not None:
                    # Delete a specific function from the mine file
                    mine_data = self.cache.fetch(bank, "mine")
                    if isinstance(mine_data, dict):
                        if mine_data.pop(clear_mine_func, False):
                            self.cache.store(bank, "mine", mine_data)
        except OSError:
            # NOTE(review): an OSError is reported as success here; kept
            # as-is because callers may rely on it - confirm the intent.
            return True
        return True


class CacheTimer(Thread):
    """
    A basic timer class the fires timer-events every second.
    This is used for cleanup by the ConnectedCache()
    """

    def __init__(self, opts, event):
        """
        :param opts: master options dict; ``opts["sock_dir"]`` is read.
        :param event: ``threading.Event`` that stops the timer once set.
        """
        Thread.__init__(self)
        self.opts = opts
        self.stopped = event
        self.daemon = True
        self.timer_sock = os.path.join(self.opts["sock_dir"], "con_timer.ipc")

    def run(self):
        """
        main loop that fires the event every second

        Publishes a counter running from 0 to 59 on the timer socket until
        the stop event is set.
        """
        context = zmq.Context()
        # the socket for outgoing timer events
        socket = context.socket(zmq.PUB)
        try:
            socket.setsockopt(zmq.LINGER, 100)
            socket.bind("ipc://" + self.timer_sock)

            count = 0
            log.debug("ConCache-Timer started")
            while not self.stopped.wait(1):
                socket.send(salt.payload.dumps(count))

                count += 1
                if count >= 60:
                    count = 0
        finally:
            # release the zmq resources once the timer has been stopped
            socket.close()
            context.term()


class CacheWorker(Process):
    """
    Worker for ConnectedCache which runs in its
    own process to prevent blocking of ConnectedCache
    main-loop when refreshing minion-list
    """

    def __init__(self, opts, **kwargs):
        """
        Store the master opts; the cache client is only created in run().
        """
        super().__init__(**kwargs)
        self.opts = opts

    def run(self):
        """
        Gather currently connected minions and update the cache
        """
        new_mins = list(salt.utils.minions.CkMinions(self.opts).connected_ids())
        cc = cache_cli(self.opts)
        cc.get_cached()
        # a list wrapped in a list tells the ConCache to replace its content
        cc.put_cache([new_mins])
        log.debug("ConCache CacheWorker update finished")


class ConnectedCache(Process):
    """
    Provides access to all minions ids that the master has
    successfully authenticated. The cache is cleaned up regularly by
    comparing it to the IPs that have open connections to
    the master publisher port.
    """

    def __init__(self, opts, **kwargs):
        """
        starts the timer and inits the cache itself
        """
        super().__init__(**kwargs)
        log.debug("ConCache initializing...")

        # the possible settings for the cache
        self.opts = opts

        # the actual cached minion ids
        self.minions = []

        self.cache_sock = os.path.join(self.opts["sock_dir"], "con_cache.ipc")
        self.update_sock = os.path.join(self.opts["sock_dir"], "con_upd.ipc")
        self.upd_t_sock = os.path.join(self.opts["sock_dir"], "con_timer.ipc")
        self.cleanup()

        # the timer provides 1-second intervals to the loop in run()
        # to make the cache system most responsive, we do not use a loop-
        # delay which makes it hard to get 1-second intervals without a timer
        self.timer_stop = Event()
        self.timer = CacheTimer(self.opts, self.timer_stop)
        self.timer.start()
        self.running = True

    def signal_handler(self, sig, frame):
        """
        handle signals and shutdown
        """
        self.stop()

    def cleanup(self):
        """
        remove sockets on shutdown
        """
        log.debug("ConCache cleaning up")
        for sock_path in (self.cache_sock, self.update_sock, self.upd_t_sock):
            if os.path.exists(sock_path):
                os.remove(sock_path)

    def secure(self):
        """
        secure the sockets for root-only access
        """
        log.debug("ConCache securing sockets")
        for sock_path in (self.cache_sock, self.update_sock, self.upd_t_sock):
            if os.path.exists(sock_path):
                os.chmod(sock_path, 0o600)

    def stop(self):
        """
        shutdown cache process
        """
        self.cleanup()
        # avoid stopping the timer twice
        if self.running:
            self.running = False
            self.timer_stop.set()
            self.timer.join()

    def run(self):
        """
        Main loop of the ConCache, starts updates in intervals and
        answers requests from the MWorkers
        """
        context = zmq.Context()
        # the socket for incoming cache requests
        creq_in = context.socket(zmq.REP)
        creq_in.setsockopt(zmq.LINGER, 100)
        creq_in.bind("ipc://" + self.cache_sock)

        # the socket for incoming cache-updates from workers
        cupd_in = context.socket(zmq.SUB)
        cupd_in.setsockopt(zmq.SUBSCRIBE, b"")
        cupd_in.setsockopt(zmq.LINGER, 100)
        cupd_in.bind("ipc://" + self.update_sock)

        # the socket for the timer-event
        timer_in = context.socket(zmq.SUB)
        timer_in.setsockopt(zmq.SUBSCRIBE, b"")
        timer_in.setsockopt(zmq.LINGER, 100)
        timer_in.connect("ipc://" + self.upd_t_sock)

        poller = zmq.Poller()
        poller.register(creq_in, zmq.POLLIN)
        poller.register(cupd_in, zmq.POLLIN)
        poller.register(timer_in, zmq.POLLIN)

        # register a signal handler
        signal.signal(signal.SIGINT, self.signal_handler)

        # secure the sockets from the world
        self.secure()

        log.info("ConCache started")

        while self.running:

            # we check for new events with the poller
            try:
                socks = dict(poller.poll(1))
            except KeyboardInterrupt:
                self.stop()
                # no poll result is available, leave the loop instead of
                # evaluating an unbound/stale ``socks``
                break
            except zmq.ZMQError as zmq_err:
                log.error("ConCache ZeroMQ-Error occurred")
                log.exception(zmq_err)
                self.stop()
                break

            # check for next cache-request
            if socks.get(creq_in) == zmq.POLLIN:
                msg = salt.payload.loads(creq_in.recv())
                log.debug("ConCache Received request: %s", msg)

                # requests to the minion list are send as str's
                # NOTE(review): any other request gets no reply on this REP
                # socket - confirm that clients only ever send "minions".
                if isinstance(msg, str):
                    if msg == "minions":
                        # Send reply back to client
                        reply = salt.payload.dumps(self.minions)
                        creq_in.send(reply)

            # check for next cache-update from workers
            if socks.get(cupd_in) == zmq.POLLIN:
                new_c_data = salt.payload.loads(cupd_in.recv())

                # check if the returned data is usable
                if not isinstance(new_c_data, list):
                    log.error("ConCache Worker returned unusable result")
                    continue

                # the cache will receive lists of minions
                # 1. if the list only has 1 item, its from an MWorker, we append it
                # 2. if the list contains another list, its from a CacheWorker and
                #    the currently cached minions are replaced with that list
                # 3. anything else is considered malformed

                try:

                    if not new_c_data:
                        log.debug("ConCache Got empty update from worker")
                        continue

                    data = new_c_data[0]

                    if isinstance(data, str):
                        if data not in self.minions:
                            log.debug(
                                "ConCache Adding minion %s to cache", new_c_data[0]
                            )
                            self.minions.append(data)

                    elif isinstance(data, list):
                        log.debug("ConCache Replacing minion list from worker")
                        self.minions = data

                except IndexError:
                    log.debug("ConCache Got malformed result dict from worker")

                log.info("ConCache %s entries in cache", len(self.minions))

            # check for next timer-event to start new jobs
            if socks.get(timer_in) == zmq.POLLIN:
                sec_event = salt.payload.loads(timer_in.recv())

                # update the list every 30 seconds
                if int(sec_event % 30) == 0:
                    cw = CacheWorker(self.opts)
                    cw.start()

        self.stop()
        creq_in.close()
        cupd_in.close()
        timer_in.close()
        context.term()
        log.debug("ConCache Shutting down")


def ping_all_connected_minions(opts):
    """
    Fire an asynchronous ``test.ping`` at the minions.

    With ``minion_data_cache`` enabled only the currently connected minion
    ids are targeted (as a list), otherwise every minion (glob ``*``).

    :param opts: master options dict.
    """
    if opts["minion_data_cache"]:
        tgt = list(salt.utils.minions.CkMinions(opts).connected_ids())
        form = "list"
    else:
        tgt = "*"
        form = "glob"
    # NOTE(review): the client is created with its default configuration,
    # not from ``opts`` - confirm this is intended.
    with salt.client.LocalClient() as client:
        client.cmd_async(tgt, "test.ping", tgt_type=form)


def get_master_key(key_user, opts, skip_perm_errors=False):
    """
    Return the content of the key file ``<cachedir>/.<key_user>_key``.

    ``root`` and ``sudo_*`` users are mapped to the user configured in
    ``opts["user"]`` (default ``root``).

    :param key_user: user name the key is looked up for.
    :param opts: master options dict (``cachedir`` and ``user`` are read).
    :param skip_perm_errors: passed on to the path traversal check.
    :return: the key file content, or ``""`` if the file cannot be read.
    """
    if key_user == "root":
        if opts.get("user", "root") != "root":
            key_user = opts.get("user", "root")
    if key_user.startswith("sudo_"):
        key_user = opts.get("user", "root")
    if salt.utils.platform.is_windows():
        # The username may contain '\' if it is in Windows
        # 'DOMAIN\username' format. Fix this for the keyfile path.
        key_user = key_user.replace("\\", "_")
    keyfile = os.path.join(opts["cachedir"], ".{}_key".format(key_user))
    # Make sure all key parent directories are accessible
    salt.utils.verify.check_path_traversal(opts["cachedir"], key_user, skip_perm_errors)

    try:
        with salt.utils.files.fopen(keyfile, "r") as key:
            return key.read()
    except OSError:
        # Fall back to eauth
        return ""


def get_values_of_matching_keys(pattern_dict, user_name):
    """
    Collect the values of every key in ``pattern_dict`` whose expression
    matches ``user_name``.

    :param pattern_dict: dict mapping match expressions to iterables of values.
    :param user_name: the name tested against each expression.
    :return: one flat list with the values of all matching keys.
    """
    ret = []
    for expr, values in pattern_dict.items():
        if salt.utils.stringutils.expr_match(user_name, expr):
            ret.extend(values)
    return ret


# test code for the ConCache class
if __name__ == "__main__":

    opts = salt.config.master_config("/etc/salt/master")

    conc = ConnectedCache(opts)
    conc.start()