1"""
2Manage events
3
4Events are all fired off via a zeromq 'pub' socket, and listened to with local
5zeromq 'sub' sockets
6
7
8All of the formatting is self contained in the event module, so we should be
9able to modify the structure in the future since the same module used to read
10events is the same module used to fire off events.
11
12Old style event messages were comprised of two parts delimited at the 20 char
13point. The first 20 characters are used for the zeromq subscriber to match
14publications and 20 characters was chosen because it was at the time a few more
15characters than the length of a jid (Job ID).  Any tags of length less than 20
16characters were padded with "|" chars out to 20 characters.
17
18Although not explicit, the data for an event comprised a python dict that was
19serialized by msgpack.
20
21New style event messages support event tags longer than 20 characters while
22still being backwards compatible with old style tags.
23
24The longer tags better enable name spaced event tags which tend to be longer.
25Moreover, the constraint that the event data be a python dict is now an
26explicit constraint and fire-event will now raise a ValueError if not. Tags
27must be ascii safe strings, that is, have values less than 0x80
28
29Since the msgpack dict (map) indicators have values greater than or equal to
300x80 it can be unambiguously determined if the start of data is at char 21
31or not.
32
33In the new style, when the tag is longer than 20 characters, an end of tag
34string is appended to the tag given by the string constant TAGEND, that is, two
35line feeds '\n\n'.  When the tag is less than 20 characters then the tag is
36padded with pipes "|" out to 20 characters as before.  When the tag is exactly
20 characters no padding is done.
38
39The get_event method intelligently figures out if the tag is longer than 20
40characters.
41
42
The convention for namespacing is to use slash characters "/" (the TAGPARTER
constant) as the name space delimiter. The name space "salt" is reserved by
SaltStack for internal events.

For example:
Namespaced tag
    'salt/runner/manage/status/start'
49
50"""
51
52import atexit
53import contextlib
54import datetime
55import errno
56import fnmatch
57import hashlib
58import logging
59import os
60import time
61from collections.abc import MutableMapping
62
63import salt.config
64import salt.defaults.exitcodes
65import salt.ext.tornado.ioloop
66import salt.ext.tornado.iostream
67import salt.log.setup
68import salt.payload
69import salt.transport.client
70import salt.transport.ipc
71import salt.utils.asynchronous
72import salt.utils.cache
73import salt.utils.dicttrim
74import salt.utils.files
75import salt.utils.platform
76import salt.utils.process
77import salt.utils.stringutils
78import salt.utils.zeromq
79
log = logging.getLogger(__name__)

# The SUB_EVENT set is for functions that require events fired based on
# component executions, like the state system
SUB_EVENT = ("state.highstate", "state.sls")

TAGEND = "\n\n"  # long tag delimiter (marks end-of-tag on the wire; see unpack())
TAGPARTER = "/"  # name spaced tag delimiter used by tagify()
SALT = "salt"  # base prefix for all salt/ events
# dict map of namespaced base tag prefixes for salt events;
# tagify() looks its prefix argument up here before joining parts
TAGS = {
    "auth": "auth",  # prefix for all salt/auth events
    "job": "job",  # prefix for all salt/job events (minion jobs)
    "key": "key",  # prefix for all salt/key events
    "minion": "minion",  # prefix for all salt/minion events
    # (minion sourced events)
    "syndic": "syndic",  # prefix for all salt/syndic events
    # (syndic minion sourced events)
    "run": "run",  # prefix for all salt/run events (salt runners)
    "wheel": "wheel",  # prefix for all salt/wheel events
    "cloud": "cloud",  # prefix for all salt/cloud events
    "fileserver": "fileserver",  # prefix for all salt/fileserver events
    "queue": "queue",  # prefix for all salt/queue events
}
104
105
def get_event(
    node,
    sock_dir=None,
    transport="zeromq",
    opts=None,
    listen=True,
    io_loop=None,
    keep_loop=False,
    raise_errors=False,
):
    """
    Return an event object suitable for the named transport

    :param IOLoop io_loop: Pass in an io_loop if you want asynchronous
                           operation for obtaining events. Eg use of
                           set_event_handler() API. Otherwise, operation
                           will be synchronous.
    """
    sock_dir = sock_dir or opts["sock_dir"]
    # TODO: AIO core is separate from transport
    common = {
        "listen": listen,
        "io_loop": io_loop,
        "keep_loop": keep_loop,
        "raise_errors": raise_errors,
    }
    if node == "master":
        return MasterEvent(sock_dir, opts, **common)
    return SaltEvent(node, sock_dir, opts, **common)
144
145
def get_master_event(opts, sock_dir, listen=True, io_loop=None, raise_errors=False):
    """
    Return a master event object for the configured transport.

    Returns ``None`` when opts['transport'] is not one of the recognized
    transports ('zeromq', 'tcp', 'detect').
    """
    # TODO: AIO core is separate from transport
    if opts["transport"] not in ("zeromq", "tcp", "detect"):
        return None
    return MasterEvent(
        sock_dir, opts, listen=listen, io_loop=io_loop, raise_errors=raise_errors
    )
155
156
def fire_args(opts, jid, tag_data, prefix=""):
    """
    Fire an event containing the arguments passed to an orchestration job

    :param dict opts: configuration options; must provide "sock_dir"
    :param jid: job id, used as part of the event tag
    :param dict tag_data: payload fired with the event
    :param str prefix: optional tag prefix (resolved through TAGS by tagify)
    """
    # NOTE: a try/except NameError guard formerly wrapped building the tag
    # suffix; it was dead code because ``jid`` is a function parameter and
    # can never be unbound here.
    tag = tagify([jid, "args"], prefix)
    try:
        _event = get_master_event(opts, opts["sock_dir"], listen=False)
        _event.fire_event(tag_data, tag=tag)
    except Exception as exc:  # pylint: disable=broad-except
        # Don't let a problem here hold up the rest of the orchestration
        log.warning(
            "Failed to fire args event %s with data %s: %s", tag, tag_data, exc
        )
175
176
def tagify(suffix="", prefix="", base=SALT):
    """
    Convenience function to build a namespaced event tag string by joining
    the base, prefix and suffix with the TAGPARTER character.

    If prefix is a valid key in TAGS, the mapped value is used in its place;
    otherwise the prefix string is used verbatim.

    If suffix is a list, each element becomes an individual tag part;
    otherwise suffix is appended as a single part.
    """
    parts = [base, TAGS.get(prefix, prefix)]
    # Lists expose append(); anything else is treated as one component.
    if hasattr(suffix, "append"):
        parts.extend(suffix)
    else:
        parts.append(suffix)

    def _stringify(part):
        # Coerce each component to a native str; fall back to str() for
        # values to_str() rejects (e.g. integer jids).
        try:
            return salt.utils.stringutils.to_str(part)
        except TypeError:
            return str(part)

    return TAGPARTER.join(part for part in map(_stringify, parts) if part)
