1"""
2    salt.utils.master
3    -----------------
4
5    Utilities that can only be used on a salt master.
6
7"""
8import logging
9import os
10import signal
11from threading import Event, Thread
12
13import salt.cache
14import salt.client
15import salt.config
16import salt.log
17import salt.payload
18import salt.pillar
19import salt.utils.atomicfile
20import salt.utils.files
21import salt.utils.minions
22import salt.utils.platform
import salt.utils.process
import salt.utils.stringutils
import salt.utils.verify
from salt.exceptions import SaltException
from salt.utils.cache import CacheCli as cache_cli
from salt.utils.process import Process
from salt.utils.zeromq import zmq

log = logging.getLogger(__name__)


def get_running_jobs(opts):
    """
    Return the running jobs on this master
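
    A rough usage sketch (assuming __opts__ holds the master configuration,
    as it does inside a runner module):

        import salt.utils.master

        jobs = salt.utils.master.get_running_jobs(__opts__)
        for job in jobs:
            # each entry is the deserialized proc file of one running JID
            print(job.get("jid"), job.get("fun"))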
36    """
37
38    ret = []
39    proc_dir = os.path.join(opts["cachedir"], "proc")
40    if not os.path.isdir(proc_dir):
41        return ret
42    for fn_ in os.listdir(proc_dir):
43        path = os.path.join(proc_dir, fn_)
44        try:
45            data = _read_proc_file(path, opts)
46            if data is not None:
47                ret.append(data)
48        except OSError:
49            # proc files may be removed at any time during this process by
50            # the master process that is executing the JID in question, so
51            # we must ignore ENOENT during this process
52            log.trace("%s removed during processing by master process", path)
53    return ret
54
55
56def _read_proc_file(path, opts):
57    """
58    Return a dict of JID metadata, or None
59    """
60    with salt.utils.files.fopen(path, "rb") as fp_:
61        buf = fp_.read()
62        fp_.close()
63        if buf:
64            data = salt.payload.loads(buf)
65        else:
66            # Proc file is empty, remove
67            try:
68                os.remove(path)
69            except OSError:
70                log.debug("Unable to remove proc file %s.", path)
71            return None
72    if not isinstance(data, dict):
73        # Invalid serial object
74        return None
75    if not salt.utils.process.os_is_running(data["pid"]):
76        # The process is no longer running, clear out the file and
77        # continue
78        try:
79            os.remove(path)
80        except OSError:
81            log.debug("Unable to remove proc file %s.", path)
82        return None
83
84    if not _check_cmdline(data):
85        pid = data.get("pid")
86        if pid:
87            log.warning("PID %s exists but does not appear to be a salt process.", pid)
88        try:
89            os.remove(path)
90        except OSError:
91            log.debug("Unable to remove proc file %s.", path)
92        return None
93    return data
94
95
96def _check_cmdline(data):
97    """
98    In some cases where there are an insane number of processes being created
99    on a system a PID can get recycled or assigned to a non-Salt process.
100    On Linux this fn checks to make sure the PID we are checking on is actually
101    a Salt process.
102
103    For non-Linux systems we punt and just return True
104    """
105    if not salt.utils.platform.is_linux():
106        return True
107    pid = data.get("pid")
108    if not pid:
109        return False
110    if not os.path.isdir("/proc"):
111        return True
112    path = os.path.join("/proc/{}/cmdline".format(pid))
113    if not os.path.isfile(path):
114        return False
115    try:
116        with salt.utils.files.fopen(path, "rb") as fp_:
117            return b"salt" in fp_.read()
118    except OSError:
119        return False
120
121
122class MasterPillarUtil:
123    """
124    Helper utility for easy access to targeted minion grain and
125    pillar data, either from cached data on the master or retrieved
126    on demand, or (by default) both.
127
128    The minion pillar data returned in get_minion_pillar() is
129    compiled directly from salt.pillar.Pillar on the master to
130    avoid any possible 'pillar poisoning' from a compromised or
131    untrusted minion.
132
133    ** However, the minion grains are still possibly entirely
134    supplied by the minion. **
135
136    Example use case:
137        For runner modules that need access minion pillar data,
138        MasterPillarUtil.get_minion_pillar should be used instead
139        of getting the pillar data by executing the "pillar" module
140        on the minions:
141
142        # my_runner.py
143        tgt = 'web*'
144        pillar_util = salt.utils.master.MasterPillarUtil(tgt, tgt_type='glob', opts=__opts__)
145        pillar_data = pillar_util.get_minion_pillar()
146    """
147
148    def __init__(
        self,
        tgt="",
        tgt_type="glob",
        saltenv=None,
        use_cached_grains=True,
        use_cached_pillar=True,
        grains_fallback=True,
        pillar_fallback=True,
        opts=None,
    ):

        log.debug("New instance of %s created.", self.__class__.__name__)
        if opts is None:
            log.error("%s: Missing master opts init arg.", self.__class__.__name__)
            raise SaltException(
                "{}: Missing master opts init arg.".format(self.__class__.__name__)
            )
        else:
            self.opts = opts
        self.tgt = tgt
        self.tgt_type = tgt_type
        self.saltenv = saltenv
        self.use_cached_grains = use_cached_grains
        self.use_cached_pillar = use_cached_pillar
        self.grains_fallback = grains_fallback
        self.pillar_fallback = pillar_fallback
        self.cache = salt.cache.factory(opts)
        log.debug(
            "Init settings: tgt: '%s', tgt_type: '%s', saltenv: '%s', "
            "use_cached_grains: %s, use_cached_pillar: %s, "
            "grains_fallback: %s, pillar_fallback: %s",
            tgt,
            tgt_type,
            saltenv,
            use_cached_grains,
            use_cached_pillar,
            grains_fallback,
            pillar_fallback,
        )

    def _get_cached_mine_data(self, *minion_ids):
        # Return one dict with the cached mine data of the targeted minions
        mine_data = {minion_id: {} for minion_id in minion_ids}
        if not self.opts.get("minion_data_cache", False) and not self.opts.get(
            "enforce_mine_cache", False
        ):
            log.debug(
                "Skipping cached mine data because minion_data_cache "
                "and enforce_mine_cache are both disabled."
            )
            return mine_data
        if not minion_ids:
            minion_ids = self.cache.list("minions")
        for minion_id in minion_ids:
            if not salt.utils.verify.valid_id(self.opts, minion_id):
                continue
            mdata = self.cache.fetch("minions/{}".format(minion_id), "mine")
            if isinstance(mdata, dict):
                mine_data[minion_id] = mdata
        return mine_data

    def _get_cached_minion_data(self, *minion_ids):
        # Return two separate dicts of cached grains and pillar data of the
        # minions
        grains = {minion_id: {} for minion_id in minion_ids}
        pillars = grains.copy()
        if not self.opts.get("minion_data_cache", False):
            log.debug("Skipping cached data because minion_data_cache is not enabled.")
            return grains, pillars
        if not minion_ids:
            minion_ids = self.cache.list("minions")
        for minion_id in minion_ids:
            if not salt.utils.verify.valid_id(self.opts, minion_id):
                continue
            mdata = self.cache.fetch("minions/{}".format(minion_id), "data")
            if not isinstance(mdata, dict):
                log.warning(
                    "cache.fetch should always return a dict. ReturnedType: %s,"
                    " MinionId: %s",
                    type(mdata).__name__,
                    minion_id,
                )
                continue
            if "grains" in mdata:
                grains[minion_id] = mdata["grains"]
            if "pillar" in mdata:
                pillars[minion_id] = mdata["pillar"]
        return grains, pillars

    def _get_live_minion_grains(self, minion_ids):
        # Returns a dict of grains fetched directly from the minions
        log.debug('Getting live grains for minions: "%s"', minion_ids)
        with salt.client.get_local_client(self.opts["conf_file"]) as client:
            return client.cmd(
                ",".join(minion_ids),
                "grains.items",
                timeout=self.opts["timeout"],
                tgt_type="list",
            )

    def _get_live_minion_pillar(self, minion_id=None, minion_grains=None):
        # Returns a dict of pillar data for one minion
        if minion_id is None:
            return {}
        if not minion_grains:
            log.warning("Cannot get pillar data for %s: no grains supplied.", minion_id)
            return {}
        log.debug("Getting live pillar for %s", minion_id)
        pillar = salt.pillar.Pillar(
            self.opts, minion_grains, minion_id, self.saltenv, self.opts["ext_pillar"]
        )
        log.debug("Compiling pillar for %s", minion_id)
        ret = pillar.compile_pillar()
        return ret

    def _get_minion_grains(self, *minion_ids, **kwargs):
        # Get the minion grains either from cache or from a direct query
        # on the minion. By default try to use cached grains first, then
        # fall back to querying the minion directly.
        ret = {}
        cached_grains = kwargs.get("cached_grains", {})
        cret = {}
        lret = {}
        if self.use_cached_grains:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_grains.items()
                if mcache
            }
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in cret
            ]
            log.debug("Missed cached minion grains for: %s", missed_minions)
            if self.grains_fallback:
                lret = self._get_live_minion_grains(missed_minions)
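            # merge order: start with an empty dict per targeted minion, then
            # overlay the live results, then the cached results (later items
            # win if a minion id appears more than once)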
            ret = dict(
                list({minion_id: {} for minion_id in minion_ids}.items())
                + list(lret.items())
                + list(cret.items())
            )
        else:
            lret = self._get_live_minion_grains(minion_ids)
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in lret
            ]
            log.debug("Missed live minion grains for: %s", missed_minions)
            if self.grains_fallback:
                cret = {
                    minion_id: mcache
                    for (minion_id, mcache) in cached_grains.items()
                    if mcache
                }
            ret = dict(
                list({minion_id: {} for minion_id in minion_ids}.items())
                + list(lret.items())
                + list(cret.items())
            )
        return ret

    def _get_minion_pillar(self, *minion_ids, **kwargs):
        # Get the minion pillar either from cache or from a direct query
        # on the minion. By default try to use the cached pillar first, then
        # fall back to rendering pillar on demand with the supplied grains.
        ret = {}
        grains = kwargs.get("grains", {})
        cached_pillar = kwargs.get("cached_pillar", {})
        cret = {}
        lret = {}
        if self.use_cached_pillar:
            cret = {
                minion_id: mcache
                for (minion_id, mcache) in cached_pillar.items()
                if mcache
            }
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in cret
            ]
            log.debug("Missed cached minion pillars for: %s", missed_minions)
            if self.pillar_fallback:
                lret = {
                    minion_id: self._get_live_minion_pillar(
                        minion_id, grains.get(minion_id, {})
                    )
                    for minion_id in missed_minions
                }
            ret = dict(
                list({minion_id: {} for minion_id in minion_ids}.items())
                + list(lret.items())
                + list(cret.items())
            )
        else:
            lret = {
                minion_id: self._get_live_minion_pillar(
                    minion_id, grains.get(minion_id, {})
                )
                for minion_id in minion_ids
            }
            missed_minions = [
                minion_id for minion_id in minion_ids if minion_id not in lret
            ]
            log.debug("Missed live minion pillars for: %s", missed_minions)
            if self.pillar_fallback:
                cret = {
                    minion_id: mcache
                    for (minion_id, mcache) in cached_pillar.items()
                    if mcache
                }
            ret = dict(
                list({minion_id: {} for minion_id in minion_ids}.items())
                + list(lret.items())
                + list(cret.items())
            )
        return ret

    def _tgt_to_list(self):
        # Return a list of minion ids that match the target and tgt_type
        minion_ids = []
        ckminions = salt.utils.minions.CkMinions(self.opts)
        _res = ckminions.check_minions(self.tgt, self.tgt_type)
        minion_ids = _res["minions"]
        if not minion_ids:
            log.debug(
                'No minions matched for tgt="%s" and tgt_type="%s"',
                self.tgt,
                self.tgt_type,
            )
            return []
        log.debug(
            'Matching minions for tgt="%s" and tgt_type="%s": %s',
            self.tgt,
            self.tgt_type,
            minion_ids,
        )
        return minion_ids

    def get_minion_pillar(self):
385        """
386        Get pillar data for the targeted minions, either by fetching the
387        cached minion data on the master, or by compiling the minion's
388        pillar data on the master.
389
390        For runner modules that need access minion pillar data, this
391        function should be used instead of getting the pillar data by
392        executing the pillar module on the minions.
393
394        By default, this function tries hard to get the pillar data:
395            - Try to get the cached minion grains and pillar if the
396                master has minion_data_cache: True
397            - If the pillar data for the minion is cached, use it.
398            - If there is no cached grains/pillar data for a minion,
399                then try to get the minion grains directly from the minion.
400            - Use the minion grains to compile the pillar directly from the
401                master using salt.pillar.Pillar
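
        A rough example of relying only on cached pillar data (hypothetical
        target; __opts__ is the master configuration, as in a runner module):

            pillar_util = salt.utils.master.MasterPillarUtil(
                'web*', tgt_type='glob',
                use_cached_pillar=True, pillar_fallback=False,
                opts=__opts__)
            cached_pillar = pillar_util.get_minion_pillar()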
402        """
403        minion_pillars = {}
404        minion_grains = {}
405        minion_ids = self._tgt_to_list()
406        if self.tgt and not minion_ids:
407            return {}
408        if any(
409            arg
410            for arg in [
411                self.use_cached_grains,
412                self.use_cached_pillar,
413                self.grains_fallback,
414                self.pillar_fallback,
415            ]
416        ):
417            log.debug("Getting cached minion data")
418            cached_minion_grains, cached_minion_pillars = self._get_cached_minion_data(
419                *minion_ids
420            )
421        else:
422            cached_minion_grains = {}
423            cached_minion_pillars = {}
424        log.debug("Getting minion grain data for: %s", minion_ids)
425        minion_grains = self._get_minion_grains(
426            *minion_ids, cached_grains=cached_minion_grains
427        )
428        log.debug("Getting minion pillar data for: %s", minion_ids)
429        minion_pillars = self._get_minion_pillar(
430            *minion_ids, grains=minion_grains, cached_pillar=cached_minion_pillars
431        )
432        return minion_pillars
433
434    def get_minion_grains(self):
435        """
436        Get grains data for the targeted minions, either by fetching the
437        cached minion data on the master, or by fetching the grains
438        directly on the minion.
439
440        By default, this function tries hard to get the grains data:
441            - Try to get the cached minion grains if the master
442                has minion_data_cache: True
443            - If the grains data for the minion is cached, use it.
444            - If there is no cached grains data for a minion,
445                then try to get the minion grains directly from the minion.
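
        A rough example of relying only on cached grains (hypothetical
        target; __opts__ is the master configuration, as in a runner module):

            grains_util = salt.utils.master.MasterPillarUtil(
                'web*', tgt_type='glob',
                use_cached_grains=True, grains_fallback=False,
                opts=__opts__)
            cached_grains = grains_util.get_minion_grains()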
446        """
447        minion_grains = {}
448        minion_ids = self._tgt_to_list()
449        if not minion_ids:
450            return {}
451        if any(arg for arg in [self.use_cached_grains, self.grains_fallback]):
452            log.debug("Getting cached minion data.")
453            cached_minion_grains, cached_minion_pillars = self._get_cached_minion_data(
454                *minion_ids
455            )
456        else:
457            cached_minion_grains = {}
458        log.debug("Getting minion grain data for: %s", minion_ids)
459        minion_grains = self._get_minion_grains(
460            *minion_ids, cached_grains=cached_minion_grains
461        )
462        return minion_grains
463
464    def get_cached_mine_data(self):
465        """
466        Get cached mine data for the targeted minions.
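
        A rough example (hypothetical target; __opts__ is the master
        configuration, as in a runner module):

            pillar_util = salt.utils.master.MasterPillarUtil(
                'web*', tgt_type='glob', opts=__opts__)
            # maps each minion id to its cached mine functions and data
            mine = pillar_util.get_cached_mine_data()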
467        """
468        mine_data = {}
469        minion_ids = self._tgt_to_list()
470        log.debug("Getting cached mine data for: %s", minion_ids)
471        mine_data = self._get_cached_mine_data(*minion_ids)
472        return mine_data
473
474    def clear_cached_minion_data(
        self,
        clear_pillar=False,
        clear_grains=False,
        clear_mine=False,
        clear_mine_func=None,
    ):
        """
        Clear the cached data/files for the targeted minions.
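
        A rough example (hypothetical values; __opts__ is the master
        configuration, as in a runner module):

            pillar_util = salt.utils.master.MasterPillarUtil(
                'web*', tgt_type='glob', opts=__opts__)
            # drop cached pillar and one mine function, but keep the grains
            pillar_util.clear_cached_minion_data(
                clear_pillar=True, clear_mine_func='network.ip_addrs')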
483        """
484        clear_what = []
485        if clear_pillar:
486            clear_what.append("pillar")
487        if clear_grains:
488            clear_what.append("grains")
489        if clear_mine:
490            clear_what.append("mine")
491        if clear_mine_func is not None:
492            clear_what.append("mine_func: '{}'".format(clear_mine_func))
493        if not clear_what:
494            log.debug("No cached data types specified for clearing.")
495            return False
496
497        minion_ids = self._tgt_to_list()
498        log.debug("Clearing cached %s data for: %s", ", ".join(clear_what), minion_ids)
499        if clear_pillar == clear_grains:
500            # clear_pillar and clear_grains are both True or both False.
501            # This means we don't deal with pillar/grains caches at all.
502            grains = {}
503            pillars = {}
504        else:
505            # Unless both clear_pillar and clear_grains are True, we need
506            # to read in the pillar/grains data since they are both stored
507            # in the same file, 'data.p'
508            grains, pillars = self._get_cached_minion_data(*minion_ids)
509        try:
510            c_minions = self.cache.list("minions")
511            for minion_id in minion_ids:
512                if not salt.utils.verify.valid_id(self.opts, minion_id):
513                    continue
514
515                if minion_id not in c_minions:
516                    # Cache bank for this minion does not exist. Nothing to do.
517                    continue
518                bank = "minions/{}".format(minion_id)
519                minion_pillar = pillars.pop(minion_id, False)
520                minion_grains = grains.pop(minion_id, False)
521                if (
522                    (clear_pillar and clear_grains)
523                    or (clear_pillar and not minion_grains)
524                    or (clear_grains and not minion_pillar)
525                ):
526                    # Not saving pillar or grains, so just delete the cache file
527                    self.cache.flush(bank, "data")
528                elif clear_pillar and minion_grains:
529                    self.cache.store(bank, "data", {"grains": minion_grains})
530                elif clear_grains and minion_pillar:
531                    self.cache.store(bank, "data", {"pillar": minion_pillar})
532                if clear_mine:
533                    # Delete the whole mine file
534                    self.cache.flush(bank, "mine")
535                elif clear_mine_func is not None:
536                    # Delete a specific function from the mine file
537                    mine_data = self.cache.fetch(bank, "mine")
538                    if isinstance(mine_data, dict):
539                        if mine_data.pop(clear_mine_func, False):
540                            self.cache.store(bank, "mine", mine_data)
541        except OSError:
542            return True
543        return True
544
545
546class CacheTimer(Thread):
547    """
548    A basic timer class the fires timer-events every second.
549    This is used for cleanup by the ConnectedCache()
550    """
551
552    def __init__(self, opts, event):
553        Thread.__init__(self)
554        self.opts = opts
555        self.stopped = event
556        self.daemon = True
557        self.timer_sock = os.path.join(self.opts["sock_dir"], "con_timer.ipc")
558
559    def run(self):
560        """
561        main loop that fires the event every second
562        """
563        context = zmq.Context()
564        # the socket for outgoing timer events
565        socket = context.socket(zmq.PUB)
566        socket.setsockopt(zmq.LINGER, 100)
567        socket.bind("ipc://" + self.timer_sock)
568
569        count = 0
570        log.debug("ConCache-Timer started")
571        while not self.stopped.wait(1):
572            socket.send(salt.payload.dumps(count))
573
574            count += 1
575            if count >= 60:
576                count = 0
577
578
579class CacheWorker(Process):
580    """
581    Worker for ConnectedCache which runs in its
582    own process to prevent blocking of ConnectedCache
583    main-loop when refreshing minion-list
584    """
585
586    def __init__(self, opts, **kwargs):
587        """
588        Sets up the zmq-connection to the ConCache
589        """
590        super().__init__(**kwargs)
591        self.opts = opts
592
593    def run(self):
594        """
595        Gather currently connected minions and update the cache
596        """
597        new_mins = list(salt.utils.minions.CkMinions(self.opts).connected_ids())
598        cc = cache_cli(self.opts)
599        cc.get_cached()
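        # the minion list is wrapped in another list so that ConnectedCache
        # treats it as a full replacement of its cached minion ids
        # (see the update handling in ConnectedCache.run)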
        cc.put_cache([new_mins])
        log.debug("ConCache CacheWorker update finished")


class ConnectedCache(Process):
605    """
606    Provides access to all minions ids that the master has
607    successfully authenticated. The cache is cleaned up regularly by
608    comparing it to the IPs that have open connections to
609    the master publisher port.
610    """
611
612    def __init__(self, opts, **kwargs):
613        """
614        starts the timer and inits the cache itself
615        """
616        super().__init__(**kwargs)
617        log.debug("ConCache initializing...")
618
619        # the possible settings for the cache
620        self.opts = opts
621
622        # the actual cached minion ids
623        self.minions = []
624
625        self.cache_sock = os.path.join(self.opts["sock_dir"], "con_cache.ipc")
626        self.update_sock = os.path.join(self.opts["sock_dir"], "con_upd.ipc")
627        self.upd_t_sock = os.path.join(self.opts["sock_dir"], "con_timer.ipc")
628        self.cleanup()
629
630        # the timer provides 1-second intervals to the loop in run()
631        # to make the cache system most responsive, we do not use a loop-
632        # delay which makes it hard to get 1-second intervals without a timer
633        self.timer_stop = Event()
634        self.timer = CacheTimer(self.opts, self.timer_stop)
635        self.timer.start()
636        self.running = True
637
638    def signal_handler(self, sig, frame):
639        """
640        handle signals and shutdown
641        """
642        self.stop()
643
644    def cleanup(self):
645        """
646        remove sockets on shutdown
647        """
648        log.debug("ConCache cleaning up")
649        if os.path.exists(self.cache_sock):
650            os.remove(self.cache_sock)
651        if os.path.exists(self.update_sock):
652            os.remove(self.update_sock)
653        if os.path.exists(self.upd_t_sock):
654            os.remove(self.upd_t_sock)
655
656    def secure(self):
657        """
658        secure the sockets for root-only access
659        """
660        log.debug("ConCache securing sockets")
661        if os.path.exists(self.cache_sock):
662            os.chmod(self.cache_sock, 0o600)
663        if os.path.exists(self.update_sock):
664            os.chmod(self.update_sock, 0o600)
665        if os.path.exists(self.upd_t_sock):
666            os.chmod(self.upd_t_sock, 0o600)
667
668    def stop(self):
669        """
670        shutdown cache process
671        """
672        # avoid getting called twice
673        self.cleanup()
674        if self.running:
675            self.running = False
676            self.timer_stop.set()
677            self.timer.join()
678
679    def run(self):
680        """
681        Main loop of the ConCache, starts updates in intervals and
682        answers requests from the MWorkers
683        """
684        context = zmq.Context()
685        # the socket for incoming cache requests
686        creq_in = context.socket(zmq.REP)
687        creq_in.setsockopt(zmq.LINGER, 100)
688        creq_in.bind("ipc://" + self.cache_sock)
689
690        # the socket for incoming cache-updates from workers
691        cupd_in = context.socket(zmq.SUB)
692        cupd_in.setsockopt(zmq.SUBSCRIBE, b"")
693        cupd_in.setsockopt(zmq.LINGER, 100)
694        cupd_in.bind("ipc://" + self.update_sock)
695
696        # the socket for the timer-event
697        timer_in = context.socket(zmq.SUB)
698        timer_in.setsockopt(zmq.SUBSCRIBE, b"")
699        timer_in.setsockopt(zmq.LINGER, 100)
700        timer_in.connect("ipc://" + self.upd_t_sock)
701
702        poller = zmq.Poller()
703        poller.register(creq_in, zmq.POLLIN)
704        poller.register(cupd_in, zmq.POLLIN)
705        poller.register(timer_in, zmq.POLLIN)
706
707        # register a signal handler
708        signal.signal(signal.SIGINT, self.signal_handler)
709
710        # secure the sockets from the world
711        self.secure()
712
713        log.info("ConCache started")
714
715        while self.running:
716
717            # we check for new events with the poller
718            try:
719                socks = dict(poller.poll(1))
720            except KeyboardInterrupt:
721                self.stop()
722            except zmq.ZMQError as zmq_err:
723                log.error("ConCache ZeroMQ-Error occurred")
724                log.exception(zmq_err)
725                self.stop()
726
727            # check for next cache-request
728            if socks.get(creq_in) == zmq.POLLIN:
729                msg = salt.payload.loads(creq_in.recv())
730                log.debug("ConCache Received request: %s", msg)
731
732                # requests to the minion list are send as str's
733                if isinstance(msg, str):
734                    if msg == "minions":
735                        # Send reply back to client
736                        reply = salt.payload.dumps(self.minions)
737                        creq_in.send(reply)
738
739            # check for next cache-update from workers
740            if socks.get(cupd_in) == zmq.POLLIN:
741                new_c_data = salt.payload.loads(cupd_in.recv())
742                # tell the worker to exit
743                # cupd_in.send(serial.dumps('ACK'))
744
745                # check if the returned data is usable
746                if not isinstance(new_c_data, list):
747                    log.error("ConCache Worker returned unusable result")
748                    del new_c_data
749                    continue
750
751                # the cache will receive lists of minions
752                # 1. if the list only has 1 item, its from an MWorker, we append it
753                # 2. if the list contains another list, its from a CacheWorker and
754                #    the currently cached minions are replaced with that list
755                # 3. anything else is considered malformed
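                # e.g. ['minion-a'] appends a single id, while
                # [['minion-a', 'minion-b']] replaces the whole cached list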

                try:

                    if not new_c_data:
                        log.debug("ConCache Got empty update from worker")
                        continue

                    data = new_c_data[0]

                    if isinstance(data, str):
                        if data not in self.minions:
                            log.debug(
                                "ConCache Adding minion %s to cache", new_c_data[0]
                            )
                            self.minions.append(data)

                    elif isinstance(data, list):
                        log.debug("ConCache Replacing minion list from worker")
                        self.minions = data

                except IndexError:
                    log.debug("ConCache Got malformed result from worker")
                    del new_c_data

                log.info("ConCache %s entries in cache", len(self.minions))

            # check for next timer-event to start new jobs
            if socks.get(timer_in) == zmq.POLLIN:
                sec_event = salt.payload.loads(timer_in.recv())

                # update the list every 30 seconds
                if int(sec_event % 30) == 0:
                    cw = CacheWorker(self.opts)
                    cw.start()

        self.stop()
        creq_in.close()
        cupd_in.close()
        timer_in.close()
        context.term()
        log.debug("ConCache Shutting down")


def ping_all_connected_minions(opts):
800    if opts["minion_data_cache"]:
801        tgt = list(salt.utils.minions.CkMinions(opts).connected_ids())
802        form = "list"
803    else:
804        tgt = "*"
805        form = "glob"
806    with salt.client.LocalClient() as client:
807        client.cmd_async(tgt, "test.ping", tgt_type=form)
808
809
810def get_master_key(key_user, opts, skip_perm_errors=False):
811    if key_user == "root":
812        if opts.get("user", "root") != "root":
813            key_user = opts.get("user", "root")
814    if key_user.startswith("sudo_"):
815        key_user = opts.get("user", "root")
816    if salt.utils.platform.is_windows():
817        # The username may contain '\' if it is in Windows
818        # 'DOMAIN\username' format. Fix this for the keyfile path.
819        key_user = key_user.replace("\\", "_")
820    keyfile = os.path.join(opts["cachedir"], ".{}_key".format(key_user))
821    # Make sure all key parent directories are accessible
822    salt.utils.verify.check_path_traversal(opts["cachedir"], key_user, skip_perm_errors)
823
824    try:
825        with salt.utils.files.fopen(keyfile, "r") as key:
826            return key.read()
827    except OSError:
828        # Fall back to eauth
829        return ""
830
831
832def get_values_of_matching_keys(pattern_dict, user_name):
833    """
834    Check a whitelist and/or blacklist to see if the value matches it.
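
    A rough example (hypothetical patterns):

        pattern_dict = {'fred_*': ['@runner', 'test.*'], 'bob': ['grains.*']}
        get_values_of_matching_keys(pattern_dict, 'fred_jones')
        # -> ['@runner', 'test.*']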
835    """
836    ret = []
837    for expr in pattern_dict:
838        if salt.utils.stringutils.expr_match(user_name, expr):
839            ret.extend(pattern_dict[expr])
840    return ret
841
842
843# test code for the ConCache class
844if __name__ == "__main__":
845
846    opts = salt.config.master_config("/etc/salt/master")
847
848    conc = ConnectedCache(opts)
849    conc.start()