1""" 2Manage events 3 4Events are all fired off via a zeromq 'pub' socket, and listened to with local 5zeromq 'sub' sockets 6 7 8All of the formatting is self contained in the event module, so we should be 9able to modify the structure in the future since the same module used to read 10events is the same module used to fire off events. 11 12Old style event messages were comprised of two parts delimited at the 20 char 13point. The first 20 characters are used for the zeromq subscriber to match 14publications and 20 characters was chosen because it was at the time a few more 15characters than the length of a jid (Job ID). Any tags of length less than 20 16characters were padded with "|" chars out to 20 characters. 17 18Although not explicit, the data for an event comprised a python dict that was 19serialized by msgpack. 20 21New style event messages support event tags longer than 20 characters while 22still being backwards compatible with old style tags. 23 24The longer tags better enable name spaced event tags which tend to be longer. 25Moreover, the constraint that the event data be a python dict is now an 26explicit constraint and fire-event will now raise a ValueError if not. Tags 27must be ascii safe strings, that is, have values less than 0x80 28 29Since the msgpack dict (map) indicators have values greater than or equal to 300x80 it can be unambiguously determined if the start of data is at char 21 31or not. 32 33In the new style, when the tag is longer than 20 characters, an end of tag 34string is appended to the tag given by the string constant TAGEND, that is, two 35line feeds '\n\n'. When the tag is less than 20 characters then the tag is 36padded with pipes "|" out to 20 characters as before. When the tag is exactly 3720 characters no padded is done. 38 39The get_event method intelligently figures out if the tag is longer than 20 40characters. 41 42 43The convention for namespacing is to use dot characters "." as the name space 44delimiter. The name space "salt" is reserved by SaltStack for internal events. 45 46For example: 47Namespaced tag 48 'salt.runner.manage.status.start' 49 50""" 51 52import atexit 53import contextlib 54import datetime 55import errno 56import fnmatch 57import hashlib 58import logging 59import os 60import time 61from collections.abc import MutableMapping 62 63import salt.config 64import salt.defaults.exitcodes 65import salt.ext.tornado.ioloop 66import salt.ext.tornado.iostream 67import salt.log.setup 68import salt.payload 69import salt.transport.client 70import salt.transport.ipc 71import salt.utils.asynchronous 72import salt.utils.cache 73import salt.utils.dicttrim 74import salt.utils.files 75import salt.utils.platform 76import salt.utils.process 77import salt.utils.stringutils 78import salt.utils.zeromq 79 80log = logging.getLogger(__name__) 81 82# The SUB_EVENT set is for functions that require events fired based on 83# component executions, like the state system 84SUB_EVENT = ("state.highstate", "state.sls") 85 86TAGEND = "\n\n" # long tag delimiter 87TAGPARTER = "/" # name spaced tag delimiter 88SALT = "salt" # base prefix for all salt/ events 89# dict map of namespaced base tag prefixes for salt events 90TAGS = { 91 "auth": "auth", # prefix for all salt/auth events 92 "job": "job", # prefix for all salt/job events (minion jobs) 93 "key": "key", # prefix for all salt/key events 94 "minion": "minion", # prefix for all salt/minion events 95 # (minion sourced events) 96 "syndic": "syndic", # prefix for all salt/syndic events 97 # (syndic minion sourced events) 98 "run": "run", # prefix for all salt/run events (salt runners) 99 "wheel": "wheel", # prefix for all salt/wheel events 100 "cloud": "cloud", # prefix for all salt/cloud events 101 "fileserver": "fileserver", # prefix for all salt/fileserver events 102 "queue": "queue", # prefix for all salt/queue events 103} 104 105 106def get_event( 107 node, 108 sock_dir=None, 109 transport="zeromq", 110 opts=None, 111 listen=True, 112 io_loop=None, 113 keep_loop=False, 114 raise_errors=False, 115): 116 """ 117 Return an event object suitable for the named transport 118 119 :param IOLoop io_loop: Pass in an io_loop if you want asynchronous 120 operation for obtaining events. Eg use of 121 set_event_handler() API. Otherwise, operation 122 will be synchronous. 123 """ 124 sock_dir = sock_dir or opts["sock_dir"] 125 # TODO: AIO core is separate from transport 126 if node == "master": 127 return MasterEvent( 128 sock_dir, 129 opts, 130 listen=listen, 131 io_loop=io_loop, 132 keep_loop=keep_loop, 133 raise_errors=raise_errors, 134 ) 135 return SaltEvent( 136 node, 137 sock_dir, 138 opts, 139 listen=listen, 140 io_loop=io_loop, 141 keep_loop=keep_loop, 142 raise_errors=raise_errors, 143 ) 144 145 146def get_master_event(opts, sock_dir, listen=True, io_loop=None, raise_errors=False): 147 """ 148 Return an event object suitable for the named transport 149 """ 150 # TODO: AIO core is separate from transport 151 if opts["transport"] in ("zeromq", "tcp", "detect"): 152 return MasterEvent( 153 sock_dir, opts, listen=listen, io_loop=io_loop, raise_errors=raise_errors 154 ) 155 156 157def fire_args(opts, jid, tag_data, prefix=""): 158 """ 159 Fire an event containing the arguments passed to an orchestration job 160 """ 161 try: 162 tag_suffix = [jid, "args"] 163 except NameError: 164 pass 165 else: 166 tag = tagify(tag_suffix, prefix) 167 try: 168 _event = get_master_event(opts, opts["sock_dir"], listen=False) 169 _event.fire_event(tag_data, tag=tag) 170 except Exception as exc: # pylint: disable=broad-except 171 # Don't let a problem here hold up the rest of the orchestration 172 log.warning( 173 "Failed to fire args event %s with data %s: %s", tag, tag_data, exc 174 ) 175 176 177def tagify(suffix="", prefix="", base=SALT): 178 """ 179 convenience function to build a namespaced event tag string 180 from joining with the TABPART character the base, prefix and suffix 181 182 If string prefix is a valid key in TAGS Then use the value of key prefix 183 Else use prefix string 184 185 If suffix is a list Then join all string elements of suffix individually 186 Else use string suffix 187 188 """ 189 parts = [base, TAGS.get(prefix, prefix)] 190 if hasattr(suffix, "append"): # list so extend parts 191 parts.extend(suffix) 192 else: # string so append 193 parts.append(suffix) 194 195 for index, _ in enumerate(parts): 196 try: 197 parts[index] = salt.utils.stringutils.to_str(parts[index]) 198 except TypeError: 199 parts[index] = str(parts[index]) 200 return TAGPARTER.join([part for part in parts if part]) 201 202 203class SaltEvent: 204 """ 205 Warning! Use the get_event function or the code will not be 206 RAET compatible 207 The base class used to manage salt events 208 """ 209 210 def __init__( 211 self, 212 node, 213 sock_dir=None, 214 opts=None, 215 listen=True, 216 io_loop=None, 217 keep_loop=False, 218 raise_errors=False, 219 ): 220 """ 221 :param IOLoop io_loop: Pass in an io_loop if you want asynchronous 222 operation for obtaining events. Eg use of 223 set_event_handler() API. Otherwise, operation 224 will be synchronous. 225 :param Bool keep_loop: Pass a boolean to determine if we want to keep 226 the io loop or destroy it when the event handle 227 is destroyed. This is useful when using event 228 loops from within third party asynchronous code 229 """ 230 self.keep_loop = keep_loop 231 if io_loop is not None: 232 self.io_loop = io_loop 233 self._run_io_loop_sync = False 234 else: 235 self.io_loop = salt.ext.tornado.ioloop.IOLoop() 236 self._run_io_loop_sync = True 237 self.cpub = False 238 self.cpush = False 239 self.subscriber = None 240 self.pusher = None 241 self.raise_errors = raise_errors 242 243 if opts is None: 244 opts = {} 245 if node == "master": 246 self.opts = salt.config.DEFAULT_MASTER_OPTS.copy() 247 else: 248 self.opts = salt.config.DEFAULT_MINION_OPTS.copy() 249 self.opts.update(opts) 250 251 if sock_dir is None: 252 sock_dir = self.opts["sock_dir"] 253 else: 254 self.opts["sock_dir"] = sock_dir 255 256 if salt.utils.platform.is_windows() and "ipc_mode" not in opts: 257 self.opts["ipc_mode"] = "tcp" 258 self.puburi, self.pulluri = self.__load_uri(sock_dir, node) 259 self.pending_tags = [] 260 self.pending_events = [] 261 self.__load_cache_regex() 262 if listen and not self.cpub: 263 # Only connect to the publisher at initialization time if 264 # we know we want to listen. If we connect to the publisher 265 # and don't read out events from the buffer on an on-going basis, 266 # the buffer will grow resulting in big memory usage. 267 self.connect_pub() 268 269 @classmethod 270 def __load_cache_regex(cls): 271 """ 272 Initialize the regular expression cache and put it in the 273 class namespace. The regex search strings will be prepend with '^' 274 """ 275 # This is in the class namespace, to minimize cache memory 276 # usage and maximize cache hits 277 # The prepend='^' is to reduce differences in behavior between 278 # the default 'startswith' and the optional 'regex' match_type 279 cls.cache_regex = salt.utils.cache.CacheRegex(prepend="^") 280 281 def __load_uri(self, sock_dir, node): 282 """ 283 Return the string URI for the location of the pull and pub sockets to 284 use for firing and listening to events 285 """ 286 if node == "master": 287 if self.opts["ipc_mode"] == "tcp": 288 puburi = int(self.opts["tcp_master_pub_port"]) 289 pulluri = int(self.opts["tcp_master_pull_port"]) 290 else: 291 puburi = os.path.join(sock_dir, "master_event_pub.ipc") 292 pulluri = os.path.join(sock_dir, "master_event_pull.ipc") 293 else: 294 if self.opts["ipc_mode"] == "tcp": 295 puburi = int(self.opts["tcp_pub_port"]) 296 pulluri = int(self.opts["tcp_pull_port"]) 297 else: 298 hash_type = getattr(hashlib, self.opts["hash_type"]) 299 # Only use the first 10 chars to keep longer hashes from exceeding the 300 # max socket path length. 301 minion_id = self.opts.get("hash_id", self.opts["id"]) 302 id_hash = hash_type( 303 salt.utils.stringutils.to_bytes(minion_id) 304 ).hexdigest()[:10] 305 puburi = os.path.join( 306 sock_dir, "minion_event_{}_pub.ipc".format(id_hash) 307 ) 308 pulluri = os.path.join( 309 sock_dir, "minion_event_{}_pull.ipc".format(id_hash) 310 ) 311 log.debug("%s PUB socket URI: %s", self.__class__.__name__, puburi) 312 log.debug("%s PULL socket URI: %s", self.__class__.__name__, pulluri) 313 return puburi, pulluri 314 315 def subscribe(self, tag=None, match_type=None): 316 """ 317 Subscribe to events matching the passed tag. 318 319 If you do not subscribe to a tag, events will be discarded by calls to 320 get_event that request a different tag. In contexts where many different 321 jobs are outstanding it is important to subscribe to prevent one call 322 to get_event from discarding a response required by a subsequent call 323 to get_event. 324 """ 325 if tag is None: 326 return 327 match_func = self._get_match_func(match_type) 328 self.pending_tags.append([tag, match_func]) 329 330 def unsubscribe(self, tag, match_type=None): 331 """ 332 Un-subscribe to events matching the passed tag. 333 """ 334 if tag is None: 335 return 336 match_func = self._get_match_func(match_type) 337 338 try: 339 self.pending_tags.remove([tag, match_func]) 340 except ValueError: 341 pass 342 343 old_events = self.pending_events 344 self.pending_events = [] 345 for evt in old_events: 346 if any( 347 pmatch_func(evt["tag"], ptag) for ptag, pmatch_func in self.pending_tags 348 ): 349 self.pending_events.append(evt) 350 351 def connect_pub(self, timeout=None): 352 """ 353 Establish the publish connection 354 """ 355 if self.cpub: 356 return True 357 358 if self._run_io_loop_sync: 359 with salt.utils.asynchronous.current_ioloop(self.io_loop): 360 if self.subscriber is None: 361 self.subscriber = salt.utils.asynchronous.SyncWrapper( 362 salt.transport.ipc.IPCMessageSubscriber, 363 args=(self.puburi,), 364 kwargs={"io_loop": self.io_loop}, 365 loop_kwarg="io_loop", 366 ) 367 try: 368 self.subscriber.connect(timeout=timeout) 369 self.cpub = True 370 except salt.ext.tornado.iostream.StreamClosedError: 371 log.error("Encountered StreamClosedException") 372 except OSError as exc: 373 if exc.errno != errno.ENOENT: 374 raise 375 log.error("Error opening stream, file does not exist") 376 except Exception as exc: # pylint: disable=broad-except 377 log.info( 378 "An exception occurred connecting publisher: %s", 379 exc, 380 exc_info_on_loglevel=logging.DEBUG, 381 ) 382 else: 383 if self.subscriber is None: 384 self.subscriber = salt.transport.ipc.IPCMessageSubscriber( 385 self.puburi, io_loop=self.io_loop 386 ) 387 388 # For the asynchronous case, the connect will be defered to when 389 # set_event_handler() is invoked. 390 self.cpub = True 391 return self.cpub 392 393 def close_pub(self): 394 """ 395 Close the publish connection (if established) 396 """ 397 if not self.cpub: 398 return 399 400 self.subscriber.close() 401 self.subscriber = None 402 self.pending_events = [] 403 self.cpub = False 404 405 def connect_pull(self, timeout=1): 406 """ 407 Establish a connection with the event pull socket 408 Default timeout is 1 s 409 """ 410 if self.cpush: 411 return True 412 413 if self._run_io_loop_sync: 414 with salt.utils.asynchronous.current_ioloop(self.io_loop): 415 if self.pusher is None: 416 self.pusher = salt.utils.asynchronous.SyncWrapper( 417 salt.transport.ipc.IPCMessageClient, 418 args=(self.pulluri,), 419 kwargs={"io_loop": self.io_loop}, 420 loop_kwarg="io_loop", 421 ) 422 try: 423 self.pusher.connect(timeout=timeout) 424 self.cpush = True 425 except Exception as exc: # pylint: disable=broad-except 426 log.error( 427 "Unable to connect pusher: %s", 428 exc, 429 exc_info_on_loglevel=logging.DEBUG, 430 ) 431 else: 432 if self.pusher is None: 433 self.pusher = salt.transport.ipc.IPCMessageClient( 434 self.pulluri, io_loop=self.io_loop 435 ) 436 # For the asynchronous case, the connect will be deferred to when 437 # fire_event() is invoked. 438 self.cpush = True 439 return self.cpush 440 441 def close_pull(self): 442 """ 443 Close the pusher connection (if established) 444 """ 445 if not self.cpush: 446 return 447 448 self.pusher.close() 449 self.pusher = None 450 self.cpush = False 451 452 @classmethod 453 def unpack(cls, raw): 454 mtag, sep, mdata = raw.partition( 455 salt.utils.stringutils.to_bytes(TAGEND) 456 ) # split tag from data 457 mtag = salt.utils.stringutils.to_str(mtag) 458 data = salt.payload.loads(mdata, encoding="utf-8") 459 return mtag, data 460 461 def _get_match_func(self, match_type=None): 462 if match_type is None: 463 match_type = self.opts["event_match_type"] 464 return getattr(self, "_match_tag_{}".format(match_type), None) 465 466 def _check_pending(self, tag, match_func=None): 467 """Check the pending_events list for events that match the tag 468 469 :param tag: The tag to search for 470 :type tag: str 471 :param tags_regex: List of re expressions to search for also 472 :type tags_regex: list[re.compile()] 473 :return: 474 """ 475 if match_func is None: 476 match_func = self._get_match_func() 477 old_events = self.pending_events 478 self.pending_events = [] 479 ret = None 480 for evt in old_events: 481 if match_func(evt["tag"], tag): 482 if ret is None: 483 ret = evt 484 log.trace("get_event() returning cached event = %s", ret) 485 else: 486 self.pending_events.append(evt) 487 elif any( 488 pmatch_func(evt["tag"], ptag) for ptag, pmatch_func in self.pending_tags 489 ): 490 self.pending_events.append(evt) 491 else: 492 log.trace( 493 "get_event() discarding cached event that no longer has any" 494 " subscriptions = %s", 495 evt, 496 ) 497 return ret 498 499 @staticmethod 500 def _match_tag_startswith(event_tag, search_tag): 501 """ 502 Check if the event_tag matches the search check. 503 Uses startswith to check. 504 Return True (matches) or False (no match) 505 """ 506 return event_tag.startswith(search_tag) 507 508 @staticmethod 509 def _match_tag_endswith(event_tag, search_tag): 510 """ 511 Check if the event_tag matches the search check. 512 Uses endswith to check. 513 Return True (matches) or False (no match) 514 """ 515 return event_tag.endswith(search_tag) 516 517 @staticmethod 518 def _match_tag_find(event_tag, search_tag): 519 """ 520 Check if the event_tag matches the search check. 521 Uses find to check. 522 Return True (matches) or False (no match) 523 """ 524 return event_tag.find(search_tag) >= 0 525 526 def _match_tag_regex(self, event_tag, search_tag): 527 """ 528 Check if the event_tag matches the search check. 529 Uses regular expression search to check. 530 Return True (matches) or False (no match) 531 """ 532 return self.cache_regex.get(search_tag).search(event_tag) is not None 533 534 def _match_tag_fnmatch(self, event_tag, search_tag): 535 """ 536 Check if the event_tag matches the search check. 537 Uses fnmatch to check. 538 Return True (matches) or False (no match) 539 """ 540 return fnmatch.fnmatch(event_tag, search_tag) 541 542 def _subproxy_match(self, data): 543 if self.opts.get("subproxy", False): 544 return self.opts["id"] == data.get("proxy_target", None) 545 return True 546 547 def _get_event(self, wait, tag, match_func=None, no_block=False): 548 if match_func is None: 549 match_func = self._get_match_func() 550 start = time.time() 551 timeout_at = start + wait 552 run_once = False 553 if no_block is True: 554 wait = 0 555 elif wait == 0: 556 # If no_block is False and wait is 0, that 557 # means an infinite timeout. 558 wait = None 559 while (run_once is False and not wait) or time.time() <= timeout_at: 560 if no_block is True: 561 if run_once is True: 562 break 563 # Trigger that at least a single iteration has gone through 564 run_once = True 565 try: 566 if not self.cpub and not self.connect_pub(timeout=wait): 567 break 568 raw = self.subscriber.read(timeout=wait) 569 if raw is None: 570 break 571 mtag, data = self.unpack(raw) 572 ret = {"data": data, "tag": mtag} 573 except KeyboardInterrupt: 574 return {"tag": "salt/event/exit", "data": {}} 575 except salt.ext.tornado.iostream.StreamClosedError: 576 if self.raise_errors: 577 raise 578 else: 579 return None 580 except RuntimeError: 581 return None 582 583 if not match_func(ret["tag"], tag) or not self._subproxy_match(ret["data"]): 584 # tag not match 585 if any( 586 pmatch_func(ret["tag"], ptag) 587 for ptag, pmatch_func in self.pending_tags 588 ): 589 log.trace("get_event() caching unwanted event = %s", ret) 590 self.pending_events.append(ret) 591 if wait: # only update the wait timeout if we had one 592 wait = timeout_at - time.time() 593 continue 594 595 log.trace("get_event() received = %s", ret) 596 return ret 597 log.trace("_get_event() waited %s seconds and received nothing", wait) 598 return None 599 600 def get_event( 601 self, 602 wait=5, 603 tag="", 604 full=False, 605 match_type=None, 606 no_block=False, 607 auto_reconnect=False, 608 ): 609 """ 610 Get a single publication. 611 If no publication is available, then block for up to ``wait`` seconds. 612 Return publication if it is available or ``None`` if no publication is 613 available. 614 615 If wait is 0, then block forever. 616 617 tag 618 Only return events matching the given tag. If not specified, or set 619 to an empty string, all events are returned. It is recommended to 620 always be selective on what is to be returned in the event that 621 multiple requests are being multiplexed. 622 623 match_type 624 Set the function to match the search tag with event tags. 625 - 'startswith' : search for event tags that start with tag 626 - 'endswith' : search for event tags that end with tag 627 - 'find' : search for event tags that contain tag 628 - 'regex' : regex search '^' + tag event tags 629 - 'fnmatch' : fnmatch tag event tags matching 630 Default is opts['event_match_type'] or 'startswith' 631 632 .. versionadded:: 2015.8.0 633 634 no_block 635 Define if getting the event should be a blocking call or not. 636 Defaults to False to keep backwards compatibility. 637 638 .. versionadded:: 2015.8.0 639 640 Notes: 641 642 Searches cached publications first. If no cached publications are found 643 that match the given tag specification, new publications are received 644 and checked. 645 646 If a publication is received that does not match the tag specification, 647 it is DISCARDED unless it is subscribed to via subscribe() which will 648 cause it to be cached. 649 650 If a caller is not going to call get_event immediately after sending a 651 request, it MUST subscribe the result to ensure the response is not lost 652 should other regions of code call get_event for other purposes. 653 """ 654 log.trace("Get event. tag: %s", tag) 655 assert self._run_io_loop_sync 656 657 match_func = self._get_match_func(match_type) 658 659 ret = self._check_pending(tag, match_func) 660 if ret is None: 661 with salt.utils.asynchronous.current_ioloop(self.io_loop): 662 if auto_reconnect: 663 raise_errors = self.raise_errors 664 self.raise_errors = True 665 while True: 666 try: 667 ret = self._get_event(wait, tag, match_func, no_block) 668 break 669 except salt.ext.tornado.iostream.StreamClosedError: 670 self.close_pub() 671 self.connect_pub(timeout=wait) 672 continue 673 self.raise_errors = raise_errors 674 else: 675 ret = self._get_event(wait, tag, match_func, no_block) 676 677 if ret is None or full: 678 return ret 679 else: 680 return ret["data"] 681 682 def get_event_noblock(self): 683 """ 684 Get the raw event without blocking or any other niceties 685 """ 686 assert self._run_io_loop_sync 687 688 if not self.cpub: 689 if not self.connect_pub(): 690 return None 691 raw = self.subscriber._read(timeout=0) 692 if raw is None: 693 return None 694 mtag, data = self.unpack(raw) 695 return {"data": data, "tag": mtag} 696 697 def get_event_block(self): 698 """ 699 Get the raw event in a blocking fashion. This is slower, but it decreases the 700 possibility of dropped events. 701 """ 702 assert self._run_io_loop_sync 703 704 if not self.cpub: 705 if not self.connect_pub(): 706 return None 707 raw = self.subscriber._read(timeout=None) 708 if raw is None: 709 return None 710 mtag, data = self.unpack(raw) 711 return {"data": data, "tag": mtag} 712 713 def iter_events(self, tag="", full=False, match_type=None, auto_reconnect=False): 714 """ 715 Creates a generator that continuously listens for events 716 """ 717 while True: 718 data = self.get_event( 719 tag=tag, full=full, match_type=match_type, auto_reconnect=auto_reconnect 720 ) 721 if data is None: 722 continue 723 yield data 724 725 @salt.ext.tornado.gen.coroutine 726 def fire_event_async(self, data, tag, cb=None, timeout=1000): 727 """ 728 Send a single event into the publisher with payload dict "data" and 729 event identifier "tag" 730 731 The default is 1000 ms 732 """ 733 if self.opts.get("subproxy", False): 734 data["proxy_target"] = self.opts["id"] 735 736 if not str(tag): # no empty tags allowed 737 raise ValueError("Empty tag.") 738 739 if not isinstance(data, MutableMapping): # data must be dict 740 raise ValueError("Dict object expected, not '{}'.".format(data)) 741 742 if not self.cpush: 743 if timeout is not None: 744 timeout_s = float(timeout) / 1000 745 else: 746 timeout_s = None 747 if not self.connect_pull(timeout=timeout_s): 748 return False 749 750 data["_stamp"] = datetime.datetime.utcnow().isoformat() 751 752 tagend = TAGEND 753 # Since the pack / unpack logic here is for local events only, 754 # it is safe to change the wire protocol. The mechanism 755 # that sends events from minion to master is outside this 756 # file. 757 dump_data = salt.payload.dumps(data, use_bin_type=True) 758 759 serialized_data = salt.utils.dicttrim.trim_dict( 760 dump_data, 761 self.opts["max_event_size"], 762 is_msgpacked=True, 763 use_bin_type=True, 764 ) 765 log.debug("Sending event: tag = %s; data = %s", tag, data) 766 event = b"".join( 767 [ 768 salt.utils.stringutils.to_bytes(tag), 769 salt.utils.stringutils.to_bytes(tagend), 770 serialized_data, 771 ] 772 ) 773 msg = salt.utils.stringutils.to_bytes(event, "utf-8") 774 ret = yield self.pusher.send(msg) 775 if cb is not None: 776 cb(ret) 777 778 def fire_event(self, data, tag, timeout=1000): 779 """ 780 Send a single event into the publisher with payload dict "data" and 781 event identifier "tag" 782 783 The default is 1000 ms 784 """ 785 if self.opts.get("subproxy", False): 786 data["proxy_target"] = self.opts["id"] 787 788 if not str(tag): # no empty tags allowed 789 raise ValueError("Empty tag.") 790 791 if not isinstance(data, MutableMapping): # data must be dict 792 raise ValueError("Dict object expected, not '{}'.".format(data)) 793 794 if not self.cpush: 795 if timeout is not None: 796 timeout_s = float(timeout) / 1000 797 else: 798 timeout_s = None 799 if not self.connect_pull(timeout=timeout_s): 800 return False 801 802 data["_stamp"] = datetime.datetime.utcnow().isoformat() 803 804 tagend = TAGEND 805 # Since the pack / unpack logic here is for local events only, 806 # it is safe to change the wire protocol. The mechanism 807 # that sends events from minion to master is outside this 808 # file. 809 dump_data = salt.payload.dumps(data, use_bin_type=True) 810 811 serialized_data = salt.utils.dicttrim.trim_dict( 812 dump_data, 813 self.opts["max_event_size"], 814 is_msgpacked=True, 815 use_bin_type=True, 816 ) 817 log.debug("Sending event: tag = %s; data = %s", tag, data) 818 event = b"".join( 819 [ 820 salt.utils.stringutils.to_bytes(tag), 821 salt.utils.stringutils.to_bytes(tagend), 822 serialized_data, 823 ] 824 ) 825 msg = salt.utils.stringutils.to_bytes(event, "utf-8") 826 if self._run_io_loop_sync: 827 with salt.utils.asynchronous.current_ioloop(self.io_loop): 828 try: 829 self.pusher.send(msg) 830 except Exception as exc: # pylint: disable=broad-except 831 log.debug( 832 "Publisher send failed with exception: %s", 833 exc, 834 exc_info_on_loglevel=logging.DEBUG, 835 ) 836 raise 837 else: 838 self.io_loop.spawn_callback(self.pusher.send, msg) 839 return True 840 841 def fire_master(self, data, tag, timeout=1000): 842 """' 843 Send a single event to the master, with the payload "data" and the 844 event identifier "tag". 845 846 Default timeout is 1000ms 847 """ 848 msg = {"tag": tag, "data": data, "events": None, "pretag": None} 849 return self.fire_event(msg, "fire_master", timeout) 850 851 def destroy(self): 852 if self.subscriber is not None: 853 self.close_pub() 854 if self.pusher is not None: 855 self.close_pull() 856 if self._run_io_loop_sync and not self.keep_loop: 857 self.io_loop.close() 858 859 def _fire_ret_load_specific_fun(self, load, fun_index=0): 860 """ 861 Helper function for fire_ret_load 862 """ 863 if isinstance(load["fun"], list): 864 # Multi-function job 865 fun = load["fun"][fun_index] 866 # 'retcode' was already validated to exist and be non-zero 867 # for the given function in the caller. 868 if isinstance(load["retcode"], list): 869 # Multi-function ordered 870 ret = load.get("return") 871 if isinstance(ret, list) and len(ret) > fun_index: 872 ret = ret[fun_index] 873 else: 874 ret = {} 875 retcode = load["retcode"][fun_index] 876 else: 877 ret = load.get("return", {}) 878 ret = ret.get(fun, {}) 879 retcode = load["retcode"][fun] 880 else: 881 # Single-function job 882 fun = load["fun"] 883 ret = load.get("return", {}) 884 retcode = load["retcode"] 885 886 try: 887 for tag, data in ret.items(): 888 data["retcode"] = retcode 889 tags = tag.split("_|-") 890 if data.get("result") is False: 891 self.fire_event( 892 data, "{}.{}".format(tags[0], tags[-1]) 893 ) # old dup event 894 data["jid"] = load["jid"] 895 data["id"] = load["id"] 896 data["success"] = False 897 data["return"] = "Error: {}.{}".format(tags[0], tags[-1]) 898 data["fun"] = fun 899 data["user"] = load["user"] 900 self.fire_event( 901 data, 902 tagify([load["jid"], "sub", load["id"], "error", fun], "job"), 903 ) 904 except Exception as exc: # pylint: disable=broad-except 905 log.error( 906 "Event iteration failed with exception: %s", 907 exc, 908 exc_info_on_loglevel=logging.DEBUG, 909 ) 910 911 def fire_ret_load(self, load): 912 """ 913 Fire events based on information in the return load 914 """ 915 if load.get("retcode") and load.get("fun"): 916 if isinstance(load["fun"], list): 917 # Multi-function job 918 if isinstance(load["retcode"], list): 919 multifunc_ordered = True 920 else: 921 multifunc_ordered = False 922 923 for fun_index in range(0, len(load["fun"])): 924 fun = load["fun"][fun_index] 925 if multifunc_ordered: 926 if ( 927 len(load["retcode"]) > fun_index 928 and load["retcode"][fun_index] 929 and fun in SUB_EVENT 930 ): 931 # Minion fired a bad retcode, fire an event 932 self._fire_ret_load_specific_fun(load, fun_index) 933 else: 934 if load["retcode"].get(fun, 0) and fun in SUB_EVENT: 935 # Minion fired a bad retcode, fire an event 936 self._fire_ret_load_specific_fun(load, fun_index) 937 else: 938 # Single-function job 939 if load["fun"] in SUB_EVENT: 940 # Minion fired a bad retcode, fire an event 941 self._fire_ret_load_specific_fun(load) 942 943 def set_event_handler(self, event_handler): 944 """ 945 Invoke the event_handler callback each time an event arrives. 946 """ 947 assert not self._run_io_loop_sync 948 949 if not self.cpub: 950 self.connect_pub() 951 # This will handle reconnects 952 return self.subscriber.read_async(event_handler) 953 954 # pylint: disable=W1701 955 def __del__(self): 956 # skip exceptions in destroy-- since destroy() doesn't cover interpreter 957 # shutdown-- where globals start going missing 958 try: 959 self.destroy() 960 except Exception: # pylint: disable=broad-except 961 pass 962 963 # pylint: enable=W1701 964 965 def __enter__(self): 966 return self 967 968 def __exit__(self, *args): 969 self.destroy() 970 971 972class MasterEvent(SaltEvent): 973 """ 974 Warning! Use the get_event function or the code will not be 975 RAET compatible 976 Create a master event management object 977 """ 978 979 def __init__( 980 self, 981 sock_dir, 982 opts=None, 983 listen=True, 984 io_loop=None, 985 keep_loop=False, 986 raise_errors=False, 987 ): 988 super().__init__( 989 "master", 990 sock_dir, 991 opts, 992 listen=listen, 993 io_loop=io_loop, 994 keep_loop=keep_loop, 995 raise_errors=raise_errors, 996 ) 997 998 999class LocalClientEvent(MasterEvent): 1000 """ 1001 Warning! Use the get_event function or the code will not be 1002 RAET compatible 1003 This class is just used to differentiate who is handling the events, 1004 specially on logs, but it's the same as MasterEvent. 1005 """ 1006 1007 1008class NamespacedEvent: 1009 """ 1010 A wrapper for sending events within a specific base namespace 1011 """ 1012 1013 def __init__(self, event, base, print_func=None): 1014 self.event = event 1015 self.base = base 1016 self.print_func = print_func 1017 1018 def fire_event(self, data, tag): 1019 self.event.fire_event(data, tagify(tag, base=self.base)) 1020 if self.print_func is not None: 1021 self.print_func(tag, data) 1022 1023 def destroy(self): 1024 self.event.destroy() 1025 1026 def __enter__(self): 1027 return self 1028 1029 def __exit__(self, *args): 1030 self.destroy() 1031 1032 1033class MinionEvent(SaltEvent): 1034 """ 1035 Warning! Use the get_event function or the code will not be 1036 RAET compatible 1037 Create a master event management object 1038 """ 1039 1040 def __init__(self, opts, listen=True, io_loop=None, raise_errors=False): 1041 super().__init__( 1042 "minion", 1043 sock_dir=opts.get("sock_dir"), 1044 opts=opts, 1045 listen=listen, 1046 io_loop=io_loop, 1047 raise_errors=raise_errors, 1048 ) 1049 1050 1051class AsyncEventPublisher: 1052 """ 1053 An event publisher class intended to run in an ioloop (within a single process) 1054 1055 TODO: remove references to "minion_event" whenever we need to use this for other things 1056 """ 1057 1058 def __init__(self, opts, io_loop=None): 1059 self.opts = salt.config.DEFAULT_MINION_OPTS.copy() 1060 default_minion_sock_dir = self.opts["sock_dir"] 1061 self.opts.update(opts) 1062 1063 self.io_loop = io_loop or salt.ext.tornado.ioloop.IOLoop.current() 1064 self._closing = False 1065 self.publisher = None 1066 self.puller = None 1067 1068 hash_type = getattr(hashlib, self.opts["hash_type"]) 1069 # Only use the first 10 chars to keep longer hashes from exceeding the 1070 # max socket path length. 1071 id_hash = hash_type( 1072 salt.utils.stringutils.to_bytes(self.opts["id"]) 1073 ).hexdigest()[:10] 1074 epub_sock_path = os.path.join( 1075 self.opts["sock_dir"], "minion_event_{}_pub.ipc".format(id_hash) 1076 ) 1077 if os.path.exists(epub_sock_path): 1078 os.unlink(epub_sock_path) 1079 epull_sock_path = os.path.join( 1080 self.opts["sock_dir"], "minion_event_{}_pull.ipc".format(id_hash) 1081 ) 1082 if os.path.exists(epull_sock_path): 1083 os.unlink(epull_sock_path) 1084 1085 if self.opts["ipc_mode"] == "tcp": 1086 epub_uri = int(self.opts["tcp_pub_port"]) 1087 epull_uri = int(self.opts["tcp_pull_port"]) 1088 else: 1089 epub_uri = epub_sock_path 1090 epull_uri = epull_sock_path 1091 1092 log.debug("%s PUB socket URI: %s", self.__class__.__name__, epub_uri) 1093 log.debug("%s PULL socket URI: %s", self.__class__.__name__, epull_uri) 1094 1095 minion_sock_dir = self.opts["sock_dir"] 1096 1097 if not os.path.isdir(minion_sock_dir): 1098 # Let's try to create the directory defined on the configuration 1099 # file 1100 try: 1101 os.makedirs(minion_sock_dir, 0o755) 1102 except OSError as exc: 1103 log.error("Could not create SOCK_DIR: %s", exc) 1104 # Let's not fail yet and try using the default path 1105 if minion_sock_dir == default_minion_sock_dir: 1106 # We're already trying the default system path, stop now! 1107 raise 1108 1109 if not os.path.isdir(default_minion_sock_dir): 1110 try: 1111 os.makedirs(default_minion_sock_dir, 0o755) 1112 except OSError as exc: 1113 log.error("Could not create SOCK_DIR: %s", exc) 1114 # Let's stop at this stage 1115 raise 1116 1117 self.publisher = salt.transport.ipc.IPCMessagePublisher( 1118 self.opts, epub_uri, io_loop=self.io_loop 1119 ) 1120 1121 self.puller = salt.transport.ipc.IPCMessageServer( 1122 epull_uri, io_loop=self.io_loop, payload_handler=self.handle_publish 1123 ) 1124 1125 log.info("Starting pull socket on %s", epull_uri) 1126 with salt.utils.files.set_umask(0o177): 1127 self.publisher.start() 1128 self.puller.start() 1129 1130 def handle_publish(self, package, _): 1131 """ 1132 Get something from epull, publish it out epub, and return the package (or None) 1133 """ 1134 try: 1135 self.publisher.publish(package) 1136 return package 1137 # Add an extra fallback in case a forked process leeks through 1138 except Exception: # pylint: disable=broad-except 1139 log.critical("Unexpected error while polling minion events", exc_info=True) 1140 return None 1141 1142 def close(self): 1143 if self._closing: 1144 return 1145 self._closing = True 1146 if self.publisher is not None: 1147 self.publisher.close() 1148 if self.puller is not None: 1149 self.puller.close() 1150 1151 1152class EventPublisher(salt.utils.process.SignalHandlingProcess): 1153 """ 1154 The interface that takes master events and republishes them out to anyone 1155 who wants to listen 1156 """ 1157 1158 def __init__(self, opts, **kwargs): 1159 super().__init__(**kwargs) 1160 self.opts = salt.config.DEFAULT_MASTER_OPTS.copy() 1161 self.opts.update(opts) 1162 self._closing = False 1163 self.io_loop = None 1164 self.puller = None 1165 self.publisher = None 1166 1167 def run(self): 1168 """ 1169 Bind the pub and pull sockets for events 1170 """ 1171 salt.utils.process.appendproctitle(self.__class__.__name__) 1172 1173 if ( 1174 self.opts["event_publisher_niceness"] 1175 and not salt.utils.platform.is_windows() 1176 ): 1177 log.info( 1178 "setting EventPublisher niceness to %i", 1179 self.opts["event_publisher_niceness"], 1180 ) 1181 os.nice(self.opts["event_publisher_niceness"]) 1182 1183 self.io_loop = salt.ext.tornado.ioloop.IOLoop() 1184 with salt.utils.asynchronous.current_ioloop(self.io_loop): 1185 if self.opts["ipc_mode"] == "tcp": 1186 epub_uri = int(self.opts["tcp_master_pub_port"]) 1187 epull_uri = int(self.opts["tcp_master_pull_port"]) 1188 else: 1189 epub_uri = os.path.join(self.opts["sock_dir"], "master_event_pub.ipc") 1190 epull_uri = os.path.join(self.opts["sock_dir"], "master_event_pull.ipc") 1191 1192 self.publisher = salt.transport.ipc.IPCMessagePublisher( 1193 self.opts, epub_uri, io_loop=self.io_loop 1194 ) 1195 1196 self.puller = salt.transport.ipc.IPCMessageServer( 1197 epull_uri, 1198 io_loop=self.io_loop, 1199 payload_handler=self.handle_publish, 1200 ) 1201 1202 # Start the master event publisher 1203 with salt.utils.files.set_umask(0o177): 1204 self.publisher.start() 1205 self.puller.start() 1206 if self.opts["ipc_mode"] != "tcp" and ( 1207 self.opts["publisher_acl"] or self.opts["external_auth"] 1208 ): 1209 os.chmod( # nosec 1210 os.path.join(self.opts["sock_dir"], "master_event_pub.ipc"), 1211 0o666, 1212 ) 1213 1214 atexit.register(self.close) 1215 with contextlib.suppress(KeyboardInterrupt): 1216 try: 1217 self.io_loop.start() 1218 finally: 1219 # Make sure the IO loop and respective sockets are closed and destroyed 1220 self.close() 1221 1222 def handle_publish(self, package, _): 1223 """ 1224 Get something from epull, publish it out epub, and return the package (or None) 1225 """ 1226 try: 1227 self.publisher.publish(package) 1228 return package 1229 # Add an extra fallback in case a forked process leeks through 1230 except Exception: # pylint: disable=broad-except 1231 log.critical("Unexpected error while polling master events", exc_info=True) 1232 return None 1233 1234 def close(self): 1235 if self._closing: 1236 return 1237 self._closing = True 1238 atexit.unregister(self.close) 1239 if self.publisher is not None: 1240 self.publisher.close() 1241 self.publisher = None 1242 if self.puller is not None: 1243 self.puller.close() 1244 self.puller = None 1245 if self.io_loop is not None: 1246 self.io_loop.close() 1247 self.io_loop = None 1248 1249 def _handle_signals(self, signum, sigframe): 1250 self.close() 1251 super()._handle_signals(signum, sigframe) 1252 1253 1254class EventReturn(salt.utils.process.SignalHandlingProcess): 1255 """ 1256 A dedicated process which listens to the master event bus and queues 1257 and forwards events to the specified returner. 1258 """ 1259 1260 def __init__(self, opts, **kwargs): 1261 """ 1262 Initialize the EventReturn system 1263 1264 Return an EventReturn instance 1265 """ 1266 # This is required because the process is forked and the module no 1267 # longer exists in the global namespace. 1268 import salt.minion 1269 1270 super().__init__(**kwargs) 1271 1272 self.opts = opts 1273 self.event_return_queue = self.opts["event_return_queue"] 1274 self.event_return_queue_max_seconds = self.opts.get( 1275 "event_return_queue_max_seconds", 0 1276 ) 1277 local_minion_opts = self.opts.copy() 1278 local_minion_opts["file_client"] = "local" 1279 self.minion = salt.minion.MasterMinion(local_minion_opts) 1280 self.event_queue = [] 1281 self.stop = False 1282 1283 def _handle_signals(self, signum, sigframe): 1284 # Flush and terminate 1285 if self.event_queue: 1286 self.flush_events() 1287 self.stop = True 1288 super()._handle_signals(signum, sigframe) 1289 1290 def flush_events(self): 1291 if isinstance(self.opts["event_return"], list): 1292 # Multiple event returners 1293 for r in self.opts["event_return"]: 1294 log.debug("Calling event returner %s, one of many.", r) 1295 event_return = "{}.event_return".format(r) 1296 self._flush_event_single(event_return) 1297 else: 1298 # Only a single event returner 1299 log.debug( 1300 "Calling event returner %s, only one configured.", 1301 self.opts["event_return"], 1302 ) 1303 event_return = "{}.event_return".format(self.opts["event_return"]) 1304 self._flush_event_single(event_return) 1305 del self.event_queue[:] 1306 1307 def _flush_event_single(self, event_return): 1308 if event_return in self.minion.returners: 1309 try: 1310 self.minion.returners[event_return](self.event_queue) 1311 except Exception as exc: # pylint: disable=broad-except 1312 log.error( 1313 "Could not store events - returner '%s' raised exception: %s", 1314 event_return, 1315 exc, 1316 ) 1317 # don't waste processing power unnecessarily on converting a 1318 # potentially huge dataset to a string 1319 if log.level <= logging.DEBUG: 1320 log.debug( 1321 "Event data that caused an exception: %s", self.event_queue 1322 ) 1323 else: 1324 log.error( 1325 "Could not store return for event(s) - returner '%s' not found.", 1326 event_return, 1327 ) 1328 1329 def run(self): 1330 """ 1331 Spin up the multiprocess event returner 1332 """ 1333 salt.utils.process.appendproctitle(self.__class__.__name__) 1334 1335 if self.opts["event_return_niceness"] and not salt.utils.platform.is_windows(): 1336 log.info( 1337 "setting EventReturn niceness to %i", self.opts["event_return_niceness"] 1338 ) 1339 os.nice(self.opts["event_return_niceness"]) 1340 1341 self.event = get_event("master", opts=self.opts, listen=True) 1342 events = self.event.iter_events(full=True) 1343 self.event.fire_event({}, "salt/event_listen/start") 1344 try: 1345 # events below is a generator, we will iterate until we get the salt/event/exit tag 1346 oldestevent = None 1347 for event in events: 1348 1349 if event["tag"] == "salt/event/exit": 1350 # We're done eventing 1351 self.stop = True 1352 if self._filter(event): 1353 # This event passed the filter, add it to the queue 1354 self.event_queue.append(event) 1355 too_long_in_queue = False 1356 1357 # if max_seconds is >0, then we want to make sure we flush the queue 1358 # every event_return_queue_max_seconds seconds, If it's 0, don't 1359 # apply any of this logic 1360 if self.event_return_queue_max_seconds > 0: 1361 rightnow = datetime.datetime.now() 1362 if not oldestevent: 1363 oldestevent = rightnow 1364 age_in_seconds = (rightnow - oldestevent).seconds 1365 if age_in_seconds > 0: 1366 log.debug( 1367 "Oldest event in queue is %s seconds old.", age_in_seconds 1368 ) 1369 if age_in_seconds >= self.event_return_queue_max_seconds: 1370 too_long_in_queue = True 1371 oldestevent = None 1372 else: 1373 too_long_in_queue = False 1374 1375 if too_long_in_queue: 1376 log.debug( 1377 "Oldest event has been in queue too long, will flush queue" 1378 ) 1379 1380 # If we are over the max queue size or the oldest item in the queue has been there too long 1381 # then flush the queue 1382 if ( 1383 len(self.event_queue) >= self.event_return_queue 1384 or too_long_in_queue 1385 ): 1386 log.debug("Flushing %s events.", len(self.event_queue)) 1387 self.flush_events() 1388 oldestevent = None 1389 if self.stop: 1390 # We saw the salt/event/exit tag, we can stop eventing 1391 break 1392 finally: # flush all we have at this moment 1393 # No matter what, make sure we flush the queue even when we are exiting 1394 # and there will be no more events. 1395 if self.event_queue: 1396 log.debug("Flushing %s events.", len(self.event_queue)) 1397 1398 self.flush_events() 1399 1400 def _filter(self, event): 1401 """ 1402 Take an event and run it through configured filters. 1403 1404 Returns True if event should be stored, else False 1405 """ 1406 tag = event["tag"] 1407 if self.opts["event_return_whitelist"]: 1408 ret = False 1409 else: 1410 ret = True 1411 for whitelist_match in self.opts["event_return_whitelist"]: 1412 if fnmatch.fnmatch(tag, whitelist_match): 1413 ret = True 1414 break 1415 for blacklist_match in self.opts["event_return_blacklist"]: 1416 if fnmatch.fnmatch(tag, blacklist_match): 1417 ret = False 1418 break 1419 return ret 1420 1421 1422class StateFire: 1423 """ 1424 Evaluate the data from a state run and fire events on the master and minion 1425 for each returned chunk that is not "green" 1426 This object is made to only run on a minion 1427 """ 1428 1429 def __init__(self, opts, auth=None): 1430 self.opts = opts 1431 if not auth: 1432 self.auth = salt.crypt.SAuth(self.opts) 1433 else: 1434 self.auth = auth 1435 1436 def fire_master(self, data, tag, preload=None): 1437 """ 1438 Fire an event off on the master server 1439 1440 CLI Example: 1441 1442 .. code-block:: bash 1443 1444 salt '*' event.fire_master 'stuff to be in the event' 'tag' 1445 """ 1446 load = {} 1447 if preload: 1448 load.update(preload) 1449 1450 load.update( 1451 { 1452 "id": self.opts["id"], 1453 "tag": tag, 1454 "data": data, 1455 "cmd": "_minion_event", 1456 "tok": self.auth.gen_token(b"salt"), 1457 } 1458 ) 1459 1460 with salt.transport.client.ReqChannel.factory(self.opts) as channel: 1461 try: 1462 channel.send(load) 1463 except Exception as exc: # pylint: disable=broad-except 1464 log.info( 1465 "An exception occurred on fire_master: %s", 1466 exc, 1467 exc_info_on_loglevel=logging.DEBUG, 1468 ) 1469 return True 1470 1471 def fire_running(self, running): 1472 """ 1473 Pass in a state "running" dict, this is the return dict from a state 1474 call. The dict will be processed and fire events. 1475 1476 By default yellows and reds fire events on the master and minion, but 1477 this can be configured. 1478 """ 1479 load = {"id": self.opts["id"], "events": [], "cmd": "_minion_event"} 1480 for stag in sorted(running, key=lambda k: running[k].get("__run_num__", 0)): 1481 if running[stag]["result"] and not running[stag]["changes"]: 1482 continue 1483 tag = "state_{}_{}".format( 1484 str(running[stag]["result"]), 1485 "True" if running[stag]["changes"] else "False", 1486 ) 1487 load["events"].append({"tag": tag, "data": running[stag]}) 1488 with salt.transport.client.ReqChannel.factory(self.opts) as channel: 1489 try: 1490 channel.send(load) 1491 except Exception as exc: # pylint: disable=broad-except 1492 log.info( 1493 "An exception occurred on fire_master: %s", 1494 exc, 1495 exc_info_on_loglevel=logging.DEBUG, 1496 ) 1497 return True 1498