201
202
203class SaltEvent:
204    """
205    Warning! Use the get_event function or the code will not be
206    RAET compatible
207    The base class used to manage salt events
208    """
209
    def __init__(
        self,
        node,
        sock_dir=None,
        opts=None,
        listen=True,
        io_loop=None,
        keep_loop=False,
        raise_errors=False,
    ):
        """
        :param str node: 'master' or anything else (treated as minion);
                         selects the default opts and the socket names
        :param str sock_dir: directory holding the ipc sockets; defaults to
                             opts['sock_dir']
        :param dict opts: overrides merged on top of the node's default opts
        :param bool listen: connect the subscriber immediately when True
        :param IOLoop io_loop: Pass in an io_loop if you want asynchronous
                               operation for obtaining events. Eg use of
                               set_event_handler() API. Otherwise, operation
                               will be synchronous.
        :param Bool keep_loop: Pass a boolean to determine if we want to keep
                               the io loop or destroy it when the event handle
                               is destroyed. This is useful when using event
                               loops from within third party asynchronous code
        :param bool raise_errors: re-raise stream errors instead of
                                  swallowing them and returning None
        """
        self.keep_loop = keep_loop
        if io_loop is not None:
            # Caller owns the loop; operate asynchronously on it.
            self.io_loop = io_loop
            self._run_io_loop_sync = False
        else:
            # We own a private loop and drive it synchronously.
            self.io_loop = salt.ext.tornado.ioloop.IOLoop()
            self._run_io_loop_sync = True
        self.cpub = False  # True once the subscriber (pub) side is connected
        self.cpush = False  # True once the pusher (pull) side is connected
        self.subscriber = None
        self.pusher = None
        self.raise_errors = raise_errors

        if opts is None:
            opts = {}
        # Start from the node-appropriate defaults, then apply overrides.
        if node == "master":
            self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
        else:
            self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
        self.opts.update(opts)

        if sock_dir is None:
            sock_dir = self.opts["sock_dir"]
        else:
            self.opts["sock_dir"] = sock_dir

        # On Windows, default to tcp for IPC unless explicitly configured.
        if salt.utils.platform.is_windows() and "ipc_mode" not in opts:
            self.opts["ipc_mode"] = "tcp"
        self.puburi, self.pulluri = self.__load_uri(sock_dir, node)
        self.pending_tags = []  # [tag, match_func] pairs added by subscribe()
        self.pending_events = []  # cached events retained for pending_tags
        self.__load_cache_regex()
        if listen and not self.cpub:
            # Only connect to the publisher at initialization time if
            # we know we want to listen. If we connect to the publisher
            # and don't read out events from the buffer on an on-going basis,
            # the buffer will grow resulting in big memory usage.
            self.connect_pub()
268
    @classmethod
    def __load_cache_regex(cls):
        """
        Initialize the regular expression cache and put it in the
        class namespace. The regex search strings will be prepend with '^'

        The cache is consumed by _match_tag_regex().
        """
        # This is in the class namespace, to minimize cache memory
        # usage and maximize cache hits
        # The prepend='^' is to reduce differences in behavior between
        # the default 'startswith' and the optional 'regex' match_type
        cls.cache_regex = salt.utils.cache.CacheRegex(prepend="^")
280
281    def __load_uri(self, sock_dir, node):
282        """
283        Return the string URI for the location of the pull and pub sockets to
284        use for firing and listening to events
285        """
286        if node == "master":
287            if self.opts["ipc_mode"] == "tcp":
288                puburi = int(self.opts["tcp_master_pub_port"])
289                pulluri = int(self.opts["tcp_master_pull_port"])
290            else:
291                puburi = os.path.join(sock_dir, "master_event_pub.ipc")
292                pulluri = os.path.join(sock_dir, "master_event_pull.ipc")
293        else:
294            if self.opts["ipc_mode"] == "tcp":
295                puburi = int(self.opts["tcp_pub_port"])
296                pulluri = int(self.opts["tcp_pull_port"])
297            else:
298                hash_type = getattr(hashlib, self.opts["hash_type"])
299                # Only use the first 10 chars to keep longer hashes from exceeding the
300                # max socket path length.
301                minion_id = self.opts.get("hash_id", self.opts["id"])
302                id_hash = hash_type(
303                    salt.utils.stringutils.to_bytes(minion_id)
304                ).hexdigest()[:10]
305                puburi = os.path.join(
306                    sock_dir, "minion_event_{}_pub.ipc".format(id_hash)
307                )
308                pulluri = os.path.join(
309                    sock_dir, "minion_event_{}_pull.ipc".format(id_hash)
310                )
311        log.debug("%s PUB socket URI: %s", self.__class__.__name__, puburi)
312        log.debug("%s PULL socket URI: %s", self.__class__.__name__, pulluri)
313        return puburi, pulluri
314
315    def subscribe(self, tag=None, match_type=None):
316        """
317        Subscribe to events matching the passed tag.
318
319        If you do not subscribe to a tag, events will be discarded by calls to
320        get_event that request a different tag. In contexts where many different
321        jobs are outstanding it is important to subscribe to prevent one call
322        to get_event from discarding a response required by a subsequent call
323        to get_event.
324        """
325        if tag is None:
326            return
327        match_func = self._get_match_func(match_type)
328        self.pending_tags.append([tag, match_func])
329
330    def unsubscribe(self, tag, match_type=None):
331        """
332        Un-subscribe to events matching the passed tag.
333        """
334        if tag is None:
335            return
336        match_func = self._get_match_func(match_type)
337
338        try:
339            self.pending_tags.remove([tag, match_func])
340        except ValueError:
341            pass
342
343        old_events = self.pending_events
344        self.pending_events = []
345        for evt in old_events:
346            if any(
347                pmatch_func(evt["tag"], ptag) for ptag, pmatch_func in self.pending_tags
348            ):
349                self.pending_events.append(evt)
350
    def connect_pub(self, timeout=None):
        """
        Establish the publish connection

        :param timeout: seconds to wait for the subscriber connection;
                        None uses the transport's default behavior
        :return: True when connected (or already connected), else False
        """
        if self.cpub:
            # Already connected
            return True

        if self._run_io_loop_sync:
            # Synchronous mode: connect now on our private io_loop.
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                if self.subscriber is None:
                    self.subscriber = salt.utils.asynchronous.SyncWrapper(
                        salt.transport.ipc.IPCMessageSubscriber,
                        args=(self.puburi,),
                        kwargs={"io_loop": self.io_loop},
                        loop_kwarg="io_loop",
                    )
                try:
                    self.subscriber.connect(timeout=timeout)
                    self.cpub = True
                except salt.ext.tornado.iostream.StreamClosedError:
                    log.error("Encountered StreamClosedException")
                except OSError as exc:
                    # ENOENT: the pub socket file is missing (publisher not
                    # up yet); anything else is unexpected and re-raised.
                    if exc.errno != errno.ENOENT:
                        raise
                    log.error("Error opening stream, file does not exist")
                except Exception as exc:  # pylint: disable=broad-except
                    log.info(
                        "An exception occurred connecting publisher: %s",
                        exc,
                        exc_info_on_loglevel=logging.DEBUG,
                    )
        else:
            if self.subscriber is None:
                self.subscriber = salt.transport.ipc.IPCMessageSubscriber(
                    self.puburi, io_loop=self.io_loop
                )

            # For the asynchronous case, the connect will be deferred to when
            # set_event_handler() is invoked.
            self.cpub = True
        return self.cpub
