1""" 2Zeromq transport classes 3""" 4import errno 5import hashlib 6import logging 7import os 8import signal 9import sys 10import threading 11import uuid 12from random import randint 13 14import salt.auth 15import salt.crypt 16import salt.ext.tornado 17import salt.ext.tornado.concurrent 18import salt.ext.tornado.gen 19import salt.ext.tornado.ioloop 20import salt.log.setup 21import salt.payload 22import salt.transport.client 23import salt.transport.mixins.auth 24import salt.transport.server 25import salt.utils.event 26import salt.utils.files 27import salt.utils.minions 28import salt.utils.process 29import salt.utils.stringutils 30import salt.utils.verify 31import salt.utils.versions 32import salt.utils.zeromq 33import zmq.error 34import zmq.eventloop.ioloop 35import zmq.eventloop.zmqstream 36from salt._compat import ipaddress 37from salt.exceptions import SaltException, SaltReqTimeoutError 38from salt.utils.zeromq import LIBZMQ_VERSION_INFO, ZMQ_VERSION_INFO, zmq 39 40try: 41 import zmq.utils.monitor 42 43 HAS_ZMQ_MONITOR = True 44except ImportError: 45 HAS_ZMQ_MONITOR = False 46 47 48try: 49 from M2Crypto import RSA 50 51 HAS_M2 = True 52except ImportError: 53 HAS_M2 = False 54 try: 55 from Cryptodome.Cipher import PKCS1_OAEP 56 except ImportError: 57 from Crypto.Cipher import PKCS1_OAEP # nosec 58 59 60log = logging.getLogger(__name__) 61 62 63def _get_master_uri(master_ip, master_port, source_ip=None, source_port=None): 64 """ 65 Return the ZeroMQ URI to connect the Minion to the Master. 66 It supports different source IP / port, given the ZeroMQ syntax: 67 // Connecting using a IP address and bind to an IP address 68 rc = zmq_connect(socket, "tcp://192.168.1.17:5555;192.168.1.1:5555"); assert (rc == 0); 69 Source: http://api.zeromq.org/4-1:zmq-tcp 70 """ 71 72 from salt.utils.zeromq import ip_bracket 73 74 master_uri = "tcp://{master_ip}:{master_port}".format( 75 master_ip=ip_bracket(master_ip), master_port=master_port 76 ) 77 if source_ip or source_port: 78 if LIBZMQ_VERSION_INFO >= (4, 1, 6) and ZMQ_VERSION_INFO >= (16, 0, 1): 79 # The source:port syntax for ZeroMQ has been added in libzmq 4.1.6 80 # which is included in the pyzmq wheels starting with 16.0.1. 81 if source_ip and source_port: 82 master_uri = ( 83 "tcp://{source_ip}:{source_port};{master_ip}:{master_port}".format( 84 source_ip=ip_bracket(source_ip), 85 source_port=source_port, 86 master_ip=ip_bracket(master_ip), 87 master_port=master_port, 88 ) 89 ) 90 elif source_ip and not source_port: 91 master_uri = "tcp://{source_ip}:0;{master_ip}:{master_port}".format( 92 source_ip=ip_bracket(source_ip), 93 master_ip=ip_bracket(master_ip), 94 master_port=master_port, 95 ) 96 elif source_port and not source_ip: 97 ip_any = ( 98 "0.0.0.0" 99 if ipaddress.ip_address(master_ip).version == 4 100 else ip_bracket("::") 101 ) 102 master_uri = ( 103 "tcp://{ip_any}:{source_port};{master_ip}:{master_port}".format( 104 ip_any=ip_any, 105 source_port=source_port, 106 master_ip=ip_bracket(master_ip), 107 master_port=master_port, 108 ) 109 ) 110 else: 111 log.warning( 112 "Unable to connect to the Master using a specific source IP / port" 113 ) 114 log.warning("Consider upgrading to pyzmq >= 16.0.1 and libzmq >= 4.1.6") 115 log.warning( 116 "Specific source IP / port for connecting to master returner port:" 117 " configuraion ignored" 118 ) 119 120 return master_uri 121 122 123class AsyncZeroMQReqChannel(salt.transport.client.ReqChannel): 124 """ 125 Encapsulate sending routines to ZeroMQ. 126 127 ZMQ Channels default to 'crypt=aes' 128 """ 129 130 async_methods = [ 131 "crypted_transfer_decode_dictentry", 132 "_crypted_transfer", 133 "_do_transfer", 134 "_uncrypted_transfer", 135 "send", 136 ] 137 close_methods = [ 138 "close", 139 ] 140 141 def __init__(self, opts, **kwargs): 142 self.opts = dict(opts) 143 self.ttype = "zeromq" 144 145 # crypt defaults to 'aes' 146 self.crypt = kwargs.get("crypt", "aes") 147 148 if "master_uri" in kwargs: 149 self.opts["master_uri"] = kwargs["master_uri"] 150 151 self._io_loop = kwargs.get("io_loop") 152 if self._io_loop is None: 153 self._io_loop = salt.ext.tornado.ioloop.IOLoop.current() 154 155 if self.crypt != "clear": 156 # we don't need to worry about auth as a kwarg, since its a singleton 157 self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self._io_loop) 158 log.debug( 159 "Connecting the Minion to the Master URI (for the return server): %s", 160 self.master_uri, 161 ) 162 self.message_client = AsyncReqMessageClientPool( 163 self.opts, 164 args=( 165 self.opts, 166 self.master_uri, 167 ), 168 kwargs={"io_loop": self._io_loop}, 169 ) 170 self._closing = False 171 172 def close(self): 173 """ 174 Since the message_client creates sockets and assigns them to the IOLoop we have to 175 specifically destroy them, since we aren't the only ones with references to the FDs 176 """ 177 if self._closing: 178 return 179 log.debug("Closing %s instance", self.__class__.__name__) 180 self._closing = True 181 if hasattr(self, "message_client"): 182 self.message_client.close() 183 184 # pylint: disable=W1701 185 def __del__(self): 186 try: 187 self.close() 188 except OSError as exc: 189 if exc.errno != errno.EBADF: 190 # If its not a bad file descriptor error, raise 191 raise 192 193 # pylint: enable=W1701 194 195 @property 196 def master_uri(self): 197 if "master_uri" in self.opts: 198 return self.opts["master_uri"] 199 200 # if by chance master_uri is not there.. 201 if "master_ip" in self.opts: 202 return _get_master_uri( 203 self.opts["master_ip"], 204 self.opts["master_port"], 205 source_ip=self.opts.get("source_ip"), 206 source_port=self.opts.get("source_ret_port"), 207 ) 208 209 # if we've reached here something is very abnormal 210 raise SaltException("ReqChannel: missing master_uri/master_ip in self.opts") 211 212 def _package_load(self, load): 213 return { 214 "enc": self.crypt, 215 "load": load, 216 "version": 2, 217 } 218 219 @salt.ext.tornado.gen.coroutine 220 def crypted_transfer_decode_dictentry( 221 self, load, dictkey=None, tries=3, timeout=60 222 ): 223 nonce = uuid.uuid4().hex 224 load["nonce"] = nonce 225 if not self.auth.authenticated: 226 # Return control back to the caller, continue when authentication succeeds 227 yield self.auth.authenticate() 228 229 # Return control to the caller. When send() completes, resume by 230 # populating ret with the Future.result 231 ret = yield self.message_client.send( 232 self._package_load(self.auth.crypticle.dumps(load)), 233 timeout=timeout, 234 tries=tries, 235 ) 236 237 if "key" not in ret: 238 # Reauth in the case our key is deleted on the master side. 239 yield self.auth.authenticate() 240 ret = yield self.message_client.send( 241 self._package_load(self.auth.crypticle.dumps(load)), 242 timeout=timeout, 243 tries=tries, 244 ) 245 246 key = self.auth.get_keys() 247 if HAS_M2: 248 aes = key.private_decrypt(ret["key"], RSA.pkcs1_oaep_padding) 249 else: 250 cipher = PKCS1_OAEP.new(key) 251 aes = cipher.decrypt(ret["key"]) 252 253 # Decrypt using the public key. 254 pcrypt = salt.crypt.Crypticle(self.opts, aes) 255 signed_msg = pcrypt.loads(ret[dictkey]) 256 257 # Validate the master's signature. 258 master_pubkey_path = os.path.join(self.opts["pki_dir"], "minion_master.pub") 259 if not salt.crypt.verify_signature( 260 master_pubkey_path, signed_msg["data"], signed_msg["sig"] 261 ): 262 raise salt.crypt.AuthenticationError( 263 "Pillar payload signature failed to validate." 264 ) 265 266 # Make sure the signed key matches the key we used to decrypt the data. 267 data = salt.payload.loads(signed_msg["data"]) 268 if data["key"] != ret["key"]: 269 raise salt.crypt.AuthenticationError("Key verification failed.") 270 271 # Validate the nonce. 272 if data["nonce"] != nonce: 273 raise salt.crypt.AuthenticationError("Pillar nonce verification failed.") 274 raise salt.ext.tornado.gen.Return(data["pillar"]) 275 276 @salt.ext.tornado.gen.coroutine 277 def _crypted_transfer(self, load, tries=3, timeout=60, raw=False): 278 """ 279 Send a load across the wire, with encryption 280 281 In case of authentication errors, try to renegotiate authentication 282 and retry the method. 283 284 Indeed, we can fail too early in case of a master restart during a 285 minion state execution call 286 287 :param dict load: A load to send across the wire 288 :param int tries: The number of times to make before failure 289 :param int timeout: The number of seconds on a response before failing 290 """ 291 nonce = uuid.uuid4().hex 292 if load and isinstance(load, dict): 293 load["nonce"] = nonce 294 295 @salt.ext.tornado.gen.coroutine 296 def _do_transfer(): 297 # Yield control to the caller. When send() completes, resume by populating data with the Future.result 298 data = yield self.message_client.send( 299 self._package_load(self.auth.crypticle.dumps(load)), 300 timeout=timeout, 301 tries=tries, 302 ) 303 # we may not have always data 304 # as for example for saltcall ret submission, this is a blind 305 # communication, we do not subscribe to return events, we just 306 # upload the results to the master 307 if data: 308 data = self.auth.crypticle.loads(data, raw, nonce) 309 if not raw: 310 data = salt.transport.frame.decode_embedded_strs(data) 311 raise salt.ext.tornado.gen.Return(data) 312 313 if not self.auth.authenticated: 314 # Return control back to the caller, resume when authentication succeeds 315 yield self.auth.authenticate() 316 try: 317 # We did not get data back the first time. Retry. 318 ret = yield _do_transfer() 319 except salt.crypt.AuthenticationError: 320 # If auth error, return control back to the caller, continue when authentication succeeds 321 yield self.auth.authenticate() 322 ret = yield _do_transfer() 323 raise salt.ext.tornado.gen.Return(ret) 324 325 @salt.ext.tornado.gen.coroutine 326 def _uncrypted_transfer(self, load, tries=3, timeout=60): 327 """ 328 Send a load across the wire in cleartext 329 330 :param dict load: A load to send across the wire 331 :param int tries: The number of times to make before failure 332 :param int timeout: The number of seconds on a response before failing 333 """ 334 ret = yield self.message_client.send( 335 self._package_load(load), 336 timeout=timeout, 337 tries=tries, 338 ) 339 340 raise salt.ext.tornado.gen.Return(ret) 341 342 @salt.ext.tornado.gen.coroutine 343 def send(self, load, tries=3, timeout=60, raw=False): 344 """ 345 Send a request, return a future which will complete when we send the message 346 """ 347 if self.crypt == "clear": 348 ret = yield self._uncrypted_transfer(load, tries=tries, timeout=timeout) 349 else: 350 ret = yield self._crypted_transfer( 351 load, tries=tries, timeout=timeout, raw=raw 352 ) 353 raise salt.ext.tornado.gen.Return(ret) 354 355 356class AsyncZeroMQPubChannel( 357 salt.transport.mixins.auth.AESPubClientMixin, salt.transport.client.AsyncPubChannel 358): 359 """ 360 A transport channel backed by ZeroMQ for a Salt Publisher to use to 361 publish commands to connected minions 362 """ 363 364 async_methods = [ 365 "connect", 366 "_decode_messages", 367 ] 368 close_methods = [ 369 "close", 370 ] 371 372 def __init__(self, opts, **kwargs): 373 self.opts = opts 374 self.ttype = "zeromq" 375 self.io_loop = kwargs.get("io_loop") 376 self._closing = False 377 378 if self.io_loop is None: 379 self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() 380 381 self.hexid = hashlib.sha1( 382 salt.utils.stringutils.to_bytes(self.opts["id"]) 383 ).hexdigest() 384 self.auth = salt.crypt.AsyncAuth(self.opts, io_loop=self.io_loop) 385 self.context = zmq.Context() 386 self._socket = self.context.socket(zmq.SUB) 387 388 if self.opts["zmq_filtering"]: 389 # TODO: constants file for "broadcast" 390 self._socket.setsockopt(zmq.SUBSCRIBE, b"broadcast") 391 if self.opts.get("__role") == "syndic": 392 self._socket.setsockopt(zmq.SUBSCRIBE, b"syndic") 393 else: 394 self._socket.setsockopt( 395 zmq.SUBSCRIBE, salt.utils.stringutils.to_bytes(self.hexid) 396 ) 397 else: 398 self._socket.setsockopt(zmq.SUBSCRIBE, b"") 399 400 self._socket.setsockopt( 401 zmq.IDENTITY, salt.utils.stringutils.to_bytes(self.opts["id"]) 402 ) 403 404 # TODO: cleanup all the socket opts stuff 405 if hasattr(zmq, "TCP_KEEPALIVE"): 406 self._socket.setsockopt(zmq.TCP_KEEPALIVE, self.opts["tcp_keepalive"]) 407 self._socket.setsockopt( 408 zmq.TCP_KEEPALIVE_IDLE, self.opts["tcp_keepalive_idle"] 409 ) 410 self._socket.setsockopt( 411 zmq.TCP_KEEPALIVE_CNT, self.opts["tcp_keepalive_cnt"] 412 ) 413 self._socket.setsockopt( 414 zmq.TCP_KEEPALIVE_INTVL, self.opts["tcp_keepalive_intvl"] 415 ) 416 417 recon_delay = self.opts["recon_default"] 418 419 if self.opts["recon_randomize"]: 420 recon_delay = randint( 421 self.opts["recon_default"], 422 self.opts["recon_default"] + self.opts["recon_max"], 423 ) 424 425 log.debug( 426 "Generated random reconnect delay between '%sms' and '%sms' (%s)", 427 self.opts["recon_default"], 428 self.opts["recon_default"] + self.opts["recon_max"], 429 recon_delay, 430 ) 431 432 log.debug("Setting zmq_reconnect_ivl to '%sms'", recon_delay) 433 self._socket.setsockopt(zmq.RECONNECT_IVL, recon_delay) 434 435 if hasattr(zmq, "RECONNECT_IVL_MAX"): 436 log.debug( 437 "Setting zmq_reconnect_ivl_max to '%sms'", 438 self.opts["recon_default"] + self.opts["recon_max"], 439 ) 440 441 self._socket.setsockopt(zmq.RECONNECT_IVL_MAX, self.opts["recon_max"]) 442 443 if (self.opts["ipv6"] is True or ":" in self.opts["master_ip"]) and hasattr( 444 zmq, "IPV4ONLY" 445 ): 446 # IPv6 sockets work for both IPv6 and IPv4 addresses 447 self._socket.setsockopt(zmq.IPV4ONLY, 0) 448 449 if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]: 450 self._monitor = ZeroMQSocketMonitor(self._socket) 451 self._monitor.start_io_loop(self.io_loop) 452 453 def close(self): 454 if self._closing is True: 455 return 456 457 self._closing = True 458 459 if hasattr(self, "_monitor") and self._monitor is not None: 460 self._monitor.stop() 461 self._monitor = None 462 if hasattr(self, "_stream"): 463 self._stream.close(0) 464 elif hasattr(self, "_socket"): 465 self._socket.close(0) 466 if hasattr(self, "context") and self.context.closed is False: 467 self.context.term() 468 469 # pylint: disable=W1701 470 def __del__(self): 471 self.close() 472 473 # pylint: enable=W1701 474 def __enter__(self): 475 return self 476 477 def __exit__(self, *args): 478 self.close() 479 480 # TODO: this is the time to see if we are connected, maybe use the req channel to guess? 481 @salt.ext.tornado.gen.coroutine 482 def connect(self): 483 if not self.auth.authenticated: 484 yield self.auth.authenticate() 485 486 # if this is changed from the default, we assume it was intentional 487 if int(self.opts.get("publish_port", 4506)) != 4506: 488 self.publish_port = self.opts.get("publish_port") 489 # else take the relayed publish_port master reports 490 else: 491 self.publish_port = self.auth.creds["publish_port"] 492 493 log.debug( 494 "Connecting the Minion to the Master publish port, using the URI: %s", 495 self.master_pub, 496 ) 497 self._socket.connect(self.master_pub) 498 499 @property 500 def master_pub(self): 501 """ 502 Return the master publish port 503 """ 504 return _get_master_uri( 505 self.opts["master_ip"], 506 self.publish_port, 507 source_ip=self.opts.get("source_ip"), 508 source_port=self.opts.get("source_publish_port"), 509 ) 510 511 @salt.ext.tornado.gen.coroutine 512 def _decode_messages(self, messages): 513 """ 514 Take the zmq messages, decrypt/decode them into a payload 515 516 :param list messages: A list of messages to be decoded 517 """ 518 messages_len = len(messages) 519 # if it was one message, then its old style 520 if messages_len == 1: 521 payload = salt.payload.loads(messages[0]) 522 # 2 includes a header which says who should do it 523 elif messages_len == 2: 524 message_target = salt.utils.stringutils.to_str(messages[0]) 525 if ( 526 self.opts.get("__role") != "syndic" 527 and message_target not in ("broadcast", self.hexid) 528 ) or ( 529 self.opts.get("__role") == "syndic" 530 and message_target not in ("broadcast", "syndic") 531 ): 532 log.debug("Publish received for not this minion: %s", message_target) 533 raise salt.ext.tornado.gen.Return(None) 534 payload = salt.payload.loads(messages[1]) 535 else: 536 raise Exception( 537 "Invalid number of messages ({}) in zeromq pubmessage from master".format( 538 len(messages_len) 539 ) 540 ) 541 # Yield control back to the caller. When the payload has been decoded, assign 542 # the decoded payload to 'ret' and resume operation 543 ret = yield self._decode_payload(payload) 544 raise salt.ext.tornado.gen.Return(ret) 545 546 @property 547 def stream(self): 548 """ 549 Return the current zmqstream, creating one if necessary 550 """ 551 if not hasattr(self, "_stream"): 552 self._stream = zmq.eventloop.zmqstream.ZMQStream( 553 self._socket, io_loop=self.io_loop 554 ) 555 return self._stream 556 557 def on_recv(self, callback): 558 """ 559 Register a callback for received messages (that we didn't initiate) 560 561 :param func callback: A function which should be called when data is received 562 """ 563 if callback is None: 564 return self.stream.on_recv(None) 565 566 @salt.ext.tornado.gen.coroutine 567 def wrap_callback(messages): 568 payload = yield self._decode_messages(messages) 569 if payload is not None: 570 callback(payload) 571 572 return self.stream.on_recv(wrap_callback) 573 574 575class ZeroMQReqServerChannel( 576 salt.transport.mixins.auth.AESReqServerMixin, salt.transport.server.ReqServerChannel 577): 578 def __init__(self, opts): 579 salt.transport.server.ReqServerChannel.__init__(self, opts) 580 self._closing = False 581 self._monitor = None 582 self._w_monitor = None 583 584 def zmq_device(self): 585 """ 586 Multiprocessing target for the zmq queue device 587 """ 588 self.__setup_signals() 589 salt.utils.process.appendproctitle("MWorkerQueue") 590 self.context = zmq.Context(self.opts["worker_threads"]) 591 # Prepare the zeromq sockets 592 self.uri = "tcp://{interface}:{ret_port}".format(**self.opts) 593 self.clients = self.context.socket(zmq.ROUTER) 594 if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"): 595 # IPv6 sockets work for both IPv6 and IPv4 addresses 596 self.clients.setsockopt(zmq.IPV4ONLY, 0) 597 self.clients.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000)) 598 self._start_zmq_monitor() 599 self.workers = self.context.socket(zmq.DEALER) 600 601 if self.opts["mworker_queue_niceness"] and not salt.utils.platform.is_windows(): 602 log.info( 603 "setting mworker_queue niceness to %d", 604 self.opts["mworker_queue_niceness"], 605 ) 606 os.nice(self.opts["mworker_queue_niceness"]) 607 608 if self.opts.get("ipc_mode", "") == "tcp": 609 self.w_uri = "tcp://127.0.0.1:{}".format( 610 self.opts.get("tcp_master_workers", 4515) 611 ) 612 else: 613 self.w_uri = "ipc://{}".format( 614 os.path.join(self.opts["sock_dir"], "workers.ipc") 615 ) 616 617 log.info("Setting up the master communication server") 618 self.clients.bind(self.uri) 619 self.workers.bind(self.w_uri) 620 621 while True: 622 if self.clients.closed or self.workers.closed: 623 break 624 try: 625 zmq.device(zmq.QUEUE, self.clients, self.workers) 626 except zmq.ZMQError as exc: 627 if exc.errno == errno.EINTR: 628 continue 629 raise 630 except (KeyboardInterrupt, SystemExit): 631 break 632 633 def close(self): 634 """ 635 Cleanly shutdown the router socket 636 """ 637 if self._closing: 638 return 639 log.info("MWorkerQueue under PID %s is closing", os.getpid()) 640 self._closing = True 641 if getattr(self, "_monitor", None) is not None: 642 self._monitor.stop() 643 self._monitor = None 644 if getattr(self, "_w_monitor", None) is not None: 645 self._w_monitor.stop() 646 self._w_monitor = None 647 if hasattr(self, "clients") and self.clients.closed is False: 648 self.clients.close() 649 if hasattr(self, "workers") and self.workers.closed is False: 650 self.workers.close() 651 if hasattr(self, "stream"): 652 self.stream.close() 653 if hasattr(self, "_socket") and self._socket.closed is False: 654 self._socket.close() 655 if hasattr(self, "context") and self.context.closed is False: 656 self.context.term() 657 658 def pre_fork(self, process_manager): 659 """ 660 Pre-fork we need to create the zmq router device 661 662 :param func process_manager: An instance of salt.utils.process.ProcessManager 663 """ 664 salt.transport.mixins.auth.AESReqServerMixin.pre_fork(self, process_manager) 665 process_manager.add_process(self.zmq_device) 666 667 def _start_zmq_monitor(self): 668 """ 669 Starts ZMQ monitor for debugging purposes. 670 :return: 671 """ 672 # Socket monitor shall be used the only for debug 673 # purposes so using threading doesn't look too bad here 674 675 if HAS_ZMQ_MONITOR and self.opts["zmq_monitor"]: 676 log.debug("Starting ZMQ monitor") 677 import threading 678 679 self._w_monitor = ZeroMQSocketMonitor(self._socket) 680 threading.Thread(target=self._w_monitor.start_poll).start() 681 log.debug("ZMQ monitor has been started started") 682 683 def post_fork(self, payload_handler, io_loop): 684 """ 685 After forking we need to create all of the local sockets to listen to the 686 router 687 688 :param func payload_handler: A function to called to handle incoming payloads as 689 they are picked up off the wire 690 :param IOLoop io_loop: An instance of a Tornado IOLoop, to handle event scheduling 691 """ 692 self.payload_handler = payload_handler 693 self.io_loop = io_loop 694 695 self.context = zmq.Context(1) 696 self._socket = self.context.socket(zmq.REP) 697 self._start_zmq_monitor() 698 699 if self.opts.get("ipc_mode", "") == "tcp": 700 self.w_uri = "tcp://127.0.0.1:{}".format( 701 self.opts.get("tcp_master_workers", 4515) 702 ) 703 else: 704 self.w_uri = "ipc://{}".format( 705 os.path.join(self.opts["sock_dir"], "workers.ipc") 706 ) 707 log.info("Worker binding to socket %s", self.w_uri) 708 self._socket.connect(self.w_uri) 709 710 salt.transport.mixins.auth.AESReqServerMixin.post_fork( 711 self, payload_handler, io_loop 712 ) 713 714 self.stream = zmq.eventloop.zmqstream.ZMQStream( 715 self._socket, io_loop=self.io_loop 716 ) 717 self.stream.on_recv_stream(self.handle_message) 718 719 @salt.ext.tornado.gen.coroutine 720 def handle_message(self, stream, payload): 721 """ 722 Handle incoming messages from underlying TCP streams 723 724 :stream ZMQStream stream: A ZeroMQ stream. 725 See http://zeromq.github.io/pyzmq/api/generated/zmq.eventloop.zmqstream.html 726 727 :param dict payload: A payload to process 728 """ 729 try: 730 payload = salt.payload.loads(payload[0]) 731 payload = self._decode_payload(payload) 732 except Exception as exc: # pylint: disable=broad-except 733 exc_type = type(exc).__name__ 734 if exc_type == "AuthenticationError": 735 log.debug( 736 "Minion failed to auth to master. Since the payload is " 737 "encrypted, it is not known which minion failed to " 738 "authenticate. It is likely that this is a transient " 739 "failure due to the master rotating its public key." 740 ) 741 else: 742 log.error("Bad load from minion: %s: %s", exc_type, exc) 743 stream.send(salt.payload.dumps("bad load")) 744 raise salt.ext.tornado.gen.Return() 745 746 # TODO helper functions to normalize payload? 747 if not isinstance(payload, dict) or not isinstance(payload.get("load"), dict): 748 log.error( 749 "payload and load must be a dict. Payload was: %s and load was %s", 750 payload, 751 payload.get("load"), 752 ) 753 stream.send(salt.payload.dumps("payload and load must be a dict")) 754 raise salt.ext.tornado.gen.Return() 755 756 try: 757 id_ = payload["load"].get("id", "") 758 if "\0" in id_: 759 log.error("Payload contains an id with a null byte: %s", payload) 760 stream.send(salt.payload.dumps("bad load: id contains a null byte")) 761 raise salt.ext.tornado.gen.Return() 762 except TypeError: 763 log.error("Payload contains non-string id: %s", payload) 764 stream.send( 765 salt.payload.dumps("bad load: id {} is not a string".format(id_)) 766 ) 767 raise salt.ext.tornado.gen.Return() 768 769 version = 0 770 if "version" in payload: 771 version = payload["version"] 772 773 sign_messages = False 774 if version > 1: 775 sign_messages = True 776 777 # intercept the "_auth" commands, since the main daemon shouldn't know 778 # anything about our key auth 779 if payload["enc"] == "clear" and payload.get("load", {}).get("cmd") == "_auth": 780 stream.send(salt.payload.dumps(self._auth(payload["load"], sign_messages))) 781 raise salt.ext.tornado.gen.Return() 782 783 nonce = None 784 if version > 1: 785 nonce = payload["load"].pop("nonce", None) 786 787 # TODO: test 788 try: 789 # Take the payload_handler function that was registered when we created the channel 790 # and call it, returning control to the caller until it completes 791 ret, req_opts = yield self.payload_handler(payload) 792 except Exception as e: # pylint: disable=broad-except 793 # always attempt to return an error to the minion 794 stream.send("Some exception handling minion payload") 795 log.error("Some exception handling a payload from minion", exc_info=True) 796 raise salt.ext.tornado.gen.Return() 797 798 req_fun = req_opts.get("fun", "send") 799 if req_fun == "send_clear": 800 stream.send(salt.payload.dumps(ret)) 801 elif req_fun == "send": 802 stream.send(salt.payload.dumps(self.crypticle.dumps(ret, nonce))) 803 elif req_fun == "send_private": 804 stream.send( 805 salt.payload.dumps( 806 self._encrypt_private( 807 ret, 808 req_opts["key"], 809 req_opts["tgt"], 810 nonce, 811 sign_messages, 812 ) 813 ) 814 ) 815 else: 816 log.error("Unknown req_fun %s", req_fun) 817 # always attempt to return an error to the minion 818 stream.send("Server-side exception handling payload") 819 raise salt.ext.tornado.gen.Return() 820 821 def __setup_signals(self): 822 signal.signal(signal.SIGINT, self._handle_signals) 823 signal.signal(signal.SIGTERM, self._handle_signals) 824 825 def _handle_signals(self, signum, sigframe): 826 msg = "{} received a ".format(self.__class__.__name__) 827 if signum == signal.SIGINT: 828 msg += "SIGINT" 829 elif signum == signal.SIGTERM: 830 msg += "SIGTERM" 831 msg += ". Exiting" 832 log.debug(msg) 833 self.close() 834 sys.exit(salt.defaults.exitcodes.EX_OK) 835 836 837def _set_tcp_keepalive(zmq_socket, opts): 838 """ 839 Ensure that TCP keepalives are set as specified in "opts". 840 841 Warning: Failure to set TCP keepalives on the salt-master can result in 842 not detecting the loss of a minion when the connection is lost or when 843 its host has been terminated without first closing the socket. 844 Salt's Presence System depends on this connection status to know if a minion 845 is "present". 846 847 Warning: Failure to set TCP keepalives on minions can result in frequent or 848 unexpected disconnects! 849 """ 850 if hasattr(zmq, "TCP_KEEPALIVE") and opts: 851 if "tcp_keepalive" in opts: 852 zmq_socket.setsockopt(zmq.TCP_KEEPALIVE, opts["tcp_keepalive"]) 853 if "tcp_keepalive_idle" in opts: 854 zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_IDLE, opts["tcp_keepalive_idle"]) 855 if "tcp_keepalive_cnt" in opts: 856 zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_CNT, opts["tcp_keepalive_cnt"]) 857 if "tcp_keepalive_intvl" in opts: 858 zmq_socket.setsockopt(zmq.TCP_KEEPALIVE_INTVL, opts["tcp_keepalive_intvl"]) 859 860 861class ZeroMQPubServerChannel(salt.transport.server.PubServerChannel): 862 """ 863 Encapsulate synchronous operations for a publisher channel 864 """ 865 866 _sock_data = threading.local() 867 868 def __init__(self, opts): 869 self.opts = opts 870 self.ckminions = salt.utils.minions.CkMinions(self.opts) 871 872 def connect(self): 873 return salt.ext.tornado.gen.sleep(5) 874 875 def _publish_daemon(self, log_queue=None): 876 """ 877 Bind to the interface specified in the configuration file 878 """ 879 salt.utils.process.appendproctitle(self.__class__.__name__) 880 881 if self.opts["pub_server_niceness"] and not salt.utils.platform.is_windows(): 882 log.info( 883 "setting Publish daemon niceness to %i", 884 self.opts["pub_server_niceness"], 885 ) 886 os.nice(self.opts["pub_server_niceness"]) 887 888 if log_queue: 889 salt.log.setup.set_multiprocessing_logging_queue(log_queue) 890 salt.log.setup.setup_multiprocessing_logging(log_queue) 891 892 # Set up the context 893 context = zmq.Context(1) 894 # Prepare minion publish socket 895 pub_sock = context.socket(zmq.PUB) 896 _set_tcp_keepalive(pub_sock, self.opts) 897 # if 2.1 >= zmq < 3.0, we only have one HWM setting 898 try: 899 pub_sock.setsockopt(zmq.HWM, self.opts.get("pub_hwm", 1000)) 900 # in zmq >= 3.0, there are separate send and receive HWM settings 901 except AttributeError: 902 # Set the High Water Marks. For more information on HWM, see: 903 # http://api.zeromq.org/4-1:zmq-setsockopt 904 pub_sock.setsockopt(zmq.SNDHWM, self.opts.get("pub_hwm", 1000)) 905 pub_sock.setsockopt(zmq.RCVHWM, self.opts.get("pub_hwm", 1000)) 906 if self.opts["ipv6"] is True and hasattr(zmq, "IPV4ONLY"): 907 # IPv6 sockets work for both IPv6 and IPv4 addresses 908 pub_sock.setsockopt(zmq.IPV4ONLY, 0) 909 pub_sock.setsockopt(zmq.BACKLOG, self.opts.get("zmq_backlog", 1000)) 910 pub_sock.setsockopt(zmq.LINGER, -1) 911 pub_uri = "tcp://{interface}:{publish_port}".format(**self.opts) 912 # Prepare minion pull socket 913 pull_sock = context.socket(zmq.PULL) 914 pull_sock.setsockopt(zmq.LINGER, -1) 915 916 if self.opts.get("ipc_mode", "") == "tcp": 917 pull_uri = "tcp://127.0.0.1:{}".format( 918 self.opts.get("tcp_master_publish_pull", 4514) 919 ) 920 else: 921 pull_uri = "ipc://{}".format( 922 os.path.join(self.opts["sock_dir"], "publish_pull.ipc") 923 ) 924 salt.utils.zeromq.check_ipc_path_max_len(pull_uri) 925 926 # Start the minion command publisher 927 log.info("Starting the Salt Publisher on %s", pub_uri) 928 pub_sock.bind(pub_uri) 929 930 # Securely create socket 931 log.info("Starting the Salt Puller on %s", pull_uri) 932 with salt.utils.files.set_umask(0o177): 933 pull_sock.bind(pull_uri) 934 935 try: 936 while True: 937 # Catch and handle EINTR from when this process is sent 938 # SIGUSR1 gracefully so we don't choke and die horribly 939 try: 940 log.debug("Publish daemon getting data from puller %s", pull_uri) 941 package = pull_sock.recv() 942 package = salt.payload.loads(package) 943 package = self.pack_publish(package) 944 log.debug("Publish daemon received payload. size=%d", len(package)) 945 946 unpacked_package = salt.payload.unpackage(package) 947 unpacked_package = salt.transport.frame.decode_embedded_strs( 948 unpacked_package 949 ) 950 payload = unpacked_package["payload"] 951 log.trace("Accepted unpacked package from puller") 952 if self.opts["zmq_filtering"]: 953 # if you have a specific topic list, use that 954 if "topic_lst" in unpacked_package: 955 for topic in unpacked_package["topic_lst"]: 956 log.trace( 957 "Sending filtered data over publisher %s", pub_uri 958 ) 959 # zmq filters are substring match, hash the topic 960 # to avoid collisions 961 htopic = salt.utils.stringutils.to_bytes( 962 hashlib.sha1( 963 salt.utils.stringutils.to_bytes(topic) 964 ).hexdigest() 965 ) 966 pub_sock.send(htopic, flags=zmq.SNDMORE) 967 pub_sock.send(payload) 968 log.trace("Filtered data has been sent") 969 970 # Syndic broadcast 971 if self.opts.get("order_masters"): 972 log.trace("Sending filtered data to syndic") 973 pub_sock.send(b"syndic", flags=zmq.SNDMORE) 974 pub_sock.send(payload) 975 log.trace("Filtered data has been sent to syndic") 976 # otherwise its a broadcast 977 else: 978 # TODO: constants file for "broadcast" 979 log.trace( 980 "Sending broadcasted data over publisher %s", pub_uri 981 ) 982 pub_sock.send(b"broadcast", flags=zmq.SNDMORE) 983 pub_sock.send(payload) 984 log.trace("Broadcasted data has been sent") 985 else: 986 log.trace( 987 "Sending ZMQ-unfiltered data over publisher %s", pub_uri 988 ) 989 pub_sock.send(payload) 990 log.trace("Unfiltered data has been sent") 991 except zmq.ZMQError as exc: 992 if exc.errno == errno.EINTR: 993 continue 994 raise 995 996 except KeyboardInterrupt: 997 log.trace("Publish daemon caught Keyboard interupt, tearing down") 998 # Cleanly close the sockets if we're shutting down 999 if pub_sock.closed is False: 1000 pub_sock.close() 1001 if pull_sock.closed is False: 1002 pull_sock.close() 1003 if context.closed is False: 1004 context.term() 1005 1006 def pre_fork(self, process_manager, kwargs=None): 1007 """ 1008 Do anything necessary pre-fork. Since this is on the master side this will 1009 primarily be used to create IPC channels and create our daemon process to 1010 do the actual publishing 1011 1012 :param func process_manager: A ProcessManager, from salt.utils.process.ProcessManager 1013 """ 1014 process_manager.add_process(self._publish_daemon, kwargs=kwargs) 1015 1016 @property 1017 def pub_sock(self): 1018 """ 1019 This thread's zmq publisher socket. This socket is stored on the class 1020 so that multiple instantiations in the same thread will re-use a single 1021 zmq socket. 1022 """ 1023 try: 1024 return self._sock_data.sock 1025 except AttributeError: 1026 pass 1027 1028 def pub_connect(self): 1029 """ 1030 Create and connect this thread's zmq socket. If a publisher socket 1031 already exists "pub_close" is called before creating and connecting a 1032 new socket. 1033 """ 1034 if self.pub_sock: 1035 self.pub_close() 1036 self._sock_data._ctx = zmq.Context() 1037 self._sock_data.sock = self._sock_data._ctx.socket(zmq.PUSH) 1038 self.pub_sock.setsockopt(zmq.LINGER, -1) 1039 if self.opts.get("ipc_mode", "") == "tcp": 1040 pull_uri = "tcp://127.0.0.1:{}".format( 1041 self.opts.get("tcp_master_publish_pull", 4514) 1042 ) 1043 else: 1044 pull_uri = "ipc://{}".format( 1045 os.path.join(self.opts["sock_dir"], "publish_pull.ipc") 1046 ) 1047 log.debug("Connecting to pub server: %s", pull_uri) 1048 self.pub_sock.connect(pull_uri) 1049 return self._sock_data.sock 1050 1051 def pub_close(self): 1052 """ 1053 Disconnect an existing publisher socket and remove it from the local 1054 thread's cache. 1055 """ 1056 if hasattr(self._sock_data, "sock"): 1057 self._sock_data.sock.close() 1058 delattr(self._sock_data, "sock") 1059 if hasattr(self._sock_data, "_ctx"): 1060 self._sock_data._ctx.destroy() 1061 1062 def pack_publish(self, load): 1063 payload = {"enc": "aes"} 1064 load["serial"] = salt.master.SMaster.get_serial() 1065 crypticle = salt.crypt.Crypticle( 1066 self.opts, salt.master.SMaster.secrets["aes"]["secret"].value 1067 ) 1068 payload["load"] = crypticle.dumps(load) 1069 if self.opts["sign_pub_messages"]: 1070 master_pem_path = os.path.join(self.opts["pki_dir"], "master.pem") 1071 log.debug("Signing data packet") 1072 payload["sig"] = salt.crypt.sign_message(master_pem_path, payload["load"]) 1073 int_payload = {"payload": salt.payload.dumps(payload)} 1074 1075 # add some targeting stuff for lists only (for now) 1076 if load["tgt_type"] == "list": 1077 int_payload["topic_lst"] = load["tgt"] 1078 1079 # If zmq_filtering is enabled, target matching has to happen master side 1080 match_targets = ["pcre", "glob", "list"] 1081 if self.opts["zmq_filtering"] and load["tgt_type"] in match_targets: 1082 # Fetch a list of minions that match 1083 _res = self.ckminions.check_minions(load["tgt"], tgt_type=load["tgt_type"]) 1084 match_ids = _res["minions"] 1085 1086 log.debug("Publish Side Match: %s", match_ids) 1087 # Send list of miions thru so zmq can target them 1088 int_payload["topic_lst"] = match_ids 1089 payload = salt.payload.dumps(int_payload) 1090 log.debug( 1091 "Sending payload to publish daemon. jid=%s size=%d", 1092 load.get("jid", None), 1093 len(payload), 1094 ) 1095 return payload 1096 1097 def publish(self, load): 1098 """ 1099 Publish "load" to minions. This send the load to the publisher daemon 1100 process with does the actual sending to minions. 1101 1102 :param dict load: A load to be sent across the wire to minions 1103 """ 1104 if not self.pub_sock: 1105 self.pub_connect() 1106 self.pub_sock.send(salt.payload.dumps(load)) 1107 log.debug("Sent payload to publish daemon.") 1108 1109 1110class AsyncReqMessageClientPool(salt.transport.MessageClientPool): 1111 """ 1112 Wrapper class of AsyncReqMessageClientPool to avoid blocking waiting while writing data to socket. 1113 """ 1114 1115 def __init__(self, opts, args=None, kwargs=None): 1116 self._closing = False 1117 super().__init__(AsyncReqMessageClient, opts, args=args, kwargs=kwargs) 1118 1119 def close(self): 1120 if self._closing: 1121 return 1122 1123 self._closing = True 1124 for message_client in self.message_clients: 1125 message_client.close() 1126 self.message_clients = [] 1127 1128 def send(self, *args, **kwargs): 1129 message_clients = sorted(self.message_clients, key=lambda x: len(x.send_queue)) 1130 return message_clients[0].send(*args, **kwargs) 1131 1132 def __enter__(self): 1133 return self 1134 1135 def __exit__(self, *args): 1136 self.close() 1137 1138 1139# TODO: unit tests! 1140class AsyncReqMessageClient: 1141 """ 1142 This class wraps the underlying zeromq REQ socket and gives a future-based 1143 interface to sending and recieving messages. This works around the primary 1144 limitation of serialized send/recv on the underlying socket by queueing the 1145 message sends in this class. In the future if we decide to attempt to multiplex 1146 we can manage a pool of REQ/REP sockets-- but for now we'll just do them in serial 1147 """ 1148 1149 def __init__(self, opts, addr, linger=0, io_loop=None): 1150 """ 1151 Create an asynchronous message client 1152 1153 :param dict opts: The salt opts dictionary 1154 :param str addr: The interface IP address to bind to 1155 :param int linger: The number of seconds to linger on a ZMQ socket. See 1156 http://api.zeromq.org/2-1:zmq-setsockopt [ZMQ_LINGER] 1157 :param IOLoop io_loop: A Tornado IOLoop event scheduler [tornado.ioloop.IOLoop] 1158 """ 1159 self.opts = opts 1160 self.addr = addr 1161 self.linger = linger 1162 if io_loop is None: 1163 self.io_loop = salt.ext.tornado.ioloop.IOLoop.current() 1164 else: 1165 self.io_loop = io_loop 1166 1167 self.context = zmq.Context() 1168 1169 # wire up sockets 1170 self._init_socket() 1171 1172 self.send_queue = [] 1173 # mapping of message -> future 1174 self.send_future_map = {} 1175 1176 self.send_timeout_map = {} # message -> timeout 1177 self._closing = False 1178 1179 # TODO: timeout all in-flight sessions, or error 1180 def close(self): 1181 try: 1182 if self._closing: 1183 return 1184 except AttributeError: 1185 # We must have been called from __del__ 1186 # The python interpreter has nuked most attributes already 1187 return 1188 else: 1189 self._closing = True 1190 if hasattr(self, "stream") and self.stream is not None: 1191 if ZMQ_VERSION_INFO < (14, 3, 0): 1192 # stream.close() doesn't work properly on pyzmq < 14.3.0 1193 if self.stream.socket: 1194 self.stream.socket.close() 1195 self.stream.io_loop.remove_handler(self.stream.socket) 1196 # set this to None, more hacks for messed up pyzmq 1197 self.stream.socket = None 1198 self.socket.close() 1199 else: 1200 self.stream.close() 1201 self.socket = None 1202 self.stream = None 1203 if self.context.closed is False: 1204 self.context.term() 1205 1206 # pylint: disable=W1701 1207 def __del__(self): 1208 self.close() 1209 1210 # pylint: enable=W1701 1211 1212 def _init_socket(self): 1213 if hasattr(self, "stream"): 1214 self.stream.close() # pylint: disable=E0203 1215 self.socket.close() # pylint: disable=E0203 1216 del self.stream 1217 del self.socket 1218 1219 self.socket = self.context.socket(zmq.REQ) 1220 1221 # socket options 1222 if hasattr(zmq, "RECONNECT_IVL_MAX"): 1223 self.socket.setsockopt(zmq.RECONNECT_IVL_MAX, 5000) 1224 1225 _set_tcp_keepalive(self.socket, self.opts) 1226 if self.addr.startswith("tcp://["): 1227 # Hint PF type if bracket enclosed IPv6 address 1228 if hasattr(zmq, "IPV6"): 1229 self.socket.setsockopt(zmq.IPV6, 1) 1230 elif hasattr(zmq, "IPV4ONLY"): 1231 self.socket.setsockopt(zmq.IPV4ONLY, 0) 1232 self.socket.linger = self.linger 1233 log.debug("Trying to connect to: %s", self.addr) 1234 self.socket.connect(self.addr) 1235 self.stream = zmq.eventloop.zmqstream.ZMQStream( 1236 self.socket, io_loop=self.io_loop 1237 ) 1238 1239 @salt.ext.tornado.gen.coroutine 1240 def _internal_send_recv(self): 1241 while len(self.send_queue) > 0: 1242 message = self.send_queue[0] 1243 future = self.send_future_map.get(message, None) 1244 if future is None: 1245 # Timedout 1246 del self.send_queue[0] 1247 continue 1248 1249 # send 1250 def mark_future(msg): 1251 if not future.done(): 1252 data = salt.payload.loads(msg[0]) 1253 future.set_result(data) 1254 1255 self.stream.on_recv(mark_future) 1256 self.stream.send(message) 1257 1258 try: 1259 ret = yield future 1260 except Exception as err: # pylint: disable=broad-except 1261 log.debug("Re-init ZMQ socket: %s", err) 1262 self._init_socket() # re-init the zmq socket (no other way in zmq) 1263 del self.send_queue[0] 1264 continue 1265 del self.send_queue[0] 1266 self.send_future_map.pop(message, None) 1267 self.remove_message_timeout(message) 1268 1269 def remove_message_timeout(self, message): 1270 if message not in self.send_timeout_map: 1271 return 1272 timeout = self.send_timeout_map.pop(message, None) 1273 if timeout is not None: 1274 # Hasn't been already timedout 1275 self.io_loop.remove_timeout(timeout) 1276 1277 def timeout_message(self, message): 1278 """ 1279 Handle a message timeout by removing it from the sending queue 1280 and informing the caller 1281 1282 :raises: SaltReqTimeoutError 1283 """ 1284 future = self.send_future_map.pop(message, None) 1285 # In a race condition the message might have been sent by the time 1286 # we're timing it out. Make sure the future is not None 1287 if future is not None: 1288 del self.send_timeout_map[message] 1289 if future.attempts < future.tries: 1290 future.attempts += 1 1291 log.debug( 1292 "SaltReqTimeoutError, retrying. (%s/%s)", 1293 future.attempts, 1294 future.tries, 1295 ) 1296 self.send( 1297 message, 1298 timeout=future.timeout, 1299 tries=future.tries, 1300 future=future, 1301 ) 1302 1303 else: 1304 future.set_exception(SaltReqTimeoutError("Message timed out")) 1305 1306 def send( 1307 self, message, timeout=None, tries=3, future=None, callback=None, raw=False 1308 ): 1309 """ 1310 Return a future which will be completed when the message has a response 1311 """ 1312 if future is None: 1313 future = salt.ext.tornado.concurrent.Future() 1314 future.tries = tries 1315 future.attempts = 0 1316 future.timeout = timeout 1317 # if a future wasn't passed in, we need to serialize the message 1318 message = salt.payload.dumps(message) 1319 if callback is not None: 1320 1321 def handle_future(future): 1322 response = future.result() 1323 self.io_loop.add_callback(callback, response) 1324 1325 future.add_done_callback(handle_future) 1326 # Add this future to the mapping 1327 self.send_future_map[message] = future 1328 1329 if self.opts.get("detect_mode") is True: 1330 timeout = 1 1331 1332 if timeout is not None: 1333 send_timeout = self.io_loop.call_later( 1334 timeout, self.timeout_message, message 1335 ) 1336 self.send_timeout_map[message] = send_timeout 1337 1338 if len(self.send_queue) == 0: 1339 self.io_loop.spawn_callback(self._internal_send_recv) 1340 1341 self.send_queue.append(message) 1342 1343 return future 1344 1345 1346class ZeroMQSocketMonitor: 1347 __EVENT_MAP = None 1348 1349 def __init__(self, socket): 1350 """ 1351 Create ZMQ monitor sockets 1352 1353 More information: 1354 http://api.zeromq.org/4-0:zmq-socket-monitor 1355 """ 1356 self._socket = socket 1357 self._monitor_socket = self._socket.get_monitor_socket() 1358 self._monitor_stream = None 1359 1360 def start_io_loop(self, io_loop): 1361 log.trace("Event monitor start!") 1362 self._monitor_stream = zmq.eventloop.zmqstream.ZMQStream( 1363 self._monitor_socket, io_loop=io_loop 1364 ) 1365 self._monitor_stream.on_recv(self.monitor_callback) 1366 1367 def start_poll(self): 1368 log.trace("Event monitor start!") 1369 try: 1370 while self._monitor_socket is not None and self._monitor_socket.poll(): 1371 msg = self._monitor_socket.recv_multipart() 1372 self.monitor_callback(msg) 1373 except (AttributeError, zmq.error.ContextTerminated): 1374 # We cannot log here because we'll get an interrupted system call in trying 1375 # to flush the logging buffer as we terminate 1376 pass 1377 1378 @property 1379 def event_map(self): 1380 if ZeroMQSocketMonitor.__EVENT_MAP is None: 1381 event_map = {} 1382 for name in dir(zmq): 1383 if name.startswith("EVENT_"): 1384 value = getattr(zmq, name) 1385 event_map[value] = name 1386 ZeroMQSocketMonitor.__EVENT_MAP = event_map 1387 return ZeroMQSocketMonitor.__EVENT_MAP 1388 1389 def monitor_callback(self, msg): 1390 evt = zmq.utils.monitor.parse_monitor_message(msg) 1391 evt["description"] = self.event_map[evt["event"]] 1392 log.debug("ZeroMQ event: %s", evt) 1393 if evt["event"] == zmq.EVENT_MONITOR_STOPPED: 1394 self.stop() 1395 1396 def stop(self): 1397 if self._socket is None: 1398 return 1399 self._socket.disable_monitor() 1400 self._socket = None 1401 self._monitor_socket = None 1402 if self._monitor_stream is not None: 1403 self._monitor_stream.close() 1404 self._monitor_stream = None 1405 log.trace("Event monitor done!") 1406