392
393    def close_pub(self):
394        """
395        Close the publish connection (if established)
396        """
397        if not self.cpub:
398            return
399
400        self.subscriber.close()
401        self.subscriber = None
402        self.pending_events = []
403        self.cpub = False
404
    def connect_pull(self, timeout=1):
        """
        Establish a connection with the event pull socket
        Default timeout is 1 s

        :param timeout: seconds to wait for the pusher connection
        :return: True when connected (or already connected), else False
        """
        if self.cpush:
            # Already connected
            return True

        if self._run_io_loop_sync:
            # Synchronous mode: connect now on our private io_loop.
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                if self.pusher is None:
                    self.pusher = salt.utils.asynchronous.SyncWrapper(
                        salt.transport.ipc.IPCMessageClient,
                        args=(self.pulluri,),
                        kwargs={"io_loop": self.io_loop},
                        loop_kwarg="io_loop",
                    )
                try:
                    self.pusher.connect(timeout=timeout)
                    self.cpush = True
                except Exception as exc:  # pylint: disable=broad-except
                    log.error(
                        "Unable to connect pusher: %s",
                        exc,
                        exc_info_on_loglevel=logging.DEBUG,
                    )
        else:
            if self.pusher is None:
                self.pusher = salt.transport.ipc.IPCMessageClient(
                    self.pulluri, io_loop=self.io_loop
                )
            # For the asynchronous case, the connect will be deferred to when
            # fire_event() is invoked.
            self.cpush = True
        return self.cpush
440
441    def close_pull(self):
442        """
443        Close the pusher connection (if established)
444        """
445        if not self.cpush:
446            return
447
448        self.pusher.close()
449        self.pusher = None
450        self.cpush = False
451
452    @classmethod
453    def unpack(cls, raw):
454        mtag, sep, mdata = raw.partition(
455            salt.utils.stringutils.to_bytes(TAGEND)
456        )  # split tag from data
457        mtag = salt.utils.stringutils.to_str(mtag)
458        data = salt.payload.loads(mdata, encoding="utf-8")
459        return mtag, data
460
461    def _get_match_func(self, match_type=None):
462        if match_type is None:
463            match_type = self.opts["event_match_type"]
464        return getattr(self, "_match_tag_{}".format(match_type), None)
465
466    def _check_pending(self, tag, match_func=None):
467        """Check the pending_events list for events that match the tag
468
469        :param tag: The tag to search for
470        :type tag: str
471        :param tags_regex: List of re expressions to search for also
472        :type tags_regex: list[re.compile()]
473        :return:
474        """
475        if match_func is None:
476            match_func = self._get_match_func()
477        old_events = self.pending_events
478        self.pending_events = []
479        ret = None
480        for evt in old_events:
481            if match_func(evt["tag"], tag):
482                if ret is None:
483                    ret = evt
484                    log.trace("get_event() returning cached event = %s", ret)
485                else:
486                    self.pending_events.append(evt)
487            elif any(
488                pmatch_func(evt["tag"], ptag) for ptag, pmatch_func in self.pending_tags
489            ):
490                self.pending_events.append(evt)
491            else:
492                log.trace(
493                    "get_event() discarding cached event that no longer has any"
494                    " subscriptions = %s",
495                    evt,
496                )
497        return ret
498
499    @staticmethod
500    def _match_tag_startswith(event_tag, search_tag):
501        """
502        Check if the event_tag matches the search check.
503        Uses startswith to check.
504        Return True (matches) or False (no match)
505        """
506        return event_tag.startswith(search_tag)
507
508    @staticmethod
509    def _match_tag_endswith(event_tag, search_tag):
510        """
511        Check if the event_tag matches the search check.
512        Uses endswith to check.
513        Return True (matches) or False (no match)
514        """
515        return event_tag.endswith(search_tag)
516
517    @staticmethod
518    def _match_tag_find(event_tag, search_tag):
519        """
520        Check if the event_tag matches the search check.
521        Uses find to check.
522        Return True (matches) or False (no match)
523        """
524        return event_tag.find(search_tag) >= 0
525
526    def _match_tag_regex(self, event_tag, search_tag):
527        """
528        Check if the event_tag matches the search check.
529        Uses regular expression search to check.
530        Return True (matches) or False (no match)
531        """
532        return self.cache_regex.get(search_tag).search(event_tag) is not None
533
534    def _match_tag_fnmatch(self, event_tag, search_tag):
535        """
536        Check if the event_tag matches the search check.
537        Uses fnmatch to check.
538        Return True (matches) or False (no match)
539        """
540        return fnmatch.fnmatch(event_tag, search_tag)
541
542    def _subproxy_match(self, data):
543        if self.opts.get("subproxy", False):
544            return self.opts["id"] == data.get("proxy_target", None)
545        return True
546
    def _get_event(self, wait, tag, match_func=None, no_block=False):
        """
        Receive events until one matches ``tag`` (per ``match_func``) or the
        timeout expires.

        :param wait: seconds to wait; 0 with no_block=False means forever
        :param tag: search tag passed to match_func
        :param match_func: matcher; defaults to the configured match type
        :param no_block: when True, poll once without blocking
        :return: the matching event dict, or None on timeout/error
        """
        if match_func is None:
            match_func = self._get_match_func()
        start = time.time()
        # Deadline computed from the caller's original wait value, before
        # the reassignments below repurpose ``wait`` as a read timeout.
        timeout_at = start + wait
        run_once = False
        if no_block is True:
            wait = 0
        elif wait == 0:
            # If no_block is False and wait is 0, that
            # means an infinite timeout.
            wait = None
        while (run_once is False and not wait) or time.time() <= timeout_at:
            if no_block is True:
                if run_once is True:
                    break
                # Trigger that at least a single iteration has gone through
                run_once = True
            try:
                if not self.cpub and not self.connect_pub(timeout=wait):
                    break
                raw = self.subscriber.read(timeout=wait)
                if raw is None:
                    # Read timed out with nothing available
                    break
                mtag, data = self.unpack(raw)
                ret = {"data": data, "tag": mtag}
            except KeyboardInterrupt:
                # Synthesize an exit event so callers can unwind cleanly
                return {"tag": "salt/event/exit", "data": {}}
            except salt.ext.tornado.iostream.StreamClosedError:
                if self.raise_errors:
                    raise
                else:
                    return None
            except RuntimeError:
                return None

            if not match_func(ret["tag"], tag) or not self._subproxy_match(ret["data"]):
                # tag not match; keep the event only if some subscription
                # registered via subscribe() still wants it
                if any(
                    pmatch_func(ret["tag"], ptag)
                    for ptag, pmatch_func in self.pending_tags
                ):
                    log.trace("get_event() caching unwanted event = %s", ret)
                    self.pending_events.append(ret)
                if wait:  # only update the wait timeout if we had one
                    wait = timeout_at - time.time()
                continue

            log.trace("get_event() received = %s", ret)
            return ret
        log.trace("_get_event() waited %s seconds and received nothing", wait)
        return None
599
    def get_event(
        self,
        wait=5,
        tag="",
        full=False,
        match_type=None,
        no_block=False,
        auto_reconnect=False,
    ):
        """
        Get a single publication.
        If no publication is available, then block for up to ``wait`` seconds.
        Return publication if it is available or ``None`` if no publication is
        available.

        If wait is 0, then block forever.

        tag
            Only return events matching the given tag. If not specified, or set
            to an empty string, all events are returned. It is recommended to
            always be selective on what is to be returned in the event that
            multiple requests are being multiplexed.

        match_type
            Set the function to match the search tag with event tags.
             - 'startswith' : search for event tags that start with tag
             - 'endswith' : search for event tags that end with tag
             - 'find' : search for event tags that contain tag
             - 'regex' : regex search '^' + tag event tags
             - 'fnmatch' : fnmatch tag event tags matching
            Default is opts['event_match_type'] or 'startswith'

            .. versionadded:: 2015.8.0

        no_block
            Define if getting the event should be a blocking call or not.
            Defaults to False to keep backwards compatibility.

            .. versionadded:: 2015.8.0

        full
            Return the whole {'tag': ..., 'data': ...} dict instead of just
            the event's data.

        auto_reconnect
            Reconnect the subscriber and retry when the stream is closed
            mid-read, instead of returning None.

        Notes:

        Searches cached publications first. If no cached publications are found
        that match the given tag specification, new publications are received
        and checked.

        If a publication is received that does not match the tag specification,
        it is DISCARDED unless it is subscribed to via subscribe() which will
        cause it to be cached.

        If a caller is not going to call get_event immediately after sending a
        request, it MUST subscribe the result to ensure the response is not lost
        should other regions of code call get_event for other purposes.
        """
        log.trace("Get event. tag: %s", tag)
        # get_event() is synchronous-only; asynchronous consumers use
        # set_event_handler() instead.
        assert self._run_io_loop_sync

        match_func = self._get_match_func(match_type)

        # Serve from the pending-event cache before touching the socket.
        ret = self._check_pending(tag, match_func)
        if ret is None:
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                if auto_reconnect:
                    # Temporarily force raising so a StreamClosedError
                    # surfaces here and triggers a reconnect instead of
                    # being swallowed inside _get_event().
                    raise_errors = self.raise_errors
                    self.raise_errors = True
                    while True:
                        try:
                            ret = self._get_event(wait, tag, match_func, no_block)
                            break
                        except salt.ext.tornado.iostream.StreamClosedError:
                            self.close_pub()
                            self.connect_pub(timeout=wait)
                            continue
                    self.raise_errors = raise_errors
                else:
                    ret = self._get_event(wait, tag, match_func, no_block)

        if ret is None or full:
            return ret
        else:
            return ret["data"]
681
682    def get_event_noblock(self):
683        """
684        Get the raw event without blocking or any other niceties
685        """
686        assert self._run_io_loop_sync
687
688        if not self.cpub:
689            if not self.connect_pub():
690                return None
691        raw = self.subscriber._read(timeout=0)
692        if raw is None:
693            return None
694        mtag, data = self.unpack(raw)
695        return {"data": data, "tag": mtag}
696
697    def get_event_block(self):
698        """
699        Get the raw event in a blocking fashion. This is slower, but it decreases the
700        possibility of dropped events.
701        """
702        assert self._run_io_loop_sync
703
704        if not self.cpub:
705            if not self.connect_pub():
706                return None
707        raw = self.subscriber._read(timeout=None)
708        if raw is None:
709            return None
710        mtag, data = self.unpack(raw)
711        return {"data": data, "tag": mtag}
712
713    def iter_events(self, tag="", full=False, match_type=None, auto_reconnect=False):
714        """
715        Creates a generator that continuously listens for events
716        """
717        while True:
718            data = self.get_event(
719                tag=tag, full=full, match_type=match_type, auto_reconnect=auto_reconnect
720            )
721            if data is None:
722                continue
723            yield data
724
    @salt.ext.tornado.gen.coroutine
    def fire_event_async(self, data, tag, cb=None, timeout=1000):
        """
        Send a single event into the publisher with payload dict "data" and
        event identifier "tag"

        The default is 1000 ms

        :param dict data: event payload; a ``_stamp`` key is added in place
        :param str tag: event tag; must be non-empty
        :param cb: optional callback invoked with the pusher send() result
        :param timeout: pusher connect timeout in milliseconds; None waits
                        forever
        :raises ValueError: when the tag is empty or data is not a mapping
        """
        if self.opts.get("subproxy", False):
            # Stamp the event so _subproxy_match() can route it
            data["proxy_target"] = self.opts["id"]

        if not str(tag):  # no empty tags allowed
            raise ValueError("Empty tag.")

        if not isinstance(data, MutableMapping):  # data must be dict
            raise ValueError("Dict object expected, not '{}'.".format(data))

        if not self.cpush:
            # connect_pull() takes seconds; this API takes milliseconds
            if timeout is not None:
                timeout_s = float(timeout) / 1000
            else:
                timeout_s = None
            if not self.connect_pull(timeout=timeout_s):
                return False

        data["_stamp"] = datetime.datetime.utcnow().isoformat()

        tagend = TAGEND
        # Since the pack / unpack logic here is for local events only,
        # it is safe to change the wire protocol. The mechanism
        # that sends events from minion to master is outside this
        # file.
        dump_data = salt.payload.dumps(data, use_bin_type=True)

        # Trim oversized payloads down to max_event_size before sending
        serialized_data = salt.utils.dicttrim.trim_dict(
            dump_data,
            self.opts["max_event_size"],
            is_msgpacked=True,
            use_bin_type=True,
        )
        log.debug("Sending event: tag = %s; data = %s", tag, data)
        # Wire format: <tag> + TAGEND + <msgpack payload>
        event = b"".join(
            [
                salt.utils.stringutils.to_bytes(tag),
                salt.utils.stringutils.to_bytes(tagend),
                serialized_data,
            ]
        )
        msg = salt.utils.stringutils.to_bytes(event, "utf-8")
        ret = yield self.pusher.send(msg)
        if cb is not None:
            cb(ret)
777
    def fire_event(self, data, tag, timeout=1000):
        """
        Send a single event into the publisher with payload dict "data" and
        event identifier "tag"

        The default is 1000 ms

        :param dict data: event payload; mutated in place (a ``_stamp`` key,
            and on subproxy minions a ``proxy_target`` key, are added).
        :param str tag: event tag; must be non-empty.
        :param int timeout: connect timeout in milliseconds; ``None`` means
            wait forever.
        :return: True on success, False if the pull socket failed to connect.
        :raises ValueError: if the tag is empty or data is not a mapping.
        """
        if self.opts.get("subproxy", False):
            data["proxy_target"] = self.opts["id"]

        if not str(tag):  # no empty tags allowed
            raise ValueError("Empty tag.")

        if not isinstance(data, MutableMapping):  # data must be dict
            raise ValueError("Dict object expected, not '{}'.".format(data))

        if not self.cpush:
            # Lazily connect the push side; the caller's timeout is in
            # milliseconds but connect_pull() takes seconds.
            if timeout is not None:
                timeout_s = float(timeout) / 1000
            else:
                timeout_s = None
            if not self.connect_pull(timeout=timeout_s):
                return False

        data["_stamp"] = datetime.datetime.utcnow().isoformat()

        tagend = TAGEND
        # Since the pack / unpack logic here is for local events only,
        # it is safe to change the wire protocol. The mechanism
        # that sends events from minion to master is outside this
        # file.
        dump_data = salt.payload.dumps(data, use_bin_type=True)

        # Enforce max_event_size by trimming oversized values out of the
        # already-msgpacked payload.
        serialized_data = salt.utils.dicttrim.trim_dict(
            dump_data,
            self.opts["max_event_size"],
            is_msgpacked=True,
            use_bin_type=True,
        )
        log.debug("Sending event: tag = %s; data = %s", tag, data)
        # Wire format: <tag><TAGEND><msgpacked payload>
        event = b"".join(
            [
                salt.utils.stringutils.to_bytes(tag),
                salt.utils.stringutils.to_bytes(tagend),
                serialized_data,
            ]
        )
        # NOTE(review): ``event`` is already bytes here, so this conversion is
        # presumably a no-op kept for safety — confirm before removing.
        msg = salt.utils.stringutils.to_bytes(event, "utf-8")
        if self._run_io_loop_sync:
            # Synchronous mode: send on our private io_loop and surface send
            # failures to the caller.
            with salt.utils.asynchronous.current_ioloop(self.io_loop):
                try:
                    self.pusher.send(msg)
                except Exception as exc:  # pylint: disable=broad-except
                    log.debug(
                        "Publisher send failed with exception: %s",
                        exc,
                        exc_info_on_loglevel=logging.DEBUG,
                    )
                    raise
        else:
            # Async mode: schedule the send on the shared io_loop and return
            # immediately.
            self.io_loop.spawn_callback(self.pusher.send, msg)
        return True
840
841    def fire_master(self, data, tag, timeout=1000):
842        """'
843        Send a single event to the master, with the payload "data" and the
844        event identifier "tag".
845
846        Default timeout is 1000ms
847        """
848        msg = {"tag": tag, "data": data, "events": None, "pretag": None}
849        return self.fire_event(msg, "fire_master", timeout)
850
851    def destroy(self):
852        if self.subscriber is not None:
853            self.close_pub()
854        if self.pusher is not None:
855            self.close_pull()
856        if self._run_io_loop_sync and not self.keep_loop:
857            self.io_loop.close()
858
    def _fire_ret_load_specific_fun(self, load, fun_index=0):
        """
        Helper function for fire_ret_load

        Fires the "old style" duplicate event and a namespaced ``job`` error
        event for each failed state chunk returned by one function of the job.

        :param dict load: return payload; expected to carry ``fun``,
            ``retcode``, ``jid``, ``id`` and ``user`` keys.
        :param int fun_index: index of the function within a multi-function
            job; ignored for single-function jobs.
        """
        if isinstance(load["fun"], list):
            # Multi-function job
            fun = load["fun"][fun_index]
            # 'retcode' was already validated to exist and be non-zero
            # for the given function in the caller.
            if isinstance(load["retcode"], list):
                # Multi-function ordered
                ret = load.get("return")
                if isinstance(ret, list) and len(ret) > fun_index:
                    ret = ret[fun_index]
                else:
                    ret = {}
                retcode = load["retcode"][fun_index]
            else:
                # Unordered multi-function: retcode is keyed by function name
                ret = load.get("return", {})
                ret = ret.get(fun, {})
                retcode = load["retcode"][fun]
        else:
            # Single-function job
            fun = load["fun"]
            ret = load.get("return", {})
            retcode = load["retcode"]

        try:
            # NOTE(review): assumes ``ret`` maps state-style tags
            # ("comp_|-id_|-name_|-fun") to chunk dicts — confirm with callers.
            for tag, data in ret.items():
                data["retcode"] = retcode
                tags = tag.split("_|-")
                if data.get("result") is False:
                    self.fire_event(
                        data, "{}.{}".format(tags[0], tags[-1])
                    )  # old dup event
                    # Enrich the chunk before firing the namespaced error
                    # event; the mutation is deliberate and ordered.
                    data["jid"] = load["jid"]
                    data["id"] = load["id"]
                    data["success"] = False
                    data["return"] = "Error: {}.{}".format(tags[0], tags[-1])
                    data["fun"] = fun
                    data["user"] = load["user"]
                    self.fire_event(
                        data,
                        tagify([load["jid"], "sub", load["id"], "error", fun], "job"),
                    )
        except Exception as exc:  # pylint: disable=broad-except
            log.error(
                "Event iteration failed with exception: %s",
                exc,
                exc_info_on_loglevel=logging.DEBUG,
            )
910
911    def fire_ret_load(self, load):
912        """
913        Fire events based on information in the return load
914        """
915        if load.get("retcode") and load.get("fun"):
916            if isinstance(load["fun"], list):
917                # Multi-function job
918                if isinstance(load["retcode"], list):
919                    multifunc_ordered = True
920                else:
921                    multifunc_ordered = False
922
923                for fun_index in range(0, len(load["fun"])):
924                    fun = load["fun"][fun_index]
925                    if multifunc_ordered:
926                        if (
927                            len(load["retcode"]) > fun_index
928                            and load["retcode"][fun_index]
929                            and fun in SUB_EVENT
930                        ):
931                            # Minion fired a bad retcode, fire an event
932                            self._fire_ret_load_specific_fun(load, fun_index)
933                    else:
934                        if load["retcode"].get(fun, 0) and fun in SUB_EVENT:
935                            # Minion fired a bad retcode, fire an event
936                            self._fire_ret_load_specific_fun(load, fun_index)
937            else:
938                # Single-function job
939                if load["fun"] in SUB_EVENT:
940                    # Minion fired a bad retcode, fire an event
941                    self._fire_ret_load_specific_fun(load)
942
    def set_event_handler(self, event_handler):
        """
        Invoke the event_handler callback each time an event arrives.

        Only valid on an async (io_loop-driven) instance; connects the pub
        channel on first use.

        :param event_handler: callable invoked with each received event.
        :return: whatever the subscriber's ``read_async`` returns.
        """
        # Registering a streaming callback only makes sense in async mode.
        assert not self._run_io_loop_sync

        if not self.cpub:
            self.connect_pub()
        # This will handle reconnects
        return self.subscriber.read_async(event_handler)
953
    # pylint: disable=W1701
    def __del__(self):
        # skip exceptions in destroy-- since destroy() doesn't cover interpreter
        # shutdown-- where globals start going missing
        try:
            self.destroy()
        except Exception:  # pylint: disable=broad-except
            # Best-effort cleanup only; a finalizer must never raise.
            pass

    # pylint: enable=W1701
964
    def __enter__(self):
        # Context-manager entry: the event object itself is the resource.
        return self
967
    def __exit__(self, *args):
        # Always tear down channels/loop on context exit, even on error.
        self.destroy()
970
971
class MasterEvent(SaltEvent):
    """
    Create a master-side event management object.

    Warning! Use the get_event function or the code will not be
    RAET compatible
    """

    def __init__(
        self,
        sock_dir,
        opts=None,
        listen=True,
        io_loop=None,
        keep_loop=False,
        raise_errors=False,
    ):
        # All behavior lives in SaltEvent; this subclass only pins the node
        # role to "master".
        kwargs = {
            "listen": listen,
            "io_loop": io_loop,
            "keep_loop": keep_loop,
            "raise_errors": raise_errors,
        }
        super().__init__("master", sock_dir, opts, **kwargs)
997
998
class LocalClientEvent(MasterEvent):
    """
    Warning! Use the get_event function or the code will not be
    RAET compatible
    This class is only used to differentiate who is handling the events,
    especially in logs, but it is otherwise identical to MasterEvent.
    """
1006
1007
class NamespacedEvent:
    """
    A wrapper for sending events within a specific base namespace
    """

    def __init__(self, event, base, print_func=None):
        # The underlying event bus, the namespace prefix prepended to every
        # tag, and an optional callable invoked with (tag, data) after each
        # fire.
        self.print_func = print_func
        self.base = base
        self.event = event

    def fire_event(self, data, tag):
        namespaced_tag = tagify(tag, base=self.base)
        self.event.fire_event(data, namespaced_tag)
        if self.print_func is not None:
            # Report the un-namespaced tag, exactly as the caller gave it.
            self.print_func(tag, data)

    def destroy(self):
        self.event.destroy()

    def __enter__(self):
        return self

    def __exit__(self, *args):
        self.destroy()
1031
1032
class MinionEvent(SaltEvent):
    """
    Warning! Use the get_event function or the code will not be
    RAET compatible
    Create a minion event management object
    """

    def __init__(self, opts, listen=True, io_loop=None, raise_errors=False):
        # Delegate to SaltEvent with the "minion" node role, pulling the
        # socket directory from the minion opts.
        super().__init__(
            "minion",
            sock_dir=opts.get("sock_dir"),
            opts=opts,
            listen=listen,
            io_loop=io_loop,
            raise_errors=raise_errors,
        )
1049
1050
class AsyncEventPublisher:
    """
    An event publisher class intended to run in an ioloop (within a single process)

    TODO: remove references to "minion_event" whenever we need to use this for other things
    """

    def __init__(self, opts, io_loop=None):
        """
        Bind the minion-local PUB and PULL IPC endpoints and start serving.

        :param dict opts: minion opts; merged over DEFAULT_MINION_OPTS.
        :param io_loop: tornado IOLoop to run on; defaults to the current one.
        """
        self.opts = salt.config.DEFAULT_MINION_OPTS.copy()
        default_minion_sock_dir = self.opts["sock_dir"]
        self.opts.update(opts)

        self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current()
        self._closing = False
        self.publisher = None
        self.puller = None

        hash_type = getattr(hashlib, self.opts["hash_type"])
        # Only use the first 10 chars to keep longer hashes from exceeding the
        # max socket path length.
        id_hash = hash_type(
            salt.utils.stringutils.to_bytes(self.opts["id"])
        ).hexdigest()[:10]
        epub_sock_path = os.path.join(
            self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash)
        )
        # Remove stale socket files left over from a previous run before
        # binding new ones.
        if os.path.exists(epub_sock_path):
            os.unlink(epub_sock_path)
        epull_sock_path = os.path.join(
            self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash)
        )
        if os.path.exists(epull_sock_path):
            os.unlink(epull_sock_path)

        # In tcp mode the "URIs" are just port numbers; otherwise they are
        # filesystem paths to the unix sockets.
        if self.opts["ipc_mode"] == "tcp":
            epub_uri = int(self.opts["tcp_pub_port"])
            epull_uri = int(self.opts["tcp_pull_port"])
        else:
            epub_uri = epub_sock_path
            epull_uri = epull_sock_path

        log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri)
        log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri)

        minion_sock_dir = self.opts["sock_dir"]

        if not os.path.isdir(minion_sock_dir):
            # Let's try to create the directory defined on the configuration
            # file
            try:
                os.makedirs(minion_sock_dir, 0o755)
            except OSError as exc:
                log.error("Could not create SOCK_DIR: %s", exc)
                # Let's not fail yet and try using the default path
                if minion_sock_dir == default_minion_sock_dir:
                    # We're already trying the default system path, stop now!
                    raise

                if not os.path.isdir(default_minion_sock_dir):
                    try:
                        os.makedirs(default_minion_sock_dir, 0o755)
                    except OSError as exc:
                        log.error("Could not create SOCK_DIR: %s", exc)
                        # Let's stop at this stage
                        raise

        self.publisher = salt.transport.ipc.IPCMessagePublisher(
            self.opts, epub_uri, io_loop=self.io_loop
        )

        # Everything received on the pull server is fed into handle_publish
        # for republication on the pub socket.
        self.puller = salt.transport.ipc.IPCMessageServer(
            epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish
        )

        log.info("Starting pull socket on %s", epull_uri)
        # Restrictive umask so the sockets are not world-accessible.
        with salt.utils.files.set_umask(0o177):
            self.publisher.start()
            self.puller.start()

    def handle_publish(self, package, _):
        """
        Get something from epull, publish it out epub, and return the package (or None)
        """
        try:
            self.publisher.publish(package)
            return package
        # Add an extra fallback in case a forked process leeks through
        except Exception:  # pylint: disable=broad-except
            log.critical("Unexpected error while polling minion events", exc_info=True)
            return None

    def close(self):
        """
        Close the publisher and puller; safe to call more than once.
        """
        if self._closing:
            return
        self._closing = True
        if self.publisher is not None:
            self.publisher.close()
        if self.puller is not None:
            self.puller.close()
1150
1151
class EventPublisher(salt.utils.process.SignalHandlingProcess):
    """
    The interface that takes master events and republishes them out to anyone
    who wants to listen
    """

    def __init__(self, opts, **kwargs):
        """
        :param dict opts: master opts; merged over DEFAULT_MASTER_OPTS.
        """
        super().__init__(**kwargs)
        self.opts = salt.config.DEFAULT_MASTER_OPTS.copy()
        self.opts.update(opts)
        self._closing = False
        # Loop and sockets are created in run(), after the process forks.
        self.io_loop = None
        self.puller = None
        self.publisher = None

    def run(self):
        """
        Bind the pub and pull sockets for events
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        # Optionally lower this process's scheduling priority (non-Windows).
        if (
            self.opts["event_publisher_niceness"]
            and not salt.utils.platform.is_windows()
        ):
            log.info(
                "setting EventPublisher niceness to %i",
                self.opts["event_publisher_niceness"],
            )
            os.nice(self.opts["event_publisher_niceness"])

        self.io_loop = salt.ext.tornado.ioloop.IOLoop()
        with salt.utils.asynchronous.current_ioloop(self.io_loop):
            # tcp mode uses port numbers; ipc mode uses unix socket paths.
            if self.opts["ipc_mode"] == "tcp":
                epub_uri = int(self.opts["tcp_master_pub_port"])
                epull_uri = int(self.opts["tcp_master_pull_port"])
            else:
                epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc")
                epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc")

            self.publisher = salt.transport.ipc.IPCMessagePublisher(
                self.opts, epub_uri, io_loop=self.io_loop
            )

            # Everything received on the pull server is forwarded to
            # handle_publish for republication.
            self.puller = salt.transport.ipc.IPCMessageServer(
                epull_uri,
                io_loop=self.io_loop,
                payload_handler=self.handle_publish,
            )

            # Start the master event publisher
            with salt.utils.files.set_umask(0o177):
                self.publisher.start()
                self.puller.start()
                # With publisher ACLs or external auth in play, unprivileged
                # processes must be able to read the pub socket.
                if self.opts["ipc_mode"] != "tcp" and (
                    self.opts["publisher_acl"] or self.opts["external_auth"]
                ):
                    os.chmod(  # nosec
                        os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"),
                        0o666,
                    )

            atexit.register(self.close)
            with contextlib.suppress(KeyboardInterrupt):
                try:
                    self.io_loop.start()
                finally:
                    # Make sure the IO loop and respective sockets are closed and destroyed
                    self.close()

    def handle_publish(self, package, _):
        """
        Get something from epull, publish it out epub, and return the package (or None)
        """
        try:
            self.publisher.publish(package)
            return package
        # Add an extra fallback in case a forked process leeks through
        except Exception:  # pylint: disable=broad-except
            log.critical("Unexpected error while polling master events", exc_info=True)
            return None

    def close(self):
        """
        Idempotently close the publisher, puller and io_loop.
        """
        if self._closing:
            return
        self._closing = True
        atexit.unregister(self.close)
        if self.publisher is not None:
            self.publisher.close()
            self.publisher = None
        if self.puller is not None:
            self.puller.close()
            self.puller = None
        if self.io_loop is not None:
            self.io_loop.close()
            self.io_loop = None

    def _handle_signals(self, signum, sigframe):
        # Tear down sockets before the default signal handling exits us.
        self.close()
        super()._handle_signals(signum, sigframe)
1252
1253
class EventReturn(salt.utils.process.SignalHandlingProcess):
    """
    A dedicated process which listens to the master event bus and queues
    and forwards events to the specified returner.
    """

    def __init__(self, opts, **kwargs):
        """
        Initialize the EventReturn system

        Return an EventReturn instance
        """
        # This is required because the process is forked and the module no
        # longer exists in the global namespace.
        import salt.minion

        super().__init__(**kwargs)

        self.opts = opts
        # Flush the queue once it holds this many events.
        self.event_return_queue = self.opts["event_return_queue"]
        # Also flush when the oldest queued event is this many seconds old;
        # 0 disables the age-based flush.
        self.event_return_queue_max_seconds = self.opts.get(
            "event_return_queue_max_seconds", 0
        )
        local_minion_opts = self.opts.copy()
        local_minion_opts["file_client"] = "local"
        # MasterMinion gives us access to the configured returner modules.
        self.minion = salt.minion.MasterMinion(local_minion_opts)
        self.event_queue = []
        self.stop = False

    def _handle_signals(self, signum, sigframe):
        # Flush and terminate
        if self.event_queue:
            self.flush_events()
        self.stop = True
        super()._handle_signals(signum, sigframe)

    def flush_events(self):
        """
        Deliver all queued events to the configured returner(s), then empty
        the queue.
        """
        if isinstance(self.opts["event_return"], list):
            # Multiple event returners
            for r in self.opts["event_return"]:
                log.debug("Calling event returner %s, one of many.", r)
                event_return = "{}.event_return".format(r)
                self._flush_event_single(event_return)
        else:
            # Only a single event returner
            log.debug(
                "Calling event returner %s, only one configured.",
                self.opts["event_return"],
            )
            event_return = "{}.event_return".format(self.opts["event_return"])
            self._flush_event_single(event_return)
        del self.event_queue[:]

    def _flush_event_single(self, event_return):
        """
        Send the queued events through one returner's ``event_return``
        function, logging (not raising) on failure.
        """
        if event_return in self.minion.returners:
            try:
                self.minion.returners[event_return](self.event_queue)
            except Exception as exc:  # pylint: disable=broad-except
                log.error(
                    "Could not store events - returner '%s' raised exception: %s",
                    event_return,
                    exc,
                )
                # don't waste processing power unnecessarily on converting a
                # potentially huge dataset to a string
                if log.level <= logging.DEBUG:
                    log.debug(
                        "Event data that caused an exception: %s", self.event_queue
                    )
        else:
            log.error(
                "Could not store return for event(s) - returner '%s' not found.",
                event_return,
            )

    def run(self):
        """
        Spin up the multiprocess event returner
        """
        salt.utils.process.appendproctitle(self.__class__.__name__)

        # Optionally lower this process's scheduling priority (non-Windows).
        if self.opts["event_return_niceness"] and not salt.utils.platform.is_windows():
            log.info(
                "setting EventReturn niceness to %i", self.opts["event_return_niceness"]
            )
            os.nice(self.opts["event_return_niceness"])

        self.event = get_event("master", opts=self.opts, listen=True)
        events = self.event.iter_events(full=True)
        self.event.fire_event({}, "salt/event_listen/start")
        try:
            # events below is a generator, we will iterate until we get the salt/event/exit tag
            oldestevent = None
            for event in events:

                if event["tag"] == "salt/event/exit":
                    # We're done eventing
                    self.stop = True
                if self._filter(event):
                    # This event passed the filter, add it to the queue
                    self.event_queue.append(event)
                too_long_in_queue = False

                # if max_seconds is >0, then we want to make sure we flush the queue
                # every event_return_queue_max_seconds seconds,  If it's 0, don't
                # apply any of this logic
                if self.event_return_queue_max_seconds > 0:
                    rightnow = datetime.datetime.now()
                    # oldestevent marks when the current queue "epoch" began.
                    if not oldestevent:
                        oldestevent = rightnow
                    age_in_seconds = (rightnow - oldestevent).seconds
                    if age_in_seconds > 0:
                        log.debug(
                            "Oldest event in queue is %s seconds old.", age_in_seconds
                        )
                    if age_in_seconds >= self.event_return_queue_max_seconds:
                        too_long_in_queue = True
                        oldestevent = None
                    else:
                        too_long_in_queue = False

                    if too_long_in_queue:
                        log.debug(
                            "Oldest event has been in queue too long, will flush queue"
                        )

                # If we are over the max queue size or the oldest item in the queue has been there too long
                # then flush the queue
                if (
                    len(self.event_queue) >= self.event_return_queue
                    or too_long_in_queue
                ):
                    log.debug("Flushing %s events.", len(self.event_queue))
                    self.flush_events()
                    oldestevent = None
                if self.stop:
                    # We saw the salt/event/exit tag, we can stop eventing
                    break
        finally:  # flush all we have at this moment
            # No matter what, make sure we flush the queue even when we are exiting
            # and there will be no more events.
            if self.event_queue:
                log.debug("Flushing %s events.", len(self.event_queue))

                self.flush_events()

    def _filter(self, event):
        """
        Take an event and run it through configured filters.

        Returns True if event should be stored, else False
        """
        tag = event["tag"]
        # An empty whitelist admits everything; a non-empty whitelist admits
        # only matching tags. The blacklist is applied afterwards and wins.
        if self.opts["event_return_whitelist"]:
            ret = False
        else:
            ret = True
        for whitelist_match in self.opts["event_return_whitelist"]:
            if fnmatch.fnmatch(tag, whitelist_match):
                ret = True
                break
        for blacklist_match in self.opts["event_return_blacklist"]:
            if fnmatch.fnmatch(tag, blacklist_match):
                ret = False
                break
        return ret
1420
1421
class StateFire:
    """
    Evaluate the data from a state run and fire events on the master and minion
    for each returned chunk that is not "green"
    This object is made to only run on a minion
    """

    def __init__(self, opts, auth=None):
        self.opts = opts
        # Reuse a caller-supplied auth object, otherwise establish our own.
        self.auth = auth or salt.crypt.SAuth(self.opts)

    def fire_master(self, data, tag, preload=None):
        """
        Fire an event off on the master server

        CLI Example:

        .. code-block:: bash

            salt '*' event.fire_master 'stuff to be in the event' 'tag'
        """
        # Start from any preloaded keys, then fill in the event envelope.
        load = dict(preload) if preload else {}
        load["id"] = self.opts["id"]
        load["tag"] = tag
        load["data"] = data
        load["cmd"] = "_minion_event"
        load["tok"] = self.auth.gen_token(b"salt")

        with salt.transport.client.ReqChannel.factory(self.opts) as channel:
            try:
                channel.send(load)
            except Exception as exc:  # pylint: disable=broad-except
                log.info(
                    "An exception occurred on fire_master: %s",
                    exc,
                    exc_info_on_loglevel=logging.DEBUG,
                )
        return True

    def fire_running(self, running):
        """
        Pass in a state "running" dict, this is the return dict from a state
        call. The dict will be processed and fire events.

        By default yellows and reds fire events on the master and minion, but
        this can be configured.
        """
        load = {"id": self.opts["id"], "events": [], "cmd": "_minion_event"}
        # Walk chunks in execution order and collect the non-"green" ones.
        for stag in sorted(running, key=lambda k: running[k].get("__run_num__", 0)):
            chunk = running[stag]
            if chunk["result"] and not chunk["changes"]:
                continue
            tag = "state_{}_{}".format(
                str(chunk["result"]),
                "True" if chunk["changes"] else "False",
            )
            load["events"].append({"tag": tag, "data": chunk})
        with salt.transport.client.ReqChannel.factory(self.opts) as channel:
            try:
                channel.send(load)
            except Exception as exc:  # pylint: disable=broad-except
                log.info(
                    "An exception occurred on fire_master: %s",
                    exc,
                    exc_info_on_loglevel=logging.DEBUG,
                )
        return True
1498